Multiprocessing with a defaultdict?

Published 2025-01-05 03:05:05


Just experimenting and learning. I know how to create a shared dictionary that can be accessed by multiple processes, but I'm not sure how to keep the dict synced. defaultdict, I believe, illustrates the problem I'm having.

from collections import defaultdict
from multiprocessing import Pool, Manager, Process

#test without multiprocessing
s = 'mississippi'
d = defaultdict(int)
for k in s:
    d[k] += 1

print(dict(d))  # Success! {'m': 1, 'i': 4, 's': 4, 'p': 2}
print('*' * 10, ' with multiprocessing ', '*' * 10)

def test(k, multi_dict):
    multi_dict[k] += 1

if __name__ == '__main__':
    pool = Pool(processes=4)
    mgr = Manager()
    multi_d = mgr.dict()
    for k in s:
        pool.apply_async(test, (k, multi_d))

    # Mark pool as closed -- no more tasks can be added.
    pool.close()

    # Wait for tasks to exit
    pool.join()

    # Output results
    print(multi_d.copy())  # FAIL: prints {} because every worker raised KeyError

print('*' * 10, ' with multiprocessing and process module like on python site example ', '*' * 10)
def test2(k, multi_dict2):
    multi_dict2[k] += 1


if __name__ == '__main__':
    manager = Manager()

    multi_d2 = manager.dict()
    for k in s:
        p = Process(target=test2, args=(k, multi_d2))
        p.start()
        p.join()

    print(multi_d2)  # FAIL

The first version works (because it's not using multiprocessing), but I'm having problems getting it to work with multiprocessing. I'm not sure how to solve it; I think it might be due to the dict not being synced (and the results being joined later), or maybe because within multiprocessing I can't figure out how to set defaultdict(int) as the dictionary.

Any help or suggestions on how to get this to work would be great!


2 Answers

软的没边 2025-01-12 03:05:05


You can subclass BaseManager and register additional types for sharing. You need to provide a suitable proxy type in cases where the default AutoProxy-generated type does not work. For defaultdict, if you only need to access the attributes that are already present in dict, you can use DictProxy.

from multiprocessing import Pool
from multiprocessing.managers import BaseManager, DictProxy
from collections import defaultdict

class MyManager(BaseManager):
    pass

MyManager.register('defaultdict', defaultdict, DictProxy)

def test(k, multi_dict):
    multi_dict[k] += 1

if __name__ == '__main__':
    pool = Pool(processes=4)
    mgr = MyManager()
    mgr.start()
    multi_d = mgr.defaultdict(int)
    for k in 'mississippi':
        pool.apply_async(test, (k, multi_d))
    pool.close()
    pool.join()
    print(multi_d.copy())
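(Editor's note, not part of the original answer.) A quick way to see why the plain Manager().dict() version in the question silently produces nothing: apply_async captures any exception raised in the worker, and it only resurfaces when you call .get() on the returned AsyncResult. A minimal sketch (the increment helper is hypothetical, not from the question):

```python
from multiprocessing import Pool, Manager

def increment(k, shared):
    # A plain manager dict has no default_factory, and += is a
    # read-then-write: the read raises KeyError for a missing key.
    shared[k] += 1

if __name__ == '__main__':
    mgr = Manager()
    shared = mgr.dict()
    with Pool(processes=4) as pool:
        results = [pool.apply_async(increment, (k, shared)) for k in 'mississippi']
        for r in results:
            try:
                r.get()  # re-raises the exception that killed the worker task
            except KeyError as e:
                print('worker failed with KeyError:', e)
                break
```

With apply_async alone the tasks fail quietly and the dict simply stays empty, which is why the question's version printed nothing useful.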
缘字诀 2025-01-12 03:05:05


Well, the Manager class seems to supply only a fixed number of predefined data structures which can be shared among processes, and defaultdict is not among them. If you really just need that one defaultdict, the easiest solution would be to implement the defaulting behavior on your own:

def test(k, multi_dict):
    if k not in multi_dict:
        multi_dict[k] = 0
    multi_dict[k] += 1
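(Editor's note, not part of the original answer.) One caveat with this workaround: the `in` check and the increment are separate calls to the manager process, so two workers can interleave between them and lose an update. Guarding the read-modify-write with a manager Lock makes it atomic; a sketch under those assumptions, using a hypothetical `locked_increment` helper:

```python
from multiprocessing import Pool, Manager

def locked_increment(k, multi_dict, lock):
    # Hold the lock across check + increment so no other worker
    # can interleave between the read and the write.
    with lock:
        if k not in multi_dict:
            multi_dict[k] = 0
        multi_dict[k] += 1

if __name__ == '__main__':
    mgr = Manager()
    multi_d = mgr.dict()
    lock = mgr.Lock()  # a manager lock proxy is picklable, so workers can share it
    with Pool(processes=4) as pool:
        for k in 'mississippi':
            pool.apply_async(locked_increment, (k, multi_d, lock))
        pool.close()
        pool.join()
    print(sorted(multi_d.copy().items()))  # [('i', 4), ('m', 1), ('p', 2), ('s', 4)]
```

The same lost-update hazard applies to `multi_dict[k] += 1` on any proxy (including the DictProxy answer above), since the `+=` expands to a separate remote read and remote write.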