生产者通过消息队列一致地向消费者进行散列?

发布于 2024-09-17 12:32:55 字数 660 浏览 10 评论 0原文

我有一个生产者,我想通过一致的散列在消费者之间一致地分配工作。例如,对于消费者节点 X 和 Y,任务 A、B、C 应始终分配给消费者 X,而 D、E、F 应分配给消费者 Y。但如果 Z 加入消费者池,情况可能会发生一些变化。

我不想编写自己的逻辑来连接到消费者节点,尤其是不想管理节点加入和离开池,所以我沿着使用 RabbitMQ 的道路,以及每个消费者节点的独占队列。

我遇到的一个问题是列出这些队列,因为生产者需要在分发工作之前知道所有可用的队列。 AMQP 甚至不支持列出队列,这让我不确定我的整个方法。 RabbitMQ 和 Alice(目前已损坏 )添加该功能: 是否有 API用于在 RabbitMQ 上列出队列和交换器?

这是 Rabbit 的明智使用吗?我应该使用消息队列吗?是否有更好的设计,以便队列可以一致在消费者之间分配我的工作,而不是我需要这样做?

I have a producer that I want to distribute work consistently across consumers by consistent hashing. For example, with consumer nodes X and Y, tasks A, B, C should always go to consumer X, and D, E, F to consumer Y. But that may shift a little if Z joins the pool of consumers.

I didn't want to deal with writing my own logic to connect to the consumer nodes, and especially not with managing nodes joining and leaving the pool, so I've gone down the path of using RabbitMQ, and an exclusive queue per consumer node.

One problem I'm running into is listing these queues, since the producer needs to know all the available queues before work is distributed. AMQP doesn't even support listing queues, which makes me uncertain of my whole approach. RabbitMQ and Alice (brokenly at the moment) add that functionality though: Is there an API for listing queues and exchanges on RabbitMQ?

Is this a wise use of Rabbit? Should I be using a message queue at all? Is there a better design so the queue can consistently divide my work among consumers, instead of me needing to do it?

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(2

画▽骨i 2024-09-24 12:32:55

您所描述的内容在 RabbitMQ 中是可行的。

您的设置类似于:

  • 生产者将消息发布到主题交换;我们将其命名为confirm_divider;
  • 当消费者加入池时,它连接到代理并使用其名称创建一个独占队列,但不将其绑定到
  • 生产者定期轮询代理的任何内容(可能使用rabbitmqctl list_consumers)以检查消费者是否已更改;如果有,它会删除所有现有的绑定并根据需要重新绑定队列;
  • 当生产者发布时,消息会被分配一个与其任务类型匹配的路由键。

因此,如果您有 6 种任务类型:A、B、C、D、E、F,并且只有两个使用者 C1 和 C2,则您的绑定将如下所示: C1 使用路由键 A、B 和 C 绑定 3 次到一致性分隔符; C2 使用路由键 D、E 和 F 与 c_d 绑定了 3 次。

当 C3 加入池时,生产者会看到这一点并相应地重新绑定队列。

当生产者发布时,它会使用routing_keys A、B、C、D、E和/或F发送消息,并且消息将被路由到正确的队列。

这会存在两个潜在的问题:

  1. 消费者加入池和消息路由到它之间存在轻微的延迟;另外,如果队列中已经有消息,则消费者有可能获取发送给另一个消费者的消息(例如,C3 加入,生产者重新绑定,但 C2 仍然获取一些 E 和 F 消息,因为它们已经在其队列中),
  2. 如果消费者由于某种原因死亡,其队列中的消息(以及前往队列的途中)将丢失;这可以通过分别重新发布和删除消息来解决。

为了回答你的最后一个问题,你可能想使用队列,而 RabbitMQ 是一个不错的选择,但你的要求(更准确地说是“一致地分配工作”位)不太适合 AMQP。

What you describe is do-able in RabbitMQ.

Your setup would be something like:

  • a producer publishes the message to a topic exchange; let's name it consistent_divider;
  • when a consumer, joins the pool, it connects to the broker and creates an exclusive queue with its name, but doesn't bind it to anything
  • the producer periodically polls the broker (maybe using rabbitmqctl list_consumers) to check if the consumers have changed; if they have, it removes all of the existing bindings and rebinds the queues as needed;
  • when the producer publishes, messages are assigned a routing key that matches their task type.

So, if you have 6 task types: A, B, C, D, E, F, and only two consumers C1 and C2, your bindings would look like: C1 bound 3 times to consistent_divider with routing keys A, B and C; C2 bound 3 times to c_d with routing keys D, E and F.

When C3 joins the pool, the producer sees this and rebinds the queues accordingly.

When the producer publishes, it sends out the messages with routing_keys A, B, C, D, E and/or F, and the messages will get routed to the correct queues.

There would be two potential problems with this:

  1. There's a slight lag between when the consumer joins the pool and messages get routed to it; also, if there are messages already in the queues, it's possible for a consumer to get messages meant for another consumer (e.g. C3 joins, the producer rebinds, but C2 still gets some E and F messages because they were already in its queue),
  2. If a consumer dies for whatever reason, the messages in its queue (and en route to its queue) will be lost; this can be solved by republishing and dead-lettering the messages, respectively.

To answer your last question, you probably want to use queuing and RabbitMQ is a great choice, but your requirements (more precisely the `divide the work consistently' bit) don't quite fit AMQP perfectly.

九歌凝 2024-09-24 12:32:55

您可以使用rabbitmq的官方 consistent-hashing 插件作为回答这里

You could use the official consistent-hashing plugin for rabbitmq as answered here

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文