如何使用多个参数的多处理池。Starmap

发布于 2025-01-30 07:21:32 字数 683 浏览 2 评论 0原文

我有一个问题..用于使用pool.Starmap ..

p1 = pd.dataframe(example1)
p2 = pd.dataframe(example2)

pairs = itertools.product(p1.iterrows(), p2.iterrows())
pairs_len = len(p1) * len(p2)

tpairs = tqdm(pairs, desc='Make pair data..', total=pairs_len)

def mkpair(p1, p2, ext=False):
    result = {}
    if not ext:
        for idx, xcol in enumerate(p1.columns):
            result[f"D_{idx}"] = float(p1[xcol]) - float(p2[xcol])
    return result

pool = Pool(process=4)
pool.starmap(mkpair, tpairs)
pool.close()
pool.join()

我想在池中的tpairs中获得一个P1。

但是发生“ typeError:元组索引必须是整数或切片,而不是str”,

我也希望我还想知道是否可以通过添加ext = true参数将其放入[p1,p2,ext]的表达式[P1,P2,Ext]中。

I have a question.. for using Pool.starmap..

p1 = pd.dataframe(example1)
p2 = pd.dataframe(example2)

pairs = itertools.product(p1.iterrows(), p2.iterrows())
pairs_len = len(p1) * len(p2)

tpairs = tqdm(pairs, desc='Make pair data..', total=pairs_len)

def mkpair(p1, p2, ext=False):
    result = {}
    if not ext:
        for idx, xcol in enumerate(p1.columns):
            result[f"D_{idx}"] = float(p1[xcol]) - float(p2[xcol])
    return result

pool = Pool(process=4)
pool.starmap(mkpair, tpairs)
pool.close()
pool.join()

I want to get one of P1.iterrows and one of P2.iterrows in tpairs in the pool and put them as p1 and p2 arguments.

but occur "TypeError: tuple indices must be integers or slices, not str"

and i want I'm also wondering if it's possible to put it in the expression [p1, p2, ext] by adding the ext=True argument.

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(1

记忆消瘦 2025-02-06 07:21:32

您的代码有几个问题:

  1. 当您通过调用方法pandas.iterrows()返回迭代器时长度2其中t [0]是行索引,t [1]pandas.Series实例。您的工作功能mkpair将通过其中两个元组传递,一个来自每个数据框架。作为参数p1p2。但是您正在调用p1.columns其中p1是元组,而元组没有的属性。因此,这应该提出一个完全不同的异常(并且pandas.Series也没有这样的方法)。因此,我看不出您是如何获得您发布的实际代码所要求的例外的。此外,您的语句pool = pool(process = 4)是不正确的,因为正确的关键字参数为 processes not process 。因此,您不可能执行此代码(我会忽略缺少的导入语句,我认为您实际上确实有)。
  2. 您正在创建一个进度栏,该栏将在任务的提交中迭代到多处理池。因此,即使您的工作功能花了一秒钟才能完成(并正确编码),进度栏实际上也会在提交任务时即时射击至100%。因此,您衡量的唯一进展是任务提交,而不是完成

您需要做的事情,假设您希望进度栏在完成任务完成后进行进度,就是使用imap_unorderedapply_ashnc life pownback诸如允许的方法您要在任务完成后更新栏。如果要将结果存储在任务提交顺序中的列表中而不是完成顺序,并且您使用的是imap_unordered,则需要将索引传递给Worker函数然后,它可以通过结果返回。如果您使用apply_async,则代码更简单,但是此方法不允许您在“块”中提交任务,如果提交的总任务的数量非常大(请参见方法方法imap_unordered参数。这是您将如何使用每种方法:

使用imap_unordered

from tqdm import tqdm
from multiprocessing import Pool
import itertools
import pandas as pd

columns = ['x', 'y', 'z']

p1 = pd.DataFrame([[1, 2, 3], [4, 5, 6], [3, 2, 1], [6, 5, 4]], columns=columns)
p2 = pd.DataFrame([[7, 8, 9], [10, 11, 12], [9, 8, 7], [12, 11, 10]], columns=columns)

pairs = itertools.product(p1.iterrows(), p2.iterrows())
pairs_len = len(p1) * len(p2)

def mkpair(t: tuple) -> int:
    from time import sleep

    sleep(1)
    # unpack tuple:
    idx, rows = t
    # unpack again:
    row1, row2 = rows
    series1 = row1[1]
    series2 = row2[1]
    return idx, sum(series1[column] * series2[column] for column in columns)

pool = Pool(processes=4)
results = [0] * pairs_len
with tqdm(total=pairs_len) as pbar:
    for idx, total in pool.imap_unordered(mkpair, enumerate(pairs)):
        results[idx] = total
        pbar.update()
pool.close()
pool.join()
print(results)

使用apply_async

from tqdm import tqdm
from multiprocessing import Pool
import itertools
import pandas as pd

columns = ['x', 'y', 'z']

p1 = pd.DataFrame([[1, 2, 3], [4, 5, 6], [3, 2, 1], [6, 5, 4]], columns=columns)
p2 = pd.DataFrame([[7, 8, 9], [10, 11, 12], [9, 8, 7], [12, 11, 10]], columns=columns)

pairs = itertools.product(p1.iterrows(), p2.iterrows())
pairs_len = len(p1) * len(p2)

def mkpair(row1, row2) -> int:
    from time import sleep

    sleep(1)
    series1 = row1[1]
    series2 = row2[1]
    return sum(series1[column] * series2[column] for column in columns)

def my_callback(_):
    pbar.update()

pool = Pool(processes=4)
with tqdm(total=pairs_len) as pbar:
    async_futures = [pool.apply_async(mkpair, args=(row1, row2), callback=my_callback)
                     for row1, row2 in pairs]
    results = [async_future.get() for async_future in async_futures]
pool.close()
pool.join()
print(results)

You have several issues with your code:

  1. When you iterate the iterator returned by calling method pandas.iterrows() you are passed a tuple t of length 2 where t[0] is the row index and t[1] is a pandas.Series instance. Your worker function, mkpair, will be passed two of these tuples, one from each dataframe. as arguments p1 and p2. But you are calling p1.columns where p1 is a tuple and tuples have no such attribute as columns. So this should have raised an altogether different exception (and a pandas.Series has no such method either). So I don't see how you are getting the exception you claim from the actual code you posted. Moreover, your statement pool = Pool(process=4) is incorrect as the correct keyword argument is processes not process. So you could not possibly be executing this code (I will overlook the missing import statements, which I assume you actually have).
  2. You are creating a progress bar that will be iterated upon submission of the tasks to the multiprocessing pool. So even if your work function took a second to complete (and was coded correctly), the progress bar would practically instantaneously shoot up to 100% as the tasks are submitted. So the only progress you are measuring is task submission, not completion.

What you need to do, assuming you want the progress bar to progress as tasks are completed, is to use a method such as imap_unordered or apply_async with a callback, which will allow you to update the bar as tasks complete. If you want to store the results in a list in task submission order rather than in completion order and you are using imap_unordered, then you need to pass an index to the worker function that it then returns back with the result. The code is simpler if you use apply_async but this method does not allow you to submit tasks in "chunks", which becomes an issue if the number of total tasks being submitted is very large (see the chunksize argument for method imap_unordered). Here is how you would use each method:

Using imap_unordered

from tqdm import tqdm
from multiprocessing import Pool
import itertools
import pandas as pd

columns = ['x', 'y', 'z']

p1 = pd.DataFrame([[1, 2, 3], [4, 5, 6], [3, 2, 1], [6, 5, 4]], columns=columns)
p2 = pd.DataFrame([[7, 8, 9], [10, 11, 12], [9, 8, 7], [12, 11, 10]], columns=columns)

pairs = itertools.product(p1.iterrows(), p2.iterrows())
pairs_len = len(p1) * len(p2)

def mkpair(t: tuple) -> int:
    from time import sleep

    sleep(1)
    # unpack tuple:
    idx, rows = t
    # unpack again:
    row1, row2 = rows
    series1 = row1[1]
    series2 = row2[1]
    return idx, sum(series1[column] * series2[column] for column in columns)

pool = Pool(processes=4)
results = [0] * pairs_len
with tqdm(total=pairs_len) as pbar:
    for idx, total in pool.imap_unordered(mkpair, enumerate(pairs)):
        results[idx] = total
        pbar.update()
pool.close()
pool.join()
print(results)

Using apply_async

from tqdm import tqdm
from multiprocessing import Pool
import itertools
import pandas as pd

columns = ['x', 'y', 'z']

p1 = pd.DataFrame([[1, 2, 3], [4, 5, 6], [3, 2, 1], [6, 5, 4]], columns=columns)
p2 = pd.DataFrame([[7, 8, 9], [10, 11, 12], [9, 8, 7], [12, 11, 10]], columns=columns)

pairs = itertools.product(p1.iterrows(), p2.iterrows())
pairs_len = len(p1) * len(p2)

def mkpair(row1, row2) -> int:
    from time import sleep

    sleep(1)
    series1 = row1[1]
    series2 = row2[1]
    return sum(series1[column] * series2[column] for column in columns)

def my_callback(_):
    pbar.update()

pool = Pool(processes=4)
with tqdm(total=pairs_len) as pbar:
    async_futures = [pool.apply_async(mkpair, args=(row1, row2), callback=my_callback)
                     for row1, row2 in pairs]
    results = [async_future.get() for async_future in async_futures]
pool.close()
pool.join()
print(results)
~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文