如何在 Python 多处理池中运行清理代码?

发布于 2024-11-08 13:32:35 字数 116 浏览 0 评论 0原文

我有一些 Python 代码(在 Windows 上),它使用多处理模块来运行工作进程池。每个工作进程都需要在 map_async 方法结束时进行一些清理。

有谁知道该怎么做?

I have some Python code (on Windows) that uses the multiprocessing module to run a pool of worker processes. Each worker process needs to do some cleanup at the end of the map_async method.

Does anyone know how to do that?

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(2

何以笙箫默 2024-11-15 13:32:35

您是否真的想为每个工作进程运行一次清理函数,而不是为 map_async 调用创建的每个任务运行一次?

multiprocess.pool.Pool 创建一个包含 8 个工作进程的池。 map_async 可能会提交 40 个任务以分配给 8 个工作人员。
我可以想象为什么您可能想要在每个任务结束时运行清理代码,但我很难想象为什么您想要在 8 个工作进程中的每一个都完成之前运行清理代码。

不过,如果这就是您想要做的,您可以通过 Monkeypatching multiprocessing.pool.worker

import multiprocessing as mp
import multiprocessing.pool as mpool
from multiprocessing.util import debug

def cleanup():
    print('{n} CLEANUP'.format(n=mp.current_process().name))

# This code comes from /usr/lib/python2.6/multiprocessing/pool.py,
# except for the single line at the end which calls cleanup().
def myworker(inqueue, outqueue, initializer=None, initargs=()):
    put = outqueue.put
    get = inqueue.get
    if hasattr(inqueue, '_writer'):
        inqueue._writer.close()
        outqueue._reader.close()

    if initializer is not None:
        initializer(*initargs)

    while 1:
        try:
            task = get()
        except (EOFError, IOError):
            debug('worker got EOFError or IOError -- exiting')
            break

        if task is None:
            debug('worker got sentinel -- exiting')
            break

        job, i, func, args, kwds = task
        try:
            result = (True, func(*args, **kwds))
        except Exception, e:
            result = (False, e)
        put((job, i, result))
    cleanup()

# Here we monkeypatch mpool.worker
mpool.worker=myworker

def foo(i):
    return i*i

def main():
    pool = mp.Pool(8)
    results = pool.map_async(foo, range(40)).get()
    print(results)

if __name__=='__main__':
    main()

来实现:

[0, 1, 4, 9, 16, 25, 36, 49, 64, 81, 100, 121, 144, 169, 196, 225, 256, 289, 324, 361, 400, 441, 484, 529, 576, 625, 676, 729, 784, 841, 900, 961, 1024, 1089, 1156, 1225, 1296, 1369, 1444, 1521]
PoolWorker-8 CLEANUP
PoolWorker-3 CLEANUP
PoolWorker-7 CLEANUP
PoolWorker-1 CLEANUP
PoolWorker-6 CLEANUP
PoolWorker-2 CLEANUP
PoolWorker-4 CLEANUP
PoolWorker-5 CLEANUP

Do you really want to run a cleanup function once for each worker process rather than once for every task created by the map_async call?

multiprocess.pool.Pool creates a pool of, say, 8 worker processes. map_async might submit 40 tasks to be distributed among the 8 workers.
I can imagine why you might want to run cleanup code at the end of each task, but I'm having trouble imagining why you would want to run cleanup code just before each of the 8 worker processes is finalized.

Nevertheless, if that is what you want to do, you could do it by monkeypatching multiprocessing.pool.worker:

import multiprocessing as mp
import multiprocessing.pool as mpool
from multiprocessing.util import debug

def cleanup():
    print('{n} CLEANUP'.format(n=mp.current_process().name))

# This code comes from /usr/lib/python2.6/multiprocessing/pool.py,
# except for the single line at the end which calls cleanup().
def myworker(inqueue, outqueue, initializer=None, initargs=()):
    put = outqueue.put
    get = inqueue.get
    if hasattr(inqueue, '_writer'):
        inqueue._writer.close()
        outqueue._reader.close()

    if initializer is not None:
        initializer(*initargs)

    while 1:
        try:
            task = get()
        except (EOFError, IOError):
            debug('worker got EOFError or IOError -- exiting')
            break

        if task is None:
            debug('worker got sentinel -- exiting')
            break

        job, i, func, args, kwds = task
        try:
            result = (True, func(*args, **kwds))
        except Exception, e:
            result = (False, e)
        put((job, i, result))
    cleanup()

# Here we monkeypatch mpool.worker
mpool.worker=myworker

def foo(i):
    return i*i

def main():
    pool = mp.Pool(8)
    results = pool.map_async(foo, range(40)).get()
    print(results)

if __name__=='__main__':
    main()

yields:

[0, 1, 4, 9, 16, 25, 36, 49, 64, 81, 100, 121, 144, 169, 196, 225, 256, 289, 324, 361, 400, 441, 484, 529, 576, 625, 676, 729, 784, 841, 900, 961, 1024, 1089, 1156, 1225, 1296, 1369, 1444, 1521]
PoolWorker-8 CLEANUP
PoolWorker-3 CLEANUP
PoolWorker-7 CLEANUP
PoolWorker-1 CLEANUP
PoolWorker-6 CLEANUP
PoolWorker-2 CLEANUP
PoolWorker-4 CLEANUP
PoolWorker-5 CLEANUP
吖咩 2024-11-15 13:32:35

这里唯一真正的选择是在 map_async 到的函数末尾运行清理。

如果此清理确实是为了进程死亡,则不能使用池的概念。它们是正交的。除非您使用 maxtasksperchild,这是 Python 2.7 中的新功能。即使如此,您也无法在进程死亡时运行代码。但是,maxtasksperchild 可能适合您,因为当进程终止时,进程打开的任何资源肯定会消失。

话虽这么说,如果您有一堆需要运行清理的函数,您可以通过设计装饰器来节省重复的工作。这是我的意思的一个例子:

import functools
import multiprocessing

def cleanup(f):
    """Decorator for shared cleanup mechanism"""
    @functools.wraps(f)
    def wrapped(arg):
        result = f(arg)
        print("Cleaning up after f({0})".format(arg))
        return result
    return wrapped

@cleanup
def task1(arg):
    print("Hello from task1({0})".format(arg))
    return arg * 2

@cleanup
def task2(arg):
    print("Bonjour from task2({0})".format(arg))
    return arg ** 2

def main():
    p = multiprocessing.Pool(processes=3)
    print(p.map(task1, [1, 2, 3]))
    print(p.map(task2, [1, 2, 3]))

if __name__ == "__main__":
    main()

当你执行这个(除非 stdout 被弄乱,因为我没有为了简洁而将它锁定在这里),你取出东西的顺序应该表明你的清理任务正在运行在每个任务结束时:

Hello from task1(1)
Cleaning up after f(1)
Hello from task1(2)
Cleaning up after f(2)
Hello from task1(3)
Cleaning up after f(3)
[2, 4, 6]

Bonjour from task2(1)
Cleaning up after f(1)
Bonjour from task2(2)
Cleaning up after f(2)
Bonjour from task2(3)
Cleaning up after f(3)
[1, 4, 9]

Your only real option here is to run cleanup at the end of the function you map_async to.

If this cleanup is honestly intended for at process death, you cannot use the concept of a pool. They are orthogonal. A pool does not dictate the process lifetime unless you use maxtasksperchild, which is new in Python 2.7. Even then, you do not gain the ability to run code at process death. However, maxtasksperchild might suit you, because any resources that the process opens will definitely go away when the process is terminated.

That being said, if you have a bunch of functions that you need to run cleanup on, you can save duplication of effort by designing a decorator. Here's an example of what I mean:

import functools
import multiprocessing

def cleanup(f):
    """Decorator for shared cleanup mechanism"""
    @functools.wraps(f)
    def wrapped(arg):
        result = f(arg)
        print("Cleaning up after f({0})".format(arg))
        return result
    return wrapped

@cleanup
def task1(arg):
    print("Hello from task1({0})".format(arg))
    return arg * 2

@cleanup
def task2(arg):
    print("Bonjour from task2({0})".format(arg))
    return arg ** 2

def main():
    p = multiprocessing.Pool(processes=3)
    print(p.map(task1, [1, 2, 3]))
    print(p.map(task2, [1, 2, 3]))

if __name__ == "__main__":
    main()

When you execute this (barring stdout being jumbled because I'm not locking it here for brevity), the order you get things out should indicate that your cleanup task is running at the end of each task:

Hello from task1(1)
Cleaning up after f(1)
Hello from task1(2)
Cleaning up after f(2)
Hello from task1(3)
Cleaning up after f(3)
[2, 4, 6]

Bonjour from task2(1)
Cleaning up after f(1)
Bonjour from task2(2)
Cleaning up after f(2)
Bonjour from task2(3)
Cleaning up after f(3)
[1, 4, 9]
~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文