How to restart a process in the pool when it hits an exception in Python
import signal
import asyncio
import os
import random
import time
import multiprocessing

# build a list of 10 random numbers
my_list = []
for i in range(0, 10):
    n = random.randint(1, 100)
    my_list.append(n)

async def loop_item(my_item):
    while True:
        a = random.randint(1, 2)
        if a == 2:
            # mimic an arbitrary failure roughly half the time
            print(f"process id: {os.getpid()}")
            raise Exception('Error')
        print(f"process id: {os.getpid()} - {my_item}")
        time.sleep(0.5)

def run_loop(my_item):
    asyncio.run(loop_item(my_item))

def throw_error(e):
    # relaunch the whole script, then kill this process group
    os.system('bash /root/my-script.sh')  # that launches "python my-script.py"
    os.killpg(os.getpgid(os.getpid()), signal.SIGKILL)

if __name__ == '__main__':
    pool = multiprocessing.Pool(processes=10)
    for my_item in my_list:
        pool.apply_async(run_loop, (my_item,), error_callback=throw_error)
    pool.close()
    pool.join()
This is my demo code. It builds a my_list of 10 random numbers, then launches 10 pool processes that each print their item along with their pid. I added a raise Exception to mimic any kind of exception that may occur, because I want to restart the loop_item(my_item) function in a new process whenever an exception happens.

There are two obstacles. One is passing the variable my_item to the restarted process; I think I could make that work with an external tool like Redis to put/get the value, but any better idea is appreciated.
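For that first obstacle, an external store may be unnecessary: the failing item can be bound directly to the error callback. Here is a minimal sketch of that idea (my own illustration, not from the original post; on_error is a hypothetical name), assuming the my_list and run_loop from the demo above:

import functools
import multiprocessing

def on_error(my_item, e):
    # The callback now knows which item failed, without Redis.
    print(f"item {my_item} failed with: {e}")
    # Note: resubmitting to the original pool here would raise
    # ValueError, because the pool is already closed at this point,
    # so a separate restart mechanism is still needed.

if __name__ == '__main__':
    pool = multiprocessing.Pool(processes=10)
    for my_item in my_list:
        # functools.partial pre-binds my_item; the pool still passes
        # the exception as the final positional argument.
        pool.apply_async(run_loop, (my_item,),
                         error_callback=functools.partial(on_error, my_item))
    pool.close()
    pool.join()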
What really stops me is how to effectively launch the process again after it hits an exception and exits. So far I have been able to use the throw_error callback either to kill the Python script itself or to launch another shell script that kills and relaunches it, but this approach seems inefficient. So I am wondering: is there a better way to restart just the one process that raised, instead of restarting the whole script?
One way I tried was creating a new process pool inside the throw_error function, like this:
def throw_error(e):
    pool2 = multiprocessing.Pool(processes=1)
    pool2.apply_async(run_loop, (my_item,), error_callback=throw_error)
    pool2.close()
    pool2.join()
but it seems to be a bad idea: after multiple exceptions the process pools get out of control, accumulating hundreds, if not thousands, of "zombie" processes.
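One pattern that avoids both the nested pools and the full-script restart is to skip Pool entirely and supervise plain multiprocessing.Process workers, respawning only the one that died. A hedged sketch of that idea (my own, not the poster's code; supervise is a hypothetical helper), reusing the demo's run_loop and my_list:

import time
import multiprocessing

def supervise(items, worker):
    # One plain Process per item, restarted individually on death.
    def spawn(item):
        p = multiprocessing.Process(target=worker, args=(item,))
        p.start()
        return p

    procs = [(item, spawn(item)) for item in items]
    while True:
        for i, (item, p) in enumerate(procs):
            if not p.is_alive():
                p.join()  # reap the dead child so it cannot linger as a zombie
                procs[i] = (item, spawn(item))  # restart only this worker
        time.sleep(0.5)

# usage with the demo's pieces:
# if __name__ == '__main__':
#     supervise(my_list, run_loop)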
I'm assuming this is an XY problem.

This answer suggests an alternative design that could solve problem X, rather than solving problem Y (that is, restarting a process in the pool).

As far as I know, Python doesn't give you good control over gracefully terminating spawned subprocesses or threads that are already running. So the best approach is to just not let each process/thread fail completely or propagate the error in the first place.

This can be achieved by writing a small wrapper: wrap the function inside a try-except block catching Exception, which will then catch any exception it encounters, and retry inside a while loop.

Here's some dumb demo code testing this idea, with half the failure chance.
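A minimal sketch of such a wrapper (the original demo did not survive, so this is an illustration under the description above; the names flaky and retry_wrapper are my own):

import os
import random
import time
import multiprocessing

def flaky(my_item):
    # Fails with half probability, mimicking the question's demo.
    if random.randint(1, 2) == 2:
        raise Exception('Error')
    print(f"process id: {os.getpid()} - {my_item}")

def retry_wrapper(my_item):
    # Catch every Exception inside the worker itself and retry,
    # so the pool process never dies and never needs restarting.
    while True:
        try:
            flaky(my_item)
        except Exception as e:
            print(f"process id: {os.getpid()} - retrying after: {e}")
        time.sleep(0.5)

if __name__ == '__main__':
    pool = multiprocessing.Pool(processes=4)
    for my_item in [random.randint(1, 100) for _ in range(4)]:
        pool.apply_async(retry_wrapper, (my_item,))
    pool.close()
    pool.join()  # blocks forever, because the workers loop by design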
Output:
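With the sketch above, a run prints interleaved lines of this shape (the PIDs and items here are illustrative and vary between runs):

process id: 43101 - 57
process id: 43102 - retrying after: Error
process id: 43103 - 12
process id: 43101 - retrying after: Error
process id: 43103 - 12
...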