How do I determine the name of a Celery task?
I have a FastAPI app from which I want to call a Celery task.
I cannot import the task, because the two live in different codebases, so I have to call it by name.
In tasks.py:
import os
from typing import Any, Dict

from celery import Celery

imagery = Celery(
    "imagery", broker=os.getenv("BROKER_URL"), backend=os.getenv("REDIS_URL")
)
...

# Registered under the explicit name "filter", not a module-derived one.
@imagery.task(bind=True, name="filter")
def filter_task(self, **kwargs) -> Dict[str, Any]:
    print('running task')
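Aside on naming: because the decorator passes name="filter" explicitly, the registered task name is exactly filter; Celery only derives a module-based name (e.g. worker.filter) when no name is given. A quick sanity check, assuming a shell where this module is importable:

print(filter_task.name)  # prints: filter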
The Celery worker is running with this command:
celery worker -A worker.imagery -P threads --loglevel=INFO --queues=imagery
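As an aside, the title question can be answered directly from the command line. This is a standard Celery CLI feature, not anything specific to this setup: ask the running worker which task names it has registered:

celery -A worker.imagery inspect registered

Given the decorator above, the output should list filter rather than a module-qualified name.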
Now, from my FastAPI codebase, I want to run the filter task.
My understanding is that I have to use the celery.send_task() function.
In app.py I have:
import os

from celery import Celery, states
from fastapi import FastAPI
from starlette.responses import JSONResponse, PlainTextResponse

from app import models

app = FastAPI()
tasks = Celery(broker=os.getenv("BROKER_URL"), backend=os.getenv("REDIS_URL"))

@app.post("/filter", status_code=201)
async def upload_images(data: models.FilterProductsModel):
    """
    TODO: use a celery task(s) to query the database and upload the results to S3
    """
    data = ['ok', 'un test']
    result = tasks.send_task('workers.imagery.filter', args=list(data))
    # Note: result.ready() returns a bool; the task id itself is result.id.
    return PlainTextResponse(f"here is the id: {str(result.ready())}")
After calling the /filter endpoint, I don't see any task being picked up by the worker, so I tried different names in send_task():
- filter
- imagery.filter
- worker.imagery.filter
How come my task never gets picked up by the worker, and nothing shows in the log?
Is my task name wrong?
Edit:
The worker process runs in Docker. Here is the full path of the file on its disk:
- tasks.py: /workers/worker.py
So if I follow the import scheme, the name of the task would be workers.worker.filter, but this does not work either; nothing gets printed in the Docker logs. Is a print supposed to appear in the STDOUT of the Celery CLI?
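Aside: a print() inside the task body does normally show up in the worker's STDOUT at --loglevel=INFO, but only once the worker actually receives the task. Since the worker was started with --queues=imagery, a useful check (a standard Celery CLI command, nothing project-specific) is to list the queues it consumes from:

celery -A worker.imagery inspect active_queues

If only imagery is listed, anything sent to the default celery queue will never reach this worker.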
Comments (2)
OP here.
This is the solution I used.
As mentioned by @DejanLekic, I had to specify the queue.
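The post doesn't include the final code, but based on the above the fix presumably boils down to one line (the queue name comes from the worker's --queues=imagery flag, and 'filter' is the explicit name from the task decorator):

result = tasks.send_task('filter', args=list(data), queue='imagery')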
Your Celery worker is subscribed to the imagery queue only. On the other hand, you are trying to send the task to the default queue (if you did not change the configuration, that queue is named celery) with result = tasks.send_task('workers.imagery.filter', args=list(data)). It is not surprising that you do not see the task being executed by your worker, as you have been sending tasks to the default queue the whole time. To fix this, try the following:
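The snippet that followed was cut off in this copy. A minimal reconstruction consistent with the advice above (route the task to the queue the worker actually consumes from) would be:

result = tasks.send_task('workers.imagery.filter', args=list(data), queue='imagery')

Note that, given the decorator registers the task under the explicit name "filter", the task name passed here may also need to be just 'filter'. Alternatively, the producer can set task_routes in its Celery configuration so this task is always routed to the imagery queue.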