RabbitMQ:带有主题交换的持久消息

发布于 2024-11-09 21:18:19 字数 525 浏览 2 评论 0原文

我对 RabbitMQ 很陌生。

我已经建立了一个“主题”交换。消费者可以在发布者之后启动。我希望消费者能够接收在启动之前发送且尚未消耗的消息。

交换器使用以下参数进行设置:

exchange_type => 'topic'
durable => 1
auto_delete => 0
passive => 0

使用此参数发布消息:

delivery_mode => 2

消费者使用 get() 从交换器检索消息。

不幸的是,在任何客户端启动之前发布的任何消息都会丢失。我使用了不同的组合。

我想我的问题是交换不保存消息。也许我需要在发布者和消费者之间有一个队列。但这似乎不适用于通过密钥路由消息的“主题”交换。

我应该如何进行?我使用 Perl 绑定 Net::RabbitMQ (应该没关系)和 RabbitMQ 2.2.0

I am very new to RabbitMQ.

I have set up a 'topic' exchange. The consumers may be started after the publisher. I'd like the consumers to be able to receive messages that have been sent before they were up, and that was not consumed yet.

The exchange is set up with the following parameters:

exchange_type => 'topic'
durable => 1
auto_delete => 0
passive => 0

The messages are published with this parameter:

delivery_mode => 2

Consumers use get() to retrieve the messages from the exchange.

Unfortunately, any message published before any client was up is lost. I have used different combinations.

I guess my problem is that the exchange does not hold messages. Maybe I need to have a queue between the publisher and the consumer. But this does not seem to work with a 'topic' exchange where messages are routed by a key.

How should I proceed? I use the Perl binding Net::RabbitMQ (shouldn't matter) and RabbitMQ 2.2.0.

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(3

悲凉≈ 2024-11-16 21:18:19

如果在发布消息时没有连接的使用者可用于处理消息,则需要一个持久队列来存储消息。

交换器不存储消息,但队列可以。令人困惑的部分是,交换可以被标记为“持久”,但这真正意味着,如果您重新启动代理,交换本身仍将存在,但它确实不存在。 > 意味着发送到该交换器的任何消息都会自动保留。

鉴于此,这里有两个选项:

  1. 在启动发布商自行创建队列之前,执行管理步骤。您可以使用 Web UI 或命令行工具来执行此操作。确保将其创建为持久队列,以便即使没有活动的使用者,它也能存储路由到它的任何消息。
  2. 假设您的消费者被编码为始终在启动时声明(因此自动创建)他们的交换和队列(并且他们将它们声明为持久的),只需在启动任何发布者之前运行所有消费者至少一次 。这将确保正确创建所有队列。然后,您可以关闭消费者,直到真正需要它们为止,因为队列将持久存储路由到它们的任何未来消息。

我会选择#1。需要执行的步骤可能并不多,您始终可以编写所需步骤的脚本,以便可以重复这些步骤。另外,如果所有消费者都将从同一个队列中提取数据(而不是每个都有一个专用队列),那么这实际上是最小的管理开销。

队列是需要适当管理和控制的东西。否则,您最终可能会遇到流氓消费者声明持久队列,使用它们几分钟,但再也不会。不久之后,您将拥有一个永久增长的队列,并且没有任何东西可以减少其大小,以及即将到来的经纪人灾难。

You need a durable queue to store messages if there are no connected consumers available to process the messages at the time they are published.

An exchange doesn't store messages, but a queue can. The confusing part is that exchanges can be marked as "durable" but all that really means is that the exchange itself will still be there if you restart your broker, but it does not mean that any messages sent to that exchange are automatically persisted.

Given that, here are two options:

  1. Perform an administrative step before you start your publishers to create the queue(s) yourself. You could use the web UI or the command line tools to do this. Make sure you create it as a durable queue so that it will store any messages that are routed to it even if there are no active consumers.
  2. Assuming your consumers are coded to always declare (and therefore auto-create) their exchanges and queues on startup (and that they declare them as durable), just run all your consumers at least once before starting any publishers. That will ensure that all your queues get created correctly. You can then shut down the consumers until they're really needed because the queues will persistently store any future messages routed to them.

I would go for #1. There may not be many steps to perform and you could always script the steps required so that they could be repeated. Plus if all your consumers are going to pull from the same single queue (rather than have a dedicated queue each) it's really a minimal piece of administrative overhead.

Queues are something to be managed and controlled properly. Otherwise you could end up with rogue consumers declaring durable queues, using them for a few minutes but never again. Soon after you'll have a permanently-growing queue with nothing reducing its size, and an impending broker apocalypse.

ま柒月 2024-11-16 21:18:19

正如 Brian 所提到的,交换器不存储消息,主要负责将消息路由到另一个交换器或队列。如果交换器未绑定到队列,则发送到该交换器的所有消息都将“丢失”。

您不需要在发布者脚本中声明固定的客户端队列,因为这可能不可扩展。队列可以由发布者动态创建,并使用交换器到交换器绑定在内部路由。

RabbitMQ 支持交换到交换的绑定,这将允许拓扑灵活性、解耦和其他好处。您可以在 RabbitMQ Exchange 到 Exchange 绑定处阅读更多信息[AMPQ]

RabbitMQ Exchange 到 Exchange 绑定

示例拓扑

如果不存在使用队列的消费者,则用于创建具有持久性的交换到交换绑定的示例 Python 代码。

#!/usr/bin/env python
import pika
import sys
 
 
connection = pika.BlockingConnection(pika.ConnectionParameters(
host='localhost'))
channel = connection.channel()
 
 
#Declares the entry exchange to be used by all producers to send messages. Could be external producers as well
channel.exchange_declare(exchange='data_gateway',
exchange_type='fanout',
durable=True,
auto_delete=False)
 
#Declares the processing exchange to be used.Routes messages to various queues. For internal use only
channel.exchange_declare(exchange='data_distributor',
exchange_type='topic',
durable=True,
auto_delete=False)
 
#Binds the external/producer facing exchange to the internal exchange
channel.exchange_bind(destination='data_distributor',source='data_gateway')
 
##Create Durable Queues binded to the data_distributor exchange
channel.queue_declare(queue='trade_db',durable=True)
channel.queue_declare(queue='trade_stream_service',durable=True)
channel.queue_declare(queue='ticker_db',durable=True)
channel.queue_declare(queue='ticker_stream_service',durable=True)
channel.queue_declare(queue='orderbook_db',durable=True)
channel.queue_declare(queue='orderbook_stream_service',durable=True)
 
#Bind queues to exchanges and correct routing key. Allows for messages to be saved when no consumer is present
channel.queue_bind(queue='orderbook_db',exchange='data_distributor',routing_key='*.*.orderbook')
channel.queue_bind(queue='orderbook_stream_service',exchange='data_distributor',routing_key='*.*.orderbook')
channel.queue_bind(queue='ticker_db',exchange='data_distributor',routing_key='*.*.ticker')
channel.queue_bind(queue='ticker_stream_service',exchange='data_distributor',routing_key='*.*.ticker')
channel.queue_bind(queue='trade_db',exchange='data_distributor',routing_key='*.*.trade')
channel.queue_bind(queue='trade_stream_service',exchange='data_distributor',routing_key='*.*.trade')

As mentioned by Brian an exchange does not store messages and is mainly responsible for routing messages to either another exchange/s or queue/s. If the exchange is not bound to a queue, then all messages sent to that exchange will be 'lost'.

You should not need to declare fixed client queues in the publisher script since this might not be scalable. Queues can be created dynamically by your publishers and routed internally using exchange-to-exchange binding.

RabbitMQ supports exchange-to-exchange bindings that will allow for topology flexibility, decoupling and other benefits. You can read more here at RabbitMQ Exchange to Exchange Bindings [AMPQ]

RabbitMQ Exchange To Exchange Binding

Example Topology

Example Python code to create exchange-to-exchange binding with persistence if no consumer is present using queue.

#!/usr/bin/env python
import pika
import sys
 
 
connection = pika.BlockingConnection(pika.ConnectionParameters(
host='localhost'))
channel = connection.channel()
 
 
#Declares the entry exchange to be used by all producers to send messages. Could be external producers as well
channel.exchange_declare(exchange='data_gateway',
exchange_type='fanout',
durable=True,
auto_delete=False)
 
#Declares the processing exchange to be used.Routes messages to various queues. For internal use only
channel.exchange_declare(exchange='data_distributor',
exchange_type='topic',
durable=True,
auto_delete=False)
 
#Binds the external/producer facing exchange to the internal exchange
channel.exchange_bind(destination='data_distributor',source='data_gateway')
 
##Create Durable Queues binded to the data_distributor exchange
channel.queue_declare(queue='trade_db',durable=True)
channel.queue_declare(queue='trade_stream_service',durable=True)
channel.queue_declare(queue='ticker_db',durable=True)
channel.queue_declare(queue='ticker_stream_service',durable=True)
channel.queue_declare(queue='orderbook_db',durable=True)
channel.queue_declare(queue='orderbook_stream_service',durable=True)
 
#Bind queues to exchanges and correct routing key. Allows for messages to be saved when no consumer is present
channel.queue_bind(queue='orderbook_db',exchange='data_distributor',routing_key='*.*.orderbook')
channel.queue_bind(queue='orderbook_stream_service',exchange='data_distributor',routing_key='*.*.orderbook')
channel.queue_bind(queue='ticker_db',exchange='data_distributor',routing_key='*.*.ticker')
channel.queue_bind(queue='ticker_stream_service',exchange='data_distributor',routing_key='*.*.ticker')
channel.queue_bind(queue='trade_db',exchange='data_distributor',routing_key='*.*.trade')
channel.queue_bind(queue='trade_stream_service',exchange='data_distributor',routing_key='*.*.trade')
扭转时空 2024-11-16 21:18:19

您的情况似乎是“消息持久性”。

RabbitMQ 教程文档 中,您需要标记两个 队列消息作为持久的(下面的代码为C#版本。对于其他语言,您可以更喜欢此处)。

  1. 首先,在Publisher中,您需要确保队列能够在RabbitMQ节点重新启动后继续存在。为此,我们需要将其声明为持久的:
channel.QueueDeclare(queue: "hello",
                     durable: true,
                     ....);
  1. 其次,在 Consumer 中,您需要将您的消息标记为持久 - 通过设置 IBasicProperties.SetPercient为真。
var properties = channel.CreateBasicProperties();
properties.Persistent = true;

Your case seems to be "Message durability".

From RabbitMQ Tutorials docs, You need to mark both the queue and messages as durable (The code below as C# version. With other languages, you can prefer here).

  1. Firstly, In Publisher, You need to make sure that the queue will survive a RabbitMQ node restart. In order to do so, we need to declare it as durable:
channel.QueueDeclare(queue: "hello",
                     durable: true,
                     ....);
  1. Secondly, In Consumer, You need to mark your messages as persistent - by setting IBasicProperties.SetPersistent to true.
var properties = channel.CreateBasicProperties();
properties.Persistent = true;
~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文