Celery 在任务内异步运行子任务并等待结果
我有一个 celery 任务,可以启动其他三个 celery 任务。我希望这些任务异步执行并等待它们完成,然后再恢复父任务。然而,子任务正在同步运行,我不知道为什么。 时,问题就开始了,
当我将 celery 从 4.4.7 升级到 5.0.0 app_celery.py
@app.task(name="app_celery.scraping_process", soft_time_limit=900, time_limit=960, max_retries=3)
def scraping_process():
sources = ["a", "b", "c"]
job = group((company_representation.s(src) for src in sources))
result = job.apply_async(queue="spiders", routing_key="spiders")
while not result.ready():
time.sleep(5)
@app.task(name="app_celery.company_representation", max_retries=3)
def company_representation(source: str):
# do something
time.sleep(60)
我正在像这样运行 celery:
celery -A app_celery worker -c 8 -Q spiders -n spiders@%%h
celery -A app_celery worker -c 2 -Q companies -n companies@%%h --without-mingle --without-heartbeat -Ofair
celery==5.0.0
I have a celery task that launches three other celery tasks. I want these tasks to execute asynchronously and wait for them to finish before i resume the parent task. However the child tasks are running synchronosly and I don't know why.
The problem started when I upgraded celery from 4.4.7 to 5.0.0
app_celery.py
@app.task(name="app_celery.scraping_process", soft_time_limit=900, time_limit=960, max_retries=3)
def scraping_process():
sources = ["a", "b", "c"]
job = group((company_representation.s(src) for src in sources))
result = job.apply_async(queue="spiders", routing_key="spiders")
while not result.ready():
time.sleep(5)
@app.task(name="app_celery.company_representation", max_retries=3)
def company_representation(source: str):
# do something
time.sleep(60)
I am running celery like this:
celery -A app_celery worker -c 8 -Q spiders -n spiders@%%h
celery -A app_celery worker -c 2 -Q companies -n companies@%%h --without-mingle --without-heartbeat -Ofair
celery==5.0.0
如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论
评论(2)
您可以将任务 ID 添加到列表中,然后执行以下操作:
因此,您将迭代任务 ID 列表并检查任务是否完成。然后,如果列表为空,您就知道所有任务都已运行,您可以继续执行其他操作。示例用法是:
如果您有大量要跟踪的任务,这是理想的选择。
You could add the task id's to a list and then do something like:
So you would iterate over the list of task IDs and check if the task is complete or not. And then if the list is empty you know that all the tasks has run and you can carry on with other operations. example usage would be:
This is ideal if you have numerous tasks that you want to keep track of.
您可以尝试使用 Celery 组并行调用多个任务并等待它们的结果。
参考:http://ask.github.io/celery/userguide/groups。 html#groups
You can try the Celery group to invoke multiple tasks in parallel and wait for their results.
Reference: http://ask.github.io/celery/userguide/groups.html#groups