Executing Celery tasks with a dependency graph

Published 2024-11-18 15:19:00


I would like to have Celery tasks that depend on the results of 2 or more other tasks. I have looked into Python+Celery: Chaining jobs? and http://pypi.python.org/pypi/celery-tasktree , but those are good only if each task has a single dependency.

I know about TaskSet, but there does not seem to be a way to instantly execute a callback when TaskSetResult.ready() becomes True. What I have in mind right now is to have a periodic task that polls TaskSetResult.ready() every few [milli]seconds or so and fires the callback when it returns True, but that sounds rather inelegant to me.

Any suggestions?


Answers (3)

吻风 2024-11-25 15:19:00


In recent versions of Celery (3.0+) you can use a so-called chord to achieve the desired effect:

From http://docs.celeryproject.org/en/latest/userguide/canvas.html#the-primitives:

Simple chord

The chord primitive enables us to add a callback to be called when all
of the tasks in a group have finished executing, which is often
required for algorithms that aren't embarrassingly parallel:

 >>> from celery import chord
 >>> res = chord((add.s(i, i) for i in xrange(10)), xsum.s())()
 >>> res.get()
 90

Disclaimer: I haven't tried this myself yet.

牛↙奶布丁 2024-11-25 15:19:00


mrbox is right: you can retry until the results are ready. But it is not so clear in the docs that when you retry you have to pass along the setid and the subtask ids, and that to restore the result you have to map them back through AsyncResult. Below is a sample to explain what I mean.

from celery.task.sets import TaskSet
from celery.result import AsyncResult, TaskSetResult

def run(self, setid=None, subtasks=None, **kwargs):

    if not setid or not subtasks:
        # First launch of this task: fire off the subtasks
        …
        tasks = []
        for slice in slices:
            tasks.append(uploadTrackSlice.subtask((slice, folder_name)))

        job = TaskSet(tasks=tasks)
        task_set_result = job.apply_async()
        setid = task_set_result.taskset_id
        subtasks = [result.task_id for result in task_set_result.subtasks]
        self.retry(exc=Exception("Result not ready"), args=[setid, subtasks])

    # This is a retry, so we just have to check the results
    tasks_result = TaskSetResult(setid, map(AsyncResult, subtasks))
    if not tasks_result.ready():
        self.retry(exc=Exception("Result not ready"), args=[setid, subtasks])
    else:
        if tasks_result.successful():
            return tasks_result.join()
        else:
            raise Exception("Some of the subtasks failed")

似狗非友 2024-11-25 15:19:00


IMHO you can do something similar to the thing done in the docs - link

Or you can use the retry method with max_retries=None - if one of the 'base' tasks' .ready() is False, you can fire the .retry() method until both 'base' tasks have completed.
