2008R2上消息队列的正确使用

发布于 2024-10-26 02:41:58 字数 6511 浏览 2 评论 0原文

我不是程序员,但我试图通过给他们一些指导来帮助他们。我们不再拥有任何有关 msmq 的内部专业知识。我们正在尝试使用它来将一些功能与调度应用程序集成。

调度应用程序通过使用自定义构建的 dll 进行网络调用来启动作业。 dll 调用 weburl。 Web 应用程序将运行其任务并向网站发送有关其执行的任务的更新。网站将消息写入队列。调用该站点的 dll 正在监视队列中是否有带有分配给该作业的标签的消息。当它收到最终状态消息时,它会关闭。

我们每隔几个小时就会收到以下消息。我们每小时运行近 100 个使用此方法的作业。在底部列出的代码中,jobid 对应于消息队列中消息的标签。每个作业在开始时都会发出一个 jobid,并将使用它作为发送到该作业的 msmq 的每条消息的标签。

 System.Messaging.MessageQueueException (0x80004005): Message that the cursor is currently pointing to has been removed from the queue by another process or by another call to Receive without the use of this cursor.
  at System.Messaging.MessageQueue.ReceiveCurrent(TimeSpan timeout, Int32 action, CursorHandle cursor, MessagePropertyFilter filter, MessageQueueTransaction internalTransaction, MessageQueueTransactionType transactionType)
  at System.Messaging.MessageEnumerator.get_Current() 

这是它的代码。

  while ( running )
        {
            // System.Console.WriteLine( "Begin Peek" );
            messageQueue.Peek();
            //System.Console.WriteLine( "End Peek" );
            messageQueue.MessageReadPropertyFilter.SetAll();

            using ( MessageEnumerator enumerator = messageQueue.GetMessageEnumerator2() )
            {
                enumerator.Reset();

                while ( enumerator.MoveNext() )
                {
                    Message msg = enumerator.Current;

                    if ( msg.Label.Equals( this.jobid ) )
                    {
                        StringBuilder sb = new StringBuilder();
                        /*
                        try
                        {
                            sb.Append( "Message Source: " );
                            //sb.Append( msg.SourceMachine );
                            sb.Append( " Sent: " );
                            sb.Append( msg.SentTime );
                            sb.Append( " Label " );
                            sb.Append( msg.Label );
                            sb.Append( " ID: " );
                            sb.Append( msg.Id );
                            sb.Append( " CorrelationID: " );
                            sb.Append( msg.CorrelationId );
                            sb.Append( " Body Type: " );
                            sb.Append( msg.BodyType );
                        }
                        catch ( Exception )
                        {
                            throw;
                        }
                        finally
                        {
                            System.Console.WriteLine( sb.ToString() );
                        }
                        */
                        //System.Console.WriteLine( "Receiving Message started" );
                        using ( Message message = messageQueue.ReceiveById( msg.Id ) )
                        {
                            //System.Console.WriteLine( "Receiving Message Complete" );
                            //sb = new StringBuilder();
                            string bodyText = string.Empty;

                            try
                            {
                                System.IO.StringWriter sw = new System.IO.StringWriter( sb );
                                System.IO.StreamReader sr = new System.IO.StreamReader( message.BodyStream );

                                while ( !sr.EndOfStream )
                                {
                                    sw.WriteLine( sr.ReadLine() );
                                }
                                sr.Close();
                                sw.Close();
                                bodyText = ( string ) FromXml( sb.ToString(), typeof( string ) );
                                int indx = bodyText.IndexOf( ',' );
                                string tokens = bodyText.Substring( indx + 1 );
                                indx = tokens.IndexOf( ',' );
                                string command = tokens.Substring( 0, indx );
                                tokens = tokens.Substring( indx + 1 );
                                if ( command.Equals( COMMAND_STARTED ) )
                                {
                                    System.Console.WriteLine( "STARTED " + tokens );
                                }
                                else if ( command.Equals( COMMAND_UPDATE ) )
                                {
                                    System.Console.WriteLine( tokens );
                                }
                                else if ( command.Equals( COMMAND_ENDED_OK ) )
                                {
                                    System.Console.WriteLine( tokens );
                                    System.Console.WriteLine( "WEBJOB: Success" );
                                    finalResults = new FinalResults( 0, 0, "Success" );
                                    running = false;
                                }
                                else if ( command.Equals( COMMAND_ENDED_WARNING ) )
                                {
                                    System.Console.WriteLine( tokens );
                                    System.Console.WriteLine( "WEBJOB: Warning Issued" );
                                    finalResults = new FinalResults( 1, 1, "Warning" );
                                    running = false;
                                }
                                else if ( command.Equals( COMMAND_ENDED_FAIL ) )
                                {
                                    System.Console.WriteLine( tokens );
                                    System.Console.WriteLine( "WEBJOB: Failure" );
                                    finalResults = new FinalResults( 2, 16, "Failure" );
                                    running = false;
                                }
                            }
                            catch ( Exception )
                            {
                                throw;
                            }
                            finally
                            {
                                //System.Console.WriteLine( "Body: " + bodyText );
                            }
                        }
                    }
                }
            }
        }

        return finalResults;
    }

    MessageQueue messageQueue = null;
    string webServiceURL = "";
    Dictionary<string, string> parms = new Dictionary<string, string>();
    string jobid = "NONE";

I am not a programmer but I am trying to help them out by giving them some guidance. We no longer have any in house expertise on msmq. We are trying to use this to integrate some functions with a scheduling application.

The scheduling app fires off a job by making a webcall using a custom built dll. The dll calls the weburl. The web app will run its task and send updates to a website about the task it performed. Website writes the message to the queue. The dll which called the site is monitoring the queue for messages with the label that was assigned to that job. When it receives the final status message it closes.

We are getting the following message every few hours. We run close to 100 jobs per hour that use this method. In the code listed at the bottom, the jobid corresponds to the label for the message in the message queue. Each job is issued a jobid at the start and will use that as the label for each message it sends to the msmq for that job.

 System.Messaging.MessageQueueException (0x80004005): Message that the cursor is currently pointing to has been removed from the queue by another process or by another call to Receive without the use of this cursor.
  at System.Messaging.MessageQueue.ReceiveCurrent(TimeSpan timeout, Int32 action, CursorHandle cursor, MessagePropertyFilter filter, MessageQueueTransaction internalTransaction, MessageQueueTransactionType transactionType)
  at System.Messaging.MessageEnumerator.get_Current() 

Here is the code for it.

  while ( running )
        {
            // System.Console.WriteLine( "Begin Peek" );
            messageQueue.Peek();
            //System.Console.WriteLine( "End Peek" );
            messageQueue.MessageReadPropertyFilter.SetAll();

            using ( MessageEnumerator enumerator = messageQueue.GetMessageEnumerator2() )
            {
                enumerator.Reset();

                while ( enumerator.MoveNext() )
                {
                    Message msg = enumerator.Current;

                    if ( msg.Label.Equals( this.jobid ) )
                    {
                        StringBuilder sb = new StringBuilder();
                        /*
                        try
                        {
                            sb.Append( "Message Source: " );
                            //sb.Append( msg.SourceMachine );
                            sb.Append( " Sent: " );
                            sb.Append( msg.SentTime );
                            sb.Append( " Label " );
                            sb.Append( msg.Label );
                            sb.Append( " ID: " );
                            sb.Append( msg.Id );
                            sb.Append( " CorrelationID: " );
                            sb.Append( msg.CorrelationId );
                            sb.Append( " Body Type: " );
                            sb.Append( msg.BodyType );
                        }
                        catch ( Exception )
                        {
                            throw;
                        }
                        finally
                        {
                            System.Console.WriteLine( sb.ToString() );
                        }
                        */
                        //System.Console.WriteLine( "Receiving Message started" );
                        using ( Message message = messageQueue.ReceiveById( msg.Id ) )
                        {
                            //System.Console.WriteLine( "Receiving Message Complete" );
                            //sb = new StringBuilder();
                            string bodyText = string.Empty;

                            try
                            {
                                System.IO.StringWriter sw = new System.IO.StringWriter( sb );
                                System.IO.StreamReader sr = new System.IO.StreamReader( message.BodyStream );

                                while ( !sr.EndOfStream )
                                {
                                    sw.WriteLine( sr.ReadLine() );
                                }
                                sr.Close();
                                sw.Close();
                                bodyText = ( string ) FromXml( sb.ToString(), typeof( string ) );
                                int indx = bodyText.IndexOf( ',' );
                                string tokens = bodyText.Substring( indx + 1 );
                                indx = tokens.IndexOf( ',' );
                                string command = tokens.Substring( 0, indx );
                                tokens = tokens.Substring( indx + 1 );
                                if ( command.Equals( COMMAND_STARTED ) )
                                {
                                    System.Console.WriteLine( "STARTED " + tokens );
                                }
                                else if ( command.Equals( COMMAND_UPDATE ) )
                                {
                                    System.Console.WriteLine( tokens );
                                }
                                else if ( command.Equals( COMMAND_ENDED_OK ) )
                                {
                                    System.Console.WriteLine( tokens );
                                    System.Console.WriteLine( "WEBJOB: Success" );
                                    finalResults = new FinalResults( 0, 0, "Success" );
                                    running = false;
                                }
                                else if ( command.Equals( COMMAND_ENDED_WARNING ) )
                                {
                                    System.Console.WriteLine( tokens );
                                    System.Console.WriteLine( "WEBJOB: Warning Issued" );
                                    finalResults = new FinalResults( 1, 1, "Warning" );
                                    running = false;
                                }
                                else if ( command.Equals( COMMAND_ENDED_FAIL ) )
                                {
                                    System.Console.WriteLine( tokens );
                                    System.Console.WriteLine( "WEBJOB: Failure" );
                                    finalResults = new FinalResults( 2, 16, "Failure" );
                                    running = false;
                                }
                            }
                            catch ( Exception )
                            {
                                throw;
                            }
                            finally
                            {
                                //System.Console.WriteLine( "Body: " + bodyText );
                            }
                        }
                    }
                }
            }
        }

        return finalResults;
    }

    MessageQueue messageQueue = null;
    string webServiceURL = "";
    Dictionary<string, string> parms = new Dictionary<string, string>();
    string jobid = "NONE";

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(3

小…楫夜泊 2024-11-02 02:41:58

kprobst 的解释很可能就是正在发生的事情。即使您看到队列中存在此特定消息,如果不同的应用程序(或同一应用程序的不同实例)从此队列中选取一条(任何)消息,也会使光标无效。

从本质上讲,如果多个进程从同一队列中获取数据,则此代码无法正常工作。

kprobst's explanation is likely what is happening. Even if you are seeing this particular message is in the queue, if a different application(or different instance of the same application) picks a(any) message from this queue, that will invalidate the cursor.

Inherently this code is not designed to work if multiple processes are feeding off the same queue.

苏别ゝ 2024-11-02 02:41:58

这通常意味着您正在 Receiving() 的消息在接收操作完成之前已被其他内容删除。另一个应用程序或与您的代码位于同一进程中的另一个线程使用不同的队列引用。

您是否有可能同时运行两个处理器代码实例(我猜这是一个控制台应用程序)?在相同或不同的机器上?或者其他一些应用程序或工具从队列中删除消息?

.NET 2.0 的预发行版本之一曾经存在一个错误,在某些压力条件下会导致此问题,但据我记得它在发布之前已修复。

This typically means that the message you're Receiving() is being removed by something else before the receive operation can be completed. Another application, or another thread in the same process as your code using a different queue reference.

Is it possible that you might have two instances of the processor code (I guess it's a console app) running at the same time? On the same or different machines? Or some other application or tool removing messages from the queue?

There used to be a bug in one of the pre-release versions of .NET 2.0 that would cause this under some stress conditions but as far as I remember it was fixed before they shipped.

陈甜 2024-11-02 02:41:58

由于 MessageQueue 的内部方法 ReceiveCurrent 中存在并发问题,此操作失败。
异常堆栈跟踪显示调用源自 enumerator.Current 行,异常发生在 ReceiveCurrent。 Enumerator.Current 使用“peek”选项调用 ReceiveCurrent。您可以问,当我遇到同样的问题时,我也遇到过,查看如何失败并出现“消息已收到”错误?它只是想查看下一条尚未收到的消息?
答案就在 ReceiveCurrent 代码中,可以在此处查看:
https://referencesource.microsoft.com/# System.Messaging/System/Messaging/MessageQueue.cs,02c33cc512659fd7,references

ReceiveCurrent 首先进行 StaleSafeReceive 调用来查看下一条消息。但是,如果此调用返回它需要更多内存来接收整个消息(带有
“while (MessageQueue.IsMemoryError(status)”在其源代码中),它分配所需的内存并进行另一个 StaleSafeReceive 调用来获取消息。
这是非常经典的 Win32 API 使用模式,因为它最终是基于 C 的。

这里的问题是,如果在 ReceiveCurrent 内对 StaleSafeReceive 的第一次和第二次调用之间另一个进程或线程“接收”,即从队列中删除该消息,则第二次调用会抛出这个确切的异常。这就是“查看”操作失败的原因。
请注意,它可能是导致异常的枚举器正在扫描的任何消息,而不是正在查找的消息。这就解释了为什么在抛出异常并且方法失败后,具有该作业 ID 的消息仍然存在于队列中。

可以做的是使用 try catch 来保护枚举器。当前调用,如果捕获到此特定异常,则只需使用队列中的下一个可用消息继续枚举即可。

我使用了 Cursor 对象而不是枚举器,但它遇到了同样的问题。但是使用 Cursor 有另一种方法可以降低发生这种情况的风险,即在扫描/查看消息时关闭当前 Queue 对象的 MessagePropertyFilter 的所有不需要的属性,尤其是 Body 属性。因为在查看期间,通常不需要接收正文,但大多数情况下,消息正文会导致重新分配内存,并需要在 ReceiveCurrent 中进行第二个 StaleSafeReceive 调用。
在 peek 调用中直接使用 Cursor 时,仍然需要尝试捕获此异常。

This is failing because of a concurrency issue in the internal method ReceiveCurrent of MessageQueue.
The exception stack trace shows the call originated at enumerator.Current line and exception happened at ReceiveCurrent. Enumerator.Current calls ReceiveCurrent with "peek" option. You can ask, which I had also when I had encountered the same problem, how can a peek fail with "Message is already received" error? It is only trying to peek the next message that is not already received anyway?
Well answer to it lies in the ReceiveCurrent code, which is avaliable for review here:
https://referencesource.microsoft.com/#System.Messaging/System/Messaging/MessageQueue.cs,02c33cc512659fd7,references

ReceiveCurrent first makes a StaleSafeReceive call to peek the next message. But if this call returns that it needs more memory to receive the whole message (the line with
"while (MessageQueue.IsMemoryError(status)" in its source code), it allocates the needed memory and makes another StaleSafeReceive call to get the message.
This is very classical Win32 API usage pattern due to its being C based eventually.

The problem here is, if between the first and second call to StaleSafeReceive inside ReceiveCurrent another process or thread "receives", i.e. removes that message from the queue, the second call throws this exact exception. And that is how a "peek" operation fails.
Note that it could be any message that is being scanned over by the enumerator causing the exception, not the message that is being looked for. Which explains why the message with that job id is still there int the queue after exception is thrown and method fails.

What can be done is to guard the enumerator.Current call with a try catch and if this particular exception is caught, just continue with the enumeration with the next available message in queue.

I had used the Cursor object rather than the enumerator, but it runs into same problem. But with Cursor usage there is another way to reduce the risk of this happening, that is while scanning/peeking the message is to turn off all the unneeded properties of the current Queue object's MessagePropertyFilter, especially the Body property. Because during peeking the body is usually not needed to be received, but most often the body of the message causes memory to be reallocated and requiring the second StaleSafeReceive call inside ReceiveCurrent.
Still a try catch for this exception would be needed with direct Cursor usage too with the peek calls.

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文