Celery task_reject_on_worker_lost does not work with Redis as message broker

Posted on 2025-02-02 11:52:01


I'm currently using version 5.2.6 of Celery and version 6.2.6 of Redis. When I turn on the task_reject_on_worker_lost flag, I expect Celery to redeliver a task that was being executed by a worker that died abruptly. However, with Redis as the message broker, the task does not actually get redelivered after the worker goes down. When I try the exact same configuration with RabbitMQ, it works as expected.

Any pointers on how to achieve the same behavior with Redis as message broker?


Comments (1)

春庭雪 2025-02-09 11:52:01


I am new to Celery and faced the same issue you did.
That is, with this ack config:

task_acks_late = True                       # ack after task end
task_acks_on_failure_or_timeout = True      # ack if task exception
task_reject_on_worker_lost = True           # no ack if worker killed

If the broker is configured to use Redis:

broker_url = 'redis://127.0.0.1:6379/1'

The task is not re-queued if the worker is killed while running it and then restarted.

But with RabbitMQ:

broker_url = 'amqp://guest:guest@localhost:5672/'

The task gets re-queued and runs again.

My environment:

  • Linux 5.15.10-arch1-1
  • python 3.8.13
  • celery==5.2.7

Finally, I found this comment in the Celery GitHub issues.

The additional config value visibility_timeout in broker_transport_options is required for the Redis broker.
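Conceptually, the visibility timeout is what makes redelivery possible with a Redis broker: a reserved but unacknowledged message becomes visible again only after the timeout expires. Below is a simplified, hypothetical model of that behavior (the class name and structure are my own illustration, not Celery's or Kombu's actual internals):

```python
from collections import deque

class VisibilityTimeoutQueue:
    """Toy model of a broker queue with a visibility timeout."""

    def __init__(self, visibility_timeout):
        self.visibility_timeout = visibility_timeout
        self.ready = deque()   # messages visible to workers
        self.unacked = {}      # msg_id -> (message, redelivery deadline)

    def publish(self, msg_id, message):
        self.ready.append((msg_id, message))

    def reserve(self, now):
        """A worker takes a message; it stays invisible until ack() or timeout."""
        self._requeue_expired(now)
        if not self.ready:
            return None
        msg_id, message = self.ready.popleft()
        self.unacked[msg_id] = (message, now + self.visibility_timeout)
        return msg_id, message

    def ack(self, msg_id):
        # A successful (late) ack removes the message for good.
        self.unacked.pop(msg_id, None)

    def _requeue_expired(self, now):
        # If the worker died before acking, the message becomes
        # visible again once the visibility timeout has passed.
        for msg_id, (message, deadline) in list(self.unacked.items()):
            if now >= deadline:
                del self.unacked[msg_id]
                self.ready.append((msg_id, message))
```

With a 10-second timeout, a message reserved at t=0 by a worker that is then killed is invisible at t=5 but redelivered at t=11. This is why, without setting visibility_timeout, a killed worker's task can appear to never come back on Redis, whereas RabbitMQ redelivers immediately when the connection drops.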

I added this extra setting to my config and it now works.

FYI, here are my config files:

  • celery_config.py
broker_url = 'redis://127.0.0.1:6379/1'
result_backend = 'redis://127.0.0.1:6379/2'

# task message ack
# https://docs.celeryq.dev/en/stable/userguide/configuration.html#std-setting-task_acks_late
# https://docs.celeryq.dev/en/stable/userguide/configuration.html#task-acks-on-failure-or-timeout
# https://docs.celeryq.dev/en/stable/userguide/configuration.html#task-reject-on-worker-lost
task_acks_late = True                       # ack after task end
task_acks_on_failure_or_timeout = True      # ack if task exception
task_reject_on_worker_lost = True           # no ack if worker killed
# only for redis broker
# https://github.com/celery/celery/issues/4984
broker_transport_options = {'visibility_timeout': 10}
  • app.py
import celery

import celery_config

app = celery.Celery("celery")
app.config_from_object(celery_config)