如何在python中使用celery清除特定队列的所有任务?

发布于 2024-12-12 03:43:23 字数 303 浏览 3 评论 0原文

如何在python中使用celery清除特定que的所有计划和正在运行的任务?这些问题似乎很直接,但要补充的是,我不是在寻找命令行代码

,我有以下行,它定义了 que 并希望清除该 que 来管理任务:

CELERY_ROUTES = {"socialreport.tasks.twitter_save": {"queue": "twitter_save"}}

在 1 个时间点,我想清除其中的所有任务que twitter_save 与 python 代码,也许与广播功能?我找不到有关此的文档。这可能吗?

How to purge all scheduled and running tasks of a specific que with celery in python? The questions seems pretty straigtforward, but to add I am not looking for the command line code

I have the following line, which defines the que and would like to purge that que to manage tasks:

CELERY_ROUTES = {"socialreport.tasks.twitter_save": {"queue": "twitter_save"}}

At 1 point in time I wanna purge all tasks in the que twitter_save with python code, maybe with a broadcast function? I couldn't find the documentation about this. Is this possible?

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(3

雪若未夕 2024-12-19 03:43:23

只是为了更新 celery 3.1 的 @Sam Stoelinga 答案,现在可以在终端上像这样完成:

celery amqp queue.purge <QUEUE_NAME>

对于 Django,请务必从 manage.py 文件启动它:

./manage.py celery amqp queue.purge <QUEUE_NAME> 

如果没有,请确保 celery 能够正确指向代理通过设置 --broker= 标志。

just to update @Sam Stoelinga answer for celery 3.1, now it can be done like this on a terminal:

celery amqp queue.purge <QUEUE_NAME>

For Django be sure to start it from the manage.py file:

./manage.py celery amqp queue.purge <QUEUE_NAME> 

If not, be sure celery is able to point correctly to the broker by setting the --broker= flag.

阪姬 2024-12-19 03:43:23

原来的答案不适用于 Celery 3.1。如果您想从命令行执行此操作,Hassek 的更新是正确的命令。但是,如果您想以编程方式执行此操作,请执行以下操作:

假设您将 Celery 应用程序运行为:

celery_app = Celery(...)

然后:

import celery.bin.amqp
amqp = celery.bin.amqp.amqp(app = celery_app)
amqp.run('queue.purge', 'name_of_your_queue')

这对于您已将一堆任务排入队列并且一个任务遇到致命错误的情况非常方便您知道的条件将阻止其余任务的执行。

例如,您将一堆网络爬虫任务排入队列,并且在任务执行过程中,您的服务器的 IP 地址被阻止。执行其余任务是没有意义的。因此,在这种情况下,您的任务本身可以清除自己的队列。

The original answer does not work for Celery 3.1. Hassek's update is the correct command if you want to do it from the command line. But if you want to do it programmatically, do this:

Assuming you ran your Celery app as:

celery_app = Celery(...)

Then:

import celery.bin.amqp
amqp = celery.bin.amqp.amqp(app = celery_app)
amqp.run('queue.purge', 'name_of_your_queue')

This is handy for cases where you've enqueued a bunch of tasks, and one task encounters a fatal condition that you know will prevent the rest of the tasks from executing.

E.g. you enqueued a bunch of web crawler tasks, and in the middle of your tasks your server's IP address gets blocked. There's no point in executing the rest of the tasks. So in that case, your task it self can purge its own queue.

宫墨修音 2024-12-19 03:43:23

哈哈,这很简单,但希望有人能帮助我。

from celery.bin.camqadm import camqadm
camqadm('queue.purge', queue_name_as_string)

唯一的问题是我仍然需要在清除队列之前停止 celeryd,清除后我需要再次运行 celeryd 来处理队列的任务。如果我成功,将更新这个问题。

我成功了,但如果这不是停止 celeryd、清除 que 并重新启动它的好方法,请纠正我。我知道我正在使用术语,因为我实际上希望它终止任务。

kill_command =  "ps auxww | grep 'celeryd -n twitter_save' | awk '{print $2}' | xargs kill -9"
subprocess.call(kill_command, shell=True)

camqadm('queue.purge', 'twitter_save')
rerun_command = "/home/samos/Software/virt_env/twittersyncv1/bin/python %s/manage.py celeryd -n twitter_save -l info -Q twitter_save" % settings.PROJECT_ROOT

os.popen(rerun_command+' &')
send_task("socialreport.tasks.twitter_save")

Lol it's quite easy, hope somebody can help me still though.

from celery.bin.camqadm import camqadm
camqadm('queue.purge', queue_name_as_string)

The only problem with this I still need to stop the celeryd before purging the que, after purging I need to run the celeryd again to handle tasks for the queue. Will update this question if i succeed.

I succeeded, but please correct me if this is not a good method to stop the celeryd, purge que and start it again. I know I am using term, because I actually want it to be terminated the task.

kill_command =  "ps auxww | grep 'celeryd -n twitter_save' | awk '{print $2}' | xargs kill -9"
subprocess.call(kill_command, shell=True)

camqadm('queue.purge', 'twitter_save')
rerun_command = "/home/samos/Software/virt_env/twittersyncv1/bin/python %s/manage.py celeryd -n twitter_save -l info -Q twitter_save" % settings.PROJECT_ROOT

os.popen(rerun_command+' &')
send_task("socialreport.tasks.twitter_save")
~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文