Should I use coroutines or another scheduling object here?
I currently have code in the form of a generator which calls an IO-bound task. The generator actually calls sub-generators as well, so a more general solution would be appreciated.
Something like the following:
def processed_values(list_of_io_tasks):
    for task in list_of_io_tasks:
        value = slow_io_call(task)
        yield postprocess(value)  # in real version, would iterate over
                                  # processed_values2(value) here
I have complete control over slow_io_call
, and I don't care in which order I get the items from processed_values
. Is there something like coroutines I can use to get the yielded results in the fastest order by turning slow_io_call
into an asynchronous function and using whichever call returns fastest? I expect list_of_io_tasks
to be at least thousands of entries long. I've never done any parallel work other than with explicit threading, and in particular I've never used the various forms of lightweight threading which are available.
I need to use the standard CPython implementation, and I'm running on Linux.
Sounds like you are in search of multiprocessing.Pool(), specifically the Pool.imap_unordered() method.
Here is a port of your function to use imap_unordered() to parallelize calls to slow_io_call().
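A minimal sketch of such a port is below. Here slow_io_call and postprocess are hypothetical stand-ins for your real functions, and the pool size and timeout are arbitrary illustrative values. Note that on Python 3.7+ (PEP 479), StopIteration raised inside a generator becomes a RuntimeError, so the sketch catches it explicitly rather than letting it propagate as the Python 2-era description below assumes.

```python
import multiprocessing

def slow_io_call(task):
    # stand-in for the real IO-bound call
    return task * 2

def postprocess(value):
    # stand-in for the real post-processing step
    return value + 1

def processed_values(list_of_io_tasks):
    pool = multiprocessing.Pool(4)  # illustrative pool size
    results = pool.imap_unordered(slow_io_call, list_of_io_tasks)
    while True:
        try:
            # the timeout works around the multiprocessing
            # keyboard-interrupt bug and lets Ctrl-C kill the
            # main process and its workers
            value = results.next(timeout=99999)
        except StopIteration:
            # must be caught on Python 3.7+ (PEP 479)
            pool.close()
            return
        yield postprocess(value)
```

Swapping the import for multiprocessing.dummy, as described below, runs the same sketch on threads instead of processes, which also avoids any pickling requirements on the worker function.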
Note that you could also iterate over results directly (i.e. for item in results: yield item) without a while True loop; however, calling results.next() with a timeout value works around this multiprocessing keyboard-interrupt bug and allows you to kill the main process and all subprocesses with Ctrl-C. Also note that the StopIteration exceptions are not caught in this function, but one will be raised when results.next() has no more items to return. This is legal for generator functions, such as this one, which are expected either to raise StopIteration when there are no more values to yield, or to simply stop yielding, in which case StopIteration is raised on their behalf. To use threads in place of processes, replace
import multiprocessing
with
import multiprocessing.dummy as multiprocessing