如何访问由多个任务组成的 celery AsyncResult/Signature 的任务依赖关系图?

发布于 2025-01-15 02:06:09 字数 568 浏览 2 评论 0原文

我希望能够提取 celery AsyncResult/Signature 的任务依赖关系图。我的 AsyncResult/Signature 可能是一个复杂的链/组/和弦。我想从父/子 AsyncResults 中提取 task_id 的图表并将其序列化,以便我可以从 重新构建 AsyncResult稍后的task_id 字符串。

我怀疑此输出将来自遍历 AsyncResult.childrenAsyncResult.parent 任务树,但想查看 celery 中是否已有任何内容为此而存在,而无需编写我自己的遍历代码。

我想要一个大致类似于以下内容的输出:

{
    "GroupTask-id-xxx": [
        "Task-id-xxx",
        "Task-id-xxx",
    ]
}

I'd like to be able to extract the task dependency graph for a celery AsyncResult/Signature. My AsyncResult/Signature may be a complex chain/group/chord. I'd like to extract the graph of task_id from parent/children AsyncResults and serialize it so that I can reconstitute the AsyncResult from task_id string at a later date.

I suspect this output would come from traversing the AsyncResult.children or AsyncResult.parent tree of tasks, but wanted to see if anything in celery already existed for this without having to write my own traversal code.

I'd like an output something roughly akin to:

{
    "GroupTask-id-xxx": [
        "Task-id-xxx",
        "Task-id-xxx",
    ]
}

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(1

压抑⊿情绪 2025-01-22 02:06:09

看起来 GitHub 上的此答案中讨论了许多要使用的工具。

主要工具:

  • Async/GroupResult.as_tuple() 给出整个依赖图的序列化元组表示。与 celery.result.result_from_tuple(tuple_representation) 结合使用,从元组序列化中重新生成 Async/GroupResult
  • my_group_result.save() 如果将来尝试从单个 ID 访问 GroupResult 对象。这会将元组表示保存到后端。与GroupResult.restore(group_id)结合使用。请注意,它不会捕获该组的父级或其子级 AsyncResults

如果您想将这些结果保存到数据库中,以便将来只需使用 id 即可完全检索它们,则可以使用以下方法:

import json
from celery.result import AsyncResult, GroupResult, result_from_tuple

def save_result(result: Union[AsyncResult, GroupResult]) -> None:
    """Save result compute structure to backend"""
    result.backend.set(result.id, json.dumps(result.as_tuple()))


def restore_result(result_id: str) -> Union[AsyncResult, GroupResult]:
    """Restore result (including parents) from backend

    Raises:
        ValueError if result not found in backend
    """
    try:
        return result_from_tuple(json.loads(tasks.celery_app.backend.get(result_id)))
    except TypeError:
        raise ValueError(f"Result id '{result_id}', not found.")

Looks like lots of the tools to use are discussed in this answer on GitHub.

Main tools:

  • Async/GroupResult.as_tuple() gives a serialized tuple representation of the entire dependency graph. Used in conjunction with celery.result.result_from_tuple(tuple_representation) to rehydrate an Async/GroupResult from the tuple serialization.
  • my_group_result.save() if trying to access GroupResult objects in the future from a single id. This saves the tuple representation to the bckend. Used in conjunction with GroupResult.restore(group_id). Note that it does not capture the parents of either the group or its children AsyncResults.

If you want to save these results to a database so that they can be fully retrieved at a future date with just an id the following methods provide that:

import json
from celery.result import AsyncResult, GroupResult, result_from_tuple

def save_result(result: Union[AsyncResult, GroupResult]) -> None:
    """Save result compute structure to backend"""
    result.backend.set(result.id, json.dumps(result.as_tuple()))


def restore_result(result_id: str) -> Union[AsyncResult, GroupResult]:
    """Restore result (including parents) from backend

    Raises:
        ValueError if result not found in backend
    """
    try:
        return result_from_tuple(json.loads(tasks.celery_app.backend.get(result_id)))
    except TypeError:
        raise ValueError(f"Result id '{result_id}', not found.")

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文