How to run multiple Celery tasks in parallel (using group)?
I am new to Celery. I want to run demo_task in parallel, but it runs tasks sequentially instead of in parallel. Please let me know if I did something wrong.
```python
import time

import pandas as pd
from celery import Celery, group

# Old-style (uppercase) setting names; these are what trigger the
# "celery upgrade settings" warning in the log below.
CONFIG = {
    'BROKER_URL': 'redis://localhost:6379/0',
    'CELERY_RESULT_BACKEND': 'redis://localhost:6379/0',
}

app = Celery()
app.config_from_object(CONFIG)


@app.task(name='demo_task')
def demo_task(x, y):
    print("demo_task", x, y)
    # Write a small CSV per task, then simulate 8 s of work.
    pd.DataFrame({"a": [1, 2, 3], "b": [2, 3, 4]}).to_csv(f"demo{x}.csv", index=False)
    print("saved")
    time.sleep(8)


def run_task():
    print("start group call")
    # Send three independent demo_task calls to the broker as one group.
    return group(demo_task.signature((3, 3)),
                 demo_task.signature((4, 4)),
                 demo_task.signature((5, 5))).apply_async()


if __name__ == '__main__':
    run_task()
```
[Command]
```
celery -A celery_demo worker -l info --pool=solo --purge
```
[Log]
```text
[2022-04-22 16:29:51,668: WARNING/MainProcess] Please run `celery upgrade settings path/to/settings.py` to avoid these warnings and to allow a smoother upgrade to Celery 6.0.
[2022-04-22 16:29:51,668: INFO/MainProcess] Connected to redis://localhost:6379/0
[2022-04-22 16:29:51,668: INFO/MainProcess] mingle: searching for neighbors
[2022-04-22 16:29:52,672: INFO/MainProcess] mingle: all alone
[2022-04-22 16:30:05,602: WARNING/MainProcess]
[2022-04-22 16:30:05,602: WARNING/MainProcess] 4
[2022-04-22 16:30:05,602: WARNING/MainProcess]
[2022-04-22 16:30:05,602: WARNING/MainProcess] 4
[2022-04-22 16:30:05,602: WARNING/MainProcess] saved
[2022-04-22 16:30:13,614: INFO/MainProcess] Task demo_task[c017c03e-b49d-4d54-85c5-4af57dd55908] succeeded in 8.016000000061467s: None
[2022-04-22 16:30:13,614: INFO/MainProcess] Task demo_task[d60071c6-4332-4ec1-88fd-3fce79c06ab5] received
[2022-04-22 16:30:13,614: WARNING/MainProcess] demo_task
[2022-04-22 16:30:13,614: WARNING/MainProcess]
[2022-04-22 16:30:13,614: WARNING/MainProcess] 5
[2022-04-22 16:30:13,614: WARNING/MainProcess]
[2022-04-22 16:30:13,614: WARNING/MainProcess] 5
[2022-04-22 16:30:13,614: WARNING/MainProcess] saved
[2022-04-22 16:30:21,634: INFO/MainProcess] Task demo_task[d60071c6-4332-4ec1-88fd-3fce79c06ab5] succeeded in 8.015000000130385s: None
```
How do you expect tasks to run in parallel if you use the "solo" pool?
Instead, start with the prefork concurrency (the default):
celery -A celery_demo worker -l info -c 8
This makes the Celery worker spawn 8 worker processes that can execute tasks in parallel. If your machine has more than 8 cores, you can raise that number to N, where N is the number of cores available on the host. I always go for N-1 so the system keeps one core spare for other work.
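To see why the pool type matters for the numbers in the question, here is a rough back-of-the-envelope sketch in plain Python (no Celery needed). It assumes equal-length tasks and ignores startup, prefetching and scheduling overhead; the function names are hypothetical helpers, not part of Celery. With the solo pool (effectively concurrency 1) the three 8-second tasks take about 24 s, which matches the back-to-back 8 s runs in the log; with `-c 8` they should all fit in a single 8 s "wave".

```python
import math
import os


def suggested_concurrency(cores=None):
    """Rule of thumb from the answer: N-1 cores, but never fewer than 1."""
    n = cores if cores is not None else (os.cpu_count() or 1)
    return max(1, n - 1)


def estimated_wall_time(n_tasks, seconds_per_task, concurrency):
    """Idealised wall-clock time for equal-length tasks (hypothetical helper).

    Tasks run in "waves" of `concurrency` at a time; overhead, prefetching
    and uneven task lengths are ignored.
    """
    if concurrency < 1:
        raise ValueError("concurrency must be at least 1")
    waves = math.ceil(n_tasks / concurrency)
    return waves * seconds_per_task


if __name__ == "__main__":
    # The question's group: three demo_task calls, each sleeping 8 s.
    print("solo pool:", estimated_wall_time(3, 8, concurrency=1), "s")
    print("prefork -c 8:", estimated_wall_time(3, 8, concurrency=8), "s")
    print("suggested -c on this host:", suggested_concurrency())
```

Real timings will be somewhat higher than these estimates, but the gap between the two pools should be of this order.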
Prefork concurrency is great for CPU-bound tasks. If your tasks are mostly I/O-bound, give the "gevent" or "eventlet" pool a try (`-P gevent` or `-P eventlet`; the corresponding package must be installed).
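If you would rather keep these choices in configuration than on the command line, Celery's lowercase settings `worker_pool` and `worker_concurrency` correspond to the `--pool`/`-P` and `-c` options, and an explicit option on the command line takes precedence. Below is a minimal sketch, assuming a hypothetical `celeryconfig.py` module loaded with `app.config_from_object("celeryconfig")` in place of the question's `CONFIG` dict. Using the lowercase setting names should also make the "celery upgrade settings" warning from the log go away.

```python
# celeryconfig.py (hypothetical module name); load it with
# app.config_from_object("celeryconfig") instead of the CONFIG dict.
import os

# Lowercase setting names, as used since Celery 4.
broker_url = "redis://localhost:6379/0"
result_backend = "redis://localhost:6379/0"

# Equivalent of --pool=prefork (already the default).
# For gevent or eventlet, the Celery docs advise passing -P on the
# command line instead of using this setting, so monkey patching
# happens early enough.
worker_pool = "prefork"

# Equivalent of -c: one process per core, leaving one core spare.
worker_concurrency = max(1, (os.cpu_count() or 1) - 1)
```

Then start the worker without `--pool=solo`, for example `celery -A celery_demo worker -l info`, because a pool passed on the command line would override this setting.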
Modify your run_task function