C# - 限制消息队列

发布于 2024-09-28 10:15:04 字数 319 浏览 1 评论 0原文

我有将消息发送到 MessageQueue 的代码。

_queue.Send(new Message(myData));

该队列位于本地计算机上,并且从中 Receive() 的线程位于同一进程中。

如果消息插入速度快于提取速度,会发生什么情况? Send() 会阻塞吗? 有没有办法让我在向其中发送更多事件之前知道 MessageQueue 是否已满? (我现在更愿意只记录myData而不发送事件)。

谢谢, 塞拉。

I have code that sends a message into a MessageQueue.

_queue.Send(new Message(myData));

This queue is on the local machine and the threads that Receive() from it are in the same process.

What happens if the messages are inserted faster than they are extracted? Will Send() block?
Is there a way for me to know if the MessageQueue is full before sending into it more events?
(I would prefer at this point just to log myData and not send the event).

Thanks,
Sela.

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(2

゛时过境迁 2024-10-05 10:15:04

简短的回答:做简单的事情,不要限制发送。

长答案:
仅当分配用于保存的磁盘空间不足时,消息队列才会真正变满 - 这同时您的日志记录也将空间不足。消息队列非常擅长保存您尚未准备好处理的数据。不要限制发送。如果您关心系统管理和磁盘空间,那么您可能更愿意依赖 Window 出色的系统监控工具和磁盘空间使用阈值警报。您不需要为您的应用程序重新设计它。

除非您在仅内存模式下运行队列,但这可能是不必要的。如果您不能足够快地处理消息,那么您肯定有足够的时间让队列管理器将消息保存到磁盘。如果您要扩展到许多服务器上的许多消费者进程并且队列管理器上的磁盘 IO 成为瓶颈,那么您应该只考虑在仅内存模式下运行队列。同一台机器上的一个进程与这种情况相差甚远。 让队列管理器做它最擅长的事情。不要过早优化。

如果您实施指定的服务质量(例如每秒 X 条消息)并为客户处理更高质量的服务收取更多费用,则在接收端进行限制。我已经使用信号量成功完成了此操作,该信号量的资源限制等于每秒消耗的消息数。每个消费者线程都会拍摄消息开始时间的快照,处理 1 条消息,然后等待一秒结束,然后再放弃信号量。这样,如果消息处理时间超过 1 秒,但不会超出服务质量,则线程池可以增长以适应服务质量。

祝你好运!

Short answer: Do the simple thing and don't limit on send.

Long answer:
The message queue will only really get full when the disk it's allocated to save to is out of space - which is the same time that your logging will be out of space. The message queue is very good at holding data you're not ready to process. Don't throttle on send. If you're concerned about system management and disk space then you might prefer to rely on Window's excellent system monitoring facilities and disk space usage threshold alerts. You don't need to reinvent this for your application.

That is unless you're running the queue in memory only mode which may not be necessary. If you can't process the messages fast enough then you definitely have enough time to let the queue manager persist the messages to disk. You should only consider running the queue in memory only mode if you're going to scale to many consumer processes on many servers and the disk IO on the queue manager becomes the bottleneck. One process on the same machine is very far away from that scenario. Let the queue manager do what it does best. Don't optimise prematurely.

If you implement a specified quality of service like X messages per second and bill your customers more for processing a higher quality of service then throttle at the receiving end. I've done this successfully using a semaphore initialised with a resource limit equal to the number of messages to consume per second. Each consumer thread took a snapshot of the message start time, processed 1 message and then waited for the end of second before giving up the semaphore. That way the thread pool could grow to accomodate the quality of service if messages took more than 1 second to process but would not exceed the quality of service.

Good luck!

月隐月明月朦胧 2024-10-05 10:15:04

设计一个系统,使消息的生成速度不超过消息的消耗速度,这是很好的做法,我同意这一点。尽管如此,消息生产者可能会因为队列已满而碰壁,尤其是当配额设置较低时。

为了应对这种情况,您需要监视 Send() 方法是否成功。如果您将消息发送到已满的队列中,则该消息将丢失,并且由于 Send() 返回 void,因此不会立即得出成功或失败的结果。但有一种方法可以检测到这一点。使用 MSMQ 时您应该请求确认。要接收它们,您需要使用管理队列。这样您就可以收到有关发生的不同事情的通知,包括队列已满。

Message msg = new Message
{
    Formatter = new BinaryMessageFormatter(),
    Body = data,
    AdministrationQueue = this.adminQueue,
    AcknowledgeType = AcknowledgeTypes.FullReachQueue
};

this.queue.Send(msg);

Message admMsg = this.adminQueue.Receive();
if (admMsg != null && admMsg.Acknowledgment == Acknowledgment.QueueExceedMaximumSize)
{
    // queue is full
}

Designing a system so that messages are produced no faster than they are consumed is good and I agree with that. Nevertheless it might happen that a producer of messages would hit a wall because queue is overfilled especially when there are low quotas set.

To prepare for such a situation you need to monitor if Send() method succeeds. If you send a message into a full queue the message is lost and because Send() returns void there is no immediate result of success or failure. But there is a way to detect that. You should request an acknowledgements when working with MSMQ. To receive them you need to use an administrative queue. This way you can be notified about different things that happen including queue being full.

Message msg = new Message
{
    Formatter = new BinaryMessageFormatter(),
    Body = data,
    AdministrationQueue = this.adminQueue,
    AcknowledgeType = AcknowledgeTypes.FullReachQueue
};

this.queue.Send(msg);

Message admMsg = this.adminQueue.Receive();
if (admMsg != null && admMsg.Acknowledgment == Acknowledgment.QueueExceedMaximumSize)
{
    // queue is full
}
~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文