Retrieve a list of tasks in a Celery queue
How can I retrieve a list of tasks in a queue that are yet to be processed?
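EDIT: See the other answers for getting a list of tasks in the queue.
You should look here:
Celery Guide - Inspecting Workers
Basically, you create an inspector with `app.control.inspect()` and call `scheduled()`, `active()`, or `reserved()` on it, depending on what you want.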
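As a minimal sketch of the inspect calls described in the Celery guide, the helper below collects what the workers report. The function name `snapshot_worker_tasks` is made up for illustration, and `app` is assumed to be your own Celery application instance.

```python
def snapshot_worker_tasks(app):
    """Collect the task lists that running workers report (assumes `app` is a Celery app)."""
    inspector = app.control.inspect()
    # Each call returns {worker_name: [task_info, ...]}, or None if no worker replied.
    return {
        "scheduled": inspector.scheduled() or {},  # tasks with an ETA or countdown
        "active": inspector.active() or {},        # tasks currently executing
        "reserved": inspector.reserved() or {},    # tasks prefetched by a worker
    }
```

Note that these calls only cover tasks a worker has already received; messages still sitting in the broker are not included.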
If you are using Celery with Django, the simplest way to inspect tasks is to run the `celery inspect` commands (for example `celery -A proj inspect active`, or `reserved`, or `scheduled`) directly from a terminal in your virtual environment, or using the full path to `celery`.
Doc: http://docs.celeryproject.org/en/latest/userguide/workers.html?highlight=revoke#inspecting-workers
Also, if you are using Celery with RabbitMQ, you can inspect the list of queues with `rabbitmqctl list_queues`.
More info: https://linux.die.net/man/1/rabbitmqctl
If you are using RabbitMQ, run `rabbitmqctl list_queues` in a terminal.
It prints the list of queues with the number of pending tasks in each.
The number in the right column is the number of tasks in the queue. In the example output this answer was based on, the `celery` queue had 166 pending tasks.
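If you want those numbers inside a program, a small parser is enough. This is only a sketch: `parse_list_queues` is a hypothetical helper, and the sample text is illustrative (only the `celery` count of 166 comes from the answer above; `other_queue` is invented).

```python
def parse_list_queues(output):
    """Map queue name -> pending message count from `rabbitmqctl list_queues` text."""
    counts = {}
    for line in output.splitlines():
        parts = line.rsplit(None, 1)  # split off the last whitespace-separated field
        # Banner lines such as "Listing queues ..." have no numeric last field.
        if len(parts) == 2 and parts[1].isdigit():
            counts[parts[0].strip()] = int(parts[1])
    return counts


# Illustrative sample text, not captured from a real broker.
sample = "Listing queues ...\ncelery\t166\nother_queue\t0\n...done.\n"
print(parse_list_queues(sample))  # {'celery': 166, 'other_queue': 0}
```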
If you don't use prioritized tasks, this is actually pretty simple with Redis: the task count is the length of the queue's list, which you can read with the LLEN command.
However, prioritized tasks use different keys in Redis, so the full picture is slightly more complicated: you need to query Redis for every task priority and add up the results. The Flower project does this in Python.
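A sketch of that per-priority count, modelled on the approach the answer attributes to Flower, might look like the following. It assumes Kombu's default priority steps (0, 3, 6, 9) and its default key separator. `client` is assumed to be a `redis.Redis` instance connected to your broker database, and the function names are illustrative.

```python
PRIORITY_SEP = "\x06\x16"              # separator Kombu places between queue name and priority
DEFAULT_PRIORITY_STEPS = [0, 3, 6, 9]  # Kombu's default priority steps for Redis


def priority_keys(queue):
    """Return the Redis keys that can hold messages for `queue`."""
    # Priority 0 uses the bare queue name; other steps get a suffixed key.
    return [
        queue if pri == 0 else f"{queue}{PRIORITY_SEP}{pri}"
        for pri in DEFAULT_PRIORITY_STEPS
    ]


def queue_length(client, queue="celery"):
    """Sum the list lengths across all priority keys of one queue."""
    return sum(client.llen(key) for key in priority_keys(queue))
```

If your broker transport options change the priority steps or the separator, the constants above need to change with them.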
If you want the actual tasks rather than a count, read the entries of the list, for example with the LRANGE command.
From there you'll have to deserialize the returned list yourself.
Be warned that deserialization can take a moment, and you'll need to adjust the commands to work with the various priorities.
To retrieve tasks from the backend, use this:
A copy-paste solution exists for Redis with JSON serialization.
It works with Django. Just don't forget to change `yourproject.celery` to your own module.
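For the Redis plus JSON case, the decoding step can be sketched as below. This is not the original answer's code. It assumes the JSON serializer, Kombu's default base64 body encoding, and Celery's task message protocol 2, where the body is `[args, kwargs, embed]` and the task name is in the headers. `raw_messages` would come from something like `client.lrange("celery", 0, -1)`, and the function name is hypothetical.

```python
import base64
import json


def decode_queue_messages(raw_messages):
    """Decode raw broker messages into (task_name, args, kwargs) tuples."""
    decoded = []
    for raw in raw_messages:
        message = json.loads(raw)                         # outer Kombu envelope
        body = json.loads(base64.b64decode(message["body"]))
        args, kwargs = body[0], body[1]                   # protocol 2: [args, kwargs, embed]
        decoded.append((message["headers"].get("task"), args, kwargs))
    return decoded
```

Messages produced with another serializer (pickle, msgpack) or an older protocol version will not decode with this sketch.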
This worked for me in my application.
The resulting `active_jobs` will be a list of strings that correspond to the tasks in the queue. Don't forget to swap out `CELERY_APP_INSTANCE` with your own.
Thanks to @ashish for pointing me in the right direction with his answer here: https://stackoverflow.com/a/19465670/9843399
The Celery inspect module appears to be aware of tasks only from the workers' perspective. If you want to view the messages that are in the queue (yet to be pulled by the workers), I suggest using pyrabbit, which can talk to the RabbitMQ HTTP API to retrieve all kinds of information about the queue.
An example can be found here:
Retrieve queue length with Celery (RabbitMQ, Django)
I think the only way to get the tasks that are waiting is to keep a list of the tasks you started and let each task remove itself from the list when it starts.
With rabbitmqctl and list_queues you can get an overview of how many tasks are waiting, but not the tasks themselves: http://www.rabbitmq.com/man/rabbitmqctl.1.man.html
If what you want includes tasks that are being processed but have not finished yet, you can keep a list of your tasks and check the state of each one.
Or you can let Celery store the results with CELERY_RESULT_BACKEND and check which of your tasks are not in there.
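The bookkeeping idea can be sketched as follows, assuming you keep the `AsyncResult` objects returned by `.delay()` or `.apply_async()` in a list and have a result backend configured. The function name `pending_summary` is illustrative.

```python
def pending_summary(results):
    """Map task id -> state for every tracked result that has not finished yet."""
    # ready() is True once the task has succeeded, failed, or been revoked.
    return {r.id: r.state for r in results if not r.ready()}
```

For example, after `results.append(add.delay(4, 4))` in your submitting code, `pending_summary(results)` would list the tasks that are still waiting or running. Celery also reports the state `PENDING` for ids it has never seen, so this only makes sense for ids you submitted yourself.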
As far as I know, Celery does not provide an API for examining tasks that are waiting in the queue; this is broker-specific. If you use Redis as a broker, for example, then examining the tasks waiting in the `celery` (default) queue is as simple as listing the items in the `celery` list (with the LRANGE command, for example).
Keep in mind that these are tasks WAITING to be picked up by available workers. Your cluster may have some tasks running; those will not be in this list because they have already been picked up.
The process of retrieving the tasks in a particular queue is broker-specific.
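With Redis as the broker, a minimal sketch of that lookup could be the following. `client` is assumed to be a `redis.Redis` instance pointed at the broker database, and `peek_waiting` is a made-up name.

```python
def peek_waiting(client, queue="celery", limit=10):
    """Return the queue length and up to `limit` raw messages without removing them."""
    # LRANGE only reads the list; its end index is inclusive, hence limit - 1.
    return client.llen(queue), client.lrange(queue, 0, limit - 1)
```

The returned messages are still serialized broker envelopes, so they need decoding before the task names or arguments are readable.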
I've come to the conclusion that the best way to get the number of jobs on a queue is to use `rabbitmqctl`, as has been suggested several times here. To allow any chosen user to run the command with `sudo`, I followed the instructions here (I skipped the part about editing the profile, as I don't mind typing sudo before the command).
I also grabbed jamesc's `grep` and `cut` snippet and wrapped it up in subprocess calls.
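The original snippet is not reproduced here, but the same idea can be sketched in Python, with the filtering done in Python instead of `grep` and `cut`. It assumes `rabbitmqctl list_queues` prints tab-separated name and message-count columns and that the user may run it through `sudo`. The `runner` parameter is a hypothetical hook added so the function can be exercised without a broker.

```python
import subprocess


def queue_depth(queue, runner=subprocess.run):
    """Return the message count of `queue` from `rabbitmqctl list_queues`, or None."""
    completed = runner(
        ["sudo", "rabbitmqctl", "list_queues"],
        capture_output=True,
        text=True,
        check=True,  # raise CalledProcessError if the command fails
    )
    for line in completed.stdout.splitlines():
        fields = line.split("\t")  # plays the role of `cut`
        # Matching on the exact queue name plays the role of `grep`.
        if len(fields) >= 2 and fields[0] == queue and fields[1].isdigit():
            return int(fields[1])
    return None
```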
To get the number of tasks on a queue, you can use the Flower library.
If you control the code of the tasks, you can work around the problem by letting a task trigger a trivial retry the first time it executes, then checking `inspect().reserved()`. The retry registers the task with the result backend, and Celery can see that. The task must accept `self` or `context` as its first parameter so that we can access the retry count.
This solution is broker-agnostic, i.e. you don't have to worry about whether you are using RabbitMQ or Redis to store the tasks.
EDIT: After testing, I've found this to be only a partial solution. The size of the reserved list is limited by the worker's prefetch setting.
I found a use case in the Flower codebase for getting the broker queue length.
It is as fast as accessing the broker directly.
This works for me without removing messages from the queue.
Don't forget to swap out `CELERY_APP_INSTANCE` with your own.
@Owen: I hope my solution meets your expectations.
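This answer's own code is not shown, so the following is a different, commonly used technique rather than a reconstruction: a passive queue declaration, which asks the broker for the message count without consuming anything. `app` is assumed to be your Celery application (the `CELERY_APP_INSTANCE` above), and `queue_message_count` is a hypothetical name.

```python
def queue_message_count(app, queue="celery"):
    """Ask the broker how many messages are waiting, without consuming any."""
    with app.connection_or_acquire() as conn:
        # passive=True only looks the queue up; it is never created or modified.
        declared = conn.default_channel.queue_declare(queue=queue, passive=True)
        return declared.message_count
```

This yields a count only, not the tasks themselves, and a passive declaration on a queue that does not exist raises an error from the broker.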
This leverages Celery's `control` and `inspect` commands but also keeps an eye on the tasks that have been submitted.
This alone doesn't really work unless you have some sort of loop that is enqueueing items.
With this, what's happening is the following: the code reads `total`, which is the number of tasks that have been processed by a specific worker in a particular queue, and `active_tasks`, the tasks that have been processed by Celery.
What this means is that if 50 tasks have been submitted and 30 have been processed, then there are 50 - 30 = 20 tasks in the queue.
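The arithmetic can be sketched as below. It assumes you track the submitted count yourself and pass in the dictionary returned by `app.control.inspect().stats()`, whose per-worker `total` entry maps task names to the number of tasks that worker has accepted since start-up. The function name and the sample `stats` value are illustrative.

```python
def estimate_queued(submitted, stats):
    """Estimate tasks still queued: submitted minus the totals the workers report."""
    processed = sum(
        count
        for worker in (stats or {}).values()          # stats is None if no worker replied
        for count in worker.get("total", {}).values()
    )
    return max(submitted - processed, 0)


# Illustrative stats shape; real output contains many more keys per worker.
stats = {"worker1@host": {"total": {"tasks.add": 30}}}
print(estimate_queued(50, stats))  # 20
```

Because `total` resets when a worker restarts, the estimate is only meaningful while the same workers have been running since you started counting submissions.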
You can also do this with `subprocess.run`, calling the `celery` command line from Python.
Be careful to replace `my_proj` with `your_proj`.
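A minimal sketch of the `subprocess.run` approach, assuming the `celery` executable is on your PATH and that `my_proj` is the module holding your Celery app, is shown below. The function name and the `runner` hook are hypothetical, and the original answer's post-processing of the output is not reproduced.

```python
import subprocess


def celery_inspect_active(project="my_proj", runner=subprocess.run):
    """Run `celery -A <project> inspect active` and return its text output."""
    completed = runner(
        ["celery", "-A", project, "inspect", "active"],
        capture_output=True,
        text=True,
    )
    if completed.returncode != 0:
        raise RuntimeError(completed.stderr.strip() or "celery inspect failed")
    return completed.stdout
```

Like the inspect API, this reports only tasks that workers are currently executing, not messages still waiting in the broker.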