Combining numpy with multiprocessing queues messes up the queue's ordering

Posted 2024-10-22 11:27:31


I am using the following pattern to do multiprocessing:

    for item in data:
        inQ.put(item)

    for i in xrange(nProcesses):
        inQ.put('STOP')
        multiprocessing.Process(target=worker, args=(inQ, outQ)).start()

    inQ.join()
    outQ.put('STOP')

    for result in iter(outQ.get, 'STOP'):
        # save result

Which works fine. But if I send a numpy array through the outQ, the 'STOP' does not end up at the end of outQ, causing my result-fetching loop to terminate too early.

Here is some code to reproduce the behaviour.

import multiprocessing
import numpy as np

def worker(inQ, outQ):
    for i in iter(inQ.get, 'STOP'):
        result = np.random.rand(1,100)
        outQ.put(result)
        inQ.task_done()
    inQ.task_done() # for the 'STOP'

def main():
    nProcesses = 8
    data = range(1000)

    inQ = multiprocessing.JoinableQueue()
    outQ = multiprocessing.Queue()
    for item in data:
        inQ.put(item)

    for i in xrange(nProcesses):
        inQ.put('STOP')
        multiprocessing.Process(target=worker, args=(inQ, outQ)).start()

    inQ.join()
    print outQ.qsize()
    outQ.put('STOP')

    cnt = 0
    for result in iter(outQ.get, 'STOP'):
        cnt += 1
    print "got %d items" % cnt
    print outQ.qsize()

if __name__ == '__main__':
    main()

If you replace result = np.random.rand(1,100) with something like result = i*i, the code works as expected.

What is happening here? Am I doing something fundamentally wrong here? I would have expected the outQ.put() after the inQ.join() to do what I want, since the join() blocks until all processes have done all put()s.

One workaround that works for me is to do the result-fetching loop with while outQ.qsize() > 0. But I have read that qsize() is not reliable. Is it only unreliable while other processes are running? Would it be safe for me to rely on qsize() after having done the inQ.join()?

I expect some people will propose multiprocessing.Pool.map(), but I get pickle errors when doing that with numpy arrays (ndarrays).
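As an aside, a minimal check of whether the arrays themselves are the problem: a plain ndarray round-trips through pickle without errors, which suggests such Pool.map failures usually come from elsewhere, e.g. an unpicklable worker function (a lambda, closure, or interactively defined function):

```python
import pickle
import numpy as np

# Sanity check: a plain ndarray pickles and unpickles cleanly, so
# Pool.map pickle errors are more likely caused by the worker callable
# than by the ndarray arguments or results.
arr = np.random.rand(1, 100)
restored = pickle.loads(pickle.dumps(arr))
print(restored.shape)  # (1, 100)
```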

Thanks for having a look!


Comments (2)

忘羡 2024-10-29 11:27:31


numpy arrays use rich comparisons, so a == 'STOP' returns a numpy array rather than a bool, and that array cannot be coerced to a bool. Under the covers, iter(outQ.get, 'STOP') does exactly that comparison, and it probably treats the exception raised when converting the result to a bool as False. You will have to write a manual while loop that pulls items from the queue and checks isinstance(item, basestring) before comparing it to 'STOP'.

while True:
    item = outQ.get()
    if isinstance(item, basestring) and item == 'STOP':
        break
    cnt += 1

Checking for qsize() will probably also work fine because no other process is adding to the queue after the input queue is joined.
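A self-contained, Python 3–compatible sketch of that drain loop (using a plain queue.Queue stand-in so it runs single-process; multiprocessing.Queue exposes the same get() API, and Python 2's basestring becomes str):

```python
import queue
import numpy as np

def drain(outQ):
    """Pull items until the 'STOP' sentinel, guarding the comparison
    with an isinstance check so ndarray rich comparisons never run."""
    cnt = 0
    while True:
        item = outQ.get()
        # Only compare to the sentinel when the item is actually a string.
        if isinstance(item, str) and item == 'STOP':
            break
        cnt += 1
    return cnt

outQ = queue.Queue()  # single-process stand-in for multiprocessing.Queue
for _ in range(10):
    outQ.put(np.random.rand(1, 100))
outQ.put('STOP')
print(drain(outQ))  # 10
```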

最偏执的依靠 2024-10-29 11:27:31


Since you know how many items to expect from outQ, another workaround would be to wait for that number of items explicitly:

import multiprocessing as mp
import numpy as np

N=100

def worker(inQ, outQ):
    while True:
        i,item=inQ.get()
        result = np.random.rand(1,N)
        outQ.put((i,result))
        inQ.task_done()

def main():
    nProcesses = 8
    data = range(N)
    inQ = mp.JoinableQueue()
    outQ = mp.Queue()    

    for i,item in enumerate(data):
        inQ.put((i,item))

    for i in xrange(nProcesses):
        proc=mp.Process(target=worker, args=[inQ, outQ])
        proc.daemon=True
        proc.start()

    inQ.join()
    cnt=0
    for _ in range(N):
        result=outQ.get()
        print(result)
        cnt+=1
        print(cnt)      
    print('got {c} items'.format(c=cnt))

if __name__ == '__main__':
    main()
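Because this version tags each result with its input index i, the results (which arrive in whatever order the workers finish) can also be put back into input order afterwards. A small sketch of that reassembly step, with illustrative data in place of the drained queue contents:

```python
import numpy as np

# Suppose these (index, result) pairs were drained from outQ in
# whatever order the workers happened to finish.
pairs = [(2, np.array([2.0])), (0, np.array([0.0])), (1, np.array([1.0]))]

# Restore input order by sorting on the index tag.
ordered = [result for _, result in sorted(pairs, key=lambda p: p[0])]
print([float(r[0]) for r in ordered])  # [0.0, 1.0, 2.0]
```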