如何订阅 MSMQ 队列但仅“查看”队列.Net 中的消息?

发布于 2024-08-31 00:45:54 字数 1427 浏览 15 评论 0原文

我们有一个 MSMQ 队列设置,用于接收消息并由应用程序处理。我们希望另一个进程订阅队列并读取消息并记录其内容。

我已经准备好了,问题是它不断地偷看队列。运行时服务器上的 CPU 约为 40%。 mqsvc.exe 以 30% 运行,此应用程序以 10% 运行。我宁愿有一些东西只是等待消息进入,收到通知,然后记录它,而不需要不断地轮询服务器。

    Dim lastid As String
    Dim objQueue As MessageQueue
    Dim strQueueName As String

    Public Sub Main()
        objQueue = New MessageQueue(strQueueName, QueueAccessMode.SendAndReceive)
        Dim propertyFilter As New MessagePropertyFilter
        propertyFilter.ArrivedTime = True
        propertyFilter.Body = True
        propertyFilter.Id = True
        propertyFilter.LookupId = True
        objQueue.MessageReadPropertyFilter = propertyFilter
        objQueue.Formatter = New ActiveXMessageFormatter
        AddHandler objQueue.PeekCompleted, AddressOf MessageFound

        objQueue.BeginPeek()
    end main

    Public Sub MessageFound(ByVal s As Object, ByVal args As PeekCompletedEventArgs)

        Dim oQueue As MessageQueue
        Dim oMessage As Message

        ' Retrieve the queue from which the message originated
        oQueue = CType(s, MessageQueue)

            oMessage = oQueue.EndPeek(args.AsyncResult)
            If oMessage.LookupId <> lastid Then
                ' Process the message here
                lastid = oMessage.LookupId
                ' let's write it out
                log.write(oMessage)
            End If

        objQueue.BeginPeek()
    End Sub

We have a MSMQ Queue setup that receives messages and is processed by an application. We'd like to have another process subscribe to the Queue and just read the message and log it's contents.

I have this in place already, the problem is it's constantly peeking the queue. CPU on the server when this is running is around 40%. The mqsvc.exe runs at 30% and this app runs at 10%. I'd rather have something that just waits for a message to come in, get's notified of it, and then logs it without constantly polling the server.

    Dim lastid As String
    Dim objQueue As MessageQueue
    Dim strQueueName As String

    Public Sub Main()
        objQueue = New MessageQueue(strQueueName, QueueAccessMode.SendAndReceive)
        Dim propertyFilter As New MessagePropertyFilter
        propertyFilter.ArrivedTime = True
        propertyFilter.Body = True
        propertyFilter.Id = True
        propertyFilter.LookupId = True
        objQueue.MessageReadPropertyFilter = propertyFilter
        objQueue.Formatter = New ActiveXMessageFormatter
        AddHandler objQueue.PeekCompleted, AddressOf MessageFound

        objQueue.BeginPeek()
    end main

    Public Sub MessageFound(ByVal s As Object, ByVal args As PeekCompletedEventArgs)

        Dim oQueue As MessageQueue
        Dim oMessage As Message

        ' Retrieve the queue from which the message originated
        oQueue = CType(s, MessageQueue)

            oMessage = oQueue.EndPeek(args.AsyncResult)
            If oMessage.LookupId <> lastid Then
                ' Process the message here
                lastid = oMessage.LookupId
                ' let's write it out
                log.write(oMessage)
            End If

        objQueue.BeginPeek()
    End Sub

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(5

巴黎盛开的樱花 2024-09-07 00:45:54

您是否尝试过使用 MSMQEvent.Arrived 跟踪消息?

当调用表示打开队列的 MSMQQueue 对象实例的 MSMQQueue.EnableNotification 方法并且在队列中找到消息或消息到达相应位置时,将触发 MSMQEvent 对象的到达事件。

Have you tried using MSMQEvent.Arrived to track the messages?

The Arrived event of the MSMQEvent object is fired when the MSMQQueue.EnableNotification method of an instance of the MSMQQueue object representing an open queue has been called and a message is found or arrives at the applicable position in the queue.

晨敛清荷 2024-09-07 00:45:54

在 peek 迭代之间使用 Thread.Sleep(10) 可能会节省大量周期。

我能想到的唯一其他选择是将日志记录构建到队列读取应用程序中。

A Thread.Sleep(10) in between peek iterations may save you a bunch of cycles.

The only other option I can think of is to build the logging into the queue reading application.

放手` 2024-09-07 00:45:54

没有 API 可以让您仅查看每条消息一次。

问题是,如果队列中已有消息,BeginPeek 会立即执行回调。由于您没有删除消息(这毕竟是查看,而不是接收!),当您的回调再次开始查看时,该过程会重新开始,因此 MessageFound 几乎不断运行。

您最好的选择是将消息记录在写入器或读取器中。 日记将在短时间内起作用(如果您只关心收到的消息),但不是长期解决方案:

虽然性能开销
从队列中检索消息
仅配置为日志记录
比检索多约 20%
没有日记的消息,真实的
成本是意外问题引起的
当未经检查的 MSMQ 服务运行时
内存不足或者机器坏了
磁盘空间

There's no API that will let you peek at each message only once.

The problem is that BeginPeek executes its callback immediately if there's already a message on the queue. Since you aren't removing the message (this is peek after all, not receive!), when your callback begins peeking again the process starts over, so MessageFound runs almost constantly.

Your best options are to log the messages in the writer or the reader. Journaling will work for short periods (if you only care about messages that are received), but aren't a long-term solution:

While the performance overhead of
retrieving messages from a queue that
is configured for Journaling is only
about 20% more than retrieving
messages without Journaling, the real
cost is unexpected problems caused
when an unchecked MSMQ service runs
out of memory or the machine is out of
disk space

眸中客 2024-09-07 00:45:54

这对我有用。它在等待消息时阻塞线程。每个循环周期都会检查类成员_bServiceRunning 以查看线程是否应中止。

    private void ProcessMessageQueue(MessageQueue taskQueue)
    {
        // Set the formatter to indicate body contains a binary message:
        taskQueue.Formatter = new BinaryMessageFormatter();

        // Specify to retrieve selected properties.
        MessagePropertyFilter myFilter = new MessagePropertyFilter();
        myFilter.SetAll();
        taskQueue.MessageReadPropertyFilter = myFilter;

        TimeSpan tsQueueReceiveTimeout = new TimeSpan(0, 0, 10); // 10 seconds

        // Monitor the MSMQ until the service is stopped:
        while (_bServiceRunning)
        {
            rxMessage = null;

            // Listen to the queue for the configured duration:
            try
            {
                // See if a message is available, and if so remove if from the queue if any required
                // web service is available:
                taskQueue.Peek(tsQueueReceiveTimeout);

                // If an IOTimeout was not thrown, there is a message in the queue
                // Get all the messages; this does not remove any messages
                Message[] arrMessages = taskQueue.GetAllMessages();

                // TODO: process the message objects here;
                //       they are copies of the messages in the queue
                //       Note that subsequent calls will return the same messages if they are
                //       still on the queue, so use some structure defined in an outer block
                //       to identify messages already processed.

            }
            catch (MessageQueueException mqe)
            {
                if (mqe.MessageQueueErrorCode == MessageQueueErrorCode.IOTimeout)
                {
                    // The peek message time-out has expired; there are no messages waiting in the queue
                    continue; // at "while (_bServiceRunning)"
                }
                else
                {
                    ErrorNotification.AppLogError("MSMQ Receive Failed for queue: " + mqs.Name, mqe);
                    break; // from "while (_bServiceRunning)"
                }
            }
            catch (Exception ex)
            {
                ErrorNotification.AppLogError("MSMQ Receive Failed for queue: " + mqs.Name, ex);
                break; // from "while (_bServiceRunning)"
            }
        }

    } // ProcessMessageQueue()

This works for me. It blocks the thread while waiting for a message. Each loop cycle checks the class member _bServiceRunning to see if the thread should abort.

    private void ProcessMessageQueue(MessageQueue taskQueue)
    {
        // Set the formatter to indicate body contains a binary message:
        taskQueue.Formatter = new BinaryMessageFormatter();

        // Specify to retrieve selected properties.
        MessagePropertyFilter myFilter = new MessagePropertyFilter();
        myFilter.SetAll();
        taskQueue.MessageReadPropertyFilter = myFilter;

        TimeSpan tsQueueReceiveTimeout = new TimeSpan(0, 0, 10); // 10 seconds

        // Monitor the MSMQ until the service is stopped:
        while (_bServiceRunning)
        {
            rxMessage = null;

            // Listen to the queue for the configured duration:
            try
            {
                // See if a message is available, and if so remove if from the queue if any required
                // web service is available:
                taskQueue.Peek(tsQueueReceiveTimeout);

                // If an IOTimeout was not thrown, there is a message in the queue
                // Get all the messages; this does not remove any messages
                Message[] arrMessages = taskQueue.GetAllMessages();

                // TODO: process the message objects here;
                //       they are copies of the messages in the queue
                //       Note that subsequent calls will return the same messages if they are
                //       still on the queue, so use some structure defined in an outer block
                //       to identify messages already processed.

            }
            catch (MessageQueueException mqe)
            {
                if (mqe.MessageQueueErrorCode == MessageQueueErrorCode.IOTimeout)
                {
                    // The peek message time-out has expired; there are no messages waiting in the queue
                    continue; // at "while (_bServiceRunning)"
                }
                else
                {
                    ErrorNotification.AppLogError("MSMQ Receive Failed for queue: " + mqs.Name, mqe);
                    break; // from "while (_bServiceRunning)"
                }
            }
            catch (Exception ex)
            {
                ErrorNotification.AppLogError("MSMQ Receive Failed for queue: " + mqs.Name, ex);
                break; // from "while (_bServiceRunning)"
            }
        }

    } // ProcessMessageQueue()
荭秂 2024-09-07 00:45:54

恕我直言,您应该在队列上打开日志功能。然后,您可以保证保留已提交到队列的所有消息的副本,而您费力尝试建立自己的机制来记录所有消息的情况并非如此。

如果您想要比队列本身更容易阅读的内容(我当然希望如此),那么按计划记录和删除日志消息会更容易、更可靠。然后,流程运行的速度有多快并不重要,您只需要获取一次消息,总的来说,这是解决问题的更好方法。

IMHO you should just turn on journaling on the queue. Then you are guaranteed a copy is kept of all messages that have been committed to the queue, and that just isn't the case with your laborous attempt to make your own mechanism to log it all.

Much, much easier and more reliable is to log and remove the journaled messages on a scheduled basis if you want something more easily readable than the queue itself (and I certainly would want that). Then it doesn't really matter how fast or not the process is working, you only need to get the messages once, and it's overall just a much better way to solve the problem.

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文