Delete all pending tasks in Celery?

Published 2024-11-30 16:45:53

How can I delete all pending tasks without knowing the task_id for each task?

Comments (12)

橙味迷妹 2024-12-07 16:45:53

From the docs:

$ celery -A proj purge

or

from proj.celery import app
app.control.purge()

(EDIT: Updated with current method.)

以可爱出名 2024-12-07 16:45:53

For celery 3.0+:

$ celery purge

To purge a specific queue:

$ celery -Q queue_name purge

伤感在游骋 2024-12-07 16:45:53

For Celery 2.x and 3.x:

When starting a worker with the -Q parameter to define its queues, for example

celery worker -Q queue1,queue2,queue3

then celery purge will not work, because you cannot pass the queue parameters to it; it will only purge the default queue.
The solution is to start your workers with --purge parameter like this:

celery worker -Q queue1,queue2,queue3 --purge

However, this also starts the worker.

Another option is to use the amqp subcommand of celery:

celery amqp queue.delete queue1
celery amqp queue.delete queue2
celery amqp queue.delete queue3

调妓 2024-12-07 16:45:53

In Celery 3+:

CLI:

$ celery -A proj purge

Programmatically:

>>> from proj.celery import app
>>> app.control.purge()

http://docs.celeryproject.org/en/latest/faq.html#how-do-i-purge-all-waiting-tasks

情域 2024-12-07 16:45:53

I found that celery purge doesn't work for my more complex celery config. I use multiple named queues for different purposes:

$ sudo rabbitmqctl list_queues -p celery name messages consumers
Listing queues ...  # Output sorted, whitespaced for readability
celery                                          0   2
[email protected]                      0   1
[email protected]                      0   1
apns                                            0   1
[email protected]                        0   1
analytics                                       1   1
[email protected]                   0   1
bcast.361093f1-de68-46c5-adff-d49ea8f164c0      0   1
bcast.a53632b0-c8b8-46d9-bd59-364afe9998c1      0   1
celeryev.c27b070d-b07e-4e37-9dca-dbb45d03fd54   0   1
celeryev.c66a9bed-84bd-40b0-8fe7-4e4d0c002866   0   1
celeryev.b490f71a-be1a-4cd8-ae17-06a713cc2a99   0   1
celeryev.9d023165-ab4a-42cb-86f8-90294b80bd1e   0   1

The first column is the queue name, the second is the number of messages waiting in the queue, and the third is the number of listeners for that queue. The queues are:

  • celery - Queue for standard, idempotent celery tasks
  • apns - Queue for Apple Push Notification Service tasks, not quite as idempotent
  • analytics - Queue for long running nightly analytics
  • *.pidbox - Queue for worker commands, such as shutdown and reset, one per worker (2 celery workers, one apns worker, one analytics worker)
  • bcast.* - Broadcast queues, for sending messages to all workers listening to a queue (rather than just the first to grab it)
  • celeryev.* - Celery event queues, for reporting task analytics
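
The naming scheme in this list can be captured in a small helper, handy when scripting against rabbitmqctl output. This is a sketch; the patterns are just the prefixes and suffixes described above:

```python
# Classify a broker queue name per the naming scheme described above.
# Sketch only: the patterns mirror the list (".pidbox" suffix, "bcast."
# and "celeryev." prefixes); anything else is treated as a task queue.
def queue_kind(name: str) -> str:
    if name.endswith(".pidbox"):
        return "worker-command"
    if name.startswith("bcast."):
        return "broadcast"
    if name.startswith("celeryev."):
        return "event"
    return "task"
```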

The analytics task is a brute-force task that worked great on small data sets, but now takes more than 24 hours to process. Occasionally, something goes wrong and it gets stuck waiting on the database. It needs to be rewritten, but until then, when it gets stuck I kill the task, empty the queue, and try again. I detect "stuckness" by looking at the message count for the analytics queue, which should be 0 (finished analytics) or 1 (waiting for last night's analytics to finish). 2 or higher is bad, and I get an email.
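
The "stuckness" check can be sketched roughly as follows. This is a hedged sketch, not the exact monitoring code: the queue name and vhost come from the listing above, and the alert is a placeholder print where an email hook would go.

```python
# Sketch of the stuckness check: parse `rabbitmqctl list_queues` output
# and flag the analytics queue when 2+ messages are waiting.
import subprocess

def backlog(listing: str, queue: str = "analytics"):
    """Return the message count for `queue` from `rabbitmqctl
    list_queues` output, or None if the queue is not listed."""
    for line in listing.splitlines():
        parts = line.split()
        if len(parts) >= 2 and parts[0] == queue and parts[1].isdigit():
            return int(parts[1])
    return None

def check_analytics(vhost: str = "celery") -> None:
    """Run rabbitmqctl and alert when the analytics queue is backed up."""
    out = subprocess.run(
        ["sudo", "rabbitmqctl", "list_queues", "-p", vhost, "name", "messages"],
        capture_output=True, text=True, check=True,
    ).stdout
    count = backlog(out)
    if count is not None and count >= 2:
        print(f"analytics queue looks stuck: {count} messages")  # send email here
```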

celery purge offers to erase tasks from one of the broadcast queues, and I don't see an option to pick a different named queue.

Here's my process:

$ sudo /etc/init.d/celeryd stop  # Wait for analytics task to be last one, Ctrl-C
$ ps -ef | grep analytics  # Get the PID of the worker, not the root PID reported by celery
$ sudo kill <PID>
$ sudo /etc/init.d/celeryd stop  # Confirm dead
$ python manage.py celery amqp queue.purge analytics
$ sudo rabbitmqctl list_queues -p celery name messages consumers  # Confirm messages is 0
$ sudo /etc/init.d/celeryd start

静待花开 2024-12-07 16:45:53

If you want to remove all pending tasks and also the active and reserved ones to completely stop Celery, this is what worked for me:

from proj.celery import app
from celery.task.control import inspect, revoke

# remove pending tasks
app.control.purge()

# remove active tasks
i = inspect()
jobs = i.active()
for hostname in jobs:
    tasks = jobs[hostname]
    for task in tasks:
        revoke(task['id'], terminate=True)

# remove reserved tasks
jobs = i.reserved()
for hostname in jobs:
    tasks = jobs[hostname]
    for task in tasks:
        revoke(task['id'], terminate=True)
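
Note that the celery.task.control module is gone in Celery 5+. A rough equivalent using the app's own control API might look like this; it is a sketch (not exercised against a live broker here), and task_ids is a hypothetical helper that just flattens the dictionaries returned by inspect():

```python
# Celery 5+ sketch: same cleanup via app.control instead of the removed
# celery.task.control module. `app` would come from `proj.celery`.
def task_ids(*snapshots):
    """Flatten inspect().active()/.reserved()-style dicts, which map
    hostname -> list of task dicts (a snapshot may be None)."""
    ids = []
    for snap in snapshots:
        for tasks in (snap or {}).values():
            ids.extend(t["id"] for t in tasks)
    return ids

def stop_everything(app):
    app.control.purge()  # drop pending messages
    i = app.control.inspect()
    for task_id in task_ids(i.active(), i.reserved()):
        app.control.revoke(task_id, terminate=True)
```
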

滥情稳全场 2024-12-07 16:45:53

In Celery 3+

http://docs.celeryproject.org/en/3.1/faq.html#how-do-i-purge-all-waiting-tasks

CLI

Purge named queue:

celery -A proj amqp queue.purge <queue name>

Purge the configured queues:

celery -A proj purge

I’ve purged messages, but there are still messages left in the queue?
Answer: Tasks are acknowledged (removed from the queue) as soon as they are actually executed. After the worker has received a task, it will take some time until it is actually executed, especially if there are a lot of tasks already waiting for execution. Messages that are not acknowledged are held on to by the worker until it closes the connection to the broker (AMQP server). When that connection is closed (e.g. because the worker was stopped) the tasks will be re-sent by the broker to the next available worker (or the same worker when it has been restarted), so to properly purge the queue of waiting tasks you have to stop all the workers, and then purge the tasks using celery.control.purge().

So, to purge the entire queue, the workers must be stopped.

静若繁花 2024-12-07 16:45:53

For Celery 5.0+, to do it from the CLI and to target a specific queue:

celery -A APP_NAME purge --queues QUEUE_NAME

Add the -f option to the end to skip the confirmation step if you're trying to do it in one step like I was.

哀由 2024-12-07 16:45:53

Celery 4+
The celery purge command purges all configured task queues:

celery -A *APPNAME* purge

Programmatically:

from proj.celery import app
app.control.purge()

All pending tasks will be purged.
Reference: Celery documentation

痴情换悲伤 2024-12-07 16:45:53

For Celery 5.0+ with RabbitMQ as the broker

We first need to establish a new connection from the program to the broker,
and bind each queue we want to purge to that connection.

# Assuming proj/celery.py defines the app as: app = Celery('proj')
from proj.celery import app

queues = ['queue_A', 'queue_B', 'queue_C']
with app.connection_for_write() as conn:
    conn.connect()
    for queue in queues:
        count = app.amqp.queues[queue].bind(conn).purge()
        print(f'Purged {queue}: {count} message(s)')

最丧也最甜 2024-12-07 16:45:53

1.
To properly purge the queue of waiting tasks you have to stop all the workers (http://celery.readthedocs.io/en/latest/faq.html#i-ve-purged-messages-but-there-are-still-messages-left-in-the-queue):

$ sudo rabbitmqctl stop

or (in case RabbitMQ/message broker is managed by Supervisor):

$ sudo supervisorctl stop all

2.
...and then purge the tasks from a specific queue:

$ cd <source_dir>
$ celery amqp queue.purge <queue name>

3.
Start RabbitMQ:

$ sudo rabbitmqctl start

or (in case RabbitMQ is managed by Supervisor):

$ sudo supervisorctl start all

如果没有 2024-12-07 16:45:53

I think that may have solved it. The old tasks were still under django-celery-beat > Periodic tasks in my Django admin, so go to the Django admin page and delete all of them. Then, if you are running in a Docker container, restart it, and the problem is solved.
