实施持久的发布/订阅?

发布于 2024-11-10 02:51:50 字数 679 浏览 2 评论 0原文

假设我有一个发布者和多个听众。当发布者发送消息时,所有监听者都必须接收到该消息。如果其中一位侦听器出现故障,他应该在再次恢复时立即收到消息。

我怎样才能实现这个?

我正在考虑使用队列: 每个侦听器都会创建自己的队列,并向发布者发送一条订阅消息以及其队列的位置。发布者将该位置保存到文件或数据库中,并开始将其消息发送到该队列。

因此,这将是时间表:

发布者已启动。还没有听众。

发布者发送消息 1。

发布者发送消息 2。

发布者发送消息 3。

监听器 1 启动并向发布者订阅。

发布者发送消息 4。

监听器 1 接收消息 4。

监听器 2 启动并向发布者订阅。

发布者发送消息 5。

侦听器 1 接收消息 5。

侦听器 2 接收消息 5。

侦听器 2 崩溃。

发布者发送消息 6。

监听器 1 接收消息 6。

发布者发送消息 7。

监听器 1 接收消息 7。

监听器 2 恢复,无需再次订阅。

侦听器 2 接收消息 6。

侦听器 2 接收消息 7。

底线是每个侦听器需要一个队列,以及一个队列或通道来发送和接收“开始侦听”和“停止侦听”消息。 我的想法是正确的,还是完全错误的?

Let's say I have a publisher and mutiple listeners. When the publisher sends a message, it has to be received by all listeners. If one of the listeners is down, he should get the message as soon he is back up again.

How can I implement this?

I was thinking about using queues:
Each listener makes it's own queue and sends a subscrtiption message to the publisher with the location of it's queue. The publisher saves the location to a file or DB, and starts sending it's messages to that queue.

So, this would be the timeline:

Publisher is started. No listeners yet.

Publisher sends message 1.

Publisher sends message 2.

Publisher sends message 3.

Listener 1 starts and subscribes with publisher.

Publisher sends message 4.

Listener 1 receives message 4.

Listener 2 starts and subscribes with publisher.

Publisher sends message 5.

Listener 1 receives message 5.

Listener 2 receives message 5.

Listener 2 chrashes.

Publisher sends message 6.

Listener 1 receives message 6.

Publisher sends message 7.

Listener 1 receives message 7.

Listener 2 comes back up, no need to subscribe again.

Listener 2 receives message 6.

Listener 2 receives message 7.

The bottom line is I need one queue per listener, and one queue or channel to send and receive messages for 'start listening' and 'stop listening'.
Am I thinking in the right direction, or am I completely wrong?

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(2

初见你 2024-11-17 02:51:50

您不需要每个订阅者有一个单独的队列,但您至少需要两个队列。可扩展性的最初关键是确保当发布者传递其初始消息时,您不会尝试在该时间点将其“扇出”给所有订阅者。相反,您将其放入接收队列并立即返回,让发布者知道它已成功。从那里开始,您的工作人员由主接收队列提供数据,其职责是将消息“扇出”到各个订阅者。它通过弄清楚这些订阅者是谁并生成 N 条消息来实现这一点,其中包含来自发布者的原始消息以及每个侦听器的地址/绑定信息,并将它们放入传递队列中。最后,工作人员负责将消息从传递队列中拉出并尝试使用地址/绑定信息进行传递。

处理传递错误的方法可以是将消息移至重试队列,在该队列中消息将休眠 X 时间,然后再次替换到传递队列上。然后,您当然必须处理有毒消息,您已经重试了 5 次,而侦听器每次都会向您抛出错误。这些需要移至某种死信队列以进行错误报告。

You shouldn't need a separate queue per subscriber, but you will want at least two queues. The initial key to scalability is making sure that when the publisher delivers it's initial message you don't try to "fan it out" to all subscribers at that point in time. Instead you put it on a received queue and return immediately letting the publisher know it's succeeded. From there you have workers which are fed by the main receive queue and whose responsibility it is to "fan out" the message to the various subscribers. It does this by figuring out who those subscribers are and generating N messages containing the original message from the publisher with each listener's address/binding information and plopping those onto a delivery queue. Finally you have workers who are responsible for pulling the messages off the delivery queue and attempting delivery using the address/binding information.

How you handle delivery faults could be by moving the message to a retry queue where the messages will be put to sleep for X amount of time before being replaced on the delivery queue again. Then you of course have to handle poison messages where you've retried say 5 times and the listener just throws you errors every time. Those would need to be moved to some kind of dead letter queue for error reporting.

北恋 2024-11-17 02:51:50

您是对的,这正是 NServiceBus(例如)在 MSMQ 之上实现发布-订阅的方式。在这里阅读更多相关信息:http://docs.prefer.net/samples/pubsub/

You are right, and this is exactly how NServiceBus (for example) implements pub-sub on top of MSMQ. Read more about it here: http://docs.particular.net/samples/pubsub/.

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文