Retrying lost or failed tasks (Celery, Django and RabbitMQ)

Published 2024-10-23 01:15:03


Is there a way to determine if any task is lost and retry it?


I think the reason for the loss could be a dispatcher bug or a worker thread crash.

I was planning to retry them, but I'm not sure how to determine which tasks need to be retried.

And how can this process be made automatic? Can I use my own custom scheduler to create new tasks?

Edit: I found in the documentation that RabbitMQ never loses tasks, but what happens when a worker thread crashes in the middle of task execution?


Comments (1)

予囚 2024-10-30 01:15:03


What you need is to set

CELERY_ACKS_LATE = True

Late ack means that the task message will be acknowledged after the task has been executed,
not just before, which is the default behavior.
This way, if the worker crashes, RabbitMQ will still have the message.
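For reference, a minimal sketch of where this setting lives. In newer Celery releases the lowercase name `task_acks_late` replaces the old uppercase `CELERY_ACKS_LATE` form; the app name and broker URL below are assumptions, not part of the original answer:

```python
from celery import Celery

# Hypothetical app; point the broker URL at your own RabbitMQ instance.
app = Celery("myproj", broker="amqp://localhost")

# Ack messages only after the task body finishes, so a worker crash
# mid-task leaves the message in RabbitMQ to be redelivered.
app.conf.task_acks_late = True
```

With the old uppercase style this is simply `CELERY_ACKS_LATE = True` in your settings module.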

Obviously, in the event of a total crash (RabbitMQ + workers) at the same time, there is no way to recover the task, unless you implement logging at task start and task end.
Personally, I write a line to MongoDB every time a task starts and another when the task finishes (independently from the result); this way I can tell which tasks were interrupted by analyzing the Mongo logs.
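The log-analysis step can be sketched as follows, assuming each log record is a dict carrying an `event` field (`"start"` or `"end"`) and a `task_id` (the record shape is an assumption, not shown in the original answer):

```python
def find_interrupted(records):
    """Given start/end log records, return the ids of tasks that
    started but never finished (i.e. were interrupted by a crash)."""
    started, finished = set(), set()
    for rec in records:
        if rec["event"] == "start":
            started.add(rec["task_id"])
        elif rec["event"] == "end":
            finished.add(rec["task_id"])
    # Tasks with a start record but no matching end record.
    return started - finished
```

Any task id this returns is a candidate for re-dispatching.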

You can do this easily by overriding the `__call__` and `after_return` methods of the Celery base task class.

Below you see a piece of my code that uses a TaskLogger class as a context manager (with entry and exit points).
The TaskLogger class simply writes a line containing the task info to a MongoDB instance.

def __call__(self, *args, **kwargs):
    """In a Celery task this method invokes run(); here you can set up
    the environment before the task runs."""

    # Initialize the context manager
    self.taskLogger = TaskLogger(args, kwargs)
    self.taskLogger.__enter__()

    return self.run(*args, **kwargs)

def after_return(self, status, retval, task_id, args, kwargs, einfo):
    # Exit point for the context manager
    self.taskLogger.__exit__(status, retval, task_id, args, kwargs, einfo)
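The TaskLogger class itself is not shown in the answer; here is a hedged sketch of what it might look like. The `writer` parameter is an assumption standing in for the real MongoDB write (with pymongo you could pass a collection's `insert_one`); by default it appends to an in-memory list so the class works without a database:

```python
import datetime

class TaskLogger:
    """Writes one record when a task starts and another when it ends."""

    def __init__(self, args, kwargs, writer=None):
        self.args = args
        self.kwargs = kwargs
        # writer: any callable taking a dict, e.g. a pymongo
        # collection's insert_one. Default: an in-memory list.
        self.records = []
        self.writer = writer if writer is not None else self.records.append

    def __enter__(self):
        self.writer({"event": "start",
                     "args": repr(self.args),
                     "kwargs": repr(self.kwargs),
                     "time": datetime.datetime.utcnow().isoformat()})
        return self

    # Signature matches how after_return() calls it above, rather than
    # the standard context-manager protocol.
    def __exit__(self, status, retval, task_id, args, kwargs, einfo):
        self.writer({"event": "end",
                     "status": status,
                     "task_id": task_id,
                     "time": datetime.datetime.utcnow().isoformat()})
```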

I hope this helps.
