如何为 Apache.NMS.AMQP 客户端启用多个消费者?

发布于 2025-02-06 16:06:12 字数 2735 浏览 1 评论 0原文

想知道如何为 Apache.NMS.AMQP 客户端启用多个消费者。我已经尝试在同一个队列上使用多个会话,并分别创建不同的消费者,但每个队列只有一个消费者的监听器会被调用。以下是示例代码。请忽略“每个队列客户端各自建立一个连接”的写法——我原以为连接可能是原因才这样写的,但并没有解决问题。每个消费者都有一个名称(ConsumerName),用于识别被调用的是哪个消费者。

// One logger instance is shared by every QueueClient created below.
ILogger<QueueClient> queueClientLogger = loggerFactory.CreateLogger<QueueClient>();

// Two clients are attached to the same queue ("Q/test1") ...
QueueClient queueClient1 = new QueueClient(queueClientLogger, "Q/test1");
await queueClient1.InitializeAsync();

QueueClient queueClient2 = new QueueClient(queueClientLogger, "Q/test1");
await queueClient2.InitializeAsync();

// ... and a third client is attached to a different queue ("Q/test2").
QueueClient queueClient3 = new QueueClient(queueClientLogger, "Q/test2");
await queueClient3.InitializeAsync();

-----------------------------------------------
/// <summary>
/// Owns one NMS connection, one session and one message consumer bound to a single
/// queue, and logs every message delivered to the consumer's listener.
/// </summary>
internal class QueueClient : IDisposable
{
  private readonly ILogger<QueueClient> logger;
  private IMessageConsumer consumer;
  private bool disposedValue;

  #region constructor

  /// <summary>
  /// Stores the logger and queue name and derives a unique <see cref="ConsumerName"/>
  /// (queue name plus a fresh GUID) so log lines can be attributed to one instance.
  /// </summary>
  /// <param name="logger">Logger used for all output of this client.</param>
  /// <param name="queueName">Name of the queue this client consumes from.</param>
  public QueueClient(ILogger<QueueClient> logger, string queueName)
  {
      this.logger = logger;
      QueueName = queueName;
      ConsumerName = $"{QueueName}-{Guid.NewGuid()}";
  }

  #endregion

  #region Properties

  /// <summary>Name of the queue passed to the constructor.</summary>
  internal string? QueueName { get; private set; }

  /// <summary>Per-instance identifier written to the log by the listener.</summary>
  internal string ConsumerName { get; private set; }

  /// <summary>Session created by <see cref="InitializeAsync"/>; unset before that call.</summary>
  internal Apache.NMS.ISession Session { get; private set; }

  /// <summary>Connection created by <see cref="InitializeAsync"/>; unset before that call.</summary>
  internal Apache.NMS.IConnection Connection { get; private set; }


  #endregion

  #region Methods

  /// <summary>
  /// Connects to the broker, starts the connection, opens an auto-acknowledge session,
  /// creates a consumer on <see cref="QueueName"/> and attaches the message listener.
  /// </summary>
  internal async Task InitializeAsync()
  {
      // NOTE(review): if only one consumer per queue ever receives messages, client-side
      // prefetch is a likely cause (the first consumer may buffer the queued messages
      // before the second one attaches). Consider lowering it through the connection URI
      // options - TODO confirm the exact option name in the NMS AMQP configuration docs.
      const string brokerUri = "amqp://localhost:5672";  // Default port
      NMSConnectionFactory factory = new NMSConnectionFactory(brokerUri);
      Connection = await factory.CreateConnectionAsync();
      await Connection.StartAsync();
      Session = await Connection.CreateSessionAsync(AcknowledgementMode.AutoAcknowledge);
      Apache.NMS.IDestination dest = await Session.GetQueueAsync(QueueName);
      consumer = await Session.CreateConsumerAsync(dest);
      consumer.Listener += Consumer_Listener;
  }

  /// <summary>
  /// Listener callback: logs which consumer/queue received the message, then logs the
  /// message body (text as-is, bytes decoded as UTF-8, anything else by type name).
  /// </summary>
  /// <param name="message">The message delivered by the consumer.</param>
  private void Consumer_Listener(Apache.NMS.IMessage message)
  {
      // Message templates instead of interpolated strings keep the log structured (CA2254).
      logger.LogInformation("{ConsumerName}: Message from queue - {QueueName}", ConsumerName, QueueName);

      // One-second pause kept from the original sample (stands in for processing time).
      Thread.Sleep(1000);

      // Type patterns replace the is-then-as casts; the former null check on the
      // bytes message could never be true after a successful type test.
      string content = message switch
      {
          ITextMessage txtMsg => txtMsg.Text ?? "",
          IBytesMessage bytesMsg => Encoding.UTF8.GetString(bytesMsg.Content),
          _ => "Unexpected message type: " + message.GetType().Name,
      };

      logger.LogInformation("{Content}", content);
  }
  
  // IDisposable implementation omitted from this sample.
}

I would like to know how to enable multiple consumers for the Apache.NMS.AMQP client. I have tried using multiple sessions on the same queue, each with its own consumer, but the listener is only called for one consumer per queue.
Below is the sample code. Ignore the connection-per-queue-client setup: I thought the connection might be the cause, but changing it did not help. Each consumer is given a name (ConsumerName) to identify which consumer is being called.

// One logger instance is shared by every QueueClient created below.
ILogger<QueueClient> queueClientLogger = loggerFactory.CreateLogger<QueueClient>();

// Two clients are attached to the same queue ("Q/test1") ...
QueueClient queueClient1 = new QueueClient(queueClientLogger, "Q/test1");
await queueClient1.InitializeAsync();

QueueClient queueClient2 = new QueueClient(queueClientLogger, "Q/test1");
await queueClient2.InitializeAsync();

// ... and a third client is attached to a different queue ("Q/test2").
QueueClient queueClient3 = new QueueClient(queueClientLogger, "Q/test2");
await queueClient3.InitializeAsync();

-----------------------------------------------
/// <summary>
/// Owns one NMS connection, one session and one message consumer bound to a single
/// queue, and logs every message delivered to the consumer's listener.
/// </summary>
internal class QueueClient : IDisposable
{
  private readonly ILogger<QueueClient> logger;
  private IMessageConsumer consumer;
  private bool disposedValue;

  #region constructor

  /// <summary>
  /// Stores the logger and queue name and derives a unique <see cref="ConsumerName"/>
  /// (queue name plus a fresh GUID) so log lines can be attributed to one instance.
  /// </summary>
  /// <param name="logger">Logger used for all output of this client.</param>
  /// <param name="queueName">Name of the queue this client consumes from.</param>
  public QueueClient(ILogger<QueueClient> logger, string queueName)
  {
      this.logger = logger;
      QueueName = queueName;
      ConsumerName = $"{QueueName}-{Guid.NewGuid()}";
  }

  #endregion

  #region Properties

  /// <summary>Name of the queue passed to the constructor.</summary>
  internal string? QueueName { get; private set; }

  /// <summary>Per-instance identifier written to the log by the listener.</summary>
  internal string ConsumerName { get; private set; }

  /// <summary>Session created by <see cref="InitializeAsync"/>; unset before that call.</summary>
  internal Apache.NMS.ISession Session { get; private set; }

  /// <summary>Connection created by <see cref="InitializeAsync"/>; unset before that call.</summary>
  internal Apache.NMS.IConnection Connection { get; private set; }


  #endregion

  #region Methods

  /// <summary>
  /// Connects to the broker, starts the connection, opens an auto-acknowledge session,
  /// creates a consumer on <see cref="QueueName"/> and attaches the message listener.
  /// </summary>
  internal async Task InitializeAsync()
  {
      // NOTE(review): if only one consumer per queue ever receives messages, client-side
      // prefetch is a likely cause (the first consumer may buffer the queued messages
      // before the second one attaches). Consider lowering it through the connection URI
      // options - TODO confirm the exact option name in the NMS AMQP configuration docs.
      const string brokerUri = "amqp://localhost:5672";  // Default port
      NMSConnectionFactory factory = new NMSConnectionFactory(brokerUri);
      Connection = await factory.CreateConnectionAsync();
      await Connection.StartAsync();
      Session = await Connection.CreateSessionAsync(AcknowledgementMode.AutoAcknowledge);
      Apache.NMS.IDestination dest = await Session.GetQueueAsync(QueueName);
      consumer = await Session.CreateConsumerAsync(dest);
      consumer.Listener += Consumer_Listener;
  }

  /// <summary>
  /// Listener callback: logs which consumer/queue received the message, then logs the
  /// message body (text as-is, bytes decoded as UTF-8, anything else by type name).
  /// </summary>
  /// <param name="message">The message delivered by the consumer.</param>
  private void Consumer_Listener(Apache.NMS.IMessage message)
  {
      // Message templates instead of interpolated strings keep the log structured (CA2254).
      logger.LogInformation("{ConsumerName}: Message from queue - {QueueName}", ConsumerName, QueueName);

      // One-second pause kept from the original sample (stands in for processing time).
      Thread.Sleep(1000);

      // Type patterns replace the is-then-as casts; the former null check on the
      // bytes message could never be true after a successful type test.
      string content = message switch
      {
          ITextMessage txtMsg => txtMsg.Text ?? "",
          IBytesMessage bytesMsg => Encoding.UTF8.GetString(bytesMsg.Content),
          _ => "Unexpected message type: " + message.GetType().Name,
      };

      logger.LogInformation("{Content}", content);
  }
  
  // IDisposable implementation omitted from this sample.
}

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。
列表为空,暂无数据
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文