Celery如何将参数传递给链接任务(这是绑定的)?
我有两个任务,其中一个任务成功后应该开始第二个任务并通过结果。两个任务都是 bind=True
因为报告重试的逻辑。
我通过以下方式调用此任务:
async def foobar(x: int):
task = foo.apply_async(kwargs={'x': x}, link=bar.si())
但不知道如何将结果传递给栏任务。 Docs 仅显示没有绑定的示例,在我的情况下结果不是传递给第二个任务 bar
回溯显示 y 未传递:
Traceback (most recent call last):
File "/usr/local/lib/python3.8/site-packages/celery/app/trace.py", line 503, in trace_task
signature(callbacks[0], app=app).apply_async(
File "/usr/local/lib/python3.8/site-packages/celery/canvas.py", line 219, in apply_async
return _apply(args, kwargs, **options)
File "/usr/local/lib/python3.8/site-packages/celery/app/task.py", line 537, in apply_async
check_arguments(*(args or ()), **(kwargs or {}))
TypeError: bar() missing 1 required positional argument: 'y'
@celery_app.task(name="foo", bind=True,
autoretry_for=(Exception,),
default_retry_delay=1,
retry_backoff=2, # <- wait 1s before the 1st retry
max_retries=2,
retry_jitter=False)
def foo(self, x: int, *args):
logger.info(f'{self.request.id}, countdown: {self.request.retries}, max_retries: {self.max_retries}')
return {"x": x}
@celery_app.task(name="bar", bind=True,
autoretry_for=(Exception,),
default_retry_delay=1,
retry_backoff=2, # <- wait 1s before the 1st retry
max_retries=2,
retry_jitter=False)
def bar(self, y: int):
logger.info(f'{self.request.id}, countdown: {self.request.retries}, max_retries: {self.max_retries}')
return {"y_which_should_be_x": y}
I have 2 tasks, which one after success should start second and pass results. Both task arebind=True
because of logic of reporting retries.
I call this task by:
async def foobar(x: int):
task = foo.apply_async(kwargs={'x': x}, link=bar.si())
But have no idea how to pass result to bar task. Docs show only example without bind, and in my case result is not passed to second task bar
Traceback show that y is not passed:
Traceback (most recent call last):
File "/usr/local/lib/python3.8/site-packages/celery/app/trace.py", line 503, in trace_task
signature(callbacks[0], app=app).apply_async(
File "/usr/local/lib/python3.8/site-packages/celery/canvas.py", line 219, in apply_async
return _apply(args, kwargs, **options)
File "/usr/local/lib/python3.8/site-packages/celery/app/task.py", line 537, in apply_async
check_arguments(*(args or ()), **(kwargs or {}))
TypeError: bar() missing 1 required positional argument: 'y'
@celery_app.task(name="foo", bind=True,
autoretry_for=(Exception,),
default_retry_delay=1,
retry_backoff=2, # <- wait 1s before the 1st retry
max_retries=2,
retry_jitter=False)
def foo(self, x: int, *args):
logger.info(f'{self.request.id}, countdown: {self.request.retries}, max_retries: {self.max_retries}')
return {"x": x}
@celery_app.task(name="bar", bind=True,
autoretry_for=(Exception,),
default_retry_delay=1,
retry_backoff=2, # <- wait 1s before the 1st retry
max_retries=2,
retry_jitter=False)
def bar(self, y: int):
logger.info(f'{self.request.id}, countdown: {self.request.retries}, max_retries: {self.max_retries}')
return {"y_which_should_be_x": y}
如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论
评论(1)
我发现我可以使用:
task = (foo.s(x) | bar.s()).apply_async()
并且它按预期工作。I found out that I can just use:
task = (foo.s(x) | bar.s()).apply_async()
and it's working as intended.