学习ActiveMQ 中遇到问题,希望有人能指点下。

发布于 2021-11-13 08:40:45 字数 6722 浏览 872 评论 4

问题描述:

 参照网上的一些例子做了个JMS工具类,但存在点问题。只能发送和接收queue的消息,topic的消息只能发送,不能接收。调用messageConsumer.receive()时阻塞不动。如果接收时添加了MessageListener 就会报错,错误信息如下。

javax.jms.IllegalStateException: Cannot synchronously receive a message when a MessageListener is set

使用的是ActiveMQ 5.5

工具类代码如下:

import java.util.HashMap;
import java.util.Map;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;

/**
 * @author
 * @version 1.0 
 * 创建时间:2012-3-19 下午01:07:11 
 * 最后修改人: 
 * 修改时间: 
 * 类说明: JMS 调用工具类
 */
public class JMSUtil {

 

 private static ConnectionFactory connectionFactory;

 private static ThreadLocal<LocalJMS> threadLocal = new ThreadLocal<LocalJMS>();

 

 //初始化连接工厂

 static{

 String userName = PropertyConfig.getString("jms", "userName");

 String password = PropertyConfig.getString("jms", "password");

 String brokerURL = PropertyConfig.getString("jms", "brokerURL");

 connectionFactory = new ActiveMQConnectionFactory(userName,password,brokerURL);

 }

 

 private JMSUtil(){}

 

 /**

  * 发送消息

  * @param topic  是否为主题

  * @param subject 消息 key

  * @param message 消息主体内容

  */

 public static void sendMessage(boolean topic,String subject,String message) {

 LocalJMS jms = threadLocal.get();

 Session session = jms.session;

 try {

 MessageProducer messageProducer = jms.getMessageProducer(topic, subject);

 messageProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);

 TextMessage tm = session.createTextMessage(message);

 messageProducer.send(tm);

 } catch (JMSException e) {

 // TODO Auto-generated catch block

 e.printStackTrace();

 } 

 }

 

 /**

  * 接收消息

  * @param topic 是否为主题

  * @param subject 消息key

  * @param timeout 接收最大等待时间(毫秒)

  * @return 消息主体内容

  */

 public static String receiveMessage(boolean topic,String subject,long timeout) {

 final LocalJMS jms = threadLocal.get();

 try {

 final MessageConsumer messageConsumer = jms.getMessageConsumer(topic, subject);

 messageConsumer.setMessageListener(new MessageListener(){

 

 @Override

 public void onMessage(Message message) {

 // TODO Auto-generated method stub

 TextMessage tm = (TextMessage)message;

 try {

 System.out.println(tm.getText());

 jms.session.commit(); 

 } catch (JMSException e) {

 // TODO Auto-generated catch block

 e.printStackTrace();

 }

 }

 

 });

 Message message = messageConsumer.receive(timeout);

 if(message != null && message instanceof TextMessage){

 TextMessage txtMsg = (TextMessage) message;

 return txtMsg.getText();

 }

 } catch (JMSException e) {

 // TODO Auto-generated catch block

 e.printStackTrace();

 } 

 return null;

 }

 

 /**

  * 开始

  */

 public static void begin() {

 try {

 Connection connection = connectionFactory.createConnection();

 connection.start();

 Session session = connection.createSession(true,Session.AUTO_ACKNOWLEDGE);

 LocalJMS jms = new LocalJMS(connection, session);

 threadLocal.set(jms);

 } catch (JMSException e) {

 // TODO Auto-generated catch block

 e.printStackTrace();

 }

 }

 

 /**

  *结束 

  */

 public static void end() {

 try {

 LocalJMS jms = threadLocal.get();

 Session session = jms.session;

 session.commit();

 session.close();

 Connection connection = jms.connection;

 connection.stop();

 connection.close();

 jms.messageConsumerMap = null;

 jms.messageProducerMap = null;

 jms.destinationMap = null;

 jms.session = null;

 jms.connection = null;

 threadLocal.remove();

 } catch (JMSException e) {

 // TODO Auto-generated catch block

 e.printStackTrace();

 }

 }

 

 /**

  * 消息操作包装类

  * @author 

  *

  */

 private static class LocalJMS{

 private Connection connection;

 private Session session;

 private Map<String, MessageProducer> messageProducerMap;

 private Map<String, MessageConsumer> messageConsumerMap;

 private Map<String, Destination> destinationMap;

 

 public LocalJMS(Connection connection, Session session) {

 this.connection = connection;

 this.session = session;

 }

 

 /**

  * 获取消息目标

  * @param topic 是否为主题

  * @param subject 消息key

  * @return

  */

 private Destination getDestination(boolean topic,String subject){

 if(destinationMap == null){

 destinationMap = new HashMap<String, Destination>();

 }

 String key = topic + subject;

 if(destinationMap.containsKey(key)){

 return destinationMap.get(key);

 }else{

 try {

 Destination destination = topic? session.createTopic(subject) : session.createQueue(subject);

 destinationMap.put(key, destination);

 return destination;

 } catch (JMSException e) {

 // TODO Auto-generated catch block

 e.printStackTrace();

 }

 }

 return null;

 }

 

 /**

  * 获取消息提供者

  * @param topic

  * @param subject

  * @return

  */

 public MessageProducer getMessageProducer(boolean topic,String subject){

 if(messageProducerMap == null){

 messageProducerMap = new HashMap<String, MessageProducer>();

 }

 String key = topic + subject;

 if(messageProducerMap.containsKey(key)){

 return messageProducerMap.get(key);

 }else{

 try {

 Destination destination = getDestination(topic,subject);

 MessageProducer messageProducer = session.createProducer(destination);

 messageProducerMap.put(key, messageProducer);

 return messageProducer;

 } catch (JMSException e) {

 // TODO Auto-generated catch block

 e.printStackTrace();

 }

 }

 return null;

 }

 

 /**

  * 获取消息消费者

  * @param topic

  * @param subject

  * @return

  */

 public MessageConsumer getMessageConsumer(boolean topic,String subject){

 if(messageConsumerMap == null){

 messageConsumerMap = new HashMap<String, MessageConsumer>();

 }

 String key = topic + subject;

 if(messageConsumerMap.containsKey(key)){

 return messageConsumerMap.get(key);

 }else{

 try {

 Destination destination = getDestination(topic,subject);

 final MessageConsumer messageConsumer = session.createConsumer(destination);

 messageConsumerMap.put(key, messageConsumer);

 return messageConsumer;

 } catch (JMSException e) {

 // TODO Auto-generated catch block

 e.printStackTrace();

 }

 }

 return null;

 }

 }

 

 public static void main(String[] args) {

 JMSUtil.begin();

 //JMSUtil.sendMessage(true, "v5", "测试JMS消息");

 String a = JMSUtil.receiveMessage(true, "v5", 3000);

 System.out.println(a);

 JMSUtil.end();

 }

}

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(4

彼岸花ソ最美的依靠 2021-11-17 01:56:13

肯定是不能两个都用的,setMessageListener应该用Session而不是Consumer。

平生欢 2021-11-16 20:43:53

肯定是不能两个都用的,setMessageListener应该用Session而不是Consumer。

夜司空 2021-11-16 17:35:01

接受消息只能采用MessageListener
或consumer.receive其中一种,你这里两种都用了。

心舞飞扬 2021-11-15 20:46:52

接受消息只能采用MessageListener
或consumer.receive其中一种,你这里两种都用了。

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文