返回介绍

Routing

发布于 2023-06-06 15:58:22 字数 4139 浏览 0 评论 0 收藏 0

假如我们有两个worker,一个worker专门用来处理邮件发送任务和图像处理任务,一个worker专门用来处理文件上传任务。

我们创建两个队列,一个专门用于存储邮件任务队列和图像处理,一个用来存储文件上传任务队列。

Celery支持AMQP(Advanced Message Queue)所有的路由功能,我们也可以使用简单的路由设置将指定的任务发送到指定的队列中.

我们需要配置在celeryconfig.py模块中配置 CELERY_ROUTES 项, tasks.py模块修改如下:

from proj.celery import app as celery_app


@celery_app.task
def my_task1(a, b):
    print("my_task1任务正在执行....")
    return a + b


@celery_app.task
def my_task2(a, b):
    print("my_task2任务正在执行....")
    return a + b


@celery_app.task
def my_task3(a, b):
    print("my_task3任务正在执行....")
    return a + b


@celery_app.task
def my_task4(a, b):
    print("my_task3任务正在执行....")
    return a + b


@celery_app.task
def my_task5():
    print("my_task5任务正在执行....")


@celery_app.task
def my_task6():
    print("my_task6任务正在执行....")


@celery_app.task
def my_task7():
    print("my_task7任务正在执行....")

我们通过配置,将send_email和upload_file任务发送到queue1队列中,将image_process发送到queue2队列中。

我们修改celeryconfig.py:

broker_url='redis://:@127.0.0.1:6379/1'
result_backend='redis://:@127.0.0.1:6379/2'


task_routes=({
    'proj.tasks.my_task5': {'queue': 'queue1'},
    'proj.tasks.my_task6': {'queue': 'queue1'},
    'proj.tasks.my_task7': {'queue': 'queue2'},
    },
)

test.py:

from proj.tasks import *

# 发送任务到路由指定的队列中

my_task5.delay() my_task6.delay() my_task7.delay()

  开启两个worker服务器,分别处理两个队列:
```python
celery -A proj worker --loglevel=info -Q queue1
celery -A proj worker --loglevel=info -Q queue2

我们同样也可以通过apply_aynsc()方法来设置任务发送到那个队列中:

my_task1.apply_async(queue='queue1')

我们也可设置一个worker服务器处理两个队列中的任务:

celery -A proj worker --loglevel=info -Q queue1,queue2

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。
列表为空,暂无数据
    我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
    原文