Multiprocessing: How to use Pool.map on a function defined in a class?
When I run something like:

    from multiprocessing import Pool

    p = Pool(5)
    def f(x):
        return x*x

    p.map(f, [1,2,3])

it works fine. However, putting this as a function of a class:

    class calculate(object):
        def run(self):
            def f(x):
                return x*x

            p = Pool()
            return p.map(f, [1,2,3])

    cl = calculate()
    print cl.run()

gives me the following error:

    Exception in thread Thread-1:
    Traceback (most recent call last):
      File "/sw/lib/python2.6/threading.py", line 532, in __bootstrap_inner
        self.run()
      File "/sw/lib/python2.6/threading.py", line 484, in run
        self.__target(*self.__args, **self.__kwargs)
      File "/sw/lib/python2.6/multiprocessing/pool.py", line 225, in _handle_tasks
        put(task)
    PicklingError: Can't pickle <type 'function'>: attribute lookup __builtin__.function failed

I've seen a post from Alex Martelli dealing with the same kind of problem, but it wasn't explicit enough.
I could not use the code posted so far because code using "multiprocessing.Pool" does not work with lambda expressions, and code not using "multiprocessing.Pool" spawns as many processes as there are work items.
I adapted the code so that it spawns a predefined number of workers and only iterates through the input list if there is an idle worker. I also enabled "daemon" mode for the workers so that ctrl-c works as expected.
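The code for this answer did not survive on this page, so here is a minimal sketch of the idea it describes: a fixed number of daemon workers fed through a bounded queue. The names `_worker`, `parmap` and `nprocs` are illustrative, not the author's. It assumes the "fork" start method (Unix only), because the workers must inherit `f` rather than unpickle it; that is what lets a lambda through.

```python
import multiprocessing

# Assumption: "fork" is available (Unix). With fork, the target function and
# its arguments are inherited by the child instead of being pickled.
_ctx = multiprocessing.get_context("fork")


def _worker(f, q_in, q_out):
    """Apply f to items from q_in until a (None, None) sentinel arrives."""
    while True:
        index, item = q_in.get()
        if index is None:
            break
        q_out.put((index, f(item)))


def parmap(f, items, nprocs=None):
    """Map f over items using a fixed number of daemon worker processes."""
    nprocs = nprocs or multiprocessing.cpu_count()
    q_in = _ctx.Queue(1)   # size 1: the next item is only handed over when a worker is free
    q_out = _ctx.Queue()
    workers = [
        _ctx.Process(target=_worker, args=(f, q_in, q_out), daemon=True)
        for _ in range(nprocs)
    ]
    for w in workers:
        w.start()

    count = 0
    for index, item in enumerate(items):
        q_in.put((index, item))
        count += 1
    for _ in workers:      # one stop sentinel per worker
        q_in.put((None, None))

    # Note: if f raises inside a worker, this loop waits forever (sketch only).
    results = [q_out.get() for _ in range(count)]
    for w in workers:
        w.join()
    # Restore input order; sort on the index only, never on the values.
    return [value for _, value in sorted(results, key=lambda pair: pair[0])]
```

Called as `parmap(lambda x: x * x, [1, 2, 3], nprocs=2)`, this returns the squares in input order.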
Multiprocessing and pickling are broken and limited unless you jump outside the standard library.
If you use a fork of multiprocessing called pathos.multiprocessing, you can directly use classes and class methods in multiprocessing's map functions. This is because dill is used instead of pickle or cPickle, and dill can serialize almost anything in Python. pathos.multiprocessing also provides an asynchronous map function, and it can map functions with multiple arguments (e.g. map(math.pow, [1,2,3], [4,5,6])).
See discussions: What can multiprocessing and dill do together?
and: http://matthewrocklin.com/blog/work/2013/12/05/Parallelism-and-Serialization
It even handles the code you wrote initially, without modification, and from the interpreter. Why do anything else that's more fragile and specific to a single case?
Get the code here: https://github.com/uqfoundation/pathos
And, just to show off a little more of what it can do:
I was also annoyed by restrictions on what sort of functions pool.map could accept. I wrote the following to circumvent this. It appears to work, even for recursive use of parmap.
There is currently no solution to your problem, as far as I know: the function that you give to map() must be accessible through an import of your module. This is why robert's code works: the function f() can be obtained by importing the following code:
I actually added a "main" section, because this follows the recommendations for the Windows platform ("Make sure that the main module can be safely imported by a new Python interpreter without causing unintended side effects").
I also added an uppercase letter in front of Calculate, so as to follow PEP 8. :)
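The listing for this answer is missing from this page. A layout matching the description (a module-level f(), a "main" guard, and a capitalized Calculate) might look like the sketch below. It is written for Python 3, whereas the question used Python 2.6.

```python
from multiprocessing import Pool


def f(x):
    # Defined at module level, so worker processes can look it up by name
    # after importing this module.
    return x * x


class Calculate(object):
    def run(self):
        with Pool() as p:
            return p.map(f, [1, 2, 3])


if __name__ == "__main__":
    # The guard keeps a freshly started interpreter (e.g. on Windows) from
    # re-running this part when it imports the module.
    cl = Calculate()
    print(cl.run())  # prints [1, 4, 9]
```

Moving f() out of run() is the whole fix: pickle stores a function as a module name plus a function name, and a function nested inside a method has no importable name.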
The solution by mrule is correct but has a bug: if the child sends back a large amount of data, it can fill the pipe's buffer, blocking on the child's pipe.send(), while the parent is waiting for the child to exit on join(). The solution is to read the child's data before join()ing the child. Furthermore, the child should close the parent's end of the pipe to prevent a deadlock. The code below fixes that. Also be aware that this parmap creates one process per element in X. A more advanced solution is to use multiprocessing.cpu_count() to divide X into a number of chunks, and then merge the results before returning. I leave that as an exercise to the reader so as not to spoil the conciseness of the nice answer by mrule. ;)
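The fixed listing is not preserved on this page. The sketch below shows the two points this answer makes (receive before joining, and close the parent's end inside the child); it is not mrule's exact code. The helper name `_spawn` is illustrative, and the "fork" start method (Unix only) is assumed so that closures and pipe ends are inherited rather than pickled.

```python
import multiprocessing

# Assumption: "fork" is available (Unix), so the nested function and the
# pipe connections are inherited by each child process.
_ctx = multiprocessing.get_context("fork")


def _spawn(f):
    def run(parent_end, child_end, x):
        parent_end.close()          # the child never uses the parent's end
        child_end.send(f(x))
        child_end.close()
    return run


def parmap(f, items):
    """Map f over items with one process per item (as in the original)."""
    items = list(items)
    pipes = [_ctx.Pipe() for _ in items]
    procs = [
        _ctx.Process(target=_spawn(f), args=(parent_end, child_end, x))
        for x, (parent_end, child_end) in zip(items, pipes)
    ]
    for proc in procs:
        proc.start()
    # Read every result BEFORE joining: a child blocked in send() on a full
    # pipe buffer can only exit once the parent has received its data.
    results = [parent_end.recv() for parent_end, _ in pipes]
    for proc in procs:
        proc.join()
    return results
```

Swapping the `recv()` and `join()` steps brings the deadlock back as soon as a result is larger than the pipe buffer.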
I've also struggled with this. I had functions as data members of a class, as a simplified example:
I needed to use the function self.f in a Pool.map() call from within the same class and self.f did not take a tuple as an argument. Since this function was embedded in a class, it was not clear to me how to write the type of wrapper other answers suggested.
I solved this problem by using a different wrapper that takes a tuple/list, where the first element is the function, and the remaining elements are the arguments to that function, called eval_func_tuple(f_args). Using this, the problematic line can be replaced by return pool.map(eval_func_tuple, itertools.izip(itertools.repeat(self.f), list1, list2)). Here is the full code:
File: util.py
File: main.py
Running main.py will give [11, 22, 33]. Feel free to improve this, for example eval_func_tuple could also be modified to take keyword arguments.
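The util.py and main.py listings did not survive on this page. A single-file sketch of the eval_func_tuple idea described above might look like this. The function `add` and the class `C` are assumptions made for illustration, and Python 3's built-in zip stands in for itertools.izip.

```python
import itertools
from multiprocessing import Pool


def add(a, b):
    # Hypothetical module-level function, stored on the class as a data member.
    return a + b


def eval_func_tuple(f_args):
    """Take (function, arg1, arg2, ...) and call function(arg1, arg2, ...)."""
    return f_args[0](*f_args[1:])


class C(object):
    def __init__(self):
        self.f = add

    def run(self, list1, list2):
        with Pool(2) as pool:
            # Each work item is one tuple: (self.f, a, b).
            work = zip(itertools.repeat(self.f), list1, list2)
            return pool.map(eval_func_tuple, work)


if __name__ == "__main__":
    print(C().run([1, 2, 3], [10, 20, 30]))  # prints [11, 22, 33]
```

This works because both eval_func_tuple and the function stored in self.f are module-level, so every element of each tuple can be pickled by name.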
On another note, in another answer, the function "parmap" can be made more efficient for the case of more processes than CPUs available. I'm copying an edited version below. This is my first post and I wasn't sure if I should directly edit the original answer. I also renamed some variables.
I know that this question was asked 8 years and 10 months ago, but I want to present my solution:
You just need to make your class function a static method. It is also possible with a class method:
Tested in Python 3.7.3
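The original listing is missing here. A sketch of the static method and class method variants, with illustrative names (`Calculate`, `square`, `cube`), could look like this on Python 3:

```python
from multiprocessing import Pool


class Calculate:
    @staticmethod
    def square(x):
        return x * x

    @classmethod
    def cube(cls, x):
        return x * x * x

    def run(self, values):
        with Pool(2) as pool:
            # Both callables are reachable by name through the class, so
            # Python 3 can pickle them without touching `self`.
            squares = pool.map(Calculate.square, values)
            cubes = pool.map(Calculate.cube, values)
        return squares, cubes


if __name__ == "__main__":
    print(Calculate().run([1, 2, 3]))  # prints ([1, 4, 9], [1, 8, 27])
```

Neither variant can read instance attributes, so this only fits when the work does not depend on per-instance state.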
I know this was asked over 6 years ago now, but I just wanted to add my solution, as some of the suggestions above seem horribly complicated, while my solution was actually very simple.
All I had to do was wrap the pool.map() call in a helper function, passing the class object along with the args for the method as a tuple, which looked a bit like this.
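The snippet itself is not preserved on this page. One way to read the description is a module-level helper that receives (instance, argument) tuples. The names `call_square`, `Calculate` and `offset` below are made up for the sketch.

```python
from multiprocessing import Pool


def call_square(packed):
    """Unpack (instance, argument) and call the method on the instance."""
    obj, x = packed
    return obj.square(x)


class Calculate:
    def __init__(self, offset=0):
        self.offset = offset

    def square(self, x):
        return x * x + self.offset

    def run(self, values):
        with Pool(2) as pool:
            # The instance travels with every work item, so it must be picklable.
            return pool.map(call_square, [(self, x) for x in values])


if __name__ == "__main__":
    print(Calculate().run([1, 2, 3]))  # prints [1, 4, 9]
```

Unlike a static method, the helper still sees instance state, at the cost of pickling the instance once per work item.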
I took klaus se's and aganders3's answers and made a documented module that is more readable and fits in one file. You can just add it to your project. It even has an optional progress bar!
EDIT: Added @alexander-mcfarlane's suggestion and a test function
Functions defined in classes (even within functions within classes) don't really pickle. However, this works:
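To see the claim about pickling directly, without starting any processes, a small check like the following can help. The helper names (`can_pickle`, `make_nested`, `module_level`) are illustrative only.

```python
import pickle


def module_level(x):
    return x * x


def make_nested():
    def nested(x):          # defined inside another function
        return x * x
    return nested


def can_pickle(obj):
    """Return True if pickle can serialize obj, False otherwise."""
    try:
        pickle.dumps(obj)
        return True
    except (pickle.PicklingError, AttributeError):
        # Which exception is raised for a local function depends on the
        # Python version, so both are caught here.
        return False


if __name__ == "__main__":
    print(can_pickle(module_level))   # prints True
    print(can_pickle(make_nested()))  # prints False
```

Pool.map has to pickle the function it is given, which is why the nested f in the question fails and a module-level f does not.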
I modified klaus se's method because, while it was working for me with small lists, it would hang when the number of items was ~1000 or greater. Instead of pushing the jobs one at a time with the None stop condition, I load up the input queue all at once and just let the processes munch on it until it's empty.
Edit: unfortunately, I am now running into this error on my system: Multiprocessing Queue maxsize limit is 32767; hopefully the workarounds there will help.
Here is my solution, which I think is a bit less hackish than most others here. It is similar to nightowl's answer.
You can run your code without any issues if you manually exclude the Pool object from the instance's pickled state, because it is not pickleable, as the error says. You can do this with the __getstate__ function (look here too) as follows. When you run map, map_async, etc., pickling looks for the __getstate__ and __setstate__ functions on the object and executes them if it finds them:
Then do:
This will give you the output:
I've tested the above code in Python 3.x and it works.
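The listing for this answer is missing, so the following is only a sketch of the __getstate__ approach it describes. The class layout, the `offset` attribute and the choice to blank the pool out as None are assumptions.

```python
from multiprocessing import Pool


class Calculate:
    def __init__(self, offset=0):
        self.offset = offset
        self.pool = None              # created on demand in run()

    def shifted_square(self, x):
        return x * x + self.offset

    def run(self, values):
        self.pool = Pool(2)
        try:
            # self.shifted_square is a bound method, so `self` gets pickled
            # too, including whatever self.pool would contribute.
            return self.pool.map(self.shifted_square, values)
        finally:
            self.pool.close()
            self.pool.join()

    def __getstate__(self):
        # Copy the instance state but leave the unpicklable Pool out of it.
        state = self.__dict__.copy()
        state["pool"] = None
        return state

    def __setstate__(self, state):
        self.__dict__.update(state)


if __name__ == "__main__":
    print(Calculate(10).run([1, 2, 3]))  # prints [11, 14, 19]
```

Without __getstate__, pickling the instance would try to pickle the Pool as well, which multiprocessing refuses to do.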
This may not be a very good solution, but in my case I solved it like this.
I had to pass self to my function because I need to access attributes and functions of my class through that function. This is working for me. Corrections and suggestions are always welcome.
Here is a boilerplate I wrote for using a multiprocessing Pool in Python 3; specifically, Python 3.7.7 was used to run the tests. I got my fastest runs using imap_unordered. Just plug in your scenario and try it out. You can use timeit or just time.time() to figure out which works best for you.
In the above scenario imap_unordered actually seems to perform the worst for me. Try out your case and benchmark it on the machine you plan to run it on. Also read up on Process Pools. Cheers!
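The boilerplate itself is not preserved on this page. As a rough stand-in, a timing harness comparing map, imap and imap_unordered could look like the sketch below. The workload `work`, the helper `timed`, the pool size and the chunk size are all placeholders to swap for your own scenario.

```python
import time
from multiprocessing import Pool


def work(x):
    # Placeholder workload; replace with the real per-item computation.
    return x * x


def timed(label, fn):
    """Run fn() once and return (label, elapsed_seconds, result)."""
    start = time.perf_counter()
    result = fn()
    return label, time.perf_counter() - start, result


def main():
    data = range(1000)
    with Pool(4) as pool:
        runs = [
            timed("map", lambda: pool.map(work, data)),
            timed("imap", lambda: list(pool.imap(work, data, chunksize=50))),
            # imap_unordered yields results as they finish, in any order.
            timed("imap_unordered",
                  lambda: list(pool.imap_unordered(work, data, chunksize=50))),
        ]
    for label, seconds, result in runs:
        print(f"{label:15s} {seconds:.4f}s  ({len(result)} results)")


if __name__ == "__main__":
    main()
```

Timings from a toy workload like this say little about a real one, which is why the advice above to benchmark on the target machine matters.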
I'm not sure if this approach has been taken, but a workaround I'm using is:
Output should be:
There is a possibility that you would want to apply this function for each different instance of the class. Then here is the solution for that also
From http://www.rueckstiess.net/research/snippets/show/ca1d7d90 and http://qingkaikong.blogspot.com/2016/12/python-parallel-method-in-class.html
We can make an external function and seed it with the class self object:
OR without joblib:
To implement multiprocessing in AWS Lambda, we have two options.
Note: Threadpool doesn't work in AWS Lambda.
1. Use the example solution provided by the AWS team: https://aws.amazon.com/blogs/compute/parallel-processing-in-python-with-aws-lambda/
2. Use this package: https://pypi.org/project/lambda-multiprocessing/
I have implemented my Lambda function with both solutions, and both work fine. I can't share my code here, but these two links will certainly help you.
I find the second option easier to implement.
There are also some libraries to make this easier, for example autothread (only for Python 3.6 and up):
You can also take a look at lox.