Deadlock in concurrent code


I've been trying to parallelise some code using concurrent.futures.ProcessPoolExecutor, but I keep hitting strange deadlocks that don't occur with ThreadPoolExecutor. A minimal example:

from concurrent import futures

def test():
    pass

with futures.ProcessPoolExecutor(4) as executor:
    for i in range(100):
        print('submitting {}'.format(i))
        executor.submit(test)
# The with block's exit calls executor.shutdown(); that is where it hangs.

In Python 3.2.2 (on 64-bit Ubuntu), this hangs consistently after all the jobs have been submitted, and it seems to happen whenever the number of jobs submitted is greater than the number of workers. If I replace ProcessPoolExecutor with ThreadPoolExecutor, it works flawlessly.

To investigate, I gave each future a callback to print the value of i:

from concurrent import futures

def test():
    pass

with futures.ProcessPoolExecutor(4) as executor:
    for i in range(100):
        print('submitting {}'.format(i))
        future = executor.submit(test)

        # This closure reads i when the callback runs, not when it is defined.
        def callback(f):
            print('callback {}'.format(i))
        future.add_done_callback(callback)

This just confused me even more: the value of i printed by the callback is its value at the time the callback is called, rather than at the time it was defined (so I never see callback 0, but I get lots of callback 99s). Again, ThreadPoolExecutor prints the expected values.
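
For reference, the usual workaround for this late-binding behaviour (not something the original code does, and it does not address the shutdown hang) is to bind i at definition time, for example with a default argument. A minimal sketch:

from concurrent import futures

def test():
    pass

with futures.ProcessPoolExecutor(4) as executor:
    for i in range(100):
        future = executor.submit(test)

        # Default argument values are evaluated once, when the function is
        # defined, so each callback keeps the i from its own iteration.
        def callback(f, i=i):
            print('callback {}'.format(i))
        future.add_done_callback(callback)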

Wondering if this might be a bug, I tried a recent development version of Python. Now the code at least seems to terminate, but I still get the wrong value of i printed out.

So can anyone explain:

  • what happened to ProcessPoolExecutor between Python 3.2 and the current development version that apparently fixed this deadlock

  • why the 'wrong' value of i is being printed

EDIT: As jukiewicz pointed out below, of course printing i will print its value at the time the callback is called; I don't know what I was thinking... If I pass a callable object with the value of i as one of its attributes, that works as expected.
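
A minimal sketch of that callable-object workaround (the class name Callback is illustrative, not from the original post):

from concurrent import futures

def test():
    pass

class Callback:
    def __init__(self, i):
        # Capture the loop value at construction time.
        self.i = i

    def __call__(self, f):
        print('callback {}'.format(self.i))

with futures.ProcessPoolExecutor(4) as executor:
    for i in range(100):
        future = executor.submit(test)
        future.add_done_callback(Callback(i))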

EDIT: A little more information: all of the callbacks are executed, so it looks like it is executor.shutdown (called by executor.__exit__) that is unable to tell that the processes have completed. This does seem to be completely fixed in the current Python 3.3, but there seem to have been a lot of changes to multiprocessing and concurrent.futures, so I don't know what fixed this. Since I can't use 3.3 (it doesn't seem to be compatible with either the release or development versions of numpy), I tried simply copying its multiprocessing and concurrent packages across to my 3.2 installation, which seems to work fine. Still, it seems a little weird that, as far as I can see, ProcessPoolExecutor is completely broken in the latest release version, yet nobody else seems to be affected.


Answer by 夕嗳:


I modified the code as follows, which solved both problems. The callback function was defined as a closure, so it would use the updated value of i every time it ran. As for the deadlock, that is likely caused by shutting down the executor before all the tasks are complete; waiting for the futures to complete solves that, too.

from concurrent import futures

def test(i):
    return i

def callback(f):
    # Read the value from the future's result instead of a closure over i.
    print('callback {}'.format(f.result()))


with futures.ProcessPoolExecutor(4) as executor:
    fs = []
    for i in range(100):
        print('submitting {}'.format(i))
        # Pass i as an argument so each task carries its own value.
        future = executor.submit(test, i)
        future.add_done_callback(callback)
        fs.append(future)

    # Block until every future has completed before the executor shuts down.
    for _ in futures.as_completed(fs):
        pass
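
As an aside (not part of the original answer), concurrent.futures.wait does the same job as the as_completed loop when you only need to block until everything finishes. A minimal sketch:

from concurrent import futures

def test(i):
    return i

with futures.ProcessPoolExecutor(4) as executor:
    fs = [executor.submit(test, i) for i in range(100)]
    # With the default return_when=ALL_COMPLETED, wait() blocks until every
    # future in fs has finished, then returns (done, not_done) sets.
    futures.wait(fs)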

UPDATE: Oh, sorry, I hadn't read your updates; this seems to have been solved already.
