在 Python 3 中安排重复事件

发布于 2024-08-24 09:17:02 字数 446 浏览 3 评论 0原文

我正在尝试安排一个重复事件在 Python 3 中每分钟运行一次。

我已经看到了 sched.scheduler 类,但我想知道是否还有其他方法可以做到这一点。我听说我可以为此使用多个线程,我不介意这样做。

我基本上是请求一些 JSON,然后解析它;它的价值随着时间的推移而变化。

要使用 sched.scheduler,我必须创建一个循环来请求它安排事件运行一小时:

scheduler = sched.scheduler(time.time, time.sleep)

# Schedule the event. THIS IS UGLY!
for i in range(60):
    scheduler.enter(3600 * i, 1, query_rate_limit, ())

scheduler.run()

还有哪些其他方法可以做到这一点?

I'm trying to schedule a repeating event to run every minute in Python 3.

I've seen class sched.scheduler but I'm wondering if there's another way to do it. I've heard mentions I could use multiple threads for this, which I wouldn't mind doing.

I'm basically requesting some JSON and then parsing it; its value changes over time.

To use sched.scheduler I have to create a loop to request it to schedule the even to run for one hour:

scheduler = sched.scheduler(time.time, time.sleep)

# Schedule the event. THIS IS UGLY!
for i in range(60):
    scheduler.enter(3600 * i, 1, query_rate_limit, ())

scheduler.run()

What other ways to do this are there?

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(14

囚你心 2024-08-31 09:17:03

采用原始的 threading.Timer() 类实现并修复 run() 方法,我得到类似以下内容:

class PeriodicTimer(Thread):
    """A periodic timer that runs indefinitely until cancel() is called."""
    def __init__(self, interval, function, args=None, kwargs=None):
        Thread.__init__(self)
        self.interval = interval
        self.function = function
        self.args = args if args is not None else []
        self.kwargs = kwargs if kwargs is not None else {}
        self.finished = Event()

    def cancel(self):
        """Stop the timer if it hasn't finished yet."""
        self.finished.set()

    def run(self):
        """Run until canceled"""
        while not self.finished.wait(self.interval):
            self.function(*self.args, **self.kwargs)

调用的 wait() 方法是使用条件变量,所以它应该相当有效。

Taking the original threading.Timer() class implementation and fixing the run() method I get something like:

class PeriodicTimer(Thread):
    """A periodic timer that runs indefinitely until cancel() is called."""
    def __init__(self, interval, function, args=None, kwargs=None):
        Thread.__init__(self)
        self.interval = interval
        self.function = function
        self.args = args if args is not None else []
        self.kwargs = kwargs if kwargs is not None else {}
        self.finished = Event()

    def cancel(self):
        """Stop the timer if it hasn't finished yet."""
        self.finished.set()

    def run(self):
        """Run until canceled"""
        while not self.finished.wait(self.interval):
            self.function(*self.args, **self.kwargs)

The wait() method called is using a condition variable, so it should be rather efficient.

无畏 2024-08-31 09:17:03

不久前我遇到了类似的问题,所以我制作了一个 python 模块 event-scheduler 来解决这。它具有与 sched 库非常相似的 API,但有一些差异:

  1. 它使用后台线程,并且始终能够在后台接受和运行作业,直到调度程序显式停止(不需要 while 循环)。
  2. 它附带一个 API,可以按照用户指定的时间间隔安排重复事件,直到明确取消为止。

可以通过pip install event-scheduler安装

from event_scheduler import EventScheduler

event_scheduler = EventScheduler()
event_scheduler.start()
# Schedule the recurring event to print "hello world" every 60 seconds with priority 1
# You can use the event_id to cancel the recurring event later
event_id = event_scheduler.enter_recurring(60, 1, print, ("hello world",))

I ran into a similar issue a while back so I made a python module event-scheduler to address this. It has a very similar API to the sched library with a few differences:

  1. It utilizes a background thread and is always able to accept and run jobs in the background until the scheduler is stopped explicitly (no need for a while loop).
  2. It comes with an API to schedule recurring events at a user specified interval until explicitly cancelled.

It can be installed by pip install event-scheduler

from event_scheduler import EventScheduler

event_scheduler = EventScheduler()
event_scheduler.start()
# Schedule the recurring event to print "hello world" every 60 seconds with priority 1
# You can use the event_id to cancel the recurring event later
event_id = event_scheduler.enter_recurring(60, 1, print, ("hello world",))
凉栀 2024-08-31 09:17:03

我从 JavaScript 中找到了 setTimeout 和 setInterval 的模仿,但是是在 Python 中。

使用此类,您可以:

  1. 使用间隔 ID 设置超时(通过变量,不是必需的)。
  2. 以与超时相同的方式设置间隔。
  3. 使用 ID 设置间隔,然后设置超时,该超时将在给定时间内取消该间隔。
  4. 用一个ID设置超时,用另一个setTimeout取消超时。

我知道它看起来很复杂,但我需要它拥有的所有功能,就像 JavaScript 中一样。请注意,您需要预先创建一个函数并使用函数名称作为参数(请参阅代码)。

#If I'm not mistaken, I believe the only modules you need are time and threading. The rest were for something I'm working on.
import time
import math
import pygame
import threading
import ctypes
from sys import exit

#Functions
class Timer:
    def __init__(self):
        self.timers = {}
        self.timer_id = 0
    
    def setInterval(self, fn, time, *args):
        def interval_callback():
            fn(*args)
            if timer_id in self.timers:
                self.timers[timer_id] = threading.Timer(time/1000, interval_callback)
                self.timers[timer_id].start()

        timer_id = self.timer_id
        self.timer_id += 1
        self.timers[timer_id] = threading.Timer(time/1000, interval_callback)
        self.timers[timer_id].start()
        return timer_id

    def clearInterval(self, timer_id):
        if timer_id in self.timers:
            self.timers[timer_id].cancel()
            del self.timers[timer_id]

    def setTimeout(self, fn, delay, *args, **kwargs):
        def timer_callback():
            self.timers.pop(timer_id, None)
            fn(*args, **kwargs)
        
        timer_id = self.timer_id
        self.timer_id += 1
        t = threading.Timer(delay / 1000, timer_callback)
        self.timers[timer_id] = t
        t.start()
        return timer_id
    
    def clearTimeout(self, timer_id):
        t = self.timers.pop(timer_id, None)
        if t is not None:
            t.cancel()
t = Timer()

#Usage Example
lall = t.setInterval(print, 1000, "hi")
t.setTimeout(t.clearInterval, 3000, lall)

希望这有帮助。

I figured out a mimic for setTimeout and setInterval from JavaScript, but in Python.

With this class you can:

  1. Set a timeout with an interval ID (through variable, not required).
  2. Set an interval in the same wise as a timeout.
  3. Set an interval with an ID, and then set a timeout that will cancel the interval in a given amount of time.
  4. Set a timeout with an ID and cancel the timeout with another setTimeout.

I know it seems complex but I needed all the functionality it had just like in JavaScript. Note that you will need to pre-create a function and use the function name as a parameter (see code).

#If I'm not mistaken, I believe the only modules you need are time and threading. The rest were for something I'm working on.
import time
import math
import pygame
import threading
import ctypes
from sys import exit

#Functions
class Timer:
    def __init__(self):
        self.timers = {}
        self.timer_id = 0
    
    def setInterval(self, fn, time, *args):
        def interval_callback():
            fn(*args)
            if timer_id in self.timers:
                self.timers[timer_id] = threading.Timer(time/1000, interval_callback)
                self.timers[timer_id].start()

        timer_id = self.timer_id
        self.timer_id += 1
        self.timers[timer_id] = threading.Timer(time/1000, interval_callback)
        self.timers[timer_id].start()
        return timer_id

    def clearInterval(self, timer_id):
        if timer_id in self.timers:
            self.timers[timer_id].cancel()
            del self.timers[timer_id]

    def setTimeout(self, fn, delay, *args, **kwargs):
        def timer_callback():
            self.timers.pop(timer_id, None)
            fn(*args, **kwargs)
        
        timer_id = self.timer_id
        self.timer_id += 1
        t = threading.Timer(delay / 1000, timer_callback)
        self.timers[timer_id] = t
        t.start()
        return timer_id
    
    def clearTimeout(self, timer_id):
        t = self.timers.pop(timer_id, None)
        if t is not None:
            t.cancel()
t = Timer()

#Usage Example
lall = t.setInterval(print, 1000, "hi")
t.setTimeout(t.clearInterval, 3000, lall)

Hope this helps.

并安 2024-08-31 09:17:02

您可以使用 threading.Timer ,但这也会调度一次性事件,类似于调度程序对象的 .enter 方法。

将一次性调度程序转换为周期性调度程序的正常模式(在任何语言中)是让每个事件以指定的时间间隔重新调度自身。例如,对于 sched,我不会像您一样使用循环,而是使用类似的东西:

def periodic(scheduler, interval, action, actionargs=()):
    scheduler.enter(interval, 1, periodic,
                    (scheduler, interval, action, actionargs))
    action(*actionargs)

并通过调用启动整个“永久定期计划”

periodic(scheduler, 3600, query_rate_limit)

或者,我可以使用 threading.Timer 而不是 scheduler.enter,但模式非常相似。

如果您需要更精细的变化(例如,在给定时间或在某些条件下停止定期重新安排),那么使用一些额外参数并不难。

You could use threading.Timer, but that also schedules a one-off event, similarly to the .enter method of scheduler objects.

The normal pattern (in any language) to transform a one-off scheduler into a periodic scheduler is to have each event re-schedule itself at the specified interval. For example, with sched, I would not use a loop like you're doing, but rather something like:

def periodic(scheduler, interval, action, actionargs=()):
    scheduler.enter(interval, 1, periodic,
                    (scheduler, interval, action, actionargs))
    action(*actionargs)

and initiate the whole "forever periodic schedule" with a call

periodic(scheduler, 3600, query_rate_limit)

Or, I could use threading.Timer instead of scheduler.enter, but the pattern's quite similar.

If you need a more refined variation (e.g., stop the periodic rescheduling at a given time or upon certain conditions), that's not too hard to accomodate with a few extra parameters.

国产ˉ祖宗 2024-08-31 09:17:02

您可以使用时间表。它适用于 Python 2.7 和 3.3,并且相当轻量级:

import schedule
import time

def job():
   print("I'm working...")

schedule.every(10).minutes.do(job)
schedule.every().hour.do(job)
schedule.every().day.at("10:30").do(job)

while 1:
   schedule.run_pending()
   time.sleep(1)

You could use schedule. It works on Python 2.7 and 3.3 and is rather lightweight:

import schedule
import time

def job():
   print("I'm working...")

schedule.every(10).minutes.do(job)
schedule.every().hour.do(job)
schedule.every().day.at("10:30").do(job)

while 1:
   schedule.run_pending()
   time.sleep(1)
抱猫软卧 2024-08-31 09:17:02

我对这个主题的粗浅看法:

from threading import Timer

class RepeatedTimer(object):
    def __init__(self, interval, function, *args, **kwargs):
        self._timer     = None
        self.function   = function
        self.interval   = interval
        self.args       = args
        self.kwargs     = kwargs
        self.is_running = False
        self.start()

    def _run(self):
        self.is_running = False
        self.start()
        self.function(*self.args, **self.kwargs)

    def start(self):
        if not self.is_running:
            self._timer = Timer(self.interval, self._run)
            self._timer.start()
            self.is_running = True

    def stop(self):
        self._timer.cancel()
        self.is_running = False

用法:

from time import sleep

def hello(name):
    print "Hello %s!" % name

print "starting..."
rt = RepeatedTimer(1, hello, "World") # it auto-starts, no need of rt.start()
try:
    sleep(5) # your long-running job goes here...
finally:
    rt.stop() # better in a try/finally block to make sure the program ends!

功能:

  • 仅标准库,无外部依赖项
  • 使用 Alex Martnelli 建议的模式
  • start()stop() 可以安全调用多次即使计时器已经启动/停止
  • 要调用的函数可以有位置和命名参数
  • 您可以随时更改间隔,它将在下次运行后生效。对于 argskwargs 甚至 function 也是如此!

My humble take on the subject:

from threading import Timer

class RepeatedTimer(object):
    def __init__(self, interval, function, *args, **kwargs):
        self._timer     = None
        self.function   = function
        self.interval   = interval
        self.args       = args
        self.kwargs     = kwargs
        self.is_running = False
        self.start()

    def _run(self):
        self.is_running = False
        self.start()
        self.function(*self.args, **self.kwargs)

    def start(self):
        if not self.is_running:
            self._timer = Timer(self.interval, self._run)
            self._timer.start()
            self.is_running = True

    def stop(self):
        self._timer.cancel()
        self.is_running = False

Usage:

from time import sleep

def hello(name):
    print "Hello %s!" % name

print "starting..."
rt = RepeatedTimer(1, hello, "World") # it auto-starts, no need of rt.start()
try:
    sleep(5) # your long-running job goes here...
finally:
    rt.stop() # better in a try/finally block to make sure the program ends!

Features:

  • Standard library only, no external dependencies
  • Uses the pattern suggested by Alex Martnelli
  • start() and stop() are safe to call multiple times even if the timer has already started/stopped
  • function to be called can have positional and named arguments
  • You can change interval anytime, it will be effective after next run. Same for args, kwargs and even function!
总攻大人 2024-08-31 09:17:02

基于MestreLion的回答,它解决了多线程的一个小问题:

from threading import Timer, Lock

class Periodic(object):
    """
    A periodic task running in threading.Timers
    """

    def __init__(self, interval, function, *args, **kwargs):
        self._lock = Lock()
        self._timer = None
        self.function = function
        self.interval = interval
        self.args = args
        self.kwargs = kwargs
        self._stopped = True
        if kwargs.pop('autostart', True):
            self.start()

    def start(self, from_run=False):
        self._lock.acquire()
        if from_run or self._stopped:
            self._stopped = False
            self._timer = Timer(self.interval, self._run)
            self._timer.start()
        self._lock.release()

    def _run(self):
        self.start(from_run=True)
        self.function(*self.args, **self.kwargs)

    def stop(self):
        self._lock.acquire()
        self._stopped = True
        self._timer.cancel()
        self._lock.release()

Based on MestreLion answer, it solve a little problem with multithreading:

from threading import Timer, Lock

class Periodic(object):
    """
    A periodic task running in threading.Timers
    """

    def __init__(self, interval, function, *args, **kwargs):
        self._lock = Lock()
        self._timer = None
        self.function = function
        self.interval = interval
        self.args = args
        self.kwargs = kwargs
        self._stopped = True
        if kwargs.pop('autostart', True):
            self.start()

    def start(self, from_run=False):
        self._lock.acquire()
        if from_run or self._stopped:
            self._stopped = False
            self._timer = Timer(self.interval, self._run)
            self._timer.start()
        self._lock.release()

    def _run(self):
        self.start(from_run=True)
        self.function(*self.args, **self.kwargs)

    def stop(self):
        self._lock.acquire()
        self._stopped = True
        self._timer.cancel()
        self._lock.release()
最终幸福 2024-08-31 09:17:02

您可以使用高级 Python 调度程序。它甚至有一个类似 cron 的界面。

You could use the Advanced Python Scheduler. It even has a cron-like interface.

肤浅与狂妄 2024-08-31 09:17:02

使用 Celery

from celery.task import PeriodicTask
from datetime import timedelta


class ProcessClicksTask(PeriodicTask):
    run_every = timedelta(minutes=30)

    def run(self, **kwargs):
        #do something

Use Celery.

from celery.task import PeriodicTask
from datetime import timedelta


class ProcessClicksTask(PeriodicTask):
    run_every = timedelta(minutes=30)

    def run(self, **kwargs):
        #do something
末骤雨初歇 2024-08-31 09:17:02

根据 Alex Martelli 的回答,我实现了更容易集成的装饰器版本。

import sched
import time
import datetime
from functools import wraps
from threading import Thread


def async(func):
    @wraps(func)
    def async_func(*args, **kwargs):
        func_hl = Thread(target=func, args=args, kwargs=kwargs)
        func_hl.start()
        return func_hl
    return async_func


def schedule(interval):
    def decorator(func):
        def periodic(scheduler, interval, action, actionargs=()):
            scheduler.enter(interval, 1, periodic,
                            (scheduler, interval, action, actionargs))
            action(*actionargs)

        @wraps(func)
        def wrap(*args, **kwargs):
            scheduler = sched.scheduler(time.time, time.sleep)
            periodic(scheduler, interval, func)
            scheduler.run()
        return wrap
    return decorator


@async
@schedule(1)
def periodic_event():
    print(datetime.datetime.now())


if __name__ == '__main__':
    print('start')
    periodic_event()
    print('end')

Based on Alex Martelli's answer, I have implemented decorator version which is more easier to integrated.

import sched
import time
import datetime
from functools import wraps
from threading import Thread


def async(func):
    @wraps(func)
    def async_func(*args, **kwargs):
        func_hl = Thread(target=func, args=args, kwargs=kwargs)
        func_hl.start()
        return func_hl
    return async_func


def schedule(interval):
    def decorator(func):
        def periodic(scheduler, interval, action, actionargs=()):
            scheduler.enter(interval, 1, periodic,
                            (scheduler, interval, action, actionargs))
            action(*actionargs)

        @wraps(func)
        def wrap(*args, **kwargs):
            scheduler = sched.scheduler(time.time, time.sleep)
            periodic(scheduler, interval, func)
            scheduler.run()
        return wrap
    return decorator


@async
@schedule(1)
def periodic_event():
    print(datetime.datetime.now())


if __name__ == '__main__':
    print('start')
    periodic_event()
    print('end')
姐不稀罕 2024-08-31 09:17:02

文档:高级 Python 调度程序

@sched.cron_schedule(day='last sun')
def some_decorated_task():
    print("I am printed at 00:00:00 on the last Sunday of every month!")

可用字段:

| Field       | Description                                                    |
|-------------|----------------------------------------------------------------|
| year        | 4-digit year number                                            |
| month       | month number (1-12)                                            |
| day         | day of the month (1-31)                                        |
| week        | ISO week number (1-53)                                         |
| day_of_week | number or name of weekday (0-6 or mon,tue,wed,thu,fri,sat,sun) |
| hour        | hour (0-23)                                                    |
| minute      | minute (0-59)                                                  |
| second      | second (0-59)                                                  |

Doc: Advanced Python Scheduler

@sched.cron_schedule(day='last sun')
def some_decorated_task():
    print("I am printed at 00:00:00 on the last Sunday of every month!")

Available fields:

| Field       | Description                                                    |
|-------------|----------------------------------------------------------------|
| year        | 4-digit year number                                            |
| month       | month number (1-12)                                            |
| day         | day of the month (1-31)                                        |
| week        | ISO week number (1-53)                                         |
| day_of_week | number or name of weekday (0-6 or mon,tue,wed,thu,fri,sat,sun) |
| hour        | hour (0-23)                                                    |
| minute      | minute (0-59)                                                  |
| second      | second (0-59)                                                  |
沒落の蓅哖 2024-08-31 09:17:02

这是一个使用 Thread 的快速而肮脏的非阻塞循环:

#!/usr/bin/env python3
import threading,time

def worker():
    print(time.time())
    time.sleep(5)
    t = threading.Thread(target=worker)
    t.start()


threads = []
t = threading.Thread(target=worker)
threads.append(t)
t.start()
time.sleep(7)
print("Hello World")

没有什么特别的,worker 会延迟创建一个自己的新线程。可能不是最有效的,但足够简单。如果您需要更复杂的解决方案,northtree 的答案将是您的最佳选择。

基于这个,我们可以做同样的事情,只需使用Timer

#!/usr/bin/env python3
import threading,time

def hello():
    t = threading.Timer(10.0, hello)
    t.start()
    print( "hello, world",time.time() )

t = threading.Timer(10.0, hello)
t.start()
time.sleep(12)
print("Oh,hai",time.time())
time.sleep(4)
print("How's it going?",time.time())

Here's a quick and dirty non-blocking loop with Thread:

#!/usr/bin/env python3
import threading,time

def worker():
    print(time.time())
    time.sleep(5)
    t = threading.Thread(target=worker)
    t.start()


threads = []
t = threading.Thread(target=worker)
threads.append(t)
t.start()
time.sleep(7)
print("Hello World")

There's nothing particularly special, the worker creates a new thread of itself with a delay. Might not be most efficient, but simple enough. northtree's answer would be the way to go if you need more sophisticated solution.

And based on this, we can do the same, just with Timer:

#!/usr/bin/env python3
import threading,time

def hello():
    t = threading.Timer(10.0, hello)
    t.start()
    print( "hello, world",time.time() )

t = threading.Timer(10.0, hello)
t.start()
time.sleep(12)
print("Oh,hai",time.time())
time.sleep(4)
print("How's it going?",time.time())
听闻余生 2024-08-31 09:17:02

看我的样本

import sched, time

def myTask(m,n):
  print n+' '+m

def periodic_queue(interval,func,args=(),priority=1):
  s = sched.scheduler(time.time, time.sleep)
  periodic_task(s,interval,func,args,priority)
  s.run()

def periodic_task(scheduler,interval,func,args,priority):
  func(*args)
  scheduler.enter(interval,priority,periodic_task,
                   (scheduler,interval,func,args,priority))

periodic_queue(1,myTask,('world','hello'))

See my sample

import sched, time

def myTask(m,n):
  print n+' '+m

def periodic_queue(interval,func,args=(),priority=1):
  s = sched.scheduler(time.time, time.sleep)
  periodic_task(s,interval,func,args,priority)
  s.run()

def periodic_task(scheduler,interval,func,args,priority):
  func(*args)
  scheduler.enter(interval,priority,periodic_task,
                   (scheduler,interval,func,args,priority))

periodic_queue(1,myTask,('world','hello'))
安静 2024-08-31 09:17:02

有一个新包,名为 ischedule。对于这种情况,解决方案可能如下:

from ischedule import schedule, run_loop
from datetime import timedelta


def query_rate_limit():
    print("query_rate_limit")

schedule(query_rate_limit, interval=60)
run_loop(return_after=timedelta(hours=1))

一切都在主线程上运行,并且 run_loop 内没有忙等待。启动时间非常精确,通常在指定时间的几分之一毫秒内。

There is a new package, called ischedule. For this case, the solution could be as following:

from ischedule import schedule, run_loop
from datetime import timedelta


def query_rate_limit():
    print("query_rate_limit")

schedule(query_rate_limit, interval=60)
run_loop(return_after=timedelta(hours=1))

Everything runs on the main thread and there is no busy waiting inside the run_loop. The startup time is very precise, usually within a fraction of a millisecond of the specified time.

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文