Python:通过单击进行多处理
我有以下文件:
binreader/
├─ packet/
│ ├─ __init__.py
│ ├─ aggregator.py
│ ├─ parser.py
│ ├─ uploader.py
├─ __init__.py
├─ __main__.py
├─ upload_concurrent.py
重现错误的代码:
/packet/__init__.py
/packet/aggergator.py
from multiprocessing import Queue, Process
import logging
log = logging.getLogger()
class AggregatorProcess(Process):
def __init__(self, q_in: Queue, q_out: Queue):
super(AggregatorProcess, self).__init__()
self.q_in = q_in
self.q_out = q_out
def run(self):
while x := self.q_in.get():
log.debug(f"Aggregator: {x}")
self.q_out.put(x)
log.debug("Aggregator: Done")
self.q_out.put(None)
/packet/parser.py
from multiprocessing import Queue, Process
import logging
from typing import List
log = logging.getLogger()
class ParserProcess(Process):
"""Threaded version of parser class"""
def __init__(self, data: List, q_out: Queue):
super(ParserProcess, self).__init__()
self.q_out = q_out
self.data = data
def run(self):
for x in self.data:
log.debug(f"Parser: {x}")
self.q_out.put(x)
log.debug("Parser: Done")
self.q_out.put(None)
/packet/uploader.py
from multiprocessing import Queue, Process
import logging
log = logging.getLogger()
class UploaderProcess(Process):
def __init__(self, q_in: Queue) -> None:
super(UploaderProcess, self).__init__()
self.q_in = q_in
def run(self):
while x := self.q_in.get():
log.debug(f"Uploader: {x}")
log.debug("Uploader: Done")
/__init__.py
import sys
import click
import logging
from binreader import upload_concurrent
@click.group()
def cli():
logging.basicConfig(
format="%(asctime)s [%(processName)-16s]@%(lineno)4d %(levelname)s: %(message)s",
level=logging.DEBUG,
handlers=[
logging.StreamHandler(sys.stdout),
],
)
cli.add_command(upload_concurrent.upload_data_concurrent)
cli()
/__main__.py
/upload_concurrent.py
from multiprocessing import Queue
import click
from .packet.aggregator import AggregatorProcess
from .packet.parser import ParserProcess
from .packet.uploader import UploaderProcess
log = logging.getLogger()
@click.command(name="upload-concurrent")
def upload_data_concurrent():
parser_agg_wq = Queue()
agg_upl_wq = Queue()
parser = ParserProcess([1, 2, 3, 4, 5], parser_agg_wq)
parser.name = type(parser).__name__
aggregator = AggregatorProcess(parser_agg_wq, agg_upl_wq)
aggregator.name = type(aggregator).__name__
uploader = UploaderProcess(agg_upl_wq)
uploader.name = type(uploader).__name__
parser.start()
aggregator.start()
uploader.start()
parser.join()
aggregator.join()
uploader.join()
我有完成处理的同步代码,但是它是这样的太慢了约 1 小时/GB。每两周需要处理大约 1.5TB 的数据。
引入多处理时,每次调用 Process.start 时都会出现以下错误:
RuntimeError:
An attempt has been made to start a new process before the
current process has finished its bootstrapping phase.
This probably means that you are not using fork to start your
child processes and you have forgotten to use the proper idiom
in the main module:
if __name__ == '__main__':
freeze_support()
...
The "freeze_support()" line can be omitted if the program
is not going to be frozen to produce an executable.
此程序作为模块运行:python -m binreader upload-concurrent
我已阅读 这个问题,但是我不确定在哪里添加 if __name__ == '__main__':
守卫。鉴于这是使用点击模块,并且我不确定这对模块的启动/运行方式有何影响,这可能不是一个可行的解决方案。
非常感谢任何指导
I have the following files:
binreader/
├─ packet/
│ ├─ __init__.py
│ ├─ aggregator.py
│ ├─ parser.py
│ ├─ uploader.py
├─ __init__.py
├─ __main__.py
├─ upload_concurrent.py
Code that reproduces the error:
/packet/__init__.py
<empty>
/packet/aggergator.py
from multiprocessing import Queue, Process
import logging
log = logging.getLogger()
class AggregatorProcess(Process):
def __init__(self, q_in: Queue, q_out: Queue):
super(AggregatorProcess, self).__init__()
self.q_in = q_in
self.q_out = q_out
def run(self):
while x := self.q_in.get():
log.debug(f"Aggregator: {x}")
self.q_out.put(x)
log.debug("Aggregator: Done")
self.q_out.put(None)
/packet/parser.py
from multiprocessing import Queue, Process
import logging
from typing import List
log = logging.getLogger()
class ParserProcess(Process):
"""Threaded version of parser class"""
def __init__(self, data: List, q_out: Queue):
super(ParserProcess, self).__init__()
self.q_out = q_out
self.data = data
def run(self):
for x in self.data:
log.debug(f"Parser: {x}")
self.q_out.put(x)
log.debug("Parser: Done")
self.q_out.put(None)
/packet/uploader.py
from multiprocessing import Queue, Process
import logging
log = logging.getLogger()
class UploaderProcess(Process):
def __init__(self, q_in: Queue) -> None:
super(UploaderProcess, self).__init__()
self.q_in = q_in
def run(self):
while x := self.q_in.get():
log.debug(f"Uploader: {x}")
log.debug("Uploader: Done")
/__init__.py
import sys
import click
import logging
from binreader import upload_concurrent
@click.group()
def cli():
logging.basicConfig(
format="%(asctime)s [%(processName)-16s]@%(lineno)4d %(levelname)s: %(message)s",
level=logging.DEBUG,
handlers=[
logging.StreamHandler(sys.stdout),
],
)
cli.add_command(upload_concurrent.upload_data_concurrent)
cli()
/__main__.py
<empty>
/upload_concurrent.py
from multiprocessing import Queue
import click
from .packet.aggregator import AggregatorProcess
from .packet.parser import ParserProcess
from .packet.uploader import UploaderProcess
log = logging.getLogger()
@click.command(name="upload-concurrent")
def upload_data_concurrent():
parser_agg_wq = Queue()
agg_upl_wq = Queue()
parser = ParserProcess([1, 2, 3, 4, 5], parser_agg_wq)
parser.name = type(parser).__name__
aggregator = AggregatorProcess(parser_agg_wq, agg_upl_wq)
aggregator.name = type(aggregator).__name__
uploader = UploaderProcess(agg_upl_wq)
uploader.name = type(uploader).__name__
parser.start()
aggregator.start()
uploader.start()
parser.join()
aggregator.join()
uploader.join()
I have synchronous code that completes the processing, however it is way too slow at ~1 hr/GB. There is about 1.5TB of data that needs processed every two weeks.
When introducing multiprocessing I am getting the following error once per call to Process.start:
RuntimeError:
An attempt has been made to start a new process before the
current process has finished its bootstrapping phase.
This probably means that you are not using fork to start your
child processes and you have forgotten to use the proper idiom
in the main module:
if __name__ == '__main__':
freeze_support()
...
The "freeze_support()" line can be omitted if the program
is not going to be frozen to produce an executable.
This program is run as a module: python -m binreader upload-concurrent
I have read this question, however I am not sure where to add the if __name__ == '__main__':
guard. This may not be a viable solution given this is using the click module and I'm unsure what effect that has on how the module starts/runs.
Any guidance is greatly appreciated
如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。
绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论
评论(1)
if __name__ == '__main__':
如果直接调用程序而不是从另一个模块调用,则条件为 true。它需要进入代码的主要范围。错误信息中的主模块虽然正确,但对我来说似乎有点令人困惑。条件将转到程序中第一个调用所在的位置。
if __name__ == '__main__':
condition is true if the program is called directly rather than from another module. It needs to go in the main scope of the code.main module
as it is termed in the error info, although correct, seems a little confusing to me.The condition would go where the first call in the program would be.