Python中的多个步骤多线程

发布于 2025-01-23 17:59:49 字数 1070 浏览 3 评论 0原文

我对多线程处理有点困扰。也许不是卡住,而是挣扎。 情况是:

  1. 过程传入的数据元素在循环时读取 - 我创建了Multiprocessing.poolMultiprocessing.queue.queue。作为加工工人,我做:

    the_pool = pool(processes,process_item_queue,(i_queue,o_queue,)))

  2. 的想法是,process_item_queue生成了一些我想在一个我想在某个过程中进行处理的数据队列(因此​​o_queue -output_queue)。 process_item_queue的主体看起来像:

      true:
            args = i_queue.get(block = true)
            如果args是没有的:
                打印(“从队列中脱颖而出”)
                休息
            output = process_item(*args)
            o_queue.put([输出])
     
  3. 理想情况下,某些过程已经可以开始处理O_Queue,但是我不知道如何实现动态分配的过程。

  4. 我现在正在努力的观点,实际上可以分为多个部分:

4.1。 process_item函数返回一个dict。所有dicts都必须与一些自定义逻辑合并。我的想法是,每个过程都应从队列中读取两个命令,使用我的自定义逻辑合并它们,然后重新发布到队列。显然,在某个时候,我们最终只有两个命令,可能是在不同的过程中。我可以在主线程中进行最后一个合并,但是我不知道如何真正告知我的流程,好吧,那里还有更多!通常,我不会发送,每个过程都会退出,但这更复杂,因为我们走了1024-> 512-> 256-> 128 ...等。我也不能给它一个明确的超时,取决于CPU,每个过程可能需要花费更多的时间。 4.2我无法真正关闭池,因为O_Queue到处都是物品。我什至可以达到一些内存限制,因此实际开始即时处理这些项目可能很重要。

任何建议将不胜感激;)

I'm a bit stuck with my multithreading processing. Not maybe stuck, but struggling.
The case is:

  1. Process incoming data elements read in while loop - I created a multiprocessing.Pool and a multiprocessing.Queue. As a processing worker I do:

    the_pool = Pool(PROCESSES, process_item_queue, (i_queue, o_queue,))

  2. The idea is that process_item_queue generates some still complex data that I want to process ALSO in a queue (hence o_queue - output_queue). The body of process_item_queue looks something like:

        while True:
            args = i_queue.get(block=True)
            if args is None:
                print("Breaking out of the queue")
                break
            output = process_item(*args)
            o_queue.put([output])
    
  3. Ideally, at some point, some of the processes could already start processing the o_queue, but I have no idea how to achieve that dynamically assigned processes.

  4. The point I'm struggling with now, can actually be divided into multiple parts:

4.1. The process_item function returns a dict. All of the dicts have to be merged with some custom logic. My idea is, that each process should read two dicts from the queue, merge them using my custom logic, and republish dict back to the queue. At some point, obviously, we'll end up with just two dicts, possibly in different processes. I'd be okay with doing the last merge in main thread, but I don't know how to really inform my processes that, well, there's nothing more there! Normally, I'd send None and each process would quit, but this is more complicated, as we go 1024->512->256->128... etc. I also can't give it a definitive timeout, as depending on CPU, each process can take more or less time.
4.2 I can't really close the pool because o_queue is full of items. I could probably even hit some memory limits so it might be important to actually start processing those items on the fly.

Any advice will be appreciated ;)

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。
列表为空,暂无数据
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文