Python subprocess: callback when cmd exits

Posted 2024-08-27 09:36:04

I'm currently launching a programme using subprocess.Popen(cmd, shell=True).

I'm fairly new to Python, but it 'feels' like there ought to be some API that lets me do something similar to:

subprocess.Popen(cmd, shell=True, postexec_fn=function_to_call_on_exit)

I am doing this so that function_to_call_on_exit can do something based on knowing that cmd has exited (for example, keeping count of the number of external processes currently running).

I assume that I could fairly trivially wrap subprocess in a class that combined threading with the Popen.wait() method, but as I've not done threading in Python yet and it seems like this might be common enough for an API to exist, I thought I'd try and find one first.

Thanks in advance :)

Comments (10)

握住你手 2024-09-03 09:36:05

You're right - there is no nice API for this. You're also right on your second point - it's trivially easy to design a function that does this for you using threading.

import threading
import subprocess

def popen_and_call(on_exit, popen_args):
    """
    Runs the given args in a subprocess.Popen, and then calls the function
    on_exit when the subprocess completes.
    on_exit is a callable object, and popen_args is a list/tuple of args that
    you would pass to subprocess.Popen.
    """
    def run_in_thread(on_exit, popen_args):
        proc = subprocess.Popen(*popen_args)
        proc.wait()
        on_exit()
        return
    thread = threading.Thread(target=run_in_thread, args=(on_exit, popen_args))
    thread.start()
    # returns immediately after the thread starts
    return thread

Even threading is pretty easy in Python, but note that if on_exit() is computationally expensive, you'll want to run it in a separate process using multiprocessing instead (so that the GIL doesn't slow your program down). It's actually very simple: you can basically just replace all calls to threading.Thread with multiprocessing.Process, since they follow (almost) the same API. Keep in mind that on_exit() then runs in that other process, so it cannot directly modify state in the parent.
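
As a rough sketch of the use case from the question (keeping count of running external processes), the function above could be combined with a lock-protected counter. The names running, running_lock and launch are made up for this illustration, and popen_and_call is repeated in compact form only so the snippet runs on its own.

```python
import subprocess
import sys
import threading

running = 0                      # number of child processes still alive
running_lock = threading.Lock()  # on_exit runs in worker threads


def popen_and_call(on_exit, popen_args):
    """Compact copy of the function above, so this sketch is self-contained."""
    def run_in_thread():
        proc = subprocess.Popen(*popen_args)
        proc.wait()
        on_exit()
    thread = threading.Thread(target=run_in_thread)
    thread.start()
    return thread


def launch(cmd):
    """Hypothetical helper: start cmd and keep the counter up to date."""
    global running

    def on_exit():
        global running
        with running_lock:
            running -= 1

    with running_lock:
        running += 1
    return popen_and_call(on_exit, (cmd,))


if __name__ == "__main__":
    # The child is a short Python sleep so the example does not depend on a shell.
    child = [sys.executable, "-c", "import time; time.sleep(0.5)"]
    threads = [launch(child) for _ in range(3)]
    print("running now:", running)
    for t in threads:
        t.join()
    print("running after exit:", running)
```

The lock matters because each on_exit call happens in a different worker thread, so unprotected updates to the counter could interleave.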

回心转意 2024-09-03 09:36:05

The concurrent.futures module is available from Python 3.2 (for older versions, install the backport with pip install futures):

pool = Pool(max_workers=1)
f = pool.submit(subprocess.call, "sleep 2; echo done", shell=True)
f.add_done_callback(callback)

The callback will be called in the same process that called f.add_done_callback().

Full program

import logging
import subprocess
# to install run `pip install futures` on Python <3.2
from concurrent.futures import ThreadPoolExecutor as Pool

info = logging.getLogger(__name__).info

def callback(future):
    if future.exception() is not None:
        info("got exception: %s" % future.exception())
    else:
        info("process returned %d" % future.result())

def main():
    logging.basicConfig(
        level=logging.INFO,
        format=("%(relativeCreated)04d %(process)05d %(threadName)-10s "
                "%(levelname)-5s %(msg)s"))

    # wait for the process completion asynchronously
    info("begin waiting")
    pool = Pool(max_workers=1)
    f = pool.submit(subprocess.call, "sleep 2; echo done", shell=True)
    f.add_done_callback(callback)
    pool.shutdown(wait=False) # no .submit() calls after that point
    info("continue waiting asynchronously")

if __name__=="__main__":
    main()

Output

$ python . && python3 .
0013 05382 MainThread INFO  begin waiting
0021 05382 MainThread INFO  continue waiting asynchronously
done
2025 05382 Thread-1   INFO  process returned 0
0007 05402 MainThread INFO  begin waiting
0014 05402 MainThread INFO  continue waiting asynchronously
done
2018 05402 Thread-1   INFO  process returned 0

無心 2024-09-03 09:36:05

I modified Daniel G's answer to pass the subprocess.Popen args and kwargs straight through instead of as a separate tuple/list, since I wanted to use keyword arguments with subprocess.Popen.

In my case I had a method postExec() that I wanted to run after subprocess.Popen('exe', cwd=WORKING_DIR).

With the code below, it simply becomes popenAndCall(postExec, 'exe', cwd=WORKING_DIR)

import threading
import subprocess

def popenAndCall(onExit, *popenArgs, **popenKWArgs):
    """
    Runs a subprocess.Popen, and then calls the function onExit when the
    subprocess completes.

    Use it exactly the way you'd normally use subprocess.Popen, except include a
    callable to execute as the first argument. onExit is a callable object, and
    *popenArgs and **popenKWArgs are simply passed through to subprocess.Popen.
    """
    def runInThread(onExit, popenArgs, popenKWArgs):
        proc = subprocess.Popen(*popenArgs, **popenKWArgs)
        proc.wait()
        onExit()
        return

    thread = threading.Thread(target=runInThread,
                              args=(onExit, popenArgs, popenKWArgs))
    thread.start()

    return thread # returns immediately after the thread starts

旧时模样 2024-09-03 09:36:05

I had the same problem and solved it using multiprocessing.Pool. There are two hacky tricks involved:

  1. make the pool size 1
  2. pass the iterable argument inside an iterable of length 1

The result is a single function call that runs a callback on completion:

import multiprocessing

def sub(arg):
    print(arg)            # prints [1, 2, 3, 4, 5]
    return "hello"

def cb(results):
    # map_async hands the callback a list with one result per input item
    print(results)        # prints ['hello']

if __name__ == "__main__":
    pool = multiprocessing.Pool(1)
    rval = pool.map_async(sub, [[1, 2, 3, 4, 5]], callback=cb)
    # (do other stuff here; map_async does not block)
    pool.close()
    pool.join()           # wait for the worker before exiting

In my case, I wanted the invocation to be non-blocking as well. Works beautifully.

沐歌 2024-09-03 09:36:05

On POSIX systems, the parent process receives a SIGCHLD signal when a child process exits. To run a callback when a subprocess command exits, handle the SIGCHLD signal in the parent. Something like this:

import signal
import subprocess

def sigchld_handler(signum, frame):
    # This is run when the child exits.
    # Do something here ...
    pass

signal.signal(signal.SIGCHLD, sigchld_handler)

process = subprocess.Popen('mycmd', shell=True)

Note that this will not work on Windows.
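
A slightly fuller sketch of the same idea, under some assumptions: the handler is not told which child exited, so this version keeps a hypothetical registry (children) of Popen objects and polls each one without blocking. The names launch, reap_finished and install_handler are illustrative only and are not part of subprocess or signal.

```python
import signal
import subprocess
import sys
import time

children = {}   # pid -> (Popen, callback); hypothetical registry for this sketch
finished = []   # return codes collected by the callbacks


def reap_finished(signum=None, frame=None):
    """Run the callback of every tracked child that has exited."""
    for pid, (proc, callback) in list(children.items()):
        # poll() never blocks; pop() guards against a nested handler call
        # that already dealt with the same child.
        if proc.poll() is not None and children.pop(pid, None) is not None:
            callback(proc.returncode)


def launch(cmd, callback):
    """Start cmd and remember which callback belongs to it."""
    proc = subprocess.Popen(cmd)
    children[proc.pid] = (proc, callback)
    reap_finished()   # covers a child that exited before it was registered
    return proc


def install_handler():
    """Register the handler; POSIX only, and only from the main thread."""
    signal.signal(signal.SIGCHLD, reap_finished)


if __name__ == "__main__":
    install_handler()
    launch([sys.executable, "-c", "pass"], finished.append)
    deadline = time.time() + 10
    while children and time.time() < deadline:
        time.sleep(0.05)      # the handler runs while the main thread waits
    print("return codes:", finished)
```

Python runs signal handlers in the main thread, so the callbacks here execute there as well; a long-running callback would delay whatever the main thread was doing.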

稳稳的幸福 2024-09-03 09:36:05

I was inspired by Daniel G.'s answer and implemented a very simple use case: in my work I often need to make repeated calls to the same (external) process with different arguments. I had hacked together a way to determine when each specific call was done, but now I have a much cleaner way to issue callbacks.

I like this implementation because it is very simple, yet it allows me to issue asynchronous calls to multiple processors (notice I use multiprocessing instead of threading) and receive notification upon completion.

I tested the sample program and it works great. Please edit at will and provide feedback.

import multiprocessing
import subprocess

class Process(object):
    """This class spawns a subprocess asynchronously and calls a
    `callback` upon completion; it is not meant to be instantiated
    directly (derived classes are called instead)"""
    def __call__(self, *args):
        # store the arguments for later retrieval
        self.args = args
        # define the target function to be called by
        # `multiprocessing.Process` (a local function cannot be pickled,
        # so this relies on the fork start method)
        def target():
            cmd = [self.command] + [str(arg) for arg in self.args]
            process = subprocess.Popen(cmd)
            # the `multiprocessing.Process` process will wait until
            # the call to the `subprocess.Popen` object is completed
            process.wait()
            # upon completion, call `callback`
            return self.callback()
        mp_process = multiprocessing.Process(target=target)
        # this call issues the call to `target`, but returns immediately
        mp_process.start()
        return mp_process

if __name__ == "__main__":

    def squeal(who):
        """this serves as the callback function; its argument is the
        instance of a subclass of Process making the call"""
        print("finished %s calling %s with arguments %s" % (
            who.__class__.__name__, who.command, who.args))

    class Sleeper(Process):
        """Sample implementation of an asynchronous process - define
        the command name (available in the system path) and a callback
        function (previously defined)"""
        command = "./sleeper"
        callback = squeal

    # create an instance of Sleeper - this is the Process object that
    # can be called repeatedly in an asynchronous manner
    sleeper_run = Sleeper()

    # spawn three sleeper runs with different arguments
    sleeper_run(5)
    sleeper_run(2)
    sleeper_run(1)

    # the user should see the following message immediately (even
    # though the Sleeper calls are not done yet)
    print("program continued")

Sample output:

program continued
finished Sleeper calling ./sleeper with arguments (1,)
finished Sleeper calling ./sleeper with arguments (2,)
finished Sleeper calling ./sleeper with arguments (5,)

Below is the source code of sleeper.c - my sample "time consuming" external process

#include<stdlib.h>
#include<unistd.h>

int main(int argc, char *argv[]){
  unsigned int t = atoi(argv[1]);
  sleep(t);
  return EXIT_SUCCESS;
}

Compile as:

gcc -o sleeper sleeper.c

怪我太投入 2024-09-03 09:36:05

Since Python 3.2, concurrent.futures also provides ProcessPoolExecutor (https://docs.python.org/3/library/concurrent.futures.html). It is used the same way as the ThreadPoolExecutor mentioned above: the exit callback is attached with add_done_callback() on the future returned by executor.submit().
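
A minimal sketch of that usage, assuming a throwaway Python child as the command. The helper run_with_callback, the callback on_done and the results list are names invented for this example; either executor class can be passed in.

```python
import subprocess
import sys
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor

results = []  # return codes recorded by the callback


def on_done(future):
    """Runs in the submitting process once the worker has finished."""
    results.append(future.result())


def run_with_callback(executor, cmd, callback):
    """Submit subprocess.call(cmd) and attach callback to the returned future."""
    future = executor.submit(subprocess.call, cmd)
    future.add_done_callback(callback)
    return future


if __name__ == "__main__":
    cmd = [sys.executable, "-c", "pass"]
    # ThreadPoolExecutor(max_workers=1) could be used here in the same way.
    with ProcessPoolExecutor(max_workers=1) as executor:
        run_with_callback(executor, cmd, on_done)
    # Leaving the with-block waits for the worker, so the callback has run.
    print("return codes:", results)
```

For simply waiting on an external command, the thread pool is usually the lighter choice, since the worker spends its time blocked in subprocess.call rather than computing.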

十年不长 2024-09-03 09:36:05

Thanks, guys, for pointing me in the right direction. I made a class from what I found here and added a stop function to kill the process:

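from subprocess import Popen
from threading import Thread

class popenplus():
  def __init__(self, onExit, *popenArgs, **popenKWArgs):
    self.proc = None  # set before the thread starts so stop() is always safe
    thread = Thread(target=self.runInThread, args=(onExit, popenArgs, popenKWArgs))
    thread.start()

  def runInThread(self, onExit, popenArgs, popenKWArgs):
    self.proc = Popen(*popenArgs, **popenKWArgs)
    self.proc.wait()
    self.proc = None
    onExit()

  def stop(self):
    proc = self.proc  # local reference; the worker thread may reset self.proc
    if proc:
      proc.kill()

半城柳色半声笛 2024-09-03 09:36:05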

Most of the current answers to this question suggest spinning up one thread per process just to wait for that callback. That strikes me as needlessly wasteful: A single thread should suffice for all callbacks from all processes created this way.

Another answer suggests using signals, but that creates a race condition where the signal handler might get called again before the previous call finished. On Linux, signalfd(2) could help with that but it's not supported by Python (although it's easy enough to add via ctypes).

The alternative used by asyncio in Python is to use signal.set_wakeup_fd. However, there is another solution based on the fact that the OS will close all open fds on process exit:

import os
import select
import subprocess
import threading
import weakref


def _close_and_join(fd, thread):
    os.close(fd)
    thread.join()


def _run_poll_callbacks(quitfd, poll, callbacks):
    poll.register(quitfd, select.POLLHUP)
    while True:
        for fd, event in poll.poll(1000.0):
            poll.unregister(fd)
            if fd == quitfd:
                return
            callback = callbacks.pop(fd)
            if callback is not None:
                callback()


class PollProcs:
    def __init__(self):
        self.poll = select.poll()
        self.callbacks = {}
        self.closed = False

        r, w = os.pipe()
        self.thread = threading.Thread(
            target=_run_poll_callbacks, args=(r, self.poll, self.callbacks)
        )
        self.thread.start()
        self.finalizer = weakref.finalize(self, _close_and_join, w, self.thread)

    def run(self, cmd, callback=None):
        if self.closed:
            return

        r, w = os.pipe()
        self.callbacks[r] = callback
        self.poll.register(r, select.POLLHUP)
        popen = subprocess.Popen(cmd, pass_fds=(w,))
        os.close(w)
        print("running", " ".join(cmd), "as", popen.pid)
        return popen


def main():
    procs = PollProcs()

    for i in range(3, 0, -1):
        procs.run(["sleep", str(i)], callback=lambda i=i: print(f"sleep {i} done?"))

    import time

    print("Waiting...")
    time.sleep(3)


if __name__ == "__main__":
    main()

If supporting macOS isn't a requirement, select.epoll is likely a better choice, as it allows updating ongoing polling.
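
For comparison, the asyncio route mentioned above can be sketched as follows. This is not the pipe-based approach from this answer, just an illustration of the standard asyncio subprocess API, with run_and_call and main as made-up names and a trivial Python child standing in for the real command.

```python
import asyncio
import sys


async def run_and_call(cmd, on_exit):
    """Start cmd, wait without blocking the loop, then call on_exit(returncode)."""
    proc = await asyncio.create_subprocess_exec(*cmd)
    returncode = await proc.wait()
    on_exit(returncode)
    return returncode


async def main():
    codes = []
    cmd = [sys.executable, "-c", "pass"]
    # All three children are awaited from the same event loop.
    await asyncio.gather(*(run_and_call(cmd, codes.append) for _ in range(3)))
    return codes


if __name__ == "__main__":
    print(asyncio.run(main()))
```

Here the callbacks run inside the event loop, so they should stay short or hand heavier work off to an executor.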

双手揣兜 2024-09-03 09:36:05

AFAIK there is no such API, at least not in the subprocess module. You need to roll your own, possibly using threads.
