生产者/消费者 多个生产者和一个消费者写入文件 Python

发布于 2024-12-02 16:16:20 字数 362 浏览 2 评论 0原文

我的要求类似于 多个生产者,单个消费者 除了我在 python 中需要它之外,

我创建了一个生成 5 个并发进程的应用程序(我正在使用多处理库)。这 5 个进程独立生成 dict 格式的输出。

早些时候我将输出打印到控制台,但现在想将其输出到文件。

我正在寻找一种模式,其中我的 5 个生产者都写入支持并发写入的共享队列。

单个消费者进程也可以访问该队列并使用其中的数据,如果没有数据可写入,则能够等待,并在生产者完成任务时终止。

谢谢阿努杰

My requirement is similar to Multiple producers, single consumer
except i need it in python

I have created an application that spawns 5 concurrent processes (I am using multiprocessing library).These 5 process are independently producing output in dict format.

Earlier I was printing the output to console, but now would like to output it to a file.

I am looking for a pattern where all my 5 producers write to a shared queue that supports concurrent writes.

And a single consumer process that too has access to this queue and consumes the data from it, with the ability to wait if there is no data to write and terminate when producers are done with their task.

Thanks Anuj

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(2

2024-12-09 16:16:20

由于您已经在使用多进程,因此您需要的只是 Queue 类

和示例(从 Queue 文档修改)

from multiprocessing import Process, Queue

def child(q, url):
    result = my_process(url)
    q.put(result)

if __name__ == '__main__':
    q = Queue()
    urls = [...]
    children = []
    for url in urls:
       p = Process(target=child, args=(q,url))
       p.start()
       children.append(p)
    for p in children:
       p.join()
       print q.get() #or write to file (might not be the answer from this child)

编辑:
对于每个孩子的多个答案,请将最后一个 for 循环替换为:

while 0 != multiprocessing.active_children():
    print q.get()

since you are already using multiprocess, all you need is the Queue class

and a sample (modified from the Queue docs)

from multiprocessing import Process, Queue

def child(q, url):
    result = my_process(url)
    q.put(result)

if __name__ == '__main__':
    q = Queue()
    urls = [...]
    children = []
    for url in urls:
       p = Process(target=child, args=(q,url))
       p.start()
       children.append(p)
    for p in children:
       p.join()
       print q.get() #or write to file (might not be the answer from this child)

Edit:
For multiple answers from each child replace the last for loop with:

while 0 != multiprocessing.active_children():
    print q.get()
像你 2024-12-09 16:16:20

我在 Python 中实现了这种模式,其中一个主管进程生成一堆进程,然后使用来自所有进程的日志消息并将这些日志消息写入单个日志文件。

基本上,我使用 execve 来生成进程,并指定每个进程的 stderr 连接到 PTY。然后我的主管打开所有主 PTY 并使用 select 循环读取它们。 PTY 由 tty 行规则进行行缓冲,您可以在它们上使用 readline 进行非阻塞读取。我相信我还在 PTY 上使用了 fcntl 来设置 os.O_NONBLOCK 。

效果很好。唯一的问题是,当您从 select poll 返回时,每个 pty 需要读取不止一行,否则您可能会丢失输出(假设您有一些东西正在获取子进程并重新启动)。通过读取每个 PTY 上可用的所有行,您还可以避免回溯与其他消息交错。

如果您确实需要发送对象而不是文本行,那么您最好使用真正的发布-订阅消息系统,例如 AMQP 或 ZeroMQ。 AMQP 是一把比您需要的大得多的锤子,因此只有在您希望构建大量类似的应用程序时才检查一下。否则,尝试更简单的 0MQ http://www.zeromq.org/intro:read-the -manual 这只是一个消息传递库,使套接字更易于使用。

I've implemented this pattern in Python where a supervisor process spawns a bunch of processes and then consumes log messages from all of them and writes those log messages into a single log file.

Basically, I used execve to spawan the processes an specified that stderr for each process was connected to a PTY. Then my supervisor opened all the master PTYs and used select to read from them in a loop. The PTY's are line-buffered by the tty line discipline and you can use readline on them for non=blocking reads. I believe that I also used fcntl on the PTYs to set os.O_NONBLOCK as well.

Works great. The only hitch is that you need to read more than one line per pty when you return from the select poll, otherwise you can lose output (assuming you have something reaping the child processes and restarting). By reading all lines available on each PTY you also avoid tracebacks getting interleaved with other messages.

If you really need to send objects rather than text lines, then you are better of using a real pub-sub messaging system like AMQP or ZeroMQ. AMQP is a much bigger hammer than you need so only check that out if you expect to be building lots of similar apps. Otherwise, try the simpler 0MQ http://www.zeromq.org/intro:read-the-manual which is just a messaging library that makes sockets much much easier to use.

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文