Spring 集成入站网关 当队列为空时触发事件

发布于 2024-11-07 18:59:50 字数 603 浏览 4 评论 0原文

我是新手,但我会尽力保持理智。

{INPUT QUEUE}->[INBOUND-GATEWAY-1]-->[ROUTER]----------->(ACTIVATOR)<---------------
                                        \                                          /
                                         \-->{HOLD QUEUE}--->[INBOUND-GATEWAY-2]--^

我遇到的情况是,我必须像前者一样动态更改流程中的路由条件。来自队列的消息被发送到激活器进行处理,或者发送到另一个队列进行搁置。在特定时间,我必须关闭 INBOUND-GATEWAY-1,以便没有新消息进入流中,并打开 INBOUND-GATEWAY-2 以处理来自 HOLD QUEUE 的所有消息。一旦 HOLD QUEUE 中的所有消息都被使用,两个网关都必须像以前一样关闭/打开。这里的问题是我如何知道 HOLD QUEUE 何时为空,以便我可以触发一个可以启动 gateway-1 的方法?

如果有人能帮助我,我将不胜感激。

提前致谢

I'm a newbie around but I'll try to be consice.

{INPUT QUEUE}->[INBOUND-GATEWAY-1]-->[ROUTER]----------->(ACTIVATOR)<---------------
                                        \                                          /
                                         \-->{HOLD QUEUE}--->[INBOUND-GATEWAY-2]--^

I'm having an scenario in which I have to dynamically change routing conditions in a flow like the former. Messages comming from a queue are sent to an activator to be processed, or another queue to be put on hold. At certain time, I have to close INBOUND-GATEWAY-1 so no new messages come into the flow, and open INBOUND-GATEWAY-2 to let all messages from HOLD QUEUE be processed. Once all messages from HOLD QUEUE were been consumed, both gateways must me closed/opened as they were before. The thing here is how could I know when HOLD QUEUE is empty so I could trigger a method in which gateway-1 could be started?

I'd be grateful if somebody could help me.

Thanks in advance

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(2

风月客 2024-11-14 18:59:51

经过一番调试和阅读,终于找到了解决这个问题的方法。入站网关是一个 JmsMessageDrivenEndpoint,基于两个内部组件:MessageListenerContainer 和 MessageListener。 MessageListenerContainer 是负责调度 MessageListener 行为的人,因此,重写 noMessageReceived 和 messageReceived,并添加一些属性来控制所需的行为,我可以实现“魔法”。

我的 MessageListenerContainer 实现是这样的。

public class ControlMessageListenerContainer extends DefaultMessageListenerContainer{

    private JmsMessageDrivenEndpoint mainInputGateway;

    private long timeOut;

    private long lastTimeReceived;  

    public PassControlMessageListenerContainer() {
        this.setAutoStartup(false);
    }

    @Override
    public void start() throws JmsException {
        /*When the container is started the lastTimeReceived is set to actial time*/
        lastTimeReceived = (new Date()).getTime();
        super.start();
    }

    @Override
    protected void noMessageReceived(Object invoker, Session session) {
        long actualTime = (new Date()).getTime();

        if((actualTime - lastTimeReceived) >= timeOut 
                && mainInputGateway != null && !mainInputGateway.isRunning()){
            mainInputGateway.start();
        }       
        super.noMessageReceived(invoker, session);
    }

    @Override
    protected void messageReceived(Object invoker, Session session) {
        /*lastTimeReceived is set again to actual time at new message arrive*/
        lastTimeReceived = (new Date()).getTime();
        super.messageReceived(invoker, session);
    }
}

最后,spring bean 配置如下:

<bean id="listenerContainer" 
    class="org.merol.ControlMessageListenerContainer">
    <property name="mainInputGateway" ref="mainGateway" />
    <property name="destination" ref="onHoldQueue" />
    <property name="timeOut" value="10000"/>
    <property name="connectionFactory" ref="connectionFactory"/>
</bean>

<bean id="messageListener" 
    class="org.springframework.integration.jms.ChannelPublishingJmsMessageListener">
    <property name="requestChannel" ref="outputChannel" />
</bean>

<bean id="inboundGateway" 
    class="org.springframework.integration.jms.JmsMessageDrivenEndpoint">
    <constructor-arg name="listenerContainer" ref="listenerContainer" />
    <constructor-arg name="listener" ref="messageListener" />
</bean>

希望这对其他人有帮助。

感谢@Nicholas 提供的线索。

After some debugging and reading, finally I came to a solution for this issue. An inbound-gateway is a JmsMessageDrivenEndpoint, based in two inner components, a MessageListenerContainer and a MessageListener. MessageListenerContainer is the one in charge at scheduling MessageListener behaviour so, overriding the noMessageReceived and messageReceived, and adding some attributes to control the desired behaviour, I could be able to do the "magic".

My MessageListenerContainer implementation got like this.

public class ControlMessageListenerContainer extends DefaultMessageListenerContainer{

    private JmsMessageDrivenEndpoint mainInputGateway;

    private long timeOut;

    private long lastTimeReceived;  

    public PassControlMessageListenerContainer() {
        this.setAutoStartup(false);
    }

    @Override
    public void start() throws JmsException {
        /*When the container is started the lastTimeReceived is set to actial time*/
        lastTimeReceived = (new Date()).getTime();
        super.start();
    }

    @Override
    protected void noMessageReceived(Object invoker, Session session) {
        long actualTime = (new Date()).getTime();

        if((actualTime - lastTimeReceived) >= timeOut 
                && mainInputGateway != null && !mainInputGateway.isRunning()){
            mainInputGateway.start();
        }       
        super.noMessageReceived(invoker, session);
    }

    @Override
    protected void messageReceived(Object invoker, Session session) {
        /*lastTimeReceived is set again to actual time at new message arrive*/
        lastTimeReceived = (new Date()).getTime();
        super.messageReceived(invoker, session);
    }
}

And finally, the spring bean config get like this:

<bean id="listenerContainer" 
    class="org.merol.ControlMessageListenerContainer">
    <property name="mainInputGateway" ref="mainGateway" />
    <property name="destination" ref="onHoldQueue" />
    <property name="timeOut" value="10000"/>
    <property name="connectionFactory" ref="connectionFactory"/>
</bean>

<bean id="messageListener" 
    class="org.springframework.integration.jms.ChannelPublishingJmsMessageListener">
    <property name="requestChannel" ref="outputChannel" />
</bean>

<bean id="inboundGateway" 
    class="org.springframework.integration.jms.JmsMessageDrivenEndpoint">
    <constructor-arg name="listenerContainer" ref="listenerContainer" />
    <constructor-arg name="listener" ref="messageListener" />
</bean>

Hope this could be helpful for someone else.

Thanks to @Nicholas for the clues.

櫻之舞 2024-11-14 18:59:51

我会将此功能放入入站网关处理器中。例如:

Gateway1Processor:

  • start():从主线程启动消费者
    队列和进程。
  • stop():停止消费者。

Gateway2Processor:

  • start():从 HOLD 队列中启动消费者。指定适当的超时。当超时触发时,(HOLD 队列为空)调用 stop()。
  • stop():启动Gateway1Processor并停止该消费者。

因此,操作顺序为:

  1. 启动Gateway1Processor
  2. 某个时间,调用Gateway1Processor.stop()Gateway2Processor.start( )
  3. Gateway2Processor 将耗尽 HOLD 队列,重新启动 Gateway1Processor 然后停止。
  4. 转到#2。

I would put this functionality into the inbound gateway processors. For example:

Gateway1Processor:

  • start(): Start consumer off the main
    queue and process.
  • stop(): Stop consumer.

Gateway2Processor:

  • start(): Start consumer off the HOLD queue. Specify an appropriate timeout. When timeout is fired, (the HOLD queue is empty) call stop().
  • stop(): Start Gateway1Processor and stop this consumer.

Therefore, the operating sequence would be:

  1. Start Gateway1Processor
  2. At a certain time, call Gateway1Processor.stop() and Gateway2Processor.start()
  3. Gateway2Processor will drain the HOLD queue, restart Gateway1Processor and then stop.
  4. Go To #2.
~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文