如何控制 Spring 从队列接收的速率?

发布于 2024-12-27 20:31:36 字数 477 浏览 0 评论 0原文

我正在使用 Spring 的消息驱动 POJO 框架(尤其是 DefaultMessageListenerContainer)来监听多个队列和主题。

对于一个特定队列的情况,需要减慢队列的排空速度,大约每五分钟一条消息。消息的实际处理是亚秒级的操作,但我希望侦听器在消息之间闲置一段时间。

我创建了一些 hack,但它绝对不是最佳的:我所做的是将最大并发数设置为 1,并在处理每个之后添加一个 Thread.sleep(..)信息。我想找到一种方法来使用 DefaultMessageListenerContainer 在尝试接收之间等待,而不是让处理程序在可能的消息处理期间进行等待。

我曾考虑过是否有一个 ScheduledExecutor 可以提供帮助,但我意识到需要在生成任务的地方进行限制。我是否可以重写 DefaultMessageListenerContainer 中的某些方法来完成我所追求的目标?

I am using Spring's message-driven POJO framework (and DefaultMessageListenerContainer in particular) to listen to several queues and topics.

In the case of one particularly queue, there is a need to slow the rate at which I drain the queue, on the order of one message every five minutes. The actual processing of the messages is a sub-second operation, but I would like the listener to sit idle for some time in between messages.

I have created a bit of a hack, but it is decidedly sub-optimal: What I've done is to set the max concurrency to 1 and add a Thread.sleep(..) after processing each message. I would like to find a way instead to use the DefaultMessageListenerContainer to wait between attempts to receive, rather than causing the handler to do the waiting during the would-be processing of a message.

I had considered if there was a ScheduledExecutor that would help, but I realize that the throttling would need to be done where the tasks are produced. Is there perhaps some method from DefaultMessageListenerContainer that I could override to accomplish what I'm after?

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(3

疧_╮線 2025-01-03 20:31:36

根据队列的提供者,您也许可以为使用其队列的消费者设置最大速率。

例如,在 hornetQ 中,您可以使用消费者最大速率在连接工厂中进行设置。

Depending on the provider of the queue, you may be able to set a max rate for consumers that consume it's queues.

For example in hornetQ you set this in the connection factory using consumer-max-rate.

白色秋天 2025-01-03 20:31:36

修改消费者行为的另一种方法是使用 Apache Camel 来延迟该特定队列上的消息。

http://camel.apache.org/delayer.html 描述了 Camel Delayer 模式的功能。例如:

<route>
    <from uri="jms:YOURQUEUE"/>
    <delay>
        <constant>1000</constant>
    </delay>
    <to uri="jms:DELAYEDQUEUE"/>
</route>

您将在其中使用 DELAYEDQUEUE,并且所有消息都会延迟 1 秒。

An alternative to modifying the behavior of your consumer would be to make use of Apache Camel to delay the messages on that one specific queue.

http://camel.apache.org/delayer.html describes the functionality of the Camel Delayer pattern. So for example:

<route>
    <from uri="jms:YOURQUEUE"/>
    <delay>
        <constant>1000</constant>
    </delay>
    <to uri="jms:DELAYEDQUEUE"/>
</route>

Where you would then consume the DELAYEDQUEUE and all messages would be delayed by 1 second.

梦年海沫深 2025-01-03 20:31:36

这是一个扩展 DefaultMessageListenerContainer 以提供限制功能的解决方案。这种方法的优点是 Thread.sleep() 不会在 onMessage() 内调用。如果交易正在进行(如下例中配置),这将使交易保持打开状态的时间超过必要的时间。对 Thread.sleep() 的调用发生在事务提交之后。实现此限制功能的一个限制是我们只能支持一个消费者线程,因此名称为 ThrottlingSingleConsumerMessageListenerContainer。

@Configuration
@EnableJms
@EnableTransactionManagement
public class Config
{
    private static final long THROTTLE_FIVE_SECONDS = 5_000;

    @Bean
    public DefaultMessageListenerContainer defaultMessageListenerContainer(
            ConnectionFactory connectionFactory,
            PlatformTransactionManager transactionManager,
            MyJmsListener myJmsListner)
    {
        DefaultMessageListenerContainer dmlc = new ThrottlingSingleConsumerMessageListenerContainer(THROTTLE_FIVE_SECONDS);
        dmlc.setConnectionFactory(connectionFactory);
        dmlc.setSessionAcknowledgeMode(Session.SESSION_TRANSACTED);
        dmlc.setSessionTransacted(true);
        dmlc.setTransactionManager(transactionManager);
        dmlc.setDestinationName("QUEUE.IN");
        dmlc.setMessageListener(myJmsListner);
        return dmlc;
    }
}

@Component
public class MyJmsListener implements MessageListener
{
    @Override
    public void onMessage(Message message)
    {
        // process the message
    }
}

public class ThrottlingSingleConsumerMessageListenerContainer extends DefaultMessageListenerContainer
{
    private static final Logger log = LoggerFactory.getLogger(ThrottlingSingleConsumerMessageListenerContainer.class);

    private final long delayMillis;

    public ThrottlingSingleConsumerMessageListenerContainer(long delayMillis)
    {
        this.delayMillis = delayMillis;
        super.setMaxConcurrentConsumers(1);
    }

    @Override
    protected boolean receiveAndExecute(Object invoker, @Nullable Session session, @Nullable MessageConsumer consumer) throws JMSException
    {
        boolean messageReceived = super.receiveAndExecute(invoker, session, consumer);

        if (messageReceived) {
            log.info("Sleeping for {} millis", delayMillis);
            try {
                Thread.sleep(delayMillis);
            } catch (InterruptedException e) {
                log.warn("Sleeping thread has been interrupted");
                Thread.currentThread().interrupt();
            }
        }

        return messageReceived;
    }

    @Override
    public void setMaxConcurrentConsumers(int maxConcurrentConsumers)
    {
        super.setMaxConcurrentConsumers(maxConcurrentConsumers);
        Assert.isTrue(getMaxConcurrentConsumers() <= 1,  "Throttling does not support maxConcurrentConsumers > 1");
    }

    @Override
    public void setConcurrency(String concurrency)
    {
        super.setConcurrency(concurrency);
        Assert.isTrue(getMaxConcurrentConsumers() <= 1,  "Throttling does not support maxConcurrentConsumers > 1");
    }
}

这已经在 org.springframework 5.x 上进行了测试,但也应该在早期版本上运行。

Here's a solution that extends DefaultMessageListenerContainer to provide the throttling functionality. The advantage of this approach is that Thread.sleep() is not being called within onMessage(). This would hold a Transaction open for longer than necessary if a Transaction is in play (as configured in this example below). The call to Thread.sleep() occurs after the transaction has been committed. A limitation to implementing this throttling feature is that we can only support one consumer thread, hence the name ThrottlingSingleConsumerMessageListenerContainer.

@Configuration
@EnableJms
@EnableTransactionManagement
public class Config
{
    private static final long THROTTLE_FIVE_SECONDS = 5_000;

    @Bean
    public DefaultMessageListenerContainer defaultMessageListenerContainer(
            ConnectionFactory connectionFactory,
            PlatformTransactionManager transactionManager,
            MyJmsListener myJmsListner)
    {
        DefaultMessageListenerContainer dmlc = new ThrottlingSingleConsumerMessageListenerContainer(THROTTLE_FIVE_SECONDS);
        dmlc.setConnectionFactory(connectionFactory);
        dmlc.setSessionAcknowledgeMode(Session.SESSION_TRANSACTED);
        dmlc.setSessionTransacted(true);
        dmlc.setTransactionManager(transactionManager);
        dmlc.setDestinationName("QUEUE.IN");
        dmlc.setMessageListener(myJmsListner);
        return dmlc;
    }
}

@Component
public class MyJmsListener implements MessageListener
{
    @Override
    public void onMessage(Message message)
    {
        // process the message
    }
}

public class ThrottlingSingleConsumerMessageListenerContainer extends DefaultMessageListenerContainer
{
    private static final Logger log = LoggerFactory.getLogger(ThrottlingSingleConsumerMessageListenerContainer.class);

    private final long delayMillis;

    public ThrottlingSingleConsumerMessageListenerContainer(long delayMillis)
    {
        this.delayMillis = delayMillis;
        super.setMaxConcurrentConsumers(1);
    }

    @Override
    protected boolean receiveAndExecute(Object invoker, @Nullable Session session, @Nullable MessageConsumer consumer) throws JMSException
    {
        boolean messageReceived = super.receiveAndExecute(invoker, session, consumer);

        if (messageReceived) {
            log.info("Sleeping for {} millis", delayMillis);
            try {
                Thread.sleep(delayMillis);
            } catch (InterruptedException e) {
                log.warn("Sleeping thread has been interrupted");
                Thread.currentThread().interrupt();
            }
        }

        return messageReceived;
    }

    @Override
    public void setMaxConcurrentConsumers(int maxConcurrentConsumers)
    {
        super.setMaxConcurrentConsumers(maxConcurrentConsumers);
        Assert.isTrue(getMaxConcurrentConsumers() <= 1,  "Throttling does not support maxConcurrentConsumers > 1");
    }

    @Override
    public void setConcurrency(String concurrency)
    {
        super.setConcurrency(concurrency);
        Assert.isTrue(getMaxConcurrentConsumers() <= 1,  "Throttling does not support maxConcurrentConsumers > 1");
    }
}

This has been tested on org.springframework 5.x but should run on earlier versions also.

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文