Locking and fetching messages in batches with RabbitMQ

Posted on 2024-12-28 15:59:22

I'm trying to use RabbitMQ in a somewhat unconventional way (though at this point I can pick any other message queue implementation if needed). Instead of letting Rabbit push messages to my consumers, each consumer connects to a queue and fetches a batch of N messages (during which it consumes some and possibly rejects some), after which it jumps to another queue, and so on. This is done for redundancy: if some consumers crash, all messages are still guaranteed to be consumed by some other consumer.

The problem is that I have multiple consumers and I don't want them to compete over the same queue. Is there a way to guarantee a lock on a queue? If not, can I at least make sure that if two consumers are connected to the same queue, they don't read the same message? Transactions might help me to some degree, but I've heard that they will be removed from RabbitMQ.

Other architectural suggestions are welcomed too.

Thanks!

EDIT:
As pointed out in the comments, there's a particularity in how I need to process the messages. They only make sense when taken in groups, and there's a high probability that related messages are clumped together in a queue. If, for example, I pull a batch of 100 messages, there's a high probability that I'll be able to do something with messages 1-3, 4-5, 6-10, etc. If I fail to find a group for some messages, I'll resubmit them to the queue. A work queue wouldn't work because it would spread messages from the same group across multiple workers that wouldn't know what to do with them.
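A minimal sketch of the grouping step described in the edit. The question does not say how related messages are recognized, so this assumes, hypothetically, that every message carries a group id and the size of its complete group; the Message class and split_batch function are illustrative names, not part of any RabbitMQ client. Complete groups can be processed and acknowledged, and everything else is what would be resubmitted.

```python
from dataclasses import dataclass
from typing import Dict, List, Tuple


@dataclass
class Message:
    # Hypothetical message shape: the question does not say how groups are
    # identified, so each message is assumed to carry a group id and the
    # number of messages that make up its complete group.
    delivery_tag: int
    group_id: str
    group_size: int
    body: bytes


def split_batch(batch: List[Message]) -> Tuple[List[List[Message]], List[Message]]:
    """Split a fetched batch into complete groups and leftover messages."""
    # Collect messages per group, keeping the order in which groups first appear.
    by_group: Dict[str, List[Message]] = {}
    for msg in batch:
        by_group.setdefault(msg.group_id, []).append(msg)

    complete: List[List[Message]] = []
    leftovers: List[Message] = []
    for members in by_group.values():
        if len(members) == members[0].group_size:
            # Whole group present: process it, then ack its delivery tags.
            complete.append(members)
        else:
            # Incomplete group: resubmit it (e.g. basic.reject with requeue).
            leftovers.extend(members)
    return complete, leftovers
```

A group that is cut off at the batch boundary simply ends up in the leftovers.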

Comments (3)

一个人的旅程 2025-01-04 15:59:22

Have you had a look at this free online book on Enterprise Integration Patterns?

It sounds like you really need a workflow where a batcher component sits in front of your workers. With RabbitMQ there are two ways to do that: either use an exchange type (and message format) that does the batching for you, or have one queue plus a worker that sorts out batches and places each batch on its own queue. The batcher should probably also send a "batch ready" message to a control queue so that a worker can discover the existence of the new batch queue. Once the batch is processed, the worker could delete the batch queue.
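The second option (a dedicated batcher worker) can be modelled in memory to show the data flow. This is a toy sketch, not RabbitMQ code: plain deques stand in for the per-batch queues and the control queue, and it assumes, hypothetically, that each message states how many messages its batch contains so the batcher knows when to announce it. InMemoryBatcher and work_one_batch are illustrative names.

```python
from collections import deque
from typing import Deque, Dict, List, Tuple


class InMemoryBatcher:
    """Toy model of the batcher pattern: one input stream, one queue per
    batch, and a control queue announcing batches that are ready."""

    def __init__(self) -> None:
        self.batch_queues: Dict[str, Deque[str]] = {}
        self.control: Deque[str] = deque()

    def route(self, batch_id: str, batch_size: int, payload: str) -> None:
        # Put the message on its batch's own queue, creating it on first use.
        queue = self.batch_queues.setdefault(batch_id, deque())
        queue.append(payload)
        # Announce the batch on the control queue once all messages arrived.
        if len(queue) == batch_size:
            self.control.append(batch_id)


def work_one_batch(batcher: InMemoryBatcher) -> Tuple[str, List[str]]:
    # A worker discovers a ready batch via the control queue, drains that
    # batch's queue, and then deletes the queue.
    batch_id = batcher.control.popleft()
    payloads = list(batcher.batch_queues.pop(batch_id))
    return batch_id, payloads
```

Because a whole batch lives on one queue, a single worker sees every related message, which is what a shared work queue cannot guarantee.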

If you have control over the message format, you might be able to get RabbitMQ to do the batching implicitly in a couple of ways. With a topic exchange, you could make sure that the routing key on each message is of the format work.batchid.something and then a worker that learns of the existence of batch xxyzz would use a binding key like #.xxyzz.# to only consume those messages. No republishing needed.
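To see which routing keys a binding such as #.xxyzz.# catches, here is a small re-implementation of the topic-exchange matching rule: keys are split into dot-separated words, * matches exactly one word, and # matches zero or more words. It only models the rule for illustration (the broker does the matching for you); topic_matches is a hypothetical helper name.

```python
from functools import lru_cache


def topic_matches(binding_key: str, routing_key: str) -> bool:
    """Return True if routing_key would be routed by a topic binding_key."""
    pattern = binding_key.split(".")
    words = routing_key.split(".")

    @lru_cache(maxsize=None)
    def match(i: int, j: int) -> bool:
        # i indexes the binding pattern, j indexes the routing-key words.
        if i == len(pattern):
            return j == len(words)
        token = pattern[i]
        if token == "#":
            # "#" either matches no more words (move past it) or swallows
            # one more word (stay on it).
            return match(i + 1, j) or (j < len(words) and match(i, j + 1))
        if j == len(words):
            return False
        if token == "*" or token == words[j]:
            return match(i + 1, j + 1)
        return False

    return match(0, 0)
```

A tighter binding such as work.xxyzz.# works as well and will not match a batch id that happens to appear in another position of the key. Keep in mind that an exchange only routes a message to queues that are bound when it is published, so the worker's binding has to exist before that batch's messages arrive.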

The other way is to include a batch id in a header and use the newer headers exchange type. Of course you can also implement your own custom exchange types if you are willing to write a small amount of Erlang code.
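A simplified model of headers-exchange matching, assuming a hypothetical batch-id header on each message. With x-match set to all (the default) every binding argument must equal the corresponding message header; with any, one matching argument is enough. Binding arguments whose names start with x- are skipped, as RabbitMQ does for the all/any modes. This is a sketch of the matching rule only, and headers_match is an illustrative name.

```python
from typing import Any, Dict


def headers_match(binding_args: Dict[str, Any], headers: Dict[str, Any]) -> bool:
    """Decide whether a message with `headers` is routed by a headers binding."""
    mode = binding_args.get("x-match", "all")
    # Only non-"x-" binding arguments take part in the comparison.
    required = {k: v for k, v in binding_args.items() if not k.startswith("x-")}
    hits = [k in headers and headers[k] == v for k, v in required.items()]
    if mode == "any":
        return any(hits)
    return all(hits)
```

A worker that learns of batch xxyzz would bind its queue with arguments like {"x-match": "all", "batch-id": "xxyzz"}.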

I do recommend checking the book though, because it gives a better overview of messaging architecture than the typical worker queue concept that most people start with.

若相惜即相离 2025-01-04 15:59:22

Have your consumers pull from just one queue. They will be guaranteed not to share messages (Rabbit will round-robin the messages among the currently-connected consumers) and it's heavily optimized for that exact usage pattern.

It's ready-to-use, out of the box. In the RabbitMQ docs it's called the Work Queue model. One queue, multiple consumers, with none of them sharing anything. It sounds like what you need.
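A simplified model of the round-robin dispatch this answer relies on. It ignores acknowledgements, prefetch limits and consumers joining or leaving; it only shows that each message goes to exactly one consumer. round_robin_dispatch is a hypothetical helper, not a RabbitMQ API.

```python
from itertools import cycle
from typing import Dict, List


def round_robin_dispatch(messages: List[str], consumers: List[str]) -> Dict[str, List[str]]:
    """Assign each message to exactly one consumer, taking turns in order."""
    assignment: Dict[str, List[str]] = {c: [] for c in consumers}
    # zip stops when the messages run out, so cycle() never loops forever.
    for msg, consumer in zip(messages, cycle(consumers)):
        assignment[consumer].append(msg)
    return assignment
```

The same model also shows the objection raised in the question's edit: if m1 to m3 form one group, two consumers A and B end up with m1 and m3 on A and m2 on B, so neither holds the complete group.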

冬天旳寂寞 2025-01-04 15:59:22

You can set a channel- or consumer-level prefetch count to consume messages in batches. To resubmit messages, use the basic.reject AMQP method; rejected messages can either be requeued or forwarded to a dead-letter queue. Multiple consumers pulling messages from the same queue is not an issue, as the AMQP basic.get method is synchronized to handle concurrent consumers.

https://groups.google.com/forum/#!topic/rabbitmq-users/hJ8f5du-GCA
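A sketch of the fetch/settle cycle described above, written against the method names of the pika client's BlockingChannel (basic_get, basic_ack, basic_reject). One nuance: a basic.qos prefetch count limits unacknowledged deliveries to basic.consume subscriptions, while basic.get is not governed by it, so with basic.get the batch size is simply the number of calls. The queue name "work" and the helpers fetch_batch and settle are hypothetical.

```python
from typing import Any, List, Tuple


def fetch_batch(channel: Any, queue: str, n: int) -> List[Tuple[int, bytes]]:
    """Pull up to n messages with basic.get, leaving them unacknowledged.

    `channel` is expected to behave like a pika BlockingChannel.
    """
    batch: List[Tuple[int, bytes]] = []
    for _ in range(n):
        method, _properties, body = channel.basic_get(queue=queue, auto_ack=False)
        if method is None:  # the queue is currently empty
            break
        batch.append((method.delivery_tag, body))
    return batch


def settle(channel: Any, processed: List[int], unmatched: List[int]) -> None:
    """Ack the messages that were handled and reject the rest."""
    for tag in processed:
        channel.basic_ack(delivery_tag=tag)
    for tag in unmatched:
        # requeue=True returns the message to the queue; requeue=False would
        # discard it, or dead-letter it if the queue has a dead-letter exchange.
        channel.basic_reject(delivery_tag=tag, requeue=True)


# Example wiring with pika (requires a running broker):
#   import pika
#   connection = pika.BlockingConnection(pika.ConnectionParameters("localhost"))
#   channel = connection.channel()
#   batch = fetch_batch(channel, "work", 100)
```

Until a fetched message is acked or rejected it stays unacknowledged and is not handed to another consumer; if the consumer's channel closes first, the broker requeues it, which gives the redundancy asked for in the question.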
