Using module 'subprocess' with timeout

Posted on 2024-07-28 00:06:04

Here's the Python code to run an arbitrary command returning its stdout data, or raise an exception on non-zero exit codes:

proc = subprocess.Popen(
    cmd,
    stderr=subprocess.STDOUT,  # Merge stdout and stderr
    stdout=subprocess.PIPE,
    shell=True)

communicate is used to wait for the process to exit:

stdoutdata, stderrdata = proc.communicate()

The subprocess module does not support timeout--ability to kill a process running for more than X number of seconds--therefore, communicate may take forever to run.

What is the simplest way to implement timeouts in a Python program meant to run on Windows and Linux?

Comments (30)

可遇━不可求 2024-08-04 00:06:05

Here is Alex Martelli's solution as a module with proper process killing. The other approaches do not work because they do not use proc.communicate(). So if you have a process that produces lots of output, it will fill its output buffer and then block until you read something from it.

from os import kill
from signal import alarm, signal, SIGALRM, SIGKILL
from subprocess import PIPE, Popen

def run(args, cwd = None, shell = False, kill_tree = True, timeout = -1, env = None):
    '''
    Run a command with a timeout after which it will be forcibly
    killed.
    '''
    class Alarm(Exception):
        pass
    def alarm_handler(signum, frame):
        raise Alarm
    p = Popen(args, shell = shell, cwd = cwd, stdout = PIPE, stderr = PIPE, env = env)
    if timeout != -1:
        signal(SIGALRM, alarm_handler)
        alarm(timeout)
    try:
        stdout, stderr = p.communicate()
        if timeout != -1:
            alarm(0)
    except Alarm:
        pids = [p.pid]
        if kill_tree:
            pids.extend(get_process_children(p.pid))
        for pid in pids:
            # process might have died before getting to this line
            # so wrap to avoid OSError: no such process
            try: 
                kill(pid, SIGKILL)
            except OSError:
                pass
        return -9, '', ''
    return p.returncode, stdout, stderr

def get_process_children(pid):
    p = Popen('ps --no-headers -o pid --ppid %d' % pid, shell = True,
              stdout = PIPE, stderr = PIPE)
    stdout, stderr = p.communicate()
    return [int(p) for p in stdout.split()]

if __name__ == '__main__':
    print(run('find /', shell = True, timeout = 3))
    print(run('find', shell = True))
稀香 2024-08-04 00:06:05

Since Python 3.5, there is a general-purpose subprocess.run function (meant to replace check_call, check_output, ...), which also accepts a timeout= parameter.

subprocess.run(args, *, stdin=None, input=None, stdout=None, stderr=None, shell=False, cwd=None, timeout=None, check=False, encoding=None, errors=None)

Run the command described by args. Wait for command to complete, then return a CompletedProcess instance.

It raises a subprocess.TimeoutExpired exception when the timeout expires.
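
As a rough illustration, the question's snippet might look like this with subprocess.run on Python 3.5+. This is only a sketch: the helper name run_with_timeout and its (returncode, output) return shape are assumptions made for the example, and the child commands are started through sys.executable purely to keep the demo portable.

```python
import subprocess
import sys


def run_with_timeout(args, timeout_sec):
    """Return (returncode, output); returncode is None if the timeout expired.

    Hypothetical helper for illustration, not part of the subprocess module.
    """
    try:
        completed = subprocess.run(
            args,
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,  # merge stderr into stdout, as in the question
            timeout=timeout_sec,
        )
    except subprocess.TimeoutExpired as exc:
        # run() kills the child and waits for it before re-raising.
        # exc.stdout holds whatever was captured so far and may be None.
        return None, exc.stdout or b""
    return completed.returncode, completed.stdout


if __name__ == "__main__":
    print(run_with_timeout([sys.executable, "-c", "print('done')"], 5))
    print(run_with_timeout([sys.executable, "-c", "import time; time.sleep(10)"], 0.5))
```

Returning None for the return code is just one way to signal a timeout; re-raising TimeoutExpired would work equally well if the caller prefers exceptions.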

雪落纷纷 2024-08-04 00:06:05

timeout is now supported by call() and communicate() in the subprocess module (as of Python 3.3):

import subprocess

subprocess.call("command", timeout=20, shell=True)

This will call the command and raise the exception subprocess.TimeoutExpired if the command doesn't finish within 20 seconds.

You can then handle the exception to continue your code, something like:

try:
    subprocess.call("command", timeout=20, shell=True)
except subprocess.TimeoutExpired:
    # insert code here
    pass  # placeholder so the except block is valid Python

Hope this helps.
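
For the communicate() variant, note that Popen.communicate(timeout=...) does not kill the child when the timeout expires, so the caller has to do that and then finish reading. A minimal sketch of that pattern follows; the function name communicate_with_timeout and the returned tuple are assumptions made for this example.

```python
import subprocess
import sys


def communicate_with_timeout(args, timeout_sec):
    """Return (returncode, output, timed_out) for the given argv list.

    Hypothetical helper for illustration only.
    """
    proc = subprocess.Popen(args, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
    try:
        out, _ = proc.communicate(timeout=timeout_sec)
        timed_out = False
    except subprocess.TimeoutExpired:
        # The child is still running at this point: kill it, then call
        # communicate() again to collect remaining output and reap the process.
        proc.kill()
        out, _ = proc.communicate()
        timed_out = True
    return proc.returncode, out, timed_out


if __name__ == "__main__":
    print(communicate_with_timeout([sys.executable, "-c", "print('hello')"], 5))
    print(communicate_with_timeout([sys.executable, "-c", "import time; time.sleep(10)"], 0.5))
```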

南七夏 2024-08-04 00:06:05

Surprised nobody mentioned using timeout:

timeout 5 ping -c 3 somehost

This obviously won't work for every use case, but if you're dealing with a simple script, it is hard to beat.

It is also available as gtimeout in coreutils via Homebrew for Mac users.

赢得她心 2024-08-04 00:06:05

I've modified sussudio's answer. The function now returns (returncode, stdout, stderr, timeout), with stdout and stderr decoded to UTF-8 strings.

import shlex
import subprocess
from threading import Timer

def kill_proc(proc, timeout):
  # Runs on the timer thread once timeout_sec has elapsed
  timeout["value"] = True
  proc.kill()

def run(cmd, timeout_sec):
  proc = subprocess.Popen(shlex.split(cmd), stdout=subprocess.PIPE, stderr=subprocess.PIPE)
  timeout = {"value": False}  # mutable flag shared with kill_proc
  timer = Timer(timeout_sec, kill_proc, [proc, timeout])
  timer.start()
  stdout, stderr = proc.communicate()
  timer.cancel()
  return proc.returncode, stdout.decode("utf-8"), stderr.decode("utf-8"), timeout["value"]
轻许诺言 2024-08-04 00:06:05

Another option is to write to a temporary file so that stdout cannot block, instead of needing to poll with communicate(). This worked for me where the other answers did not, for example on Windows.

    outFile =  tempfile.SpooledTemporaryFile() 
    errFile =   tempfile.SpooledTemporaryFile() 
    proc = subprocess.Popen(args, stderr=errFile, stdout=outFile, universal_newlines=False)
    wait_remaining_sec = timeout

    while proc.poll() is None and wait_remaining_sec > 0:
        time.sleep(1)
        wait_remaining_sec -= 1

    # killProc and ProcessIncompleteError are not defined in this snippet
    if proc.poll() is None:  # still running after the timeout
        killProc(proc.pid)
        raise ProcessIncompleteError(proc, timeout)

    # read temp streams from start
    outFile.seek(0)
    errFile.seek(0)
    out = outFile.read()
    err = errFile.read()
    outFile.close()
    errFile.close()
栖迟 2024-08-04 00:06:05

Prepending the Linux command timeout isn't a bad workaround and it worked for me.

cmd = "timeout 20 " + cmd
p = subprocess.Popen(cmd.split(), stdout=subprocess.PIPE, stderr=subprocess.PIPE)
(output, err) = p.communicate()
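
When using this workaround it helps to tell a timeout apart from an ordinary failure. The sketch below assumes the GNU coreutils timeout utility is on the PATH; that implementation exits with status 124 when the time limit is hit (other implementations may differ). The helper name run_under_timeout and the constant TIMEOUT_EXIT_STATUS are made up for this example.

```python
import shlex
import subprocess

# Status GNU coreutils `timeout` reports when the limit is reached (assumption:
# the `timeout` found on PATH is the coreutils one).
TIMEOUT_EXIT_STATUS = 124


def run_under_timeout(cmd, seconds):
    """Run cmd (a string or an argv list) under the external `timeout` utility.

    Returns (returncode, stdout, timed_out). Hypothetical helper.
    """
    # shlex.split keeps quoted arguments together, unlike str.split
    argv = shlex.split(cmd) if isinstance(cmd, str) else list(cmd)
    p = subprocess.Popen(
        ["timeout", str(seconds)] + argv,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
    )
    output, err = p.communicate()
    timed_out = p.returncode == TIMEOUT_EXIT_STATUS
    return p.returncode, output, timed_out
```

Passing an argv list instead of a single string avoids the quoting problems that cmd.split() has with arguments containing spaces.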
魔法少女 2024-08-04 00:06:05

I added the solution with threading from jcollado to my Python module easyprocess.

Install:

pip install easyprocess

Example:

from easyprocess import Proc

# shell is not supported!
stdout=Proc('ping localhost').call(timeout=1.5).stdout
print stdout
辞慾 2024-08-04 00:06:05

Here is my solution, using Thread and Event:

import subprocess
from threading import Thread, Event

def kill_on_timeout(done, timeout, proc):
    if not done.wait(timeout):
        proc.kill()

def exec_command(command, timeout):

    done = Event()
    proc = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE)

    watcher = Thread(target=kill_on_timeout, args=(done, timeout, proc))
    watcher.daemon = True
    watcher.start()

    data, stderr = proc.communicate()
    done.set()

    return data, stderr, proc.returncode

In action:

In [2]: exec_command(['sleep', '10'], 5)
Out[2]: ('', '', -9)

In [3]: exec_command(['sleep', '10'], 11)
Out[3]: ('', '', 0)
迟月 2024-08-04 00:06:05

The solution I use is to prefix the shell command with timelimit. If the command takes too long, timelimit will stop it and Popen will have a returncode set by timelimit. If it is greater than 128, it means timelimit killed the process.

See also python subprocess with timeout and large output (>64K)
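
The "> 128" rule above matches the common shell convention of reporting 128 plus the signal number, whereas Popen.returncode on POSIX is negative when the direct child itself was killed by a signal. A small sketch that interprets both forms; the function names are invented for this example, and whether timelimit follows the 128 + N convention is taken from the answer, not verified here.

```python
import signal


def _signal_name(number):
    """Best-effort signal name, falling back to the bare number."""
    try:
        return signal.Signals(number).name
    except ValueError:
        return "signal %d" % number


def describe_returncode(returncode):
    """Turn a Popen.returncode value into a short human-readable string."""
    if returncode is None:
        return "still running"
    if returncode == 0:
        return "exited normally"
    if returncode < 0:
        # POSIX only: the direct child was terminated by signal -returncode
        return "killed by " + _signal_name(-returncode)
    if returncode > 128:
        # Wrapper or shell convention: 128 + signal number
        return "exit status %d (possibly 128 + %s)" % (
            returncode, _signal_name(returncode - 128))
    return "exit status %d" % returncode
```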

噩梦成真你也成魔 2024-08-04 00:06:05

If you are using Python 2, give subprocess32 a try:

import subprocess32

try:
    output = subprocess32.check_output(command, shell=True, timeout=3)
except subprocess32.TimeoutExpired as e:
    print e
溺渁∝ 2024-08-04 00:06:05

I've implemented what I could gather from a few of these. This works in Windows, and since this is a community wiki, I figure I would share my code as well:

class Command(threading.Thread):
    def __init__(self, cmd, outFile, errFile, timeout):
        threading.Thread.__init__(self)
        self.cmd = cmd
        self.process = None
        self.outFile = outFile
        self.errFile = errFile
        self.timed_out = False
        self.timeout = timeout

    def run(self):
        self.process = subprocess.Popen(self.cmd, stdout = self.outFile, \
            stderr = self.errFile)

        while (self.process.poll() is None and self.timeout > 0):
            time.sleep(1)
            self.timeout -= 1

        if self.process.poll() is None:
            # still running once the time budget is used up
            self.process.terminate()
            self.timed_out = True
        else:
            self.timed_out = False

Then from another class or file:

        outFile =  tempfile.SpooledTemporaryFile()
        errFile =   tempfile.SpooledTemporaryFile()

        executor = command.Command(c, outFile, errFile, timeout)
        executor.daemon = True
        executor.start()

        executor.join()
        if executor.timed_out:
            out = 'timed out'
        else:
            outFile.seek(0)
            errFile.seek(0)
            out = outFile.read()
            err = errFile.read()

        outFile.close()
        errFile.close()
画离情绘悲伤 2024-08-04 00:06:05

Once you understand the full process-running machinery in *nix, you will easily find a simpler solution:

Consider this simple example of how to make a timeoutable communicate() method using select.select() (available almost everywhere on *nix nowadays). This can also be written with epoll/poll/kqueue, but the select.select() variant makes a good example. The major limitations of select.select() (speed and a maximum of 1024 fds) are not applicable to this task.

This works under *nix, does not create threads, does not use signals, can be launched from any thread (not only the main one), and is fast enough to read 250mb/s of data from stdout on my machine (i5 2.3ghz).

There is a problem in joining stdout/stderr at the end of communicate(). If the program produces huge output, this could lead to high memory usage. But you can call communicate() several times with smaller timeouts.

import errno
import os
import select
import subprocess
import time

class Popen(subprocess.Popen):
    def communicate(self, input=None, timeout=None):
        if timeout is None:
            return subprocess.Popen.communicate(self, input)

        if self.stdin:
            # Flush stdio buffer, this might block if user
            # has been writing to .stdin in an uncontrolled
            # fashion.
            self.stdin.flush()
            if not input:
                self.stdin.close()

        read_set, write_set = [], []
        stdout = stderr = None

        if self.stdin and input:
            write_set.append(self.stdin)
        if self.stdout:
            read_set.append(self.stdout)
            stdout = []
        if self.stderr:
            read_set.append(self.stderr)
            stderr = []

        input_offset = 0
        deadline = time.time() + timeout

        while read_set or write_set:
            try:
                rlist, wlist, xlist = select.select(read_set, write_set, [], max(0, deadline - time.time()))
            except select.error as ex:
                if ex.args[0] == errno.EINTR:
                    continue
                raise

            if not (rlist or wlist):
                # Just break if timeout
                # Since we do not close stdout/stderr/stdin, we can call
                # communicate() several times reading data by smaller pieces.
                break

            if self.stdin in wlist:
                chunk = input[input_offset:input_offset + subprocess._PIPE_BUF]
                try:
                    bytes_written = os.write(self.stdin.fileno(), chunk)
                except OSError as ex:
                    if ex.errno == errno.EPIPE:
                        self.stdin.close()
                        write_set.remove(self.stdin)
                    else:
                        raise
                else:
                    input_offset += bytes_written
                    if input_offset >= len(input):
                        self.stdin.close()
                        write_set.remove(self.stdin)

            # Read stdout / stderr by 1024 bytes
            for fn, tgt in (
                (self.stdout, stdout),
                (self.stderr, stderr),
            ):
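                if fn in rlist:
                    data = os.read(fn.fileno(), 1024)
                    if not data:
                        # EOF: os.read() returned an empty (byte) string
                        fn.close()
                        read_set.remove(fn)
                    tgt.append(data)

        # os.read() yields byte strings, so join with a bytes separator
        if stdout is not None:
            stdout = b''.join(stdout)
        if stderr is not None:
            stderr = b''.join(stderr)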

        return (stdout, stderr)
倾其所爱 2024-08-04 00:06:05

You can do this using select

import os
import subprocess
from datetime import datetime
from select import select

def call_with_timeout(cmd, timeout):
    started = datetime.now()
    sp = subprocess.Popen(cmd, stdout=subprocess.PIPE)
    while True:
        p = select([sp.stdout], [], [], timeout)
        if p[0]:
            # Drain what is available; a plain read() would block until EOF
            os.read(p[0][0].fileno(), 4096)
        ret = sp.poll()
        if ret is not None:
            return ret
        if (datetime.now()-started).total_seconds() > timeout:
            sp.kill()
            return None
Saygoodbye 2024-08-04 00:06:05

Python 2.7:

import time
import subprocess

def run_command(cmd, timeout=0):
    start_time = time.time()
    df = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    while timeout and df.poll() is None:
        if time.time()-start_time >= timeout:
            df.kill()
            return -1, ""
        time.sleep(0.1)  # avoid a busy loop while waiting
    output = '\n'.join(df.communicate()).strip()
    return df.returncode, output
慈悲佛祖 2024-08-04 00:06:05

Example of captured output after timeout tested in Python 3.7.8:

try:
    return subprocess.run(command, shell=True, capture_output=True, timeout=20, cwd=cwd, universal_newlines=True)
except subprocess.TimeoutExpired as e:
    print(e.output.decode(encoding="utf-8", errors="ignore"))
    assert False

The exception subprocess.TimeoutExpired has the output and other members:

cmd - Command that was used to spawn the child process.

timeout - Timeout in seconds.

output - Output of the child process if it was captured by run() or check_output(). Otherwise, None.

stdout - Alias for output, for symmetry with stderr.

stderr - Stderr output of the child process if it was captured by run(). Otherwise, None.

More info: https://docs.python.org/3/library/subprocess.html#subprocess.TimeoutExpired
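
The members listed above can be collected into a small report. This is a sketch under assumptions: run_and_report and as_text are names invented for the example, and the decoding helper exists because the captured output on the exception may be None or bytes, so calling .decode() on it directly is not always safe.

```python
import subprocess
import sys


def as_text(data):
    """Decode captured output that may be None, bytes or str."""
    if data is None:
        return ""
    if isinstance(data, bytes):
        return data.decode("utf-8", errors="ignore")
    return data


def run_and_report(command, timeout_sec):
    """Run command (argv list) and return a dict describing the outcome."""
    try:
        done = subprocess.run(command, capture_output=True, timeout=timeout_sec)
    except subprocess.TimeoutExpired as exc:
        return {
            "timed_out": True,
            "cmd": exc.cmd,          # the command that was spawned
            "timeout": exc.timeout,  # the timeout in seconds
            "stdout": as_text(exc.stdout),
            "stderr": as_text(exc.stderr),
        }
    return {
        "timed_out": False,
        "cmd": done.args,
        "timeout": timeout_sec,
        "stdout": as_text(done.stdout),
        "stderr": as_text(done.stderr),
    }


if __name__ == "__main__":
    print(run_and_report([sys.executable, "-c", "import time; time.sleep(10)"], 0.5))
```

capture_output requires Python 3.7 or later; on older versions stdout=subprocess.PIPE and stderr=subprocess.PIPE give the same effect.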

謸气贵蔟 2024-08-04 00:06:05

A late answer, and for Linux only, but in case someone wants to use subprocess.getstatusoutput(), where the timeout argument isn't available, you can put the built-in Linux timeout command at the beginning of the command, i.e.:

import subprocess

timeout = 25 # seconds
cmd = f"timeout --preserve-status --foreground {timeout} ping duckgo.com"
exit_c, out = subprocess.getstatusoutput(cmd)

if (exit_c == 0):
    print("success")
else:
    print("Error: ", out)

timeout Arguments:

私藏温柔 2024-08-04 00:06:05

I've used killableprocess successfully on Windows, Linux and Mac. If you are using Cygwin Python, you'll need OSAF's version of killableprocess because otherwise native Windows processes won't get killed.

牵你的手,一向走下去 2024-08-04 00:06:05

Although I haven't looked at it extensively, this decorator I found at ActiveState seems to be quite useful for this sort of thing. Along with subprocess.Popen(..., close_fds=True), at least I'm ready for shell-scripting in Python.

爱要勇敢去追 2024-08-04 00:06:05

This solution kills the process tree when shell=True is used, passes parameters to the process (or not), has a timeout, and gets the stdout, stderr and return code back from the call (it uses psutil for kill_proc_tree). It is based on several solutions posted on SO, including jcollado's, and is posted in response to comments by Anson and jradice on jcollado's answer. Tested on Windows Server 2012 and Ubuntu 14.04. Please note that on Ubuntu you need to change the parent.children(...) call to parent.get_children(...).

import os
import subprocess
from threading import Thread

import psutil

def kill_proc_tree(pid, including_parent=True):
  parent = psutil.Process(pid)
  children = parent.children(recursive=True)
  for child in children:
    child.kill()
  psutil.wait_procs(children, timeout=5)
  if including_parent:
    parent.kill()
    parent.wait(5)

def run_with_timeout(cmd, current_dir, cmd_parms, timeout):
  def target():
    process = subprocess.Popen(cmd, cwd=current_dir, shell=True, stdout=subprocess.PIPE, stdin=subprocess.PIPE, stderr=subprocess.PIPE)

    # wait for the process to terminate
    if (cmd_parms == ""):
      out, err = process.communicate()
    else:
      out, err = process.communicate(cmd_parms)
    errcode = process.returncode

  thread = Thread(target=target)
  thread.start()

  thread.join(timeout)
  if thread.is_alive():
    me = os.getpid()
    kill_proc_tree(me, including_parent=False)
    thread.join()
酒解孤独 2024-08-04 00:06:05

There's an idea to subclass the Popen class and extend it with some simple method decorators. Let's call it ExpirablePopen.

from logging import error
from subprocess import Popen
from threading import Event
from threading import Thread


class ExpirablePopen(Popen):

    def __init__(self, *args, **kwargs):
        self.timeout = kwargs.pop('timeout', 0)
        self.timer = None
        self.done = Event()

        Popen.__init__(self, *args, **kwargs)

    def __tkill(self):
        timeout = self.timeout
        if not self.done.wait(timeout):
            error('Terminating process {} by timeout of {} secs.'.format(self.pid, timeout))
            self.kill()

    def expirable(func):
        def wrapper(self, *args, **kwargs):
            # zero timeout means call of parent method
            if self.timeout == 0:
                return func(self, *args, **kwargs)

            # if timer is None, need to start it
            if self.timer is None:
                self.timer = thr = Thread(target=self.__tkill)
                thr.daemon = True
                thr.start()

            result = func(self, *args, **kwargs)
            self.done.set()

            return result
        return wrapper

    wait = expirable(Popen.wait)
    communicate = expirable(Popen.communicate)


if __name__ == '__main__':
    from subprocess import PIPE

    print ExpirablePopen('ssh -T [email protected]', stdout=PIPE, timeout=1).communicate()
往事风中埋 2024-08-04 00:06:05

I wanted to terminate a multithreaded subprocess if it ran longer than a given timeout. I first tried to set a timeout in Popen() itself, but that did not work. Then I realized that call() is essentially Popen().wait(), so I set the timeout on the .wait(timeout=xxx) method instead, which finally worked. This is how I solved it:

import os
import sys
import signal
import subprocess
from multiprocessing import Pool

cores_for_parallelization = 4
timeout_time = 15  # seconds

def main():
    jobs = [...YOUR_JOB_LIST...]
    with Pool(cores_for_parallelization) as p:
        p.map(run_parallel_jobs, jobs)

def run_parallel_jobs(args):
    # Define the arguments including the paths
    initial_terminal_command = 'C:\\Python34\\python.exe'  # Python executable
    function_to_start = 'C:\\temp\\xyz.py'  # The multithreading script
    final_list = [initial_terminal_command, function_to_start]
    final_list.extend(args)

    # Start the subprocess and determine the process PID
    subp = subprocess.Popen(final_list)  # starts the process
    pid = subp.pid

    # Wait until the return code returns from the function by considering the timeout. 
    # If not, terminate the process.
    try:
        returncode = subp.wait(timeout=timeout_time)  # should be zero if accomplished
    except subprocess.TimeoutExpired:
        # Distinguish between Linux and Windows and terminate the process if 
        # the timeout has been expired
        if sys.platform.startswith('linux'):  # 'linux2' only matches Python 2; Python 3.3+ reports 'linux'
            os.kill(pid, signal.SIGTERM)
        elif sys.platform == 'win32':
            subp.terminate()

if __name__ == '__main__':
    main()
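
The wait-then-terminate step above can also be written without a platform check, since Popen.terminate() and Popen.kill() exist on both Windows and POSIX. The following is a minimal sketch, assuming Python 3.3+; the helper name wait_or_stop and the grace_sec parameter are made up for illustration and are not part of the subprocess module.

```python
import subprocess
import sys


def wait_or_stop(proc, timeout_sec, grace_sec=2.0):
    """Wait for proc; ask it to stop on timeout, then force-kill if needed.

    Hypothetical helper: the name and grace_sec are illustrative only.
    """
    try:
        return proc.wait(timeout=timeout_sec)
    except subprocess.TimeoutExpired:
        # Polite request first (SIGTERM on POSIX, TerminateProcess on Windows).
        proc.terminate()
        try:
            return proc.wait(timeout=grace_sec)
        except subprocess.TimeoutExpired:
            # The child ignored the request; force it and reap it.
            proc.kill()
            return proc.wait()


if __name__ == "__main__":
    child = subprocess.Popen([sys.executable, "-c", "import time; time.sleep(30)"])
    print(wait_or_stop(child, timeout_sec=1))
```

Note that this only stops the direct child; processes that the child started itself may keep running.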
风轻花落早 2024-08-04 00:06:05

Since Python 3.3, Popen.communicate has a timeout option. The documentation says:

If the process does not terminate after timeout seconds, a TimeoutExpired exception will be raised. Catching this exception and retrying communication will not lose any output.
The child process is not killed if the timeout expires, so in order to cleanup properly a well-behaved application should kill the child process and finish communication

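import subprocess

proc = subprocess.Popen(...)
try:
    outs, errs = proc.communicate(timeout=15)
except subprocess.TimeoutExpired:
    proc.kill()  # the child is not killed automatically on timeout
    outs, errs = proc.communicate()  # collect remaining output and reap the child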

You can take a look at the docs.
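
As a self-contained illustration of the documented pattern, here is a small sketch that wraps it in a function and reports whether the timeout fired. It assumes Python 3.3+; the function name communicate_with_timeout and its return shape are hypothetical choices, not something the subprocess module provides.

```python
import subprocess
import sys


def communicate_with_timeout(args, timeout_sec):
    """Run args and return (returncode, output, timed_out).

    Hypothetical helper; stderr is merged into stdout as in the question.
    """
    proc = subprocess.Popen(args, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
    try:
        output, _ = proc.communicate(timeout=timeout_sec)
        timed_out = False
    except subprocess.TimeoutExpired:
        # communicate() does not kill the child on timeout, so do it here,
        # then call communicate() again to drain the pipe and reap the child.
        proc.kill()
        output, _ = proc.communicate()
        timed_out = True
    return proc.returncode, output, timed_out


if __name__ == "__main__":
    slow = [sys.executable, "-c", "import time; time.sleep(30)"]
    print(communicate_with_timeout(slow, 1))
```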

情深缘浅 2024-08-04 00:06:05

Unfortunately, I'm bound by very strict policies on the disclosure of source code by my employer, so I can't provide actual code. But for my taste the best solution is to create a subclass overriding Popen.wait() to poll instead of wait indefinitely, and Popen.__init__ to accept a timeout parameter. Once you do that, all the other Popen methods (which call wait) will work as expected, including communicate.
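
Since the answer gives no code, the following is only a guess at what such a subclass might look like, written for Python 3. The class name PollingPopen, the limit keyword, and the timed_out attribute are all invented here. The deadline is measured from the call to wait(), not from process start, and on current Python versions communicate() may block on reading pipes before it ever reaches wait(), so this sketch should not be assumed to cover communicate().

```python
import subprocess
import sys
import time


class PollingPopen(subprocess.Popen):
    """Popen whose wait() polls and kills the child after `limit` seconds.

    Hypothetical sketch: `limit` and `timed_out` are not subprocess features.
    """

    def __init__(self, *args, **kwargs):
        # Pop our own keyword before handing the rest to Popen.
        self.limit = kwargs.pop("limit", None)
        self.timed_out = False
        super().__init__(*args, **kwargs)

    def wait(self, timeout=None):
        if self.limit is None:
            return super().wait(timeout=timeout)
        deadline = time.monotonic() + self.limit
        # Poll instead of blocking indefinitely.
        while self.poll() is None:
            if time.monotonic() >= deadline:
                self.timed_out = True
                self.kill()
                break
            time.sleep(0.05)
        # Reap the child (it has either exited or just been killed).
        return super().wait()


if __name__ == "__main__":
    p = PollingPopen([sys.executable, "-c", "import time; time.sleep(30)"], limit=1)
    print(p.wait(), p.timed_out)
```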

仅冇旳回忆 2024-08-04 00:06:05

https://pypi.python.org/pypi/python-subprocess2 provides extensions to the subprocess module which allow you to wait up to a certain period of time, otherwise terminate.

So, to wait up to 10 seconds for the process to terminate, otherwise kill:

pipe = subprocess.Popen('...')

timeout = 10

results = pipe.waitOrTerminate(timeout)

This is compatible with both Windows and Unix. "results" is a dictionary containing "returnCode", which is the return code of the application (or None if it had to be killed), and "actionTaken", which is "SUBPROCESS2_PROCESS_COMPLETED" if the process completed normally, or a mask of "SUBPROCESS2_PROCESS_TERMINATED" and "SUBPROCESS2_PROCESS_KILLED" depending on the action taken (see the documentation for full details).

梦途 2024-08-04 00:06:05

For Python 2.6+, use gevent:

from gevent.subprocess import Popen, PIPE

def call_sys(cmd, timeout):
    p = Popen(cmd, shell=True, stdout=PIPE)
    output, _ = p.communicate(timeout=timeout)
    assert p.returncode == 0, p.returncode
    return output

call_sys('./t.sh', 2)

# t.sh example
sleep 5
echo done
exit 1
失去的东西太少 2024-08-04 00:06:04

In Python 3.3+:

from subprocess import STDOUT, check_output

output = check_output(cmd, stderr=STDOUT, timeout=seconds)

output is a byte string that contains the command's merged stdout and stderr data.

Unlike the proc.communicate() method, check_output raises CalledProcessError on a non-zero exit status, as specified in the question's text.

I've removed shell=True because it is often used unnecessarily. You can always add it back if cmd indeed requires it. If you add shell=True, i.e., if the child process spawns its own descendants, check_output() can return much later than the timeout indicates; see Subprocess timeout failure.

The timeout feature is available on Python 2.x via the subprocess32 backport of the 3.2+ subprocess module.
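
To make the three possible outcomes explicit (success, non-zero exit, timeout), here is a minimal sketch for Python 3.3+. The wrapper name run_checked and the status strings are hypothetical; only check_output, CalledProcessError, and TimeoutExpired come from the subprocess module.

```python
import subprocess
import sys


def run_checked(cmd, seconds):
    """Return (status, output), where status is 'ok', 'failed' or 'timeout'.

    Hypothetical wrapper around subprocess.check_output.
    """
    try:
        out = subprocess.check_output(cmd, stderr=subprocess.STDOUT, timeout=seconds)
        return "ok", out
    except subprocess.CalledProcessError as exc:
        # Non-zero exit status; exc.output holds what the command printed.
        return "failed", exc.output
    except subprocess.TimeoutExpired as exc:
        # check_output kills the direct child before raising this.
        return "timeout", exc.output


if __name__ == "__main__":
    print(run_checked([sys.executable, "-c", "print('hi')"], 10))
    print(run_checked([sys.executable, "-c", "import sys; sys.exit(3)"], 10))
```

The output attached to a TimeoutExpired exception may be None or partial, so it is best treated as optional.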

小嗲 2024-08-04 00:06:04

I don't know much about the low-level details, but given that the
Python 2.6 API offers the ability to wait for threads and
terminate processes, what about running the process in a separate
thread?

import subprocess, threading

class Command(object):
    def __init__(self, cmd):
        self.cmd = cmd
        self.process = None

    def run(self, timeout):
        def target():
            print('Thread started')
            self.process = subprocess.Popen(self.cmd, shell=True)
            self.process.communicate()
            print('Thread finished')

        thread = threading.Thread(target=target)
        thread.start()

        # Wait up to `timeout` seconds for the worker thread to finish.
        thread.join(timeout)
        if thread.is_alive():
            print('Terminating process')
            self.process.terminate()
            thread.join()
        print(self.process.returncode)

command = Command("echo 'Process started'; sleep 2; echo 'Process finished'")
command.run(timeout=3)
command.run(timeout=1)

The output of this snippet on my machine is:

Thread started
Process started
Process finished
Thread finished
0
Thread started
Process started
Terminating process
Thread finished
-15

Here it can be seen that in the first execution the process
finished correctly (return code 0), while in the second one the
process was terminated (return code -15).

I haven't tested this on Windows, but aside from updating the example
command, I think it should work, since I haven't found anything in the
documentation saying that thread.join or process.terminate
is not supported.

灼痛 2024-08-04 00:06:04

jcollado's answer can be simplified using the threading.Timer class:

import shlex
from subprocess import Popen, PIPE
from threading import Timer

def run(cmd, timeout_sec):
    proc = Popen(shlex.split(cmd), stdout=PIPE, stderr=PIPE)
    timer = Timer(timeout_sec, proc.kill)
    try:
        timer.start()
        stdout, stderr = proc.communicate()
    finally:
        timer.cancel()

# Examples: both take 1 second
run("sleep 1", 5)  # process ends normally at 1 second
run("sleep 5", 1)  # timeout happens at 1 second
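
The run() function above discards its results, so the caller cannot tell whether the timer fired. A possible variant that returns the output and a timeout flag is sketched below for Python 3; the name run_with_timer and the returned tuple are assumptions made for this example, and it takes an argument list rather than a command string.

```python
import subprocess
import sys
from threading import Timer


def run_with_timer(args, timeout_sec):
    """Return (returncode, stdout, stderr, timed_out). Hypothetical helper."""
    proc = subprocess.Popen(args, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    fired = []  # filled in by the timer thread if the timeout is reached

    def on_timeout():
        fired.append(True)
        proc.kill()

    timer = Timer(timeout_sec, on_timeout)
    timer.start()
    try:
        stdout, stderr = proc.communicate()
    finally:
        # Stop the timer if the process finished before the deadline.
        timer.cancel()
    return proc.returncode, stdout, stderr, bool(fired)


if __name__ == "__main__":
    slow = [sys.executable, "-c", "import time; time.sleep(30)"]
    print(run_with_timer(slow, 1))
```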
氛圍 2024-08-04 00:06:04

If you're on Unix,

import signal
  ...
class Alarm(Exception):
    pass

def alarm_handler(signum, frame):
    raise Alarm

signal.signal(signal.SIGALRM, alarm_handler)
signal.alarm(5*60)  # 5 minutes
try:
    stdoutdata, stderrdata = proc.communicate()
    signal.alarm(0)  # reset the alarm
except Alarm:
    print("Oops, taking too long!")
    # whatever else