Can I use a multiprocessing Queue in a function called by Pool.imap?

Posted 2024-09-25


I'm using python 2.7, and trying to run some CPU heavy tasks in their own processes. I would like to be able to send messages back to the parent process to keep it informed of the current status of the process. The multiprocessing Queue seems perfect for this but I can't figure out how to get it work.

So, this is my basic working example minus the use of a Queue.

import multiprocessing as mp
import time

def f(x):
    return x*x

def main():
    pool = mp.Pool()
    results = pool.imap_unordered(f, range(1, 6))
    time.sleep(1)

    print str(results.next())

    pool.close()
    pool.join()

if __name__ == '__main__':
    main()

I've tried passing the Queue in several ways, and they all fail with the error "RuntimeError: Queue objects should only be shared between processes through inheritance". Here is one of the ways I tried, based on an earlier answer I found. (I hit the same problem with Pool.map_async and Pool.imap.)
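For reference, this RuntimeError comes from pickling a plain `mp.Queue` into the task arguments. A `multiprocessing.Manager().Queue()` does not have that restriction: it is a proxy object that pickles cleanly, so it can travel inside the argument tuple like any other value. A minimal Python 3 sketch of that variation (not from the original question):

```python
import multiprocessing as mp

def f(args):
    # The manager queue proxy pickles cleanly, so it can be
    # passed inside the argument tuple like an ordinary value.
    x, q = args
    q.put(str(x))
    return x * x

def run():
    with mp.Manager() as manager:
        q = manager.Queue()
        with mp.Pool() as pool:
            results = list(pool.imap_unordered(f, [(i, q) for i in range(1, 6)]))
        # Every task put exactly one message before returning, so
        # five gets are guaranteed to succeed once results are in.
        messages = [q.get() for _ in range(5)]
    return messages, results

if __name__ == '__main__':
    messages, results = run()
    print(sorted(messages))
    print(sorted(results))
```

The manager route costs an extra server process and a round-trip per `put`, so the initializer trick in the accepted answer is usually cheaper; this is just the most direct fix for the pickling error itself.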

import multiprocessing as mp
import time

def f(args):
    x = args[0]
    q = args[1]
    q.put(str(x))
    time.sleep(0.1)
    return x*x

def main():
    q = mp.Queue()
    pool = mp.Pool()
    results = pool.imap_unordered(f, ([i, q] for i in range(1, 6)))

    print str(q.get())

    pool.close()
    pool.join()

if __name__ == '__main__':
    main()

Finally, the 0 fitness approach (make it global) doesn't generate any messages, it just locks up.

import multiprocessing as mp
import time

q = mp.Queue()

def f(x):
    q.put(str(x))
    return x*x

def main():
    pool = mp.Pool()
    results = pool.imap_unordered(f, range(1, 6))
    time.sleep(1)

    print q.get()

    pool.close()
    pool.join()

if __name__ == '__main__':
    main()

I'm aware that it will probably work with multiprocessing.Process directly and that there are other libraries to accomplish this, but I hate to back away from the standard library functions that are a great fit until I'm sure it's not just my lack of knowledge keeping me from being able to exploit them.

Thanks.


女皇必胜 2024-10-02 07:57:40


The trick is to pass the Queue as an argument to the initializer. Appears to work with all the Pool dispatch methods.

import multiprocessing as mp

def f(x):
    f.q.put('Doing: ' + str(x))
    return x*x

def f_init(q):
    f.q = q

def main():
    jobs = range(1,6)

    q = mp.Queue()
    p = mp.Pool(None, f_init, [q])
    results = p.imap(f, jobs)
    p.close()

    for i in range(len(jobs)):
        print q.get()
        print results.next()

if __name__ == '__main__':
    main()
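For later readers: the initializer trick above ports directly to Python 3, where it reads a little more naturally with `Pool`'s `initializer` and `initargs` keyword parameters. A sketch of that port (same technique, modernized syntax):

```python
import multiprocessing as mp

def f(x):
    # f.q was attached to the function object by f_init
    # when each worker process started up.
    f.q.put('Doing: ' + str(x))
    return x * x

def f_init(q):
    f.q = q

def main():
    jobs = range(1, 6)
    q = mp.Queue()
    # initializer/initargs run f_init(q) once in every worker.
    with mp.Pool(initializer=f_init, initargs=(q,)) as pool:
        results = pool.imap(f, jobs)
        for _ in jobs:
            print(q.get())
            print(next(results))

if __name__ == '__main__':
    main()
```

Passing the queue through `initargs` works even under the spawn start method, because multiprocessing permits queue pickling during process startup; only passing it inside per-task arguments is forbidden.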
似梦非梦 2024-10-02 07:57:40


With the fork start method (i.e., on Unix platforms), you do NOT need the initializer trick from the top answer.

Just create the mp.Queue as a global variable and it will be correctly inherited by the child processes.

OP's example works fine using Python 3.9.7 on Linux (code slightly adjusted):

import multiprocessing as mp
import time

q = mp.Queue()


def f(x):
    q.put(str(x))
    return x * x


def main():
    pool = mp.Pool(5)
    pool.imap_unordered(f, range(1, 6))
    time.sleep(1)

    for _ in range(1, 6):
        print(q.get())

    pool.close()
    pool.join()


if __name__ == '__main__':
    main()

Output:

2
1
3
4
5

It's been 12 years, but I'd like to make sure any Linux user who comes across this question knows that the top answer's trick is only needed if you cannot use the fork start method.
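To make the inheritance explicit rather than an accident of the platform default, you can request the fork start method directly with `multiprocessing.get_context('fork')`, which raises ValueError where fork is unavailable (e.g. on Windows). A minimal sketch along the lines of the answer above:

```python
import multiprocessing as mp

# Request fork explicitly: this fails fast on platforms without it,
# instead of silently changing behavior under spawn.
ctx = mp.get_context('fork')
q = ctx.Queue()

def f(x):
    # The global q is inherited by forked children; no initializer needed.
    q.put(str(x))
    return x * x

def main():
    with ctx.Pool(5) as pool:
        results = list(pool.imap_unordered(f, range(1, 6)))
    messages = [q.get() for _ in range(5)]
    print(sorted(messages))
    print(sorted(results))

if __name__ == '__main__':
    main()
```

Note that Python 3.14 changed the default start method on Linux from fork to forkserver, so relying on the implicit default is another reason this global-variable approach can stop working; requesting the context explicitly keeps it stable.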
