ActiveMQ: Non-persistent topic consumer sometimes does not close

Posted 2024-12-04 10:59:27

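I'm using Apache ActiveMQ 5.5 with the following setup:

- Embedded broker

- Non-persistent producer and topic

- A consumer of the topic

The normal flow works: I publish to a topic and a subscriber consumes from it.

I've implemented a MessageListener that prints a message whenever a consumer subscribes or unsubscribes. When I have to close the consumer, I just call its close method.

Sometimes the consumer closes successfully: I see the unsubscribe entry in the log and memory usage is fine.

But sometimes it doesn't close, even though the log shows that I called the close method. In that case the MessageListener logs nothing about the subscriber unsubscribing. As a result, memory usage keeps climbing, because the publisher is still sending messages to the topic while the consumer I closed (which is not really closed) has stopped handling them.

I'm not sure where or how to troubleshoot this. I suspect it has something to do with the asynchronous ActiveMQ worker threads and their behavior.

Below are all the ActiveMQ-related classes I'm using. Let me know if I should post any additional code.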

            public class Consumer {

              private MessageConsumer consumer;

              public Consumer(String brokerUrl, String topic, MessageListener ml) throws JMSException {

                ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory(brokerUrl);

                //with straight through processing of messages
                //and optimized acknowledgement
                cf.setAlwaysSessionAsync(false);
                cf.setOptimizeAcknowledge(true);
                Connection connection = cf.createConnection();
                connection.start();

                //-- Use the default session
                Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

                //-- Set the prefetch size for topics - by parsing a configuration parameter in
                // the name of the topic
                //-- topic=test.topic?consumer.prefetchSize=32766
                Topic topicObj = session.createTopic(topic);

                consumer = session.createConsumer(topicObj);

                //-- Attach the passed in message listener
                consumer.setMessageListener(ml);

              }

              /**
               * @return the consumer
               */
              public MessageConsumer getConsumer() {
                return consumer;
              }
            }   


            public class ConsumerAdvisoryListener implements MessageListener {

              private XMLLogUtil xlu;
              private MyLogger ml;

              public ConsumerAdvisoryListener() throws IOException{
                xlu=XMLLogUtil.getInstance();
                ml=xlu.getCustomLogger(ConsumerAdvisoryListener.class);
              }

              public void onMessage(Message message) {
                ActiveMQMessage msg = (ActiveMQMessage) message;
                DataStructure ds = msg.getDataStructure();
                if (ds != null) {
                  switch (ds.getDataStructureType()) {
                  case CommandTypes.CONSUMER_INFO:
                    ConsumerInfo consumerInfo = (ConsumerInfo) ds;
                    ml.info("Consumer '" + consumerInfo.getConsumerId()
                        + "' subscribed to '" + consumerInfo.getDestination()
                        + "'");
                    break;
                  case CommandTypes.REMOVE_INFO:
                    RemoveInfo removeInfo = (RemoveInfo) ds;
                    ConsumerId consumerId = ((ConsumerId) removeInfo.getObjectId());
                    ml.info("Consumer '" + consumerId + "' unsubscribed");
                    break;
                  default:
                    ml.info("Unknown data structure type");
                  }
                } else {
                  ml.info("No data structure provided");
                }
              }
            }


            public class EmbeddedBroker {

              /**
               * Singleton
               */
              private static EmbeddedBroker INSTANCE;
              private BrokerService broker; 

              /**
               * Return singleton instance
               * @return
               */
              public static EmbeddedBroker getInstance(){
                if(EmbeddedBroker.INSTANCE ==null){
                  throw new IllegalStateException("Not Initialized");
                }

                return INSTANCE;
              }

              /**
               * Initialize singleton instance.
               * 
               * @return
               * @throws Exception 
               */
              public static EmbeddedBroker initialize(String brokerName) throws Exception{
                if(EmbeddedBroker.INSTANCE ==null){
                  EmbeddedBroker.INSTANCE=new EmbeddedBroker(brokerName, false);
                }
                else{
                  throw new IllegalStateException("Already Initialized");
                }

                return INSTANCE;
              }

              /**
               * Initialize singleton instance.
               * 
               * @return
               * @throws Exception 
               */
              public static EmbeddedBroker initialize(String brokerName, boolean enableTCPConnector) throws Exception{
                if(EmbeddedBroker.INSTANCE ==null){
                  EmbeddedBroker.INSTANCE=new EmbeddedBroker(brokerName, enableTCPConnector);
                }
                else{
                  throw new IllegalStateException("Already Initialized");
                }

                return INSTANCE;
              }

              /**
               * Private constructor
               * @throws Exception 
               */
              private EmbeddedBroker(String brokerName, boolean enableTCPConnector) throws Exception{

                //-- By default a broker always listens on vm://<broker name>
                this.broker = new BrokerService();
                this.broker.setBrokerName(brokerName);

                //-- Enable advisory support. It's true by default, but set it explicitly for documentation purposes
                this.broker.setAdvisorySupport(true);

                /* Create non-persistent broker to use inMemory Store,
                 * instead of KAHA or any other persistent store.
                 * See Section 4.6.1 of ActiveMQInAction  
                 */
                this.broker.setPersistent(false);

                //-- 64 MB
                this.broker.getSystemUsage().getMemoryUsage().setLimit(64*1024*1024);

                //-- Set the Destination policies
                PolicyEntry policy = new PolicyEntry();

                //-- Set a memory limit of 4mb for each destination
                policy.setMemoryLimit(4 * 1024 *1024);

                //-- Disable flow control
                policy.setProducerFlowControl(false);
                PolicyMap pMap = new PolicyMap();

                //-- Configure the policy
                pMap.setDefaultEntry(policy);
                this.broker.setDestinationPolicy(pMap);

                if(enableTCPConnector)
                  broker.addConnector("tcp://localhost:61616");

                //-- Start the Broker.
                this.broker.start();

              }

            }




            public class NonPersistentProducer {

              private final MessageProducer producer;
              private final Session session;

              public NonPersistentProducer(String brokerUrl, String topic) throws JMSException{
                //-- Tell the connection factory to connect to a broker and topic passed in.
                ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory(brokerUrl);

                //-- Disable message copying
                cf.setCopyMessageOnSend(false);

                Connection connection = cf.createConnection();
                connection.start();

                session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

                Topic topicObj = session.createTopic(topic);

                producer = session.createProducer(topicObj);

                //-- Send non-persistent messages
                producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);

              }//-- Producer

              /**
               * @return the producer
               */
              public MessageProducer getProducer() {
                return producer;
              }

              /**
               * @return the session
               */
              public Session getSession() {
                return session;
              }



            }


Comments (1)

岛徒 2024-12-11 10:59:27

I figured it out. According to the documentation, the close method blocks until the in-progress receive or listener finishes. In my case, the listener was getting lost because of an exception. As long as the listener or receive completes successfully, close completes and the consumer is removed.
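
One way to keep a listener from being derailed by an exception is to catch it inside the callback, so the callback always returns normally. The following is only a minimal sketch of that idea, not the code from this question: the JMS types are replaced by a plain `java.util.function.Consumer` so it compiles without the JMS jar, and `GuardedListener`, `guard` and the error handler are hypothetical names. In real code the same try/catch would presumably wrap the body of `MessageListener.onMessage`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical helper: wraps a callback so that it always returns normally.
final class GuardedListener {

    private GuardedListener() {}

    // Returns a callback that forwards to 'delegate' and reports, rather
    // than rethrows, any RuntimeException the delegate raises.
    static <T> Consumer<T> guard(Consumer<T> delegate,
                                 Consumer<RuntimeException> onError) {
        return message -> {
            try {
                delegate.accept(message);
            } catch (RuntimeException e) {
                onError.accept(e);
            }
        };
    }

    public static void main(String[] args) {
        List<String> errors = new ArrayList<>();

        // Stand-in for a message handler that fails on some inputs.
        Consumer<String> listener = guard(
            msg -> {
                if (msg.isEmpty()) {
                    throw new IllegalArgumentException("empty message");
                }
            },
            e -> errors.add(e.getMessage()));

        listener.accept("hello"); // handled normally
        listener.accept("");      // failure is recorded, not propagated

        System.out.println(errors); // prints [empty message]
    }
}
```

Logging the exception in the handler, instead of swallowing it silently, keeps the original failure visible while still letting the callback finish.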
