学习ActiveMQ 中遇到问题,希望有人能指点下。
问题描述:
参照网上的一些例子做了个JMS工具类,但存在点问题。只能发送和接收queue的消息,topic的消息只能发送,不能接收。调用messageConsumer.receive()时阻塞不动。如果接收时添加了MessageListener 就会报错,错误信息如下。
javax.jms.IllegalStateException: Cannot synchronously receive a message when a MessageListener is set
使用的是ActiveMQ 5.5
工具类代码如下:
import java.util.HashMap; import java.util.Map; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.DeliveryMode; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.MessageListener; import javax.jms.MessageProducer; import javax.jms.Session; import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnectionFactory; /** * @author * @version 1.0 * 创建时间:2012-3-19 下午01:07:11 * 最后修改人: * 修改时间: * 类说明: JMS 调用工具类 */ public class JMSUtil { private static ConnectionFactory connectionFactory; private static ThreadLocal<LocalJMS> threadLocal = new ThreadLocal<LocalJMS>(); //初始化连接工厂 static{ String userName = PropertyConfig.getString("jms", "userName"); String password = PropertyConfig.getString("jms", "password"); String brokerURL = PropertyConfig.getString("jms", "brokerURL"); connectionFactory = new ActiveMQConnectionFactory(userName,password,brokerURL); } private JMSUtil(){} /** * 发送消息 * @param topic 是否为主题 * @param subject 消息 key * @param message 消息主体内容 */ public static void sendMessage(boolean topic,String subject,String message) { LocalJMS jms = threadLocal.get(); Session session = jms.session; try { MessageProducer messageProducer = jms.getMessageProducer(topic, subject); messageProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); TextMessage tm = session.createTextMessage(message); messageProducer.send(tm); } catch (JMSException e) { // TODO Auto-generated catch block e.printStackTrace(); } } /** * 接收消息 * @param topic 是否为主题 * @param subject 消息key * @param timeout 接收最大等待时间(毫秒) * @return 消息主体内容 */ public static String receiveMessage(boolean topic,String subject,long timeout) { final LocalJMS jms = threadLocal.get(); try { final MessageConsumer messageConsumer = jms.getMessageConsumer(topic, subject); messageConsumer.setMessageListener(new MessageListener(){ @Override public void onMessage(Message message) { // TODO Auto-generated method stub 
TextMessage tm = (TextMessage)message; try { System.out.println(tm.getText()); jms.session.commit(); } catch (JMSException e) { // TODO Auto-generated catch block e.printStackTrace(); } } }); Message message = messageConsumer.receive(timeout); if(message != null && message instanceof TextMessage){ TextMessage txtMsg = (TextMessage) message; return txtMsg.getText(); } } catch (JMSException e) { // TODO Auto-generated catch block e.printStackTrace(); } return null; } /** * 开始 */ public static void begin() { try { Connection connection = connectionFactory.createConnection(); connection.start(); Session session = connection.createSession(true,Session.AUTO_ACKNOWLEDGE); LocalJMS jms = new LocalJMS(connection, session); threadLocal.set(jms); } catch (JMSException e) { // TODO Auto-generated catch block e.printStackTrace(); } } /** *结束 */ public static void end() { try { LocalJMS jms = threadLocal.get(); Session session = jms.session; session.commit(); session.close(); Connection connection = jms.connection; connection.stop(); connection.close(); jms.messageConsumerMap = null; jms.messageProducerMap = null; jms.destinationMap = null; jms.session = null; jms.connection = null; threadLocal.remove(); } catch (JMSException e) { // TODO Auto-generated catch block e.printStackTrace(); } } /** * 消息操作包装类 * @author * */ private static class LocalJMS{ private Connection connection; private Session session; private Map<String, MessageProducer> messageProducerMap; private Map<String, MessageConsumer> messageConsumerMap; private Map<String, Destination> destinationMap; public LocalJMS(Connection connection, Session session) { this.connection = connection; this.session = session; } /** * 获取消息目标 * @param topic 是否为主题 * @param subject 消息key * @return */ private Destination getDestination(boolean topic,String subject){ if(destinationMap == null){ destinationMap = new HashMap<String, Destination>(); } String key = topic + subject; if(destinationMap.containsKey(key)){ return 
destinationMap.get(key); }else{ try { Destination destination = topic? session.createTopic(subject) : session.createQueue(subject); destinationMap.put(key, destination); return destination; } catch (JMSException e) { // TODO Auto-generated catch block e.printStackTrace(); } } return null; } /** * 获取消息提供者 * @param topic * @param subject * @return */ public MessageProducer getMessageProducer(boolean topic,String subject){ if(messageProducerMap == null){ messageProducerMap = new HashMap<String, MessageProducer>(); } String key = topic + subject; if(messageProducerMap.containsKey(key)){ return messageProducerMap.get(key); }else{ try { Destination destination = getDestination(topic,subject); MessageProducer messageProducer = session.createProducer(destination); messageProducerMap.put(key, messageProducer); return messageProducer; } catch (JMSException e) { // TODO Auto-generated catch block e.printStackTrace(); } } return null; } /** * 获取消息消费者 * @param topic * @param subject * @return */ public MessageConsumer getMessageConsumer(boolean topic,String subject){ if(messageConsumerMap == null){ messageConsumerMap = new HashMap<String, MessageConsumer>(); } String key = topic + subject; if(messageConsumerMap.containsKey(key)){ return messageConsumerMap.get(key); }else{ try { Destination destination = getDestination(topic,subject); final MessageConsumer messageConsumer = session.createConsumer(destination); messageConsumerMap.put(key, messageConsumer); return messageConsumer; } catch (JMSException e) { // TODO Auto-generated catch block e.printStackTrace(); } } return null; } } public static void main(String[] args) { JMSUtil.begin(); //JMSUtil.sendMessage(true, "v5", "测试JMS消息"); String a = JMSUtil.receiveMessage(true, "v5", 3000); System.out.println(a); JMSUtil.end(); } }
如果你对这篇内容有疑问,欢迎到本站社区发帖提问、参与讨论,获取更多帮助,或者扫描二维码加入 Web 技术交流群。
绑定邮箱获取回复消息
由于您还没有绑定您的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论
评论(4)
肯定是不能两个都用的,setMessageListener应该用Session而不是Consumer。
肯定是不能两个都用的,setMessageListener应该用Session而不是Consumer。
接收消息只能采用MessageListener
或consumer.receive其中一种,你这里两种都用了。
接收消息只能采用MessageListener
或consumer.receive其中一种,你这里两种都用了。