Python equivalent of setInterval()?

Posted on 2024-08-30 11:18:09

Does Python have a function similar to JavaScript's setInterval()?

I would like to have:

def set_interval(func, interval):
    ...

That will call func every interval time units.

Comments (25)

为人所爱 2024-09-06 11:18:10

You can also try out this method:

import time

while True:
    time.sleep(5)
    print("5 seconds has passed")

So it will print "5 seconds has passed" every 5 seconds.

The function sleep() suspends execution for the given number of seconds. The argument may be a floating point number to indicate a more precise sleep time.
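
A plain sleep loop lets the time spent in the callback add to every cycle. The sketch below is one way to compensate, assuming a monotonic clock is available; `run_every` and its `count` parameter are illustrative names, not part of any library.

```python
import time


def run_every(func, interval, count):
    """Call func `count` times, aiming for one call per `interval` seconds."""
    next_call = time.monotonic()
    for _ in range(count):
        next_call += interval
        # Sleep only for the time left until the next scheduled tick,
        # so time spent inside func() does not push later ticks back.
        time.sleep(max(0.0, next_call - time.monotonic()))
        func()


if __name__ == "__main__":
    run_every(lambda: print("5 seconds have passed"), 5, 3)
```

Like the loop above, this still blocks the calling thread while it runs.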

书间行客 2024-09-06 11:18:10

The above method didn't quite do it for me as I needed to be able to cancel the interval. I turned the function into a class and came up with the following:

import threading

class setInterval():
    def __init__(self, func, sec):
        def func_wrapper():
            self.t = threading.Timer(sec, func_wrapper)
            self.t.start()
            func()
        self.t = threading.Timer(sec, func_wrapper)
        self.t.start()

    def cancel(self):
        self.t.cancel()
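
With the class above, a cancel() that arrives while the wrapper is re-arming can cancel the old timer and miss the new one. Here is a rough variant that guards against that with a lock and a stop flag. `CancelableInterval` and its private helpers are hypothetical names; treat it as a sketch, not a drop-in replacement.

```python
import threading
import time


class CancelableInterval:
    """Hypothetical Timer-based interval with a stop flag (illustrative only)."""

    def __init__(self, func, sec):
        self._func = func
        self._sec = sec
        self._stopped = threading.Event()
        self._lock = threading.Lock()
        self._timer = None
        self._schedule()

    def _schedule(self):
        # Holding the lock means cancel() either sees the new timer
        # or prevents it from being created at all.
        with self._lock:
            if self._stopped.is_set():
                return
            self._timer = threading.Timer(self._sec, self._tick)
            self._timer.daemon = True
            self._timer.start()

    def _tick(self):
        self._schedule()   # book the next run first, then do the work
        self._func()

    def cancel(self):
        with self._lock:
            self._stopped.set()
            if self._timer is not None:
                self._timer.cancel()


if __name__ == "__main__":
    interval = CancelableInterval(lambda: print("tick"), 1)
    time.sleep(3.5)
    interval.cancel()
```

A call that is already in progress when cancel() runs still finishes; only later runs are suppressed.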
翻身的咸鱼 2024-09-06 11:18:10

Most of the answers above do not shut down the thread properly. While using a Jupyter notebook, I noticed that after an explicit interrupt was sent, the threads kept running and, worse, kept multiplying: 1 thread, then 2, then 4, and so on. My method below is based on the answer by @doom, but it handles interrupts cleanly by running an infinite loop in the main thread that listens for SIGINT and SIGTERM events:

  • No drift
  • Cancelable
  • Handles SIGINT and SIGTERM very well
  • Doesn't make a new thread for every run

Feel free to suggest improvements

import time
import threading
import signal

# Record the time for the purposes of demonstration 
start_time=time.time()

class ProgramKilled(Exception):
    """
    An instance of this custom exception class is raised every time we get a SIGTERM or SIGINT
    """
    pass

# Raise the custom exception whenever SIGINT or SIGTERM is triggered
def signal_handler(signum, frame):
    raise ProgramKilled

# This function serves as the callback triggered on every run of our IntervalThread
def action() :
    print('action ! -> time : {:.1f}s'.format(time.time()-start_time))

# https://stackoverflow.com/questions/2697039/python-equivalent-of-setinterval
class IntervalThread(threading.Thread) :
    def __init__(self,interval,action, *args, **kwargs) :
        super(IntervalThread, self).__init__()
        self.interval=interval
        self.action=action
        self.stopEvent=threading.Event()
        self.start()

    def run(self) :
        nextTime=time.time()+self.interval
        while not self.stopEvent.wait(nextTime-time.time()) :
            nextTime+=self.interval
            self.action()

    def cancel(self) :
        self.stopEvent.set()

def main():

    # Handle SIGINT and SIGTERM with the help of the callback function
    signal.signal(signal.SIGTERM, signal_handler)
    signal.signal(signal.SIGINT, signal_handler)
    # start action every 1s
    inter=IntervalThread(1,action)
    print('just after setInterval -> time : {:.1f}s'.format(time.time()-start_time))

    # will stop interval in 500s
    t=threading.Timer(500,inter.cancel)
    t.start()

    # https://www.g-loaded.eu/2016/11/24/how-to-terminate-running-python-threads-using-signals/
    while True:
        try:
            time.sleep(1)
        except ProgramKilled:
            print("Program killed: running cleanup code")
            inter.cancel()
            break

if __name__ == "__main__":
    main()
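
To see why the run() loop above does not drift, it helps to compare the two scheduling rules on paper. This is a small simulation, with no real sleeping, under the assumption that every callback takes a fixed `work` time. Both function names are made up for this illustration.

```python
def fixed_sleep_schedule(interval, work, ticks):
    """Start times when each cycle sleeps a full interval, then does the work."""
    now, starts = 0.0, []
    for _ in range(ticks):
        now += interval          # sleep(interval)
        starts.append(now)
        now += work              # time spent inside the callback
    return starts


def deadline_schedule(interval, work, ticks):
    """Start times when each cycle waits for a precomputed deadline."""
    now, next_time, starts = 0.0, interval, []
    for _ in range(ticks):
        now = max(now, next_time)  # wait(next_time - now); no wait if already late
        next_time += interval      # same step as nextTime += self.interval
        starts.append(now)
        now += work
    return starts


print(fixed_sleep_schedule(1.0, 0.25, 4))  # -> [1.0, 2.25, 3.5, 4.75]
print(deadline_schedule(1.0, 0.25, 4))     # -> [1.0, 2.0, 3.0, 4.0]
```

The second rule stays on the 1-second grid as long as the callback finishes within one interval.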
枕头说它不想醒 2024-09-06 11:18:10

In the solutions above, there is no guarantee that the program shuts down gracefully when it is terminated; it is always recommended to stop a program with a soft kill. Most of them also lack a way to stop the interval. I found a nice article on Medium written by Sankalp (run periodic tasks in python) that solves both of these issues; refer to it for deeper insight.
In the sample below, the standard signal module is used to catch soft kills (SIGTERM and SIGINT) so that cleanup code can run before the program exits.

import threading, time, signal

from datetime import timedelta

WAIT_TIME_SECONDS = 1

class ProgramKilled(Exception):
    pass

def foo():
    print(time.ctime())

def signal_handler(signum, frame):
    raise ProgramKilled

class Job(threading.Thread):
    def __init__(self, interval, execute, *args, **kwargs):
        threading.Thread.__init__(self)
        self.daemon = False
        self.stopped = threading.Event()
        self.interval = interval
        self.execute = execute
        self.args = args
        self.kwargs = kwargs

    def stop(self):
        self.stopped.set()
        self.join()

    def run(self):
        while not self.stopped.wait(self.interval.total_seconds()):
            self.execute(*self.args, **self.kwargs)

if __name__ == "__main__":
    signal.signal(signal.SIGTERM, signal_handler)
    signal.signal(signal.SIGINT, signal_handler)
    job = Job(interval=timedelta(seconds=WAIT_TIME_SECONDS), execute=foo)
    job.start()

    while True:
        try:
            time.sleep(1)
        except ProgramKilled:
            print("Program killed: running cleanup code")
            job.stop()
            break
#output
#Tue Oct 16 17:47:51 2018
#Tue Oct 16 17:47:52 2018
#Tue Oct 16 17:47:53 2018
#^CProgram killed: running cleanup code
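
The core of the Job class is the `while not self.stopped.wait(...)` line. A stripped-down sketch of just that mechanism is below; `run_until_stopped` is a name chosen for this example.

```python
import threading


def run_until_stopped(stop_event, interval, func):
    """Call func every `interval` seconds until stop_event is set."""
    # Event.wait() returns False when the timeout expires (keep looping)
    # and True as soon as the event is set (leave the loop immediately,
    # without waiting out the rest of the interval).
    while not stop_event.wait(interval):
        func()


if __name__ == "__main__":
    stop = threading.Event()
    worker = threading.Thread(target=run_until_stopped,
                              args=(stop, 1.0, lambda: print("tick")))
    worker.start()
    threading.Timer(3.5, stop.set).start()  # request a stop after 3.5 s
    worker.join()
```

Because the wait doubles as the sleep, stopping does not have to wait for a full interval to elapse, which a time.sleep-based loop cannot offer.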
巨坚强 2024-09-06 11:18:10

I have written code for a very flexible setInterval in Python. Here it is:

import threading


class AlreadyRunning(Exception):
    pass


class IntervalNotValid(Exception):
    pass


class setInterval():
    def __init__(this, func=None, sec=None, args=[]):
        this.running = False
        this.func = func  # the function to be run
        this.sec = sec            # interval in second
        this.Return = None  # The returned data
        this.args = args
        this.runOnce = None  # associated with run_once() method
        this.runOnceArgs = None   # associated with run_once() method

        if (func is not None and sec is not None):
            this.running = True

            if (not callable(func)):
                raise TypeError("non-callable object is given")

            if (not isinstance(sec, int) and not isinstance(sec, float)):
                raise TypeError("A non-numeric object is given")

            this.TIMER = threading.Timer(this.sec, this.loop)
            this.TIMER.start()

    def start(this):
        if (not this.running):
            if (not this.isValid()):
                raise IntervalNotValid("The function and/or the " +
                                       "interval was not provided or is invalid.")
            this.running = True
            this.TIMER = threading.Timer(this.sec, this.loop)
            this.TIMER.start()
        else:
            raise AlreadyRunning("Tried to start an interval that is already running")

    def stop(this):
        this.running = False

    def isValid(this):
        if (not callable(this.func)):
            return False

        cond1 = not isinstance(this.sec, int)
        cond2 = not isinstance(this.sec, float)
        if (cond1 and cond2):
            return False
        return True

    def loop(this):

        if (this.running):
            this.TIMER = threading.Timer(this.sec, this.loop)
            this.TIMER.start()
            function_, Args_ = this.func, this.args

            if (this.runOnce is not None):  # someone has provide the run_once
                runOnce, this.runOnce = this.runOnce, None
                result = runOnce(*(this.runOnceArgs))
                this.runOnceArgs = None

                # if and only if the result is False. not accept "None"
                # nor zero.
                if (result is False):
                    return  # cancel the interval right now

            this.Return = function_(*Args_)

    def change_interval(this, sec):

        cond1 = not isinstance(sec, int)
        cond2 = not isinstance(sec, float)
        if (cond1 and cond2):
            raise TypeError("A non-numeric object is given")

        # prevent error when providing interval to a blueprint
        if (this.running):
            this.TIMER.cancel()

        this.sec = sec

        # prevent error when providing interval to a blueprint
        # if the function hasn't provided yet
        if (this.running):
            this.TIMER = threading.Timer(this.sec, this.loop)
            this.TIMER.start()

    def change_next_interval(this, sec):

        if (not isinstance(sec, int) and not isinstance(sec, float)):
            raise TypeError("A non-numeric object is given")

        this.sec = sec

    def change_func(this, func, args=[]):

        if (not callable(func)):
            raise TypeError("non-callable object is given")

        this.func = func
        this.args = args

    def run_once(this, func, args=[]):
        this.runOnce = func
        this.runOnceArgs = args

    def get_return(this):
        return this.Return

You get a lot of features and flexibility. The interval runs on timer threads, so it does not block the rest of your code; you can change the interval or the function at run time, pass arguments, read the object returned by your function, and more. You can build your own tricks on top of it too!

Here's a very simple, basic usage example:

import time

def interval(name="world"):
  print(f"Hello {name}!")

# function named interval will be called every two seconds
# output: "Hello world!"
interval1 = setInterval(interval, 2) 

# function named interval will be called every 1.5 seconds
# output: "Hello Jane!"
interval2 = setInterval(interval, 1.5, ["Jane"]) 

time.sleep(5) #stop all intervals after 5 seconds
interval1.stop()
interval2.stop()

Check out my GitHub project for more examples and future updates :D
https://github.com/Hzzkygcs/setInterval-python

恰似旧人归 2024-09-06 11:18:10

setInterval should run on its own thread and must not freeze the rest of the program while it loops.

Here is my RUNTIME package, which supports multithreading:

  • setTimeout(F,ms) : fires the function after a delay, in an independent thread.
  • delayF(F,ms) : same as setTimeout(F,ms).
  • setInterval(F,ms) : asynchronous loop
    .pause, .resume : pause and resume the interval
  • clearInterval(interval) : clears the interval

It's short and simple. Note that Python needs a lambda if you pass the function body inline, but a lambda cannot hold a block of statements, so define the function first and then pass it to setInterval.
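
To make the lambda remark concrete, here is a small sketch of three ways to hand a callback with arguments to threading.Timer. `greet` and `schedule_greetings` are made-up names for this illustration.

```python
import functools
import threading


def greet(name, results):
    # A body with several statements needs `def`;
    # a lambda can only hold a single expression.
    message = "Hello " + name
    results.append(message)


def schedule_greetings(delay_s, results):
    """Start three timers that all call greet() after delay_s seconds."""
    timers = [
        # 1. Let Timer pass the arguments itself.
        threading.Timer(delay_s, greet, args=("Ann", results)),
        # 2. Wrap the call in a one-expression lambda.
        threading.Timer(delay_s, lambda: greet("Bob", results)),
        # 3. Pre-bind the arguments with functools.partial.
        threading.Timer(delay_s, functools.partial(greet, "Cy", results)),
    ]
    for timer in timers:
        timer.start()
    return timers
```

All three end up calling the same named function; the lambda only forwards to it.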

### DEMO PYTHON MULTITHREAD ASYNCHRONOUS LOOP ###

import time;
import threading;
import random;

def delay(ms):time.sleep(ms/1000); # Controls the while-loop speed
def setTimeout(R,delayMS):
    t=threading.Timer(delayMS/1000,R)
    t.start();
    return t;
def delayF(R,delayMS):
    t=threading.Timer(delayMS/1000,R)
    t.start();
    return t;

class THREAD:
    def __init__(this):
        this.R_onRun=None;
        this.thread=None;
    def run(this):
        this.thread=threading.Thread(target=this.R_onRun);
        this.thread.start();
    def isRun(this): return this.thread.is_alive();
    
class setInterval :
    def __init__(this,R_onRun,msInterval) :
        this.ms=msInterval;
        this.R_onRun=R_onRun;
        this.kStop=False;
        this.thread=THREAD();
        this.thread.R_onRun=this.Clock;
        this.thread.run();
    def Clock(this) :
        while not this.kStop :
            this.R_onRun();
            delay(this.ms);
    def pause(this) :
        this.kStop=True;
    def stop(this) :
        this.kStop=True;
    def resume(this) :
        if (this.kStop) :
            this.kStop=False;
            this.thread.run();
    
def clearInterval(Timer): Timer.stop();

# EXAMPLE
def p():print(random.random());
tm=setInterval(p,20);
tm2=setInterval(lambda:print("AAAAA"),20);
delayF(tm.pause,1000);
delayF(tm.resume,2000);
delayF(lambda:clearInterval(tm),3000);

Save it to a .py file and run it. You will see it print both random numbers and the string "AAAAA". The number-printing thread pauses after 1 second, resumes at 2 seconds, and stops at 3 seconds, while the string-printing thread keeps printing without its output being corrupted.

If you use OpenCV for graphic animation and use these setInterval calls to speed up the animation, you must have one main thread that calls waitKey; otherwise the window freezes, no matter how long the delay is or whether you call waitKey in a sub-thread:

import cv2

def p(): ... # Your drawing task
setInterval(p,1); # Subthread1 running draw
setInterval(p,1); # Subthread2 running draw
setInterval(p,1); # Subthread3 running draw
while True: cv2.waitKey(10); # Main thread which waitKey have effect
薯片软お妹 2024-09-06 11:18:10

Recently I had the same issue as you, and I found these solutions:

1. You can use threading.Timer (covered above).

2. You can use the sched module (also covered above).

3. You can use the Advanced Python Scheduler library (recommended).
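
For option 2, a rough sketch with the standard-library sched module could look like this. sched has no built-in repeat, so each run books the next one; `run_periodically` and `repeats` are names I made up for the example.

```python
import sched
import time


def run_periodically(scheduler, interval, func, repeats):
    """Schedule func to run `repeats` times, `interval` seconds apart."""
    def tick(remaining):
        func()
        if remaining > 1:
            # Re-arm: book the next run from inside the current one.
            scheduler.enter(interval, 1, tick, argument=(remaining - 1,))

    scheduler.enter(interval, 1, tick, argument=(repeats,))


if __name__ == "__main__":
    s = sched.scheduler(time.monotonic, time.sleep)
    run_periodically(s, 2, lambda: print("tick"), 3)
    s.run()  # blocks until the queue is empty
```

Note that s.run() blocks the calling thread, so for a background interval it would still need its own thread.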

南巷近海 2024-09-06 11:18:10

Some of the answers above that use func_wrapper and threading.Timer do work, except that they spawn a new thread every time the interval fires, which causes memory problems.

The basic example below roughly implements a similar mechanism by putting each interval on a separate thread, which sleeps for the given interval. Before jumping into the code, here are some limitations you need to be aware of:

  1. JavaScript is single-threaded, so when the function inside setInterval fires, nothing else runs at the same time (excluding worker threads, but let's stick to the general use case of setInterval). It is therefore thread-safe. In this implementation, however, you may encounter race conditions unless you use a threading.RLock.

  2. The implementation below uses time.sleep to simulate intervals, but once the execution time of func is added, the total time per interval may be longer than you expect. Depending on the use case, you may want to "sleep less" (subtract the time taken by the call to func).

  3. I only roughly tested this, and you should definitely not use global variables the way I did; feel free to tweak it so that it fits your system.


Enough talking, here is the code:

# Python 2.7
import threading
import time


class Interval(object):
    def __init__(self):
        self.daemon_alive = True
        self.thread = None # keep a reference to the thread so that we can "join"

    def ticktock(self, interval, func):
        while self.daemon_alive:
            time.sleep(interval)
            func()

num = 0
def print_num():
    global num
    num += 1
    print 'num + 1 = ', num

def print_negative_num():
    global num
    print '-num = ', num * -1

intervals = {} # keep track of intervals
g_id_counter = 0 # roughly generate ids for intervals

def set_interval(interval, func):
    global g_id_counter

    interval_obj = Interval()
    # Put this interval on a new thread
    t = threading.Thread(target=interval_obj.ticktock, args=(interval, func))
    t.setDaemon(True)
    interval_obj.thread = t
    t.start()

    # Register this interval so that we can clear it later
    # using roughly generated id
    interval_id = g_id_counter
    g_id_counter += 1
    intervals[interval_id] = interval_obj

    # return interval id like it does in JavaScript
    return interval_id

def clear_interval(interval_id):
    # terminate this interval's while loop
    intervals[interval_id].daemon_alive = False
    # wait for the thread to finish its current cycle and exit
    intervals[interval_id].thread.join()
    # pop out the interval from registry for reusing
    intervals.pop(interval_id)

if __name__ == '__main__':
    num_interval = set_interval(1, print_num)
    neg_interval = set_interval(3, print_negative_num)

    time.sleep(10) # Sleep 10 seconds on main thread to let interval run
    clear_interval(num_interval)
    clear_interval(neg_interval)
    print "- Are intervals all cleared?"
    time.sleep(3) # check if both intervals are stopped (not printing)
    print "- Yup, time to get beers"

Expected output:

num + 1 =  1
num + 1 =  2
-num =  -2
num + 1 =  3
num + 1 =  4
num + 1 =  5
-num =  -5
num + 1 =  6
num + 1 =  7
num + 1 =  8
-num =  -8
num + 1 =  9
num + 1 =  10
-num =  -10
- Are intervals all cleared?
- Yup, time to get beers
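
On limitation 1: when several interval threads touch the same variable (like `num` above), a lock around the update keeps it consistent. The following is only a sketch of the idea with a threading.RLock; `SafeCounter` and `hammer` are hypothetical helpers, not part of the code above.

```python
import threading


class SafeCounter:
    """Counter whose updates are serialized with a re-entrant lock."""

    def __init__(self):
        self._lock = threading.RLock()
        self.value = 0

    def increment(self):
        # Only one thread at a time may run the read-modify-write step.
        with self._lock:
            self.value += 1


def hammer(counter, threads=4, per_thread=10000):
    """Increment the counter from several threads and return the final value."""
    def work():
        for _ in range(per_thread):
            counter.increment()

    workers = [threading.Thread(target=work) for _ in range(threads)]
    for w in workers:
        w.start()
    for w in workers:
        w.join()
    return counter.value


if __name__ == "__main__":
    print(hammer(SafeCounter()))
```

A plain threading.Lock would do here as well; RLock only matters if the same thread needs to take the lock again while already holding it.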
会傲 2024-09-06 11:18:10

我的 Python 3 模块 jsinterval.py 会很有帮助!这是:

"""
Threaded intervals and timeouts from JavaScript
"""

import threading, sys

__all__ =  ['TIMEOUTS', 'INTERVALS', 'setInterval', 'clearInterval', 'setTimeout', 'clearTimeout']

TIMEOUTS  = {}
INTERVALS = {}

last_timeout_id  = 0
last_interval_id = 0

class Timeout:
    """Class for all timeouts."""
    def __init__(self, func, timeout):
        global last_timeout_id
        last_timeout_id += 1
        self.timeout_id = last_timeout_id
        TIMEOUTS[str(self.timeout_id)] = self
        self.func = func
        self.timeout = timeout
        self.threadname = 'Timeout #%s' %self.timeout_id

    def run(self):
        func = self.func
        delx = self.__del__
        def func_wrapper():
            func()
            delx()
        self.t = threading.Timer(self.timeout/1000, func_wrapper)
        self.t.name = self.threadname
        self.t.start()

    def __repr__(self):
        return '<JS Timeout set for %s seconds, launching function %s on timeout reached>' %(self.timeout, repr(self.func))

    def __del__(self):
        self.t.cancel()

class Interval:
    """Class for all intervals."""
    def __init__(self, func, interval):
        global last_interval_id
        self.interval_id = last_interval_id
        INTERVALS[str(self.interval_id)] = self
        last_interval_id += 1
        self.func = func
        self.interval = interval
        self.threadname = 'Interval #%s' %self.interval_id

    def run(self):
        func = self.func
        interval = self.interval
        def func_wrapper():
            timeout = Timeout(func_wrapper, interval)
            self.timeout = timeout
            timeout.run()
            func()
        self.t = threading.Timer(self.interval/1000, func_wrapper)
        self.t.name = self.threadname
        self.t.run()

    def __repr__(self):
        return '<JS Interval, repeating function %s with interval %s>' %(repr(self.func), self.interval)

    def __del__(self):
        self.timeout.__del__()

def setInterval(func, interval):
    """
    Create a JS Interval: func is the function to repeat, interval is the interval (in ms)
    of executing the function.
    """
    temp = Interval(func, interval)
    temp.run()
    idx = int(temp.interval_id)
    del temp
    return idx


def clearInterval(interval_id):
    try:
        INTERVALS[str(interval_id)].__del__()
        del INTERVALS[str(interval_id)]
    except KeyError:
        sys.stderr.write('No such interval "Interval #%s"\n' %interval_id)

def setTimeout(func, timeout):
    """
    Create a JS Timeout: func is the function to timeout, timeout is the timeout (in ms)
    of executing the function.
    """
    temp = Timeout(func, timeout)
    temp.run()
    idx = int(temp.timeout_id)
    del temp
    return idx


def clearTimeout(timeout_id):
    try:
        TIMEOUTS[str(timeout_id)].__del__()
        del TIMEOUTS[str(timeout_id)]
    except KeyError:
        sys.stderr.write('No such timeout "Timeout #%s"\n' %timeout_id)

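CODE EDIT:
Fixed the memory leak (spotted by @benjaminz). Now all threads are cleaned up when they end. Why did the leak happen? Because of implicit (or even explicit) references, in my case the TIMEOUTS and INTERVALS dicts. Timeouts now clean up after themselves (after this patch) because they use a function wrapper that calls the function and then cancels itself. But how does that free the memory? Objects can't be deleted from memory unless all references to them are deleted too, or the gc module is used. To explain: my code has no way to create unwanted references to timeouts/intervals. They have only one referrer: the TIMEOUTS/INTERVALS dicts. When interrupted or finished (only timeouts can finish uninterrupted), they delete the only existing reference to themselves: their corresponding dict element. The classes are not exported through __all__, so there is no room for memory leaks.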

My Python 3 module jsinterval.py will be helpful! Here it is:

"""
Threaded intervals and timeouts from JavaScript
"""

import threading, sys

__all__ =  ['TIMEOUTS', 'INTERVALS', 'setInterval', 'clearInterval', 'setTimeout', 'clearTimeout']

TIMEOUTS  = {}
INTERVALS = {}

last_timeout_id  = 0
last_interval_id = 0

class Timeout:
    """Class for all timeouts."""
    def __init__(self, func, timeout):
        global last_timeout_id
        last_timeout_id += 1
        self.timeout_id = last_timeout_id
        TIMEOUTS[str(self.timeout_id)] = self
        self.func = func
        self.timeout = timeout
        self.threadname = 'Timeout #%s' %self.timeout_id

    def run(self):
        func = self.func
        delx = self.__del__
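        def func_wrapper():
            func()
            delx()
            # Drop the registry entry so a finished timeout can be garbage-collected.
            TIMEOUTS.pop(str(self.timeout_id), None)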
        self.t = threading.Timer(self.timeout/1000, func_wrapper)
        self.t.name = self.threadname
        self.t.start()

    def __repr__(self):
        return '<JS Timeout set for %s ms, launching function %s on timeout reached>' %(self.timeout, repr(self.func))

    def __del__(self):
        self.t.cancel()

class Interval:
    """Class for all intervals."""
    def __init__(self, func, interval):
        global last_interval_id
        self.interval_id = last_interval_id
        INTERVALS[str(self.interval_id)] = self
        last_interval_id += 1
        self.func = func
        self.interval = interval
        self.threadname = 'Interval #%s' %self.interval_id

    def run(self):
        func = self.func
        interval = self.interval
        def func_wrapper():
            timeout = Timeout(func_wrapper, interval)
            self.timeout = timeout
            timeout.run()
            func()
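        self.t = threading.Timer(self.interval/1000, func_wrapper)
        self.t.name = self.threadname
        # start() runs the timer in its own thread; run() would block the caller.
        self.t.start()

    def __repr__(self):
        return '<JS Interval, repeating function %s with interval %s>' %(repr(self.func), self.interval)

    def __del__(self):
        # Cancel the first timer and, once a tick has fired, the pending re-arm.
        if hasattr(self, 't'):
            self.t.cancel()
        if hasattr(self, 'timeout'):
            self.timeout.__del__()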

def setInterval(func, interval):
    """
    Create a JS Interval: func is the function to repeat, interval is the interval (in ms)
    of executing the function.
    """
    temp = Interval(func, interval)
    temp.run()
    idx = int(temp.interval_id)
    del temp
    return idx


def clearInterval(interval_id):
    try:
        INTERVALS[str(interval_id)].__del__()
        del INTERVALS[str(interval_id)]
    except KeyError:
        sys.stderr.write('No such interval "Interval #%s"\n' %interval_id)

def setTimeout(func, timeout):
    """
    Create a JS Timeout: func is the function to timeout, timeout is the timeout (in ms)
    of executing the function.
    """
    temp = Timeout(func, timeout)
    temp.run()
    idx = int(temp.timeout_id)
    del temp
    return idx


def clearTimeout(timeout_id):
    try:
        TIMEOUTS[str(timeout_id)].__del__()
        del TIMEOUTS[str(timeout_id)]
    except KeyError:
        sys.stderr.write('No such timeout "Timeout #%s"\n' %timeout_id)

CODE EDIT:
Fixed the memory leak (spotted by @benjaminz). Now all threads are cleaned up when they end. Why did the leak happen? Because of implicit (or even explicit) references, in my case the TIMEOUTS and INTERVALS dicts. Timeouts now clean up after themselves (after this patch) because they use a function wrapper that calls the function and then cancels itself. But how does that free the memory? Objects can't be deleted from memory unless all references to them are deleted too, or the gc module is used. To explain: my code has no way to create unwanted references to timeouts/intervals. They have only one referrer: the TIMEOUTS/INTERVALS dicts. When interrupted or finished (only timeouts can finish uninterrupted), they delete the only existing reference to themselves: their corresponding dict element. The classes are not exported through __all__, so there is no room for memory leaks.

黯淡〆 2024-09-06 11:18:10

Here is a low time drift solution that uses a thread to periodically signal an Event object. The thread's run() does almost nothing while waiting for a timeout; hence the low time drift.

# Example of low drift (time) periodic execution of a function.
import threading
import time

# Thread that sets 'flag' after 'timeout'
class timerThread (threading.Thread):

    def __init__(self , timeout , flag):
        threading.Thread.__init__(self)
        self.timeout = timeout
        self.stopFlag = False
        self.event = threading.Event()
        self.flag = flag

    # Low drift run(); there is only the 'if'
    # and 'set' methods between waits.
    def run(self):
        while not self.event.wait(self.timeout):
            if self.stopFlag:
                break
            self.flag.set()

    def stop(self):
        # Assign to the instance attribute; a bare name would only create a local.
        self.stopFlag = True
        self.event.set()

# Data.
printCnt = 0

# Flag to print.
printFlag = threading.Event()

# Create and start the timer thread.
printThread = timerThread(3 , printFlag)
printThread.start()

# Loop to wait for flag and print time.
while True:

    # Wait for flag.
    printFlag.wait()
    # Flag must be manually cleared.
    printFlag.clear()
    print(time.time())
    printCnt += 1
    if printCnt == 3:
        break

# Stop the thread and exit.
printThread.stop()
printThread.join()
print('Done')
吝吻 2024-09-06 11:18:10

Sleep until the start of the next interval of the given length in seconds (blocking, not concurrent):

import time

def sleep_until_next_interval(seconds):
    # Sleep only for the remainder of the current interval, so time spent
    # working does not push later wake-ups back.
    now = time.time()
    fall_asleep = seconds - now % seconds
    time.sleep(fall_asleep)

while True:
    sleep_until_next_interval(10) # 10 seconds - worktime
    # work here

Simple, with no drift.
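To make the arithmetic concrete, here is a small sketch of the same calculation with the clock reading passed in as an argument, so it can be checked by hand. The helper name seconds_until_next_boundary is made up for this illustration and is not part of the answer above.

```python
def seconds_until_next_boundary(now, period):
    """Return how long to sleep so the wake-up lands on the next multiple of period.

    now is a clock reading in seconds (for example from time.time()).
    Hypothetical helper, for illustration only.
    """
    return period - now % period


# With a 10-second period, a reading of 123.0 is 3 s past a boundary, so 7 s remain.
print(seconds_until_next_boundary(123.0, 10))
# Exactly on a boundary the result is a full period, not zero.
print(seconds_until_next_boundary(120.0, 10))
```

Because the remainder is recomputed from the clock on every pass, work time does not accumulate. If the work takes longer than one period, though, the loop simply skips to the next boundary.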

她比我温柔 2024-09-06 11:18:10

Here's a solution I tested and that worked. The setTimeout one was written by someone else, but I forget who.

from threading import Event, Thread

def setInterval(func, time, *args):
    # time is in milliseconds; the returned function cancels the interval.
    stopped = Event()
    def loop():
        while not stopped.wait(time/1000):
            func(*args)
    Thread(target=loop).start()
    return stopped.set

def setTimeout(func, time, *args):
    # Runs func once after time milliseconds unless cancelled first.
    stopped = Event()
    def loop():
        if not stopped.wait(time/1000):
            func(*args)
    Thread(target=loop).start()
    return stopped.set
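The value returned by this setInterval plays the role of clearInterval. As a rough usage sketch (the snake_case name set_interval, the daemon=True flag, and the ticks list are my own additions, not part of the answer), starting and cancelling might look like this:

```python
import time
from threading import Event, Thread


def set_interval(func, ms, *args):
    """Call func(*args) every ms milliseconds; return a function that stops it."""
    stopped = Event()

    def loop():
        # wait() returns False on timeout and True once a stop is requested.
        while not stopped.wait(ms / 1000):
            func(*args)

    # daemon=True is an assumption here, so a forgotten interval cannot keep
    # the interpreter alive at exit.
    Thread(target=loop, daemon=True).start()
    return stopped.set


ticks = []
stop = set_interval(ticks.append, 20, "tick")  # roughly every 20 ms
time.sleep(0.2)
stop()                  # cancel, like clearInterval
time.sleep(0.05)        # let any in-flight call finish
count_after_stop = len(ticks)
time.sleep(0.1)
print(len(ticks) == count_after_stop)  # no further calls once stopped
```

Returning stopped.set keeps the API to a single callable, at the cost of not being able to ask later whether the interval is still running.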
生寂 2024-09-06 11:18:10

import time

def my_function():
    print("Function executed!")

interval = 10 # time in seconds

while True:
    my_function()
    time.sleep(interval)

狼性发作 2024-09-06 11:18:10

Here's something easy-peasy:

import time

delay = 10 # Seconds

def setInterval():
    # Loop instead of recursing, so a long run cannot hit the recursion limit.
    while True:
        print('I print in intervals!')
        time.sleep(delay)

setInterval()
橘亓 2024-09-06 11:18:10

I'm not really experienced with Python, but why can't we just do this:

from time import sleep

# functionName and interval stand in for your own callback and delay in seconds.
while True:
    functionName()
    sleep(interval)
長街聽風 2024-09-06 11:18:10

Things work differently in Python: you need to either sleep() (if you want to block the current thread) or start a new thread. See http://docs.python.org/library/threading.html

那小子欠揍 2024-09-06 11:18:10

From Python Documentation:

from threading import Timer

def hello():
    print("hello, world")

t = Timer(30.0, hello)
t.start() # after 30 seconds, "hello, world" will be printed
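Note that a Timer fires only once, which makes it closer to setTimeout than to setInterval. A minimal sketch of the clearTimeout counterpart, using Timer.cancel() (the fired list and the short 0.2 s delay are just for illustration):

```python
from threading import Timer

fired = []

t = Timer(0.2, fired.append, args=["hello, world"])
t.start()
t.cancel()    # cancelling before the delay elapses prevents the call
t.join()      # the timer thread exits promptly once cancelled
print(fired)  # nothing was appended
```

cancel() only has an effect while the timer is still waiting; once the function has started running, it is not interrupted.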
北城孤痞 2024-09-06 11:18:09

This might be the correct snippet you were looking for:

import threading

def set_interval(func, sec):
    def func_wrapper():
        set_interval(func, sec)
        func()
    t = threading.Timer(sec, func_wrapper)
    t.start()
    return t
未央 2024-09-06 11:18:09

This is a version that you can start and stop.
It is non-blocking.
It also does not drift, because the execution time of the action is not added to the interval (important for long runs with very short intervals, such as audio).

import time, threading

StartTime=time.time()

def action() :
    print('action ! -> time : {:.1f}s'.format(time.time()-StartTime))


class setInterval :
    def __init__(self,interval,action) :
        self.interval=interval
        self.action=action
        self.stopEvent=threading.Event()
        thread=threading.Thread(target=self.__setInterval)
        thread.start()

    def __setInterval(self) :
        nextTime=time.time()+self.interval
        while not self.stopEvent.wait(nextTime-time.time()) :
            nextTime+=self.interval
            self.action()

    def cancel(self) :
        self.stopEvent.set()

# start action every 0.6s
inter=setInterval(0.6,action)
print('just after setInterval -> time : {:.1f}s'.format(time.time()-StartTime))

# will stop interval in 5s
t=threading.Timer(5,inter.cancel)
t.start()

Output is :

just after setInterval -> time : 0.0s
action ! -> time : 0.6s
action ! -> time : 1.2s
action ! -> time : 1.8s
action ! -> time : 2.4s
action ! -> time : 3.0s
action ! -> time : 3.6s
action ! -> time : 4.2s
action ! -> time : 4.8s
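To see why tracking a running deadline avoids drift, here is a small simulation on a made-up clock, with no real sleeping. The function names naive_schedule and deadline_schedule, and the 0.5 s interval with 0.125 s of work per call, are assumptions chosen for the illustration.

```python
def naive_schedule(interval, work, n):
    """Start times when each cycle sleeps a fixed interval, then does the work."""
    t, starts = 0.0, []
    for _ in range(n):
        t += interval          # stands in for time.sleep(interval)
        starts.append(t)
        t += work              # the action itself takes time
    return starts


def deadline_schedule(interval, work, n):
    """Start times when each wait is measured against a running deadline."""
    t, next_time, starts = 0.0, interval, []
    for _ in range(n):
        t += max(0.0, next_time - t)   # stands in for wait(nextTime - time.time())
        next_time += interval
        starts.append(t)
        t += work
    return starts


print(naive_schedule(0.5, 0.125, 4))     # each start slips by the work time
print(deadline_schedule(0.5, 0.125, 4))  # starts stay on multiples of 0.5
```

In the naive version every cycle lasts interval plus work, so the error grows linearly. In the deadline version the work time is absorbed by a shorter wait, as long as the work fits inside one interval.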
欢烬 2024-09-06 11:18:09

Just keep it nice and simple.

import threading

def setInterval(func,time):
    e = threading.Event()
    while not e.wait(time):
        func()

def foo():
    print("hello")

# using
setInterval(foo,5)

# output:
# hello
# hello
# .
# .
# .

EDIT: The code above blocks the calling thread; this version is non-blocking:

import threading

class ThreadJob(threading.Thread):
    def __init__(self,callback,event,interval):
        '''Runs the callback function every interval seconds until event is set.

        :param callback:  callback function to invoke
        :param event: external event used to stop the job
        :param interval: time in seconds between callback invocations
        :type callback: function
        :type interval: int
        '''
        self.callback = callback
        self.event = event
        self.interval = interval
        super(ThreadJob,self).__init__()

    def run(self):
        while not self.event.wait(self.interval):
            self.callback()



event = threading.Event()

def foo():
    print("hello")

k = ThreadJob(foo,event,2)
k.start()

print("It is non-blocking")
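The event passed to ThreadJob is also how the job is stopped, which the snippet above does not show. A sketch of a full start/stop cycle, assuming a class of the same shape (the calls list and the short 0.02 s interval are only there to keep the example quick):

```python
import threading
import time


class ThreadJob(threading.Thread):
    """Same shape as the class above: run callback every interval seconds until event is set."""

    def __init__(self, callback, event, interval):
        super().__init__()
        self.callback = callback
        self.event = event
        self.interval = interval

    def run(self):
        while not self.event.wait(self.interval):
            self.callback()


calls = []
event = threading.Event()
job = ThreadJob(lambda: calls.append(time.monotonic()), event, 0.02)
job.start()

time.sleep(0.2)   # the main thread is free to do other work here
event.set()       # ask the worker to stop
job.join()        # returns once run() has left its loop
print(len(calls) > 0, job.is_alive())
```

Setting the event wakes the worker immediately instead of waiting out the current interval, so join() returns quickly even with long intervals.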
为你鎻心 2024-09-06 11:18:09

Change Nailxx's answer a bit and you've got the answer!

from threading import Timer

def hello():
    print("hello, world")
    Timer(30.0, hello).start()

Timer(30.0, hello).start() # every 30 seconds, "hello, world" will be printed
仅此而已 2024-09-06 11:18:09

The sched module provides these abilities for general Python code. However, as its documentation suggests, if your code is multithreaded it might make more sense to use the threading.Timer class instead.

可爱咩 2024-09-06 11:18:09

I think this is what you're after:

#timertest.py
import sched, time
def dostuff():
  print("stuff is being done!")
  s.enter(3, 1, dostuff, ())

s = sched.scheduler(time.time, time.sleep)
s.enter(3, 1, dostuff, ())
s.run()

If you add another entry to the scheduler at the end of the repeating method, it'll just keep going.
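The repetition ends as soon as the function stops re-entering itself. Here is a sketch of that with a bounded number of runs. To keep it instant and checkable, it swaps the real clock for a FakeClock class, a made-up stand-in for time.time and time.sleep; the remaining argument is likewise my own addition.

```python
import sched


class FakeClock:
    """Hypothetical stand-in for time.time/time.sleep so the example finishes instantly."""

    def __init__(self):
        self.now = 0.0

    def time(self):
        return self.now

    def sleep(self, seconds):
        self.now += seconds


clock = FakeClock()
s = sched.scheduler(clock.time, clock.sleep)
fired_at = []


def dostuff(remaining):
    fired_at.append(clock.time())
    if remaining > 1:
        # Re-enter the event; not re-entering ends the repetition.
        s.enter(3, 1, dostuff, (remaining - 1,))


s.enter(3, 1, dostuff, (3,))
s.run()           # returns once the queue is empty
print(fired_at)
```

With the real time.time and time.sleep the same code would take nine seconds and block the calling thread while s.run() is active.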

懒猫 2024-09-06 11:18:09

I used sched to create a setInterval function (gist):

import functools
import sched, time

s = sched.scheduler(time.time, time.sleep)

def setInterval(sec):
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*argv, **kw):
            setInterval(sec)(func)
            func(*argv, **kw)
        s.enter(sec, 1, wrapper, ())
        return wrapper
    s.run()
    return decorator


@setInterval(sec=3)
def testInterval():
  print ("test Interval ")

testInterval()
橘寄 2024-09-06 11:18:09

A simple setInterval utility:

from threading import Timer

def setInterval(timer, task):
    isStop = task()
    if not isStop:
        Timer(timer, setInterval, [timer, task]).start()

def hello():
    print("do something")
    return False # return True if you want to stop

if __name__ == "__main__":
    setInterval(2.0, hello) # every 2 seconds, "do something" will be printed
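The return value is what stops the chain. As an illustration, a task that asks to stop after its third run might look like this (the three_times function, the calls list, and the short 0.02 s delay are assumptions made for the example):

```python
import time
from threading import Timer

calls = []


def setInterval(timer, task):
    # Same shape as the helper above: run now, re-arm unless the task asks to stop.
    isStop = task()
    if not isStop:
        Timer(timer, setInterval, [timer, task]).start()


def three_times():
    calls.append(len(calls) + 1)
    return len(calls) >= 3     # True on the third call ends the chain


setInterval(0.02, three_times)
time.sleep(0.3)                # give the timer threads time to finish
print(calls)
```

Note that the first call happens immediately in the calling thread, and each later call runs in a fresh Timer thread.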