我应该在这里使用协程或其他调度对象吗?

发布于 2024-11-02 00:37:08 字数 649 浏览 0 评论 0原文

我目前有生成器形式的代码,它调用 IO 绑定任务。生成器实际上也调用子生成器,因此我们将不胜感激更通用的解决方案。

类似于以下内容:

def processed_values(list_of_io_tasks):
    for task in list_of_io_tasks:
        value = slow_io_call(task)
        yield postprocess(value) # in real version, would iterate over 
                                 # processed_values2(value) here

我可以完全控制 slow_io_call,并且我不关心从 processed_values 获取项目的顺序。是否有类似协程的东西可以通过将 slow_io_call 转换为异步函数并使用返回最快的调用来以最快的顺序获得生成的结果?我预计 list_of_io_tasks 的长度至少有数千个条目。除了显式线程之外,我从未做过任何并行工作,特别是我从未使用过可用的各种形式的轻量级线程。

我需要使用标准 CPython 实现,并且我在 Linux 上运行。

I currently have code in the form of a generator which calls an IO-bound task. The generator actually calls sub-generators as well, so a more general solution would be appreciated.

Something like the following:

def processed_values(list_of_io_tasks):
    for task in list_of_io_tasks:
        value = slow_io_call(task)
        yield postprocess(value) # in real version, would iterate over 
                                 # processed_values2(value) here

I have complete control over slow_io_call, and I don't care in which order I get the items from processed_values. Is there something like coroutines I can use to get the yielded results in the fastest order by turning slow_io_call into an asynchronous function and using whichever call returns fastest? I expect list_of_io_tasks to be at least thousands of entries long. I've never done any parallel work other than with explicit threading, and in particular I've never used the various forms of lightweight threading which are available.

I need to use the standard CPython implementation, and I'm running on Linux.

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(1

牵强ㄟ 2024-11-09 00:37:08

听起来你正在寻找 multiprocessing.Pool() ,特别是 Pool.imap_unordered() 方法。

这是您的函数的一个端口,用于使用 imap_unordered() 并行化对 Slow_io_call() 的调用。

 def processed_values(list_of_io_tasks):
     pool = multiprocessing.Pool(4) # num workers
     results = pool.imap_unordered(slow_io_call, list_of_io_tasks)
     while True:
         yield results.next(9999999) # large time-out

请注意,您也可以直接迭代 results(即 for item in results:ield item),无需使用 while True 循环,但是调用 results.next() 具有超时值可以解决此问题 多处理键盘中断bug 并允许您使用 Ctrl-C 终止主进程和所有子进程。另请注意,此函数不会捕获 StopIteration 异常,但当 results.next() 没有更多项返回时,将会引发异常。这对于生成器函数来说是合法的,例如这个函数,当没有更多的值可以产生时,预计会引发 StopIteration 错误,或者只是停止产生,并且将代表它引发 StopIteration 异常。

要使用线程代替进程,请替换
导入多处理

导入 multiprocessing.dummy 作为多处理

Sounds like you are in search of multiprocessing.Pool(), specifically the Pool.imap_unordered() method.

Here is a port of your function to use imap_unordered() to parallelize calls to slow_io_call().

 def processed_values(list_of_io_tasks):
     pool = multiprocessing.Pool(4) # num workers
     results = pool.imap_unordered(slow_io_call, list_of_io_tasks)
     while True:
         yield results.next(9999999) # large time-out

Note that you could also iterate over results directly (i.e. for item in results: yield item) without a while True loop, however calling results.next() with a time-out value works around this multiprocessing keyboard interrupt bug and allows you to kill the main process and all subprocesses with Ctrl-C. Also note that the StopIteration exceptions are not caught in this function but one will be raised when results.next() has no more items return. This is legal from generator functions, such as this one, which are expected to either raise StopIteration errors when there are no more values to yield or just stop yielding and a StopIteration exception will be raised on it's behalf.

To use threads in place of processes, replace
import multiprocessing
with
import multiprocessing.dummy as multiprocessing

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文