如何等待可取消的子进程而不在异步期间进行python /终止进行轮询。

发布于 2025-02-09 07:23:55 字数 5518 浏览 2 评论 0 原文

假设我想从Python中的异步函数作为子过程运行同步函数。

def subProcess():
    print(f"[{os.getpid()}] subProcess running", file=sys.stderr)
    try:
        time.sleep(5)
    finally:
        print(f"[{os.getpid()}] subProcess closing", file=sys.stderr)

我想可以:

  • 等待子过程的终止,并知道出口码结果。最终,我想分辨出优雅的出口(包括sigterm,sigintr,exitCode = 0)与其他情况之间的区别,并使用它来知道是否不重新启动失败的子过程。
  • 如果启动的Coroutine取消,则终止子过程。在返回之前,等待过程结果会很不错 - 让我们说子进程在端口上服务...我应该等待旧过程在启动新过程之前退出,否则端口将不可用。

我不能使用 asyncio.get_event_loop()。run_in_executor(executor = processPool(),func = subprocess) 因为这并没有给我终止子过程的途径。我不能直接使用多处理(我是从异步代码调用)。

以下是我拥有的原型。

  • coroutine创建多处填充。程序对象(非assync thics)
  • 一个工作线程是 and .join 。 > process 等待出口代码响应 - 完成后,将出口代码(或异常)写入将来,
  • 原始的coroutine等待将来
  • 可以取消Coroutine。就我而言,Coroutine终止了该过程(通过发送Sigint)。由于未来(正在等待的未来)被取消,因此该线程还将其结果写入了第二个未来,Coro可以等待。

最后一部分是我的问题:

  • 该过程已终止。线程写入
  • Coro将来正在等待的两个期货,但是等待结果2 永远不会返回或投掷。为什么?

有什么想法被困在:

   exitcode = await result2

?我的调试打印 runsubProcess2:thread set2 ,确认result2.set_result被调用。

import asyncio
import os
import signal
import sys
import threading
import time
from multiprocessing import Process


def subProcess(*args):
    print(f"[{os.getpid()}] subProcess{args} running", file=sys.stderr)
    try:
        time.sleep(5)
    finally:
        print(f"[{os.getpid()}] subProcess{args} done", file=sys.stderr)



MISSING = object()

async def runSubprocess2(target=subProcess, args=(), name=None):

    def task():
        # Sync code to start, join the Process and write the 
        # exitcode/exception to the outer cooroutine's futures.
        # This all works as expected. Both result2 is assigned to
        # (result1 was cancelled already)
        p.start()
        r = None
        try:
            p.join()
            r = p.exitcode
            if 1:
                print(f"runSubprocess2:thread set2")
                result2.set_result(r)
                if not result1.cancelled():
                    print(f"runSubprocess2:thread set1")
                    result1.set_result(r)
        except BaseException as e:
            print(f"runSubprocess2:thread exception {type(e)}")
            if not result2.cancelled():
                result2.set_exception(e)
                print("runSubprocess2:thread result2 set ex")
            if not result1.cancelled():
                result1.set_exception(e)
                print("runSubprocess2:thread result1 set ex")
            else:
                print("runSubprocess2:thread  terminated")
            print(f"runSubprocess2:thread exception OK")

        finally:
           print(f"runSubprocess2:thread terminated (exitcode={r})")

    result1 = asyncio.Future()
    result2 = asyncio.Future()  # Used if result1 is cancelled
    p = Process(target=target,
                args=args,
                daemon=True,
                name=name)
    threading.Thread(
        target=task,
        daemon=True,
        name=name
    ).start()

    # task = asyncio.get_event_loop().run_in_executor(ProcessPoolExecutor(), subProcess)
    exitcode = MISSING
    try:
        exitcode = await result1
    except asyncio.CancelledError as e:
        print(f"runSubprocess2:coro cancelled {type(e)}")
    except BaseException as e:
        print(f"runSubprocess2:coro exception {type(e)}")
        raise
    finally:
        # If the process was not killed, send a KeyboardInterrupt
        wait = not result1.done()
        if result1.cancelled():
            # This path is visited in this example:
            wait = True
            print(f"runSubprocess2:coro send {p.pid} terminate ...")
            # p.terminate()
            os.kill(p.pid, signal.SIGINT)
            print("runSubprocess2:coro sent SIGINT ... OK")

        if wait:
            print(f"runSubprocess2:coro awaiting shutdown ... {result2}")
            try:
                print(f"runSubprocess2:coro awaiting 2")
                exitcode = await result2   #  <<< STUCK HERE. What the hey!
                print(f"runSubprocess2:coro awaiting shutdown ... OK exitcode={exitcode}")
            except BaseException as e:
                print(f"runSubprocess2:coro awaiting shutdown ... {type(e)}")   # ???
                raise
            finally:
                print(f"runSubprocess2:coro awaiting shutdown ... DONE {result2}")

        else:
            print("runSubprocess2:coro terminated")

    assert exitcode is not MISSING
    # Handle exit code cases here.





async def main():
    print(f"[{os.getpid()}] main", file=sys.stderr)


    if 1:

        nTasks = 1
        tasks = [asyncio.create_task(runSubprocess2(target=subProcess, args=(i,)))  #asyncio.get_event_loop().run_in_executor(executor=None, func=runSubprocess2)
                 for i in range(nTasks)]

        await asyncio.sleep(1)
        print("Cancelling...")
        for task in tasks:
            task.cancel()
        print("Cancelling... OK")
        try:
            await asyncio.wait(tasks)
        except asyncio.CancelledError:
            pass
        await asyncio.wait(tasks)

    print("Done", file=sys.stderr)


if __name__ =='__main__':
    def _main():
        try:
            asyncio.run(main())
        finally:
            print(f"Main exit.")

在py3.8,3.11上测试

Suppose I want to run a synchronous function as a subprocess from an async function in Python.

def subProcess():
    print(f"[{os.getpid()}] subProcess running", file=sys.stderr)
    try:
        time.sleep(5)
    finally:
        print(f"[{os.getpid()}] subProcess closing", file=sys.stderr)

I'd like to be able to:

  • await the termination of the subprocess and know the exitcode result. Ultimately I want to tell the difference between a graceful exit (including SIGTERM, SIGINTR, exitcode=0) vs. other cases, and use this to know whether not to restart a failed subprocess.
  • terminate the subprocess if the coroutine that started it is cancelled. It would be nice to await the process result before returning - lets says that the subprocess serves on a port... I should wait for the old process to quit before starting a new one, otherwise the port will be unavailable.

I can't use asyncio.get_event_loop().run_in_executor(executor=ProcessPool(), func=subProcess)
because this doesn't give me a path to terminate the subprocess. I can't use multiprocessing.Process directly (I'm calling from async code).

Below is a prototype I have.

  • An coroutine creates a multiprocessing.Process object (a non-async thing)
  • A worker thread is created to .start and .join the Process to wait for exitcode response - when complete, it writes the exitcode (or exception) into a future
  • the original coroutine waits on the future
  • the coroutine may be cancelled. In my case the coroutine terminates the process (by sending a SIGINT). Because the future (which it was waiting on) was cancelled, the thread also writes its result to a second future, which the coro can wait on.

The last part is my problem:

  • the process is terminated. The thread write to both futures
  • the coro is waiting on the future, but await result2 never returns or throws. Why?

Any ideas why is gets stuck on:

   exitcode = await result2

? My debug prints runSubprocess2:thread set2, confirmingn that result2.set_result is called.

import asyncio
import os
import signal
import sys
import threading
import time
from multiprocessing import Process


def subProcess(*args):
    print(f"[{os.getpid()}] subProcess{args} running", file=sys.stderr)
    try:
        time.sleep(5)
    finally:
        print(f"[{os.getpid()}] subProcess{args} done", file=sys.stderr)



MISSING = object()

async def runSubprocess2(target=subProcess, args=(), name=None):

    def task():
        # Sync code to start, join the Process and write the 
        # exitcode/exception to the outer cooroutine's futures.
        # This all works as expected. Both result2 is assigned to
        # (result1 was cancelled already)
        p.start()
        r = None
        try:
            p.join()
            r = p.exitcode
            if 1:
                print(f"runSubprocess2:thread set2")
                result2.set_result(r)
                if not result1.cancelled():
                    print(f"runSubprocess2:thread set1")
                    result1.set_result(r)
        except BaseException as e:
            print(f"runSubprocess2:thread exception {type(e)}")
            if not result2.cancelled():
                result2.set_exception(e)
                print("runSubprocess2:thread result2 set ex")
            if not result1.cancelled():
                result1.set_exception(e)
                print("runSubprocess2:thread result1 set ex")
            else:
                print("runSubprocess2:thread  terminated")
            print(f"runSubprocess2:thread exception OK")

        finally:
           print(f"runSubprocess2:thread terminated (exitcode={r})")

    result1 = asyncio.Future()
    result2 = asyncio.Future()  # Used if result1 is cancelled
    p = Process(target=target,
                args=args,
                daemon=True,
                name=name)
    threading.Thread(
        target=task,
        daemon=True,
        name=name
    ).start()

    # task = asyncio.get_event_loop().run_in_executor(ProcessPoolExecutor(), subProcess)
    exitcode = MISSING
    try:
        exitcode = await result1
    except asyncio.CancelledError as e:
        print(f"runSubprocess2:coro cancelled {type(e)}")
    except BaseException as e:
        print(f"runSubprocess2:coro exception {type(e)}")
        raise
    finally:
        # If the process was not killed, send a KeyboardInterrupt
        wait = not result1.done()
        if result1.cancelled():
            # This path is visited in this example:
            wait = True
            print(f"runSubprocess2:coro send {p.pid} terminate ...")
            # p.terminate()
            os.kill(p.pid, signal.SIGINT)
            print("runSubprocess2:coro sent SIGINT ... OK")

        if wait:
            print(f"runSubprocess2:coro awaiting shutdown ... {result2}")
            try:
                print(f"runSubprocess2:coro awaiting 2")
                exitcode = await result2   #  <<< STUCK HERE. What the hey!
                print(f"runSubprocess2:coro awaiting shutdown ... OK exitcode={exitcode}")
            except BaseException as e:
                print(f"runSubprocess2:coro awaiting shutdown ... {type(e)}")   # ???
                raise
            finally:
                print(f"runSubprocess2:coro awaiting shutdown ... DONE {result2}")

        else:
            print("runSubprocess2:coro terminated")

    assert exitcode is not MISSING
    # Handle exit code cases here.





async def main():
    print(f"[{os.getpid()}] main", file=sys.stderr)


    if 1:

        nTasks = 1
        tasks = [asyncio.create_task(runSubprocess2(target=subProcess, args=(i,)))  #asyncio.get_event_loop().run_in_executor(executor=None, func=runSubprocess2)
                 for i in range(nTasks)]

        await asyncio.sleep(1)
        print("Cancelling...")
        for task in tasks:
            task.cancel()
        print("Cancelling... OK")
        try:
            await asyncio.wait(tasks)
        except asyncio.CancelledError:
            pass
        await asyncio.wait(tasks)

    print("Done", file=sys.stderr)


if __name__ =='__main__':
    def _main():
        try:
            asyncio.run(main())
        finally:
            print(f"Main exit.")

Tested on py3.8, 3.11

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。
列表为空,暂无数据
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文