使用多处理时出现 PicklingError
我在多处理模块中使用 Pool.map_async()
(以及 Pool.map()
)时遇到问题。我已经实现了一个并行 for 循环函数,只要 Pool.map_async
的函数输入是“常规”函数,该函数就可以正常工作。当函数是例如类的方法时,我会得到一个PicklingError:
cPickle.PicklingError: Can't pickle <type 'function'>: attribute lookup __builtin__.function failed
我仅使用Python进行科学计算,所以我对pickling的概念不太熟悉,刚刚了解了一些今天。我看过之前的几个答案,例如 可以't pickle <类型'instancemethod'>当使用 multiprocessing Pool.map() 时,但我无法弄清楚如何使其工作,即使按照答案中提供的链接也是如此。
我的代码,其目标是使用多个核心来模拟普通 rv 的向量。请注意,这只是一个示例,也许在多个内核上运行甚至没有回报。
import multiprocessing as mp
import scipy as sp
import scipy.stats as spstat
def parfor(func, args, static_arg = None, nWorkers = 8, chunksize = None):
"""
Purpose: Evaluate function using Multiple cores.
Input:
func - Function to evaluate in parallel
arg - Array of arguments to evaluate func(arg)
static_arg - The "static" argument (if any), i.e. the variables that are constant in the evaluation of func.
nWorkers - Number of Workers to process computations.
Output:
func(i, static_arg) for i in args.
"""
# Prepare arguments for func: Collect arguments with static argument (if any)
if static_arg != None:
arguments = [[arg] + static_arg for arg in list(args)]
else:
arguments = args
# Initialize workers
pool = mp.Pool(processes = nWorkers)
# Evaluate function
result = pool.map_async(func, arguments, chunksize = chunksize)
pool.close()
pool.join()
return sp.array(result.get()).flatten()
# First test-function. Freeze location and scale for the Normal random variates generator.
# This returns a function that is a method of the class Norm_gen. Methods cannot be pickled
# so this will give an error.
def genNorm(loc, scale):
def subfunc(a):
return spstat.norm.rvs(loc = loc, scale = scale, size = a)
return subfunc
# Second test-function. The same as above but does not return a method of a class. This is a "plain" function and can be
# pickled
def test(fargs):
x, a, b = fargs
return spstat.norm.rvs(size = x, loc = a, scale = b)
# Try it out.
N = 1000000
# Set arguments to function. args1 = [1, 1, 1,... ,1], the purpose is just to generate a random variable of size 1 for each
# element in the output vector.
args1 = sp.ones(N)
static_arg = [0, 1] # standarized normal.
# This gives the PicklingError
func = genNorm(*static_arg)
sim = parfor(func, args1, static_arg = None, nWorkers = 12, chunksize = None)
# This is OK:
func = test
sim = parfor(func, args1, static_arg = static_arg, nWorkers = 12, chunksize = None)
按照中问题的答案中提供的链接可以't pickle <类型'instancemethod'>当使用多处理 Pool.map() 时,Steven Bethard(几乎在最后)建议使用 copy_reg
模块。他的代码是:
def _pickle_method(method):
func_name = method.im_func.__name__
obj = method.im_self
cls = method.im_class
return _unpickle_method, (func_name, obj, cls)
def _unpickle_method(func_name, obj, cls):
for cls in cls.mro():
try:
func = cls.__dict__[func_name]
except KeyError:
pass
else:
break
return func.__get__(obj, cls)
import copy_reg
import types
copy_reg.pickle(types.MethodType, _pickle_method, _unpickle_method)
我真的不明白如何利用它。我唯一能想到的就是把它放在我的代码之前,但这没有帮助。一种简单的解决方案当然是选择有效的解决方案,并避免涉及 copy_reg
。我更感兴趣的是让 copy_reg
正常工作以充分利用多处理,而不必每次都解决问题。
I am having trouble when using the Pool.map_async()
(and also Pool.map()
) in the multiprocessing module. I have implemented a parallel-for-loop function that works fine as long as the function input to Pool.map_async
is a "regular" function. When the function is e.g. a method to a class, then I get a PicklingError
:
cPickle.PicklingError: Can't pickle <type 'function'>: attribute lookup __builtin__.function failed
I use Python only for scientific computing so I am not so familiar with the concept of pickling, have just learned a bit about it today. I have looked at a couple of previous answers, like Can't pickle <type 'instancemethod'> when using multiprocessing Pool.map(), but I cannot figure out how to make it work, even when following the link provided in the answer.
My code, where the objective is to simulate a vector of Normal r.v's with the use of multiple cores. Note that this is just an example and maybe it does not even payoff to run on multiple cores.
import multiprocessing as mp
import scipy as sp
import scipy.stats as spstat
def parfor(func, args, static_arg = None, nWorkers = 8, chunksize = None):
"""
Purpose: Evaluate function using Multiple cores.
Input:
func - Function to evaluate in parallel
arg - Array of arguments to evaluate func(arg)
static_arg - The "static" argument (if any), i.e. the variables that are constant in the evaluation of func.
nWorkers - Number of Workers to process computations.
Output:
func(i, static_arg) for i in args.
"""
# Prepare arguments for func: Collect arguments with static argument (if any)
if static_arg != None:
arguments = [[arg] + static_arg for arg in list(args)]
else:
arguments = args
# Initialize workers
pool = mp.Pool(processes = nWorkers)
# Evaluate function
result = pool.map_async(func, arguments, chunksize = chunksize)
pool.close()
pool.join()
return sp.array(result.get()).flatten()
# First test-function. Freeze location and scale for the Normal random variates generator.
# This returns a function that is a method of the class Norm_gen. Methods cannot be pickled
# so this will give an error.
def genNorm(loc, scale):
def subfunc(a):
return spstat.norm.rvs(loc = loc, scale = scale, size = a)
return subfunc
# Second test-function. The same as above but does not return a method of a class. This is a "plain" function and can be
# pickled
def test(fargs):
x, a, b = fargs
return spstat.norm.rvs(size = x, loc = a, scale = b)
# Try it out.
N = 1000000
# Set arguments to function. args1 = [1, 1, 1,... ,1], the purpose is just to generate a random variable of size 1 for each
# element in the output vector.
args1 = sp.ones(N)
static_arg = [0, 1] # standarized normal.
# This gives the PicklingError
func = genNorm(*static_arg)
sim = parfor(func, args1, static_arg = None, nWorkers = 12, chunksize = None)
# This is OK:
func = test
sim = parfor(func, args1, static_arg = static_arg, nWorkers = 12, chunksize = None)
Following the link provided in the answer to the question in Can't pickle <type 'instancemethod'> when using multiprocessing Pool.map(), Steven Bethard (almost at the end) suggests using the copy_reg
module. His code is:
def _pickle_method(method):
func_name = method.im_func.__name__
obj = method.im_self
cls = method.im_class
return _unpickle_method, (func_name, obj, cls)
def _unpickle_method(func_name, obj, cls):
for cls in cls.mro():
try:
func = cls.__dict__[func_name]
except KeyError:
pass
else:
break
return func.__get__(obj, cls)
import copy_reg
import types
copy_reg.pickle(types.MethodType, _pickle_method, _unpickle_method)
I don't really understand how I can make use of this. The only thing I could come up with was putting it just before my code but it did not help. A simple solution is of course to just go with the one that works and avoid getting involved with copy_reg
. I am more interested in getting copy_reg
to work properly to take fully advantage of multiprocessing without having to go around the problem each time.
如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。
绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论
评论(1)
这里的问题与其说是“pickle”错误消息,不如说是概念性的:
多进程确实将您的代码分叉到“worker”不同的进程中以便执行
它的魔力。
然后,它通过无缝序列化和反序列化数据(即使用 pickle 的部分)将数据发送到不同的进程或从不同的进程发送数据。
当来回传递的部分数据是一个函数时 - 它假设被调用者进程中存在一个具有相同名称的函数,并且(我猜)将函数名称作为字符串传递。由于函数是无状态的,因此被调用的工作进程仅使用其收到的数据调用同一函数。
(Python 函数无法通过 pickle 序列化,因此仅在主进程和工作进程之间传递引用)
当您的函数是实例中的方法时 - 尽管当我们编写 python 代码时,它与函数非常相似,使用“自动”
self
变量,其底层并不相同。因为实例(对象)是有状态的。这意味着工作进程没有您要在另一端调用的方法的所有者的对象副本。解决将方法作为函数传递给 map_async 调用的方法也不起作用 - 因为多进程仅使用函数引用,而不是传递它时的实际函数。
因此,您应该 (1) 更改代码,以便将函数(而不是方法)传递给工作进程,将对象保留的任何状态转换为要调用的新参数。
(2) 为map_async调用创建一个“目标”函数,该函数在工作进程端重建所需的对象,然后调用其中的函数。 Python 中最简单的类本身是可以选择的,因此您可以在 map_async 调用上传递函数所有者本身的对象 - 并且“目标”函数将在工作端调用适当的方法本身。
(2)可能听起来“困难”,但它可能只是这样 - 除非你的对象的类不能被腌制:
*免责声明:我还没有测试过这个
The problem here is less of the "pickle" error message than conceptual:
multiprocess does fork your code in "worker" different processes in order to perform
its magic.
It then sends data to and from the different process by seamlessly serializing and de-serializing the data (that is the part that uses the pickle).
When part of the data passed back and forth is a function - it assumes a function with the same name exists in the callee process, and (I guess) passes the function name, as a string. Since functions are stateless, the called worker-process just calls that same function with the data it has received.
(Python functions can't be serialized through pickle, so just the reference is passed between the master and the worker processes)
When your function is a method in an instance - although when we code python it is much like the same thing as a function, with an "automatic"
self
variable, it is not the same underneath. Because instances (objects) are stateful. That means the worker process does not have a copy of the object that is the owner of the method you want to call on the other side.Working around ways of passing your method as a function to the map_async call won't work either - as multiprocess just uses a function reference, not the actual function when passing it around.
So, you should (1) either change your code so that you do pass a function - and not a method - to the worker processes, converting whatever states the object keeps to new parameters to be called.
(2) Create a "target" function for the map_async call that reconstructs the needed object on the worker-process side, and then calls the function inside it. Most straightforward classes in Python are pickable themselves, so you could pass the object that is the function owner itself on the map_async call - and the "target" function would call the appropriate method itself on the worker side.
(2) may sound "difficult" but it is probably just something like this - unless your object's class can't be pickled:
*disclaimer: I haven't tested this