通过之前的任务将任务结果芹菜。

发布于 2025-02-09 11:35:22 字数 1509 浏览 0 评论 0原文

这是我的代码。我想通过任务 myName 结果将作为签名中的任务反向作为参数。

这是我的代码。我想通过任务 myName 结果将作为签名中的任务反向作为参数。

from app import app
from app import app
from time import sleep
from celery.utils.log import get_task_logger
import os
from celery import signature, chain, group, chord
from celery.result import allow_join_result


MyQUEUE = os.getenv("SCANS_QUEUE")
logger = get_task_logger(__name__)

@app.task(queue=MyQUEUE, ignore_result=True)
def reverse(text):
    logger.info('reverse order '.format(text))
    return {"reversename": str(text[::-1])}

@app.task(queue=MyQUEUE, ignore_result=True)
def add(a,b):
    logger.info('Addition --> a : {0} & b : {1} '.format(a,b))
    return {"addition": str(a+b)}

@app.task(queue=MyQUEUE, ignore_result=True)
def myname(a):
    logger.info('Name --> a : {0}'.format(a))
    return {"name": str(a)}


@app.task(queue=MyQUEUE, ignore_result=True)
def run_pipeline(a,b,n):
    resultchain = chain([
        group([
            signature(
                add,
                args=(a,b),
                queue=MyQUEUE
            ),
            signature(
                myname,
                args=(n),
                queue=MyQUEUE
            )
        ]),
        signature
        (
            reverse,
            args=(-------),
            queue=MyQUEUE
        )
    ]).apply_async()

    with allow_join_result():
        results = resultchain.join()
    return results

Here is my code. I want to pass the task myname result pass to be the task reverse in the signature as an argument.

Here is my code. I want to pass the task myname result pass to be the task reverse in the signature as an argument.

from app import app
from app import app
from time import sleep
from celery.utils.log import get_task_logger
import os
from celery import signature, chain, group, chord
from celery.result import allow_join_result


MyQUEUE = os.getenv("SCANS_QUEUE")
logger = get_task_logger(__name__)

@app.task(queue=MyQUEUE, ignore_result=True)
def reverse(text):
    logger.info('reverse order '.format(text))
    return {"reversename": str(text[::-1])}

@app.task(queue=MyQUEUE, ignore_result=True)
def add(a,b):
    logger.info('Addition --> a : {0} & b : {1} '.format(a,b))
    return {"addition": str(a+b)}

@app.task(queue=MyQUEUE, ignore_result=True)
def myname(a):
    logger.info('Name --> a : {0}'.format(a))
    return {"name": str(a)}


@app.task(queue=MyQUEUE, ignore_result=True)
def run_pipeline(a,b,n):
    resultchain = chain([
        group([
            signature(
                add,
                args=(a,b),
                queue=MyQUEUE
            ),
            signature(
                myname,
                args=(n),
                queue=MyQUEUE
            )
        ]),
        signature
        (
            reverse,
            args=(-------),
            queue=MyQUEUE
        )
    ]).apply_async()

    with allow_join_result():
        results = resultchain.join()
    return results

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(1

罪歌 2025-02-16 11:35:22

首先也是最重要的是,如果您要使用groupstarmap或其他类型的任务工作流,则将在需要使用ignore_result = false设置未来或省略参数(默认值为false)。至少在myname添加现在,需要存储该值,

@app.task(queue=MyQUEUE)
def add(a,b):
    logger.info('Addition --> a : {0} & b : {1} '.format(a,b))
    return {"addition": str(a+b)}

@app.task(queue=MyQUEUE)
def myname(a):
    logger.info('Name --> a : {0}'.format(a))
    return {"name": str(a)}

现在,反向以获取group of AddmyName,您需要调整反向来处理组结果(结果列表)。

对于,任务的结果将用作下一个任务的第一个参数,在这种情况下,组结果将以[{{{{{ '添加':...},{'name':...}],您可以访问正确的值。

@app.task(queue=MyQUEUE)
def reverse(group_data):
    # group_data value: [{'addition': '3'}, {'name': 'VALUE'}]
    text = group_data[1]['name']
    logger.info('reverse order '.format(text))
    return {"reversename": str(text[::-1])}

最后,如果您只想扭转myName的结果,则必须仅链接myNamerecters

resultchain = chain([
    signature(myname, args=(n,)),
    signature(reverse)
]).apply_async()

First and most important, if you are gonna use chain, group, starmap or another kind of task workflow, tasks which results will be used on the future need to be set with ignore_result=False or omit the argument (default value is False). Needed to store the value, at least on myname and add

@app.task(queue=MyQUEUE)
def add(a,b):
    logger.info('Addition --> a : {0} & b : {1} '.format(a,b))
    return {"addition": str(a+b)}

@app.task(queue=MyQUEUE)
def myname(a):
    logger.info('Name --> a : {0}'.format(a))
    return {"name": str(a)}

Now, for reverse to obtain the results in the group of add and myname, you need to adjust reverse to handle the group result (a list of the results).

For a chain the results of a task will be used as the first argument of the next task, in this case the group results will be injected in the first value of the reverse task as [{'addition': ...}, {'name': ...}], with that you can access the correct value.

@app.task(queue=MyQUEUE)
def reverse(group_data):
    # group_data value: [{'addition': '3'}, {'name': 'VALUE'}]
    text = group_data[1]['name']
    logger.info('reverse order '.format(text))
    return {"reversename": str(text[::-1])}

Finally if you only want to reverse the result of myname, you have to chain only myname and reverse.

resultchain = chain([
    signature(myname, args=(n,)),
    signature(reverse)
]).apply_async()
~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文