How to make a Celery task fail from within the task itself?

Posted on 2024-12-08 08:40:20

Under some conditions, I want to make a celery task fail from within that task. I tried the following:

from celery.task import task
from celery import states

@task()
def run_simulation():
    if some_condition:
        run_simulation.update_state(state=states.FAILURE)
        return False

However, the task still reports having succeeded:

Task sim.tasks.run_simulation[9235e3a7-c6d2-4219-bbc7-acf65c816e65]
succeeded in 1.17847704887s: False

It seems that the state can only be modified while the task is running; once it has completed, Celery changes the state to whatever it deems the outcome to be (refer to this question). Is there any way, without failing the task by raising an exception, to make Celery report that the task has failed?

4 Answers

岁月静好 2024-12-15 08:40:20

To mark a task as failed without raising an exception, update the task state to FAILURE and then raise the Ignore exception; returning any value would record the task as successful. An example:

from celery import Celery, states
from celery.exceptions import Ignore

app = Celery('tasks', broker='amqp://guest@localhost//')

@app.task(bind=True)
def run_simulation(self):
    if some_condition:
        # manually update the task state
        self.update_state(
            state=states.FAILURE,
            meta='REASON FOR FAILURE'
        )

        # ignore the task so no other state is recorded
        raise Ignore()

But the best way is to raise an exception from your task; you can create a custom exception to track these failures:

class TaskFailure(Exception):
    pass

And raise this exception from your task:

if some_condition:
    raise TaskFailure('Failure reason')
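
For completeness, here is a hedged sketch of how the caller side might observe this failure (it assumes a result backend is configured; depending on the serializer, result.get() re-raises either TaskFailure itself or a reconstructed equivalent):

result = run_simulation.delay()
try:
    result.get(timeout=10)   # re-raises the stored failure on the caller side
except Exception as exc:     # TaskFailure if the class is importable here, otherwise a wrapped error
    print('task failed:', result.state, exc)   # result.state == 'FAILURE'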

魔法少女 2024-12-15 08:40:20

I'd like to further expand on Pierre's answer as I've encountered some issues using the suggested solution.

To allow custom fields when updating a task's state to states.FAILURE, it is important to also mock some attributes that a FAILURE state would have (note the exc_type and exc_message keys below). Without them the solution will still terminate the task, but any attempt to query the state (for example, to fetch the 'REASON FOR FAILURE' value) will fail.

Below is a snippet for reference, taken from https://www.distributedpython.com/2018/09/28/celery-task-states/:

import traceback

from celery import states
from celery.exceptions import Ignore

# `app` is the Celery instance, defined as in the previous answer
@app.task(bind=True)
def task(self):
    try:
        raise ValueError('Some error')
    except Exception as ex:
        # mimic the shape of a real FAILURE result so the state stays queryable
        self.update_state(
            state=states.FAILURE,
            meta={
                'exc_type': type(ex).__name__,
                'exc_message': traceback.format_exc().split('\n'),
                'custom': '...'
            })
        raise Ignore()
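
As a hedged illustration of why those keys matter, the caller can then read the failure state without get() raising an error (this assumes a configured result backend; propagate=False is a standard AsyncResult.get() argument):

res = task.delay()
res.get(propagate=False)   # return instead of re-raising the stored failure
print(res.state)           # 'FAILURE'
print(res.result)          # exception reconstructed from exc_type / exc_message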

北方的韩爷 2024-12-15 08:40:20

I got an interesting reply on this question from Ask Solem, where he proposes an 'after_return' handler to solve the issue. This might be an interesting option for the future.
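
For reference, here is a minimal, hypothetical sketch of what such an after_return handler could look like (this is my reading of the idea, not Ask Solem's actual code, and whether the overwritten state sticks may depend on the result backend and Celery version):

from celery import Celery, Task, states

app = Celery('tasks', broker='amqp://guest@localhost//')

some_condition = True  # placeholder for the condition from the question

class SimulationTask(Task):
    def after_return(self, status, retval, task_id, args, kwargs, einfo):
        # Runs after the task finishes: turn a False return value into FAILURE
        if retval is False:
            self.update_state(task_id=task_id, state=states.FAILURE,
                              meta={'reason': 'simulation signalled failure'})

@app.task(base=SimulationTask)
def run_simulation():
    if some_condition:
        return False
    return True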

In the meantime, I solved the issue by simply returning the string 'FAILURE' from the task when I want it to fail, and then checking for that as follows:

from celery.result import AsyncResult

result = AsyncResult(task_id)
if result.state == 'FAILURE' or (result.state == 'SUCCESS' and result.get() == 'FAILURE'):
    ...  # handle the failed task here

云巢 2024-12-15 08:40:20

In my case, I had issues with the accepted answers:

  1. Raising Ignore() or Reject() from within the task correctly left the task in the FAILURE state, but running a Celery workflow (think chain, chord, or group) containing this failing task would always result in the workflow hanging:
workflow = chain(my_task.si([], *args, **kwargs), other_task.s(*args, **kwargs))
res = workflow()
results = res.get() # hangs as the workflow never enters the ready state
  2. I wanted the rest of the workflow to still run even if one of the tasks failed (without propagating exceptions, or relying on global error handlers that are difficult to work with).

So I ended up always marking the task as success and doing my own error post-processing after the workflow ends (always successfully):

import traceback

@app.task(bind=True)
def my_task(self, prev, arg1, arg2, opts=None):
  results = []
  state = {
    'state': 'SUCCESS',  # always reported as SUCCESS; the real outcome lives in meta
    'meta': {
      'results': results,
      'custom_attr': 'test',
      # task metadata as needed
    }
  }

  try:
    # task code goes here
    task_state = 'SUCCESS'
    task_exc = None

  except BaseException as exc:
    task_state = 'FAILURE'
    task_exc = exc

  finally:
    if task_state == 'FAILURE':
      exc_str = ' '.join(traceback.format_exception(
        type(task_exc), task_exc, task_exc.__traceback__))
      state['meta']['error'] = exc_str

    # Update task state with final status
    self.update_state(**state)

    # Return the metadata dict so res.get() always yields it (returning a bare
    # value here would otherwise overwrite the meta set above)
    return state['meta']

This has the advantages of:

  • Keeping all the needed exception data (traceback)
  • Avoiding Celery's odd post-processing of task states:
    • when a task fails, Celery replaces the dict data with the exception instance, which prevents accessing task metadata in a consistent way
    • updating the state to FAILURE without raising Ignore() or Reject() always results in the task state being SUCCESS...
  • Making complex workflows far more resilient to failures.

This allows me to always process workflow results in the following way:

info = res.get() # this is always a dict containing metadata
error = info.get('error')
results = info['results']
custom_attr = info['custom_attr']