Spring JMS接收主题消息

发布于 2024-12-11 06:30:58 字数 6400 浏览 0 评论 0原文

我正在编写一个简单的教程。我有一个发布者发送有关某个主题的消息,订阅者则接收该消息。当我启动应用程序时, spring 配置文件加载,然后出现以下错误

    2011-10-20 21:50:39,340 DEBUG [org.springframework.jms.support.destination.JndiDestinationResolver] - Located object with JNDI name [RateTopic]
2011-10-20 21:50:39,340 DEBUG [org.springframework.jms.support.destination.JndiDestinationResolver] - Located object with JNDI name [RateTopic]
2011-10-20 21:50:39,340 DEBUG [org.springframework.jms.connection.CachingConnectionFactory] - Closing cached Session: ActiveMQSession {id=ID:Reverb0253-PC-62259-1319161839013-0:1:3,started=true}
2011-10-20 21:50:39,340 DEBUG [org.springframework.jms.connection.CachingConnectionFactory] - Closing cached Session: ActiveMQSession {id=ID:Reverb0253-PC-62259-1319161839013-0:1:2,started=true}
2011-10-20 21:50:44,348 WARN [org.springframework.jms.listener.DefaultMessageListenerContainer] - Setup of JMS message listener invoker failed for destination 'RateTopic' - trying to recover. Cause: Destination [RateTopic] is not of expected type [javax.jms.Queue]
org.springframework.jms.support.destination.DestinationResolutionException: Destination [RateTopic] is not of expected type [javax.jms.Queue]
    at org.springframework.jms.support.destination.JndiDestinationResolver.validateDestination(JndiDestinationResolver.java:147)
    at org.springframework.jms.support.destination.JndiDestinationResolver.resolveDestinationName(JndiDestinationResolver.java:112)
    at org.springframework.jms.support.destination.JmsDestinationAccessor.resolveDestinationName(JmsDestinationAccessor.java:100)
    at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.createListenerConsumer(AbstractPollingMessageListenerContainer.java:221)
    at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.initResourcesIfNecessary(DefaultMessageListenerContainer.java:1081)
    at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.invokeListener(DefaultMessageListenerContainer.java:1057)
    at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.executeOngoingLoop(DefaultMessageListenerContainer.java:1050)
    at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.run(DefaultMessageListenerContainer.java:947)
    at java.lang.Thread.run(Thread.java:722)

Why does spring think that it should be a queuerather not topic

我的 jndi 文件看起来像这个

java.naming.factory.initial = org.apache.activemq.jndi.ActiveMQInitialContextFactory
java.naming.provider.url = tcp://localhost:61616
java.naming.security.principal=system
java.naming.security.credentials=manager
connectionFactoryNames = TopicCF
topic.RateTopic = RateTopic

spring 配置文件是

<bean id="jndiTemplate" class="org.springframework.jndi.JndiTemplate">
    <property name="environment">
        <props>
            <prop key="java.naming.factory.initial">
                org.apache.activemq.jndi.ActiveMQInitialContextFactory
            </prop>
            <prop key="java.naming.provider.url">tcp://localhost:61616</prop>
            <prop key="java.naming.security.principal">system</prop>
            <prop key="java.naming.security.credentials">manager</prop>
        </props>
    </property>
</bean>

<bean id="jndiTopicConnFactory" class="org.springframework.jndi.JndiObjectFactoryBean">
    <property name="jndiTemplate" ref="jndiTemplate"/>
    <!-- JNDI name of connection factory as defined by provider -->
    <property name="jndiName" value="TopicCF"/>
</bean>

<bean id="topicConnFactory" class="org.springframework.jms.connection.CachingConnectionFactory">
    <property name="targetConnectionFactory" ref="jndiTopicConnFactory"/>
    <!-- Number of sessions that will be cached -->
    <property name="sessionCacheSize" value="1"/>
</bean>

<bean id="destinationResolver" class="org.springframework.jms.support.destination.JndiDestinationResolver">
    <property name="jndiTemplate" ref="jndiTemplate"/>
    <property name="cache" value="true"/>
    <!-- do not create a dynamic destination if the destination name is not found in JNDI -->
    <property name="fallbackToDynamicDestination" value="false"/>
</bean>

<bean id="messageListener" class="com.merc.springjmspubsublenderborrower.TBorrower"/>

<bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
    <property name="connectionFactory" ref="topicConnFactory"/>
    <property name="destinationResolver" ref="destinationResolver"/>
    <property name="concurrentConsumers" value="3" />
    <property name="destinationName" value="RateTopic"/>
    <property name="messageListener" ref="messageListener" />
    <property name="sessionAcknowledgeModeName" value="AUTO_ACKNOWLEDGE"/>
</bean>

我的订阅者实现了 MessageListener

@Override
public void onMessage(Message message) {
    try {
        // Get the data from the message
        BytesMessage msg = (BytesMessage) message;
        double newRate = msg.readDouble();
        // If the rate is at least 1 point lower than the current rate, then
        //recommend refinancing
        if ((currentRate - newRate) >= 1.0) {
            System.out.println(
                    "New rate = " + newRate + " - Consider refinancing loan");
        } else {
            System.out.println("New rate = " + newRate + " - Keep existing loan");
        }
        System.out.println("\nWaiting for rate updates...");
    } catch (Exception ex) {
        ex.printStackTrace(System.out);
        System.exit(1);
    }
}

public static void main(String argv[]) {

    ApplicationContext ctx = new ClassPathXmlApplicationContext("app-context.xml");

    try {
        // Run until enter is pressed
        BufferedReader stdin = new BufferedReader(new InputStreamReader(System.in));
        System.out.println("TBorrower application started");
        System.out.println("Press enter to quit application");
        stdin.readLine();
    } catch (IOException ioe) {
        ioe.printStackTrace();
    }
}

I am working on a simple tutorial. I have a publisher that sends message on a topic and subscribers to receive it. When I start the application, spring config file loads up and then I get the following error

    2011-10-20 21:50:39,340 DEBUG [org.springframework.jms.support.destination.JndiDestinationResolver] - Located object with JNDI name [RateTopic]
2011-10-20 21:50:39,340 DEBUG [org.springframework.jms.support.destination.JndiDestinationResolver] - Located object with JNDI name [RateTopic]
2011-10-20 21:50:39,340 DEBUG [org.springframework.jms.connection.CachingConnectionFactory] - Closing cached Session: ActiveMQSession {id=ID:Reverb0253-PC-62259-1319161839013-0:1:3,started=true}
2011-10-20 21:50:39,340 DEBUG [org.springframework.jms.connection.CachingConnectionFactory] - Closing cached Session: ActiveMQSession {id=ID:Reverb0253-PC-62259-1319161839013-0:1:2,started=true}
2011-10-20 21:50:44,348 WARN [org.springframework.jms.listener.DefaultMessageListenerContainer] - Setup of JMS message listener invoker failed for destination 'RateTopic' - trying to recover. Cause: Destination [RateTopic] is not of expected type [javax.jms.Queue]
org.springframework.jms.support.destination.DestinationResolutionException: Destination [RateTopic] is not of expected type [javax.jms.Queue]
    at org.springframework.jms.support.destination.JndiDestinationResolver.validateDestination(JndiDestinationResolver.java:147)
    at org.springframework.jms.support.destination.JndiDestinationResolver.resolveDestinationName(JndiDestinationResolver.java:112)
    at org.springframework.jms.support.destination.JmsDestinationAccessor.resolveDestinationName(JmsDestinationAccessor.java:100)
    at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.createListenerConsumer(AbstractPollingMessageListenerContainer.java:221)
    at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.initResourcesIfNecessary(DefaultMessageListenerContainer.java:1081)
    at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.invokeListener(DefaultMessageListenerContainer.java:1057)
    at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.executeOngoingLoop(DefaultMessageListenerContainer.java:1050)
    at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.run(DefaultMessageListenerContainer.java:947)
    at java.lang.Thread.run(Thread.java:722)

Why does spring think that it should be a queue instead of topic

my jndi file looks like this

java.naming.factory.initial = org.apache.activemq.jndi.ActiveMQInitialContextFactory
java.naming.provider.url = tcp://localhost:61616
java.naming.security.principal=system
java.naming.security.credentials=manager
connectionFactoryNames = TopicCF
topic.RateTopic = RateTopic

spring config file is

<bean id="jndiTemplate" class="org.springframework.jndi.JndiTemplate">
    <property name="environment">
        <props>
            <prop key="java.naming.factory.initial">
                org.apache.activemq.jndi.ActiveMQInitialContextFactory
            </prop>
            <prop key="java.naming.provider.url">tcp://localhost:61616</prop>
            <prop key="java.naming.security.principal">system</prop>
            <prop key="java.naming.security.credentials">manager</prop>
        </props>
    </property>
</bean>

<bean id="jndiTopicConnFactory" class="org.springframework.jndi.JndiObjectFactoryBean">
    <property name="jndiTemplate" ref="jndiTemplate"/>
    <!-- JNDI name of connection factory as defined by provider -->
    <property name="jndiName" value="TopicCF"/>
</bean>

<bean id="topicConnFactory" class="org.springframework.jms.connection.CachingConnectionFactory">
    <property name="targetConnectionFactory" ref="jndiTopicConnFactory"/>
    <!-- Number of sessions that will be cached -->
    <property name="sessionCacheSize" value="1"/>
</bean>

<bean id="destinationResolver" class="org.springframework.jms.support.destination.JndiDestinationResolver">
    <property name="jndiTemplate" ref="jndiTemplate"/>
    <property name="cache" value="true"/>
    <!-- do not create a dynamic destination if the destination name is not found in JNDI -->
    <property name="fallbackToDynamicDestination" value="false"/>
</bean>

<bean id="messageListener" class="com.merc.springjmspubsublenderborrower.TBorrower"/>

<bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
    <property name="connectionFactory" ref="topicConnFactory"/>
    <property name="destinationResolver" ref="destinationResolver"/>
    <property name="concurrentConsumers" value="3" />
    <property name="destinationName" value="RateTopic"/>
    <property name="messageListener" ref="messageListener" />
    <property name="sessionAcknowledgeModeName" value="AUTO_ACKNOWLEDGE"/>
</bean>

My subscriber implements MessageListener

@Override
public void onMessage(Message message) {
    try {
        // Get the data from the message
        BytesMessage msg = (BytesMessage) message;
        double newRate = msg.readDouble();
        // If the rate is at least 1 point lower than the current rate, then
        //recommend refinancing
        if ((currentRate - newRate) >= 1.0) {
            System.out.println(
                    "New rate = " + newRate + " - Consider refinancing loan");
        } else {
            System.out.println("New rate = " + newRate + " - Keep existing loan");
        }
        System.out.println("\nWaiting for rate updates...");
    } catch (Exception ex) {
        ex.printStackTrace(System.out);
        System.exit(1);
    }
}

public static void main(String argv[]) {

    ApplicationContext ctx = new ClassPathXmlApplicationContext("app-context.xml");

    try {
        // Run until enter is pressed
        BufferedReader stdin = new BufferedReader(new InputStreamReader(System.in));
        System.out.println("TBorrower application started");
        System.out.println("Press enter to quit application");
        stdin.readLine();
    } catch (IOException ioe) {
        ioe.printStackTrace();
    }
}

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(1

夢归不見 2024-12-18 06:30:58

您正在尝试从主题消费,但尚未设置 pubSubDomain DefaultMessageListenerContainer,默认为“false”,表示点对点,表示队列而不是主题。因此,错误消息告诉您 RateTopic 不是 javax.jms.Queue

You're trying to consume from a topic, but you haven't set the pubSubDomain property on the DefaultMessageListenerContainer, and it defaults to "false", meaning point-to-point, meaning a queue instead of a topic. Thus the error message telling you that RateTopic isn't a javax.jms.Queue.

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文