在Spring应用程序中使用线程池增加消息的兔消耗
A AM :使用弹簧应用程序中的线程池(甚至任何其他推荐的方式)在消费者端增加消息的消耗
AIM :使用RabbitMQ作为消息Broker Scenario :I I :i有一个场景,我们的应用程序需要约15秒的时间来消耗1条消息,而我们每秒以简单的方式产生1条消息。因此,到15秒后消耗第一消息时。我们还有14条消息,依此类推。
有什么方法可以通过“增加消费者的消费量”来减少消费者和生产者方面的差距?
现有的理解:
我试图增加线程池,以使每个消费者都有15个线程。这将确保
00:00:01 - 1st msg, picked by thread 1
00:00:02 - 2nd msg, picked by thread 2
00:00:03 - 3rd msg, picked by thread 3
00:00:04 - 4th msg, picked by thread 4
..... and soon
00:00:15 - 15th msg, picked by thread 15
00:00:16 - 16th msg, picked by thread 1 (processing of 1st msg done after 15 seconds)
00:00:17 - 17th msg, picked by thread 2 (processing of 2st msg done after 15 seconds)
..... and soon
现有的实施:
val factory = new SimpleRabbitListenerContainerFactory();
factory.setTaskExecutor(Executors.newFixedThreadPool(15));
有了上述理解,我实施了上面的实施,但没有看到消费者最终消费率的显着提高。 我发现在消费者端的消费率独立于线程池
在实现之上正确还是丢失了?还有其他解决这个问题的方法吗?
AIM: Increase the consumption of a message at the Consumer End using Thread Pool in Spring Application (or even any other recommended way if possible) with Rabbitmq as message broker
Scenario: I have a scenario in which our application takes around ~15 seconds for consumption of 1 message whereas we producing 1 message per second, in simple way. So, by the time 1st message is consumed after 15 seconds. We have 14 more messages and so on.
Is there any way to reduce this gap on the consumer and producer side by "increasing Consumption at consumer end"?
Existing Understanding:
I tried to increase Thread Pool so that each consumer has 15 threads. This will ensure
00:00:01 - 1st msg, picked by thread 1
00:00:02 - 2nd msg, picked by thread 2
00:00:03 - 3rd msg, picked by thread 3
00:00:04 - 4th msg, picked by thread 4
..... and soon
00:00:15 - 15th msg, picked by thread 15
00:00:16 - 16th msg, picked by thread 1 (processing of 1st msg done after 15 seconds)
00:00:17 - 17th msg, picked by thread 2 (processing of 2st msg done after 15 seconds)
..... and soon
Existing Implementation:
val factory = new SimpleRabbitListenerContainerFactory();
factory.setTaskExecutor(Executors.newFixedThreadPool(15));
With the above understanding, I implemented above implementation, but don't see any significant improvement on the consumption rate at Consumer end. I found consumption rate at consumer end independent to Thread Pool
Is above implementation correct or missing something? Are there any other ways to solve this issue?
如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论
评论(1)
我们可以通过使用和增加线程池尺寸来提高消费者端的消耗率。让我们首先理解很少的术语,
1)最小并发消费者是每个听众的最小同时消费者。
2)Max并发消费者是每个听众的最大消费者。
3)螺纹池尺寸是每个连接总线/通道的大小。根据RabbitMQ,我们可以每连接最多创建2047频道/线程/消费者。该数字在该连接上的所有侦听器中共享。
注意:队列中的消费者也被称为渠道,为简单起见,我们可以考虑消费者和通道相似。
我们可以在上面的代码段中设置上述字段。
在上面的片段中,我们将线程池大小设置为15。最小和最大并发消费者的大小为1和15。根据队列上的流量,引擎盖下的应用程序自动缩放和最小消费者之间的尺度(1)和deScales(1) Max并发消费者(15),可以通过线程池容纳,因为它的尺寸仍然较小且等于其尺寸。
b/w线池大小和最大并发消费者有什么区别
如果您有多个侦听器,那么此值可能会有所不同,添加了一个很少的场景,
我们如何更改特定听众的最小消费者大小?
,并添加了上述代码,当没有流量时,您将始终至少有5名消费者。使用以前的代码段,当流量较低时,它将是1个消费者。
We can increase the Consumption Rate at Consumer end by using and increasing Thread Pool Size. Let us understand few terms first,
1) Min Concurrent Consumer is minimum concurrent consumer per listener.
2) Max Concurrent Consumer is max concurrent consumer per listener.
3) Thread Pool Size is the size of total threads/channel per connection. As per rabbitmq we can create maximum of 2047 channel/thread/consumer per connection. This number is shared across all listener on that connection.
Note: Consumer in a Queue is also known as Channel and for simplicity we can consider consumer and channel similar.
We can set above fields like done in above code snippet.
In the above snippet, we set Thread Pool Size to 15. Minimum and Maximum Concurrent Consumer size is 1 and 15. Based on the traffic on the Queue, application under the hood automatically scales and de-scales between Min Concurrent Consumer (1) and Max Concurrent Consumer (15), which can be accommodated by the Thread Pool as it's still less and equal to its size.
What's the difference b/w Thread Pool Size and Max Concurrent Consumer
In case when you have multiple Listener, then this value may differ, added a table with few scenarios,
How can we change Minimum Concurrent Consumer size for a particular Listener?
With addition of above code, you will have atleast 5 consumers always when there is no traffic. With previous code snippet, it would be 1 consumer when the traffic is low.