测试 celery 任务是否仍在处理中

发布于 2024-10-05 14:33:10 字数 282 浏览 0 评论 0原文

如何测试 celery 中是否仍在处理任务 (task_id)?我有以下场景:

  1. 在 Django 视图中启动任务
  2. 将BaseAsyncResult 存储在会话中
  3. 关闭 celery 守护进程(硬),以便不再处理任务
  4. 检查任务是否“已死”

有什么想法吗?可以查找 celery 正在处理的所有任务并检查我的任务是否仍然存在吗?

How can I test if a task (task_id) is still processed in celery? I have the following scenario:

  1. Start a task in a Django view
  2. Store the BaseAsyncResult in the session
  3. Shutdown the celery daemon (hard) so the task is not processed anymore
  4. Check if the task is 'dead'

Any ideas? Can a lookup all task being processed by celery and check if mine is still there?

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(2

回眸一笑 2024-10-12 14:33:11

我认为有比在模型中存储任务对象更好的方法。例如,如果您想检查一组任务(并行)是否已完成:

# in the script you launch the task
from celery import group

job = group(
    task1.s(param1, param2),
    task2.s(param3, param4)
)
result = job.apply_async()
result.save()

# save the result ID in your model
your_obj_model = YourModel.objects.get(id='1234')
your_obj_model.task_id = result.id
your_obj_model.save()

那么在您看来

from celery.result import GroupResult
# ...
task_result = GroupResult.restore(your_obj_model.task_id)
task_finished = task_result.ready()
# will be True or False

I think there is a better way than to store a task object in the model. For example, in case you wanted to check if a group of task (parallel) have completed:

# in the script you launch the task
from celery import group

job = group(
    task1.s(param1, param2),
    task2.s(param3, param4)
)
result = job.apply_async()
result.save()

# save the result ID in your model
your_obj_model = YourModel.objects.get(id='1234')
your_obj_model.task_id = result.id
your_obj_model.save()

Then in your view

from celery.result import GroupResult
# ...
task_result = GroupResult.restore(your_obj_model.task_id)
task_finished = task_result.ready()
# will be True or False
聆听风音 2024-10-12 14:33:10

在模型中定义一个字段 (PickledObjectField) 来存储 celery 任务:

class YourModel(models.Model):
    .
    .
    celery_task = PickledObjectField()
    .
    .

    def task():
        self.celery_task = SubmitTask.apply_async(args = self.task_detail())
        self.save()

如果您的任务不特定于任何模型,您应该专门为 celery 任务创建一个字段。

否则我建议使用 django-celery。它有一个很好的监控功能:
http://ask.github.com/celery/userguide/monitoring .html#django-admin-monitor,以良好的图形方式将任务详细信息保存在 django 模型中。

define a field (PickledObjectField) in your model to store the celery task:

class YourModel(models.Model):
    .
    .
    celery_task = PickledObjectField()
    .
    .

    def task():
        self.celery_task = SubmitTask.apply_async(args = self.task_detail())
        self.save()

In case your task is not specific on any model you should create one specifically for the celery tasks.

or else I suggest using django-celery. It has a nice monitoring feature:
http://ask.github.com/celery/userguide/monitoring.html#django-admin-monitor, saves the tasks details in a django model in a nice graphical way.

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文