如何从一个类中启动和停止多个子进程?

发布于 2025-01-11 11:05:22 字数 5108 浏览 0 评论 0原文

Python 程序:

import multiprocessing
import time


class Application:
    """Broken version under discussion: one worker process per CPU core.

    NOTE(review): target=self._worker is a *bound* method, so under the
    'spawn'/'forkserver' start methods process.start() pickles self —
    including self._processes — and the second start() fails with
    "TypeError: cannot pickle 'weakref' object" (see traceback below).
    Kept as-is on purpose; it is the subject of the question.
    """

    def __init__(self):
        # Shared flag used by stop() to tell every worker to exit.
        self._event = multiprocessing.Event()
        # One process per CPU core; the bound-method target drags self
        # (and this very list) into the pickle done by start().
        self._processes = [
            multiprocessing.Process(target=self._worker)
            for _ in range(multiprocessing.cpu_count())]

    def _worker(self):
        # Print this process's name once per second until the event is set.
        while not self._event.is_set():
            print(multiprocessing.current_process().name)
            time.sleep(1)

    def start(self):
        # Start every worker; the second iteration is where the pickling
        # error is raised.
        for process in self._processes:
            print('starting')
            process.start()

    def stop(self):
        # Signal all workers to exit, then wait for them to finish.
        self._event.set()
        for process in self._processes:
            process.join()


if __name__ == '__main__':
    # Run the workers for ~3 seconds, then ask them to stop.
    application = Application()
    application.start()
    time.sleep(3)
    application.stop()

其输出:

starting
starting
Traceback (most recent call last):
  File "/Users/maggyero/Desktop/application.py", line 31, in <module>
    application.start()
  File "/Users/maggyero/Desktop/application.py", line 21, in start
    process.start()
  File "/usr/local/Cellar/[email protected]/3.9.10/Frameworks/Python.framework/Versions/3.9/lib/python3.9/multiprocessing/process.py", line 121, in start
    self._popen = self._Popen(self)
  File "/usr/local/Cellar/[email protected]/3.9.10/Frameworks/Python.framework/Versions/3.9/lib/python3.9/multiprocessing/context.py", line 224, in _Popen
    return _default_context.get_context().Process._Popen(process_obj)
  File "/usr/local/Cellar/[email protected]/3.9.10/Frameworks/Python.framework/Versions/3.9/lib/python3.9/multiprocessing/context.py", line 284, in _Popen
    return Popen(process_obj)
  File "/usr/local/Cellar/[email protected]/3.9.10/Frameworks/Python.framework/Versions/3.9/lib/python3.9/multiprocessing/popen_spawn_posix.py", line 32, in __init__
    super().__init__(process_obj)
  File "/usr/local/Cellar/[email protected]/3.9.10/Frameworks/Python.framework/Versions/3.9/lib/python3.9/multiprocessing/popen_fork.py", line 19, in __init__
    self._launch(process_obj)
  File "/usr/local/Cellar/[email protected]/3.9.10/Frameworks/Python.framework/Versions/3.9/lib/python3.9/multiprocessing/popen_spawn_posix.py", line 47, in _launch
    reduction.dump(process_obj, fp)
  File "/usr/local/Cellar/[email protected]/3.9.10/Frameworks/Python.framework/Versions/3.9/lib/python3.9/multiprocessing/reduction.py", line 60, in dump
    ForkingPickler(file, protocol).dump(obj)
TypeError: cannot pickle 'weakref' object
Traceback (most recent call last):
  File "<string>", line 1, in <module>
  File "/usr/local/Cellar/[email protected]/3.9.10/Frameworks/Python.framework/Versions/3.9/lib/python3.9/multiprocessing/spawn.py", line 116, in spawn_main
    exitcode = _main(fd, parent_sentinel)
  File "/usr/local/Cellar/[email protected]/3.9.10/Frameworks/Python.framework/Versions/3.9/lib/python3.9/multiprocessing/spawn.py", line 126, in _main
    self = reduction.pickle.load(from_parent)
  File "/usr/local/Cellar/[email protected]/3.9.10/Frameworks/Python.framework/Versions/3.9/lib/python3.9/multiprocessing/synchronize.py", line 110, in __setstate__
    self._semlock = _multiprocessing.SemLock._rebuild(*state)
FileNotFoundError: [Errno 2] No such file or directory

在函数 Application.__init__ 中，每次调用 multiprocessing.Process(target=self._worker) 都会初始化一个 multiprocessing.Process 实例，并以实例方法 self._worker 作为其 target 参数。self._worker 绑定到 self，而 self 具有实例属性 self._processes。

在函数 Application.start 中，每次调用 process.start() 都会序列化 target 参数，因此也会序列化 self._processes。self._processes 是 multiprocessing.Process 实例的列表，最初尚未启动。第一次调用 process.start() 启动该列表中的第一个 multiprocessing.Process 实例，没有问题，但第二次调用 process.start() 失败。

因此启动的 multiprocessing.Process 实例无法序列化。如何解决这个问题呢?

The Python program:

import multiprocessing
import time


class Application:
    """Broken version under discussion: one worker process per CPU core.

    NOTE(review): target=self._worker is a *bound* method, so under the
    'spawn'/'forkserver' start methods process.start() pickles self —
    including self._processes — and the second start() fails with
    "TypeError: cannot pickle 'weakref' object" (see traceback below).
    Kept as-is on purpose; it is the subject of the question.
    """

    def __init__(self):
        # Shared flag used by stop() to tell every worker to exit.
        self._event = multiprocessing.Event()
        # One process per CPU core; the bound-method target drags self
        # (and this very list) into the pickle done by start().
        self._processes = [
            multiprocessing.Process(target=self._worker)
            for _ in range(multiprocessing.cpu_count())]

    def _worker(self):
        # Print this process's name once per second until the event is set.
        while not self._event.is_set():
            print(multiprocessing.current_process().name)
            time.sleep(1)

    def start(self):
        # Start every worker; the second iteration is where the pickling
        # error is raised.
        for process in self._processes:
            print('starting')
            process.start()

    def stop(self):
        # Signal all workers to exit, then wait for them to finish.
        self._event.set()
        for process in self._processes:
            process.join()


if __name__ == '__main__':
    # Run the workers for ~3 seconds, then ask them to stop.
    application = Application()
    application.start()
    time.sleep(3)
    application.stop()

Its output:

starting
starting
Traceback (most recent call last):
  File "/Users/maggyero/Desktop/application.py", line 31, in <module>
    application.start()
  File "/Users/maggyero/Desktop/application.py", line 21, in start
    process.start()
  File "/usr/local/Cellar/[email protected]/3.9.10/Frameworks/Python.framework/Versions/3.9/lib/python3.9/multiprocessing/process.py", line 121, in start
    self._popen = self._Popen(self)
  File "/usr/local/Cellar/[email protected]/3.9.10/Frameworks/Python.framework/Versions/3.9/lib/python3.9/multiprocessing/context.py", line 224, in _Popen
    return _default_context.get_context().Process._Popen(process_obj)
  File "/usr/local/Cellar/[email protected]/3.9.10/Frameworks/Python.framework/Versions/3.9/lib/python3.9/multiprocessing/context.py", line 284, in _Popen
    return Popen(process_obj)
  File "/usr/local/Cellar/[email protected]/3.9.10/Frameworks/Python.framework/Versions/3.9/lib/python3.9/multiprocessing/popen_spawn_posix.py", line 32, in __init__
    super().__init__(process_obj)
  File "/usr/local/Cellar/[email protected]/3.9.10/Frameworks/Python.framework/Versions/3.9/lib/python3.9/multiprocessing/popen_fork.py", line 19, in __init__
    self._launch(process_obj)
  File "/usr/local/Cellar/[email protected]/3.9.10/Frameworks/Python.framework/Versions/3.9/lib/python3.9/multiprocessing/popen_spawn_posix.py", line 47, in _launch
    reduction.dump(process_obj, fp)
  File "/usr/local/Cellar/[email protected]/3.9.10/Frameworks/Python.framework/Versions/3.9/lib/python3.9/multiprocessing/reduction.py", line 60, in dump
    ForkingPickler(file, protocol).dump(obj)
TypeError: cannot pickle 'weakref' object
Traceback (most recent call last):
  File "<string>", line 1, in <module>
  File "/usr/local/Cellar/[email protected]/3.9.10/Frameworks/Python.framework/Versions/3.9/lib/python3.9/multiprocessing/spawn.py", line 116, in spawn_main
    exitcode = _main(fd, parent_sentinel)
  File "/usr/local/Cellar/[email protected]/3.9.10/Frameworks/Python.framework/Versions/3.9/lib/python3.9/multiprocessing/spawn.py", line 126, in _main
    self = reduction.pickle.load(from_parent)
  File "/usr/local/Cellar/[email protected]/3.9.10/Frameworks/Python.framework/Versions/3.9/lib/python3.9/multiprocessing/synchronize.py", line 110, in __setstate__
    self._semlock = _multiprocessing.SemLock._rebuild(*state)
FileNotFoundError: [Errno 2] No such file or directory

In the function Application.__init__, each call multiprocessing.Process(target=self._worker) initializes a multiprocessing.Process instance with the instance method self._worker as its target argument. self._worker is bound to self which has the instance attribute self._processes.

In the function Application.start, each call process.start() serialises the target argument and therefore self._processes. self._processes is a list of multiprocessing.Process instances, initially not started yet. The first call process.start() starts the first multiprocessing.Process instance in that list without issue, but the second call process.start() fails.

So a started multiprocessing.Process instance cannot be serialised. How to solve that problem?

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(1)

我恋#小黄人 2025-01-18 11:05:22

问题的根源在于 multiprocessing.Process 实例的 start 方法将其 _popen 实例属性设置为 multiprocessing.popen_ *.Popen 实例。该实例的初始化执行以下两个步骤(以及其他步骤):

  1. 对于 multiprocessing.popen_spawn_posix.Popen 实例、multiprocessing.popen_spawn_win32.Popen 实例或 multiprocessing.popen_forkserver.Popen 实例，但不是 multiprocessing.popen_fork.Popen 实例（即对于启动方法 'spawn' 或启动方法 'forkserver'，而不是启动方法 'fork'），它会序列化 multiprocessing.Process 实例，以便将其写入父进程用于与子进程通信的管道一端，从而使子进程可以执行该 multiprocessing.Process 实例的 run 方法。

  2. 它将其 finalizer 实例属性设置为一个 multiprocessing.util.Finalize 实例，后者又将自己的 _weakref 实例属性设置为一个 weakref.ref 实例，用于在解释器退出时关闭父进程用于与子进程通信的管道两端。换句话说，它使 multiprocessing.Process 实例持有一个弱引用。

因此,如果一个 multiprocessing.Process 实例持有对已启动的 multiprocessing.Process 实例的引用,那么它持有一个弱引用(第 2 点),因此启动它将会失败,因为它会序列化(第 1 点)弱引用和弱引用不可序列化:

import multiprocessing

if __name__ == '__main__':
    # Force a non-fork start method so that process.start() must pickle
    # the Process object being started.
    multiprocessing.set_start_method('spawn')  # or 'forkserver' but not 'fork'
    process_a = multiprocessing.Process()
    process_b = multiprocessing.Process()
    # Make process_b reference process_a; once process_a is started it
    # holds a weakref (via its Popen finalizer), which cannot be pickled.
    process_b.foo = process_a
    process_a.start()  # creates process_a._popen.finalizer._weakref
    process_b.start()  # TypeError: cannot pickle 'weakref' object

显示序列化问题的最小 Python 程序:

import pickle
import weakref

# Minimal reproduction: weakref.ref objects have no pickle support.
pickle.dumps(weakref.ref(int))  # TypeError: cannot pickle 'weakref' object

避免序列化问题的两种解决方法:

  • 要么将 target 参数改为 classmethod，使其不绑定到 self（特别是不绑定到实例属性 self._processes）：
class Application:
    """Runs one worker process per CPU core; workers stop via a shared event.

    Workaround 1: the target is a classmethod and the event is passed
    explicitly through args, so starting a process never pickles self
    (and therefore never pickles the already-started processes held in
    self._processes).
    """

    def __init__(self):
        # Shared stop flag, handed to each worker explicitly via args.
        self._event = multiprocessing.Event()
        # One worker process per CPU core.
        self._processes = [
            multiprocessing.Process(target=self._worker, args=(self._event,))
            for _ in range(multiprocessing.cpu_count())]

    @classmethod
    def _worker(cls, event):
        # BUG FIX: the original declared this classmethod with `self` and
        # read self._event, but a classmethod receives the *class*, which
        # has no _event attribute — use the event passed via args instead.
        while not event.is_set():
            print(multiprocessing.current_process().name)
            time.sleep(1)

    def start(self):
        # Launch every worker process.
        for process in self._processes:
            print('starting')
            process.start()

    def stop(self):
        # Signal all workers to exit, then wait for them to finish.
        self._event.set()
        for process in self._processes:
            process.join()
  • 或者使用 __getstate__ 从 target 参数的序列化中专门排除实例属性 self._processes：
class Application:
    """Runs one worker process per CPU core; workers stop via a shared event.

    Workaround 2: __getstate__ drops the _processes attribute, so pickling
    self for a spawned child never touches the already-started processes.
    """

    def __init__(self):
        # Shared stop flag, visible to every worker through pickled self.
        self._event = multiprocessing.Event()
        # One worker process per CPU core.
        self._processes = [
            multiprocessing.Process(target=self._worker)
            for _ in range(multiprocessing.cpu_count())]

    def _worker(self):
        # Print this process's name once per second until stop() is called.
        while True:
            if self._event.is_set():
                break
            print(multiprocessing.current_process().name)
            time.sleep(1)

    def start(self):
        # Launch every worker process.
        for proc in self._processes:
            print('starting')
            proc.start()

    def stop(self):
        # Signal all workers to exit, then wait for them to finish.
        self._event.set()
        for proc in self._processes:
            proc.join()

    def __getstate__(self):
        # Exclude _processes: started Process objects hold weakrefs (via
        # their Popen finalizer) and therefore cannot be pickled.
        return {key: val for key, val in vars(self).items()
                if key != '_processes'}

The root of the problem is that the start method of a multiprocessing.Process instance sets its _popen instance attribute to a multiprocessing.popen_*.Popen instance. The initialization of that instance performs these two steps (among others):

  1. For a multiprocessing.popen_spawn_posix.Popen instance, a multiprocessing.popen_spawn_win32.Popen instance, or a multiprocessing.popen_forkserver.Popen instance but not a multiprocessing.popen_fork.Popen instance (i.e. for the start method 'spawn' or the start method 'forkserver' but not the start method 'fork'), it serialises the multiprocessing.Process instance for writing it to the end of the pipe used by the parent process to communicate with the child process so that the child process can execute the run method of the multiprocessing.Process instance.

  2. It sets its finalizer instance attribute to a multiprocessing.util.Finalize instance which itself sets its _weakref instance attribute to a weakref.ref instance for closing at interpreter exit the ends of the pipes used by the parent process to communicate with the child process. In other words, it makes the multiprocessing.Process instance hold a weak reference.

Thus if a multiprocessing.Process instance holds a reference to a started multiprocessing.Process instance then it holds a weak reference (point 2), so starting it will fail since it will serialise (point 1) the weak reference and weak references are not serialisable:

import multiprocessing

if __name__ == '__main__':
    multiprocessing.set_start_method('spawn')  # or 'forkserver' but not 'fork'
    process_a = multiprocessing.Process()
    process_b = multiprocessing.Process()
    process_b.foo = process_a
    process_a.start()  # creates process_a._popen.finalizer._weakref
    process_b.start()  # TypeError: cannot pickle 'weakref' object

A minimal Python program showing the serialisation issue:

import pickle
import weakref

pickle.dumps(weakref.ref(int))  # TypeError: cannot pickle 'weakref' object

Two workarounds that avoid the serialisation issue:

  • either make the target argument a classmethod so that it is not bound to self (and in particular to the instance attribute self._processes):
class Application:
    """Runs one worker process per CPU core; workers stop via a shared event.

    Workaround 1: the target is a classmethod and the event is passed
    explicitly through args, so starting a process never pickles self
    (and therefore never pickles the already-started processes held in
    self._processes).
    """

    def __init__(self):
        # Shared stop flag, handed to each worker explicitly via args.
        self._event = multiprocessing.Event()
        # One worker process per CPU core.
        self._processes = [
            multiprocessing.Process(target=self._worker, args=(self._event,))
            for _ in range(multiprocessing.cpu_count())]

    @classmethod
    def _worker(cls, event):
        # BUG FIX: the original declared this classmethod with `self` and
        # read self._event, but a classmethod receives the *class*, which
        # has no _event attribute — use the event passed via args instead.
        while not event.is_set():
            print(multiprocessing.current_process().name)
            time.sleep(1)

    def start(self):
        # Launch every worker process.
        for process in self._processes:
            print('starting')
            process.start()

    def stop(self):
        # Signal all workers to exit, then wait for them to finish.
        self._event.set()
        for process in self._processes:
            process.join()
  • or exclude specifically the instance attribute self._processes from the serialisation of the target argument with __getstate__:
class Application:
    """Runs one worker process per CPU core; workers stop via a shared event.

    Workaround 2: __getstate__ drops the _processes attribute, so pickling
    self for a spawned child never touches the already-started processes.
    """

    def __init__(self):
        # Shared stop flag, visible to every worker through pickled self.
        self._event = multiprocessing.Event()
        # One worker process per CPU core.
        self._processes = [
            multiprocessing.Process(target=self._worker)
            for _ in range(multiprocessing.cpu_count())]

    def _worker(self):
        # Print this process's name once per second until stop() is called.
        while True:
            if self._event.is_set():
                break
            print(multiprocessing.current_process().name)
            time.sleep(1)

    def start(self):
        # Launch every worker process.
        for proc in self._processes:
            print('starting')
            proc.start()

    def stop(self):
        # Signal all workers to exit, then wait for them to finish.
        self._event.set()
        for proc in self._processes:
            proc.join()

    def __getstate__(self):
        # Exclude _processes: started Process objects hold weakrefs (via
        # their Popen finalizer) and therefore cannot be pickled.
        return {key: val for key, val in vars(self).items()
                if key != '_processes'}
~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文