RabbitMQ and C#



With RabbitMQ, is there a way to use it similarly to MSMQ, where one can pop 1000 messages from the queue, do the inserts to the database, and continue from there?

I cannot seem to do that with a Subscription on a channel: doing a foreach over the BasicDeliverEventArgs in the Subscription, with an if statement on the maximum number of messages I want to process at a given time.

Thanks in advance. The following, however, still takes all 22k messages from the queue:

using RabbitMQ.Client;                  // IConnection, IModel, ConnectionFactory
using RabbitMQ.Client.Events;           // BasicDeliverEventArgs
using RabbitMQ.Client.MessagePatterns;  // Subscription (pre-4.x client)

// factory is a ConnectionFactory configured elsewhere.
using (IConnection connection = factory.CreateConnection())
{
    using (IModel channel = connection.CreateModel())
    {
        channel.QueueDeclare("****", true, false, false, null);

        // noAck = false: deliveries must be acknowledged explicitly.
        var subscription = new Subscription(channel, "****", false);
        int maxMessages = 5;
        int i = 0;
        foreach (BasicDeliverEventArgs eventArgs in subscription)
        {
            if (++i == maxMessages)
            {
                Console.WriteLine("Took 5 messages");
                // Only this last delivery is ACKed; with no BasicQos
                // prefetch limit set, the client still pre-fetches
                // the entire queue.
                subscription.Ack(eventArgs);
                break;
            }
        }
    }
}


来世叙缘 2024-12-23 13:26:01


I'm assuming that you want to optimise loading of messages into the database by batching up groups of them into larger transactions rather than wearing the cost of a transaction per message. With the obligatory warning that doing so means large groups of messages can fail together, even if only one of them causes a problem, here's how you'd go about it...

Set QOS on the channel:

channel.BasicQos(0, 1000, false);

This will pre-fetch 1000 messages and block further traffic until you ACK something. Note that it doesn't fetch in blocks of 1000. Rather, it ensures that a maximum of 1000 UNACK'ed messages are pre-fetched at any one time. Simulating block transfers is as simple as processing the 1000 messages first, and then ACK'ing them all in one go.
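
To make that concrete, here is a minimal, untested sketch of the batch-then-ACK-in-one-go approach against the same pre-4.x client API as the question. The queue name "my-queue", the batch size, and the InsertIntoDatabase helper are placeholders for illustration, not anything from the original post:

using System;
using System.Collections.Generic;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using RabbitMQ.Client.MessagePatterns;

class BatchConsumer
{
    static void Main()
    {
        var factory = new ConnectionFactory();
        using (IConnection connection = factory.CreateConnection())
        using (IModel channel = connection.CreateModel())
        {
            channel.QueueDeclare("my-queue", true, false, false, null);

            // At most 1000 unACKed messages are pre-fetched at any one time.
            channel.BasicQos(0, 1000, false);

            // noAck = false: we acknowledge explicitly, once per batch.
            var subscription = new Subscription(channel, "my-queue", false);
            var batch = new List<BasicDeliverEventArgs>();

            foreach (BasicDeliverEventArgs eventArgs in subscription)
            {
                batch.Add(eventArgs);
                if (batch.Count == 1000)
                {
                    InsertIntoDatabase(batch); // hypothetical bulk-insert step

                    // ACK the newest delivery with multiple = true, which
                    // acknowledges every delivery up to and including it
                    // and frees the prefetch window for the next 1000.
                    channel.BasicAck(batch[batch.Count - 1].DeliveryTag, true);
                    batch.Clear();
                }
            }
        }
    }

    static void InsertIntoDatabase(List<BasicDeliverEventArgs> batch)
    {
        // Placeholder: write each eventArgs.Body to the database
        // inside a single transaction.
    }
}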

See here and here for a more authoritative explanation than mine.

One more point: You may want to flush the queue as soon as messages are available, even if you haven't made your quota of 1000 messages. You should be able to do this by calling queue.BasicGet() inside the foreach loop until it runs dry, and then delivering whatever you have (including the message you pulled out of subscription) to the database. Caveat: I haven't tried this myself, so I could be talking rubbish, but I think it'll work. The beauty of this method is that it pushes messages into the database immediately, without having to wait for a full batch of 1000 messages. If the database falls behind from handling too many small transactions, the prefetch backlog will simply fill up more between each cycle.
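
Here is a rough, equally untested sketch of that early-flush loop, assuming the same setup (channel, subscription, "my-queue", InsertIntoDatabase) as the sketch above. Note that in the client API the call is channel.BasicGet(queue, noAck) rather than queue.BasicGet():

foreach (BasicDeliverEventArgs eventArgs in subscription)
{
    var batch = new List<byte[]> { eventArgs.Body };
    ulong lastTag = eventArgs.DeliveryTag;

    // Drain whatever is already sitting in the queue, up to the
    // 1000-message quota; BasicGet returns null once the queue runs dry.
    BasicGetResult result;
    while (batch.Count < 1000 &&
           (result = channel.BasicGet("my-queue", false)) != null)
    {
        batch.Add(result.Body);
        lastTag = result.DeliveryTag;
    }

    InsertIntoDatabase(batch); // hypothetical bulk insert of the raw bodies

    // Delivery tags are sequential per channel, so ACKing the highest
    // tag with multiple = true covers the subscribed delivery and
    // everything fetched via BasicGet.
    channel.BasicAck(lastTag, true);
}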
