Combining itertools and multiprocessing?

Posted on 2024-12-02 17:43:09

I have a 256x256x256 Numpy array, in which each element is a matrix. I need to do some calculations on each of these matrices, and I want to use the multiprocessing module to speed things up.

The results of these calculations must be stored in a 256x256x256 array like the original one, so that the result of the matrix at element [i,j,k] in the original array must be put in the [i,j,k] element of the new array.

To do this, I want to make a list which could be written in a pseudo-ish way as [array[i,j,k], (i, j, k)] and pass it to a function to be "multiprocessed".
Assuming that matrices is a list of all the matrices extracted from the original array and myfunc is the function doing the calculations, the code would look somewhat like this:

import multiprocessing
import numpy as np
from itertools import izip

def myfunc(finput):
    # Do some calculations...
    ...

    # ... and return the result and the index:
    return (result, finput[1])

# Make indices:
inds = np.rollaxis(np.indices((256, 256, 256)), 0, 4).reshape(-1, 3)

# Make function input from the matrices and the indices:
finput = izip(matrices, inds)

pool = multiprocessing.Pool()
async_results = np.asarray(pool.map_async(myfunc, finput).get(999999))

However, it seems like map_async is actually creating this huge finput list first: my CPUs aren't doing much, but the memory and swap get completely consumed in a matter of seconds, which is obviously not what I want.

Is there a way to pass this huge list to a multiprocessing function without the need to explicitly create it first?
Or do you know another way of solving this problem?

Thanks a bunch! :-)

Comments (3)

永言不败 2024-12-09 17:43:09

All multiprocessing.Pool.map* methods consume the iterable fully (demo code) as soon as the function is called. To feed the map function one chunk of the iterable at a time, use grouper_nofill:

import itertools
import numpy as np

def grouper_nofill(n, iterable):
    '''list(grouper_nofill(3, 'ABCDEFG')) --> [['A', 'B', 'C'], ['D', 'E', 'F'], ['G']]'''
    it = iter(iterable)
    def take():
        # Yield successive slices of at most n items; yields [] once exhausted.
        while True:
            yield list(itertools.islice(it, n))
    # iter(callable, sentinel): stop as soon as take() produces an empty list.
    return iter(take().next, [])

chunksize = 256
async_results = []
for finput in grouper_nofill(chunksize, itertools.izip(matrices, inds)):
    async_results.extend(pool.map_async(myfunc, finput).get())
async_results = np.array(async_results)

PS. pool.map_async's chunksize parameter does something different: it breaks the iterable into chunks, then gives each chunk to a worker process, which calls map(func, chunk). This can give a worker process more data to chew on if func(item) finishes too quickly, but it does not help in your situation, since the iterator still gets consumed fully immediately after the map_async call is issued.
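
For Python 3, where itertools.izip and the generator .next method no longer exist, a roughly equivalent sketch of the same chunked approach could look like the following (pool, myfunc, matrices and inds are assumed to be defined as in the question):

import itertools
import numpy as np

def grouper_nofill(n, iterable):
    # Yield successive chunks of at most n items, without padding the last chunk.
    it = iter(iterable)
    while True:
        chunk = list(itertools.islice(it, n))
        if not chunk:
            return
        yield chunk

chunksize = 256
async_results = []
for finput in grouper_nofill(chunksize, zip(matrices, inds)):
    async_results.extend(pool.map_async(myfunc, finput).get())
async_results = np.array(async_results)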

不爱素颜 2024-12-09 17:43:09

I ran into this problem as well. Instead of this:

res = p.map(func, combinations(arr, select_n))

do

res = p.imap(func, combinations(arr, select_n))

imap doesn't consume it!
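
Applied to the question's setup, a minimal sketch of the imap approach (Python 3; myfunc, matrices and inds are assumed to be defined as in the question, and the results of myfunc are assumed to fit in an object array) might be:

import multiprocessing
import numpy as np

pool = multiprocessing.Pool()

# Object array to hold one result matrix per (i, j, k) position.
results = np.empty((256, 256, 256), dtype=object)

# imap pulls the (matrix, index) pairs from the generator lazily,
# so the full list of inputs never has to exist in memory at once.
for result, (i, j, k) in pool.imap(myfunc, zip(matrices, inds), chunksize=64):
    results[i, j, k] = result

pool.close()
pool.join()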

萌酱 2024-12-09 17:43:09

Pool.map_async() needs to know the length of the iterable to dispatch the work to multiple workers. Since izip has no __len__, it converts the iterable into a list first, causing the huge memory usage you are experiencing.

You could try to sidestep this by creating your own izip-style iterator with __len__.
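
For illustration only, a hypothetical wrapper along those lines might look like the sketch below (written for Python 3, where zip plays the role of izip; the class name SizedZip is made up, and it assumes both inputs have a known length):

class SizedZip:
    # A zip-style lazy iterator that also reports its length, so that
    # Pool.map_async does not have to convert it to a list just to count the items.

    def __init__(self, a, b):
        self._it = zip(a, b)
        self._len = min(len(a), len(b))

    def __iter__(self):
        return self._it

    def __len__(self):
        return self._len

# Hypothetical usage with the question's variables:
finput = SizedZip(matrices, inds)
async_results = pool.map_async(myfunc, finput).get()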
