Python:通过单击进行多处理

发布于 2025-01-11 17:44:38 字数 4211 浏览 0 评论 0原文

我有以下文件:

binreader/
├─ packet/
│  ├─ __init__.py
│  ├─ aggregator.py
│  ├─ parser.py
│  ├─ uploader.py
├─ __init__.py
├─ __main__.py
├─ upload_concurrent.py

重现错误的代码:

/packet/__init__.py

;

/packet/aggergator.py

from multiprocessing import Queue, Process
import logging


log = logging.getLogger()


class AggregatorProcess(Process):
    def __init__(self, q_in: Queue, q_out: Queue):
        super(AggregatorProcess, self).__init__()
        self.q_in = q_in
        self.q_out = q_out

    def run(self):
        while x := self.q_in.get():
            log.debug(f"Aggregator: {x}")
            self.q_out.put(x)

        log.debug("Aggregator: Done")
        self.q_out.put(None)

/packet/parser.py

from multiprocessing import Queue, Process
import logging
from typing import List

log = logging.getLogger()


class ParserProcess(Process):
    """Threaded version of parser class"""

    def __init__(self, data: List, q_out: Queue):
        super(ParserProcess, self).__init__()
        self.q_out = q_out
        self.data = data

    def run(self):

        for x in self.data:
            log.debug(f"Parser: {x}")
            self.q_out.put(x)

        log.debug("Parser: Done")
        self.q_out.put(None)

/packet/uploader.py

from multiprocessing import Queue, Process
import logging

log = logging.getLogger()


class UploaderProcess(Process):
    def __init__(self, q_in: Queue) -> None:
        super(UploaderProcess, self).__init__()
        self.q_in = q_in

    def run(self):
        while x := self.q_in.get():
            log.debug(f"Uploader: {x}")

        log.debug("Uploader: Done")

/__init__.py

import sys
import click
import logging

from binreader import upload_concurrent


@click.group()
def cli():
    logging.basicConfig(
        format="%(asctime)s [%(processName)-16s]@%(lineno)4d %(levelname)s: %(message)s",
        level=logging.DEBUG,
        handlers=[
            logging.StreamHandler(sys.stdout),
        ],
    )


cli.add_command(upload_concurrent.upload_data_concurrent)

cli()

/__main__.py

/upload_concurrent.py

from multiprocessing import Queue
import click

from .packet.aggregator import AggregatorProcess
from .packet.parser import ParserProcess
from .packet.uploader import UploaderProcess

log = logging.getLogger()


@click.command(name="upload-concurrent")
def upload_data_concurrent():

    parser_agg_wq = Queue()
    agg_upl_wq = Queue()

    parser = ParserProcess([1, 2, 3, 4, 5], parser_agg_wq)
    parser.name = type(parser).__name__

    aggregator = AggregatorProcess(parser_agg_wq, agg_upl_wq)
    aggregator.name = type(aggregator).__name__

    uploader = UploaderProcess(agg_upl_wq)
    uploader.name = type(uploader).__name__

    parser.start()
    aggregator.start()
    uploader.start()

    parser.join()
    aggregator.join()
    uploader.join()

我有完成处理的同步代码,但是它是这样的太慢了约 1 小时/GB。每两周需要处理大约 1.5TB 的数据。

引入多处理时,每次调用 Process.start 时都会出现以下错误:

RuntimeError:
        An attempt has been made to start a new process before the
        current process has finished its bootstrapping phase.

        This probably means that you are not using fork to start your
        child processes and you have forgotten to use the proper idiom
        in the main module:

            if __name__ == '__main__':
                freeze_support()
                ...

        The "freeze_support()" line can be omitted if the program
        is not going to be frozen to produce an executable.

此程序作为模块运行:python -m binreader upload-concurrent

我已阅读 这个问题,但是我不确定在哪里添加 if __name__ == '__main__': 守卫。鉴于这是使用点击模块,并且我不确定这对模块的启动/运行方式有何影响,这可能不是一个可行的解决方案。

非常感谢任何指导

I have the following files:

binreader/
├─ packet/
│  ├─ __init__.py
│  ├─ aggregator.py
│  ├─ parser.py
│  ├─ uploader.py
├─ __init__.py
├─ __main__.py
├─ upload_concurrent.py

Code that reproduces the error:

/packet/__init__.py

<empty>

/packet/aggergator.py

from multiprocessing import Queue, Process
import logging


log = logging.getLogger()


class AggregatorProcess(Process):
    def __init__(self, q_in: Queue, q_out: Queue):
        super(AggregatorProcess, self).__init__()
        self.q_in = q_in
        self.q_out = q_out

    def run(self):
        while x := self.q_in.get():
            log.debug(f"Aggregator: {x}")
            self.q_out.put(x)

        log.debug("Aggregator: Done")
        self.q_out.put(None)

/packet/parser.py

from multiprocessing import Queue, Process
import logging
from typing import List

log = logging.getLogger()


class ParserProcess(Process):
    """Threaded version of parser class"""

    def __init__(self, data: List, q_out: Queue):
        super(ParserProcess, self).__init__()
        self.q_out = q_out
        self.data = data

    def run(self):

        for x in self.data:
            log.debug(f"Parser: {x}")
            self.q_out.put(x)

        log.debug("Parser: Done")
        self.q_out.put(None)

/packet/uploader.py

from multiprocessing import Queue, Process
import logging

log = logging.getLogger()


class UploaderProcess(Process):
    def __init__(self, q_in: Queue) -> None:
        super(UploaderProcess, self).__init__()
        self.q_in = q_in

    def run(self):
        while x := self.q_in.get():
            log.debug(f"Uploader: {x}")

        log.debug("Uploader: Done")

/__init__.py

import sys
import click
import logging

from binreader import upload_concurrent


@click.group()
def cli():
    logging.basicConfig(
        format="%(asctime)s [%(processName)-16s]@%(lineno)4d %(levelname)s: %(message)s",
        level=logging.DEBUG,
        handlers=[
            logging.StreamHandler(sys.stdout),
        ],
    )


cli.add_command(upload_concurrent.upload_data_concurrent)

cli()

/__main__.py

<empty>

/upload_concurrent.py

from multiprocessing import Queue
import click

from .packet.aggregator import AggregatorProcess
from .packet.parser import ParserProcess
from .packet.uploader import UploaderProcess

log = logging.getLogger()


@click.command(name="upload-concurrent")
def upload_data_concurrent():

    parser_agg_wq = Queue()
    agg_upl_wq = Queue()

    parser = ParserProcess([1, 2, 3, 4, 5], parser_agg_wq)
    parser.name = type(parser).__name__

    aggregator = AggregatorProcess(parser_agg_wq, agg_upl_wq)
    aggregator.name = type(aggregator).__name__

    uploader = UploaderProcess(agg_upl_wq)
    uploader.name = type(uploader).__name__

    parser.start()
    aggregator.start()
    uploader.start()

    parser.join()
    aggregator.join()
    uploader.join()

I have synchronous code that completes the processing, however it is way too slow at ~1 hr/GB. There is about 1.5TB of data that needs processed every two weeks.

When introducing multiprocessing I am getting the following error once per call to Process.start:

RuntimeError:
        An attempt has been made to start a new process before the
        current process has finished its bootstrapping phase.

        This probably means that you are not using fork to start your
        child processes and you have forgotten to use the proper idiom
        in the main module:

            if __name__ == '__main__':
                freeze_support()
                ...

        The "freeze_support()" line can be omitted if the program
        is not going to be frozen to produce an executable.

This program is run as a module: python -m binreader upload-concurrent

I have read this question, however I am not sure where to add the if __name__ == '__main__': guard. This may not be a viable solution given this is using the click module and I'm unsure what effect that has on how the module starts/runs.

Any guidance is greatly appreciated

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(1

彻夜缠绵 2025-01-18 17:44:38

if __name__ == '__main__': 如果直接调用程序而不是从另一个模块调用,则条件为 true。它需要进入代码的主要范围。错误信息中的主模块虽然正确,但对我来说似乎有点令人困惑。

条件将转到程序中第一个调用所在的位置。

if __name__ == '__main__': condition is true if the program is called directly rather than from another module. It needs to go in the main scope of the code. main module as it is termed in the error info, although correct, seems a little confusing to me.

The condition would go where the first call in the program would be.

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文