Celery 在 Django 中的使用方法

发布于 2024-09-15 22:51:37 字数 6620 浏览 36 评论 0

什么是 Celery

Celery 是一个异步/定时任务调度系统。

在开始前先简要说明一下步骤便有个整体概念:

  • apt 安装 RabbitMQ,安装完成后即启动服务了
  • pip 安装 Django,Celery,Django-Celery
  • 配置 settings.py 以添加 djcelery 应用和设置 BROKER 和调度方式等
  • migrate 生成数据库后台管理界面
  • 添加 celery.py 以添加 Celery 的 app 实例
  • init.py 加载 app 实例
  • 应用目录里添加 tasks.py,定义任务函数并前缀 @shared_task 修饰器
  • 运行 Celery 服务 ./manage.py celery worker –beat -l info
  • 后台添加定时任务,测试异步和定时任务

本文基于 Django1.9XDjango-Celery3.1.17Celery3.1.24 测试成功

选择一个 Broker

Celery 需要一个消息调度系统来处理消息响应及任务调度,我们称之为 Broker。

Celery 默认使用 RabbitMQ,因为使用 RabbitMQ 不需要再安装其它扩展或初始化配置,我们就用它了。
首先安装它:

$ sudo apt-get install rabbitmq-server

安装完就已经启动服务,可以通过下面命令查看服务状态

$ systemctl status rabbitmq-server

也可以通过下面命令查看 rabbitmq 服务信息

$ sudo rabbitmqctl status

然后面后面 settings.py 里指定 Broker 信息

使用 Django-celery 管理在 Django 中后台管理任务

这里我们用到`django-celery`来管理任务,在虚拟环境里
​```bash
(env)$ pip install django-celery

在项目的 proj/proj/settings.py 后面添加:

# for Django Celery
INSTALLED_APPS += ("djcelery", )
import djcelery
djcelery.setup_loader()
BROKER_URL = 'amqp://guest:guest@localhost:5672//'
CELERYBEAT_SCHEDULER = 'djcelery.schedulers.DatabaseScheduler'
CELERY_ACCEPT_CONTENT = ['pickle', 'json', 'msgpack', 'yaml'] # Celery 接受的内容可依实际而定,如只选择'json'

在 Django 中使用 Celery

之前版本的 Celery 还需要独立的库文件来支持才能在 Django 正常工作,但 3.1 版后就不再需要了,所以你可以以这种更简单的方式使用 Celery。

为了在 Django 的项目里使用 Celery,我们必须先定义一个 Celery 库的实例(这里叫做“app”)

如果你的项目框架类似:

- proj/
- proj/__init__.py
- proj/settings.py
- proj/urls.py
- manage.py

那么建议的方法是创建一个新的 proj/proj/celery.py 单元来定义 Celery 的实例。

file: proj/proj/celery.py

from __future__ import absolute_import
import os
from celery import Celery

# set the default Django settings module for the 'celery' program.
# 为 Celery 设置默认的 Djanog Settings 设置
# 下面的两个 proj 修改为实际的项目名
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'proj.settings')
from django.conf import settings # noqa

app = Celery('proj')

# Using a string here means the worker will not have to
# pickle the object when using Windows.
app.config_from_object('django.conf:settings')
app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)

# 定义了一个测试任务,直接输出请求内容
@app.task(bind=True)
def debug_task(self):
print('Request: {0!r}'.format(self.request))

然后,你需要在 proj/proj/__init__.py 单元中引用这个 app,这一步确保在 Django 启动时加载这个 app,以便 @shared_task 修饰器能够使用它(后文有述)。

proj/proj__init__.py :

from __future__ import absolute_import

# This will make sure the app is always imported when
# Django starts so that shared_task will use this app.
from .celery import app as celery_app # noqa

注:此示例框架适用于大型项目,简单项目可以同时定义 app 和任务 task,类似于 First Steps with Celery 的例子

让我们来看看第一单元是怎么工作的。
首先我们使用了一个绝对导入以避免与其它库冲突

from __future__ import absolute_import

然后定义 Celery 的命令行环境

os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'proj.settings')

这个设置让 celery 知道你的项目所在,这段声明必须放在 app 实例创建之前,所以我们下一步就是创建 app 实例

app = Celery('proj')

接下来告诉 celery 所使用的 Django Settings

app.config_from_object('django.conf:settings')

再来告诉 celery 怎么去自动发现任务 task

app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)

当上述行设置好后,celery 就能够自动发现 Django 应用里的任务了,当然你必须准遵循下面的文件/目录设定

- app1/
- app1/tasks.py
- app1/models.py
- app2/
- app2/tasks.py
- app2/models.py

用这种方式,你就不用再特定的把任务单元加入到 CELERY_IMPORTS 的设置项里了,需要的时候 Django 就能通过 lambda 的定义自动发现并导入任务了。

最后,就是一个简单的调试任务来测试输入我们请求的信息

使用 @shared_task 修饰器

你所写的任务可能存在于可重用的应用中,而可重用的应用并不仅仅用于这个项目,所以你不用直接导入应用实例。
@shared_task 修饰器让你创建任务而不用创建应用实例。
demoapp/tasks.py :

from __future__ import absolute_import
from celery import shared_task

@shared_task
def add(x, y):
return x + y

@shared_task
def mul(x, y):
return x * y

@shared_task
def xsum(numbers):
return sum(numbers)

你还可以在下面找到 Django 的非数据库的完整示例:https://github.com/celery/celery/tree/3.1/examples/django/

启动 worker 进程

在生产环境,你需要启动 worker 进程为服务方式。可参考 如何让 worker 运行于服务模式

但在开发测试环境,我们只需要以下列命令来启动 worker 就行。

(env)$./manage.py celery worker -l info

如果是要运行定时任务

(env)$./manage.py celery beat

或者 workerbeat 两个一起运行

(env)$./manage.py celery worker --beat -l info

完整的 celery 命令行选项说明可以参考

$ celery help

测试 tasks

在另一个控制台测试一下:

$ python manage.py shell
>>> from hello.tasks import add
>>> r = add.delay(3,5) # 执行这一行就能在 worker 的日志中看到运行状况
>>> r.wait()
8

tasks.py 有所修改,都必须重重启 celery worker 以便使新代码生效。

使用 Django-celery 管理后台

使用 Django-celery 可以在 admin 后台创建一个任务,在指定的时间执行。 使用步骤:

  1. 实现指定功能的 task
    需要先通过代码实现 task 的具体功能,之后便可以在 admin 后台创建任务的时候选择该 task。
    Note: 要保证实现的 task 没有错误,否则后面创建了任务之后,是不会达到预期效果的。
    建议: 实现一个 task 之后,最好先在 django 的 shell comand 中先调试,保证可以成功运行,并达到预期结果了再执行后面的操作。
  2. 创建一个任务
    进入 Django 的 admin 后台,进入 Djcelery 栏目,一共有四个选项:
  • Crontabs
    在此选项中,可以创建定时执行的任务需要的定时时间。
  • Intervals
    在此选项中,可以创建间隔执行的任务需要时间间隔。
  • Periodic tasks
    在此选项中,创建一个相应的任务,选择任务,需要定时执行的时间或者时间间隔,并保存。

    你用 @shared_task 创建的任务会在这个表单中列出

当然,还有其他一些高级的选项,可以尝试使用。

  • Tasks
  • Workers

到此,一个任务便创建成功了。不出意外,创建的任务会如期执行,当然,如果创建的没有 如期执行的,应该依次检查前面的步骤,查看 celery 是否在运行,创建的 task 是否报错等等, 一级一级排除错误。

开启了 beat 后,在 Periodic task 里添加了定时任务后,Celery 就会按设定时间执行任务了。

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。
列表为空,暂无数据

关于作者

文章
评论
835 人气
更多

推荐作者

李珊平

文章 0 评论 0

Quxin

文章 0 评论 0

范无咎

文章 0 评论 0

github_ZOJ2N8YxBm

文章 0 评论 0

若言

文章 0 评论 0

南…巷孤猫

文章 0 评论 0

    我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
    原文