Celery: run subtasks asynchronously inside a task and wait for their results



I have a celery task that launches three other celery tasks. I want these tasks to execute asynchronously and wait for them to finish before I resume the parent task. However, the child tasks are running synchronously and I don't know why.
The problem started when I upgraded celery from 4.4.7 to 5.0.0.

app_celery.py

import time

from celery import group


@app.task(name="app_celery.scraping_process", soft_time_limit=900, time_limit=960, max_retries=3)
def scraping_process():
    sources = ["a", "b", "c"]
    # launch one company_representation subtask per source
    job = group(company_representation.s(src) for src in sources)
    result = job.apply_async(queue="spiders", routing_key="spiders")
    # block the parent until every subtask in the group has finished
    while not result.ready():
        time.sleep(5)


@app.task(name="app_celery.company_representation", max_retries=3)
def company_representation(source: str):
    # do something
    time.sleep(60)

I am running celery like this:

celery -A app_celery worker -c 8 -Q spiders -n spiders@%%h
celery -A app_celery worker -c 2 -Q companies -n companies@%%h --without-mingle --without-heartbeat -Ofair

celery==5.0.0
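
The snippets above assume an app object and, for result.ready() to work, a configured result backend, neither of which is shown in the question. A minimal sketch of that assumed setup (the broker and backend URLs are placeholders, not taken from the question):

from celery import Celery

# hypothetical broker/backend URLs -- replace with your own
app = Celery(
    "app_celery",
    broker="redis://localhost:6379/0",
    backend="redis://localhost:6379/1",  # a result backend is needed for result.ready() / result.get()
)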

Comments (2)

琉璃梦幻 2025-01-17 22:12:14

You could add the task IDs to a list and then do something like:

def poll_job_status(active_jobs):
    # `task` is the Celery task whose IDs are being tracked (e.g. company_representation)
    if len(active_jobs) == 1:
        result = task.AsyncResult(active_jobs[0])
        if not result.ready():
            return active_jobs
    _new_active_jobs = []
    for taskid in active_jobs:
        result = task.AsyncResult(taskid)
        # keep only the jobs that have not finished yet
        if result.state == "PENDING" or result.state == "RETRY":
            _new_active_jobs.append(taskid)
    return _new_active_jobs

So you would iterate over the list of task IDs and check whether each task is complete. Once the list is empty, you know that all the tasks have run and you can carry on with other operations. Example usage would be:

active_tasks_list = []
active_tasks_list.append(task.delay(args).id)
while len(active_tasks_list) > 0:
    # reassign, so the list actually shrinks as tasks finish
    active_tasks_list = poll_job_status(active_tasks_list)
# carry on other processes

This is ideal if you have numerous tasks that you want to keep track of.
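
Applied to the scraping_process task from the question, that approach could look roughly like the sketch below. It is only a sketch: it assumes the same queue names as in the question, that poll_job_status resolves its IDs via company_representation.AsyncResult, and that a result backend is configured.

import time

@app.task(name="app_celery.scraping_process", soft_time_limit=900, time_limit=960, max_retries=3)
def scraping_process():
    sources = ["a", "b", "c"]
    # enqueue each subtask individually and remember its ID
    active_jobs = [
        company_representation.apply_async(args=(src,), queue="spiders", routing_key="spiders").id
        for src in sources
    ]
    # poll until the list of unfinished IDs is empty
    while len(active_jobs) > 0:
        active_jobs = poll_job_status(active_jobs)
        time.sleep(5)
    # all subtasks have finished; resume the parent work here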

薄情伤 2025-01-17 22:12:14

You can try using a Celery group to invoke multiple tasks in parallel and wait for their results.

import time

from celery import group


@app.task(name="app_celery.scraping_process", soft_time_limit=900, time_limit=960, max_retries=3)
def scraping_process():
    sources = ["a", "b", "c"]
    tasks = []
    for src in sources:
        tasks.append(company_representation.s(src))

    # create a group with all the tasks
    job = group(tasks)
    result = job.apply_async(queue="spiders", routing_key="spiders")
    # wait for all subtasks; disable_sync_subtasks=False allows .get() inside a task
    ret_val = result.get(disable_sync_subtasks=False)
    return ret_val


@app.task(name="app_celery.company_representation", max_retries=3)
def company_representation(source: str):
    # do something
    time.sleep(60)

Reference: http://ask.github.io/celery/userguide/groups.html#groups
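
Two notes on this. Celery normally refuses result.get() inside a task (raising a RuntimeError along the lines of "Never call result.get() within a task!"), and disable_sync_subtasks=False is what bypasses that check. Also, since company_representation returns nothing, ret_val will be a list of None values; below is a hedged variant in which the child task reports something back (the payload is purely illustrative, not from the question):

@app.task(name="app_celery.company_representation", max_retries=3)
def company_representation(source: str):
    # do something, then report a value back to the parent
    time.sleep(60)
    return {"source": source, "status": "done"}  # illustrative payload

With that change, ret_val in scraping_process becomes a list with one such dict per source, in the same order as the signatures in the group.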
