Python子进程超时?

发布于 2024-09-24 06:57:07 字数 136 浏览 10 评论 0原文

是否有任何参数或选项可以为 Python 的 subprocess.Popen 方法设置超时?

像这样的东西:

subprocess.Popen(['..'], ..., timeout=20)

Is there any argument or options to setup a timeout for Python's subprocess.Popen method?

Something like this:

subprocess.Popen(['..'], ..., timeout=20) ?

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(10

凡尘雨 2024-10-01 06:57:07

我建议查看 Timer threading 模块中的 class。我用它来实现 Popen 的超时。

首先,创建一个回调:

def timeout( p ):
    if p.poll() is None:
        print 'Error: process taking too long to complete--terminating'
        p.kill()

然后打开进程:

proc = Popen( ... )

然后创建一个将调用回调的计时器,并将进程传递给它。

t = threading.Timer( 10.0, timeout, [proc] )
t.start()
t.join()

在程序稍后的某个位置,您可能需要添加以下行:

t.cancel()

否则,python 程序将继续运行,直到计时器完成运行。

编辑:我被告知存在竞争条件,子进程 p可能在p.poll()p之间终止.kill() 调用。我相信下面的代码可以解决这个问题:

import errno

def timeout( p ):
    if p.poll() is None:
        try:
            p.kill()
            print 'Error: process taking too long to complete--terminating'
        except OSError as e:
            if e.errno != errno.ESRCH:
                raise

尽管您可能想要清理异常处理以专门处理子进程已经正常终止时发生的特定异常。

I would advise taking a look at the Timer class in the threading module. I used it to implement a timeout for a Popen.

First, create a callback:

def timeout( p ):
    if p.poll() is None:
        print 'Error: process taking too long to complete--terminating'
        p.kill()

Then open the process:

proc = Popen( ... )

Then create a timer that will call the callback, passing the process to it.

t = threading.Timer( 10.0, timeout, [proc] )
t.start()
t.join()

Somewhere later in the program, you may want to add the line:

t.cancel()

Otherwise, the python program will keep running until the timer has finished running.

EDIT: I was advised that there is a race condition that the subprocess p may terminate between the p.poll() and p.kill() calls. I believe the following code can fix that:

import errno

def timeout( p ):
    if p.poll() is None:
        try:
            p.kill()
            print 'Error: process taking too long to complete--terminating'
        except OSError as e:
            if e.errno != errno.ESRCH:
                raise

Though you may want to clean the exception handling to specifically handle just the particular exception that occurs when the subprocess has already terminated normally.

向日葵 2024-10-01 06:57:07

subprocess.Popen 不会阻塞,因此您可以执行以下操作:

import time

p = subprocess.Popen(['...'])
time.sleep(20)
if p.poll() is None:
  p.kill()
  print 'timed out'
else:
  print p.communicate()

它有一个缺点,您必须始终等待至少 20 秒才能完成。

subprocess.Popen doesn't block so you can do something like this:

import time

p = subprocess.Popen(['...'])
time.sleep(20)
if p.poll() is None:
  p.kill()
  print 'timed out'
else:
  print p.communicate()

It has a drawback in that you must always wait at least 20 seconds for it to finish.

浮世清欢 2024-10-01 06:57:07
import subprocess, threading

class Command(object):
    def __init__(self, cmd):
        self.cmd = cmd
        self.process = None

    def run(self, timeout):
        def target():
            print 'Thread started'
            self.process = subprocess.Popen(self.cmd, shell=True)
            self.process.communicate()
            print 'Thread finished'

        thread = threading.Thread(target=target)
        thread.start()

        thread.join(timeout)
        if thread.is_alive():
            print 'Terminating process'
            self.process.terminate()
            thread.join()
        print self.process.returncode

command = Command("echo 'Process started'; sleep 2; echo 'Process finished'")
command.run(timeout=3)
command.run(timeout=1)

其输出应该是:

Thread started
Process started
Process finished
Thread finished
0
Thread started
Process started
Terminating process
Thread finished
-15

可以看出,在第一次执行中,进程正确完成(返回代码 0),而在第二次执行中,进程被终止(返回代码 -15)。

我没有在windows下测试过;但是,除了更新示例命令之外,我认为它应该可以工作,因为我在文档中没有找到任何表明不支持 thread.join 或 process.terminate 的内容。

import subprocess, threading

class Command(object):
    def __init__(self, cmd):
        self.cmd = cmd
        self.process = None

    def run(self, timeout):
        def target():
            print 'Thread started'
            self.process = subprocess.Popen(self.cmd, shell=True)
            self.process.communicate()
            print 'Thread finished'

        thread = threading.Thread(target=target)
        thread.start()

        thread.join(timeout)
        if thread.is_alive():
            print 'Terminating process'
            self.process.terminate()
            thread.join()
        print self.process.returncode

command = Command("echo 'Process started'; sleep 2; echo 'Process finished'")
command.run(timeout=3)
command.run(timeout=1)

The output of this should be:

Thread started
Process started
Process finished
Thread finished
0
Thread started
Process started
Terminating process
Thread finished
-15

where it can be seen that, in the first execution, the process finished correctly (return code 0), while the in the second one the process was terminated (return code -15).

I haven't tested in windows; but, aside from updating the example command, I think it should work since I haven't found in the documentation anything that says that thread.join or process.terminate is not supported.

傲性难收 2024-10-01 06:57:07

您可以

from twisted.internet import reactor, protocol, error, defer

class DyingProcessProtocol(protocol.ProcessProtocol):
    def __init__(self, timeout):
        self.timeout = timeout

    def connectionMade(self):
        @defer.inlineCallbacks
        def killIfAlive():
            try:
                yield self.transport.signalProcess('KILL')
            except error.ProcessExitedAlready:
                pass

        d = reactor.callLater(self.timeout, killIfAlive)

reactor.spawnProcess(DyingProcessProtocol(20), ...)

使用 Twisted 的异步流程 API。

You could do

from twisted.internet import reactor, protocol, error, defer

class DyingProcessProtocol(protocol.ProcessProtocol):
    def __init__(self, timeout):
        self.timeout = timeout

    def connectionMade(self):
        @defer.inlineCallbacks
        def killIfAlive():
            try:
                yield self.transport.signalProcess('KILL')
            except error.ProcessExitedAlready:
                pass

        d = reactor.callLater(self.timeout, killIfAlive)

reactor.spawnProcess(DyingProcessProtocol(20), ...)

using Twisted's asynchronous process API.

甜宝宝 2024-10-01 06:57:07

没有内置 python 子进程自动超时功能,因此您必须构建自己的子进程。

这对我在运行 python 2.7.3 的 Ubuntu 12.10 上有效

。将其放入名为 test 的文件中。 py

#!/usr/bin/python
import subprocess
import threading

class RunMyCmd(threading.Thread):
    def __init__(self, cmd, timeout):
        threading.Thread.__init__(self)
        self.cmd = cmd 
        self.timeout = timeout

    def run(self):
        self.p = subprocess.Popen(self.cmd)
        self.p.wait()

    def run_the_process(self):
        self.start()
        self.join(self.timeout)

        if self.is_alive():
            self.p.terminate()   #if your process needs a kill -9 to make 
                                 #it go away, use self.p.kill() here instead.

            self.join()

RunMyCmd(["sleep", "20"], 3).run_the_process()

保存并运行:

python test.py

sleep 20 命令需要 20 秒才能完成。如果 3 秒内没有终止(不会),则进程将终止。

el@apollo:~$  python test.py 
el@apollo:~$ 

进程运行与终止之间有三秒钟的时间。

A python subprocess auto-timeout is not built in, so you're going to have to build your own.

This works for me on Ubuntu 12.10 running python 2.7.3

Put this in a file called test.py

#!/usr/bin/python
import subprocess
import threading

class RunMyCmd(threading.Thread):
    def __init__(self, cmd, timeout):
        threading.Thread.__init__(self)
        self.cmd = cmd 
        self.timeout = timeout

    def run(self):
        self.p = subprocess.Popen(self.cmd)
        self.p.wait()

    def run_the_process(self):
        self.start()
        self.join(self.timeout)

        if self.is_alive():
            self.p.terminate()   #if your process needs a kill -9 to make 
                                 #it go away, use self.p.kill() here instead.

            self.join()

RunMyCmd(["sleep", "20"], 3).run_the_process()

Save it, and run it:

python test.py

The sleep 20 command takes 20 seconds to complete. If it doesn't terminate in 3 seconds (it won't) then the process is terminated.

el@apollo:~$  python test.py 
el@apollo:~$ 

There is three seconds between when the process is run, and it is terminated.

握住你手 2024-10-01 06:57:07

从 Python 3.3 开始,子进程模块中的阻塞辅助函数还有一个 timeout 参数。

https://docs.python.org/3/library/subprocess.html

As of Python 3.3, there is also a timeout argument to the blocking helper functions in the subprocess module.

https://docs.python.org/3/library/subprocess.html

来日方长 2024-10-01 06:57:07

不幸的是,没有这样的解决方案。我设法使用线程计时器来做到这一点,该计时器将与超时后杀死它的进程一起启动,但由于僵尸进程或其他原因,我确实遇到了一些过时的文件描述符问题。

Unfortunately, there isn't such a solution. I managed to do this using a threaded timer that would launch along with the process that would kill it after the timeout but I did run into some stale file descriptor issues because of zombie processes or some such.

吹梦到西洲 2024-10-01 06:57:07

不,没有时间限制。我想,您正在寻找的是在一段时间后终止子进程。由于您能够向子进程发出信号,因此您也应该能够杀死它。

向子进程发送信号的通用方法:

proc = subprocess.Popen([command])
time.sleep(1)
print 'signaling child'
sys.stdout.flush()
os.kill(proc.pid, signal.SIGUSR1)

您可以使用此机制在超时期限后终止。

No there is no time out. I guess, what you are looking for is to kill the sub process after some time. Since you are able to signal the subprocess, you should be able to kill it too.

generic approach to sending a signal to subprocess:

proc = subprocess.Popen([command])
time.sleep(1)
print 'signaling child'
sys.stdout.flush()
os.kill(proc.pid, signal.SIGUSR1)

You could use this mechanism to terminate after a time out period.

你另情深 2024-10-01 06:57:07

是的, https://pypi.python.org/pypi/python-subprocess2 将扩展Popen 模块有两个附加功能,

Popen.waitUpTo(timeout=seconds)

这将等待一定的秒数以完成该过程,否则

也返回 None,

Popen.waitOrTerminate

这将等待某个时间,然后调用 .terminate(),然后调用 .kill(),其中之一或两者的某种组合,请参阅文档以获取完整详细信息:

http://htmlpreview.github.io/?https://github.com/kata198/python-subprocess2/blob/master/doc/subprocess2.html

Yes, https://pypi.python.org/pypi/python-subprocess2 will extend the Popen module with two additional functions,

Popen.waitUpTo(timeout=seconds)

This will wait up to acertain number of seconds for the process to complete, otherwise return None

also,

Popen.waitOrTerminate

This will wait up to a point, and then call .terminate(), then .kill(), one orthe other or some combination of both, see docs for full details:

http://htmlpreview.github.io/?https://github.com/kata198/python-subprocess2/blob/master/doc/subprocess2.html

倦话 2024-10-01 06:57:07

对于 Linux,您可以使用信号。这是依赖于平台的,因此 Windows 需要另一个解决方案。不过它可能适用于 Mac。

def launch_cmd(cmd, timeout=0):
    '''Launch an external command

    It launchs the program redirecting the program's STDIO
    to a communication pipe, and appends those responses to
    a list.  Waits for the program to exit, then returns the
    ouput lines.

    Args:
        cmd: command Line of the external program to launch
        time: time to wait for the command to complete, 0 for indefinitely
    Returns:
        A list of the response lines from the program    
    '''

    import subprocess
    import signal

    class Alarm(Exception):
        pass

    def alarm_handler(signum, frame):
        raise Alarm

    lines = []

    if not launch_cmd.init:
        launch_cmd.init = True
        signal.signal(signal.SIGALRM, alarm_handler)

    p = subprocess.Popen(cmd, stdout=subprocess.PIPE)
    signal.alarm(timeout)  # timeout sec

    try:
        for line in p.stdout:
            lines.append(line.rstrip())
        p.wait()
        signal.alarm(0)  # disable alarm
    except:
        print "launch_cmd taking too long!"
        p.kill()

    return lines        
launch_cmd.init = False

For Linux, you can use a signal. This is platform dependent so another solution is required for Windows. It may work with Mac though.

def launch_cmd(cmd, timeout=0):
    '''Launch an external command

    It launchs the program redirecting the program's STDIO
    to a communication pipe, and appends those responses to
    a list.  Waits for the program to exit, then returns the
    ouput lines.

    Args:
        cmd: command Line of the external program to launch
        time: time to wait for the command to complete, 0 for indefinitely
    Returns:
        A list of the response lines from the program    
    '''

    import subprocess
    import signal

    class Alarm(Exception):
        pass

    def alarm_handler(signum, frame):
        raise Alarm

    lines = []

    if not launch_cmd.init:
        launch_cmd.init = True
        signal.signal(signal.SIGALRM, alarm_handler)

    p = subprocess.Popen(cmd, stdout=subprocess.PIPE)
    signal.alarm(timeout)  # timeout sec

    try:
        for line in p.stdout:
            lines.append(line.rstrip())
        p.wait()
        signal.alarm(0)  # disable alarm
    except:
        print "launch_cmd taking too long!"
        p.kill()

    return lines        
launch_cmd.init = False
~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文