Python decorator with multiprocessing fails

Posted on 2025-01-06 22:29:50

I would like to use a decorator on a function that I will subsequently pass to a multiprocessing pool. However, the code fails with "PicklingError: Can't pickle <type 'function'>: attribute lookup __builtin__.function failed". I don't quite see why it fails here. I feel certain that it's something simple, but I can't find it. Below is a minimal "working" example. I thought that using the functools function would be enough to let this work.

If I comment out the function decoration, it works without an issue. What is it about multiprocessing that I'm misunderstanding here? Is there any way to make this work?

Edit: After adding both a callable class decorator and a function decorator, it turns out that the function decorator works as expected. The callable class decorator continues to fail. What is it about the callable class version that keeps it from being pickled?

import random
import multiprocessing
import functools

class my_decorator_class(object):
    def __init__(self, target):
        self.target = target
        try:
            functools.update_wrapper(self, target)
        except:
            pass

    def __call__(self, elements):
        f = []
        for element in elements:
            f.append(self.target([element])[0])
        return f

def my_decorator_function(target):
    @functools.wraps(target)
    def inner(elements):
        f = []
        for element in elements:
            f.append(target([element])[0])
        return f
    return inner

@my_decorator_function
def my_func(elements):
    f = []
    for element in elements:
        f.append(sum(element))
    return f

if __name__ == '__main__':
    elements = [[random.randint(0, 9) for _ in range(5)] for _ in range(10)]
    pool = multiprocessing.Pool(processes=4)
    results = [pool.apply_async(my_func, ([e],)) for e in elements]
    pool.close()
    f = [r.get()[0] for r in results]
    print(f)


Comments (4)

蓝海 2025-01-13 22:29:50

The problem is that pickle needs to have some way to reassemble everything that you pickle. See here for a list of what can be pickled:

http://docs.python.org/library/pickle.html#what-can-be-pickled-and-unpickled

When pickling my_func, the following components need to be pickled:

  • An instance of my_decorator_class, called my_func.

    This is fine. Pickle will store the name of the class and pickle its __dict__ contents. When unpickling, it uses the name to find the class, then creates an instance and fills in the __dict__ contents. However, the __dict__ contents present a problem...

  • The instance of the original my_func that's stored in my_func.target.

    This isn't so good. It's a top-level function, and normally these can be pickled: pickle stores the function's name. The problem, however, is that the name "my_func" is no longer bound to the undecorated function; it's bound to the decorated one. This means that pickle won't be able to look up the undecorated function to recreate the object. Sadly, pickle has no way to know that the object it's trying to pickle can always be found under the name __main__.my_func.
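
The failure described above can be reproduced without a pool by calling pickle.dumps directly, since pickling the callable is what multiprocessing has to do before it can hand the task to a worker. The sketch below assumes Python 3, where the error text differs from the Python 2 message quoted in the question. The names ClassDecorator, function_decorator, square, cube, and try_pickle are made up for illustration.

```python
import functools
import pickle


class ClassDecorator:
    """Callable-class decorator that keeps the original function in self.target."""

    def __init__(self, target):
        self.target = target
        functools.update_wrapper(self, target)

    def __call__(self, x):
        return self.target(x)


def function_decorator(target):
    @functools.wraps(target)  # inner takes over target's __name__ and __qualname__
    def inner(x):
        return target(x)
    return inner


@ClassDecorator
def square(x):
    return x * x


@function_decorator
def cube(x):
    return x ** 3


def try_pickle(obj):
    """Return None on success, or the exception raised by pickle.dumps."""
    try:
        pickle.dumps(obj)
        return None
    except (pickle.PicklingError, AttributeError) as exc:
        return exc


# The instance is pickled through its __dict__, which holds the undecorated
# function. Pickle looks that function up as <module>.square, finds the
# ClassDecorator instance there instead, and refuses.
class_error = try_pickle(square)

# The closure is stored by name only. The name "cube" resolves to the closure
# itself, so the lookup succeeds.
function_error = try_pickle(cube)

print("class decorator:", class_error)
print("function decorator:", function_error)
```

On Python 3 the class case typically reports that the function is "not the same object as" the module-level name, which is the name-rebinding problem stated more directly.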

You can change it like this and it will work:

import random
import multiprocessing
import functools

class my_decorator(object):
    def __init__(self, target):
        self.target = target
        try:
            functools.update_wrapper(self, target)
        except:
            pass

    def __call__(self, candidates, args):
        f = []
        for candidate in candidates:
            f.append(self.target([candidate], args)[0])
        return f

def old_my_func(candidates, args):
    f = []
    for c in candidates:
        f.append(sum(c))
    return f

my_func = my_decorator(old_my_func)

if __name__ == '__main__':
    candidates = [[random.randint(0, 9) for _ in range(5)] for _ in range(10)]
    pool = multiprocessing.Pool(processes=4)
    results = [pool.apply_async(my_func, ([c], {})) for c in candidates]
    pool.close()
    f = [r.get()[0] for r in results]
    print(f)

You have observed that the decorator function works when the class does not. I believe this is because functools.wraps modifies the decorated function so that it has the name and other properties of the function it wraps. As far as the pickle module can tell, it is indistinguishable from a normal top-level function, so it pickles it by storing its name. Upon unpickling, the name is bound to the decorated function so everything works out.
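
The same by-name mechanism can be borrowed for a callable class. As a sketch that is not part of the original answer, and assuming Python 3 and that the decorated object stays bound to its original module-level name, __reduce__ can return that name as a string, which tells pickle to store a reference rather than the instance's contents. NamedDecorator and total are hypothetical names.

```python
import functools
import pickle


class NamedDecorator:
    """Callable-class decorator that pickles itself by name, like a function."""

    def __init__(self, target):
        self.target = target
        # Copies __module__, __name__ and __qualname__ from target onto self.
        functools.update_wrapper(self, target)

    def __call__(self, elements):
        return [self.target([element])[0] for element in elements]

    def __reduce__(self):
        # Returning a string asks pickle to store a global reference.
        # Pickle then verifies that <module>.<name> is this very object.
        return self.__qualname__


@NamedDecorator
def total(elements):
    return [sum(element) for element in elements]


payload = pickle.dumps(total)      # what a pool would send to a worker
restored = pickle.loads(payload)   # what the worker would look up by name

print(restored is total)           # same module-level object in this process
print(restored([[1, 2, 3], [4, 5]]))
```

This only helps when the worker can import the module and finds the decorated object under the same name; the undecorated function is never sent, it is simply found again inside the instance the name refers to.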

情丝乱 2025-01-13 22:29:50

I also had a problem using decorators with multiprocessing. I'm not sure if it's the same problem as yours:

My code looked like this:

from multiprocessing import Pool

def decorate_func(f):
    def _decorate_func(*args, **kwargs):
        print("I'm decorating")
        return f(*args, **kwargs)
    return _decorate_func

@decorate_func
def actual_func(x):
    return x ** 2

my_swimming_pool = Pool()
result = my_swimming_pool.apply_async(actual_func,(2,))
print(result.get())

and when I run the code I get this:

Traceback (most recent call last):
  File "test.py", line 15, in <module>
    print result.get()
  File "somedirectory_too_lengthy_to_put_here/lib/python2.7/multiprocessing/pool.py", line 572, in get
    raise self._value
cPickle.PicklingError: Can't pickle <type 'function'>: attribute lookup __builtin__.function failed

I fixed it by defining a new module-level function that applies the decorator and calls the result, instead of using the decorator syntax:

from multiprocessing import Pool

def decorate_func(f):
    def _decorate_func(*args, **kwargs):
        print("I'm decorating")
        return f(*args, **kwargs)
    return _decorate_func

def actual_func(x):
    return x ** 2

def wrapped_func(*args, **kwargs):
    return decorate_func(actual_func)(*args, **kwargs)

my_swimming_pool = Pool()
result = my_swimming_pool.apply_async(wrapped_func,(2,))
print(result.get())

The code ran perfectly and I got:

I'm decorating
4

I'm not very experienced with Python, but this solved the problem for me.
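
A plausible explanation for why this workaround helps, sketched here for Python 3 with pickle called directly rather than through a pool: the closure returned by decorate_func has no module-level name of its own, while wrapped_func does, so only the latter can be stored by reference. The helper can_pickle and the variable decorated are assumptions added for the demonstration.

```python
import pickle


def decorate_func(f):
    def _decorate_func(*args, **kwargs):
        return f(*args, **kwargs)
    return _decorate_func


def actual_func(x):
    return x ** 2


def wrapped_func(*args, **kwargs):
    # The decorator is applied inside the call, so nothing decorated
    # ever has to cross the process boundary.
    return decorate_func(actual_func)(*args, **kwargs)


# What the @decorate_func syntax would have bound to the function's name.
decorated = decorate_func(actual_func)


def can_pickle(obj):
    """Report whether pickle.dumps accepts obj."""
    try:
        pickle.dumps(obj)
        return True
    except (pickle.PicklingError, AttributeError):
        return False


print("closure from decorator:", can_pickle(decorated))
print("module-level wrapper:", can_pickle(wrapped_func))
print(pickle.loads(pickle.dumps(wrapped_func))(2))
```

Applying functools.wraps(f) to _decorate_func, as the function decorator in the question does, is another way to give the closure a resolvable name while keeping the @ syntax.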

紫南 2025-01-13 22:29:50

If you want decorators badly enough (like me), you can also call exec() on the function's source string to circumvent the pickling problem described above.

I wanted to be able to pass all the arguments to an original function and then use them successively. The following is my code for it.

First, I wrote a make_functext() function to convert the target function object to a string. For that, I used the getsource() function from the inspect module (see its documentation, and note that it can't retrieve source code from compiled code etc.). Here it is:

from inspect import getsource

def make_functext(func):
    ft = '\n'.join(getsource(func).split('\n')[1:]) # Removing the decorator, of course
    ft = ft.replace(func.__name__, 'func')          # Making function callable with 'func'
    ft = ft.replace('#§ ', '').replace('#§', '')    # For using commented code starting with '#§'
    ft = ft.strip()                                 # In case the function code was indented
    return ft

It is used in the following _worker() function that will be the target of the processes:

def _worker(functext, args):
    scope = {}               # This is needed to keep executed definitions
    exec(functext, scope)
    scope['func'](args)      # Using func from scope
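
To see the core of _worker() in isolation, the sketch below runs the same exec() step in the current process, without launching anything. The source string is a hand-written stand-in for what make_functext() returns, and the function returns a value instead of printing so the result can be inspected.

```python
# Stand-in for the string that make_functext() would return (hypothetical body).
functext = (
    "def func(args):\n"
    "    name, seconds = tuple(args)\n"
    "    return 'Hi ' + name\n"
)

scope = {}             # exec() stores the definitions it runs in this dict
exec(functext, scope)  # defines scope['func'] from nothing but a string
greeting = scope['func'](['Marty', 0.5])
print(greeting)        # Hi Marty
```

Because only a plain string and the argument list are passed to the process, no function object has to be pickled. The price is that the text is re-executed in every worker, and make_functext()'s str.replace calls rewrite every occurrence of the function's name in the source, not just the one in the def line.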

And finally, here's my decorator:

from multiprocessing import Process 

def parallel(num_processes, **kwargs):
    def parallel_decorator(func, num_processes=num_processes):
        functext = make_functext(func)
        print('This is the parallelized function:\n', functext)
        def function_wrapper(funcargs, num_processes=num_processes):
            workers = []
            print('Launching processes...')
            for k in range(num_processes):
                p = Process(target=_worker, args=(functext, funcargs[k])) # use args here
                p.start()
                workers.append(p)
        return function_wrapper
    return parallel_decorator

The code can finally be used by defining a function like this:

@parallel(4)
def hello(args):
    #§ from time import sleep     # use '#§' to avoid unnecessary (re)imports in main program
    name, seconds = tuple(args)   # unpack args-list here
    sleep(seconds)
    print('Hi', name)

... which can now be called like this:

hello([['Marty', 0.5],
       ['Catherine', 0.9],
       ['Tyler', 0.7],
       ['Pavel', 0.3]])

... which outputs:

This is the parallelized function:
 def func(args):
        from time import sleep
        name, seconds = tuple(args)
        sleep(seconds)
        print('Hi', name)
Launching processes...
Hi Pavel
Hi Marty
Hi Tyler
Hi Catherine

Thanks for reading, this is my very first post. If you find any mistakes or bad practices, feel free to leave a comment. I know that these string conversions are quite dirty, though...

习ぎ惯性依靠 2025-01-13 22:29:50

If you use this code for your decorator:

import multiprocessing
from types import MethodType


DEFAULT_POOL = []


def run_parallel(_func=None, *, name: str = None, context_pool: list = DEFAULT_POOL):

    class RunParallel:
        def __init__(self, func):
            self.func = func

        def __call__(self, *args, **kwargs):
            process = multiprocessing.Process(target=self.func, name=name, args=args, kwargs=kwargs)
            context_pool.append(process)
            process.start()

        def __get__(self, instance, owner):
            return self if instance is None else MethodType(self, instance)

    if _func is None:
        return RunParallel
    else:
        return RunParallel(_func)


def wait_context(context_pool: list = DEFAULT_POOL, kill_others_if_one_fails: bool = False):
    finished = []
    for process in context_pool:
        process.join()
        finished.append(process)

        if kill_others_if_one_fails and process.exitcode != 0:
            break

    if kill_others_if_one_fails:
        # kill unfinished processes
        for process in context_pool:
            if process not in finished:
                process.kill()

        # wait for every process to be dead
        for process in context_pool:
            process.join()

Then you can use it like this, in these 4 examples:

@run_parallel
def m1(a, b="b"):
    print(f"m1 -- {a=} {b=}")


@run_parallel(name="mym2", context_pool=DEFAULT_POOL)
def m2(d, cc="cc"):
    print(f"m2 -- {d} {cc=}")
    a = 1/0


class M:

    @run_parallel
    def c3(self, k, n="n"):
        print(f"c3 -- {k=} {n=}")

    @run_parallel(name="Mc4", context_pool=DEFAULT_POOL)
    def c4(self, x, y="y"):
        print(f"c4 -- {x=} {y=}")


if __name__ == "__main__":
    m1(11)
    m2(22)
    M().c3(33)
    M().c4(44)
    wait_context(kill_others_if_one_fails=True)

The output will be:

m1 -- a=11 b='b'

m2 -- 22 cc='cc'

c3 -- k=33 n='n'

(followed by the exception raised in method m2)
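
The __get__ method is what lets the same decorator work on methods: without it, a class instance stored as a class attribute is not bound to the object it is accessed through, so self would never be passed. Below is a minimal sketch of that binding step, with the process launch replaced by a direct call so the result can be returned. Wrapper and Greeter are hypothetical names.

```python
from types import MethodType


class Wrapper:
    def __init__(self, func):
        self.func = func

    def __call__(self, *args, **kwargs):
        # Direct call here; the answer's version starts a Process instead.
        return self.func(*args, **kwargs)

    def __get__(self, instance, owner):
        # Accessed on the class: return the wrapper itself.
        # Accessed on an instance: bind it, so the instance arrives
        # as the first positional argument of __call__.
        return self if instance is None else MethodType(self, instance)


class Greeter:
    def __init__(self, greeting):
        self.greeting = greeting

    @Wrapper
    def greet(self, name):
        return f"{self.greeting}, {name}"


bound_result = Greeter("Hi").greet("Marty")
print(bound_result)                          # Hi, Marty
print(isinstance(Greeter.greet, Wrapper))    # True
```

Since each call in the answer still hands self.func to multiprocessing.Process, platforms that use the spawn start method would presumably need that function to be picklable, so the name-rebinding issue discussed earlier in the thread may resurface there.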
