Celery 在 Django 中的使用方法
什么是 Celery
Celery 是一个异步/定时任务调度系统。
在开始前先简要说明一下步骤便有个整体概念:
- apt 安装 RabbitMQ,安装完成后即启动服务了
- pip 安装 Django,Celery,Django-Celery
- 配置 settings.py 以添加 djcelery 应用和设置 BROKER 和调度方式等
- migrate 生成数据库后台管理界面
- 添加 celery.py 以添加 Celery 的 app 实例
- 让init.py 加载 app 实例
- 应用目录里添加 tasks.py,定义任务函数并前缀 @shared_task 修饰器
- 运行 Celery 服务 ./manage.py celery worker –beat -l info
- 后台添加定时任务,测试异步和定时任务
本文基于 Django1.9X
、 Django-Celery3.1.17
、 Celery3.1.24
测试成功
选择一个 Broker
Celery 需要一个消息调度系统来处理消息响应及任务调度,我们称之为 Broker。
Celery 默认使用 RabbitMQ,因为使用 RabbitMQ 不需要再安装其它扩展或初始化配置,我们就用它了。
首先安装它:
$ sudo apt-get install rabbitmq-server
安装完就已经启动服务,可以通过下面命令查看服务状态
$ systemctl status rabbitmq-server
也可以通过下面命令查看 rabbitmq 服务信息
$ sudo rabbitmqctl status
然后面后面 settings.py
里指定 Broker 信息
使用 Django-celery 管理在 Django 中后台管理任务
这里我们用到`django-celery`来管理任务,在虚拟环境里
```bash
(env)$ pip install django-celery
在项目的 proj/proj/settings.py
后面添加:
# for Django Celery
INSTALLED_APPS += ("djcelery", )
import djcelery
djcelery.setup_loader()
BROKER_URL = 'amqp://guest:guest@localhost:5672//'
CELERYBEAT_SCHEDULER = 'djcelery.schedulers.DatabaseScheduler'
CELERY_ACCEPT_CONTENT = ['pickle', 'json', 'msgpack', 'yaml'] # Celery 接受的内容可依实际而定,如只选择'json'
在 Django 中使用 Celery
之前版本的 Celery 还需要独立的库文件来支持才能在 Django 正常工作,但 3.1 版后就不再需要了,所以你可以以这种更简单的方式使用 Celery。
为了在 Django 的项目里使用 Celery,我们必须先定义一个 Celery 库的实例(这里叫做“app”)
如果你的项目框架类似:
- proj/
- proj/__init__.py
- proj/settings.py
- proj/urls.py
- manage.py
那么建议的方法是创建一个新的 proj/proj/celery.py
单元来定义 Celery 的实例。
file: proj/proj/celery.py
from __future__ import absolute_import
import os
from celery import Celery
# set the default Django settings module for the 'celery' program.
# 为 Celery 设置默认的 Djanog Settings 设置
# 下面的两个 proj 修改为实际的项目名
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'proj.settings')
from django.conf import settings # noqa
app = Celery('proj')
# Using a string here means the worker will not have to
# pickle the object when using Windows.
app.config_from_object('django.conf:settings')
app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)
# 定义了一个测试任务,直接输出请求内容
@app.task(bind=True)
def debug_task(self):
print('Request: {0!r}'.format(self.request))
然后,你需要在 proj/proj/__init__.py
单元中引用这个 app,这一步确保在 Django 启动时加载这个 app,以便 @shared_task 修饰器能够使用它(后文有述)。
proj/proj__init__.py
:
from __future__ import absolute_import
# This will make sure the app is always imported when
# Django starts so that shared_task will use this app.
from .celery import app as celery_app # noqa
注:此示例框架适用于大型项目,简单项目可以同时定义 app 和任务 task,类似于 First Steps with Celery 的例子
让我们来看看第一单元是怎么工作的。
首先我们使用了一个绝对导入以避免与其它库冲突
from __future__ import absolute_import
然后定义 Celery 的命令行环境
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'proj.settings')
这个设置让 celery 知道你的项目所在,这段声明必须放在 app 实例创建之前,所以我们下一步就是创建 app 实例
app = Celery('proj')
接下来告诉 celery 所使用的 Django Settings
app.config_from_object('django.conf:settings')
再来告诉 celery 怎么去自动发现任务 task
app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)
当上述行设置好后,celery 就能够自动发现 Django 应用里的任务了,当然你必须准遵循下面的文件/目录设定
- app1/
- app1/tasks.py
- app1/models.py
- app2/
- app2/tasks.py
- app2/models.py
用这种方式,你就不用再特定的把任务单元加入到 CELERY_IMPORTS
的设置项里了,需要的时候 Django 就能通过 lambda
的定义自动发现并导入任务了。
最后,就是一个简单的调试任务来测试输入我们请求的信息
使用 @shared_task 修饰器
你所写的任务可能存在于可重用的应用中,而可重用的应用并不仅仅用于这个项目,所以你不用直接导入应用实例。
@shared_task 修饰器让你创建任务而不用创建应用实例。demoapp/tasks.py
:
from __future__ import absolute_import
from celery import shared_task
@shared_task
def add(x, y):
return x + y
@shared_task
def mul(x, y):
return x * y
@shared_task
def xsum(numbers):
return sum(numbers)
你还可以在下面找到 Django 的非数据库的完整示例:https://github.com/celery/celery/tree/3.1/examples/django/
启动 worker 进程
在生产环境,你需要启动 worker 进程为服务方式。可参考 如何让 worker 运行于服务模式
但在开发测试环境,我们只需要以下列命令来启动 worker 就行。
(env)$./manage.py celery worker -l info
如果是要运行定时任务
(env)$./manage.py celery beat
或者 worker
和 beat
两个一起运行
(env)$./manage.py celery worker --beat -l info
完整的 celery 命令行选项说明可以参考
$ celery help
测试 tasks
在另一个控制台测试一下:
$ python manage.py shell
>>> from hello.tasks import add
>>> r = add.delay(3,5) # 执行这一行就能在 worker 的日志中看到运行状况
>>> r.wait()
8
如 tasks.py
有所修改,都必须重重启 celery worker
以便使新代码生效。
使用 Django-celery 管理后台
使用 Django-celery 可以在 admin 后台创建一个任务,在指定的时间执行。 使用步骤:
- 实现指定功能的 task
需要先通过代码实现 task 的具体功能,之后便可以在 admin 后台创建任务的时候选择该 task。
Note: 要保证实现的 task 没有错误,否则后面创建了任务之后,是不会达到预期效果的。
建议: 实现一个 task 之后,最好先在 django 的 shell comand 中先调试,保证可以成功运行,并达到预期结果了再执行后面的操作。 - 创建一个任务
进入 Django 的 admin 后台,进入 Djcelery 栏目,一共有四个选项:
- Crontabs
在此选项中,可以创建定时执行的任务需要的定时时间。 - Intervals
在此选项中,可以创建间隔执行的任务需要时间间隔。 - Periodic tasks
在此选项中,创建一个相应的任务,选择任务,需要定时执行的时间或者时间间隔,并保存。你用 @shared_task 创建的任务会在这个表单中列出
当然,还有其他一些高级的选项,可以尝试使用。
- Tasks
- Workers
到此,一个任务便创建成功了。不出意外,创建的任务会如期执行,当然,如果创建的没有 如期执行的,应该依次检查前面的步骤,查看 celery 是否在运行,创建的 task 是否报错等等, 一级一级排除错误。
开启了 beat
后,在 Periodic task
里添加了定时任务后,Celery 就会按设定时间执行任务了。
如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。
绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论