Sending a response to an ActiveMQ temporary queue using only the ReplyTo name

Posted on 2024-12-03 13:14:39

Lately, I've been trying to get the Reply-To pattern to work in Apache NMS/ActiveMQ and have been having problems sending messages to temporary queues using only the name of the temporary queue.

The project is a dispatcher service which retrieves requests from the bus and sends them to another process/runtime (based on complex routing criteria) to process the request. This separate processor then uses the reply-to queue name and correlation ID to craft the response and sends it to the original requester on the same broker but over a different connection.

The problem is that it appears you can only send to a temporary queue (or topic) if you have the IDestination object reference from the message's NMSReplyTo header. If that reference is lost, there is no way to send messages to a temporary queue (or topic) by simply using its name.

Illustrating this problem is this simple "Pong" service which listens on a message queue and issues a response to the requester using the contents of the NMS Reply-To header. It mimics dispatching the request to another process by simply calling the ProcessMessage(string,string) method.

    using System;
    using Apache.NMS;

    namespace PongService
    {
        /// <summary>Simple request dispatcher which mimics dispatching requests to other workers in "The Cloud"</summary>
        class PongService
        {
            static ISession session = null;
            static IMessageProducer producer = null;

            public static void Main(string[] args)
            {
                Uri connecturi = new Uri("activemq:tcp://localhost:61616");
                Console.WriteLine("Connecting to " + connecturi);

                IConnectionFactory factory = new NMSConnectionFactory(connecturi);
                IConnection connection = factory.CreateConnection();
                session = connection.CreateSession();

                IDestination destination = session.GetQueue("PONG.CMD");
                Console.WriteLine("Using destination: " + destination);

                producer = session.CreateProducer(null);

                IMessageConsumer consumer = session.CreateConsumer(destination);

                connection.Start();

                consumer.Listener += new MessageListener(OnMessage);

                Console.WriteLine("Press any key to terminate Pong service . . .");

                // loop until a key is pressed
                while (!Console.KeyAvailable)
                {
                    try { System.Threading.Thread.Sleep(50); }
                    catch (Exception ex) { Console.Error.WriteLine(ex.Message + "\r\n" + ex.StackTrace); }
                } // loop

                Console.Write("Closing connection...");
                consumer.Close();
                producer.Close();
                session.Close();
                connection.Close();
                Console.WriteLine("done.");
            }


            /// <summary>Consumer call-back which receives requests and dispatches them to available workers in 'The Cloud'</summary>
            /// <param name="receivedMsg">The message received on the request queue.</param>
            protected static void OnMessage(IMessage receivedMsg)
            {
                // mimic the operation of passing this request to an external processor which can connect 
                // to the broker but will not have references to the session objects including destinations
                Console.WriteLine("Sending request to an external processor");
                ProcessMessage(receivedMsg.NMSReplyTo.ToString(), receivedMsg.NMSCorrelationID.ToString());
            }


            /// <summary>Models a worker in another process/runtime.</summary>
            /// <param name="queuename">Where to send the results of processing</param>
            /// <param name="crid">Correlation identifier of the request.</param>
            protected static void ProcessMessage(string queuename, string crid)
            {
                ITextMessage response = session.CreateTextMessage("Pong!");
                response.NMSCorrelationID = crid;

                IDestination destination = session.GetQueue(queuename);

                Console.WriteLine("Sending response with CRID of '" + crid + "' to '" + queuename + "'");
                try
                {
                    producer.Send(destination, response);
                }
                catch (Exception ex)
                {
                    Console.Error.WriteLine("Could not send response: " + ex.Message);
                }

            }

        }

    }

Now for the client. It simply creates a temporary queue, starts listening to it and then sends a request on the queue on which our "Pong" service is listening. The request message contains the IDestination of the temporary queue.

    using System;
    using System.Threading;
    using Apache.NMS;
    using Apache.NMS.Util;

    namespace PongClient
    {
        class PongClient
        {
            protected static AutoResetEvent semaphore = new AutoResetEvent(false);
            protected static ITextMessage message = null;
            protected static TimeSpan receiveTimeout = TimeSpan.FromSeconds(3);

            public static void Main(string[] args)
            {
                Uri connecturi = new Uri("activemq:tcp://localhost:61616");
                Console.WriteLine("About to connect to " + connecturi);

                IConnectionFactory factory = new NMSConnectionFactory(connecturi);

                IConnection connection = factory.CreateConnection();
                ISession session = connection.CreateSession();

                IDestination temporaryDestination = session.CreateTemporaryQueue();
                Console.WriteLine("Private destination: " + temporaryDestination);

                IDestination destination = session.GetQueue("PONG.CMD");
                Console.WriteLine("Service destination: " + destination);


                // Listen for the reply on the private temporary queue, not on the service queue
                IMessageConsumer consumer = session.CreateConsumer(temporaryDestination);
                consumer.Listener += new MessageListener(OnMessage);

                IMessageProducer producer = session.CreateProducer(destination);

                connection.Start();

                // Send a request message
                ITextMessage request = session.CreateTextMessage("Ping");
                request.NMSCorrelationID = Guid.NewGuid().ToString();
                request.NMSReplyTo = temporaryDestination;
                producer.Send(request);

                // Wait for the message
                semaphore.WaitOne((int)receiveTimeout.TotalMilliseconds, true);
                if (message == null)
                {
                    Console.WriteLine("Timed-Out!");
                }
                else
                {
                    Console.WriteLine("Received message with ID:   " + message.NMSMessageId);
                    Console.WriteLine("Received message with text: " + message.Text);
                }
            }



            protected static void OnMessage(IMessage receivedMsg)
            {
                message = receivedMsg as ITextMessage;
                semaphore.Set();
            }
        }
    }

The Pong process seems to operate correctly, except that it winds up creating a completely new queue, separate from the one specified in the Reply-To header.

Here are the versions of the technologies involved:

  • Apache.NMS.ActiveMQ v1.5.1
  • Apache.NMS API v1.5.0
  • ActiveMQ 5.5.0
  • C# .NET 3.5

This question is related to this post which describes a similar problem. Hopefully these examples will help clarify the issue in that request as well.

Any help or insight to the solution would be greatly appreciated.

年华零落成诗 2024-12-10 13:14:39

You're not actually setting the reply-to header in the request message from the PongClient.

Try this:

    ITextMessage request = session.CreateTextMessage("Ping");
    request.NMSCorrelationID = Guid.NewGuid().ToString();
    request.NMSReplyTo = temporaryDestination;
    producer.Send(request);

原野 2024-12-10 13:14:39

You need to use the IDestination you are passed.

Calling

    IDestination destination = session.GetQueue(queuename);

is a little evil. Under the covers it calls CreateTemporaryQueue(), which replaces the existing temporary queue with a new one of the same name without informing you.
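
To make the failure mode a bit more concrete, here is a minimal sketch of what the worker is actually handed when it only receives a string. It assumes that `NMSReplyTo.ToString()` on the ActiveMQ provider yields a prefixed form such as `temp-queue://ID:...` (worth confirming by printing it in your own setup); the `ReplyAddress` type and the sample ID are hypothetical and not part of Apache.NMS. If that assumption holds, passing the whole string to `session.GetQueue` asks for an ordinary queue whose name merely starts with `temp-queue://`, which would match the extra queue described in the question.

```csharp
using System;

// Hypothetical helper (not part of Apache.NMS): splits a destination string
// into its kind and physical name so a worker can tell a temp queue from a queue.
public sealed class ReplyAddress
{
    public string Kind { get; private set; }          // "queue", "topic", "temp-queue" or "temp-topic"
    public string PhysicalName { get; private set; }  // the name without the "kind://" prefix

    public bool IsTemporary
    {
        get { return Kind.StartsWith("temp-", StringComparison.Ordinal); }
    }

    private ReplyAddress(string kind, string physicalName)
    {
        Kind = kind;
        PhysicalName = physicalName;
    }

    public static ReplyAddress Parse(string text)
    {
        if (text == null) throw new ArgumentNullException("text");

        string[] kinds = { "temp-queue", "temp-topic", "queue", "topic" };
        foreach (string kind in kinds)
        {
            string prefix = kind + "://";
            if (text.StartsWith(prefix, StringComparison.Ordinal))
            {
                return new ReplyAddress(kind, text.Substring(prefix.Length));
            }
        }

        // No recognised prefix: treat the text as a plain queue name.
        return new ReplyAddress("queue", text);
    }
}

public static class ReplyAddressDemo
{
    public static void Main()
    {
        // Sample value only; a real temporary queue ID is generated by the connection.
        ReplyAddress address = ReplyAddress.Parse("temp-queue://ID:example-1234-1:1");
        Console.WriteLine(address.Kind + " | " + address.PhysicalName + " | temporary=" + address.IsTemporary);
    }
}
```

With the kind and the physical name separated, the worker at least knows when it must not call `GetQueue`. Whether a temporary destination can then be rebuilt from the physical name alone is provider-specific; the ActiveMQ provider ships an `ActiveMQTempQueue` command class, but check what your version offers before relying on it.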

倾城°AllureLove 2024-12-10 13:14:39

I would recommend using a topic as a reply destination, and have your consumer filter based on the NMSCorrelationID. This is the implementation I have moved to after much frustration with temp queues. It actually has many advantages.

  1. It cuts down on intensive resource usage on the server (no need to construct/deconstruct temp queues).
  2. It allows you to use another consumer to monitor the response sent back (you will never be able to "peek" inside a temp queue).
  3. And it is much more reliable because the topic can be passed via a logical name instead of a specific token ID (which you are losing across connections).
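
A rough sketch of the filtering side of this approach follows. It only builds the selector string; the header name `JMSCorrelationID` is an assumption about how the broker evaluates selectors, and the topic name `PONG.REPLY` in the comments is made up for illustration, so treat both as things to verify rather than as the provider's documented behaviour.

```csharp
using System;

// Hypothetical helper: builds a message selector matching one correlation ID.
public static class ReplySelector
{
    public static string ForCorrelationId(string correlationId)
    {
        if (string.IsNullOrEmpty(correlationId))
        {
            throw new ArgumentException("A correlation ID is required.", "correlationId");
        }

        // Selector string literals use single quotes; an embedded quote is doubled.
        return "JMSCorrelationID = '" + correlationId.Replace("'", "''") + "'";
    }
}

public static class ReplySelectorDemo
{
    public static void Main()
    {
        string correlationId = Guid.NewGuid().ToString();
        string selector = ReplySelector.ForCorrelationId(correlationId);
        Console.WriteLine(selector);

        // Assumed usage with an open NMS session (names are illustrative):
        //   IDestination replies = session.GetTopic("PONG.REPLY");
        //   IMessageConsumer consumer = session.CreateConsumer(replies, selector);
        //   request.NMSCorrelationID = correlationId;
        //   request.NMSReplyTo = replies;
    }
}
```

One thing to keep in mind with a topic: a non-durable subscriber only sees messages published after it has subscribed, so the client should create the filtered consumer before it sends the request.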