我可以在 Pool.imap 调用的函数中使用多处理队列吗?
我正在使用 python 2.7,并尝试在自己的进程中运行一些 CPU 繁重的任务。我希望能够将消息发送回父进程,以使其了解进程的当前状态。多处理队列似乎对此很完美,但我不知道如何让它工作。
所以,这是我的基本工作示例,不使用队列。
import multiprocessing as mp
import time
def f(x):
return x*x
def main():
pool = mp.Pool()
results = pool.imap_unordered(f, range(1, 6))
time.sleep(1)
print str(results.next())
pool.close()
pool.join()
if __name__ == '__main__':
main()
我尝试以多种方式传递队列,但他们收到错误消息“RuntimeError:队列对象只能通过继承在进程之间共享”。这是我根据我发现的早期答案尝试的方法之一。 (我在尝试使用 Pool.map_async 和 Pool.imap 时遇到了同样的问题)
import multiprocessing as mp
import time
def f(args):
x = args[0]
q = args[1]
q.put(str(x))
time.sleep(0.1)
return x*x
def main():
q = mp.Queue()
pool = mp.Pool()
results = pool.imap_unordered(f, ([i, q] for i in range(1, 6)))
print str(q.get())
pool.close()
pool.join()
if __name__ == '__main__':
main()
最后,0 适应度方法(使其全局)不会生成任何消息,它只是锁定。
import multiprocessing as mp
import time
q = mp.Queue()
def f(x):
q.put(str(x))
return x*x
def main():
pool = mp.Pool()
results = pool.imap_unordered(f, range(1, 6))
time.sleep(1)
print q.get()
pool.close()
pool.join()
if __name__ == '__main__':
main()
我知道它可能会直接与 multiprocessing.Process 一起使用,并且还有其他库可以完成此操作,但我讨厌放弃非常适合的标准库函数,直到我确定这不仅仅是我的缺乏知识使我无法利用它们。
谢谢。
I'm using python 2.7, and trying to run some CPU heavy tasks in their own processes. I would like to be able to send messages back to the parent process to keep it informed of the current status of the process. The multiprocessing Queue seems perfect for this but I can't figure out how to get it work.
So, this is my basic working example minus the use of a Queue.
import multiprocessing as mp
import time
def f(x):
return x*x
def main():
pool = mp.Pool()
results = pool.imap_unordered(f, range(1, 6))
time.sleep(1)
print str(results.next())
pool.close()
pool.join()
if __name__ == '__main__':
main()
I've tried passing the Queue in several ways, and they get the error message "RuntimeError: Queue objects should only be shared between processes through inheritance". Here is one of the ways I tried based on an earlier answer I found. (I get the same problem trying to use Pool.map_async and Pool.imap)
import multiprocessing as mp
import time
def f(args):
x = args[0]
q = args[1]
q.put(str(x))
time.sleep(0.1)
return x*x
def main():
q = mp.Queue()
pool = mp.Pool()
results = pool.imap_unordered(f, ([i, q] for i in range(1, 6)))
print str(q.get())
pool.close()
pool.join()
if __name__ == '__main__':
main()
Finally, the 0 fitness approach (make it global) doesn't generate any messages, it just locks up.
import multiprocessing as mp
import time
q = mp.Queue()
def f(x):
q.put(str(x))
return x*x
def main():
pool = mp.Pool()
results = pool.imap_unordered(f, range(1, 6))
time.sleep(1)
print q.get()
pool.close()
pool.join()
if __name__ == '__main__':
main()
I'm aware that it will probably work with multiprocessing.Process directly and that there are other libraries to accomplish this, but I hate to back away from the standard library functions that are a great fit until I'm sure it's not just my lack of knowledge keeping me from being able to exploit them.
Thanks.
如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。
绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论
评论(2)
技巧是将队列作为参数传递给初始化器。似乎适用于所有池调度方法。
The trick is to pass the Queue as an argument to the initializer. Appears to work with all the Pool dispatch methods.
使用
fork
启动方法(即,在Unix平台上),您不需要需要在最上面的答案中使用初始化技巧只需定义
mp.Queue
作为全局变量,它将被子进程正确继承。OP 的示例在 Linux 上使用 Python 3.9.7 运行良好(代码略有调整):
输出:
已经过去 12 年了,但我想确保任何遇到这个问题的 Linux 用户都知道,只有当您不能使用叉子
With
fork
start method (i.e., on Unix platforms), you do NOT need to use that initializer trick in the top answerJust define
mp.Queue
as a global variable and it will be correctly inherited by the child processes.OP's example works fine using Python 3.9.7 on Linux (code slightly adjusted):
Output:
It's been 12 years, but I'd like to make sure any Linux user who come across this question knows the top answer's trick is only needed if you cannot use fork