How do I process messages in parallel while guaranteeing FIFO per entity?

Posted 2024-11-01 19:36:07


Let's say you have an entity, say, "Person" in your system and you want to process events that modify various Person entities. It is important that:

  • Events for the same Person are processed in FIFO order
  • Multiple Person event streams be processed in parallel by different threads/processes

We have an implementation that solves this using a shared database and locks. Threads compete to acquire the lock for a Person and then process events in order after acquiring the lock. We'd like to move to a message queue to avoid polling and locking, which we feel would reduce load on the DB and simplify the implementation of the consumer code.

I've done some research into ActiveMQ, RabbitMQ, and HornetQ but I don't see an obvious way to implement this.

ActiveMQ supports consumer subscription wildcards, but I don't see a way to limit the concurrency on each queue to 1. If I could do that, then the solution would be straightforward:

  • Somehow tell the broker to allow a concurrency of 1 for all queues starting with: /queue/person.
  • Publisher writes event to queue using Person ID in the queue name. e.g.: /queue/person.20
  • Consumers subscribe to the queue using wildcards: /queue/person.>
  • Each consumer would receive messages for different person queues. If all person queues were in use, some consumers may sit idle, which is ok
  • After processing a message, the consumer sends an ACK, which tells the broker it's done with the message, and allows another message for that Person queue to be sent to another consumer (possibly the same one)
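The delivery semantics described in the steps above can be modeled in-process. This is a minimal sketch, not a real broker feature; the `PerKeyDispatcher` class and its method names are hypothetical. It enforces at most one unacknowledged message per person key while letting idle consumers take work from any ready key:

```python
import threading
import queue
import collections

class PerKeyDispatcher:
    """Toy model of 'concurrency 1 per /queue/person.*' delivery:
    at most one in-flight (unacked) message per key, while any idle
    consumer may pull the next ready key."""

    def __init__(self):
        self.lock = threading.Lock()
        self.pending = collections.defaultdict(collections.deque)  # key -> messages
        self.in_flight = set()      # keys queued for delivery or awaiting ack
        self.ready = queue.Queue()  # keys with a deliverable message

    def publish(self, key, msg):
        with self.lock:
            self.pending[key].append(msg)
            if key not in self.in_flight:
                self.in_flight.add(key)
                self.ready.put(key)

    def next(self):
        """Block until some key has a deliverable message; return (key, msg)."""
        key = self.ready.get()
        with self.lock:
            return key, self.pending[key].popleft()

    def ack(self, key):
        """Consumer finished a message; allow the key's next message out."""
        with self.lock:
            if self.pending[key]:
                self.ready.put(key)
            else:
                self.in_flight.discard(key)
```

A consumer loop would call `next()`, process, then `ack(key)`; until the ack arrives, no other consumer can receive a message for that person, which is exactly the per-queue concurrency of 1 requested above.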

ActiveMQ came close: You can do wildcard subscriptions and enable "exclusive consumer", but that combination results in a single consumer receiving all messages sent to all matching queues, reducing your concurrency to 1 across all Persons. I feel like I'm missing something obvious.

Questions:

  • Is there a way to implement the above approach with any major message queue implementation? We are fairly open to options. The only requirement is that it run on Linux.
  • Is there a different way to solve the general problem that I'm not considering?

Thanks!


Comments (3)

夜雨飘雪 2024-11-08 19:36:08


One general way to solve this problem (if I got your problem right) is to introduce some unique property for Person (say, the database-level id of the Person) and use a hash of that property as the index of the FIFO queue to put that Person's events in.
Since the hash of that property can be unwieldy large (you can't afford 2^32 queues/threads), use only the N least significant bits of that hash.
Each FIFO queue should have a dedicated worker that processes it -- voilà, your requirements are satisfied!

This approach has one drawback -- your Persons must have well-distributed ids to make all queues work under more-or-less equal load. If you can't guarantee that, consider using a round-robin set of queues and tracking which Persons are being processed at any moment to ensure sequential processing for the same Person.

め七分饶幸 2024-11-08 19:36:08


If you already have a system that allows shared locks, why not have a lock for every queue, which consumers must acquire before they read from the queue?
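A small sketch of that suggestion (in-process locks standing in for whatever shared-lock facility the existing system provides; `consume_once` and the sample ids are illustrative): each person queue gets its own lock, and a consumer only reads from a queue whose lock it holds, so per-person processing stays sequential even with many consumers.

```python
import threading
import queue

queues = {pid: queue.Queue() for pid in (7, 20)}
locks = {pid: threading.Lock() for pid in queues}   # one lock per person queue
out = {pid: [] for pid in queues}

def consume_once():
    """One consumer pass: try each queue's lock without blocking, and
    skip queues another consumer currently holds. The lock guarantees
    only one consumer drains a given person's queue at a time."""
    for pid, q in queues.items():
        if locks[pid].acquire(blocking=False):
            try:
                while not q.empty():
                    out[pid].append(q.get())
            finally:
                locks[pid].release()
```

This keeps the locking the question was hoping to avoid, but moves it off the database and onto the queues themselves.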

花开雨落又逢春i 2024-11-08 19:36:07


It looks like JMSXGroupID is what I'm looking for. From the ActiveMQ docs:

http://activemq.apache.org/message-groups.html

Their example use case with stock prices is exactly what I'm after. My only concern is what happens if the single consumer dies. Hopefully the broker will detect that and pick another consumer to associate with that group id.
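The group-pinning behavior, including the consumer-failure concern raised above, can be simulated in a few lines. This is a toy model of the semantics, not ActiveMQ code; `GroupRouter`, its methods, and the lazy round-robin reassignment policy are all assumptions for illustration:

```python
class GroupRouter:
    """Toy model of JMSXGroupID-style routing: the first message for a
    group pins that group to one consumer; if that consumer dies, its
    groups are re-pinned to a surviving consumer on the next message."""

    def __init__(self, consumers):
        self.consumers = list(consumers)  # names of live consumers
        self.assignment = {}              # group id -> owning consumer
        self._next = 0

    def route(self, group_id):
        owner = self.assignment.get(group_id)
        if owner is None or owner not in self.consumers:
            # Unassigned (or owner is dead): pin to the next live consumer.
            owner = self.consumers[self._next % len(self.consumers)]
            self._next += 1
            self.assignment[group_id] = owner
        return owner

    def consumer_died(self, name):
        self.consumers.remove(name)
        # Stale assignments are detected and reassigned lazily in route().
```

Whether a real broker reassigns groups promptly when a consumer's connection drops is exactly the question to verify against the ActiveMQ message-groups documentation.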
