使用带有无限迭代器和停止标准的并发.futures
我正在尝试并行化一个循环,该循环使用无限生成器作为输入来收集一些数据,并在收到一定量的数据时停止。
我的实现是这样的。
class A:
def __iter__(self):
i = 0
while True:
yield i
i += 1
def procpar(x):
r = random.random()
print('Computing x =', x)
if r > 0.5
return [2 * x]
else:
return [2 * x, x ** 2]
with ProcessPoolExecutor(4) as pool:
out = []
x = A()
for res in pool.map(procpar, x):
out.extend(res)
if len(out) > 100:
break
现在,当我运行它时,我确实得到了这个输出,之后它就挂起并且没有任何反应。
Computing x = 1
Computing x = 6
Computing x = 2
Computing x = 3
Computing x = 4
Computing x = 5
看看发生了什么,map
方法试图从迭代器 x = A()
展开并生成数据,因此它陷入了无限循环。
有关如何避免陷入无限循环的任何建议。当然,我可以在将迭代器输入到进程池之前分块调用 x ,但看看是否有人有更好或更直接的解决方案。
I am trying to parallelize a loop which uses an infinite generator as input to collect some data and stops when a certain amount of data has been received.
My implementation is something like this.
class A:
def __iter__(self):
i = 0
while True:
yield i
i += 1
def procpar(x):
r = random.random()
print('Computing x =', x)
if r > 0.5
return [2 * x]
else:
return [2 * x, x ** 2]
with ProcessPoolExecutor(4) as pool:
out = []
x = A()
for res in pool.map(procpar, x):
out.extend(res)
if len(out) > 100:
break
Now, when I run it, I do get this output, after which it just hangs and nothing happend.
Computing x = 1
Computing x = 6
Computing x = 2
Computing x = 3
Computing x = 4
Computing x = 5
Looking into whats going on, is that the map
method is trying to unroll and generate data from the iterator x = A()
, so it is stuck in an infinite loop.
Any suggestions how to avoid being stuck in infinite loop. Ofcorse, I could call the iterator x
in chunks before feeding them to the process pool, but looking if someone may have better or more straightforward solution.
如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。
绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论
评论(1)
尝试使用
multiprocessing.pool.imap
代替:打印:
Try using
multiprocessing.pool.imap
instead:Prints: