Spring MVC 处理控制器事务中的队列事件

发布于 2025-01-02 23:10:58 字数 3017 浏览 2 评论 0原文

我正在使用事务代理来包装服务,我的 applicationContext.xml 包含:

<bean id="someService"
    class="org.springframework.transaction.interceptor.TransactionProxyFactoryBean">
    <property name="transactionManager" ref="transactionManager" />
    <property name="target" ref="someServiceImpl" />
    <property name="transactionAttributes">
        <props>
            <prop key="find*">PROPAGATION_REQUIRED,readOnly</prop>
            <prop key="create*">PROPAGATION_REQUIRED</prop>
        </props>
    </property>
</bean>


<bean name="someServiceImpl" class="impl.serviceImpl">
    <property name="dataDao" ref="dataDao" />
    <property name="eventQ" ref="eventQJMSTemplate"></property>
</bean>

队列工件定义为:

<bean id="eventQJMSTemplate" class="org.springframework.jms.core.JmsTemplate">
    <property name="connectionFactory" ref="cachingMsgConnectionFactory" />
    <property name="defaultDestination" ref="newEventQ" />
</bean> 

<bean id="cachingMsgConnectionFactory"  class="org.springframework.jms.connection.CachingConnectionFactory" >
    <property name="targetConnectionFactory" ref="msgConnectionFactory" />
    <property name="sessionCacheSize" value="10"/>
</bean>

<bean id="msgConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory" >
    <property name="brokerURL" value="tcp://localhost:61616"/>
</bean>

服务有一个操作

public void create(Object obj)
{
    dataDao.createAndSave(obj);
    eventQ.send(new NewMessageCreator(obj.getId());

}

消息处理程序定义为

<bean id="newEventProcessor" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
    <property name="connectionFactory" ref="cachingMsgConnectionFactory"/>
    <property name="destination" ref="newEventQ"/>
    <property name="messageListener" ref="newEventListener" />
</bean> 

<bean id="newProjectEventListener" class="messages.NewEventHandler">
    <property name="dataDao" ref="dataDao" />
</bean> 

消息处理程序的实现:

 public class NewEventHandler implements MessageListener
 {
@Override
public void onMessage(Message message)
{
    Long objId;
    if (message instanceof MapMessage)
    {
        try
        {
            MapMessage eventMsg = (MapMessage) message;
            objId = eventMsg.getLong("objectId");
        } 
        catch (JMSException ex)
        {
            log.error("Error parsing new event.", ex);
            throw new RuntimeException(ex); // TODO error handling
        }


            dataDao.find(objId);
            //ERROR 
            // This fails as the object is still in the save transaction

    }
}
 }

对象的获取失败,因为交易尚未完成。事务完成后将消息发布到队列的最佳方法是什么? 我必须在创建方法中执行此操作,因为这是服务的公共接口。

所有组件都在单个 tomcat 实例中运行。

谢谢

I'm using a transaction proxy to wrap a service, my applicationContext.xml contains:

<bean id="someService"
    class="org.springframework.transaction.interceptor.TransactionProxyFactoryBean">
    <property name="transactionManager" ref="transactionManager" />
    <property name="target" ref="someServiceImpl" />
    <property name="transactionAttributes">
        <props>
            <prop key="find*">PROPAGATION_REQUIRED,readOnly</prop>
            <prop key="create*">PROPAGATION_REQUIRED</prop>
        </props>
    </property>
</bean>


<bean name="someServiceImpl" class="impl.serviceImpl">
    <property name="dataDao" ref="dataDao" />
    <property name="eventQ" ref="eventQJMSTemplate"></property>
</bean>

The queue artifacts are defined as:

<bean id="eventQJMSTemplate" class="org.springframework.jms.core.JmsTemplate">
    <property name="connectionFactory" ref="cachingMsgConnectionFactory" />
    <property name="defaultDestination" ref="newEventQ" />
</bean> 

<bean id="cachingMsgConnectionFactory"  class="org.springframework.jms.connection.CachingConnectionFactory" >
    <property name="targetConnectionFactory" ref="msgConnectionFactory" />
    <property name="sessionCacheSize" value="10"/>
</bean>

<bean id="msgConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory" >
    <property name="brokerURL" value="tcp://localhost:61616"/>
</bean>

The service has an operation

public void create(Object obj)
{
    dataDao.createAndSave(obj);
    eventQ.send(new NewMessageCreator(obj.getId());

}

The message handler is defined as

<bean id="newEventProcessor" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
    <property name="connectionFactory" ref="cachingMsgConnectionFactory"/>
    <property name="destination" ref="newEventQ"/>
    <property name="messageListener" ref="newEventListener" />
</bean> 

<bean id="newProjectEventListener" class="messages.NewEventHandler">
    <property name="dataDao" ref="dataDao" />
</bean> 

The implementation of the message handler:

 public class NewEventHandler implements MessageListener
 {
@Override
public void onMessage(Message message)
{
    Long objId;
    if (message instanceof MapMessage)
    {
        try
        {
            MapMessage eventMsg = (MapMessage) message;
            objId = eventMsg.getLong("objectId");
        } 
        catch (JMSException ex)
        {
            log.error("Error parsing new event.", ex);
            throw new RuntimeException(ex); // TODO error handling
        }


            dataDao.find(objId);
            //ERROR 
            // This fails as the object is still in the save transaction

    }
}
 }

The get of the object fails, as the transaction hasn't completed. What is the best way to have the mesage posted to the queue after the transaction completes?
I have to do it in the create method as that is the public interface into the service.

All components are running in a single tomcat instance.

Thankss

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(1

野生奥特曼 2025-01-09 23:10:58

如果我理解正确的话,您有一个事务将一些数据保存在数据库中,并发送一条消息。消息的接收方尝试从数据库中读取数据,但没有找到,因为消息已收到,但发送方的事务尚未提交。

您所处的情况需要一个支持 XA(两阶段提交)的事务管理器,它将数据库和 JMS 代理注册到同一个全局事务中。这样,对数据库的写入和消息的发送都是原子提交的,如果数据库写入尚未完成并且已成功,则无法发送消息。

有免费的此类事务管理器,例如 Bitronix、Atomikos 或 JBoss TM。

If I understand correctly, you have a transaction which saves some data in database, and sends a message. And the receiver of the message tries to read the data from the database, but doesn't find it because the message is received but the sender's transaction hasn't been committed yet.

You are in a situation where you need a XA-capable (2 phase commit) transaction manager, which enrolls the database and the JMS broker into the same global transaction. This way, the writes to the database and the sending of the message are committed atomically, and there's no way to have a message sent if the database write hasn't been done yet, and has succeeded.

There are free such transaction managers, like Bitronix, Atomikos or JBoss TM.

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文