Celery如何将参数传递给链接任务(这是绑定的)?

发布于 2025-01-11 22:39:39 字数 1871 浏览 3 评论 0原文

我有两个任务,其中一个任务成功后应该开始第二个任务并通过结果。两个任务都是 bind=True 因为报告重试的逻辑。

我通过以下方式调用此任务:

async def foobar(x: int):
    task = foo.apply_async(kwargs={'x': x}, link=bar.si())

但不知道如何将结果传递给栏任务。 Docs 仅显示没有绑定的示例,在我的情况下结果不是传递给第二个任务 bar

回溯显示 y 未传递:

Traceback (most recent call last):
  File "/usr/local/lib/python3.8/site-packages/celery/app/trace.py", line 503, in trace_task
    signature(callbacks[0], app=app).apply_async(
  File "/usr/local/lib/python3.8/site-packages/celery/canvas.py", line 219, in apply_async
    return _apply(args, kwargs, **options)
  File "/usr/local/lib/python3.8/site-packages/celery/app/task.py", line 537, in apply_async
    check_arguments(*(args or ()), **(kwargs or {}))
TypeError: bar() missing 1 required positional argument: 'y'
@celery_app.task(name="foo", bind=True,
                 autoretry_for=(Exception,),
                 default_retry_delay=1,
                 retry_backoff=2,  # <- wait 1s before the 1st retry
                 max_retries=2,
                 retry_jitter=False)
def foo(self, x: int, *args):
    logger.info(f'{self.request.id}, countdown: {self.request.retries}, max_retries: {self.max_retries}')
    return {"x": x}


@celery_app.task(name="bar", bind=True,
                 autoretry_for=(Exception,),
                 default_retry_delay=1,
                 retry_backoff=2,  # <- wait 1s before the 1st retry
                 max_retries=2,
                 retry_jitter=False)
def bar(self, y: int):
    logger.info(f'{self.request.id}, countdown: {self.request.retries}, max_retries: {self.max_retries}')
    return {"y_which_should_be_x": y}

I have 2 tasks, which one after success should start second and pass results. Both task are
bind=True because of logic of reporting retries.

I call this task by:

async def foobar(x: int):
    task = foo.apply_async(kwargs={'x': x}, link=bar.si())

But have no idea how to pass result to bar task. Docs show only example without bind, and in my case result is not passed to second task bar

Traceback show that y is not passed:

Traceback (most recent call last):
  File "/usr/local/lib/python3.8/site-packages/celery/app/trace.py", line 503, in trace_task
    signature(callbacks[0], app=app).apply_async(
  File "/usr/local/lib/python3.8/site-packages/celery/canvas.py", line 219, in apply_async
    return _apply(args, kwargs, **options)
  File "/usr/local/lib/python3.8/site-packages/celery/app/task.py", line 537, in apply_async
    check_arguments(*(args or ()), **(kwargs or {}))
TypeError: bar() missing 1 required positional argument: 'y'
@celery_app.task(name="foo", bind=True,
                 autoretry_for=(Exception,),
                 default_retry_delay=1,
                 retry_backoff=2,  # <- wait 1s before the 1st retry
                 max_retries=2,
                 retry_jitter=False)
def foo(self, x: int, *args):
    logger.info(f'{self.request.id}, countdown: {self.request.retries}, max_retries: {self.max_retries}')
    return {"x": x}


@celery_app.task(name="bar", bind=True,
                 autoretry_for=(Exception,),
                 default_retry_delay=1,
                 retry_backoff=2,  # <- wait 1s before the 1st retry
                 max_retries=2,
                 retry_jitter=False)
def bar(self, y: int):
    logger.info(f'{self.request.id}, countdown: {self.request.retries}, max_retries: {self.max_retries}')
    return {"y_which_should_be_x": y}

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(1

方觉久 2025-01-18 22:39:39

我发现我可以使用:

task = (foo.s(x) | bar.s()).apply_async() 并且它按预期工作。

I found out that I can just use:

task = (foo.s(x) | bar.s()).apply_async() and it's working as intended.

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文