Celery 中的PeriodicTask run() 方法拥有自己的参数

发布于 2024-09-02 11:46:02 字数 753 浏览 1 评论 0原文

我正在编写一个小型 Django 应用程序,我应该能够创建 对于每个模型对象,其定期任务将被执行 一定的间隔。我使用 Celery 应用程序,但我无法理解一件事:

class ProcessQueryTask(PeriodicTask):
   run_every = timedelta(minutes=1)

   def run(self, query_task_pk, **kwargs):
       logging.info('Process celery task for QueryTask %d' %
query_task_pk)
       task = QueryTask.objects.get(pk=query_task_pk)
       task.exec_task()
       return True

然后我执行以下操作:

>>> from tasks.tasks import ProcessQueryTask
>>> result1 = ProcessQueryTask.delay(query_task_pk=1)
>>> result2 = ProcessQueryTask.delay(query_task_pk=2)

第一次调用成功,但其他定期调用返回错误 - TypeError: run() 恰好需要 2 个非关键字参数(给定 1 个) 芹菜服务器。 我可以将自己的参数传递给PeriodicTask run()吗?

I am writing a small Django application and I should be able to create
for each model object its periodical task which will be executed with
a certain interval. I'm use for this a Celery application, but i can't understand one thing:

class ProcessQueryTask(PeriodicTask):
   run_every = timedelta(minutes=1)

   def run(self, query_task_pk, **kwargs):
       logging.info('Process celery task for QueryTask %d' %
query_task_pk)
       task = QueryTask.objects.get(pk=query_task_pk)
       task.exec_task()
       return True

Then i'm do following:

>>> from tasks.tasks import ProcessQueryTask
>>> result1 = ProcessQueryTask.delay(query_task_pk=1)
>>> result2 = ProcessQueryTask.delay(query_task_pk=2)

First call is success, but other periodical calls returning the error
- TypeError: run() takes exactly 2 non-keyword arguments (1 given) in
celeryd server.
Can I pass own params to PeriodicTask run()?

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(1

Bonjour°[大白 2024-09-09 11:46:02

Ask Solem 在对您的问题的回答中对此做出了精彩的回答 celery-users Google 群组

周期性任务不使用参数,因此您需要创建多个
类或创建一个处理多个“模型”的周期性任务。

例如:

from celery.task import PeriodicTask
from celery.decorators import periodic_task

# base class
class BaseProcessQueryTask(PeriodicTask):
    abstract = True
    run_every = timedelta(minutes=1)
    query_task_pk  = None

    def run(self):
        task = QueryTask.objects.get(pk=self.query_task_pk)
        task.exec_task()

class ProcessQueryTask1(BaseProcessQueryTask):
    query_task_pk = 1

class ProcessQueryTask2(BaseProcessQueryTask):
    query_task_pk = 2

但您更有可能想要这样的东西:

@task(ignore_result=True)
def execute_query_task(task):
    task.exec_task()

@periodic_task(run_every=timedelta(minutes=1))
def process_query_tasks():
    for task in QueryTask.objects.all():
        ExecuteQueryTask.delay(task)

This was answered wonderfully by Ask Solem in his response to your question on the celery-users Google group.

Periodic tasks doesn't use arguments, so you need to make several
classes or make a periodic task that processes more than one "model".

E.g.:

from celery.task import PeriodicTask
from celery.decorators import periodic_task

# base class
class BaseProcessQueryTask(PeriodicTask):
    abstract = True
    run_every = timedelta(minutes=1)
    query_task_pk  = None

    def run(self):
        task = QueryTask.objects.get(pk=self.query_task_pk)
        task.exec_task()

class ProcessQueryTask1(BaseProcessQueryTask):
    query_task_pk = 1

class ProcessQueryTask2(BaseProcessQueryTask):
    query_task_pk = 2

but it's more likely you want something like this:

@task(ignore_result=True)
def execute_query_task(task):
    task.exec_task()

@periodic_task(run_every=timedelta(minutes=1))
def process_query_tasks():
    for task in QueryTask.objects.all():
        ExecuteQueryTask.delay(task)
~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文