从队列中选择性读取 - 自定义 MSMQ 服务、ESB 还是其他?

发布于 2024-09-15 16:57:35 字数 1410 浏览 1 评论 0原文

寻找一些想法/模式来解决系统的设计问题,我将很快开始工作。毫无疑问,我需要使用某种消息传递(可能是 MSMQ)在系统的某些区域之间进行通信。我不想重新发明轮子,但同时我想确保我使用正确的工具来完成这项工作。我一直在修改和阅读 NServiceBus,它的功能给我留下了深刻的印象——但是我不确定它是否是为了我想要实现的目标。

这是系统需要执行的操作的(希望如此)非常简单和概念性的描述:

我有一个客户端可以向其发送消息的服务。该服务是“即发即忘”——客户端返回的最多信息可能是成功或失败(成功是指消息已被接收)。

每条消息的处理/处理都很重要,并且可能占用大量的系统资源。因此,只能同时处理 X 条消息,其中 X 是可配置值(基于系统规格等)。传入的消息将存储在队列中,直到“轮到他们”处理为止。

对于每个客户端,消息必须按顺序处理(先进先出)。但是,某些客户端可能会连续发送许多消息(数千条或更多),例如,如果它们在一段时间内失去连接。因此,消息必须在客户端之间以循环方式处理——不允许客户端贪吃,也不允许客户端挨饿。因此,系统要么必须能够查询队列中的特定客户端,要么为每个客户端创建单独的队列(自动,因为客户端在编译时不会知道)并轮流从它们中拉取。

我目前的想法是,我实际上只需要使用普通的 MSMQ,创建一项服务来接受消息并将它们写入一个或多个队列,然后创建一个进程来从队列中读取消息并处理它们。然而,像 NServicebus 这样的东西所获得的可靠性、审计性、可扩展性和易于配置性看起来非常有吸引力。

ESB 是否不是适合这项工作的工具?我还应该考虑其他一些技术或模式吗?

更新

一些说明。

关于“按顺序”处理消息——在单个客户端的上下文中,消息绝对需要按照接收到的顺序进行处理。解释确切的原因很复杂,但这是一个坚定的要求。我忽略了每个客户端只能同时处理一条消息。因此,即使有 10 个工作线程,并且只有一个客户端有消息等待处理,一次也只会处理其中一条消息 — 无需担心竞争条件。

我相信这对于普通的 MSMQ 通常是可能的——您可以在队列中拥有一个消息列表,并且总是先获取最旧的消息。

我还想澄清循环排序的用例。在此示例中,我有两个发送消息的客户端(A 和 B),并且只有一个工作线程。所有队列都是空的。客户端 A 一夜之间失去了连接,因此早上 8 点向服务发送了 1000 条消息。这些消息排队,工作线程获取最旧的消息并开始处理它。在处理第一条消息时,客户端 B 向服务发送一条消息,该消息会排队(如前所述,可能在单独的队列中)。当客户端 A 的第一条消息完成处理时,逻辑应该检查客户端 B 是否有消息(轮到客户端 B 了),既然找到了,就进行下一步处理。

如果客户端 B 在此期间没有发送消息,工作人员将继续一次处理客户端 A 的消息,并始终在处理后检查其他客户端队列是否包含等待消息,以确保没有客户端被饿死。

我仍然觉得 ESB 之间可能存在不匹配,这个问题是 ESB 旨在促进服务之间的通信;我想要实现的是消息/通信和选择性排队系统的结合。

Looking for some ideas/pattern to solve a design problem for a system I will be starting work on soon. There is no question that I will need to use some sort of messaging (probably MSMQ) to communicate between certain areas of the system. I don't want to reinvent the wheel, but at the same time I want to make sure I am using the right tool for the job. I have been tinkering with and reading up on NServiceBus, and I'm very impressed with what it does--however I'm not sure it's intended for what I'm trying to achieve.

Here is a (hopefully) very simple and conceptual description of what the system needs to do:

I have a service that clients can send messages to. The service is "Fire and Forget"--the most the client would get back is something that may say success or failure (success being that the message was received).

The handling/processing of each message is non-trivial, and may take up significant system resources. For this reason only X messages can be handled concurrently, where X is a configurable value (based on system specs, etc.). Incoming messages will be stored in queue until it's "their turn" to be handled.

For each client, messages must be handled in order (FIFO). However, some clients may send many messages in succession (thousands or more), for example if they lost connectivity for a period of time. For this reason, messages must be handled in a round-robin fashion across clients--no client is allowed to gorge and no client is allowed to starve. So the system will either have to be able to query the queue for a specific client, or create separate queues per client (automatically, since the clients won't be known at compile time) and pull from them in rotation.

My current thinking is that I really just need to use vanilla MSMQ, create a service to accept messages and write them to one or more queues, then create a process to read messages from the queue(s) and handle/process them. However, the reliability, auditing, scaleability, and ease of configuration you get with something like NServicebus looks very appealing.

Is an ESB the wrong tool for the job? Is there some other technology or pattern I should be looking at?

Update

A few clarifications.

Regarding processing messages "in order"--in the context of a single client, the messages absolutely need to be processed in the order they are received. It's complicated to explain the exact reasons why, but this is a firm requirement. I neglected to mention that only one message per client would ever be processed concurrently. So even if there were 10 worker threads and only one client had messages waiting to be processed, only one of those messages would be processed at a time--there would be no worry of a race condition.

I believe this is generally possible with vanilla MSMQ--that you can have a list of messages in a queue and always take the oldest one first.

I also wanted to clarify a use case for the round robin ordering. In this example, I have two clients (A and B) who send messages, and only one worker thread. All queues are empty. Client A has lost connectivity overnight, so at 8am sends 1000 messages to the service. These messages get queued up and the worker thread takes the oldest one and starts processing it. As this first message is being processed, client B sends a message into the service, which gets queued up (as noted, probably in a separate queue). When Client A's first message completes processing, the logic should check whether client B has a message (it's client B's "turn"), and since it finds one, process it next.

If client B hadn't sent a message during that time, the worker would continue processing client A's messages one at a time, always checking after processing to see if other client queues contained waiting messages to ensure that no client was being starved.

Where I still feel there may be a mismatch between an ESB and this problem is that an ESB is designed to facilitate communication between services; what I am trying to achieve is a combination of messaging/communication and a selective queuing system.

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(2

舟遥客 2024-09-22 16:57:36

所以系统必须是
能够查询特定客户端的队列,

使用游标在 MSMQ 队列中搜索来自特定客户端的消息可能效率低下且无法扩展。

或者为每个客户端创建单独的队列(自动,因为
客户端在编译时不会被识别)并轮流从它们中提取。

MSMQ 无法自动创建队列。所有消息必须首先发送到已知队列。不过,您自己的自定义调度程序服务可以根据需要创建新队列并将消息的副本放入其中。

[[我避免说“移动”消息,因为你不能用应用程序代码来做到这一点;您只能读取消息并使用原始数据创建新消息。例如,当您使用源日志时,这种区别很重要。]]

干杯

约翰·布瑞克韦尔

So the system will either have to be
able to query the queue for a specific client,

Searching through an MSMQ queue for a message from a particular client using cursors can be inefficient and doesn't scale.

or create separate queues per client (automatically, since the
clients won't be known at compile time) and pull from them in rotation.

MSMQ cannot create queues automatically. All messages have to be sent to a known queue first. Your own custom dispatcher service, though, could then create new queues on demand and put copies of the messages in them.

[[I avoid saying "move" messages as you can't do that with application code; you can only read a message and create a new message using the original data. This distinction is important when you are using Source Journaling, for example.]]

Cheers

John Breakwell

蓝梦月影 2024-09-22 16:57:36

使用像 NServiceBus 这样的 ESB 似乎是解决您的问题的一个好方法。但根据您的概念描述,有一些事情需要考虑。让我们使用 NServiceBus 作为可能的 ESB 解决方案,逐步了解您的需求:

我有一个客户端可以向其发送消息的服务。该服务是“Fire and Forget”——客户端返回的最多信息可能是成功或失败(成功是指消息已被接收)。

使用 NServiceBus 可以轻松完成此操作。您可以从客户端进行 Bus.Send(Message)。如果您的客户需要答案,您可以使用 Bus.Return(ErrorCode)。您提到“成功即收到消息”。如果您使用像 NServiceBus 这样的 ESB,则由消息传递平台来传递消息。因此,如果您的 Bus.Send 没有抛出异常,您可以确定消息已正确发送。因此,您可能不必将成功/失败消息发送回客户端。

每条消息的处理/处理都很重要,并且可能占用大量的系统资源。因此,只能同时处理 X 条消息,其中 X 是可配置值(基于系统规格等)。传入消息将存储在队列中,直到“轮到他们”处理为止。

使用 NServiceBus 时,您可以通过设置“NumberOfWorkerThreads”选项来配置工作线程的数量。如果您的服务器有多个核心/CPU,您可以使用此设置来平衡工作负载。

对于每个客户端,消息必须按顺序处理 (FIFO)。

根据您的要求,这可能会导致问题。 ESB 通常不承诺处理 消息按顺序(如果有许多线程处理消息)。对于 NServiceBus,您可以将一系列消息从客户端发送到总线,并且这些消息将被处理 此外,您还可以使用 Sagas 解决一些按顺序消息传递问题

但是,某些客户端可能会连续发送许多消息(数千条或更多),例如,如果它们在一段时间内失去连接

使用 ESB 解决方案时,您的服务器无需启动即可让客户端正常工作。客户端仍然可以发送消息,服务器一恢复在线就会开始处理它们。这是对此的简短介绍。

因此,消息必须在客户端之间以循环方式处理——不允许客户端贪吃,也不允许客户端挨饿。

这不是问题,因为您已经决定使用消息:)

因此,系统要么必须能够查询特定客户端的队列,要么为每个客户端创建单独的队列(自动,因为客户端在编译时不会知道)并轮流从它们中提取。< /p>

您能对此进行扩展吗?我不确定你对这个的设计。

Using an ESB like NServiceBus seems like a good solution to your problem. But based on your conceptual description, there's some things to consider. Let's go through your requirements step-by-step, using NServiceBus as a possible ESB solution:

I have a service that clients can send messages to. The service is "Fire and Forget"--the most the client would get back is something that may say success or failure (success being that the message was received).

This is easily done with NServiceBus. You can Bus.Send(Message) from the client. If your client requires an answer, you can use Bus.Return(ErrorCode). You mention that "success being that the message was received". If you use an ESB like NServiceBus, it's up to the messaging platform the deliver the message. So, if your Bus.Send doesn't throw an exception, you can be sure that the message has been sent properly. Because of this you don't probably have to send success / failure messages back to the client.

The handling/processing of each message is non-trivial, and may take up significant system resources. For this reason only X messages can be handled concurrently, where X is a configurable value (based on system specs, etc.). Incoming messages will be stored in queue until it's "their turn" to be handled.

When using NServiceBus, you can configure the the number of worker threads by setting the "NumberOfWorkerThreads" option. If your server has multiple cores / cpus, you can use this setting to balance the work load.

For each client, messages must be handled in order (FIFO).

This is something that may cause problems depending on your requirements. ESBs in general don't promise to process the messages in-order, if they have many threads working on the messages. In a case of NServiceBus, you can send an array of messages from the client into the bus and these will be processed in-order. Also, you can solve some of the in-order messaging problems by using Sagas.

However, some clients may send many messages in succession (thousands or more), for example if they lost connectivity for a period of time

When using an ESB solution, your server doesn't have to be up for the client to work. Clients can still send messages and the server will start processing them as soon as it's back online. Here's a small introduction on this.

For this reason, messages must be handled in a round-robin fashion across clients--no client is allowed to gorge and no client is allowed to starve.

This isn't a problem because you've decided to use messages :)

So the system will either have to be able to query the queue for a specific client, or create separate queues per client (automatically, since the clients won't be known at compile time) and pull from them in rotation.

Could you expand on this? I'm not sure of your design on this one.

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文