ActiveMQ: non-persistent topic consumer sometimes does not close
I'm using Apache ActiveMQ 5.5 with the following setup:
- An embedded broker
- A non-persistent producer and topic
- A consumer of the topic
The basics work: I publish to a topic and a subscriber consumes from it.
I've implemented a MessageListener that logs a message whenever a consumer subscribes or unsubscribes.
When I need to close the consumer, I just call its close method.
Sometimes the consumer closes successfully: I see the log entry and memory usage is fine.
But sometimes it doesn't close, even though the log shows that I called the close method.
In that case the MessageListener never logs that the subscriber unsubscribed.
As a result, memory usage keeps climbing, because the publisher is still sending messages to the topic while the consumer, which I closed (but which is not really closed), has stopped handling them.
So I'm not sure where or how to troubleshoot this issue.
I think it has something to do with the asynchronous ActiveMQ worker threads and their behavior.
Below are all the ActiveMQ-related classes I'm using. Let me know if I should post any additional code.
public class Consumer {
    private MessageConsumer consumer;
    public Consumer(String brokerUrl, String topic, MessageListener ml) throws JMSException {
        ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory(brokerUrl);
        //with straight through processing of messages
        //and optimized acknowledgement
        cf.setAlwaysSessionAsync(false);
        cf.setOptimizeAcknowledge(true);
        Connection connection = cf.createConnection();
        connection.start();
        //-- Use the default session
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        //-- Set the prefetch size for topics - by parsing a configuration parameter in
        //   the name of the topic
        //-- topic=test.topic?consumer.prefetchSize=32766
        Topic topicObj = session.createTopic(topic);
        consumer = session.createConsumer(topicObj);
        //-- Attach the passed in message listener
        consumer.setMessageListener(ml);
    }
    /**
     * @return the consumer
     */
    public MessageConsumer getConsumer() {
        return consumer;
    }
}
public class ConsumerAdvisoryListener implements MessageListener {
    private XMLLogUtil xlu;
    private MyLogger ml;
    public ConsumerAdvisoryListener() throws IOException{
        xlu=XMLLogUtil.getInstance();
        ml=xlu.getCustomLogger(ConsumerAdvisoryListener.class);
    }
    public void onMessage(Message message) {
        ActiveMQMessage msg = (ActiveMQMessage) message;
        DataStructure ds = msg.getDataStructure();
        if (ds != null) {
            switch (ds.getDataStructureType()) {
            case CommandTypes.CONSUMER_INFO:
                ConsumerInfo consumerInfo = (ConsumerInfo) ds;
                ml.info("Consumer '" + consumerInfo.getConsumerId()
                        + "' subscribed to '" + consumerInfo.getDestination()
                        + "'");
                break;
            case CommandTypes.REMOVE_INFO:
                RemoveInfo removeInfo = (RemoveInfo) ds;
                ConsumerId consumerId = ((ConsumerId) removeInfo.getObjectId());
                ml.info("Consumer '" + consumerId + "' unsubscribed");
                break;
            default:
                ml.info("Unknown data structure type");
            }
        } else {
            ml.info("No data structure provided");
        }
    }
}
public class EmbeddedBroker {
    /**
     * Singleton
     */
    private static EmbeddedBroker INSTANCE;
    private BrokerService broker;
    /**
     * Return singleton instance
     * @return
     */
    public static EmbeddedBroker getInstance(){
        if(EmbeddedBroker.INSTANCE ==null){
            throw new IllegalStateException("Not Initialized");
        }
        return INSTANCE;
    }
    /**
     * Initialize singleton instance.
     *
     * @return
     * @throws Exception
     */
    public static EmbeddedBroker initialize(String brokerName) throws Exception{
        if(EmbeddedBroker.INSTANCE ==null){
            EmbeddedBroker.INSTANCE=new EmbeddedBroker(brokerName, false);
        }
        else{
            throw new IllegalStateException("Already Initialized");
        }
        return INSTANCE;
    }
    /**
     * Initialize singleton instance.
     *
     * @return
     * @throws Exception
     */
    public static EmbeddedBroker initialize(String brokerName, boolean enableTCPConnector) throws Exception{
        if(EmbeddedBroker.INSTANCE ==null){
            EmbeddedBroker.INSTANCE=new EmbeddedBroker(brokerName, enableTCPConnector);
        }
        else{
            throw new IllegalStateException("Already Initialized");
        }
        return INSTANCE;
    }
    /**
     * Private constructor
     * @throws Exception
     */
    private EmbeddedBroker(String brokerName, boolean enableTCPConnector) throws Exception{
        //-- By default a broker always listens on vm://<broker name>
        this.broker = new BrokerService();
        this.broker.setBrokerName(brokerName);
        //-- Enable Advisory Support. Its true by default, but this is to explicitly mention it for documentation purposes
        this.broker.setAdvisorySupport(true);
        /* Create non-persistent broker to use inMemory Store,
         * instead of KAHA or any other persistent store.
         * See Section 4.6.1 of ActiveMQInAction
         */
        this.broker.setPersistent(false);
        //-- 64 MB
        this.broker.getSystemUsage().getMemoryUsage().setLimit(64*1024*1024);
        //-- Set the Destination policies
        PolicyEntry policy = new PolicyEntry();
        //-- Set a memory limit of 4mb for each destination
        policy.setMemoryLimit(4 * 1024 *1024);
        //-- Disable flow control
        policy.setProducerFlowControl(false);
        PolicyMap pMap = new PolicyMap();
        //-- Configure the policy
        pMap.setDefaultEntry(policy);
        this.broker.setDestinationPolicy(pMap);
        if(enableTCPConnector)
            broker.addConnector("tcp://localhost:61616");
        //-- Start the Broker.
        this.broker.start();
    }
}
public class NonPersistentProducer {
    private final MessageProducer producer;
    private final Session session;
    public NonPersistentProducer(String brokerUrl, String topic) throws JMSException{
        //-- Tell the connection factory to connect to a broker and topic passed in.
        ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory(brokerUrl);
        //-- Disable message copying
        cf.setCopyMessageOnSend(false);
        Connection connection = cf.createConnection();
        connection.start();
        session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        Topic topicObj = session.createTopic(topic);
        producer = session.createProducer(topicObj);
        //-- Send non-persistent messages
        producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
    }//-- Producer
    /**
     * @return the producer
     */
    public MessageProducer getProducer() {
        return producer;
    }
    /**
     * @return the session
     */
    public Session getSession() {
        return session;
    }
}
Comments (1)
I figured it out. The close method (read the documentation) blocks until the receive call or message listener in progress has finished. In my case, the listener was getting lost because of an exception. As long as the listener or receive completes successfully, close will complete and the consumer will be removed.
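One way to keep a listener from being derailed by an exception is to wrap it so that the callback always returns normally. The following is only a minimal sketch under stated assumptions, not ActiveMQ code: `GuardedListener` is a hypothetical name, and `java.util.function.Consumer` stands in for `javax.jms.MessageListener` so that the example runs on the JDK alone. With JMS, the same idea would presumably be a class that implements `MessageListener` and wraps the real handler inside `onMessage(Message)` with the same try/catch.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical wrapper (not part of ActiveMQ or JMS): makes sure the wrapped
// callback always returns normally by catching and recording any exception.
final class GuardedListener<T> implements Consumer<T> {

    private final Consumer<T> delegate;
    private final List<RuntimeException> failures =
            Collections.synchronizedList(new ArrayList<RuntimeException>());

    GuardedListener(Consumer<T> delegate) {
        this.delegate = delegate;
    }

    @Override
    public void accept(T message) {
        try {
            delegate.accept(message);
        } catch (RuntimeException e) {
            // Record the failure instead of letting it escape the callback.
            failures.add(e);
        }
    }

    int failureCount() {
        return failures.size();
    }

    public static void main(String[] args) {
        GuardedListener<String> listener = new GuardedListener<>(msg -> {
            if (msg.isEmpty()) {
                throw new IllegalStateException("empty message");
            }
        });
        listener.accept("hello"); // handled normally
        listener.accept("");      // delegate throws; the wrapper records it
        System.out.println("failures recorded: " + listener.failureCount()); // prints "failures recorded: 1"
    }
}
```

In a real application the catch block would more likely log the exception than store it; the list here only makes the behavior easy to observe.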