为什么 RabbitMQ 不将消息持久保存在持久队列上?

发布于 2024-10-18 06:27:59 字数 1546 浏览 1 评论 0原文

我通过 Celery 将 RabbitMQ 与 Django 结合使用。我正在使用最基本的设置:

# RabbitMQ connection settings
BROKER_HOST = 'localhost'
BROKER_PORT = '5672'
BROKER_USER = 'guest'
BROKER_PASSWORD = 'guest'
BROKER_VHOST = '/'

我导入了一个 Celery 任务并将其排队等待一年后运行。从 iPython shell:

In [1]: from apps.test_app.tasks import add

In [2]: dt=datetime.datetime(2012, 2, 18, 10, 00)

In [3]: add.apply_async((10, 6), eta=dt)
DEBUG:amqplib:Start from server, version: 8.0, properties: {u'information': 'Licensed under the MPL.  See http://www.rabbitmq.com/', u'product': 'RabbitMQ', u'version': '2.2.0', u'copyright': 'Copyright (C) 2007-2010 LShift Ltd., Cohesive Financial Technologies LLC., and Rabbit Technologies Ltd.', u'platform': 'Erlang/OTP'}, mechanisms: ['PLAIN', 'AMQPLAIN'], locales: ['en_US']
DEBUG:amqplib:Open OK! known_hosts []
DEBUG:amqplib:using channel_id: 1
DEBUG:amqplib:Channel open
DEBUG:amqplib:Closed channel #1
Out[3]: <AsyncResult: cfc507a1-175f-438e-acea-8c989a120ab3>

RabbitMQ 在 celery 队列中收到此消息:

$  rabbitmqctl list_queues name messages durable
Listing queues ...
KTMacBook.local.celeryd.pidbox  0   false
celery  1   true
celeryctl_KTMacBook.local   0   true
...done.

然后,我通过按 control-C 后跟“a”来中止来终止 RabbitMQ。当我再次启动服务器并使用rabbitmqctl检查它时,它说芹菜队列中没有消息:

$  rabbitmqctl list_queues name messages durable
Listing queues ...
celery  0   true
celeryctl_KTMacBook.local   0   true
...done.

芹菜队列是持久的。为什么消息没有被保留?我需要做什么才能使消息持久化?

I am using RabbitMQ with Django through Celery. I am using the most basic setup:

# RabbitMQ connection settings
BROKER_HOST = 'localhost'
BROKER_PORT = '5672'
BROKER_USER = 'guest'
BROKER_PASSWORD = 'guest'
BROKER_VHOST = '/'

I imported a Celery task and queued it to run one year later. From the iPython shell:

In [1]: from apps.test_app.tasks import add

In [2]: dt=datetime.datetime(2012, 2, 18, 10, 00)

In [3]: add.apply_async((10, 6), eta=dt)
DEBUG:amqplib:Start from server, version: 8.0, properties: {u'information': 'Licensed under the MPL.  See http://www.rabbitmq.com/', u'product': 'RabbitMQ', u'version': '2.2.0', u'copyright': 'Copyright (C) 2007-2010 LShift Ltd., Cohesive Financial Technologies LLC., and Rabbit Technologies Ltd.', u'platform': 'Erlang/OTP'}, mechanisms: ['PLAIN', 'AMQPLAIN'], locales: ['en_US']
DEBUG:amqplib:Open OK! known_hosts []
DEBUG:amqplib:using channel_id: 1
DEBUG:amqplib:Channel open
DEBUG:amqplib:Closed channel #1
Out[3]: <AsyncResult: cfc507a1-175f-438e-acea-8c989a120ab3>

RabbitMQ received this message in the celery queue:

$  rabbitmqctl list_queues name messages durable
Listing queues ...
KTMacBook.local.celeryd.pidbox  0   false
celery  1   true
celeryctl_KTMacBook.local   0   true
...done.

I then killed RabbitMQ by hitting control-C followed by 'a' to abort. When I start the server again and check it with rabbitmqctl, it says that there are no messages in the celery queue:

$  rabbitmqctl list_queues name messages durable
Listing queues ...
celery  0   true
celeryctl_KTMacBook.local   0   true
...done.

The celery queue was durable. Why were the messages not persisted? What do I need to do to make the messages persistent?

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(2

红焚 2024-10-25 06:27:59

使队列持久化与使其上的消息持久化不同。持久队列意味着当服务器重新启动时它们会自动再次出现 - 这显然发生在您的情况下。但这并不影响消息本身。

要使消息持久化,您还必须将消息的 delivery_mode 属性标记为 2。请参阅经典文章 兔子和华伦 以获得完整的解释。

编辑:完整链接已损坏,但截至 2013 年 12 月,您仍然可以从主 URL 找到该博客文章: http: //blogs.digitar.com/jjww/

Making a queue durable is not the same as making the messages on it persistent. Durable queues mean they come up again automatically when the server has restarted - which has obviously happened in your case. But this doesn't affect the messages themselves.

To make messages persistent, you have to also mark the message's delivery_mode property to 2. See the classic write-up Rabbits and Warrens for a full explanation.

Edit: Full link is broken, but as of Dec 2013 you could still find the blog post from the main URL: http://blogs.digitar.com/jjww/

风追烟花雨 2024-10-25 06:27:59

要找出消息delivery_mode,您可以使用它并查看消息属性:

>>> from tasks import add
>>> add.delay(2, 2)

>>> from celery import current_app
>>> conn = current_app.broker_connection()
>>> consumer = current_app.amqp.get_task_consumer(conn)

>>> messages = []
>>> def callback(body, message):
...     messages.append(message)
>>> consumer.register_callback(callback)
>>> consumer.consume()

>>> conn.drain_events(timeout=1)

>>> messages[0].properties
>>> messages[0].properties
{'application_headers': {}, 'delivery_mode': 2, 'content_encoding': u'binary',    'content_type': u'application/x-python-serialize'}

To find out the messages delivery_mode you can consume it and look at the message properties:

>>> from tasks import add
>>> add.delay(2, 2)

>>> from celery import current_app
>>> conn = current_app.broker_connection()
>>> consumer = current_app.amqp.get_task_consumer(conn)

>>> messages = []
>>> def callback(body, message):
...     messages.append(message)
>>> consumer.register_callback(callback)
>>> consumer.consume()

>>> conn.drain_events(timeout=1)

>>> messages[0].properties
>>> messages[0].properties
{'application_headers': {}, 'delivery_mode': 2, 'content_encoding': u'binary',    'content_type': u'application/x-python-serialize'}
~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文