如何为 Apache.NMS.AMQP 客户端启用多个消费者?
想知道如何为 Apache.NMS.AMQP 客户端启用多个消费者。我已经尝试对同一个队列使用多个会话和不同的消费者,但每个队列只有一个消费者的监听器会被调用。以下是示例代码。请忽略"每个客户端各建一个连接"的写法——我原以为连接可能是原因才这样做,但仍然不行。我给每个消费者起了一个名称(ConsumerName),用来识别被调用的是哪个消费者。
// One logger instance is shared by every QueueClient created below.
ILogger<QueueClient> queueClientLogger = loggerFactory.CreateLogger<QueueClient>();

// Two clients listen on the same queue ("Q/test1"); a third listens on "Q/test2".
QueueClient queueClient1 = new QueueClient(queueClientLogger, "Q/test1");
QueueClient queueClient2 = new QueueClient(queueClientLogger, "Q/test1");
QueueClient queueClient3 = new QueueClient(queueClientLogger, "Q/test2");

// Initialize the clients one after another, in declaration order.
await queueClient1.InitializeAsync();
await queueClient2.InitializeAsync();
await queueClient3.InitializeAsync();
-----------------------------------------------
/// <summary>
/// Owns one Apache.NMS connection, session and message consumer for a single queue and
/// logs every message delivered to that consumer.
/// </summary>
/// <remarks>
/// NOTE(review): when several instances listen on the same queue and only one of them is
/// ever called, the consumer prefetch setting is a likely cause (the first consumer may
/// buffer the whole backlog) - confirm against the Apache.NMS.AMQP configuration docs.
/// </remarks>
internal class QueueClient : IDisposable
{
    private readonly ILogger<QueueClient> logger;
    private IMessageConsumer consumer;
    private bool disposedValue;

    #region constructor
    /// <summary>
    /// Stores the logger and queue name and derives a unique consumer name from the
    /// queue name plus a new GUID.
    /// </summary>
    /// <param name="logger">Logger used for all messages written by this client.</param>
    /// <param name="queueName">Name of the queue this client consumes from.</param>
    public QueueClient(ILogger<QueueClient> logger, string queueName)
    {
        this.logger = logger;
        QueueName = queueName;
        ConsumerName = $"{QueueName}-{Guid.NewGuid()}";
    }
    #endregion

    #region Properties
    /// <summary>Name of the queue this client consumes from.</summary>
    internal string? QueueName { get; private set; }

    /// <summary>Unique name ("queue-guid") used in log output to tell consumers apart.</summary>
    internal string ConsumerName { get; private set; }

    /// <summary>Session created by <see cref="InitializeAsync"/>.</summary>
    internal Apache.NMS.ISession Session { get; private set; }

    /// <summary>Connection created by <see cref="InitializeAsync"/>.</summary>
    internal Apache.NMS.IConnection Connection { get; private set; }
    #endregion

    #region Methods
    /// <summary>
    /// Creates and starts a connection to the local broker, opens an auto-acknowledge
    /// session, and attaches <see cref="Consumer_Listener"/> to a consumer on the queue.
    /// </summary>
    internal async Task InitializeAsync()
    {
        string brokerUri = "amqp://localhost:5672"; // Default port
        NMSConnectionFactory factory = new NMSConnectionFactory(brokerUri);

        Connection = await factory.CreateConnectionAsync();
        await Connection.StartAsync();

        Session = await Connection.CreateSessionAsync(AcknowledgementMode.AutoAcknowledge);
        Apache.NMS.IDestination dest = await Session.GetQueueAsync(QueueName);

        consumer = await Session.CreateConsumerAsync(dest);
        consumer.Listener += Consumer_Listener;
    }

    /// <summary>
    /// Logs which consumer received a message and then logs the message content: the text
    /// of a text message, the UTF-8 decoded body of a bytes message, or a note naming the
    /// unexpected message type.
    /// </summary>
    /// <param name="message">Message delivered by the consumer.</param>
    private void Consumer_Listener(Apache.NMS.IMessage message)
    {
        logger.LogInformation("{ConsumerName}: Message from queue - {QueueName}", ConsumerName, QueueName);

        // Presumably simulates processing time so concurrent consumers can be observed.
        Thread.Sleep(1000);

        // A single type pattern per case replaces the former is/as pairs; the old
        // "bytesMsg == null" branch could never run after a successful type check.
        string content = message switch
        {
            ITextMessage txtMsg => txtMsg.Text ?? "",
            IBytesMessage bytesMsg => Encoding.UTF8.GetString(bytesMsg.Content),
            _ => "Unexpected message type: " + message.GetType().Name,
        };

        logger.LogInformation("{Content}", content);
    }
    #endregion

    // IDisposable implementation omitted from this sample.
}
I would like to know how to enable multiple consumers for the Apache.NMS.AMQP client. I have tried multiple sessions on the same queue, each with a different consumer, but the listener is only called for one consumer per queue.
Below is the sample code. Ignore the connection-per-client setup: I tried it because I thought the connection might be the cause, but it still doesn't work. Each consumer is given a name (ConsumerName) to identify which consumer is being called.
// One logger instance is shared by every QueueClient created below.
ILogger<QueueClient> queueClientLogger = loggerFactory.CreateLogger<QueueClient>();

// Two clients listen on the same queue ("Q/test1"); a third listens on "Q/test2".
QueueClient queueClient1 = new QueueClient(queueClientLogger, "Q/test1");
QueueClient queueClient2 = new QueueClient(queueClientLogger, "Q/test1");
QueueClient queueClient3 = new QueueClient(queueClientLogger, "Q/test2");

// Initialize the clients one after another, in declaration order.
await queueClient1.InitializeAsync();
await queueClient2.InitializeAsync();
await queueClient3.InitializeAsync();
-----------------------------------------------
/// <summary>
/// Owns one Apache.NMS connection, session and message consumer for a single queue and
/// logs every message delivered to that consumer.
/// </summary>
/// <remarks>
/// NOTE(review): when several instances listen on the same queue and only one of them is
/// ever called, the consumer prefetch setting is a likely cause (the first consumer may
/// buffer the whole backlog) - confirm against the Apache.NMS.AMQP configuration docs.
/// </remarks>
internal class QueueClient : IDisposable
{
    private readonly ILogger<QueueClient> logger;
    private IMessageConsumer consumer;
    private bool disposedValue;

    #region constructor
    /// <summary>
    /// Stores the logger and queue name and derives a unique consumer name from the
    /// queue name plus a new GUID.
    /// </summary>
    /// <param name="logger">Logger used for all messages written by this client.</param>
    /// <param name="queueName">Name of the queue this client consumes from.</param>
    public QueueClient(ILogger<QueueClient> logger, string queueName)
    {
        this.logger = logger;
        QueueName = queueName;
        // The "$" interpolation prefix had been garbled to "quot;" in the extracted text.
        ConsumerName = $"{QueueName}-{Guid.NewGuid()}";
    }
    #endregion

    #region Properties
    /// <summary>Name of the queue this client consumes from.</summary>
    internal string? QueueName { get; private set; }

    /// <summary>Unique name ("queue-guid") used in log output to tell consumers apart.</summary>
    internal string ConsumerName { get; private set; }

    /// <summary>Session created by <see cref="InitializeAsync"/>.</summary>
    internal Apache.NMS.ISession Session { get; private set; }

    /// <summary>Connection created by <see cref="InitializeAsync"/>.</summary>
    internal Apache.NMS.IConnection Connection { get; private set; }
    #endregion

    #region Methods
    /// <summary>
    /// Creates and starts a connection to the local broker, opens an auto-acknowledge
    /// session, and attaches <see cref="Consumer_Listener"/> to a consumer on the queue.
    /// </summary>
    internal async Task InitializeAsync()
    {
        string brokerUri = "amqp://localhost:5672"; // Default port
        NMSConnectionFactory factory = new NMSConnectionFactory(brokerUri);

        Connection = await factory.CreateConnectionAsync();
        await Connection.StartAsync();

        Session = await Connection.CreateSessionAsync(AcknowledgementMode.AutoAcknowledge);
        Apache.NMS.IDestination dest = await Session.GetQueueAsync(QueueName);

        consumer = await Session.CreateConsumerAsync(dest);
        consumer.Listener += Consumer_Listener;
    }

    /// <summary>
    /// Logs which consumer received a message and then logs the message content: the text
    /// of a text message, the UTF-8 decoded body of a bytes message, or a note naming the
    /// unexpected message type.
    /// </summary>
    /// <param name="message">Message delivered by the consumer.</param>
    private void Consumer_Listener(Apache.NMS.IMessage message)
    {
        logger.LogInformation("{ConsumerName}: Message from queue - {QueueName}", ConsumerName, QueueName);

        // Presumably simulates processing time so concurrent consumers can be observed.
        Thread.Sleep(1000);

        // A single type pattern per case replaces the former is/as pairs; the old
        // "bytesMsg == null" branch could never run after a successful type check.
        string content = message switch
        {
            ITextMessage txtMsg => txtMsg.Text ?? "",
            IBytesMessage bytesMsg => Encoding.UTF8.GetString(bytesMsg.Content),
            _ => "Unexpected message type: " + message.GetType().Name,
        };

        logger.LogInformation("{Content}", content);
    }
    #endregion

    // IDisposable implementation omitted from this sample.
}
如果你对这篇内容有疑问,欢迎到本站社区发帖提问,参与讨论,获取更多帮助,或者扫描二维码加入 Web 技术交流群。

绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论