在Python中击中异常时,如何重新启动池中的过程

发布于 2025-02-12 05:56:23 字数 1643 浏览 1 评论 0原文

import signal
import asyncio
import os
import random
import time
import multiprocessing

my_list = []
for i in range(0,10):
    n = random.randint(1,100)
    my_list.append(n)


async def loop_item(my_item):
    while True:
        a = random.randint(1, 2)
        if a == 2:
            print(f"process id: {os.getpid()}")
            raise Exception('Error')
        print(f"process id: {os.getpid()} - {my_item}")
        time.sleep(0.5)


def run_loop(my_item):
    asyncio.run(loop_item(my_item))


def throw_error(e):
    os.system('bash /root/my-script.sh')  #that launchs "python my-script.py"
    os.killpg(os.getpgid(os.getpid()), signal.SIGKILL)


if __name__ == '__main__':
    pool = multiprocessing.Pool(processes=10)
    for my_item in my_list:
        pool.apply_async(run_loop, (my_item,), error_callback=throw_error)
    pool.close()
    pool.join()

这是我的演示代码,其中将创建一个my_list,其中10个项目作为

随机 以模仿任何类型的例外情况,并且要重新启动此loop_item(my_item)在新过程中的功能,如果发生异常,

则有两个障碍,一个是通过变量my_item,但我认为我应该能够使其与redis这样的外部工具(如redis)进行工作,但要获得变量,但是任何更好的想法都会受到赞赏。

真正阻止我的是如何在遇到异常后再次有效启动该过程,然后退出

到目前为止,我能够使用throw_error函数杀死Python脚本本身或启动另一个Shell脚本来杀死和启动Python再次脚本,但是这种方法似乎不太有效,

所以我想知道是否有更好的方法重新启动一个方法,而不是重新启动整个脚本?

我尝试的一种方法是在throw_error函数中创建一个新的过程池,

def throw_error(e):
    pool2 = multiprocessing.Pool(processes=1)
    pool2.apply_async(run_loop, (my_item,), error_callback=throw_error)
    pool2.close()
    pool2.join()

但是这似乎是一个坏主意,因为多个例外之后,该过程池已经失控,并且累积了多达数百个。如果不是成千上万的“僵尸”过程

import signal
import asyncio
import os
import random
import time
import multiprocessing

my_list = []
for i in range(0,10):
    n = random.randint(1,100)
    my_list.append(n)


async def loop_item(my_item):
    while True:
        a = random.randint(1, 2)
        if a == 2:
            print(f"process id: {os.getpid()}")
            raise Exception('Error')
        print(f"process id: {os.getpid()} - {my_item}")
        time.sleep(0.5)


def run_loop(my_item):
    asyncio.run(loop_item(my_item))


def throw_error(e):
    os.system('bash /root/my-script.sh')  #that launchs "python my-script.py"
    os.killpg(os.getpgid(os.getpid()), signal.SIGKILL)


if __name__ == '__main__':
    pool = multiprocessing.Pool(processes=10)
    for my_item in my_list:
        pool.apply_async(run_loop, (my_item,), error_callback=throw_error)
    pool.close()
    pool.join()

this is my demo code , in which it will create a my_list with 10 items in it as random number

then launch with 10 processes to print it out to alone with pid

then I add a raise Exception to mimic any kind of exception it may occur and want to restart this loop_item(my_item) function in new process if exception happens

there are two obstacle for this , one is pass the variable my_item but I think I should be able to make it work with external tool like Redis that put/get variable , but any better idea is appreciated.

what really stops me is how to effectively launch the process again after it hit exception and exited

so far I was able to use throw_error function to kill the python script itself or launch another shell script to kill and launch python script again , but this approach seems to be less efficient

so I am wondering if there is a better way to restart one except'ed process instead of restart the whole script ?

one way I tried , was creating a new process pool in throw_error function ,like

def throw_error(e):
    pool2 = multiprocessing.Pool(processes=1)
    pool2.apply_async(run_loop, (my_item,), error_callback=throw_error)
    pool2.close()
    pool2.join()

but it seems to be an bad idea as after multiple exceptions , the process pools are getting out of controls , and accumulated up to hundreds if not thousands "zombie" processes

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(1

尘世孤行 2025-02-19 05:56:29

我假设这是 xy问题

这个答案是提出一种可以解决问题X的替代设计,而不是解决问题的问题。


据我所知,Python无法很好地控制对已经运行的卵子子过程或线程的优雅终止。

因此,最好的方法是 - 只是不让每个过程/线程完全失败,而是首先传播错误。

可以通过编写一个小包装器来实现这一点,在该包装器中,您可以在try -except中包装exception - 然后捕获任何例外情况。然后,我们可以在循环时使用一个重试。

def wrapper(func, max_retries, data):
    """
    Wrapper adding retry capabilities to workload.

    Args:
        func: function to execute.
        max_retries: Maximum retries until moving on.
        data: data to process.

    Returns:
        (Input data, result) Tuple.
    """

    # wrap in while in case for retry.
    retry_count = 0
    while retry_count <= max_retries:  # while true: if you want infinite loop
        retry_count += 1

        # try to process data
        try:
            result = func(data)
        except Exception as err:
            # on exception save result as error and retry
            result = err
        else:
            # otherwise it was successful, break out of retry loop
            break

    # return result
    return data, result

这是一些愚蠢的演示代码测试这个想法,失败的机会一半。

import logging
import functools
import random
from os import getpid
from multiprocessing import Pool


logging.basicConfig(format="%(levelname)-8s %(message)s", level=logging.DEBUG)
logger = logging.getLogger()


def wrapper(func, max_retries, data):
    """
    Wrapper adding retry capabilities to workload with some fancy output.

    Args:
        func: function to execute.
        max_retries: Maximum retries until moving on.
        data: data to process.

    Returns:
        (Input data, result) Tuple.
    """
    pid = f"{getpid():<6}"
    logger.info(f"[{pid}]  Processing {data}")

    # just a line to satisfy pylint
    result = None

    # wrap in while in case for retry.
    retry_count = 0

    while retry_count <= max_retries:
        # try to process data
        try:
            result = func(data)
        except Exception as err:
            # on exception print out error, set result as err, then retry
            logger.error(
                f"[{pid}]  {err.__class__.__name__} while processing {data}, "
                f"{max_retries - retry_count} retries left. "
            )
            result = err
        else:
            break

        retry_count += 1

    # print and return result
    logger.info(f"[{pid}]  Processing {data} done")
    return data, result


class RogueAIException(Exception):
    pass


def workload(n):
    """
    Quite rebellious Fibonacci function
    """

    if random.randint(0, 1):
        raise RogueAIException("I'm sorry Dave, I'm Afraid I can't do that.")

    a, b = 0, 1
    for _ in range(n - 1):
        a, b = b, a + b

    return b


def main():
    data = [random.randint(0, 100) for _ in range(20)]

    # fix parameters. Decorator can't be pickled, we'll have to live with this.
    wrapped_workload = functools.partial(wrapper, workload, 3)

    with Pool(processes=3) as pool:
        # apply function for each data
        results = pool.map(wrapped_workload, data)

        print("\nInput Output")
        for fed_data, result in results:
            print(f"{fed_data:<6}{result}")


if __name__ == '__main__':
    main()

输出:

INFO     [13904 ]  Processing 40
ERROR    [13904 ]  RogueAIException while processing 40, 3 retries left. 
INFO     [13904 ]  Processing 40 done
INFO     [13904 ]  Processing 93
ERROR    [13904 ]  RogueAIException while processing 93, 3 retries left. 
INFO     [13904 ]  Processing 93 done
INFO     [13904 ]  Processing 96
INFO     [13904 ]  Processing 96 done
INFO     [13904 ]  Processing 48
INFO     [13904 ]  Processing 48 done
INFO     [13904 ]  Processing 17
INFO     [13904 ]  Processing 17 done
INFO     [13904 ]  Processing 52
ERROR    [13904 ]  RogueAIException while processing 52, 3 retries left. 
INFO     [13904 ]  Processing 52 done
INFO     [13904 ]  Processing 96
ERROR    [13904 ]  RogueAIException while processing 96, 3 retries left. 
INFO     [13904 ]  Processing 96 done
INFO     [13904 ]  Processing 23
ERROR    [13904 ]  RogueAIException while processing 23, 3 retries left. 
INFO     [13904 ]  Processing 23 done
INFO     [13904 ]  Processing 99
ERROR    [13904 ]  RogueAIException while processing 99, 3 retries left. 
ERROR    [13904 ]  RogueAIException while processing 99, 2 retries left.
INFO     [13904 ]  Processing 99 done
INFO     [13904 ]  Processing 55
ERROR    [13904 ]  RogueAIException while processing 55, 3 retries left.
ERROR    [13904 ]  RogueAIException while processing 55, 2 retries left.
INFO     [13904 ]  Processing 55 done
INFO     [13904 ]  Processing 63
ERROR    [13904 ]  RogueAIException while processing 63, 3 retries left.
INFO     [13904 ]  Processing 63 done
INFO     [13904 ]  Processing 61
INFO     [25180 ]  Processing 3
ERROR    [13904 ]  RogueAIException while processing 61, 3 retries left.
INFO     [25180 ]  Processing 3 done
INFO     [13904 ]  Processing 61 done
INFO     [25180 ]  Processing 42
INFO     [13904 ]  Processing 33
ERROR    [25180 ]  RogueAIException while processing 42, 3 retries left.
ERROR    [13904 ]  RogueAIException while processing 33, 3 retries left.
ERROR    [25180 ]  RogueAIException while processing 42, 2 retries left.
INFO     [13904 ]  Processing 33 done
ERROR    [25180 ]  RogueAIException while processing 42, 1 retries left.
INFO     [13904 ]  Processing 2
INFO     [25180 ]  Processing 42 done
INFO     [13904 ]  Processing 2 done
INFO     [25180 ]  Processing 35
INFO     [13904 ]  Processing 45
INFO     [25180 ]  Processing 35 done
INFO     [13904 ]  Processing 45 done
INFO     [25180 ]  Processing 2
INFO     [13904 ]  Processing 11
ERROR    [25180 ]  RogueAIException while processing 2, 3 retries left.
ERROR    [13904 ]  RogueAIException while processing 11, 3 retries left.
INFO     [25180 ]  Processing 2 done
ERROR    [13904 ]  RogueAIException while processing 11, 2 retries left.
ERROR    [13904 ]  RogueAIException while processing 11, 1 retries left.
ERROR    [13904 ]  RogueAIException while processing 11, 0 retries left.
INFO     [13904 ]  Processing 11 done

Input Output
40    102334155
93    12200160415121876738
96    51680708854858323072
48    4807526976
17    1597
52    32951280099
96    51680708854858323072
23    28657
99    218922995834555169026
55    139583862445
63    6557470319842
61    2504730781961
3     2
42    267914296
33    3524578
2     1
35    9227465
2     1
45    1134903170
11    I'm sorry Dave, I'm Afraid I can't do that.

I'm assuming this is one of XY Problem.

This answer is to suggest an alternative design that could solve problem X, not solving problem Y - aka restarting process in pool.


As far as I know Python doesn't have good control over graceful termination of the spawned subprocesses or threads that's already running.

So, the best approach would be - just not letting each process/thread fail completely and propagating error in the first place.

Such can be achieved by writing a small wrapper, where you wrap a function inside try-except block with Exception - which then will catch any Exception it encounters. Then we can retry using one while loop.

def wrapper(func, max_retries, data):
    """
    Wrapper adding retry capabilities to workload.

    Args:
        func: function to execute.
        max_retries: Maximum retries until moving on.
        data: data to process.

    Returns:
        (Input data, result) Tuple.
    """

    # wrap in while in case for retry.
    retry_count = 0
    while retry_count <= max_retries:  # while true: if you want infinite loop
        retry_count += 1

        # try to process data
        try:
            result = func(data)
        except Exception as err:
            # on exception save result as error and retry
            result = err
        else:
            # otherwise it was successful, break out of retry loop
            break

    # return result
    return data, result

Here's some dumb demo code testing this idea, with half the failure chance.

import logging
import functools
import random
from os import getpid
from multiprocessing import Pool


logging.basicConfig(format="%(levelname)-8s %(message)s", level=logging.DEBUG)
logger = logging.getLogger()


def wrapper(func, max_retries, data):
    """
    Wrapper adding retry capabilities to workload with some fancy output.

    Args:
        func: function to execute.
        max_retries: Maximum retries until moving on.
        data: data to process.

    Returns:
        (Input data, result) Tuple.
    """
    pid = f"{getpid():<6}"
    logger.info(f"[{pid}]  Processing {data}")

    # just a line to satisfy pylint
    result = None

    # wrap in while in case for retry.
    retry_count = 0

    while retry_count <= max_retries:
        # try to process data
        try:
            result = func(data)
        except Exception as err:
            # on exception print out error, set result as err, then retry
            logger.error(
                f"[{pid}]  {err.__class__.__name__} while processing {data}, "
                f"{max_retries - retry_count} retries left. "
            )
            result = err
        else:
            break

        retry_count += 1

    # print and return result
    logger.info(f"[{pid}]  Processing {data} done")
    return data, result


class RogueAIException(Exception):
    pass


def workload(n):
    """
    Quite rebellious Fibonacci function
    """

    if random.randint(0, 1):
        raise RogueAIException("I'm sorry Dave, I'm Afraid I can't do that.")

    a, b = 0, 1
    for _ in range(n - 1):
        a, b = b, a + b

    return b


def main():
    data = [random.randint(0, 100) for _ in range(20)]

    # fix parameters. Decorator can't be pickled, we'll have to live with this.
    wrapped_workload = functools.partial(wrapper, workload, 3)

    with Pool(processes=3) as pool:
        # apply function for each data
        results = pool.map(wrapped_workload, data)

        print("\nInput Output")
        for fed_data, result in results:
            print(f"{fed_data:<6}{result}")


if __name__ == '__main__':
    main()

Output:

INFO     [13904 ]  Processing 40
ERROR    [13904 ]  RogueAIException while processing 40, 3 retries left. 
INFO     [13904 ]  Processing 40 done
INFO     [13904 ]  Processing 93
ERROR    [13904 ]  RogueAIException while processing 93, 3 retries left. 
INFO     [13904 ]  Processing 93 done
INFO     [13904 ]  Processing 96
INFO     [13904 ]  Processing 96 done
INFO     [13904 ]  Processing 48
INFO     [13904 ]  Processing 48 done
INFO     [13904 ]  Processing 17
INFO     [13904 ]  Processing 17 done
INFO     [13904 ]  Processing 52
ERROR    [13904 ]  RogueAIException while processing 52, 3 retries left. 
INFO     [13904 ]  Processing 52 done
INFO     [13904 ]  Processing 96
ERROR    [13904 ]  RogueAIException while processing 96, 3 retries left. 
INFO     [13904 ]  Processing 96 done
INFO     [13904 ]  Processing 23
ERROR    [13904 ]  RogueAIException while processing 23, 3 retries left. 
INFO     [13904 ]  Processing 23 done
INFO     [13904 ]  Processing 99
ERROR    [13904 ]  RogueAIException while processing 99, 3 retries left. 
ERROR    [13904 ]  RogueAIException while processing 99, 2 retries left.
INFO     [13904 ]  Processing 99 done
INFO     [13904 ]  Processing 55
ERROR    [13904 ]  RogueAIException while processing 55, 3 retries left.
ERROR    [13904 ]  RogueAIException while processing 55, 2 retries left.
INFO     [13904 ]  Processing 55 done
INFO     [13904 ]  Processing 63
ERROR    [13904 ]  RogueAIException while processing 63, 3 retries left.
INFO     [13904 ]  Processing 63 done
INFO     [13904 ]  Processing 61
INFO     [25180 ]  Processing 3
ERROR    [13904 ]  RogueAIException while processing 61, 3 retries left.
INFO     [25180 ]  Processing 3 done
INFO     [13904 ]  Processing 61 done
INFO     [25180 ]  Processing 42
INFO     [13904 ]  Processing 33
ERROR    [25180 ]  RogueAIException while processing 42, 3 retries left.
ERROR    [13904 ]  RogueAIException while processing 33, 3 retries left.
ERROR    [25180 ]  RogueAIException while processing 42, 2 retries left.
INFO     [13904 ]  Processing 33 done
ERROR    [25180 ]  RogueAIException while processing 42, 1 retries left.
INFO     [13904 ]  Processing 2
INFO     [25180 ]  Processing 42 done
INFO     [13904 ]  Processing 2 done
INFO     [25180 ]  Processing 35
INFO     [13904 ]  Processing 45
INFO     [25180 ]  Processing 35 done
INFO     [13904 ]  Processing 45 done
INFO     [25180 ]  Processing 2
INFO     [13904 ]  Processing 11
ERROR    [25180 ]  RogueAIException while processing 2, 3 retries left.
ERROR    [13904 ]  RogueAIException while processing 11, 3 retries left.
INFO     [25180 ]  Processing 2 done
ERROR    [13904 ]  RogueAIException while processing 11, 2 retries left.
ERROR    [13904 ]  RogueAIException while processing 11, 1 retries left.
ERROR    [13904 ]  RogueAIException while processing 11, 0 retries left.
INFO     [13904 ]  Processing 11 done

Input Output
40    102334155
93    12200160415121876738
96    51680708854858323072
48    4807526976
17    1597
52    32951280099
96    51680708854858323072
23    28657
99    218922995834555169026
55    139583862445
63    6557470319842
61    2504730781961
3     2
42    267914296
33    3524578
2     1
35    9227465
2     1
45    1134903170
11    I'm sorry Dave, I'm Afraid I can't do that.

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文