如何使 celery 任务从任务内部失败?
在某些情况下,我想让芹菜任务从该任务中失败。我尝试了以下操作:
from celery.task import task
from celery import states
@task()
def run_simulation():
if some_condition:
run_simulation.update_state(state=states.FAILURE)
return False
但是,任务仍然报告已成功:
任务 sim.tasks.run_simulation[9235e3a7-c6d2-4219-bbc7-acf65c816e65] 1.17847704887s 成功:False
似乎只能在任务运行时以及任务完成后修改状态 - celery 将状态更改为它认为的结果(请参阅 这个问题)。有没有什么办法,在不引发异常而使任务失败的情况下,让 celery 返回任务失败的信息?
Under some conditions, I want to make a celery task fail from within that task. I tried the following:
from celery.task import task
from celery import states
@task()
def run_simulation():
if some_condition:
run_simulation.update_state(state=states.FAILURE)
return False
However, the task still reports to have succeeded:
Task sim.tasks.run_simulation[9235e3a7-c6d2-4219-bbc7-acf65c816e65]
succeeded in 1.17847704887s: False
It seems that the state can only be modified while the task is running and once it is completed - celery changes the state to whatever it deems is the outcome (refer to this question). Is there any way, without failing the task by raising an exception, to make celery return that the task has failed?
如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论
评论(4)
要将任务标记为失败而不引发异常,请将任务状态更新为
FAILURE
,然后引发Ignore
异常,因为返回任何值都会将任务记录为成功,示例:但最好的方法是从您的任务中引发异常,您可以创建自定义异常来跟踪这些失败:
并从您的任务中引发此异常:
To mark a task as failed without raising an exception, update the task state to
FAILURE
and then raise anIgnore
exception, because returning any value will record the task as successful, an example:But the best way is to raise an exception from your task, you can create a custom exception to track these failures:
And raise this exception from your task:
我想进一步扩展皮埃尔的答案,因为我在使用建议的解决方案时遇到了一些问题。
为了在将任务状态更新为 states.FAILURE 时允许自定义字段,模拟 FAILURE 状态将具有的一些属性也很重要(注意 exc_type 和 exc_message)
虽然解决方案将终止任务,但任何查询状态的尝试(例如 - 获取“失败原因”值)都将失败。
下面是我摘录的一个片段供参考:
https://www.distributedpython.com/2018/09/28 /celery-任务状态/
I'd like to further expand on Pierre's answer as I've encountered some issues using the suggested solution.
To allow custom fields when updating a task's state to states.FAILURE, it is important to also mock some attributes that a FAILURE state would have (notice exc_type and exc_message)
While the solution will terminate the task, any attempt to query the state (For example - to fetch the 'REASON FOR FAILURE' value) will fail.
Below is a snippet for reference I took from:
https://www.distributedpython.com/2018/09/28/celery-task-states/
我从 Ask Solem 那里得到了关于此问题的有趣的回复,其中他提出了一个“after_return”处理程序来解决这个问题。这可能是未来一个有趣的选择。
与此同时,当我想让任务失败时,我通过简单地从任务中返回一个字符串“FAILURE”来解决这个问题,然后检查如下:
I got an interesting reply on this question from Ask Solem, where he proposes an 'after_return' handler to solve the issue. This might be an interesting option for the future.
In the meantime I solved the issue by simply returning a string 'FAILURE' from the task when I want to make it fail and then checking for that as follows:
就我而言,我对接受的答案有疑问:
Ignore()
或Reject()
导致任务在FAILURE< /code> 状态,但是当运行包含此失败任务的 Celery 工作流程(例如
chain
、chord
或group
)时,总是会导致工作流程挂起:因此我最终总是将任务标记为成功,并在工作流程结束后进行自己的错误后处理(总是成功):
其中 具有以下优点:
FAILURE
状态而不引发Ignore()
或Reject()
始终会导致任务状态为SUCCESS
...这使我能够始终通过以下方式处理工作流程结果:
In my case, I had issues with the accepted answers:
Ignore()
orReject()
from within the task led to the task being correctly inFAILURE
state, but when running a Celery workflow (thinkchain
,chord
, orgroup
) containing this failing task would always result in the workflow hanging:So I ended up always marking the task as success and doing my own error post-processing after the workflow ends (always successfully):
This has the advantage of:
FAILURE
state without raisingIgnore()
orReject()
always result in task state beingSUCCESS
...This allows me to always process workflow results in the following way: