未检测到多处理池中引发的异常

发布于 2024-11-24 19:54:21 字数 312 浏览 2 评论 0原文

似乎当 multiprocessing.Pool 进程引发异常时,没有堆栈跟踪或任何其他指示它已失败。示例:

from multiprocessing import Pool 

def go():
    print(1)
    raise Exception()
    print(2)

p = Pool()
p.apply_async(go)
p.close()
p.join()

打印 1 并安静地停止。有趣的是,引发 BaseException 反而有效。有什么方法可以使所有异常的行为与 BaseException 相同吗?

It seems that when an exception is raised from a multiprocessing.Pool process, there is no stack trace or any other indication that it has failed. Example:

from multiprocessing import Pool 

def go():
    print(1)
    raise Exception()
    print(2)

p = Pool()
p.apply_async(go)
p.close()
p.join()

prints 1 and stops silently. Interestingly, raising a BaseException instead works. Is there any way to make the behavior for all exceptions the same as BaseException?

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(9

感情旳空白 2024-12-01 19:54:21

也许我遗漏了一些东西,但这不是 Result 对象的 get 方法返回的内容吗?请参阅进程池

类 multiprocessing.pool.AsyncResult

Pool.apply_async() 和 Pool.map_async().get([timeout]) 返回结果的类
当结果到达时返回。如果 timeout 不是 None 并且结果没有在
超时秒然后 multiprocessing.TimeoutError 被引发。如果远程
调用引发了异常,然后该异常将由 get() 重新引发。

因此,稍微修改一下你的例子,就可以做

from multiprocessing import Pool

def go():
    print(1)
    raise Exception("foobar")
    print(2)

p = Pool()
x = p.apply_async(go)
x.get()
p.close()
p.join()

这给出的结果

1
Traceback (most recent call last):
  File "rob.py", line 10, in <module>
    x.get()
  File "/usr/lib/python2.6/multiprocessing/pool.py", line 422, in get
    raise self._value
Exception: foobar

这并不完全令人满意,因为它不打印回溯,但总比没有好。

更新:此错误已在 Python 3.4 中修复,由 Richard Oudkerk 提供。请参阅问题multiprocessing.pool.Async 的 get 方法应返回完整的回溯

Maybe I'm missing something, but isn't that what the get method of the Result object returns? See Process Pools.

class multiprocessing.pool.AsyncResult

The class of the result returned by Pool.apply_async() and Pool.map_async().get([timeout])
Return the result when it arrives. If timeout is not None and the result does not arrive within
timeout seconds then multiprocessing.TimeoutError is raised. If the remote
call raised an exception then that exception will be reraised by get().

So, slightly modifying your example, one can do

from multiprocessing import Pool

def go():
    print(1)
    raise Exception("foobar")
    print(2)

p = Pool()
x = p.apply_async(go)
x.get()
p.close()
p.join()

Which gives as result

1
Traceback (most recent call last):
  File "rob.py", line 10, in <module>
    x.get()
  File "/usr/lib/python2.6/multiprocessing/pool.py", line 422, in get
    raise self._value
Exception: foobar

This is not completely satisfactory, since it does not print the traceback, but is better than nothing.

UPDATE: This bug has been fixed in Python 3.4, courtesy of Richard Oudkerk. See the issue get method of multiprocessing.pool.Async should return full traceback.

古镇旧梦 2024-12-01 19:54:21

我对这个问题有一个合理的解决方案,至少出于调试目的。我目前没有解决方案可以在主流程中引发异常。我的第一个想法是使用装饰器,但你只能 pickle 在模块顶层定义的函数,所以这是正确的。

相反,一个简单的包装类和一个 Pool 子类将其用于 apply_async(因此 apply)。我将把 map_async 留给读者作为练习。

import traceback
from multiprocessing.pool import Pool
import multiprocessing

# Shortcut to multiprocessing's logger
def error(msg, *args):
    return multiprocessing.get_logger().error(msg, *args)

class LogExceptions(object):
    def __init__(self, callable):
        self.__callable = callable

    def __call__(self, *args, **kwargs):
        try:
            result = self.__callable(*args, **kwargs)

        except Exception as e:
            # Here we add some debugging help. If multiprocessing's
            # debugging is on, it will arrange to log the traceback
            error(traceback.format_exc())
            # Re-raise the original exception so the Pool worker can
            # clean up
            raise

        # It was fine, give a normal answer
        return result

class LoggingPool(Pool):
    def apply_async(self, func, args=(), kwds={}, callback=None):
        return Pool.apply_async(self, LogExceptions(func), args, kwds, callback)

def go():
    print(1)
    raise Exception()
    print(2)

multiprocessing.log_to_stderr()
p = LoggingPool(processes=1)

p.apply_async(go)
p.close()
p.join()

这给了我:

1
[ERROR/PoolWorker-1] Traceback (most recent call last):
  File "mpdebug.py", line 24, in __call__
    result = self.__callable(*args, **kwargs)
  File "mpdebug.py", line 44, in go
    raise Exception()
Exception

I have a reasonable solution for the problem, at least for debugging purposes. I do not currently have a solution that will raise the exception back in the main processes. My first thought was to use a decorator, but you can only pickle functions defined at the top level of a module, so that's right out.

Instead, a simple wrapping class and a Pool subclass that uses this for apply_async (and hence apply). I'll leave map_async as an exercise for the reader.

import traceback
from multiprocessing.pool import Pool
import multiprocessing

# Shortcut to multiprocessing's logger
def error(msg, *args):
    return multiprocessing.get_logger().error(msg, *args)

class LogExceptions(object):
    def __init__(self, callable):
        self.__callable = callable

    def __call__(self, *args, **kwargs):
        try:
            result = self.__callable(*args, **kwargs)

        except Exception as e:
            # Here we add some debugging help. If multiprocessing's
            # debugging is on, it will arrange to log the traceback
            error(traceback.format_exc())
            # Re-raise the original exception so the Pool worker can
            # clean up
            raise

        # It was fine, give a normal answer
        return result

class LoggingPool(Pool):
    def apply_async(self, func, args=(), kwds={}, callback=None):
        return Pool.apply_async(self, LogExceptions(func), args, kwds, callback)

def go():
    print(1)
    raise Exception()
    print(2)

multiprocessing.log_to_stderr()
p = LoggingPool(processes=1)

p.apply_async(go)
p.close()
p.join()

This gives me:

1
[ERROR/PoolWorker-1] Traceback (most recent call last):
  File "mpdebug.py", line 24, in __call__
    result = self.__callable(*args, **kwargs)
  File "mpdebug.py", line 44, in go
    raise Exception()
Exception
瞎闹 2024-12-01 19:54:21

在撰写本文时获得最多票数的解决方案有一个问题:

from multiprocessing import Pool

def go():
    print(1)
    raise Exception("foobar")
    print(2)

p = Pool()
x = p.apply_async(go)
x.get()  ## waiting here for go() to complete...
p.close()
p.join()

正如 @dfrankow 指出的那样,它将等待 x.get(),这破坏了异步运行任务的意义。因此,为了提高效率(特别是如果您的辅助函数 go 需要很长时间),我会将其更改为:

from multiprocessing import Pool

def go(x):
    print(1)
    # task_that_takes_a_long_time()
    raise Exception("Can't go anywhere.")
    print(2)
    return x**2

p = Pool()
results = []
for x in range(1000):
    results.append( p.apply_async(go, [x]) )

p.close()

for r in results:
     r.get()

优点:辅助函数是异步运行的,因此如果例如,您在多个内核上运行许多任务,它将比原始解决方案效率更高。

缺点:如果工作函数中存在异常,只有池完成所有任务后才会引发异常。这可能是也可能不是理想的行为。根据@colinfang的评论进行编辑,修复了这个问题。

The solution with the most votes at the time of writing has a problem:

from multiprocessing import Pool

def go():
    print(1)
    raise Exception("foobar")
    print(2)

p = Pool()
x = p.apply_async(go)
x.get()  ## waiting here for go() to complete...
p.close()
p.join()

As @dfrankow noted, it will wait on x.get(), which ruins the point of running a task asynchronously. So, for better efficiency (in particular if your worker function go takes a long time) I would change it to:

from multiprocessing import Pool

def go(x):
    print(1)
    # task_that_takes_a_long_time()
    raise Exception("Can't go anywhere.")
    print(2)
    return x**2

p = Pool()
results = []
for x in range(1000):
    results.append( p.apply_async(go, [x]) )

p.close()

for r in results:
     r.get()

Advantages: the worker function is run asynchronously, so if for example you are running many tasks on several cores, it will be a lot more efficient than the original solution.

Disadvantages: if there is an exception in the worker function, it will only be raised after the pool has completed all the tasks. This may or may not be the desirable behaviour. EDITED according to @colinfang's comment, which fixed this.

土豪我们做朋友吧 2024-12-01 19:54:21

由于 multiprocessing.Pool 已经有了不错的答案,因此我将使用不同的方法提供一个解决方案以保证完整性。

对于 python >= 3.2 ,以下解决方案似乎是最简单的:

from concurrent.futures import ProcessPoolExecutor, wait

def go():
    print(1)
    raise Exception()
    print(2)


futures = []
with ProcessPoolExecutor() as p:
    for i in range(10):
        futures.append(p.submit(go))

results = [f.result() for f in futures]

优点:

  • 很少有代码
  • 在主进程中引发异常
  • 提供堆栈跟踪
  • 没有外部依赖

项 有关 API 的更多信息,请查看这个

此外,如果您要提交一个大型任务数,并且您希望主进程在其中一个任务失败时立即失败,您可以使用以下代码片段:

from concurrent.futures import ProcessPoolExecutor, wait, FIRST_EXCEPTION, as_completed
import time


def go():
    print(1)
    time.sleep(0.3)
    raise Exception()
    print(2)


futures = []
with ProcessPoolExecutor(1) as p:
    for i in range(10):
        futures.append(p.submit(go))

    for f in as_completed(futures):
        if f.exception() is not None:
            for f in futures:
                f.cancel()
            break

[f.result() for f in futures]

只有在执行所有任务后,所有其他答案才会失败。

Since there are already decent answers for multiprocessing.Pool available, I will provide a solution using a different approach for completeness.

For python >= 3.2 the following solution seems to be the simplest:

from concurrent.futures import ProcessPoolExecutor, wait

def go():
    print(1)
    raise Exception()
    print(2)


futures = []
with ProcessPoolExecutor() as p:
    for i in range(10):
        futures.append(p.submit(go))

results = [f.result() for f in futures]

Advantages:

  • very little code
  • raises an exception in the main process
  • provides a stack trace
  • no external dependencies

For more info about the API please check out this

Additionally, if you are submitting a large number of tasks and you would like your main process to fail as soon as one of your tasks fail, you can use the following snippet:

from concurrent.futures import ProcessPoolExecutor, wait, FIRST_EXCEPTION, as_completed
import time


def go():
    print(1)
    time.sleep(0.3)
    raise Exception()
    print(2)


futures = []
with ProcessPoolExecutor(1) as p:
    for i in range(10):
        futures.append(p.submit(go))

    for f in as_completed(futures):
        if f.exception() is not None:
            for f in futures:
                f.cancel()
            break

[f.result() for f in futures]

All of the other answers fail only once all tasks have been executed.

鯉魚旗 2024-12-01 19:54:21

我已经用这个装饰器成功记录了异常:

import traceback, functools, multiprocessing

def trace_unhandled_exceptions(func):
    @functools.wraps(func)
    def wrapped_func(*args, **kwargs):
        try:
            func(*args, **kwargs)
        except:
            print 'Exception in '+func.__name__
            traceback.print_exc()
    return wrapped_func

使用问题中的代码,它

@trace_unhandled_exceptions
def go():
    print(1)
    raise Exception()
    print(2)

p = multiprocessing.Pool(1)

p.apply_async(go)
p.close()
p.join()

只是装饰您传递给进程池的函数。这项工作的关键是@functools.wraps(func),否则多重处理会抛出PicklingError。

上面的代码给出了

1
Exception in go
Traceback (most recent call last):
  File "<stdin>", line 5, in wrapped_func
  File "<stdin>", line 4, in go
Exception

I've had success logging exceptions with this decorator:

import traceback, functools, multiprocessing

def trace_unhandled_exceptions(func):
    @functools.wraps(func)
    def wrapped_func(*args, **kwargs):
        try:
            func(*args, **kwargs)
        except:
            print 'Exception in '+func.__name__
            traceback.print_exc()
    return wrapped_func

with the code in the question, it's

@trace_unhandled_exceptions
def go():
    print(1)
    raise Exception()
    print(2)

p = multiprocessing.Pool(1)

p.apply_async(go)
p.close()
p.join()

Simply decorate the function you pass to your process pool. The key to this working is @functools.wraps(func) otherwise multiprocessing throws a PicklingError.

code above gives

1
Exception in go
Traceback (most recent call last):
  File "<stdin>", line 5, in wrapped_func
  File "<stdin>", line 4, in go
Exception
温柔嚣张 2024-12-01 19:54:21

由于您已经使用了 apply_sync,我猜用例是想要执行一些同步任务。使用回调进行处理是另一种选择。请注意,此选项仅适用于 python3.2 及以上版本,不适用于 python2.7。

from multiprocessing import Pool

def callback(result):
    print('success', result)

def callback_error(result):
    print('error', result)

def go():
    print(1)
    raise Exception()
    print(2)

p = Pool()
p.apply_async(go, callback=callback, error_callback=callback_error)

# You can do another things

p.close()
p.join()

Since you have used apply_sync, I guess the use case is want to do some synchronize tasks. Use callback for handling is another option. Please note this option is available only for python3.2 and above and not available on python2.7.

from multiprocessing import Pool

def callback(result):
    print('success', result)

def callback_error(result):
    print('error', result)

def go():
    print(1)
    raise Exception()
    print(2)

p = Pool()
p.apply_async(go, callback=callback, error_callback=callback_error)

# You can do another things

p.close()
p.join()
无人接听 2024-12-01 19:54:21
import logging
from multiprocessing import Pool

def proc_wrapper(func, *args, **kwargs):
    """Print exception because multiprocessing lib doesn't return them right."""
    try:
        return func(*args, **kwargs)
    except Exception as e:
        logging.exception(e)
        raise

def go(x):
    print x
    raise Exception("foobar")

p = Pool()
p.apply_async(proc_wrapper, (go, 5))
p.join()
p.close()
import logging
from multiprocessing import Pool

def proc_wrapper(func, *args, **kwargs):
    """Print exception because multiprocessing lib doesn't return them right."""
    try:
        return func(*args, **kwargs)
    except Exception as e:
        logging.exception(e)
        raise

def go(x):
    print x
    raise Exception("foobar")

p = Pool()
p.apply_async(proc_wrapper, (go, 5))
p.join()
p.close()
温柔女人霸气范 2024-12-01 19:54:21

我创建了一个模块 RemoteException.py,它显示进程中异常的完整回溯。 Python2。 下载并将其添加到您的代码中:

import RemoteException

@RemoteException.showError
def go():
    raise Exception('Error!')

if __name__ == '__main__':
    import multiprocessing
    p = multiprocessing.Pool(processes = 1)
    r = p.apply(go) # full traceback is shown here

I created a module RemoteException.py that shows the full traceback of a exception in a process. Python2. Download it and add this to your code:

import RemoteException

@RemoteException.showError
def go():
    raise Exception('Error!')

if __name__ == '__main__':
    import multiprocessing
    p = multiprocessing.Pool(processes = 1)
    r = p.apply(go) # full traceback is shown here
苏佲洛 2024-12-01 19:54:21

我会尝试使用 pdb:

import pdb
import sys
def handler(type, value, tb):
  pdb.pm()
sys.excepthook = handler

I'd try using pdb:

import pdb
import sys
def handler(type, value, tb):
  pdb.pm()
sys.excepthook = handler
~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文