MessageQueue.BeginReceive是如何工作的以及如何正确使用它?

发布于 2024-12-04 20:06:17 字数 465 浏览 2 评论 0原文

我目前有一个后台线程。在这个线程中是一个无限循环。

此循环偶尔会更新数据库中的某些值,然后在 MessageQueue 上侦听 1 秒(使用 queue.Receive(TimeSpan.FromSeconds(1)) )。

只要没有消息传入,此调用就会在内部抛出 MessageQueueException(超时),该异常被捕获,然后循环继续。如果有消息,调用通常会返回并处理该消息,然后继续循环。

这会导致很多第一次机会异常(每秒一次,除非有消息要处理),并且这会垃圾邮件调试输出,并且当我忘记排除 MessageQueueExceptions 时也会中断调试器。

那么如何正确地完成 MessageQueue 的异步处理,同时仍然确保只要我的应用程序运行,队列就会受到监视并且数据库也会偶尔更新。当然这里的线程不应该使用100%的CPU。

我只需要大局观或一些正确完成的异步处理的提示。

I currently have a background thread. In this thread is a infinite loop.

This loop once in a while updates some values in a database, and then listens 1 second on the MessageQueue (with queue.Receive(TimeSpan.FromSeconds(1)) ).

As long as no message comes in, this call then internally throws a MessageQueueException (Timeout) which is caught and then the loop continues. If there is a message the call normally returns and the message is processed, after which the loop continues.

This leads to a lot of First chance exceptions (every second, except there is a message to process) and this spams the debug output and also breaks in the debugger when I forgot to exclude MessageQueueExceptions.

So how is the async handling of the MessageQueue meant to be done correctly, while still ensuring that, as long as my application runs, the queue is monitored and the database is updated too once in a while. Of course the thread here should not use up 100% CPU.

I just need the big picture or a hint to some correctly done async processing.

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(3

好听的两个字的网名 2024-12-11 20:06:17

我建议不要在线程中循环,而是为 MessageQueue 的 ReceiveCompleted 事件注册委托,如下所述:

using System;
使用系统消息传递;

命名空间我的项目
{
///
/// 为示例提供容器类。
///
公共类 MyNewQueue
{

    //**************************************************
    // Provides an entry point into the application.
    //       
    // This example performs asynchronous receive operation
    // processing.
    //**************************************************

    public static void Main()
    {
        // Create an instance of MessageQueue. Set its formatter.
        MessageQueue myQueue = new MessageQueue(".\\myQueue");
        myQueue.Formatter = new XmlMessageFormatter(new Type[]
            {typeof(String)});

        // Add an event handler for the ReceiveCompleted event.
        myQueue.ReceiveCompleted += new 
            ReceiveCompletedEventHandler(MyReceiveCompleted);

        // Begin the asynchronous receive operation.
        myQueue.BeginReceive();

        // Do other work on the current thread.

        return;
    }


    //**************************************************
    // Provides an event handler for the ReceiveCompleted
    // event.
    //**************************************************

    private static void MyReceiveCompleted(Object source, 
        ReceiveCompletedEventArgs asyncResult)
    {
        // Connect to the queue.
        MessageQueue mq = (MessageQueue)source;

        // End the asynchronous Receive operation.
        Message m = mq.EndReceive(asyncResult.AsyncResult);

        // Display message information on the screen.
        Console.WriteLine("Message: " + (string)m.Body);

        // Restart the asynchronous Receive operation.
        mq.BeginReceive();

        return; 
    }
}

}

来源:https://learn.microsoft.com/en-us/dotnet/api/system.messaging.messagequeue.receivecompleted?view=netframework-4.7.2

Rather than looping in a thread, I would recommend registering a delegate for the ReceiveCompleted event of your MessageQueue, as described here:

using System;
using System.Messaging;

namespace MyProject
{
///
/// Provides a container class for the example.
///
public class MyNewQueue
{

    //**************************************************
    // Provides an entry point into the application.
    //       
    // This example performs asynchronous receive operation
    // processing.
    //**************************************************

    public static void Main()
    {
        // Create an instance of MessageQueue. Set its formatter.
        MessageQueue myQueue = new MessageQueue(".\\myQueue");
        myQueue.Formatter = new XmlMessageFormatter(new Type[]
            {typeof(String)});

        // Add an event handler for the ReceiveCompleted event.
        myQueue.ReceiveCompleted += new 
            ReceiveCompletedEventHandler(MyReceiveCompleted);

        // Begin the asynchronous receive operation.
        myQueue.BeginReceive();

        // Do other work on the current thread.

        return;
    }


    //**************************************************
    // Provides an event handler for the ReceiveCompleted
    // event.
    //**************************************************

    private static void MyReceiveCompleted(Object source, 
        ReceiveCompletedEventArgs asyncResult)
    {
        // Connect to the queue.
        MessageQueue mq = (MessageQueue)source;

        // End the asynchronous Receive operation.
        Message m = mq.EndReceive(asyncResult.AsyncResult);

        // Display message information on the screen.
        Console.WriteLine("Message: " + (string)m.Body);

        // Restart the asynchronous Receive operation.
        mq.BeginReceive();

        return; 
    }
}

}

Source: https://learn.microsoft.com/en-us/dotnet/api/system.messaging.messagequeue.receivecompleted?view=netframework-4.7.2

行雁书 2024-12-11 20:06:17

您是否考虑过从 MessageEnumerator 返回a href="http://msdn.microsoft.com/en-us/library/system.messaging.messagequeue.getmessageenumerator2.aspx" rel="nofollow">MessageQueue.GetMessageEnumerator2 ?

  • 您可以获取队列的动态内容,以便在迭代过程中检查消息并从队列中删除消息。
  • 如果没有消息,那么 MoveNext() 将返回 false,并且您不需要捕获第一次机会异常。
  • 如果在开始迭代后有新消息,那么它们将被迭代(如果它们是放在光标后面)。
  • 如果光标之前有新消息,那么您只需重置迭代器或继续(如果您目前不需要优先级较低的消息)。

Have you considered a MessageEnumerator which is returned from the MessageQueue.GetMessageEnumerator2 ?

  • You get a dynamic content of the queue to examine and remove messages from a queue during the iteration.
  • If there are no messages then MoveNext() will return false and you don't need to catch first-chance exceptions
  • If there are new messages after you started iteration then they will be iterated over (if they are put after a cursor).
  • If there are new messages before a cursor then you can just reset an iterator or continue (if you don't need messages with lower priority at the moment).
驱逐舰岛风号 2024-12-11 20:06:17

与杰米·迪克森的评论相反,这种情况是特殊的。请注意该方法及其参数的命名: BeginReceive(TimeSpan timeout)

如果该方法被命名为 BeginTryReceive,那么如果没有收到消息,那就完全正常了。将其命名为 BeginReceive(或 Receive,对于同步版本)意味着消息将进入队列。 TimeSpan 参数被命名为 timeout 也很重要,因为超时是异常的。超时意味着预期会得到响应,但没有给出响应,调用者选择停止等待并假设发生了错误。当您以 1 秒超时调用 BeginReceive/Receive 时,您是在声明,如果此时没有消息进入队列,则一定出现了问题,我们需要对其进行处理。

如果我理解您想要正确执行的操作,我实现此操作的方法是:

  • 以非常大的超时调用 BeginReceive,或者如果我没有将空队列视为错误,则甚至没有超时。
  • 将事件处理程序附加到 ReceiveCompleted 事件,该事件处理程序 1) 处理消息,2) 再次调用 BeginReceive。
  • 我不会使用无限循环。当使用像 BeginReceive 这样的异步方法时,这既是不好的做法,也是完全多余的。
  • 编辑:要放弃任何客户端都没有读取的队列,请让队列写入者查看该队列以确定它是否“死亡”。

编辑:我还有另一个建议。由于我不知道您申请的详细信息,我不知道它是否可行或合适。在我看来,您基本上是在客户端和服务器之间建立连接,并以消息队列作为通信通道。为什么这是“联系”?因为如果没有人在监听,队列将不会被写入。我想这就是联系。使用套接字或命名管道来传输消息不是更合适吗?这样,客户端在完成读取后只需关闭 Stream 对象,服务器就会立即收到通知。正如我所说,我不知道它是否适合你正在做的事情,但感觉这是一个更合适的沟通渠道。

Contrary to the comment by Jamie Dixon, the scenario IS exceptional. Note the naming of the method and its parameters: BeginReceive(TimeSpan timeout)

Had the method been named BeginTryReceive, it would've been perfectly normal if no message was received. Naming it BeginReceive (or Receive, for the sync version) implies that a message is expected to enter the queue. That the TimeSpan parameter is named timeout is also significant, because a timeout IS exceptional. A timeout means that a response was expected, but none was given, and the caller chooses to stop waiting and assumes that an error has occured. When you call BeginReceive/Receive with a 1 second timeout, you are stating that if no message has entered the queue by that time, something must have gone wrong and we need to handle it.

The way I would implement this, if I understand what you want to do correctly, is this:

  • Call BeginReceive either with a very large timeout, or even without a timeout if I don't see an empty queue as an error.
  • Attach an event handler to the ReceiveCompleted event, which 1) processes the message, and 2) calls BeginReceive again.
  • I would NOT use an infinite loop. This is both bad practice and completely redundant when using asynchronous methods like BeginReceive.
  • edit: To abandon a queue which isn't being read by any client, have the queue writers peek into the queue to determine if it is 'dead'.

edit: I have another suggestion. Since I don't know the details of your application I have no idea if it is either feasible or appropriate. It seems to me that you're basically establishing a connection between client and server, with the message queue as the communication channel. Why is this a 'connection'? Because the queue won't be written to if no one is listening. That's pretty much what a connection is, I think. Wouldn't it be more appropriate to use sockets or named pipes to transfer the messages? That way, the clients simply close the Stream objects when they are done reading, and the servers are immediately notified. As I said, I don't know if it can work for what you're doing, but it feels like a more appropriate communication channel.

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文