重试丢失或失败的任务(Celery、Django 和 RabbitMQ)
有没有办法确定任务是否丢失并重试?
我认为丢失的原因可能是调度程序错误或工作线程崩溃。
我打算重试它们,但我不确定如何确定哪些任务需要退役?
以及如何让这个过程自动进行?我可以使用自己的自定义调度程序来创建新任务吗?
编辑:我从文档中发现 RabbitMQ 永远不会丢失任务,但是当工作线程在任务执行过程中崩溃时会发生什么?
Is there a way to determine if any task is lost and retry it?
I think that the reason for lost can be dispatcher bug or worker thread crash.
I was planning to retry them but I'm not sure how to determine which tasks need to be retired?
And how to make this process automatically? Can I use my own custom scheduler which will create new tasks?
Edit: I found from the documentation that RabbitMQ never loose tasks, but what happens when worker thread crash in the middle of task execution?
如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。
绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论
评论(1)
你需要的是设置
CELERY_ACKS_LATE = True
Late ack 意味着任务消息将在任务执行后被确认,
不只是之前,这是默认行为。
这样,如果worker崩溃了,rabbit MQ仍然会有消息。
显然,如果同时发生完全崩溃(Rabbit + 工作人员),则无法恢复任务,除非您在任务开始和任务结束时实现日志记录。
就我个人而言,我在 mongodb 中每次任务开始时写一行,任务完成时写另一行(独立形成结果),这样我可以通过分析 mongo 日志知道哪个任务被中断。
您可以通过重写 celery 基任务类的方法
__call__
和after_return
轻松完成此操作。下面您会看到我的一段代码,它使用 taskLogger 类作为上下文管理器(带有入口和出口点)。
taskLogger 类只是在 mongodb 实例中写入一行包含任务信息的行。
我希望这能有所帮助
What you need is to set
CELERY_ACKS_LATE = True
Late ack means that the task messages will be acknowledged after the task has been executed,
not just before, which is the default behavior.
In this way if the worker crashes rabbit MQ will still have the message.
Obviously of a total crash (Rabbit + workers) at the same time there is no way of recovering the task, except if you implement a logging on task start and task end.
Personally I write in a mongodb a line every time a task start and another one when the task finish (independently form the result), in this way I can know which task was interrupted by analyzing the mongo logs.
You can do it easily by overriding the methods
__call__
andafter_return
of the celery base task class.Following you see a piece of my code that uses a taskLogger class as context manager (with entry and exit point).
The taskLogger class simply writes a line containing the task info in a mongodb instance.
I hope this could help