Temporary queues made with Celery

Posted on 2024-11-30 12:06:45


I am using Celery with RabbitMQ. Lately, I have noticed that a large number of temporary queues are being created.

So I experimented and found that when a task fails (that is, a task raises an Exception), a temporary queue with a random name (like c76861943b0a4f3aaa6a99a6db06952c) is formed, and the queue remains.

Some properties of the temporary queue, as found in rabbitmqadmin, are as follows:

auto_delete : True
consumers : 0
durable : False
messages : 1
messages_ready : 1

And one such temporary queue is made every time a task fails (that is, raises an Exception). How can I avoid this situation? In my production environment a large number of such queues get formed.
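To make the setup concrete, here is a minimal sketch of the kind of code that shows this, assuming a local RabbitMQ broker and an older Celery version that still ships the amqp result backend; the module name tasks.py and the fail task are illustrative, not my real code:

# tasks.py -- minimal sketch; broker URL and task name are assumptions.
from celery import Celery

app = Celery(
    "tasks",
    broker="amqp://guest:guest@localhost//",
    backend="amqp",  # the result backend in use here
)

@app.task
def fail():
    # Raising makes the worker publish an error result; a randomly named
    # result queue then shows up in `rabbitmqadmin list queues`.
    raise RuntimeError("simulated failure")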

Comments (5)

思慕 2024-12-07 12:06:46


It sounds like you're using amqp as the result backend. From the docs, here are the pitfalls of using that particular setup:

  • Every new task creates a new queue on the server, with thousands of
    tasks the broker may be overloaded with queues and this will affect
    performance in negative ways. If you’re using RabbitMQ then each
    queue will be a separate Erlang process, so if you’re planning to
    keep many results simultaneously you may have to increase the Erlang
    process limit, and the maximum number of file descriptors your OS
    allows
  • Old results will not be cleaned automatically, so you must make
    sure to consume the results or else the number of queues will
    eventually go out of control. If you’re running RabbitMQ 2.1.1 or
    higher you can take advantage of the x-expires argument to queues,
    which will expire queues after a certain time limit after they are
    unused. The queue expiry can be set (in seconds) by the
    CELERY_AMQP_TASK_RESULT_EXPIRES setting (not enabled by default).

From what I've read in the changelog, this is no longer the default backend in versions >= 2.3.0 because users were getting bitten by this behavior. I'd suggest changing the result backend if this is not the functionality you need.
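As a sketch of the two ways out (the setting names are as quoted from the docs; the concrete TTL value and the Redis URL are just examples):

# celeryconfig.py -- sketch of the mitigations above; values are examples.
# Option 1: keep the amqp backend, but have RabbitMQ (>= 2.1.1) expire
# unused per-task result queues via the x-expires argument:
CELERY_RESULT_BACKEND = "amqp"
CELERY_AMQP_TASK_RESULT_EXPIRES = 3600  # seconds of disuse before expiry

# Option 2: move results off amqp entirely so no per-task queues are
# created at all, e.g. (on a Celery version with URL-style backends):
# CELERY_RESULT_BACKEND = "redis://localhost:6379/0"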

娇纵 2024-12-07 12:06:46


Well, Philip is right there. The following describes how I solved it. It is a configuration in celeryconfig.py.

I am still using CELERY_BACKEND = "amqp", as Philip said. But in addition to that, I am now using CELERY_IGNORE_RESULT = True. This setting ensures that the extra queues are not formed for every task.

I was already using this configuration, yet the extra queue was still formed when a task failed. Then I noticed another setting that needed to be removed: CELERY_STORE_ERRORS_EVEN_IF_IGNORED = True. What it does is skip storing results for successful tasks but still store errors (failed tasks), hence the one extra queue per failed task.
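Put together, the relevant lines of my celeryconfig.py now look roughly like this:

# celeryconfig.py -- only the settings relevant to the temp-queue issue.
CELERY_BACKEND = "amqp"        # still the amqp result backend
CELERY_IGNORE_RESULT = True    # don't publish results -> no queue per task

# Removed: with this set, error results were still published, so every
# failed task created one temporary result queue.
# CELERY_STORE_ERRORS_EVEN_IF_IGNORED = True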

任谁 2024-12-07 12:06:46


The CELERY_TASK_RESULT_EXPIRES setting dictates the time to live of the temporary queues. The default is 1 day. You can modify this value.
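For example (a sketch; the one-hour value is arbitrary):

# celeryconfig.py -- shorten the result TTL from its 1-day default.
from datetime import timedelta

CELERY_TASK_RESULT_EXPIRES = timedelta(hours=1)  # a plain number of seconds also works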

转角预定愛 2024-12-07 12:06:46


The reason this is happening is that the Celery workers' remote control is enabled (it is enabled by default).

You can disable it by setting CELERY_ENABLE_REMOTE_CONTROL to False.
However, note that you will lose the ability to do things like add_consumer and cancel_consumer using the celery command.
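A sketch of that setting:

# celeryconfig.py -- turn off worker remote control.
# Trade-off: `celery inspect` / `celery control` subcommands such as
# add_consumer and cancel_consumer stop working.
CELERY_ENABLE_REMOTE_CONTROL = False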

并安 2024-12-07 12:06:46


The amqp backend creates a new queue for each task. If you want to avoid that, you can use the rpc backend, which keeps results in a single queue.

In your config, set

CELERY_RESULT_BACKEND = 'rpc'
CELERY_RESULT_PERSISTENT = True

You can read more about this in the Celery docs.
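A small end-to-end sketch (the broker URL and the add task are illustrative; result_persistent is the new-style name for CELERY_RESULT_PERSISTENT):

# tasks.py -- rpc result backend sketch.
from celery import Celery

app = Celery("tasks", broker="amqp://guest:guest@localhost//", backend="rpc://")
app.conf.result_persistent = True  # keep results across broker restarts

@app.task
def add(x, y):
    return x + y

# result = add.delay(2, 3)
# result.get(timeout=10)  # -> 5; results arrive via one reply queue per
#                         #   client, not one queue per task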
