在芹菜中使用共享_task时,如何在任务执行之前运行代码

发布于 2025-01-30 06:30:28 字数 403 浏览 0 评论 0原文

我要求我所有的芹菜任务都必须用特定的关键字参数调用。我想在执行任务之前检查并使用关键字的值。 例如,假设我有以下内容:

@shared_task
def my_task(*args, **kwargs):
    foo = kwargs.get('bar')  # -> I don't want to copy this to all my tasks
    # Do stuff here

如何创建一个名为my_special_shared_task的新装饰器,以便以下内容等同于上述:

@my_special_shared_task
def my_task(*args, **kwargs):
    # Do stuff here

I have a requirement that all my Celery tasks must be called with a specific keyword argument. I want to check and use the value of the keyword before my task is executed.
For instance, suppose I have the following:

@shared_task
def my_task(*args, **kwargs):
    foo = kwargs.get('bar')  # -> I don't want to copy this to all my tasks
    # Do stuff here

How can I create a new decorator called my_special_shared_task so that the below is equivalent to the above:

@my_special_shared_task
def my_task(*args, **kwargs):
    # Do stuff here

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(1

Spring初心 2025-02-06 06:30:28

那任务继承呢?
类似:

class BaseTask(celery.Task):
    foo = "some_value"


@app.task(base=BaseTask)
def my_task(*args, **kwargs):
    # Do stuff here
    print(self.foo)

在这里是文档。

What about task inheritance?
something like:

class BaseTask(celery.Task):
    foo = "some_value"


@app.task(base=BaseTask)
def my_task(*args, **kwargs):
    # Do stuff here
    print(self.foo)

here is the documentation.

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文