RabbitMQ - 无法以相同顺序获取消息集

发布于 2024-09-05 04:40:22 字数 1111 浏览 3 评论 0原文

我在 C# 中使用 Rabbit MQ。这是我的场景

  1. 一个单独的进程将消息发布到队列
  2. 客户端必须从队列中读取 N 条消息
  3. 处理 N 条消息
  4. 确认 N 条消息

在同一通道下,我接收消息,然后处理它们,然后确认它们。服务器进程不断发布消息。我面临的问题是,当我尝试获取下一组消息时,它们的顺序与发布过程发布的顺序不同。消息以随机顺序出现。只有第一组消息按正确的顺序出现。

有人知道这里出了什么问题吗?创建一个新通道来访问下一组消息是不是不对?以下是示例代码:

while (true)
         {
             using (IModel getChannel = MQConnection.CreateModel())
             {
                 // Create a consumer
                 QueueingBasicConsumer consumer = CreateQueueConsumer(getChannel, exchangeName, queueName);

                 int numberOfMessages = 100;
                 // Next Recieve
                 List<object> msgSet = GetNextSetOfMessages(consumer, getChannel, exchangeName, queueName, numberOfMessages, out finalDeliverytag);


                 // Do some processing

                 if (finalDeliverytag > 0)
                     AckFinishedMessages(exchangeName, queueName, finalDeliverytag, getChannel);

                 if (finalDeliverytag == 0)
                     break;

             }
         }

请帮忙。谢谢!

I am using Rabbit MQ in C#. This is my scenario

  1. A separate process publishes messages to the queue
  2. Client has to read set of N messages from queue
  3. Process the N messages
  4. Acknowledge the N messages

Under the same channel, I receive the messages and then process them and then acknowledge them. The server process keeps publishing messages. The problem I am facing is, when I try to get next set of messages, they do not come in the same order as it was published by the publishing process. The messages come in a random order. Only the first set of messages come in the correct order.

Does any one what is going wrong here? Is creating a new channel to access the next set of messages not right? Below is the sample code:

while (true)
         {
             using (IModel getChannel = MQConnection.CreateModel())
             {
                 // Create a consumer
                 QueueingBasicConsumer consumer = CreateQueueConsumer(getChannel, exchangeName, queueName);

                 int numberOfMessages = 100;
                 // Next Recieve
                 List<object> msgSet = GetNextSetOfMessages(consumer, getChannel, exchangeName, queueName, numberOfMessages, out finalDeliverytag);


                 // Do some processing

                 if (finalDeliverytag > 0)
                     AckFinishedMessages(exchangeName, queueName, finalDeliverytag, getChannel);

                 if (finalDeliverytag == 0)
                     break;

             }
         }

Kindly help. Thanks!

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。
列表为空,暂无数据
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文