任务调度模块 APScheduler 总结

发布于 2024-03-04 06:35:33 字数 8525 浏览 33 评论 0

首先学习该模块之前,需要了解进程和操作系统的相关知识。

什么是任务调度?

任务调度是指基于给定的时间点,给定的时间间隔或者给定的执行次数自动的执行任务。任务调度是操作系统的重要组成部分,而对于实时的操作系统,任务调度直接影响着操作系统的实时性能。任务调度涉及到多线程并发、运行时间规则定制及解析、线程池的维护等诸多方面的工作。

APScheduler 介绍

APScheduler 是一个 Python 定时任务框架。提供了基于日期、固定时间间隔以及 crontab 类型的任务。目前最新版本为 3.6.0。APScheduler 模块的官方文档,请点击 这里

基本概念

APScheduler 模块包含了四个组件:

  • 触发器(trigger)
  • 作业存储(job store)
  • 执行器(executer)
  • 调度器(scheduler)

触发器包含了调度逻辑。每个任务都有它自己的触发器,触发器确定了任务下次什么时候该执行。除了他们的初始化配置,触发器完全是无状态的。

作业存储存放了预定任务。默认的作业存储只是将作业保存在内存中,但是其他作业存储可以存储在各种不同类型的数据库(sqlite 或者 MongoDB)中。作业的数据保存到持久性作业存储时会被序列化,并在从其中加载时进行反序列化。作业存储(默认存储除外)不会将作业数据保留在内存中,而是充当中间人,用于在后端保存、更新和搜索作业。绝对不能在调度器之间共享作业存储。

执行器是处理任务的运行。它们通常通过将作业中指定的可调用对象交给线程或进程池来完成此操作。当任务完成后,这个执行器会通知调度器,调度器随后发出相应的事件。

调度器将其余部分绑定在一起。你通常只在应用程序中调用一个调度程序(器)。应用程序开发人员通常不直接处理作业存储,执行器或触发器。相反的,调度程序提供了适当的接口来处理所有这些。配置作业存储和执行器是通过调度程序(器) 完成的、添加、修改和删除作业也是如此。

安装

pip3 install apscheduler

调度器

APScheduler 模块有多个不同的调度器。如何选择调度器取决于你程序运行环境和你使用 APScheduler 的目的。

  • BlockingScheduler:阻塞调度。当你的程序只运行这个调度器时可以使用。
  • BackgroundScheduler:后台调度。你的应用程序不止运行调度器,且想它在应用程序中后台运行时,可以使用该调度器。
  • AsyncIOScheduler:如果 你应用的程序使用了 asyncio 模块,需要使用此调度器。
  • GeventScheduler:如果你的应用程序使用了 gevent 技术,需要使用此调度器。
  • TornadoScheduler:可以用于开发 Tornado 应用程序。
  • TwistedScheduler:可以用于开发 Twisted 应用程序。
  • QtScheduler:可以用于开发 Qt 应用程序。

作业存储

要选择适当的作业存储,需要确定是否需要作业持久性。如果您总是在应用程序启动时重新创建作业,那么您可以使用默认作业存储(MemoryJobStore)。但是,如果您需要将作业保留在调度程序重新启动或应用程序崩溃之上,那么您的选择通常可以归结为编程环境中使用的工具。 如果你可以自由选择,那么由于其强大的数据完整性保护,PostgreSQL 后端上的 SQLAlchemy JobStore 是比较推荐的选择。

执行器

默认的执行器 ThreadPoolExecutor 应该足以满足大多数用途,最大支持 10 个线程数。如果你的工作负载涉及 CPU 密集型操作,则应考虑使用 ProcessPoolExecutor 来使用多个 CPU 核心。甚至你可以同时使用两者,将进程池执行程序添加为辅助执行程序。

触发器

APScheduler 的触发器总共有三种触发类型。

  1. date:当你想要在确定的时间运行一次作业(任务),可以使用。
  2. interval:当你想要运行在固定的间隔时间内运行作业(任务)。
  3. cron:当你想要周期性的运行该任务。例如每天,每周或者每月,可以使用 cron 类型

配置和使用

配置调度器

使用默认的 MemoryJobStore 和 ThreadPoolExecutor。先创建调度器,在配置和添加作业。

from apscheduler.schedulers.blocking import BlockingScheduler
def my_job():
  print('hello world')
# 初始化一个 Blocking 类型的调度器
sched = BlockingScheduler()
sched.add_job(my_job, 'interval', seconds=5, id='my_job_id')
sched.start()

不使用默认的作业存储和执行器。

from pytz import utc
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.jobstores.mongodb import MongoDBJobStore
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
from apscheduler.executors.pool import ThreadPoolExecutor, ProcessPoolExecutor


jobstores = {
  'mongo': MongoDBJobStore(),
  'default': SQLAlchemyJobStore(url='sqlite:///jobs.sqlite')
}
executors = {
  'default': ThreadPoolExecutor(20),
  'processpool': ProcessPoolExecutor(5)
}
job_defaults = {
  'coalesce': False,
  'max_instances': 3
}
scheduler = BackgroundScheduler(jobstores=jobstores, executors=executors, job_defaults=job_defaults, timezone=utc)

操作作业

运行调度器,通过调用 start() 方法运行。

sched.start()

添加作业,通过 add_job() 方法或者使用 scheduler_job() 装饰器进行添加。

from apscheduler.schedulers.background import BackgroundScheduler
# 装饰器的方法添加作业
scheduler=BackgroundScheduler()
@scheduler.scheduled_job('date',run_date=datetime(2019, 6, 18, 16, 40, 30))
def test():
	print('Test')

暂停和恢复作业

#暂停
apscheduler.job.Job.pause()
apscheduler.schedulers.base.BaseScheduler.pause_job()
#恢复
apscheduler.job.Job.resume()
apscheduler.schedulers.base.BaseScheduler.resume_job()

移除作业。通过 remove() 或者 remove_job() 方法

# remove() 方法
job = scheduler.add_job(myfunc, 'interval', minutes=2)
job.remove()
# remove_job() 方法
scheduler.add_job(myfunc, 'interval', minutes=2, id='my_job_id')
scheduler.remove_job('my_job_id')

获取作业(job) 列表。通过 get_jobs() 返回所有 job 实例或者 print_jobs() 输出格式化的作业列表

scheduler.get_jobs()
scheduler.print_jobs()

修改作业。通过 modify() 和 modify_job() 方法。可以修改除 id 外,job 的任何属性。

job.modify(max_instances=6, name='Alternate name')

重新 rescheduler 作业。可以使用 reschedule() 或者 reschedule_job() 方法。需要重新组件一个触发器,并且重新计算下一次触发时间。

scheduler.reschedule_job('my_job_id', trigger='cron', minute='*/5')

关闭调度。使用 shutdown() 方法关闭,不想等待,可以将 wait 设置为 False

scheduler.shutdown(wait=False)

触发器参数

date 定时,作业只执行一次。

  • run_date (datetime|str) – the date/time to run the job at
  • timezone (datetime.tzinfo|str) – time zone for run_date if it doesn’t have one already
sched.add_job(my_job, 'date', run_date=date(2009, 11, 6), args=['text'])
# 2019 年 7 月 6 号 16 时 30 分 5s 执行
sched.add_job(my_job, 'date', run_date=datetime(2019, 7, 6, 16, 30, 5), args=['text'])

interval 间隔调度

  • weeks (int) – number of weeks to wait
  • days (int) – number of days to wait
  • hours (int) – number of hours to wait
  • minutes (int) – number of minutes to wait
  • seconds (int) – number of seconds to wait
  • start_date (datetime|str) – starting point for the interval calculation
  • end_date (datetime|str) – latest possible date/time to trigger on
  • timezone (datetime.tzinfo|str) – time zone to use for the date/time calculations
scheduler.add_job(job_function, 'interval', hours=2)

cron 调度

  • year (int|str) – 4-digit year
  • month (int|str) – month (1-12)
  • day (int|str) – day of the (1-31)
  • week (int|str) – ISO week (1-53)
  • day_of_week (int|str) – number or name of weekday (0-6 or mon,tue,wed,thu,fri,sat,sun)
  • hour (int|str) – hour (0-23)
  • minute (int|str) – minute (0-59)
  • second (int|str) – second (0-59)
  • start_date (datetime|str) – earliest possible date/time to trigger on (inclusive)
  • end_date (datetime|str) – latest possible date/time to trigger on (inclusive)
  • timezone (datetime.tzinfo|str) – time zone to use for the date/time calculations (defaults to scheduler timezone)
# 6-8,11-12 月第三个周五 00:00, 01:00, 02:00, 03:00 运行
scheduler.add_job(job_function, 'cron', month='6-8,11-12', day='3rd fri', hour='0-3')
# 每周一到周五运行 直到 2024-05-30 00:00:00
scheduler.add_job(job_function, 'cron', day_of_week='mon-fri', hour=5, minute=30, end_date='2024-05-30'

代码实例

实例一

import time
import os
from datetime import datetime
from apscheduler.schedulers.background import BackgroundScheduler

def tick():
  """
  """
  print('Tick! The time is: %s' % datetime.now())

# 非阻塞,cron 类型触发器
scheduler = BackgroundScheduler()
scheduler.add_job(tick, 'cron', day_of_week='mon-fri', hour=9, minute=30)

if __name__ == "__main__":
  # 运行任务调度
  scheduler.start()
  print(scheduler.get_jobs())
  print('Press Ctrl+{0} to exit'.format('Break' if os.name == 'nt' else 'C'))
  try:
    while True:
      time.sleep(5)
      print('sleep!')
  except (KeyboardInterrupt,SystemExit):
    # 关闭调度
    scheduler.shutdown(wait=False)
    print('Exit The Job!')

解决模块使用问题

查看开发者经常提到的问题

请点击 这里 ,查看问题回答。

通过查看日志

如果你的调度程序没有按预期执行,可以按如下设置查看打印日志

import logging
logging.basicConfig()
logging.getLogger('apscheduler').setlevel(logging.DEBUG)

提交模块 bug

如果你发现 APScheduler 模块有 bug,可以通过 Github 方式提交 问题

获取帮助

在 Gitter 的 apscheduler room 中提问。

StackOverflow 中提问并且打上 apscheduler 的 tag 标签。

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。
列表为空,暂无数据

关于作者

谁与争疯

暂无简介

0 文章
0 评论
23 人气
更多

推荐作者

内心激荡

文章 0 评论 0

JSmiles

文章 0 评论 0

左秋

文章 0 评论 0

迪街小绵羊

文章 0 评论 0

瞳孔里扚悲伤

文章 0 评论 0

    我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
    原文