如何在芹菜任务中使用异步功能?

发布于 2025-02-05 02:14:46 字数 535 浏览 2 评论 0原文

我找到了一些类似的问题,但找不到我想要的。
我具有异步功能,我想在芹菜任务中使用它,但不能将其称为内部任务。有什么办法吗?

db.py

async def select_users():
    sql = "SELECT * FROM Users WHERE "
    sql, parameters = self.format_args(sql, parameters=kwargs)
    return await self.execute(sql, *parameters, fetchrow=True)

tasks.py

from .celery import app
import db

@app.task
def update_credits():
    users = db.select_users()  #here I should call func with 'await'
    print(users)

I've found some similar questions but couldn't find what I want.
I have async function which is I want to use it inside my celery task but cannot call it with await inside task. Is there any way to do it?

db.py

async def select_users():
    sql = "SELECT * FROM Users WHERE "
    sql, parameters = self.format_args(sql, parameters=kwargs)
    return await self.execute(sql, *parameters, fetchrow=True)

tasks.py

from .celery import app
import db

@app.task
def update_credits():
    users = db.select_users()  #here I should call func with 'await'
    print(users)

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(2

雨落□心尘 2025-02-12 02:14:46

您可以执行以下操作:

  1. 定义装饰器以将异步应用于同步。这需要Asgiref。
from functools import wraps
from celery import Celery, Task
from typing import Any, Callable, Coroutine, ParamSpec, TypeVar
from asgiref import sync


_P = ParamSpec("_P")
_R = TypeVar("_R")


def async_task(app: Celery, *args: Any, **kwargs: Any):
    def _decorator(func: Callable[_P, Coroutine[Any, Any, _R]]) -> Task:
        sync_call = sync.AsyncToSync(func)

        @app.task(*args, **kwargs)
        @wraps(func)
        def _decorated(*args: _P.args, **kwargs: _P.kwargs) -> _R:
            return sync_call(*args, **kwargs)

        return _decorated

    return _decorator
  1. 将装饰器应用于您的异步功能:
import asyncio
import celery
from celery.utils.log import get_task_logger

from ..celery import app
from .async_task import async_task


@async_task(app, bind=True)
async def test_task(self: celery.Task):
    await asyncio.sleep(1.0)
    return "world"


@app.task(ignore_result=True)
def say_hello(who: str):
    print(f"Hello {who}")

希望这对您有所帮助。

You can do something like the following:

  1. Define a decorator to apply async to sync. This requires asgiref.
from functools import wraps
from celery import Celery, Task
from typing import Any, Callable, Coroutine, ParamSpec, TypeVar
from asgiref import sync


_P = ParamSpec("_P")
_R = TypeVar("_R")


def async_task(app: Celery, *args: Any, **kwargs: Any):
    def _decorator(func: Callable[_P, Coroutine[Any, Any, _R]]) -> Task:
        sync_call = sync.AsyncToSync(func)

        @app.task(*args, **kwargs)
        @wraps(func)
        def _decorated(*args: _P.args, **kwargs: _P.kwargs) -> _R:
            return sync_call(*args, **kwargs)

        return _decorated

    return _decorator
  1. Apply the decorator to your async function:
import asyncio
import celery
from celery.utils.log import get_task_logger

from ..celery import app
from .async_task import async_task


@async_task(app, bind=True)
async def test_task(self: celery.Task):
    await asyncio.sleep(1.0)
    return "world"


@app.task(ignore_result=True)
def say_hello(who: str):
    print(f"Hello {who}")

Hope this help you.

故事未完 2025-02-12 02:14:46

您可以使用事件循环方法

import asyncio
from .celery import app
import db

@app.task
def update_credits():
    loop = asyncio.get_event_loop()
    users = loop.run_until_complete(db.select_users())
    print(users)

但是,如果您不等待 db.select_users() coroutine async def您的代码不是通过使用asyncio获得任何改进。

You can use the Event Loop method run_until_complete:

import asyncio
from .celery import app
import db

@app.task
def update_credits():
    loop = asyncio.get_event_loop()
    users = loop.run_until_complete(db.select_users())
    print(users)

But if you do not await the db.select_users() coroutine inside an async def your code is not getting any improve by using asyncio.

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文