Recovering from a task that has failed beyond max_retries


I am attempting to asynchronously consume a web service because it takes up to 45 seconds to return. Unfortunately, this web service is also somewhat unreliable and can throw errors. I have set up django-celery and have my tasks executing, which works fine until the task fails beyond max_retries.

Here is what I have so far:

# Assumed imports for this snippet (not shown in the original post):
from celery.task import task                      # django-celery / old-style decorator
from celery.exceptions import MaxRetriesExceededError
from django.conf import settings
from suds.client import Client                    # assuming a suds SOAP client
from myapp.models import Result                   # hypothetical path to the Result model


@task(default_retry_delay=5, max_retries=10)
def request(xml):
    try:
        server = Client('https://www.whatever.net/RealTimeService.asmx?wsdl')
        xml = server.service.RunRealTimeXML(
            username=settings.WS_USERNAME,
            password=settings.WS_PASSWORD,
            xml=xml
        )
    except Exception as e:
        result = Result(celery_id=request.request.id, details=e.reason, status="i")
        result.save()
        try:
            return request.retry(exc=e)
        except MaxRetriesExceededError:
            result = Result(celery_id=request.request.id, details="Max Retries Exceeded", status="f")
            result.save()
            raise
    result = Result(celery_id=request.request.id, details=xml, status="s")
    result.save()
    return result

Unfortunately, MaxRetriesExceededError is never raised by retry(), so I'm not sure how to handle the final failure of this task. Django has already returned HTML to the client, and I am polling the contents of Result via AJAX, but it never reaches the full-failure status "f".

So the question is: How can I update my database when the Celery task has exceeded max_retries?

Comments (4)

征棹 2024-11-24 13:39:17


The issue is that when Celery hits the retry limit, it tries to re-raise the exception you passed in to retry(). The code that does this re-raising is here: https://github.com/celery/celery/blob/v3.1.20/celery/app/task.py#L673-L681

The simplest way around this is not to let Celery manage your exceptions at all:

@task(max_retries=10)
def mytask():
    try:
        do_the_thing()
    except Exception as e:
        try:
            # Call retry() *without* passing exc: once the retry limit is
            # hit, Celery then raises MaxRetriesExceededError instead of
            # re-raising the original exception.
            mytask.retry()
        except MaxRetriesExceededError:
            do_something_to_handle_the_error()
            logger.exception(e)
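
Applied back to the task in the question, the change this answer implies is simply to stop passing exc to retry(), so that MaxRetriesExceededError actually reaches the handler that records the failure. A rough sketch, reusing the Result model and settings from the question (assumed to be importable here); str(e) replaces e.reason since a generic exception has no reason attribute:

@task(default_retry_delay=5, max_retries=10)
def request(xml):
    try:
        server = Client('https://www.whatever.net/RealTimeService.asmx?wsdl')
        xml = server.service.RunRealTimeXML(
            username=settings.WS_USERNAME,
            password=settings.WS_PASSWORD,
            xml=xml
        )
    except Exception as e:
        Result(celery_id=request.request.id, details=str(e), status="i").save()
        try:
            # No exc argument here: once the retry limit is exceeded,
            # Celery raises MaxRetriesExceededError instead of re-raising
            # the original exception.
            return request.retry()
        except MaxRetriesExceededError:
            Result(celery_id=request.request.id, details="Max Retries Exceeded", status="f").save()
            raise
    result = Result(celery_id=request.request.id, details=xml, status="s")
    result.save()
    return result
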
草莓酥 2024-11-24 13:39:17


You can override the after_return method of the Celery task class; it is called after the task has executed, whatever the return status (SUCCESS, FAILURE, RETRY):

import celery.task  # old-style Task base class (django-celery / Celery 2.x-3.x)


class MyTask(celery.task.Task):

    def run(self, xml, **kwargs):
        # Your stuff here
        pass

    def after_return(self, status, retval, task_id, args, kwargs, einfo=None):
        if self.max_retries == int(kwargs['task_retries']):
            # If max retries equals the task's retry count, do something
            pass
        if status == "FAILURE":
            # You can also do something when the task fails, instead of checking the retries
            pass

http://readthedocs.org/docs/celery/en/latest/reference/celery.task.base.html#celery.task.base.BaseTask.after_return

http://celery.readthedocs.org/en/latest/reference/celery.app.task.html?highlight=after_return#celery.app.task.Task.after_return

涫野音 2024-11-24 13:39:17


With Celery version 2.3.2 this approach has worked well for me:

import celery.task                 # old-style imports (django-celery era)
from celery.task import task


class MyTask(celery.task.Task):
    abstract = True

    def after_return(self, status, retval, task_id, args, kwargs, einfo):
        if self.max_retries == self.request.retries:
            # If max retries is equal to the task's retry count, do something
            pass


@task(base=MyTask, default_retry_delay=5, max_retries=10)
def request(xml):
    # Your stuff here
    pass
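
For the original question, the placeholder above could be filled in so that the final failure is written to the database when the retry counter reaches its limit. A sketch, assuming the same Result model as in the question:

class MyTask(celery.task.Task):
    abstract = True

    def after_return(self, status, retval, task_id, args, kwargs, einfo):
        # Once the retry counter has reached max_retries, record the final
        # failure so the AJAX poll in the question sees status "f".
        if self.max_retries == self.request.retries:
            Result(celery_id=task_id, details="Max Retries Exceeded", status="f").save()


@task(base=MyTask, default_retry_delay=5, max_retries=10)
def request(xml):
    # Same web-service call and retry logic as in the question.
    pass
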
別甾虛僞 2024-11-24 13:39:17


I'm just going with this for now; it spares me the work of subclassing Task and is easy to understand.

import logging

import celery
import requests

from myapp.models import Webhook   # hypothetical path to the Webhook model

log = logging.getLogger(__name__)


# auto-retry with delay as defined below. After that, the hook is disabled.
@celery.shared_task(bind=True, max_retries=5, default_retry_delay=300)
def post_data(self, hook_object_id, url, event, payload):
    headers = {'Content-type': 'application/json'}
    try:
        r = requests.post(url, data=payload, headers=headers)
        r.raise_for_status()
    except requests.exceptions.RequestException as e:
        if self.request.retries >= self.max_retries:
            log.warning("Auto-deactivating webhook %s for event %s", hook_object_id, event)
            Webhook.objects.filter(object_id=hook_object_id).update(active=False)
            return False
        raise self.retry(exc=e)
    return True
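
A usage sketch for queuing this task; the hook id, URL, event name, and payload below are made up for illustration:

import json

post_data.delay(
    hook_object_id=42,
    url="https://example.com/webhooks/orders",
    event="order.created",
    payload=json.dumps({"order_id": 1001, "status": "paid"}),
)
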