Combining itertools and multiprocessing?
I have a 256x256x256 NumPy array in which each element is a matrix. I need to do some calculations on each of these matrices, and I want to use the multiprocessing module to speed things up.

The results of these calculations must be stored in a 256x256x256 array like the original one, so that the result for the matrix at element [i,j,k] of the original array goes into the [i,j,k] element of the new array.

To do this, I want to make a list which could be written in a pseudo-ish way as [array[i,j,k], (i, j, k)] and pass it to a function to be "multiprocessed". Assuming that matrices is a list of all the matrices extracted from the original array and myfunc is the function doing the calculations, the code would look somewhat like this:
import multiprocessing
import numpy as np
from itertools import izip

def myfunc(finput):
    # Do some calculations...
    ...
    # ... and return the result and the index:
    return (result, finput[1])

# Make indices:
inds = np.rollaxis(np.indices((256, 256, 256)), 0, 4).reshape(-1, 3)

# Make function input from the matrices and the indices:
finput = izip(matrices, inds)

pool = multiprocessing.Pool()
async_results = np.asarray(pool.map_async(myfunc, finput).get(999999))
However, it seems like map_async is actually creating this huge finput list first: my CPUs aren't doing much, but the memory and swap get completely consumed in a matter of seconds, which is obviously not what I want.
Is there a way to pass this huge list to a multiprocessing function without the need to explicitly create it first?
Or do you know another way of solving this problem?
Thanks a bunch! :-)
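A minimal sketch of the reassembly step described in the question, assuming async_results holds the (result, index) pairs returned by myfunc; the name output and the object dtype below are illustrative assumptions, not part of the original question:

# Put each result back at the index it came from.
output = np.empty((256, 256, 256), dtype=object)   # illustrative result container

for result, ind in async_results:
    i, j, k = ind
    output[i, j, k] = result   # result of the matrix at [i,j,k] ends up at [i,j,k]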
3 Answers
All multiprocessing.Pool.map* methods consume the iterator fully as soon as the function is called. To feed the map function one chunk of the iterator at a time, use grouper_nofill:
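The grouper_nofill implementation referred to here is not reproduced on this page; a minimal sketch of such a helper, built on itertools.islice and reusing myfunc, finput, and pool from the question (the chunk size of 4096 is an arbitrary choice), might look like this:

from itertools import islice

def grouper_nofill(n, iterable):
    # Yield successive chunks of up to n items, without fill values and
    # without materializing the whole iterable in memory.
    it = iter(iterable)
    while True:
        chunk = list(islice(it, n))
        if not chunk:
            return
        yield chunk

# Feed the pool one chunk at a time, so only one chunk of tasks is ever in memory.
results = []
for chunk in grouper_nofill(4096, finput):
    results.extend(pool.map(myfunc, chunk))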
PS. pool.map_async's chunksize parameter does something different: it breaks the iterable into chunks, then gives each chunk to a worker process, which calls map(func, chunk). This can give the worker processes more data to chew on if func(item) finishes too quickly, but it does not help in your situation, since the iterator still gets consumed fully immediately after the map_async call is issued.
I ran into this problem as well. Instead of map_async, use imap: imap doesn't consume it!
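The code snippets from this answer are missing above; applied to the question's code, the suggestion might look roughly like this sketch (output and the object dtype are illustrative assumptions):

pool = multiprocessing.Pool()
output = np.empty((256, 256, 256), dtype=object)   # illustrative result container

# Unlike map/map_async, imap does not expand the iterable into one big
# in-memory task list up front; results come back in input order.
for result, ind in pool.imap(myfunc, izip(matrices, inds)):
    i, j, k = ind
    output[i, j, k] = result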
Pool.map_async() needs to know the length of the iterable in order to dispatch the work to multiple workers. Since izip has no __len__, it converts the iterable into a list first, causing the huge memory usage you are experiencing. You could try to sidestep this by creating your own izip-style iterator with __len__.
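A minimal sketch of such a wrapper; the class name SizedIzip is invented for illustration, and matrices, inds, myfunc, and pool are reused from the question:

class SizedIzip(object):
    # izip-style iterable that also reports its length, so that
    # Pool.map_async() can pick a chunksize without calling list() on it.
    def __init__(self, *iterables):
        self._length = min(len(it) for it in iterables)  # assumes sized inputs
        self._it = izip(*iterables)

    def __iter__(self):
        return self._it

    def __len__(self):
        return self._length

finput = SizedIzip(matrices, inds)
async_results = np.asarray(pool.map_async(myfunc, finput).get(999999))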