Sharing complex objects between processes?

Published 2024-09-17 21:55:33 · 437 characters · 7 views · 0 comments


I have a fairly complex Python object that I need to share between multiple processes. I launch these processes using multiprocessing.Process. When I share an object with multiprocessing.Queue and multiprocessing.Pipe in it, they are shared just fine. But when I try to share an object with other non-multiprocessing-module objects, it seems like Python forks these objects. Is that true?

I tried using multiprocessing.Value, but I'm not sure what the type should be. My object's class is called MyClass, but when I try multiprocessing.Value(MyClass, instance), it fails with:

TypeError: this type has no size

Any idea what's going on?


Comments (6)

战皆罪 2024-09-24 21:55:33


After a lot of research and testing, I found that "Manager" does the job at the level of non-complex objects.

The code below shows that the object inst is shared between processes, which means the attribute var of inst changes in the parent when a child process changes it.

from multiprocessing import Process
from multiprocessing.managers import BaseManager

class SimpleClass(object):
    def __init__(self):
        self.var = 0

    def set(self, value):
        self.var = value

    def get(self):
        return self.var


def change_obj_value(obj):
    obj.set(100)


if __name__ == '__main__':
    BaseManager.register('SimpleClass', SimpleClass)
    manager = BaseManager()
    manager.start()
    inst = manager.SimpleClass()

    p = Process(target=change_obj_value, args=[inst])
    p.start()
    p.join()

    print(inst)                   # <__main__.SimpleClass object at 0x10cf82350>
    print(inst.get())             # 100

Okay, the code above is enough if you only need to share simple objects.

Why not complex ones? Because it may fail if your object is nested (an object inside an object):

from multiprocessing import Process
from multiprocessing.managers import BaseManager

class GetSetter(object):
    def __init__(self):
        self.var = None

    def set(self, value):
        self.var = value

    def get(self):
        return self.var


class ChildClass(GetSetter):
    pass

class ParentClass(GetSetter):
    def __init__(self):
        self.child = ChildClass()
        GetSetter.__init__(self)

    def getChild(self):
        return self.child


def change_obj_value(obj):
    obj.set(100)
    obj.getChild().set(100)


if __name__ == '__main__':
    BaseManager.register('ParentClass', ParentClass)
    manager = BaseManager()
    manager.start()
    inst2 = manager.ParentClass()

    p2 = Process(target=change_obj_value, args=[inst2])
    p2.start()
    p2.join()

    print(inst2)                  # <__main__.ParentClass object at 0x10cf82350>
    print(inst2.getChild())       # <__main__.ChildClass object at 0x10cf6dc50>
    print(inst2.get())            # 100
    # good!

    print(inst2.getChild().get()) # None
    # bad! you need to register the child class too, but there's almost no way to do it;
    # even if you did register the child class, you may get a PicklingError :)

I think the main reason for this behavior is that Manager is just a thin convenience layer built on top of low-level communication tools like pipes and queues.

So this approach is not well suited to complex multiprocessing cases. For complicated use cases it's always better to use low-level tools like locks/semaphores/pipes/queues, or high-level tools like a Redis queue or Redis publish/subscribe (just my recommendation, lol).

捂风挽笑 2024-09-24 21:55:33


You can do this using Python's multiprocessing "Manager" classes and a proxy class that you define. See Proxy Objects in the Python docs.

What you want to do is define a proxy class for your custom object, and then share the object using a "Remote Manager" -- look at the examples in the same linked doc page in the "Using a remote manager" section where the docs show how to share a remote queue. You're going to be doing the same thing, but your call to your_manager_instance.register() will include your custom proxy class in its argument list.

In this manner, you're setting up a server to share the custom object with a custom proxy. Your clients need access to the server (again, see the excellent documentation examples of how to set up client/server access to a remote queue, but instead of sharing a Queue, you are sharing access to your specific class).

余厌 2024-09-24 21:55:33


In Python 3.6 the docs say:

Changed in version 3.6: Shared objects are capable of being nested. For example, a shared container object such as a shared list can contain other shared objects which will all be managed and synchronized by the SyncManager.

As long as instances are created through the SyncManager, you should be able to make the objects reference each other. Dynamic creation of one type of object in the methods of another type of object might still be impossible or very tricky though.

Edit: I stumbled upon this issue, Multiprocessing managers and custom classes, with Python 3.6.5 and 3.6.7; it still needs checking against Python 3.7.

Edit 2: Due to some other issues I can't currently test this with Python 3.7. The workaround provided in https://stackoverflow.com/a/50878600/7541006 works fine for me.

梦境 2024-09-24 21:55:33


Here's a Python package I made just for this (sharing complex objects between processes).

git: https://github.com/dRoje/pipe-proxy

The idea is that you create a proxy for your object and pass it to a process. You then use the proxy as if you had a reference to the original object. You can only use method calls, though, so object attributes are accessed through setters and getters.

Say we have an object called 'example'; creating the proxy and proxy listener is easy:

from pipeproxy import proxy 
example = Example() 
exampleProxy, exampleProxyListener = proxy.createProxy(example) 

Now you send the proxy to another process.

p = Process(target=someMethod, args=(exampleProxy,))
p.start()

Use it in the other process as you would use the original object (example):

def someMethod(exampleProxy):
    ...
    exampleProxy.originalExampleMethod()
    ...

But you do have to listen to it in the main process:

exampleProxyListener.listen()

Read more and find examples here:

http://matkodjipalo.com/index.php/2017/11/12/proxy-solution-python-multiprocessing/

執念 2024-09-24 21:55:33

我尝试使用 BaseManager 并注册我的自定义类以使其满意,并得到有关嵌套类的问题,正如 Tom 上面提到的那样。

我认为主要原因与前面所说的嵌套类无关,而是python采用的底层通信机制。原因是Python使用一些类似套接字的通信机制来在低级别的服务器进程内同步自定义类的修改。我认为它封装了一些 rpc 方法,使其对用户透明,就好像他们调用嵌套类对象的本地方法一样。

因此,当您想要修改、检索自定义对象或某些第三方对象时,您应该在进程中定义一些接口来与其通信,而不是直接获取或设置值。

然而,在操作嵌套对象中的多层对象时,可以忽略上述问题,就像您在普通例程中所做的那样,因为您注册的类中的嵌套对象不再是代理对象,在其上进行操作不会再次经历类似套接字的通信例程并且是本地化的。

这是我为解决该问题而编写的可行代码。

from multiprocessing import Process, Manager, Lock
from multiprocessing.managers import BaseManager
import numpy as np

class NestedObj(object):
       def __init__(self):
                self.val = 1

class CustomObj(object):
        def __init__(self, numpy_obj):
                self.numpy_obj = numpy_obj
                self.nested_obj = NestedObj()

        def set_value(self, p, q, v):
                self.numpy_obj[p, q] = v

        def get_obj(self):
                return self.numpy_obj

        def get_nested_obj(self):
                return self.nested_obj.val

class CustomProcess(Process):
        def __init__(self, obj, p, q, v):
                super(CustomProcess, self).__init__()
                self.obj = obj
                self.index = p, q
                self.v = v

        def run(self):
                self.obj.set_value(*self.index, self.v)



if __name__=="__main__":
        BaseManager.register('CustomObj', CustomObj)
        manager = BaseManager()
        manager.start()
        data = [[0 for x in range(10)] for y in range(10)]
        matrix = np.matrix(data)
        custom_obj = manager.CustomObj(matrix)
        print(custom_obj.get_obj())
        process_list = []
        for p in range(10):
                for q in range(10):
                        proc = CustomProcess(custom_obj, p, q, 10*p+q)
                        process_list.append(proc)
        for x in range(100):
                process_list[x].start()
        for x in range(100):
                process_list[x].join()
        print(custom_obj.get_obj())
        print(custom_obj.get_nested_obj())

I tried to use BaseManager and register my customized class to make it happy, and got the problem with nested classes just as Tom mentioned above.

I think the main reason is not the nested class itself, but the communication mechanism Python uses at a low level. Python uses a socket-like mechanism to synchronize modifications of the customized class within a server process. I think it encapsulates some RPC methods and makes them transparent to the user, as if they were calling local methods of a nested class object.

So, when you want to modify or retrieve your self-defined objects or some third-party objects, you should define interfaces in your process to communicate with them rather than directly getting or setting values.

However, when operating on the multi-nested objects inside those nested objects, you can ignore the issues above and work as you would in ordinary code, because the nested objects inside the registered class are no longer proxy objects; operations on them do not go through the socket-like communication routine again and are local.

Here is the workable code I wrote to solve the problem.

from multiprocessing import Process
from multiprocessing.managers import BaseManager
import numpy as np

class NestedObj(object):
    def __init__(self):
        self.val = 1

class CustomObj(object):
    def __init__(self, numpy_obj):
        self.numpy_obj = numpy_obj
        self.nested_obj = NestedObj()

    def set_value(self, p, q, v):
        self.numpy_obj[p, q] = v

    def get_obj(self):
        return self.numpy_obj

    def get_nested_obj(self):
        return self.nested_obj.val

class CustomProcess(Process):
    def __init__(self, obj, p, q, v):
        super(CustomProcess, self).__init__()
        self.obj = obj
        self.index = p, q
        self.v = v

    def run(self):
        self.obj.set_value(*self.index, self.v)


if __name__ == "__main__":
    BaseManager.register('CustomObj', CustomObj)
    manager = BaseManager()
    manager.start()
    data = [[0 for x in range(10)] for y in range(10)]
    matrix = np.array(data)
    custom_obj = manager.CustomObj(matrix)
    print(custom_obj.get_obj())
    process_list = []
    for p in range(10):
        for q in range(10):
            proc = CustomProcess(custom_obj, p, q, 10 * p + q)
            process_list.append(proc)
    for x in range(100):
        process_list[x].start()
    for x in range(100):
        process_list[x].join()
    print(custom_obj.get_obj())
    print(custom_obj.get_nested_obj())

折戟 2024-09-24 21:55:33


To save some headaches with shared resources, you can try collecting the data that needs access to a singleton resource in the return statement of the function that is mapped by e.g. pool.imap_unordered, and then processing it further in a loop that retrieves the partial results:

for result in pool.imap_unordered(process_function, iterable_data):
    do_something(result)

If not much data gets returned, there might not be much overhead in doing this.
