Python: Getting a traceback from a multiprocessing.Process

Posted on 2024-11-09 15:54:45 | Words: 967 | Views: 0 | Comments: 0

I am trying to get hold of a traceback object from a multiprocessing.Process.
Unfortunately, passing the exception info through a pipe does not work, because traceback objects cannot be pickled:

def foo(pipe_to_parent):
    try:
        raise Exception('xxx')
    except:
        pipe_to_parent.send(sys.exc_info())

to_child, to_self = multiprocessing.Pipe()
process = multiprocessing.Process(target = foo, args = (to_self,))
process.start()
exc_info = to_child.recv()
process.join()
print traceback.format_exception(*exc_info)
to_child.close()
to_self.close()

Traceback:

Traceback (most recent call last):
  File "/usr/lib/python2.6/multiprocessing/process.py", line 231, in _bootstrap
    self.run()
  File "/usr/lib/python2.6/multiprocessing/process.py", line 88, in run
    self._target(*self._args, **self._kwargs)
  File "foo", line 7, in foo
    to_parent.send(sys.exc_info())
PicklingError: Can't pickle <type 'traceback'>: attribute lookup __builtin__.traceback failed

Is there another way to access the exception info? I'd like to avoid passing the formatted string.
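
The failure can be reproduced without starting a process, because a Pipe pickles whatever is passed to send(). A minimal sketch, assuming Python 3 (where the error surfaces as a TypeError rather than the Python 2 PicklingError shown above); the variable names are illustrative only:

```python
import pickle
import sys

try:
    raise Exception('xxx')
except Exception:
    exc_type, exc_value, tb = sys.exc_info()

# The exception class and the exception instance pickle without trouble...
restored = pickle.loads(pickle.dumps((exc_type, exc_value)))

# ...but the traceback object does not.
try:
    pickle.dumps(tb)
    tb_pickled = True
except (TypeError, pickle.PicklingError) as err:
    tb_pickled = False
    print("cannot pickle traceback:", err)
```

So only the third item of sys.exc_info() needs special treatment before it can cross the process boundary.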


Comments (6)

遮了一弯 2024-11-16 15:54:45

Using tblib you can pass wrapped exceptions and reraise them later:

import tblib.pickling_support
tblib.pickling_support.install()

from multiprocessing import Pool
import sys


class ExceptionWrapper(object):

    def __init__(self, ee):
        self.ee = ee
        __, __, self.tb = sys.exc_info()

    def re_raise(self):
        raise self.ee.with_traceback(self.tb)
        # for Python 2 replace the previous line by:
        # raise self.ee, None, self.tb


# example of how to use ExceptionWrapper

def inverse(i):
    """ will fail for i == 0 """
    try:
        return 1.0 / i
    except Exception as e:
        return ExceptionWrapper(e)


def main():
    p = Pool(1)
    results = p.map(inverse, [0, 1, 2, 3])
    for result in results:
        if isinstance(result, ExceptionWrapper):
            result.re_raise()


if __name__ == "__main__":
    main()

So, if you catch an exception in your remote process, wrap it with ExceptionWrapper and then pass it back. Calling re_raise() in the main process will do the work.
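
The re-raise step itself is plain Python 3; what tblib contributes is making the traceback object picklable so that it can cross the process boundary. A minimal single-process sketch of what re_raise() does, assuming Python 3 and using no tblib (the names saved_exc, saved_tb and frame_names are illustrative):

```python
import sys
import traceback


def inverse(i):
    return 1.0 / i


# Capture the exception and its traceback, as ExceptionWrapper.__init__ does.
try:
    inverse(0)
except ZeroDivisionError as e:
    saved_exc, saved_tb = e, sys.exc_info()[2]


def re_raise():
    # Re-raise later, from a different place, with the original traceback.
    raise saved_exc.with_traceback(saved_tb)


try:
    re_raise()
except ZeroDivisionError:
    # Collect the function names recorded in the re-raised traceback.
    frame_names = [f.name for f in traceback.extract_tb(sys.exc_info()[2])]

print(frame_names)
```

The last entry of frame_names is the frame of inverse, so the re-raised exception still points at the original failure site.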

音盲 2024-11-16 15:54:45

Since multiprocessing does print the string contents of exceptions raised in child processes, you can wrap all your child process code in a try-except that catches any exception, formats the relevant stack trace, and raises a new Exception that holds all the relevant information in its string:

An example of a function I use with multiprocessing.Pool.map:

import sys
import traceback

def run_functor(functor):
    """
    Given a no-argument functor, run it and return its result. We can 
    use this with multiprocessing.Pool.map and map it over a list of job 
    functors to do them.

    Handles getting more than multiprocessing's pitiful exception output
    """

    try:
        # This is where you do your actual work
        return functor()
    except:
        # Put all exception text into an exception and raise that
        raise Exception("".join(traceback.format_exception(*sys.exc_info())))

What you get is a stack trace with another formatted stack trace as the error message, which helps with debugging.

岛歌少女 2024-11-16 15:54:45

It seems to be difficult to make the traceback object picklable.
But you can send just the first two items of sys.exc_info(), together with preformatted traceback information obtained from the traceback.extract_tb function:

import multiprocessing
import sys
import traceback

def foo(pipe_to_parent):
    try:
        raise Exception('xxx')
    except:
        except_type, except_class, tb = sys.exc_info()
        pipe_to_parent.send((except_type, except_class, traceback.extract_tb(tb)))

to_child, to_self = multiprocessing.Pipe()
process = multiprocessing.Process(target = foo, args = (to_self,))
process.start()
exc_info = to_child.recv()
process.join()
print exc_info
to_child.close()
to_self.close()

which gives you:

(<type 'exceptions.Exception'>, Exception('xxx',), [('test_tb.py', 7, 'foo', "raise Exception('xxx')")])

With that, you can recover more information about the cause of the exception: the filename, the line number where the exception was raised, the function name, and the statement that raised it.
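
The same idea can be sketched for Python 3, where traceback.extract_tb returns FrameSummary objects. Here they are flattened into plain tuples, and a pickle round trip stands in for the Pipe, which applies the same serialization on send() and recv(). The helper name picklable_exc_info is hypothetical:

```python
import pickle
import sys
import traceback


def picklable_exc_info():
    """Return (type, value, frames) for the exception being handled.

    frames is a list of plain (filename, lineno, name, line) tuples, so the
    whole result can be pickled, unlike a raw traceback object.
    """
    exc_type, exc_value, tb = sys.exc_info()
    frames = [(f.filename, f.lineno, f.name, f.line)
              for f in traceback.extract_tb(tb)]
    return exc_type, exc_value, frames


def failing():
    raise Exception('xxx')


try:
    failing()
except Exception:
    payload = picklable_exc_info()

# Stand-in for pipe_to_parent.send(payload) followed by to_child.recv()
exc_type, exc_value, frames = pickle.loads(pickle.dumps(payload))

print(exc_type.__name__, exc_value)
# format_list accepts the plain tuples and renders them as traceback text
print("".join(traceback.format_list(frames)))
```

The receiving side gets structured data rather than one formatted string, and can still produce the usual text with traceback.format_list when needed.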

泡沫很甜 2024-11-16 15:54:45

Python 3

In Python 3, the get method of multiprocessing.pool.AsyncResult now reports the full traceback; see http://bugs.python.org/issue13831.

Python 2

Use traceback.format_exc (short for "format exception") to get the traceback string.
It is more convenient to wrap this in a decorator, as below.

def full_traceback(func):
    import traceback, functools
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        try:
            return func(*args, **kwargs)
        except Exception as e:
            msg = "{}\n\nOriginal {}".format(e, traceback.format_exc())
            raise type(e)(msg)
    return wrapper

Example:

def func0():
    raise NameError("Exception in func0")

def func1():
    return func0()

# Key is here!
@full_traceback
def main(i):
    return func1()

if __name__ == '__main__':
    from multiprocessing import Pool
    pool = Pool(4)
    try:
        results = pool.map_async(main, range(5)).get(1e5)
    finally:
        pool.close()
        pool.join()

The traceback with the decorator:

Traceback (most recent call last):
  File "bt.py", line 34, in <module>
    results = pool.map_async(main, range(5)).get(1e5)
  File "/opt/anaconda/lib/python2.7/multiprocessing/pool.py", line 567, in get
    raise self._value
NameError: Exception in func0

Original Traceback (most recent call last):
  File "bt.py", line 13, in wrapper
    return func(*args, **kwargs)
  File "bt.py", line 27, in main
    return func1()
  File "bt.py", line 23, in func1
    return func0()
  File "bt.py", line 20, in func0
    raise NameError("Exception in func0")
NameError: Exception in func0

The traceback without the decorator:

Traceback (most recent call last):
  File "bt.py", line 34, in <module>
    results = pool.map_async(main, range(5)).get(1e5)
  File "/opt/anaconda/lib/python2.7/multiprocessing/pool.py", line 567, in get
    raise self._value
NameError: Exception in func0
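
One caveat with `raise type(e)(msg)`: some exception classes cannot be constructed from a single message string (UnicodeDecodeError, for example, requires five arguments), so the re-raise itself fails with a TypeError. A hedged variant, assuming Python 3; the name full_traceback_safe and the RuntimeError fallback are illustrative choices, not part of the answer above:

```python
import functools
import traceback


def full_traceback_safe(func):
    """Like full_traceback, but tolerate exception types that cannot be
    rebuilt from a single message string (hypothetical helper name)."""
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        try:
            return func(*args, **kwargs)
        except Exception as e:
            msg = "{}\n\nOriginal {}".format(e, traceback.format_exc())
            try:
                new_exc = type(e)(msg)
            except TypeError:
                # The constructor needs more arguments; fall back to a
                # generic type that still carries the formatted traceback.
                new_exc = RuntimeError(msg)
            raise new_exc
    return wrapper
```

The trade-off is that the caller may see RuntimeError instead of the original type in those cases, although the original type name is still visible in the embedded traceback text.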

痴意少年 2024-11-16 15:54:45

This is a variation of this excellent answer. Both rely on tblib for storing the traceback.

However, instead of having to return the exception object (as asked for by the OP), the worker function can be left as-is and is just wrapped in try/except to store exceptions for re-raising.

import tblib.pickling_support
tblib.pickling_support.install()

import sys

class DelayedException(Exception):

    def __init__(self, ee):
        self.ee = ee
        __,  __, self.tb = sys.exc_info()
        super(DelayedException, self).__init__(str(ee))

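    def re_raise(self):
        raise self.ee, None, self.tb
        # Python 2 syntax; on Python 3 replace the previous line by:
        # raise self.ee.with_traceback(self.tb)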

Example

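def worker(item):
    # imap passes each input item to the worker, so it must accept one argument
    try:
        raise ValueError('Something went wrong.')
    except Exception as e:
        raise DelayedException(e)


if __name__ == '__main__':

    import multiprocessing

    pool = multiprocessing.Pool()
    try:
        # imap is lazy: worker exceptions only surface when results are consumed
        list(pool.imap(worker, [1, 2, 3]))
    except DelayedException as e:
        e.re_raise()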

兔小萌 2024-11-16 15:54:45

The same solution as @Syrtis Major's and @interfect's, but tested with Python 3.6:

import sys
import traceback
import functools

def catch_remote_exceptions(wrapped_function):
    """ https://stackoverflow.com/questions/6126007/python-getting-a-traceback """

    @functools.wraps(wrapped_function)
    def new_function(*args, **kwargs):
        try:
            return wrapped_function(*args, **kwargs)

        except:
            raise Exception( "".join(traceback.format_exception(*sys.exc_info())) )

    return new_function

Usage:

class ProcessLocker(object):
    @catch_remote_exceptions
    def __init__(self):
        super().__init__()

    @catch_remote_exceptions
    def create_process_locks(self, total_processes):
        self.process_locks = []
        # ...