仅使用 ReplyTo 名称将响应发送到 ActiveMQ 临时队列
最近,我一直在尝试让 Reply-To 模式在 Apache NMS /ActiveMQ 中工作,并且在仅使用临时队列的名称将消息发送到临时队列时遇到问题。
该项目是调度程序服务,它从总线检索请求并将它们发送到另一个进程/运行时(基于复杂的路由标准)来处理请求。然后,这个单独的处理器使用回复队列名称和相关 ID 来制作响应,并将其发送到同一代理但不同连接上的原始请求者。
问题是,如果您具有消息的 NMSReplyTo 标头中的 IDestination 对象引用,则似乎只能发送到临时队列(或主题)。如果该引用丢失,则无法仅通过其名称将消息发送到临时队列(或主题)。
说明这个问题的是这个简单的“Pong”服务,它侦听消息队列并使用 NMS Reply-To 标头的内容向请求者发出响应。它模仿通过简单地调用 ProcessMessage(string,string) 方法将请求分派到另一个进程。
using System;
using Apache.NMS;
namespace PongService
{
/// <summary>Simple request dispatcher which mimics dispatching requests to other workers in "The Cloud"</summary>
class PongService
{
static ISession session = null;
static IMessageProducer producer = null;
public static void Main(string[] args)
{
Uri connecturi = new Uri("activemq:tcp://localhost:61616");
Console.WriteLine("Connecting to " + connecturi);
IConnectionFactory factory = new NMSConnectionFactory(connecturi);
IConnection connection = factory.CreateConnection();
session = connection.CreateSession();
IDestination destination = session.GetQueue("PONG.CMD");
Console.WriteLine("Using destination: " + destination);
producer = session.CreateProducer(null);
IMessageConsumer consumer = session.CreateConsumer(destination);
connection.Start();
consumer.Listener += new MessageListener(OnMessage);
Console.WriteLine("Press any key to terminate Pong service . . .");
// loop until a key is pressed
while (!Console.KeyAvailable)
{
try { System.Threading.Thread.Sleep(50); }
catch (Exception ex) { Console.Error.WriteLine(ex.Message + "\r\n" + ex.StackTrace); }
} // loop
Console.Write("Closing connection...");
consumer.Close();
producer.Close();
session.Close();
connection.Close();
Console.WriteLine("done.");
}
/// <summary>Consumer call-back which receives requests and dispatches them to available workers in 'The Cloud'</summary>
/// <param name="receivedMsg">The message received on the request queue.</param>
protected static void OnMessage(IMessage receivedMsg)
{
// mimic the operation of passing this request to an external processor which can connect
// to the broker but will not have references to the session objects including destinations
Console.WriteLine("Sending request to an external processor");
ProcessMessage(receivedMsg.NMSReplyTo.ToString(), receivedMsg.NMSCorrelationID.ToString());
}
/// <summary>Models a worker in another process/runtime.</summary>
/// <param name="queuename">Where to send the results of processing</param>
/// <param name="crid">Correlation identifier of the request.</param>
protected static void ProcessMessage(string queuename, string crid)
{
ITextMessage response = session.CreateTextMessage("Pong!");
response.NMSCorrelationID = crid;
IDestination destination = session.GetQueue(queuename);
Console.WriteLine("Sending response with CRID of '" + crid + "' to " + queuename + "'");
try
{
producer.Send(destination, response);
}
catch (Exception ex)
{
Console.Error.WriteLine("Could not send response: " + ex.Message);
}
}
}
}
现在为客户。它只是创建一个临时队列,开始侦听它,然后向我们的“Pong”服务正在侦听的队列发送请求。请求消息中包含临时队列的IDestination。
using System;
using System.Threading;
using Apache.NMS;
using Apache.NMS.Util;
namespace PongClient
{
class PongClient
{
protected static AutoResetEvent semaphore = new AutoResetEvent(false);
protected static ITextMessage message = null;
protected static TimeSpan receiveTimeout = TimeSpan.FromSeconds(3);
public static void Main(string[] args)
{
Uri connecturi = new Uri("activemq:tcp://localhost:61616");
Console.WriteLine("About to connect to " + connecturi);
IConnectionFactory factory = new NMSConnectionFactory(connecturi);
IConnection connection = factory.CreateConnection();
ISession session = connection.CreateSession();
IDestination temporaryDestination = session.CreateTemporaryQueue();
Console.WriteLine("Private destination: " + temporaryDestination);
IDestination destination = session.GetQueue("PONG.CMD");
Console.WriteLine("Service destination: " + destination);
IMessageConsumer consumer = session.CreateConsumer(destination);
consumer.Listener += new MessageListener(OnMessage);
IMessageProducer producer = session.CreateProducer(destination);
connection.Start();
// Send a request message
ITextMessage request = session.CreateTextMessage("Ping");
request.NMSCorrelationID = Guid.NewGuid().ToString();
request.NMSReplyTo = temporaryDestination;
producer.Send(request);
// Wait for the message
semaphore.WaitOne((int)receiveTimeout.TotalMilliseconds, true);
if (message == null)
{
Console.WriteLine("Timed-Out!");
}
else
{
Console.WriteLine("Received message with ID: " + message.NMSMessageId);
Console.WriteLine("Received message with text: " + message.Text);
}
}
protected static void OnMessage(IMessage receivedMsg)
{
message = receivedMsg as ITextMessage;
semaphore.Set();
}
}
}
Pong 进程似乎运行正常,只是它最终创建了一个全新的、独立于 Reply-To 标头中指定队列的队列。
以下是所涉及技术的版本:
- Apache.NMS.ActiveMQ v1.5.1
- Apache.NMS API v1.5.0
- ActiveMQ 5.5.0
- C# .NET 3.5
这个问题与 这个帖子描述了类似的问题。希望这些示例也有助于澄清该请求中的问题。
对解决方案的任何帮助或见解将不胜感激。
Lately, I've been trying to get the Reply-To pattern to work in Apache NMS /ActiveMQ and have been having problems sending messages to temporary queues using only the name of the temporary queue.
The project is dispatcher service which retrieves requests from the bus and sends them to another process/runtime (based on complex routing criteria) to process the request. This separate processor then uses the reply-to queue name and correlation ID to craft the response and sending it to the original requester on the same broker but a different connection.
The problem is that it appears you can only send to a temporary queue (or topic) if you have the IDestination object reference from the message's NMSReplyTo header. If that reference is lost, there is no way to send messages to a temporary queue (or topic) by simply using its name.
Illustrating this problem is this simple "Pong" service which listens on a message queue and issues a response to the requester using the contents of the NMS Reply-To header. It mimics dispatching the request to another process by simply calling the ProcessMessage(string,string) method.
using System;
using Apache.NMS;
namespace PongService
{
/// <summary>Simple request dispatcher which mimics dispatching requests to other workers in "The Cloud"</summary>
class PongService
{
static ISession session = null;
static IMessageProducer producer = null;
public static void Main(string[] args)
{
Uri connecturi = new Uri("activemq:tcp://localhost:61616");
Console.WriteLine("Connecting to " + connecturi);
IConnectionFactory factory = new NMSConnectionFactory(connecturi);
IConnection connection = factory.CreateConnection();
session = connection.CreateSession();
IDestination destination = session.GetQueue("PONG.CMD");
Console.WriteLine("Using destination: " + destination);
producer = session.CreateProducer(null);
IMessageConsumer consumer = session.CreateConsumer(destination);
connection.Start();
consumer.Listener += new MessageListener(OnMessage);
Console.WriteLine("Press any key to terminate Pong service . . .");
// loop until a key is pressed
while (!Console.KeyAvailable)
{
try { System.Threading.Thread.Sleep(50); }
catch (Exception ex) { Console.Error.WriteLine(ex.Message + "\r\n" + ex.StackTrace); }
} // loop
Console.Write("Closing connection...");
consumer.Close();
producer.Close();
session.Close();
connection.Close();
Console.WriteLine("done.");
}
/// <summary>Consumer call-back which receives requests and dispatches them to available workers in 'The Cloud'</summary>
/// <param name="receivedMsg">The message received on the request queue.</param>
protected static void OnMessage(IMessage receivedMsg)
{
// mimic the operation of passing this request to an external processor which can connect
// to the broker but will not have references to the session objects including destinations
Console.WriteLine("Sending request to an external processor");
ProcessMessage(receivedMsg.NMSReplyTo.ToString(), receivedMsg.NMSCorrelationID.ToString());
}
/// <summary>Models a worker in another process/runtime.</summary>
/// <param name="queuename">Where to send the results of processing</param>
/// <param name="crid">Correlation identifier of the request.</param>
protected static void ProcessMessage(string queuename, string crid)
{
ITextMessage response = session.CreateTextMessage("Pong!");
response.NMSCorrelationID = crid;
IDestination destination = session.GetQueue(queuename);
Console.WriteLine("Sending response with CRID of '" + crid + "' to " + queuename + "'");
try
{
producer.Send(destination, response);
}
catch (Exception ex)
{
Console.Error.WriteLine("Could not send response: " + ex.Message);
}
}
}
}
Now for the client. It simply creates a temporary queue, starts listening to it and then sends a request on the queue on which our "Pong" service is listening. The request message contains the IDestination of the temporary queue.
using System;
using System.Threading;
using Apache.NMS;
using Apache.NMS.Util;
namespace PongClient
{
class PongClient
{
protected static AutoResetEvent semaphore = new AutoResetEvent(false);
protected static ITextMessage message = null;
protected static TimeSpan receiveTimeout = TimeSpan.FromSeconds(3);
public static void Main(string[] args)
{
Uri connecturi = new Uri("activemq:tcp://localhost:61616");
Console.WriteLine("About to connect to " + connecturi);
IConnectionFactory factory = new NMSConnectionFactory(connecturi);
IConnection connection = factory.CreateConnection();
ISession session = connection.CreateSession();
IDestination temporaryDestination = session.CreateTemporaryQueue();
Console.WriteLine("Private destination: " + temporaryDestination);
IDestination destination = session.GetQueue("PONG.CMD");
Console.WriteLine("Service destination: " + destination);
IMessageConsumer consumer = session.CreateConsumer(destination);
consumer.Listener += new MessageListener(OnMessage);
IMessageProducer producer = session.CreateProducer(destination);
connection.Start();
// Send a request message
ITextMessage request = session.CreateTextMessage("Ping");
request.NMSCorrelationID = Guid.NewGuid().ToString();
request.NMSReplyTo = temporaryDestination;
producer.Send(request);
// Wait for the message
semaphore.WaitOne((int)receiveTimeout.TotalMilliseconds, true);
if (message == null)
{
Console.WriteLine("Timed-Out!");
}
else
{
Console.WriteLine("Received message with ID: " + message.NMSMessageId);
Console.WriteLine("Received message with text: " + message.Text);
}
}
protected static void OnMessage(IMessage receivedMsg)
{
message = receivedMsg as ITextMessage;
semaphore.Set();
}
}
}
The Pong process seems to operate correctly, only it winds-up making a completely new, separate queue from the one specified in the Reply-To header.
Here are the versions of the technologies involved:
- Apache.NMS.ActiveMQ v1.5.1
- Apache.NMS API v1.5.0
- ActiveMQ 5.5.0
- C# .NET 3.5
This question is related to this post which describes a similar problem. Hopefully these examples will help clarify the issue in that request as well.
Any help or insight to the solution would be greatly appreciated.
如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。
绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论
评论(3)
您实际上并没有在来自 PongClient 的请求消息中设置回复标头。
试试这个:
You're not actually setting the reply-to header in the request message from the PongClient.
Try this:
您需要使用传递给您的
IDestination
。打电话
有点邪恶。在幕后,它调用 CreateTemporaryQueue() 将现有的临时队列替换为同名的新队列,而不会通知您。
You need to use the
IDestination
you are passed.Calling
is a little evil. Under the covers it calls CreateTemporaryQueue() replaces the existing temporary queue with a new one of the same name without informing you.
我建议使用主题作为回复目的地,并让您的消费者基于 NMSCorrelationID 进行过滤。这是我在对临时队列感到非常沮丧之后转向的实现。它实际上有很多优点。
I would recommend using a topic as a reply destination, and have your consumer filter based on the NMSCorrelationID. This is the implementation I have moved to after much frustration with temp queues. It actually has many advantages.