Python Process Pool non-daemonic?

Posted 2024-11-28 05:46:59 · 380 words · 3 views · 0 comments

Is it possible to create a Python Pool that is non-daemonic? I want a pool to be able to call a function that has another pool inside.

I want this because daemon processes cannot create child processes. Specifically, trying to do so causes the error:

AssertionError: daemonic processes are not allowed to have children

For example, consider the scenario where function_a has a pool which runs function_b which has a pool which runs function_c. This function chain will fail, because function_b is being run in a daemon process, and daemon processes cannot create processes.
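The failure can be reproduced with a short script. This is only a minimal sketch: the function names follow the scenario above, the bodies are made up for illustration, and the error is caught inside the worker so that its message can be returned to the parent.

```python
import multiprocessing


def function_c(x):
    return x * x


def function_b(n):
    # Runs inside a worker of the outer pool, which is a daemonic process.
    is_daemon = multiprocessing.current_process().daemon
    try:
        with multiprocessing.Pool(2) as inner_pool:
            return is_daemon, inner_pool.map(function_c, range(n))
    except AssertionError as exc:
        # Starting the inner pool's workers is refused in a daemonic process.
        return is_daemon, str(exc)


def function_a():
    with multiprocessing.Pool(2) as outer_pool:
        return outer_pool.map(function_b, [3, 4])


if __name__ == "__main__":
    for is_daemon, outcome in function_a():
        print(f"daemon={is_daemon} -> {outcome}")
```

Each outer worker should report that it is daemonic, together with the assertion message instead of a list of squares.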

Comments (11)

清音悠歌 2024-12-05 05:46:59

The multiprocessing.pool.Pool class creates the worker processes in its __init__ method, makes them daemonic, and starts them. It is not possible to reset their daemon attribute to False before they are started (and afterwards it is no longer allowed). But you can create your own subclass of multiprocessing.pool.Pool (multiprocessing.Pool is just a wrapper function) and substitute your own multiprocessing.Process subclass, which is always non-daemonic, to be used for the worker processes.

Here's a full example of how to do this. The important parts are the two classes NoDaemonProcess and MyPool at the top, and the calls to pool.close() and pool.join() on your MyPool instance at the end.

#!/usr/bin/env python
# -*- coding: UTF-8 -*-

import multiprocessing
# We must import this explicitly, it is not imported by the top-level
# multiprocessing module.
import multiprocessing.pool
import time

from random import randint


class NoDaemonProcess(multiprocessing.Process):
    # make 'daemon' attribute always return False
    def _get_daemon(self):
        return False
    def _set_daemon(self, value):
        pass
    daemon = property(_get_daemon, _set_daemon)

# We sub-class multiprocessing.pool.Pool instead of multiprocessing.Pool
# because the latter is only a wrapper function, not a proper class.
class MyPool(multiprocessing.pool.Pool):
    Process = NoDaemonProcess

def sleepawhile(t):
    print("Sleeping %i seconds..." % t)
    time.sleep(t)
    return t

def work(num_procs):
    print("Creating %i (daemon) workers and jobs in child." % num_procs)
    pool = multiprocessing.Pool(num_procs)

    result = pool.map(sleepawhile,
        [randint(1, 5) for x in range(num_procs)])

    # The following is not really needed, since the (daemon) workers of the
    # child's pool are killed when the child is terminated, but it's good
    # practice to cleanup after ourselves anyway.
    pool.close()
    pool.join()
    return result

def test():
    print("Creating 5 (non-daemon) workers and jobs in main process.")
    pool = MyPool(5)

    result = pool.map(work, [randint(1, 5) for x in range(5)])

    pool.close()
    pool.join()
    print(result)

if __name__ == '__main__':
    test()
关于从前 2024-12-05 05:46:59

I needed a non-daemonic pool in Python 3.7 and ended up adapting the code posted in the accepted answer. Below is the snippet that creates the non-daemonic pool:

import multiprocessing.pool

class NoDaemonProcess(multiprocessing.Process):
    @property
    def daemon(self):
        return False

    @daemon.setter
    def daemon(self, value):
        pass


class NoDaemonContext(type(multiprocessing.get_context())):
    Process = NoDaemonProcess

# We sub-class multiprocessing.pool.Pool instead of multiprocessing.Pool
# because the latter is only a wrapper function, not a proper class.
class NestablePool(multiprocessing.pool.Pool):
    def __init__(self, *args, **kwargs):
        kwargs['context'] = NoDaemonContext()
        super(NestablePool, self).__init__(*args, **kwargs)

Since the current implementation of multiprocessing has been extensively refactored to be based on contexts, we need to provide a NoDaemonContext class that has our NoDaemonProcess as an attribute. NestablePool will then use that context instead of the default one.

That said, I should warn that there are at least two caveats to this approach:

  1. It still depends on implementation details of the multiprocessing package, and could therefore break at any time.
  2. There are valid reasons why multiprocessing made it so hard to use non-daemonic processes, many of which are explained here. The most compelling in my opinion is:

As for allowing children threads to spawn off children of its own using
subprocess runs the risk of creating a little army of zombie
'grandchildren' if either the parent or child threads terminate before
the subprocess completes and returns.
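For completeness, here is a sketch of how NestablePool might be used with an ordinary pool nested inside it. The three classes are repeated from the snippet above so the example is self-contained; square and sum_of_squares are hypothetical helpers invented for illustration, and the first caveat applies (it relies on multiprocessing internals).

```python
import multiprocessing
import multiprocessing.pool


class NoDaemonProcess(multiprocessing.Process):
    @property
    def daemon(self):
        return False

    @daemon.setter
    def daemon(self, value):
        pass  # ignore the pool's attempt to mark the worker as daemonic


class NoDaemonContext(type(multiprocessing.get_context())):
    Process = NoDaemonProcess


class NestablePool(multiprocessing.pool.Pool):
    def __init__(self, *args, **kwargs):
        kwargs["context"] = NoDaemonContext()
        super().__init__(*args, **kwargs)


def square(x):
    return x * x


def sum_of_squares(n):
    # Runs in a non-daemonic outer worker, so it may start its own pool.
    with multiprocessing.Pool(2) as inner_pool:
        return sum(inner_pool.map(square, range(n)))


if __name__ == "__main__":
    with NestablePool(2) as outer_pool:
        print(outer_pool.map(sum_of_squares, [3, 4, 5]))
```

Only the outer pool needs to be a NestablePool. The inner pool can stay a regular multiprocessing.Pool unless its workers must spawn processes too.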

路还长,别太狂 2024-12-05 05:46:59

As of Python 3.8, concurrent.futures.ProcessPoolExecutor doesn't have this limitation. It can have a nested process pool with no problem at all:

from concurrent.futures import ProcessPoolExecutor as Pool
from itertools import repeat
from multiprocessing import current_process
import time

def pid():
    return current_process().pid

def _square(i):  # Runs in inner_pool
    square = i ** 2
    time.sleep(i / 10)
    print(f'{pid()=} {i=} {square=}')
    return square

def _sum_squares(i, j):  # Runs in outer_pool
    with Pool(max_workers=2) as inner_pool:
        squares = inner_pool.map(_square, (i, j))
    sum_squares = sum(squares)
    time.sleep(sum_squares ** .5)
    print(f'{pid()=}, {i=}, {j=} {sum_squares=}')
    return sum_squares

def main():
    with Pool(max_workers=3) as outer_pool:
        for sum_squares in outer_pool.map(_sum_squares, range(5), repeat(3)):
            print(f'{pid()=} {sum_squares=}')

if __name__ == "__main__":
    main()

The above demonstration code was tested with Python 3.8.

A limitation of ProcessPoolExecutor, however, is that it doesn't have maxtasksperchild. If you need this, consider the answer by Massimiliano instead.

Credit: answer by jfs

╭ゆ眷念 2024-12-05 05:46:59

The multiprocessing module has a nice interface for using pools with either processes or threads. Depending on your use case, you might consider using multiprocessing.pool.ThreadPool for your outer pool, which gives you threads (which are allowed to spawn processes) rather than processes.

It might be limited by the GIL, but in my particular case (I tested both), the startup time for the processes of the outer Pool created as described here far outweighed the cost of the ThreadPool solution.

It's really easy to swap Processes for Threads. Read more about how to use a ThreadPool solution here or here.
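A rough sketch of that arrangement, assuming the outer tasks mostly wait on their inner pools: the outer level is a ThreadPool, and each thread creates a regular process pool. The helper names square and sum_of_squares are made up for illustration. Keep in mind that starting processes from a multi-threaded program has pitfalls of its own, depending on the start method.

```python
import multiprocessing
from multiprocessing.pool import ThreadPool


def square(x):
    return x * x


def sum_of_squares(n):
    # Runs in a thread of the main process, which is not daemonic,
    # so creating a process pool here is allowed.
    with multiprocessing.Pool(2) as inner_pool:
        return sum(inner_pool.map(square, range(n)))


if __name__ == "__main__":
    with ThreadPool(3) as outer_pool:
        print(outer_pool.map(sum_of_squares, [3, 4, 5]))
```

ThreadPool exposes the same map/apply interface as Pool, so swapping the outer level usually means changing only the constructor.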

薄情伤 2024-12-05 05:46:59

On some Python versions, replacing the standard Pool with a custom one can raise the error: AssertionError: group argument must be None for now.

Here I found a solution that can help:

import multiprocessing
import multiprocessing.pool


class NoDaemonProcess(multiprocessing.Process):
    # make 'daemon' attribute always return False
    @property
    def daemon(self):
        return False

    @daemon.setter
    def daemon(self, val):
        pass


class NoDaemonProcessPool(multiprocessing.pool.Pool):

    def Process(self, *args, **kwds):
        # Build the worker as usual, then swap in the non-daemonic class.
        proc = super(NoDaemonProcessPool, self).Process(*args, **kwds)
        proc.__class__ = NoDaemonProcess

        return proc
落墨 2024-12-05 05:46:59

I have seen people deal with this issue by using billiard, celery's fork of multiprocessing (multiprocessing pool extensions), which allows daemonic processes to spawn children. The workaround is to simply replace the multiprocessing module with:

import billiard as multiprocessing
一笑百媚生 2024-12-05 05:46:59

The issue I encountered was in trying to import globals between modules, causing the ProcessPool() line to get evaluated multiple times.

globals.py

# Assumes the standard-library multiprocessing module is what provides Manager and Lock.
from multiprocessing        import Manager, Lock
from pathos.multiprocessing import ProcessPool
from pathos.threading       import ThreadPool

class SingletonMeta(type):
    def __new__(cls, name, bases, namespace):
        # Copying a singleton returns the singleton itself.
        namespace['__deepcopy__'] = namespace['__copy__'] = lambda self, *args: self
        return super().__new__(cls, name, bases, namespace)

    def __init__(cls, name, bases, namespace):
        super().__init__(name, bases, namespace)
        cls.instance = None

    def __call__(cls, *args, **kw):
        # Create the single instance on first call, then keep returning it.
        if cls.instance is None:
            cls.instance = super().__call__(*args, **kw)
        return cls.instance

    def __deepcopy__(self, item):
        return item.__class__.instance

class Globals(metaclass=SingletonMeta):
    """
    This class is a workaround to the bug: AssertionError: daemonic processes are not allowed to have children

    The root cause is that importing this file from different modules causes this file to be reevaluated each time,
    thus ProcessPool() gets re-executed inside that child thread, thus causing the daemonic processes bug
    """
    def __init__(self):
        print("%s::__init__()" % (self.__class__.__name__))
        self.shared_manager      = Manager()
        self.shared_process_pool = ProcessPool()
        self.shared_thread_pool  = ThreadPool()
        self.shared_lock         = Lock()        # BUG: Windows: global name 'lock' is not defined | doesn't affect cygwin

Then import safely from elsewhere in your code:

from globals import Globals
Globals().shared_manager
Globals().shared_process_pool
Globals().shared_thread_pool
Globals().shared_lock

I have written a more expanded wrapper class around pathos.multiprocessing here:

As a side note, if your use case just requires an async multiprocess map as a performance optimization, then joblib will manage all your process pools behind the scenes and allow this very simple syntax:

from joblib import Parallel, delayed

squares = Parallel(-1)(delayed(lambda num: num**2)(x) for x in range(100))
你的心境我的脸 2024-12-05 05:46:59

Here is how you can start a pool even if you are already in a daemonic process. This was tested in Python 3.8.5.

First, define the Undaemonize context manager, which temporarily deletes the daemon state of the current process.

import multiprocessing


class Undaemonize(object):
    '''Context Manager to resolve AssertionError: daemonic processes are not allowed to have children

    Tested in python 3.8.5'''
    def __init__(self):
        self.p = multiprocessing.current_process()
        # _config is a private attribute; remember whether 'daemon' was set and its value.
        self.daemon_status_set = 'daemon' in self.p._config
        self.daemon_status_value = self.p._config.get('daemon')
    def __enter__(self):
        if self.daemon_status_set:
            del self.p._config['daemon']
    def __exit__(self, type, value, traceback):
        if self.daemon_status_set:
            self.p._config['daemon'] = self.daemon_status_value

Now you can start a pool as follows, even from within a daemon process:

with Undaemonize():
    pool = multiprocessing.Pool(1)
pool.map(...)  # you can do something with the pool outside of the context manager

While the other approaches here aim to create a pool that is not daemonic in the first place, this approach allows you to start a pool even if you are already in a daemonic process.
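The same idea can also be written with contextlib.contextmanager. The sketch below is one possible variant, not the version tested above: undaemonized, square, and outer_task are names made up for illustration, it still depends on the private _config dictionary, and a try/finally restores the flag even if the body raises.

```python
import multiprocessing
from contextlib import contextmanager


@contextmanager
def undaemonized():
    """Temporarily clear the current process's daemon flag (uses private _config)."""
    config = multiprocessing.current_process()._config
    missing = object()
    saved = config.pop("daemon", missing)
    try:
        yield
    finally:
        # Put the flag back only if it was set in the first place.
        if saved is not missing:
            config["daemon"] = saved


def square(x):
    return x * x


def outer_task(n):
    # Runs in a daemonic worker of the outer pool.
    with undaemonized():
        inner_pool = multiprocessing.Pool(2)
    try:
        return sum(inner_pool.map(square, range(n)))
    finally:
        inner_pool.close()
        inner_pool.join()


if __name__ == "__main__":
    with multiprocessing.Pool(2) as outer_pool:
        print(outer_pool.map(outer_task, [3, 4]))
```

Because the outer worker is still daemonic once the block exits, it seems safest to close and join the inner pool explicitly, as done here.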

偷得浮生 2024-12-05 05:46:59

This presents a workaround for when the error is seemingly a false positive. As also noted by James, this can happen due to an unintentional import from a daemonic process.

For example, if you have the following simple code, WORKER_POOL can inadvertently be imported from a worker, leading to the error.

import multiprocessing

WORKER_POOL = multiprocessing.Pool()

A simple but reliable approach for a workaround is:

import multiprocessing
import multiprocessing.pool


class MyClass:

    @property
    def worker_pool(self) -> multiprocessing.pool.Pool:
        # Ref: https://stackoverflow.com/a/63984747/
        try:
            return self._worker_pool  # type: ignore
        except AttributeError:
            # pylint: disable=protected-access
            self.__class__._worker_pool = multiprocessing.Pool()  # type: ignore
            return self.__class__._worker_pool  # type: ignore
            # pylint: enable=protected-access

In the above workaround, the worker_pool property of a MyClass instance (for example, MyClass().worker_pool) can be used without the error. If you think this approach can be improved upon, let me know.
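A variation on the same lazy-creation idea, as a sketch: a small holder object that builds the pool on first use, so that merely importing the module from a worker never starts processes. LazyPool and its factory parameter are hypothetical names introduced for illustration.

```python
import multiprocessing


class LazyPool:
    """Create the pool on first use instead of at import time."""

    def __init__(self, factory=multiprocessing.Pool):
        self._factory = factory
        self._pool = None

    def get(self):
        if self._pool is None:
            self._pool = self._factory()
        return self._pool


# Safe at module level: no processes start until .get() is called.
WORKER_POOL = LazyPool()


def square(x):
    return x * x


if __name__ == "__main__":
    pool = WORKER_POOL.get()
    print(pool.map(square, range(5)))
    pool.close()
    pool.join()
```

The factory argument is there mainly so that a different pool type, or a stub in tests, can be substituted.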

夜司空 2024-12-05 05:46:59

Since Python 3.7 we can create a non-daemonic pool with ProcessPoolExecutor.

Using if __name__ == "__main__": is necessary when using multiprocessing.

from concurrent.futures import ProcessPoolExecutor as Pool

num_pool = 10
    
def main_pool(num):
    print(num)
    strings_write = (f'{num}-{i}' for i in range(num))
    with Pool(num) as subp:
        subp.map(sub_pool,strings_write)
    return None


def sub_pool(x):
    print(f'{x}')
    return None


if __name__ == "__main__":
    with Pool(num_pool) as p:
        p.map(main_pool,list(range(1,num_pool+1)))
ˉ厌 2024-12-05 05:46:59

For me, this answer helped: https://stackoverflow.com/a/71929459/14715428

Although the question itself is a little bit different, the speed remained the same and I didn't have to rewrite anything.
