python 多处理管理器复合图案共享

发布于 2024-08-06 04:43:55 字数 2204 浏览 5 评论 0原文

我试图通过多处理管理器共享复合结构,但在尝试仅使用其中一种复合类方法时,我遇到了“运行时错误:超出最大递归深度”的麻烦。

该类是来自 code.activestate 的令牌,并在纳入管理器之前经过我的测试。

当将类检索到进程中并调用其 addChild() 方法时,我保留了 RunTimeError,而在进程之外它仍然有效。

复合类继承自 SpecialDict 类,该类实现 ** ____getattr()____ ** 方法。

有可能在调用 addChild() 时,python 解释器会寻找不同的 ** ____getattr()____ ** 因为正确的一个没有被管理器代理?

如果是这样,我不清楚为该类/方法创建代理的正确方法

下面的代码准确地重现了这种情况:

1)这是manager.py:2

from multiprocessing.managers import BaseManager
from CompositeDict import *

class PlantPurchaser():

    def __init__(self):
        self.comp  = CompositeDict('Comp')

    def get_cp(self):
        return self.comp

class Manager():

    def __init__(self):

        self.comp  = QueuePurchaser().get_cp()

        BaseManager.register('get_comp', callable=lambda:self.comp)

        self.m = BaseManager(address=('127.0.0.1', 50000), authkey='abracadabra')
        self.s = self.m.get_server()

        self.s.serve_forever()

)我想在这个consumer.py中使用复合:

from multiprocessing.managers import BaseManager

class Consumer():

    def __init__(self):

        BaseManager.register('get_comp')

        self.m = BaseManager(address=('127.0.0.1', 50000), authkey='abracadabra')
        self.m.connect()

        self.comp = self.m.get_comp()
        ret = self.comp.addChild('consumer')

3)通过controller.py运行所有启动:

from multiprocessing import Process

class Controller():
    def __init__(self):
        for child in _run_children():
            child.join()

def _run_children():

    from manager import Manager
    from consumer import Consumer as Consumer

procs = (
         Process(target=Manager,  name='Manager' ),
         Process(target=Consumer, name='Consumer'),
        )

for proc in procs:
    proc.daemon = 1
    proc.start()
return procs

c = Controller()

看看这个相关问题,了解如何为 CompositeDict() 类做代理 正如阿尔伯特建议的那样。

tgray给出的解决方案有效,但无法避免竞争条件

I'm trying to share a composite structure through a multiprocessing manager but I felt in trouble with a "RuntimeError: maximum recursion depth exceeded" when trying to use just one of the Composite class methods.

The class is token from code.activestate and tested by me before inclusion into the manager.

When retrieving the class into a process and invoking its addChild() method I kept the RunTimeError, while outside the process it works.

The composite class inheritates from a SpecialDict class, that implements a ** ____getattr()____ **
method.

Could be possible that while calling addChild() the interpreter of python looks for a different ** ____getattr()____ ** because the right one is not proxied by the manager?

If so It's not clear to me the right way to make a proxy to that class/method

The following code reproduce exactly this condition:

1) this is the manager.py:

from multiprocessing.managers import BaseManager
from CompositeDict import *

class PlantPurchaser():

    def __init__(self):
        self.comp  = CompositeDict('Comp')

    def get_cp(self):
        return self.comp

class Manager():

    def __init__(self):

        self.comp  = QueuePurchaser().get_cp()

        BaseManager.register('get_comp', callable=lambda:self.comp)

        self.m = BaseManager(address=('127.0.0.1', 50000), authkey='abracadabra')
        self.s = self.m.get_server()

        self.s.serve_forever()

2) I want to use the composite into this consumer.py:

from multiprocessing.managers import BaseManager

class Consumer():

    def __init__(self):

        BaseManager.register('get_comp')

        self.m = BaseManager(address=('127.0.0.1', 50000), authkey='abracadabra')
        self.m.connect()

        self.comp = self.m.get_comp()
        ret = self.comp.addChild('consumer')

3) run all launching by a controller.py:

from multiprocessing import Process

class Controller():
    def __init__(self):
        for child in _run_children():
            child.join()

def _run_children():

    from manager import Manager
    from consumer import Consumer as Consumer

procs = (
         Process(target=Manager,  name='Manager' ),
         Process(target=Consumer, name='Consumer'),
        )

for proc in procs:
    proc.daemon = 1
    proc.start()
return procs

c = Controller()

Take a look this related questions on how to do a proxy for CompositeDict() class
as suggested by AlberT.

The solution given by tgray works but cannot avoid race conditions

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(3

孤君无依 2024-08-13 04:43:55

类之间是否有可能存在循环引用?例如,外部类具有对复合类的引用,而复合类又具有对外部类的引用。

多处理管理器运行良好,但是当您有大型、复杂的类结构时,您可能会遇到类型/引用无法正确序列化的错误。另一个问题是来自多处理管理器的错误非常神秘。这使得调试故障条件变得更加困难。

Is it possible there is a circular reference between the classes? For example, the outer class has a reference to the composite class, and the composite class has a reference back to the outer class.

The multiprocessing manager works well, but when you have large, complicated class structures, then you are likely to run into an error where a type/reference can not be serialized correctly. The other problem is that errors from multiprocessing manager are very cryptic. This makes debugging failure conditions even more difficult.

如果没有你 2024-08-13 04:43:55

我认为问题在于你必须指示 Manager 如何管理你的对象,这不是标准的 python 类型。

在其他世界中您必须为您的 CompositeDict 创建一个代理

您可以查看此文档的示例:http://ruffus.googlecode.com/svn/trunk/doc/html/sharing_data_across_jobs_example.html

I think the problem is that you have to instruct the Manager on how to manage you object, which is not a standard python type.

In other worlds you have to create a proxy for you CompositeDict

You could look at this doc for an example: http://ruffus.googlecode.com/svn/trunk/doc/html/sharing_data_across_jobs_example.html

信仰 2024-08-13 04:43:55

Python 默认的最大递归深度为 1000(或者 999,我忘了……)。但您可以这样更改默认行为:

import sys
sys.setrecursionlimit(n)

其中 n 是您希望允许的递归次数。

编辑:

以上答案没有解决此问题的根本原因(如评论中指出的)。仅当您有意递归超过 1000 次时才需要使用它。如果您处于无限循环中(就像在这个问题中),您最终将达到您设置的任何限制。

为了解决您的实际问题,我从头开始重新编写了您的代码,尽可能简单地将其构建为我认为您想要的:

import sys
from multiprocessing import Process
from multiprocessing.managers import BaseManager
from CompositDict import *

class Shared():
    def __init__(self):
        self.comp = CompositeDict('Comp')

    def get_comp(self):
        return self.comp

    def set_comp(self, c):
        self.comp = c

class Manager():
    def __init__(self):
        shared = Shared()
        BaseManager.register('get_shared', callable=lambda:shared)
        mgr = BaseManager(address=('127.0.0.1', 50000), authkey='abracadabra')
        srv = mgr.get_server()
        srv.serve_forever()

class Consumer():
    def __init__(self, child_name):
        BaseManager.register('get_shared')
        mgr = BaseManager(address=('127.0.0.1', 50000), authkey='abracadabra')
        mgr.connect()

        shared = mgr.get_shared()
        comp = shared.get_comp()
        child = comp.addChild(child_name)
        shared.set_comp(comp)
        print comp

class Controller():
    def __init__(self):
        pass

    def main(self):
        m = Process(target=Manager, name='Manager')
        m.daemon = True
        m.start()

        consumers = []
        for i in xrange(3):
            p = Process(target=Consumer, name='Consumer', args=('Consumer_' + str(i),))
            p.daemon = True
            consumers.append(p)

        for c in consumers:
            c.start()
        for c in consumers:
            c.join()
        return 0


if __name__ == '__main__':
    con = Controller()
    sys.exit(con.main())

我在一个文件中完成了这一切,但您不应该有任何麻烦把它拆散。

我向您的消费者添加了一个 child_name 参数,以便我可以检查 CompositDict 是否正在更新。

请注意,CompositDict 对象有一个 getter setter。当我只有一个 getter 时,每个 Consumer 在添加子项时都会覆盖 CompositDict

这就是为什么我还将您的注册方法更改为 get_shared 而不是 get_comp,因为您将需要访问 Consumer 类中的 setter 和 getter。

另外,我认为您不想尝试加入您的经理流程,因为它将“永远服务”。如果您查看 BaseManager 的源代码(./Lib/multiprocessing/managers.py:Line 144),您会注意到 serve_forever() 函数会让您陷入一个无限循环,该循环只是被破坏了通过KeyboardInterruptSystemExit

最重要的是,这段代码可以在没有任何递归循环的情况下工作(据我所知),但如果您仍然遇到错误,请告诉我。

Python has a default maximum recursion depth of 1000 (or 999, I forget...). But you can change the default behavior thusly:

import sys
sys.setrecursionlimit(n)

Where n is the number of recursions you wish to allow.

Edit:

The above answer does nothing to solve the root cause of this problem (as pointed out in the comments). It only needs to be used if you are intentionally recursing more than 1000 times. If you are in an infinite loop (like in this problem), you will eventually hit whatever limit you set.

To address your actual problem, I re-wrote your code from scratch starting as simply as I could make it and built it up to what I believe is what you want:

import sys
from multiprocessing import Process
from multiprocessing.managers import BaseManager
from CompositDict import *

class Shared():
    def __init__(self):
        self.comp = CompositeDict('Comp')

    def get_comp(self):
        return self.comp

    def set_comp(self, c):
        self.comp = c

class Manager():
    def __init__(self):
        shared = Shared()
        BaseManager.register('get_shared', callable=lambda:shared)
        mgr = BaseManager(address=('127.0.0.1', 50000), authkey='abracadabra')
        srv = mgr.get_server()
        srv.serve_forever()

class Consumer():
    def __init__(self, child_name):
        BaseManager.register('get_shared')
        mgr = BaseManager(address=('127.0.0.1', 50000), authkey='abracadabra')
        mgr.connect()

        shared = mgr.get_shared()
        comp = shared.get_comp()
        child = comp.addChild(child_name)
        shared.set_comp(comp)
        print comp

class Controller():
    def __init__(self):
        pass

    def main(self):
        m = Process(target=Manager, name='Manager')
        m.daemon = True
        m.start()

        consumers = []
        for i in xrange(3):
            p = Process(target=Consumer, name='Consumer', args=('Consumer_' + str(i),))
            p.daemon = True
            consumers.append(p)

        for c in consumers:
            c.start()
        for c in consumers:
            c.join()
        return 0


if __name__ == '__main__':
    con = Controller()
    sys.exit(con.main())

I did this all in one file, but you shouldn't have any trouble breaking it up.

I added a child_name argument to your consumer so that I could check that the CompositDict was getting updated.

Note that there is both a getter and a setter for your CompositDict object. When I only had a getter, each Consumer was overwriting the CompositDict when it added a child.

This is why I also changed your registered method to get_shared instead of get_comp, as you will want access to the setter as well as the getter within your Consumer class.

Also, I don't think you want to try joining your manager process, as it will "serve forever". If you look at the source for the BaseManager (./Lib/multiprocessing/managers.py:Line 144) you'll notice that the serve_forever() function puts you into an infinite loop that is only broken by KeyboardInterrupt or SystemExit.

Bottom line is that this code works without any recursive looping (as far as I can tell), but let me know if you still experience your error.

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文