How do I determine the name of a Celery task?

Published 2025-02-06 02:53:14 · 1631 characters · 2 views · 0 comments


I have a FastAPI app from which I want to call a Celery task.
I cannot import the task, as the two live in different code bases, so I have to call it by its name.

In tasks.py:

import os
from typing import Any, Dict

from celery import Celery

imagery = Celery(
    "imagery", broker=os.getenv("BROKER_URL"), backend=os.getenv("REDIS_URL")
)

...

@imagery.task(bind=True, name="filter")
def filter_task(self, **kwargs) -> Dict[str, Any]:
    print('running task')

The celery worker is running with this command:

celery worker -A worker.imagery -P threads --loglevel=INFO --queues=imagery

Now, from my FastAPI code base, I want to run the filter task.
My understanding is that I have to use the celery.send_task() function.

In app.py I have:

import os

from celery import Celery
from fastapi import FastAPI
from starlette.responses import PlainTextResponse

from app import models

app = FastAPI()
tasks = Celery(broker=os.getenv("BROKER_URL"), backend=os.getenv("REDIS_URL"))


@app.post("/filter", status_code=201)
async def upload_images(data: models.FilterProductsModel):
    """
    TODO: use a celery task(s) to query the database and upload the results to S3
    """
    data = ['ok', 'un test']
    result = tasks.send_task('workers.imagery.filter', args=list(data))
    return PlainTextResponse(f"here is the id: {str(result.ready())}")

After calling the /filter endpoint, I don't see any task being picked up by the worker,
so I tried different names in send_task():

  • filter
  • imagery.filter
  • worker.imagery.filter

How come my task never gets picked up by the worker, and nothing shows in the log?
Is my task name wrong?

Edit:
The worker process runs in Docker. Here is the full path of the file on its disk:

  • tasks.py: /workers/worker.py

So if I follow the import schema, the name of the task would be workers.worker.filter, but this does not work; nothing gets printed in the Docker logs. Is the print supposed to appear in the STDOUT of the Celery CLI?
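The naming question above can be illustrated with a toy helper. This is not Celery's actual implementation, just a sketch of its documented default: an explicit name= on the decorator wins; otherwise the task is registered as "<module>.<function>".

```python
from typing import Optional

def gen_task_name(module: str, func_name: str, explicit_name: Optional[str] = None) -> str:
    # Sketch of Celery's default naming rule: explicit name= overrides
    # the "<module>.<function>" fallback.
    return explicit_name if explicit_name is not None else f"{module}.{func_name}"

# /workers/worker.py imported as module "worker" would default to "worker.filter_task",
# but the decorator in tasks.py passes name="filter", so the registered name is just "filter".
assert gen_task_name("worker", "filter_task") == "worker.filter_task"
assert gen_task_name("worker", "filter_task", explicit_name="filter") == "filter"
```

Since the decorator sets name="filter", none of the module-qualified guesses can ever match.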


Comments (2)

尘世孤行 2025-02-13 02:53:15


OP Here.

This is the solution I used:

from celery import signature

task = signature("filter", kwargs=data.dict(), queue="imagery")
res = task.delay()

As mentioned by @DejanLekic, I had to specify the queue.
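A worker matches incoming messages against its task registry by exact name, which is why the module-qualified guesses in the question never fired. A toy sketch (registry and dispatch are made-up names, not Celery's API):

```python
# Toy registry: @imagery.task(name="filter") registers the task under
# exactly "filter", not under its module path.
registry = {"filter": lambda **kwargs: "running task"}

def dispatch(task_name, **kwargs):
    # A real worker would log "Received unregistered task" and discard
    # the message; here we simply return None for an unknown name.
    task = registry.get(task_name)
    return task(**kwargs) if task else None

assert dispatch("workers.imagery.filter") is None  # wrong name: never matches
assert dispatch("filter") == "running task"        # registered name: runs
```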

青巷忧颜 2025-02-13 02:53:15


Your Celery worker is subscribed to the imagery queue only. On the other hand, you are trying to send the task to the default queue (if you did not change the configuration, that queue is named celery) with result = tasks.send_task('workers.imagery.filter', args=list(data)). It is not surprising that you do not see the task being executed by your worker, as you have been sending tasks to the default queue the whole time.

To fix this, try the following:

result = tasks.send_task('workers.imagery.filter', args=list(data), queue='imagery')
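The routing problem can be illustrated with a toy broker (MockBroker is a made-up stand-in, not part of Celery or Kombu): messages published to one queue are invisible to a worker consuming another.

```python
from collections import defaultdict, deque

class MockBroker:
    """Made-up stand-in for a broker: each named queue is an independent FIFO."""
    def __init__(self):
        self.queues = defaultdict(deque)

    def send_task(self, name, args=(), queue="celery"):
        # Celery's default queue is named "celery" unless configured otherwise.
        self.queues[queue].append((name, tuple(args)))

    def consume(self, queue):
        # A worker started with --queues=imagery drains only that queue.
        return list(self.queues[queue])

broker = MockBroker()
broker.send_task("filter", args=["ok", "un test"])   # goes to the default "celery" queue
assert broker.consume("imagery") == []               # the imagery worker sees nothing
broker.send_task("filter", args=["ok", "un test"], queue="imagery")
assert broker.consume("imagery") == [("filter", ("ok", "un test"))]
```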