Celery 任务不会被撤销

发布于 2024-12-04 18:55:10 字数 930 浏览 1 评论 0原文

我正在通过 django 的 celery (版本 2.3.2)将多个模拟作为任务运行。模拟由另一个任务设置:

在views.py中:

result = setup_simulations.delay(parameters)
request.session['sim'] = result.task_id # Store main task id

在tasks.py中:

@task(priority=1)
def setup_simulations(parameters):
    task_ids = []
    for i in range(number_of_simulations):
        result = run_simulation.delay(other_parameters)
        task_ids.append(result.task_id)
    return task_ids

初始任务(setup_simulations)完成后,我尝试按如下方式撤销模拟任务

main_task_id = request.session['sim']
main_result = AsyncResult(main_task_id)
# Revoke sub tasks
from celery.task.control import revoke
for sub_task_id in main_result.get():
    sub_result = AsyncResult(sub_task_id); sub_result.revoke() # Does not work
    # revoke(sub_task_id) # Does not work neither

: py celeryd -l info”,任务就像什么都没发生一样执行。有人有什么想法可能出了问题吗?

I'm running multiple simulations as tasks through celery (version 2.3.2) from django. The simulations get set up by another task:

In views.py:

result = setup_simulations.delay(parameters)
request.session['sim'] = result.task_id # Store main task id

In tasks.py:

@task(priority=1)
def setup_simulations(parameters):
    task_ids = []
    for i in range(number_of_simulations):
        result = run_simulation.delay(other_parameters)
        task_ids.append(result.task_id)
    return task_ids

After the initial task (setup_simulations) has finished, I try to revoke the simulation tasks as follows:

main_task_id = request.session['sim']
main_result = AsyncResult(main_task_id)
# Revoke sub tasks
from celery.task.control import revoke
for sub_task_id in main_result.get():
    sub_result = AsyncResult(sub_task_id); sub_result.revoke() # Does not work
    # revoke(sub_task_id) # Does not work neither

When I look at the output from "python manage.py celeryd -l info", the tasks get executed as if nothing had happened. Any ideas somebody what could have gone wrong?

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(1

小ぇ时光︴ 2024-12-11 18:55:10

正如您在评论中提到的,revoke 是一个远程控制命令,因此当前仅受 amqp 和 redis 传输支持。

您可以通过在数据库中存储已撤销的标志来自己完成此操作,例如:

from celery import states
from celery import task
from celery.exceptions import Ignore

from myapp.models import RevokedTasks


@task
def foo():
    if RevokedTasks.objects.filter(task_id=foo.request.id).count():
        if not foo.ignore_result:
            foo.update_state(state=states.REVOKED)
        raise Ignore()

如果您的任务正在某个模型上运行,您甚至可以在其中存储一个标志。

As you mention in the comment, revoke is a remote control command so it's only currently supported by the amqp and redis transports.

You can accomplish this yourself by storing a revoked flag in your database, e.g:

from celery import states
from celery import task
from celery.exceptions import Ignore

from myapp.models import RevokedTasks


@task
def foo():
    if RevokedTasks.objects.filter(task_id=foo.request.id).count():
        if not foo.ignore_result:
            foo.update_state(state=states.REVOKED)
        raise Ignore()

If your task is working on some model you could even store a flag in that.

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文