如何有选择地从 AMQP (RabbitMQ) 队列中删除消息?

发布于 2024-09-13 18:35:43 字数 369 浏览 11 评论 0原文

我想有选择地从 AMQP 队列中删除消息,甚至不读取它们。

场景如下:

发送方希望基于类型X的新信息到达的事实使类型X的消息过期。因为订阅者很可能还没有消费最新的 X 类型消息,所以发布者应该删除以前的 X 类型消息并将最新的消息放入队列中。整个操作对于订阅者来说应该是透明的 - 事实上他应该使用像 STOMP 这样简单的东西来获取消息。

如何使用 AMQP 做到这一点?或者也许在其他消息传递协议中更方便?

我想避免复杂的基础设施。所需的整个消息传递就像上面一样简单:一个队列、一个订阅者、一个发布者,但发布者必须能够根据给定条件临时删除消息。

发布者客户端将使用 Ruby,但实际上,只要我发现如何在协议中执行此操作,我就会处理任何语言。

I'd like to selectively delete messages from an AMQP queue without even reading them.

The scenario is as follows:

Sending side wants to expire messages of type X based on a fact that new information of type X arrived. Because it's very probable that the subscriber didn't consume latest message of type X yet, publisher should just delete previous X-type messages and put a newest one into the queue. The whole operation should be transparent to the subscriber - in fact he should use something as simple as STOMP to get the messages.

How to do it using AMQP? Or maybe it's more convenient in another messaging protocol?

I'd like to avoid a complicated infrastructure. The whole messaging needed is as simple as above: one queue, one subscriber, one publisher, but the publisher must have an ability to ad-hoc deleting the messages for a given criteria.

The publisher client will use Ruby but actually I'd deal with any language as soon as I discover how to do it in the protocol.

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(4

新人笑 2024-09-20 18:35:43

您不需要消息队列,您需要键值数据库。例如,您可以使用 Redis 或 Tokyo Tyrant 来获取简单的可通过网络访问的键值数据库。或者只使用内存缓存。

每个消息类型都是一个键。当您使用相同的密钥写入新消息时,它会覆盖以前的值,因此该数据库的读者将永远无法获取过时的信息。

此时,您只需要一个消息队列来确定读取键的顺序(如果这很重要)。否则,就不断扫描数据库。如果您确实持续扫描数据库,最好将数据库放在读取器附近以减少网络流量。

我可能会做这样的事情
键:类型代码
value: lastUpdated, important data

然后我会发送包含以下内容的消息
typecode, lastUpdated 这样,读者就可以将该键的 LastUpdated 与他们上次从数据库中读取的键进行比较,并跳过读取它,因为它们已经是最新的了。

如果您确实需要使用 AMQP 执行此操作,请使用 RabbitMQ 和自定义交换类型,特别是最后值缓存交换。示例代码在这里 https://github.com/squaremo/rabbitmq-lvc-plugin

You do not want a message queue, you want a key-value database. For instance you could use Redis or Tokyo Tyrant to get a simple network-accessible key-value database. Or just use a memcache.

Each message type is a key. When you write a new message with the same key, it overwrites the previous value so the reader of this database will never be able to get out of date information.

At this point, you only need a message queue to establish the order in which keys should be read, if that is important. Otherwise, just continually scan the database. If you do continually scan the database, it is best to put the database near the readers to reduce network traffic.

I would probably do something like this
key: typecode
value: lastUpdated, important data

Then I would send messages that contain
typecode, lastUpdated That way the reader can compare lastupdated for that key to the one that they last read from the database and skip reading it because they are already up to date.

If you really need to do this with AMQP, then use RabbitMQ and a custom exchange type, specifically a Last Value Cache Exchange. Example code is here https://github.com/squaremo/rabbitmq-lvc-plugin

想念有你 2024-09-20 18:35:43

目前,您无法在 RabbitMQ(或更一般地说,在 AMQP 中)自动执行此操作。但是,这里有一个简单的解决方法。

假设您要发送三种类型的消息:X、Y 和 Z。如果我正确理解你的问题,当 X 消息到达时,你希望代理忘记所有其他尚未传递的 X 消息。

这在 RabbitMQ 中相当容易做到:

  • 声明三个队列:X、Y 和 Z(它们会自动绑定到默认交换器,其名称作为路由键,这正是我们想要的)
  • 生产者在发布消息时 ,生产者首先清除相关队列(因此​​,如果它正在发布 X 消息,它首先清除 X 队列);这有效地删除了过时的消息,
  • 消费者只需从它想要的队列中消费(X代表X消息,Y代表Y消息等);从它的角度来看,它只需要执行 basic.get 即可获取下一条相关消息。

这意味着当两个生产者几乎同时发送相同类型的消息时出现竞争条件。结果是队列可能同时有两条(或更多)消息,但由于消息数量受到生产者数量的上限,并且多余的消息在下一次发布时被清除,这应该不是什么大问题。

总而言之,该解决方案距离最佳解决方案仅多了一步,即在发布类型 X 的消息之前清除队列 X。

如果您在设置此配置时需要任何帮助,寻求建议的最佳位置是rabbitmq-discuss 邮件列表。

You cannot currently do this in RabbitMQ (or more generally, in AMQP) automatically. But, here's an easy workaround.

Let's say you want to send three types of messages: Xs, Ys and Zs. If I understand your question correctly, when an X message arrives, you want the broker to forget all other X messages that haven't been delivered.

This is fairly easy to do in RabbitMQ:

  • the producer declares three queues: X, Y, and Z (they're automatically bound to the default exchange with their names as routing keys, which is exactly what we want),
  • when publishing a message, the producer first purges the relevant queue (so, if it's publishing an X message, it first purges the X queue); this effectively removes the outdated messages,
  • the consumer simply consumes from the queue it wants (X for X messages, Y for Y messages, etc.); from its point of view, it just has to do a basic.get to get the next relevant message.

This implies a race condition when two producers send the same type of message at the about the same time. The result is that its possible for the a queue to have two (or more) messages at the same time, but since the number of messages is upper-bounded by the number of producers, and since the superfluous messages are purged on the next publish, this shouldn't be much of a problem.

To summarize, this solution has just one extra step from the optimal solution, namely purge queue X before publishing a message of type X.

If you need any help setting up this configuration, the perfect place to ask for advice is the rabbitmq-discuss mailing list.

好菇凉咱不稀罕他 2024-09-20 18:35:43

它似乎也可以在 RabbitMQ Web-UI 上工作,如果您只想从队列中删除前 n 条消息,

  • 请从选项卡“队列”中选择队列,向下滚动到“获取消息”部分,
  • 设置参数“Requeue=No”和数字您要从队列中删除的消息
  • 按“获取消息”按钮

It seems to work also from the RabbitMQ Web-UI, if you just want to remove first n messages from the queue

  • select the queue from tab "Queues", scroll down to section "Get messages"
  • set parameter "Requeue=No" and number of messages you want to remove from the queue
  • press "Get messages" button
沐歌 2024-09-20 18:35:43

这个问题因其标题而具有很高的知名度。仔细阅读描述会涉及更具体的场景。
因此,对于那些希望从队列中实际删除下一条(记住 FIFO)消息的用户,您可以使用rabbitmqadmin 并发出以下命令:

rabbitmqadmin getqueue=queuename requeue=false count=1

该命令本质上是消耗消息而不执行任何操作。带有用于备份消息的标志的完整命令可能如下所示。确保根据您的要求添加任何其他参数。

sudo pythonrabbitmqadmin -V virtualhostname -u user -p pass getqueue=queuename requeue=false count=1 Payload_file=~/origmsg

This question has high visibility due the title of it. Going through the description dwells with more specific scenario.
So for those users who are looking to actually delete the next (remember FIFO) message from the queue, you can make use of rabbitmqadmin and issue the below command:

rabbitmqadmin get queue=queuename requeue=false count=1

This command is essentially consuming the message and doing nothing. A complete command with flag to take backup of the message(s) might look like the below one. Make sure to add any other parameters as per your requirement.

sudo python rabbitmqadmin -V virtualhostname -u user -p pass get queue=queuename requeue=false count=1 payload_file=~/origmsg

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文