通过之前的任务将任务结果芹菜。
这是我的代码。我想通过任务 myName 结果将作为签名中的任务反向作为参数。
这是我的代码。我想通过任务 myName 结果将作为签名中的任务反向作为参数。
from app import app
from app import app
from time import sleep
from celery.utils.log import get_task_logger
import os
from celery import signature, chain, group, chord
from celery.result import allow_join_result
MyQUEUE = os.getenv("SCANS_QUEUE")
logger = get_task_logger(__name__)
@app.task(queue=MyQUEUE, ignore_result=True)
def reverse(text):
logger.info('reverse order '.format(text))
return {"reversename": str(text[::-1])}
@app.task(queue=MyQUEUE, ignore_result=True)
def add(a,b):
logger.info('Addition --> a : {0} & b : {1} '.format(a,b))
return {"addition": str(a+b)}
@app.task(queue=MyQUEUE, ignore_result=True)
def myname(a):
logger.info('Name --> a : {0}'.format(a))
return {"name": str(a)}
@app.task(queue=MyQUEUE, ignore_result=True)
def run_pipeline(a,b,n):
resultchain = chain([
group([
signature(
add,
args=(a,b),
queue=MyQUEUE
),
signature(
myname,
args=(n),
queue=MyQUEUE
)
]),
signature
(
reverse,
args=(-------),
queue=MyQUEUE
)
]).apply_async()
with allow_join_result():
results = resultchain.join()
return results
Here is my code. I want to pass the task myname result pass to be the task reverse in the signature as an argument.
Here is my code. I want to pass the task myname result pass to be the task reverse in the signature as an argument.
from app import app
from app import app
from time import sleep
from celery.utils.log import get_task_logger
import os
from celery import signature, chain, group, chord
from celery.result import allow_join_result
MyQUEUE = os.getenv("SCANS_QUEUE")
logger = get_task_logger(__name__)
@app.task(queue=MyQUEUE, ignore_result=True)
def reverse(text):
logger.info('reverse order '.format(text))
return {"reversename": str(text[::-1])}
@app.task(queue=MyQUEUE, ignore_result=True)
def add(a,b):
logger.info('Addition --> a : {0} & b : {1} '.format(a,b))
return {"addition": str(a+b)}
@app.task(queue=MyQUEUE, ignore_result=True)
def myname(a):
logger.info('Name --> a : {0}'.format(a))
return {"name": str(a)}
@app.task(queue=MyQUEUE, ignore_result=True)
def run_pipeline(a,b,n):
resultchain = chain([
group([
signature(
add,
args=(a,b),
queue=MyQUEUE
),
signature(
myname,
args=(n),
queue=MyQUEUE
)
]),
signature
(
reverse,
args=(-------),
queue=MyQUEUE
)
]).apply_async()
with allow_join_result():
results = resultchain.join()
return results
如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论
评论(1)
首先也是最重要的是,如果您要使用
链
,group
,starmap
或其他类型的任务工作流,则将在需要使用ignore_result = false
设置未来或省略参数(默认值为false)。至少在myname
和添加
现在,需要存储该值,现在,
反向
以获取group
of Add
和myName
,您需要调整反向
来处理组结果(结果列表)。对于
链
,任务的结果将用作下一个任务的第一个参数,在这种情况下,组结果将以[{{{{{ '添加':...},{'name':...}]
,您可以访问正确的值。最后,如果您只想扭转
myName
的结果,则必须仅链接myName
和recters
。First and most important, if you are gonna use
chain
,group
,starmap
or another kind of task workflow, tasks which results will be used on the future need to be set withignore_result=False
or omit the argument (default value is False). Needed to store the value, at least onmyname
andadd
Now, for
reverse
to obtain the results in thegroup
ofadd
andmyname
, you need to adjustreverse
to handle the group result (a list of the results).For a
chain
the results of a task will be used as the first argument of the next task, in this case the group results will be injected in the first value of the reverse task as[{'addition': ...}, {'name': ...}]
, with that you can access the correct value.Finally if you only want to reverse the result of
myname
, you have to chain onlymyname
andreverse
.