如何控制 Spring 从队列接收的速率?
我正在使用 Spring 的消息驱动 POJO 框架(尤其是 DefaultMessageListenerContainer)来监听多个队列和主题。
对于一个特定队列的情况,需要减慢队列的排空速度,大约每五分钟一条消息。消息的实际处理是亚秒级的操作,但我希望侦听器在消息之间闲置一段时间。
我创建了一些 hack,但它绝对不是最佳的:我所做的是将最大并发数设置为 1,并在处理每个之后添加一个 Thread.sleep(..)
信息。我想找到一种方法来使用 DefaultMessageListenerContainer
在尝试接收之间等待,而不是让处理程序在可能的消息处理期间进行等待。
我曾考虑过是否有一个 ScheduledExecutor 可以提供帮助,但我意识到需要在生成任务的地方进行限制。我是否可以重写 DefaultMessageListenerContainer
中的某些方法来完成我所追求的目标?
I am using Spring's message-driven POJO framework (and DefaultMessageListenerContainer
in particular) to listen to several queues and topics.
In the case of one particularly queue, there is a need to slow the rate at which I drain the queue, on the order of one message every five minutes. The actual processing of the messages is a sub-second operation, but I would like the listener to sit idle for some time in between messages.
I have created a bit of a hack, but it is decidedly sub-optimal: What I've done is to set the max concurrency to 1 and add a Thread.sleep(..)
after processing each message. I would like to find a way instead to use the DefaultMessageListenerContainer
to wait between attempts to receive, rather than causing the handler to do the waiting during the would-be processing of a message.
I had considered if there was a ScheduledExecutor
that would help, but I realize that the throttling would need to be done where the tasks are produced. Is there perhaps some method from DefaultMessageListenerContainer
that I could override to accomplish what I'm after?
如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。
绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论
评论(3)
根据队列的提供者,您也许可以为使用其队列的消费者设置最大速率。
例如,在 hornetQ 中,您可以使用消费者最大速率在连接工厂中进行设置。
Depending on the provider of the queue, you may be able to set a max rate for consumers that consume it's queues.
For example in hornetQ you set this in the connection factory using consumer-max-rate.
修改消费者行为的另一种方法是使用 Apache Camel 来延迟该特定队列上的消息。
http://camel.apache.org/delayer.html 描述了 Camel Delayer 模式的功能。例如:
您将在其中使用 DELAYEDQUEUE,并且所有消息都会延迟 1 秒。
An alternative to modifying the behavior of your consumer would be to make use of Apache Camel to delay the messages on that one specific queue.
http://camel.apache.org/delayer.html describes the functionality of the Camel Delayer pattern. So for example:
Where you would then consume the DELAYEDQUEUE and all messages would be delayed by 1 second.
这是一个扩展
DefaultMessageListenerContainer
以提供限制功能的解决方案。这种方法的优点是Thread.sleep()
不会在onMessage()
内调用。如果交易正在进行(如下例中配置),这将使交易保持打开状态的时间超过必要的时间。对 Thread.sleep() 的调用发生在事务提交之后。实现此限制功能的一个限制是我们只能支持一个消费者线程,因此名称为 ThrottlingSingleConsumerMessageListenerContainer。这已经在 org.springframework 5.x 上进行了测试,但也应该在早期版本上运行。
Here's a solution that extends
DefaultMessageListenerContainer
to provide the throttling functionality. The advantage of this approach is thatThread.sleep()
is not being called withinonMessage()
. This would hold a Transaction open for longer than necessary if a Transaction is in play (as configured in this example below). The call to Thread.sleep() occurs after the transaction has been committed. A limitation to implementing this throttling feature is that we can only support one consumer thread, hence the nameThrottlingSingleConsumerMessageListenerContainer
.This has been tested on org.springframework 5.x but should run on earlier versions also.