JMS and Spring Integration: removing the dependency on an ActiveMQ instance

Posted 2024-10-12 22:08:46

I recently set up some logging to ActiveMQ as a fire-and-forget service, so that the application can just send a message to an "ActivityLoggingChannel" and not have to deal with the cross-cutting concerns of logging.

Everything is sent to the ActivityLoggingGateway, which is just an interface with a default channel. The router then queries a POJO (the dynamic router) for a channel name to find the message endpoint. The dynamic router has a JMX entry point that lets me switch endpoints on the fly. If the message endpoint is set to jmsSenderChannel and the ActiveMQ URL cannot be resolved, the whole system falls over.

The problem I'm having is that if the ActiveMQ URL is not reachable, I would like the system to revert to a different message channel that uses a simple multi-threaded in-process approach.

Below is the Spring Integration configuration. I'm using version 2.0.0.RELEASE.

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns:tx="http://www.springframework.org/schema/tx"
    xmlns:context="http://www.springframework.org/schema/context"
    xmlns:int="http://www.springframework.org/schema/integration"
    xmlns:jms="http://www.springframework.org/schema/integration/jms"
    xmlns:int-jmx="http://www.springframework.org/schema/integration/jmx"
    xsi:schemaLocation="http://www.springframework.org/schema/jms http://www.springframework.org/schema/jms/spring-jms-3.0.xsd
        http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration-2.0.xsd
        http://www.springframework.org/schema/integration/jmx http://www.springframework.org/schema/integration/jmx/spring-integration-jmx-2.0.xsd
        http://www.springframework.org/schema/integration/jms http://www.springframework.org/schema/integration/jms/spring-integration-jms-2.0.xsd
        http://www.springframework.org/schema/integration/stream http://www.springframework.org/schema/integration/stream/spring-integration-stream-2.0.xsd
        http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
        http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-2.5.xsd
        http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.0.xsd">

    <import resource="dm-activitylogging-services.xml"/>

    <bean name="decisionActivityLoggingAspect" class="com.idna.dm.aspects.logging.activity.DecisionActivityLogAspect" 
        factory-method="aspectOf">
        <property name="activityLoggingGateway">
            <ref bean="activityLoggingGateway" />
        </property>
    </bean>

<!-- New Activity Logging Services Reference dm-activity-logging -->    
<!-- JMS Channel Adapter -->
    <int:channel id="jmsSenderChannel" />

    <bean id="destinationLocalQueue" class="org.apache.activemq.command.ActiveMQQueue">
        <constructor-arg index="0" value="${activemq.queuename.activitylogging}" />
    </bean>

    <bean id="connectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
        <property name="brokerURL" value="${activemq.url}" />
    </bean>

    <bean id="activityLogConverter" class="com.idna.dm.domain.activitylogging.jms.ActivityLogConverter" />
    <jms:outbound-channel-adapter channel="jmsSenderChannel"  destination="destinationLocalQueue" connection-factory="connectionFactory" message-converter="activityLogConverter"/>

<!--  In Process Adapter -->

    <int:channel id="inProcessChannel" />   
    <int:outbound-channel-adapter channel="inProcessChannel" ref="inProcessAdapter" method="persistLog" />
    <bean id="inProcessAdapter" class="com.idna.dm.logging.activity.impl.InProcessActivityLoggingImpl" >
        <property name="activityLoggingService" >
            <ref bean="activityLogging" />
        </property>
    </bean>

    <int:channel id="asyncInProcessChannel" />
    <int:outbound-channel-adapter channel="asyncInProcessChannel" ref="asyncInProcessAdapter" method="persistLog" />
    <bean id="asyncInProcessAdapter" class="com.idna.dm.logging.activity.impl.AsyncInProcessActivityLoggingImpl" >
        <property name="activityLoggingService">
            <ref bean="activityLogging" />
        </property>
    </bean>

<!-- Custom channel for console output using the router -->

    <!-- Console Channel 
    <int:channel id="consoleAdapterChannel" />
    <int:outbound-channel-adapter channel="consoleAdapterChannel" ref="consoleAdapter" method="printToStdout" />
    <bean id="consoleAdapter" class="com.idna.dm.logging.activity.util.StdoutTargetAdapter" />
     -->

    <!-- Log4j Channel -->
    <int:channel id="loggingChannel" />
    <int:logging-channel-adapter auto-startup="true" level="INFO" log-full-message="true" channel="loggingChannel" />

<!-- Router -->
    <int:gateway id="activityLoggingGateway" 
        service-interface="com.idna.dm.logging.activity.logger.ActivityLoggingGateway" />


    <int:channel id="activityLoggingChannel" />

    <int:router input-channel="activityLoggingChannel" ref="dynamicRouter"
        method="route" default-output-channel="asyncInProcessChannel"
        ignore-channel-name-resolution-failures="true" />

    <bean id="dynamicRouter" class="com.idna.dm.logging.activity.router.DynamicRouter">
        <constructor-arg index="0" value="asyncInProcessChannel" />
    </bean>

</beans>


Comments (1)

许久 2024-10-19 22:08:46

You can achieve this with the failover mechanism:

<channel id="input">
  <dispatcher load-balancer="none"/>
</channel>

<service-activator input-channel="input" order="1"/>

<service-activator input-channel="input" order="2"/>

In this case the first <service-activator> is always invoked first, and the second one is invoked only if the first fails.

Failover is true by default.
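
Applied to the configuration in the question, the same idea might look like the sketch below. It is only an illustration: `failoverLoggingChannel` is a hypothetical channel name that does not appear in the original configuration, and it assumes the `order` attribute is available on both outbound adapters in the version you are using. The other bean names are taken from the question.

```xml
<!-- Sketch only: failoverLoggingChannel is a hypothetical channel id, not part of the original config -->
<int:channel id="failoverLoggingChannel">
    <!-- load-balancer="none" keeps a fixed handler order;
         failover="true" is the default and is spelled out here for clarity -->
    <int:dispatcher load-balancer="none" failover="true" />
</int:channel>

<!-- Tried first: send the message to ActiveMQ -->
<jms:outbound-channel-adapter channel="failoverLoggingChannel" order="1"
    destination="destinationLocalQueue" connection-factory="connectionFactory"
    message-converter="activityLogConverter" />

<!-- Tried only if the JMS adapter throws: persist the log in process -->
<int:outbound-channel-adapter channel="failoverLoggingChannel" order="2"
    ref="asyncInProcessAdapter" method="persistLog" />
```

With something like this in place, the dynamic router could return `failoverLoggingChannel` where it currently returns `jmsSenderChannel`. Note that failover only kicks in when the first handler throws an exception in the calling thread. If the broker URL uses a transport that blocks on send while it tries to reconnect (for example ActiveMQ's `failover:` transport without a timeout), the second adapter may never be reached, so that is worth checking against your `activemq.url` setting.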
