在Python中运行超时代码的正确方法

发布于 2024-11-27 20:48:05 字数 529 浏览 1 评论 0原文

我在网上查了一下,发现了一些 SO 讨论和 ActiveState 食谱,用于运行一些超时的代码。看起来有一些常见的方法:

  • 使用运行代码的线程,并在超时的情况下加入它。如果超时 - 终止线程。 Python 不直接支持此功能(使用私有 _Thread__stop 函数),因此这是不好的做法
  • 使用 signal.SIGALRM - 但这种方法无法在 Windows 上运行
  • 使用带有超时的子进程 - 但这太重了 - 如果我想经常启动可中断任务怎么办,我不想为每个进程启动进程!

那么,什么是正确的方式?我不是在询问解决方法(例如使用 Twisted 和异步 IO),而是在询问解决实际问题的实际方法 - 我有一些功能,并且我只想在超时的情况下运行它。如果超时,我想要恢复控制权。我希望它能够在 Linux 和 Windows 上运行。

I looked online and found some SO discussing and ActiveState recipes for running some code with a timeout. It looks there are some common approaches:

  • Use thread that run the code, and join it with timeout. If timeout elapsed - kill the thread. This is not directly supported in Python (used private _Thread__stop function) so it is bad practice
  • Use signal.SIGALRM - but this approach not working on Windows!
  • Use subprocess with timeout - but this is too heavy - what if I want to start interruptible task often, I don't want fire process for each!

So, what is the right way? I'm not asking about workarounds (eg use Twisted and async IO), but actual way to solve actual problem - I have some function and I want to run it only with some timeout. If timeout elapsed, I want control back. And I want it to work on Linux and Windows.

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(10

对岸观火 2024-12-04 20:48:05

老实说,对此问题的完全通用的解决方案并不存在。您必须针对给定域使用正确的解决方案。

  • 如果你想让你完全控制的代码超时,你必须编写它来配合。此类代码必须能够以某种方式分解成小块,就像在事件驱动系统中一样。如果您可以确保没有任何东西会持有锁太长时间,您也可以通过线程来做到这一点,但正确处理锁实际上非常困难。

  • 如果您因为担心代码失控而希望超时(例如,如果您担心用户会要求计算器计算 9**(9**9) ),你需要在另一个进程中运行它。这是充分隔离它的唯一简单方法。在您的事件系统甚至不同的线程中运行它是不够的。与其他解决方案类似,也可以将事物分解成小块,但需要非常仔细的处理,并且通常不值得;无论如何,这不允许您执行与运行 Python 代码相同的操作。

A completely general solution to this really, honestly does not exist. You have to use the right solution for a given domain.

  • If you want timeouts for code you fully control, you have to write it to cooperate. Such code has to be able to break up into little chunks in some way, as in an event-driven system. You can also do this by threading if you can ensure nothing will hold a lock too long, but handling locks right is actually pretty hard.

  • If you want timeouts because you're afraid code is out of control (for example, if you're afraid the user will ask your calculator to compute 9**(9**9)), you need to run it in another process. This is the only easy way to sufficiently isolate it. Running it in your event system or even a different thread will not be enough. It is also possible to break things up into little chunks similar to the other solution, but requires very careful handling and usually isn't worth it; in any event, that doesn't allow you to do the same exact thing as just running the Python code.

梦晓ヶ微光ヅ倾城 2024-12-04 20:48:05

您可能正在寻找的是 multiprocessing 模块。如果 subprocess 太重,那么这可能也不适合您的需求。

import time
import multiprocessing

def do_this_other_thing_that_may_take_too_long(duration):
    time.sleep(duration)
    return 'done after sleeping {0} seconds.'.format(duration)

pool = multiprocessing.Pool(1)
print 'starting....'
res = pool.apply_async(do_this_other_thing_that_may_take_too_long, [8])
for timeout in range(1, 10):
    try:
        print '{0}: {1}'.format(duration, res.get(timeout))
    except multiprocessing.TimeoutError:
        print '{0}: timed out'.format(duration) 

print 'end'

What you might be looking for is the multiprocessing module. If subprocess is too heavy, then this may not suit your needs either.

import time
import multiprocessing

def do_this_other_thing_that_may_take_too_long(duration):
    time.sleep(duration)
    return 'done after sleeping {0} seconds.'.format(duration)

pool = multiprocessing.Pool(1)
print 'starting....'
res = pool.apply_async(do_this_other_thing_that_may_take_too_long, [8])
for timeout in range(1, 10):
    try:
        print '{0}: {1}'.format(duration, res.get(timeout))
    except multiprocessing.TimeoutError:
        print '{0}: timed out'.format(duration) 

print 'end'
妄司 2024-12-04 20:48:05

如果与网络相关,您可以尝试:

import socket
socket.setdefaulttimeout(number)

If it's network related you could try:

import socket
socket.setdefaulttimeout(number)
天荒地未老 2024-12-04 20:48:05

我在 eventlet 库中找到了这个:

http://eventlet.net/doc/modules/timeout.html

from eventlet.timeout import Timeout

timeout = Timeout(seconds, exception)
try:
    ... # execution here is limited by timeout
finally:
    timeout.cancel()

I found this with eventlet library:

http://eventlet.net/doc/modules/timeout.html

from eventlet.timeout import Timeout

timeout = Timeout(seconds, exception)
try:
    ... # execution here is limited by timeout
finally:
    timeout.cancel()
心房的律动 2024-12-04 20:48:05

对于“正常”Python 代码,不会在 C 扩展或 I/O 等待中长时间停留,您可以通过使用 sys.settrace() 设置跟踪函数来中止正在运行的跟踪函数来实现您的目标达到超时时的代码。

这是否足够取决于您运行的代码的合作程度或恶意程度。如果表现良好,跟踪功能就足够了。

For "normal" Python code, that doesn't linger prolongued times in C extensions or I/O waits, you can achieve your goal by setting a trace function with sys.settrace() that aborts the running code when the timeout is reached.

Whether that is sufficient or not depends on how co-operating or malicious the code you run is. If it's well-behaved, a tracing function is sufficient.

峩卟喜欢 2024-12-04 20:48:05

另一种方法是使用 faulthandler

import time
import faulthandler


faulthandler.enable()


try:
    faulthandler.dump_tracebacks_later(3)
    time.sleep(10)
finally:
    faulthandler.cancel_dump_tracebacks_later()

注意:faulthandler 模块是 python3.3 中 stdlib 的一部分。

An other way is to use faulthandler:

import time
import faulthandler


faulthandler.enable()


try:
    faulthandler.dump_tracebacks_later(3)
    time.sleep(10)
finally:
    faulthandler.cancel_dump_tracebacks_later()

N.B: The faulthandler module is part of stdlib in python3.3.

贱贱哒 2024-12-04 20:48:05

如果您正在运行的代码预计会在设定的时间后终止,那么您应该正确编写它,以便不会对关闭产生任何负面影响,无论它是线程还是子进程。带有撤消功能的命令模式在这里会很有用。

因此,这实际上取决于线程在杀死它时正在做什么。如果它只是处理数字,谁会在乎你是否杀死它。如果它与文件系统交互并且你杀死了它,那么也许你真的应该重新考虑你的策略。

Python 支持什么线程?守护进程线程和连接。如果您在守护进程仍处于活动状态时加入了守护进程,为什么 python 让主线程退出?因为它知道使用守护线程的人会(希望)以一种线程死亡时无关紧要的方式编写代码。在这种情况下,为连接设置超时,然后让 main 终止,从而带走任何守护线程,这是完全可以接受的。

If you're running code that you expect to die after a set time, then you should write it properly so that there aren't any negative effects on shutdown, no matter if its a thread or a subprocess. A command pattern with undo would be useful here.

So, it really depends on what the thread is doing when you kill it. If its just crunching numbers who cares if you kill it. If its interacting with the filesystem and you kill it , then maybe you should really rethink your strategy.

What is supported in Python when it comes to threads? Daemon threads and joins. Why does python let the main thread exit if you've joined a daemon while its still active? Because its understood that someone using daemon threads will (hopefully) write the code in a way that it wont matter when that thread dies. Giving a timeout to a join and then letting main die, and thus taking any daemon threads with it, is perfectly acceptable in this context.

荆棘i 2024-12-04 20:48:05

我已经用这种方式解决了这个问题:
对我来说效果很好(在 Windows 中,一点也不重)我希望它对某人有用)

import threading
import time

class LongFunctionInside(object):
    lock_state = threading.Lock()
    working = False

    def long_function(self, timeout):

        self.working = True

        timeout_work = threading.Thread(name="thread_name", target=self.work_time, args=(timeout,))
        timeout_work.setDaemon(True)
        timeout_work.start()

        while True:  # endless/long work
            time.sleep(0.1)  # in this rate the CPU is almost not used
            if not self.working:  # if state is working == true still working
                break
        self.set_state(True)

    def work_time(self, sleep_time):  # thread function that just sleeping specified time,
    # in wake up it asking if function still working if it does set the secured variable work to false
        time.sleep(sleep_time)
        if self.working:
            self.set_state(False)

    def set_state(self, state):  # secured state change
        while True:
            self.lock_state.acquire()
            try:
                self.working = state
                break
            finally:
                self.lock_state.release()

lw = LongFunctionInside()
lw.long_function(10)

主要思想是创建一个线程,该线程将与“长时间工作”并行睡眠并唤醒(超时后) )更改安全变量状态,长函数在工作期间检查安全变量。
我对 Python 编程还很陌生,所以如果该解决方案存在基本错误,例如资源、计时、死锁问题,请回复))。

I've solved that in that way:
For me is worked great (in windows and not heavy at all) I'am hope it was useful for someone)

import threading
import time

class LongFunctionInside(object):
    lock_state = threading.Lock()
    working = False

    def long_function(self, timeout):

        self.working = True

        timeout_work = threading.Thread(name="thread_name", target=self.work_time, args=(timeout,))
        timeout_work.setDaemon(True)
        timeout_work.start()

        while True:  # endless/long work
            time.sleep(0.1)  # in this rate the CPU is almost not used
            if not self.working:  # if state is working == true still working
                break
        self.set_state(True)

    def work_time(self, sleep_time):  # thread function that just sleeping specified time,
    # in wake up it asking if function still working if it does set the secured variable work to false
        time.sleep(sleep_time)
        if self.working:
            self.set_state(False)

    def set_state(self, state):  # secured state change
        while True:
            self.lock_state.acquire()
            try:
                self.working = state
                break
            finally:
                self.lock_state.release()

lw = LongFunctionInside()
lw.long_function(10)

The main idea is to create a thread that will just sleep in parallel to "long work" and in wake up (after timeout) change the secured variable state, the long function checking the secured variable during its work.
I'm pretty new in Python programming, so if that solution has a fundamental errors, like resources, timing, deadlocks problems , please response)).

清眉祭 2024-12-04 20:48:05

使用“with”构造和合并解决方案来解决 -

  • 超时函数如果完成时间太长
  • 这个线程效果更好。

    导入线程、时间
    
    类 Exception_TIMEOUT(异常):
        经过
    
    类 linwintimeout:
    
        def __init__(self, f, 秒=1.0, error_message='超时'):
            self.秒 = 秒
            self.thread = threading.Thread(目标=f)
            self.thread.daemon = True
            self.error_message = 错误消息
    
        defhandle_timeout(自身):
            引发 Exception_TIMEOUT(self.error_message)
    
        def __enter__(自我):
            尝试:
                self.thread.start()
                self.thread.join(self.秒)
            除了异常,te:
                提高TE
    
        def __exit__(自身、类型、值、回溯):
            如果 self.thread.is_alive():
                返回 self.handle_timeout()
    
    定义函数():
        而真实:
            print "继续打印...", time.sleep(1)
    
    尝试:
        与 linwintimeout(函数, 秒=5.0, error_message='超过超时 %s 秒' % 5.0):
            经过
    除了 Exception_TIMEOUT,e:
        print " 注意!! 超时,放弃... %s " % e
    

solving with the 'with' construct and merging solution from -

  • Timeout function if it takes too long to finish
  • this thread which work better.

    import threading, time
    
    class Exception_TIMEOUT(Exception):
        pass
    
    class linwintimeout:
    
        def __init__(self, f, seconds=1.0, error_message='Timeout'):
            self.seconds = seconds
            self.thread = threading.Thread(target=f)
            self.thread.daemon = True
            self.error_message = error_message
    
        def handle_timeout(self):
            raise Exception_TIMEOUT(self.error_message)
    
        def __enter__(self):
            try:
                self.thread.start()
                self.thread.join(self.seconds)
            except Exception, te:
                raise te
    
        def __exit__(self, type, value, traceback):
            if self.thread.is_alive():
                return self.handle_timeout()
    
    def function():
        while True:
            print "keep printing ...", time.sleep(1)
    
    try:
        with linwintimeout(function, seconds=5.0, error_message='exceeded timeout of %s seconds' % 5.0):
            pass
    except Exception_TIMEOUT, e:
        print "  attention !! execeeded timeout, giving up ... %s " % e
    
不离久伴 2024-12-04 20:48:05

wrapt-timeout-decorator 库值得在这里提及:

from wrapt_timeout_decorator import timeout

@timeout(10)
def function_with_timeout():
   ...

如果该函数不10秒内完成,抛出错误。

The wrapt-timeout-decorator library deserves its mention here:

from wrapt_timeout_decorator import timeout

@timeout(10)
def function_with_timeout():
   ...

If the function does not finish in 10 seconds, an error is thrown.

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文