使用带有无限迭代器和停止标准的并发.futures

发布于 2025-01-17 09:54:30 字数 857 浏览 0 评论 0原文

我正在尝试并行化一个循环,该循环使用无限生成器作为输入来收集一些数据,并在收到一定量的数据时停止。

我的实现是这样的。

class A:
  def __iter__(self):
    i = 0
    while True:
       yield i 
       i += 1


def procpar(x):
    r = random.random()
    print('Computing x =', x)
    
    if r > 0.5
        return [2 * x]
    else:
        return [2 * x, x ** 2]


with ProcessPoolExecutor(4) as pool:
  out = []
  x = A()
  
  for res in pool.map(procpar, x):
    out.extend(res)
    if len(out) > 100:
      break

现在,当我运行它时,我确实得到了这个输出,之后它就挂起并且没有任何反应。

Computing x = 1
Computing x = 6
Computing x = 2
Computing x = 3
Computing x = 4
Computing x = 5

看看发生了什么,map 方法试图从迭代器 x = A() 展开并生成数据,因此它陷入了无限循环。

有关如何避免陷入无限循环的任何建议。当然,我可以在将迭代器输入到进程池之前分块调用 x ,但看看是否有人有更好或更直接的解决方案。

I am trying to parallelize a loop which uses an infinite generator as input to collect some data and stops when a certain amount of data has been received.

My implementation is something like this.

class A:
  def __iter__(self):
    i = 0
    while True:
       yield i 
       i += 1


def procpar(x):
    r = random.random()
    print('Computing x =', x)
    
    if r > 0.5
        return [2 * x]
    else:
        return [2 * x, x ** 2]


with ProcessPoolExecutor(4) as pool:
  out = []
  x = A()
  
  for res in pool.map(procpar, x):
    out.extend(res)
    if len(out) > 100:
      break

Now, when I run it, I do get this output, after which it just hangs and nothing happend.

Computing x = 1
Computing x = 6
Computing x = 2
Computing x = 3
Computing x = 4
Computing x = 5

Looking into whats going on, is that the map method is trying to unroll and generate data from the iterator x = A(), so it is stuck in an infinite loop.

Any suggestions how to avoid being stuck in infinite loop. Ofcorse, I could call the iterator x in chunks before feeding them to the process pool, but looking if someone may have better or more straightforward solution.

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(1

梅倚清风 2025-01-24 09:54:30

尝试使用 multiprocessing.pool.imap 代替:

from multiprocessing import Pool
import random


class A:
    def __iter__(self):
        i = 0
        while True:
            yield i
            i += 1


def procpar(x):
    r = random.random()
    print('Computing x =', x)

    if r > 0.5:
        return [2 * x]
    else:
        return [2 * x, x ** 2]


# Required for Windows:
if __name__ == '__main__':
    with Pool(4) as pool:
        out = []
        x = A()

        for res in pool.imap(procpar, x):
            out.extend(res)
            if len(out) > 100:
                break
    print(out)

打印:

Computing x = 0
Computing x = 1
Computing x = 2
Computing x = 3
Computing x = 4
Computing x = 5
Computing x = 6
Computing x = 7
Computing x = 8
Computing x = 9
Computing x = 10
Computing x = 11
Computing x = 12
Computing x = 13
Computing x = 14
Computing x = 15
Computing x = 16
Computing x = 17
Computing x = 18
Computing x = 19
Computing x = 20
Computing x = 21
Computing x = 22
Computing x = 23
Computing x = 24
Computing x = 25
Computing x = 26
Computing x = 27
Computing x = 28
Computing x = 29
Computing x = 30
Computing x = 31
Computing x = 32
Computing x = 33
Computing x = 34
Computing x = 35
Computing x = 36
Computing x = 37
Computing x = 38
Computing x = 39
Computing x = 40
Computing x = 41
Computing x = 42
Computing x = 43
Computing x = 44
Computing x = 45
Computing x = 46
Computing x = 47
Computing x = 48
Computing x = 49
Computing x = 50
Computing x = 51
Computing x = 52
Computing x = 53
Computing x = 54
Computing x = 55
Computing x = 56
Computing x = 57
Computing x = 58
Computing x = 59
Computing x = 60
Computing x = 61
Computing x = 62
Computing x = 63
Computing x = 64
[0, 2, 1, 4, 4, 6, 8, 16, 10, 12, 14, 49, 16, 64, 18, 20, 22, 24, 144, 26, 28, 196, 30, 225, 32, 256, 34, 289, 36, 38, 361, 40, 400, 42, 441, 44, 484, 46, 529, 48, 576, 50, 625, 52, 54, 56, 58, 60, 900, 62, 961, 64, 66, 1089, 68, 1156, 70, 1225, 72, 1296, 74, 1369, 76, 78, 1521, 80, 1600, 82, 1681, 84, 1764, 86, 88, 90, 2025, 92, 2116, 94, 96, 2304, 98, 100, 102, 2601, 104, 2704, 106, 2809, 108, 110, 3025, 112, 3136, 114, 116, 3364, 118, 3481, 120, 3600, 122]

Try using multiprocessing.pool.imap instead:

from multiprocessing import Pool
import random


class A:
    def __iter__(self):
        i = 0
        while True:
            yield i
            i += 1


def procpar(x):
    r = random.random()
    print('Computing x =', x)

    if r > 0.5:
        return [2 * x]
    else:
        return [2 * x, x ** 2]


# Required for Windows:
if __name__ == '__main__':
    with Pool(4) as pool:
        out = []
        x = A()

        for res in pool.imap(procpar, x):
            out.extend(res)
            if len(out) > 100:
                break
    print(out)

Prints:

Computing x = 0
Computing x = 1
Computing x = 2
Computing x = 3
Computing x = 4
Computing x = 5
Computing x = 6
Computing x = 7
Computing x = 8
Computing x = 9
Computing x = 10
Computing x = 11
Computing x = 12
Computing x = 13
Computing x = 14
Computing x = 15
Computing x = 16
Computing x = 17
Computing x = 18
Computing x = 19
Computing x = 20
Computing x = 21
Computing x = 22
Computing x = 23
Computing x = 24
Computing x = 25
Computing x = 26
Computing x = 27
Computing x = 28
Computing x = 29
Computing x = 30
Computing x = 31
Computing x = 32
Computing x = 33
Computing x = 34
Computing x = 35
Computing x = 36
Computing x = 37
Computing x = 38
Computing x = 39
Computing x = 40
Computing x = 41
Computing x = 42
Computing x = 43
Computing x = 44
Computing x = 45
Computing x = 46
Computing x = 47
Computing x = 48
Computing x = 49
Computing x = 50
Computing x = 51
Computing x = 52
Computing x = 53
Computing x = 54
Computing x = 55
Computing x = 56
Computing x = 57
Computing x = 58
Computing x = 59
Computing x = 60
Computing x = 61
Computing x = 62
Computing x = 63
Computing x = 64
[0, 2, 1, 4, 4, 6, 8, 16, 10, 12, 14, 49, 16, 64, 18, 20, 22, 24, 144, 26, 28, 196, 30, 225, 32, 256, 34, 289, 36, 38, 361, 40, 400, 42, 441, 44, 484, 46, 529, 48, 576, 50, 625, 52, 54, 56, 58, 60, 900, 62, 961, 64, 66, 1089, 68, 1156, 70, 1225, 72, 1296, 74, 1369, 76, 78, 1521, 80, 1600, 82, 1681, 84, 1764, 86, 88, 90, 2025, 92, 2116, 94, 96, 2304, 98, 100, 102, 2601, 104, 2704, 106, 2809, 108, 110, 3025, 112, 3136, 114, 116, 3364, 118, 3481, 120, 3600, 122]
~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文