如何等待可取消的子进程而不在异步期间进行python /终止进行轮询。
假设我想从Python中的异步函数作为子过程运行同步函数。
def subProcess():
print(f"[{os.getpid()}] subProcess running", file=sys.stderr)
try:
time.sleep(5)
finally:
print(f"[{os.getpid()}] subProcess closing", file=sys.stderr)
我想可以:
- 等待子过程的终止,并知道出口码结果。最终,我想分辨出优雅的出口(包括sigterm,sigintr,exitCode = 0)与其他情况之间的区别,并使用它来知道是否不重新启动失败的子过程。
- 如果启动的Coroutine取消,则终止子过程。在返回之前,等待过程结果会很不错 - 让我们说子进程在端口上服务...我应该等待旧过程在启动新过程之前退出,否则端口将不可用。
我不能使用 asyncio.get_event_loop()。run_in_executor(executor = processPool(),func = subprocess)
因为这并没有给我终止子过程的途径。我不能直接使用多处理
(我是从异步代码调用)。
以下是我拥有的原型。
- coroutine创建
多处填充。程序
对象(非assync thics) - 一个工作线程是
.join
。 > process
等待出口代码响应 - 完成后,将出口代码(或异常)写入将来, - 原始的coroutine等待将来
- 可以取消Coroutine。就我而言,Coroutine终止了该过程(通过发送Sigint)。由于未来(正在等待的未来)被取消,因此该线程还将其结果写入了第二个未来,Coro可以等待。
最后一部分是我的问题:
- 该过程已终止。线程写入
- Coro将来正在等待的两个期货,但是
等待结果2
永远不会返回或投掷。为什么?
有什么想法被困在:
exitcode = await result2
?我的调试打印 runsubProcess2:thread set2
,确认result2.set_result被调用。
import asyncio
import os
import signal
import sys
import threading
import time
from multiprocessing import Process
def subProcess(*args):
print(f"[{os.getpid()}] subProcess{args} running", file=sys.stderr)
try:
time.sleep(5)
finally:
print(f"[{os.getpid()}] subProcess{args} done", file=sys.stderr)
MISSING = object()
async def runSubprocess2(target=subProcess, args=(), name=None):
def task():
# Sync code to start, join the Process and write the
# exitcode/exception to the outer cooroutine's futures.
# This all works as expected. Both result2 is assigned to
# (result1 was cancelled already)
p.start()
r = None
try:
p.join()
r = p.exitcode
if 1:
print(f"runSubprocess2:thread set2")
result2.set_result(r)
if not result1.cancelled():
print(f"runSubprocess2:thread set1")
result1.set_result(r)
except BaseException as e:
print(f"runSubprocess2:thread exception {type(e)}")
if not result2.cancelled():
result2.set_exception(e)
print("runSubprocess2:thread result2 set ex")
if not result1.cancelled():
result1.set_exception(e)
print("runSubprocess2:thread result1 set ex")
else:
print("runSubprocess2:thread terminated")
print(f"runSubprocess2:thread exception OK")
finally:
print(f"runSubprocess2:thread terminated (exitcode={r})")
result1 = asyncio.Future()
result2 = asyncio.Future() # Used if result1 is cancelled
p = Process(target=target,
args=args,
daemon=True,
name=name)
threading.Thread(
target=task,
daemon=True,
name=name
).start()
# task = asyncio.get_event_loop().run_in_executor(ProcessPoolExecutor(), subProcess)
exitcode = MISSING
try:
exitcode = await result1
except asyncio.CancelledError as e:
print(f"runSubprocess2:coro cancelled {type(e)}")
except BaseException as e:
print(f"runSubprocess2:coro exception {type(e)}")
raise
finally:
# If the process was not killed, send a KeyboardInterrupt
wait = not result1.done()
if result1.cancelled():
# This path is visited in this example:
wait = True
print(f"runSubprocess2:coro send {p.pid} terminate ...")
# p.terminate()
os.kill(p.pid, signal.SIGINT)
print("runSubprocess2:coro sent SIGINT ... OK")
if wait:
print(f"runSubprocess2:coro awaiting shutdown ... {result2}")
try:
print(f"runSubprocess2:coro awaiting 2")
exitcode = await result2 # <<< STUCK HERE. What the hey!
print(f"runSubprocess2:coro awaiting shutdown ... OK exitcode={exitcode}")
except BaseException as e:
print(f"runSubprocess2:coro awaiting shutdown ... {type(e)}") # ???
raise
finally:
print(f"runSubprocess2:coro awaiting shutdown ... DONE {result2}")
else:
print("runSubprocess2:coro terminated")
assert exitcode is not MISSING
# Handle exit code cases here.
async def main():
print(f"[{os.getpid()}] main", file=sys.stderr)
if 1:
nTasks = 1
tasks = [asyncio.create_task(runSubprocess2(target=subProcess, args=(i,))) #asyncio.get_event_loop().run_in_executor(executor=None, func=runSubprocess2)
for i in range(nTasks)]
await asyncio.sleep(1)
print("Cancelling...")
for task in tasks:
task.cancel()
print("Cancelling... OK")
try:
await asyncio.wait(tasks)
except asyncio.CancelledError:
pass
await asyncio.wait(tasks)
print("Done", file=sys.stderr)
if __name__ =='__main__':
def _main():
try:
asyncio.run(main())
finally:
print(f"Main exit.")
在py3.8,3.11上测试
如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论