如何使队列在HornetQ 2.2.5核心客户端中持久化?

发布于 2024-12-02 18:37:16 字数 4321 浏览 0 评论 0原文

我想使用 HornetQ 核心(core)客户端创建持久化队列。问题是:当我停止服务器后,队列及其中的数据都会丢失。如何让队列持久化?我的代码如下:

import java.util.Date;
import org.hornetq.api.core.TransportConfiguration;
import org.hornetq.api.core.client.ClientConsumer;
import org.hornetq.api.core.client.ClientMessage;
import org.hornetq.api.core.client.ClientProducer;
import org.hornetq.api.core.client.ClientSession;
import org.hornetq.api.core.client.ClientSessionFactory;
import org.hornetq.api.core.client.HornetQClient;
import org.hornetq.api.core.client.ServerLocator;
import org.hornetq.core.config.Configuration;
import org.hornetq.core.config.impl.ConfigurationImpl;
import org.hornetq.core.remoting.impl.invm.InVMAcceptorFactory;
import org.hornetq.core.remoting.impl.invm.InVMConnectorFactory;
import org.hornetq.core.server.HornetQServer;
import org.hornetq.core.server.HornetQServers;

/**
 * Embedded HornetQ example built on the core client API.
 *
 * <p>Starts an in-VM server, creates the core queue {@code queue.exampleQueue},
 * sends {@code MESSAGE_COUNT} messages to it and then consumes them until the
 * queue has been drained. All client resources and the server are released on
 * every exit path.
 */
public class EmbeddedExample
{
   /** Used both as the address and as the name of the queue bound to it. */
   private static final String QUEUE_NAME = "queue.exampleQueue";

   /** String property that carries the payload of every message. */
   private static final String PROP_NAME = "myprop";

   /** Number of messages the producer sends. */
   private static final int MESSAGE_COUNT = 100000;

   /** A timestamp is printed each time this many messages have been consumed. */
   private static final int PROGRESS_INTERVAL = 10000;

   /** How long the consumer waits for a message before treating the queue as empty. */
   private static final long RECEIVE_TIMEOUT_MS = 5000;

   /**
    * Configures and starts the embedded server, runs the client part of the
    * example and stops the server again.
    *
    * @param args ignored
    */
   public static void main(final String[] args)
   {
      try
      {
         // Step 1. Create the Configuration, and set the properties accordingly
         Configuration configuration = new ConfigurationImpl();
         // NOTE: persistence is switched off here. With this setting the queue and its
         // messages are not kept across a server restart; it has to be true for that.
         configuration.setPersistenceEnabled(false);
         configuration.setSecurityEnabled(false);

         configuration.getAcceptorConfigurations().add(new TransportConfiguration(InVMAcceptorFactory.class.getName()));

         // Step 2. Create and start the server
         HornetQServer server = HornetQServers.newHornetQServer(configuration);
         server.start();

         try
         {
            runClient();
         }
         finally
         {
            // Step 10. Stop the server, even when the client part failed
            server.stop();
         }
      }
      catch (Exception e)
      {
         e.printStackTrace();
         System.exit(-1);
      }
   }

   /**
    * Connects to the in-VM server, creates the queue, then produces and consumes
    * the messages. Locator, session factory and session are closed in reverse
    * order of creation.
    *
    * @throws Exception if any HornetQ call fails
    */
   private static void runClient() throws Exception
   {
      // Step 3. As we are not using a JNDI environment we instantiate the objects directly
      ServerLocator serverLocator = HornetQClient.createServerLocatorWithoutHA(new TransportConfiguration(InVMConnectorFactory.class.getName()));
      try
      {
         ClientSessionFactory sf = serverLocator.createSessionFactory();
         try
         {
            createQueue(sf);

            // Step 5. Create the session used by both producer and consumer
            ClientSession session = sf.createSession();
            try
            {
               produce(session);
               consume(session);
            }
            finally
            {
               // Step 9. Be sure to close our resources!
               session.close();
            }
         }
         finally
         {
            sf.close();
         }
      }
      finally
      {
         serverLocator.close();
      }
   }

   /**
    * Step 4. Creates the core queue on a short-lived session of its own.
    *
    * @param sf factory used to open the temporary session
    * @throws Exception if the session or the queue cannot be created
    */
   private static void createQueue(final ClientSessionFactory sf) throws Exception
   {
      ClientSession coreSession = sf.createSession(false, false, false);
      try
      {
         // third argument asks for a durable queue
         coreSession.createQueue(QUEUE_NAME, QUEUE_NAME, true);
      }
      finally
      {
         coreSession.close();
      }
   }

   /**
    * Step 6. Sends {@code MESSAGE_COUNT} messages and prints the start and end time.
    *
    * @param session session the producer and the messages are created from
    * @throws Exception if creating the producer or sending fails
    */
   private static void produce(final ClientSession session) throws Exception
   {
      ClientProducer producer = session.createProducer(QUEUE_NAME);

      System.out.println("Producer:");
      System.out.println("StartDate: " + new Date());
      for (int i = 0; i < MESSAGE_COUNT; i++)
      {
         // a fresh message per send; "true" asks for a durable message
         ClientMessage message = session.createMessage(true);
         message.putStringProperty(PROP_NAME, "Message: " + i);
         producer.send(message);
      }
      System.out.println("EndDate: " + new Date());
   }

   /**
    * Steps 7 and 8. Consumes and acknowledges messages until none arrives within
    * {@code RECEIVE_TIMEOUT_MS}, printing a timestamp every
    * {@code PROGRESS_INTERVAL} messages.
    *
    * @param session session the consumer is created from; it is started here
    * @throws Exception if creating the consumer, receiving or acknowledging fails
    */
   private static void consume(final ClientSession session) throws Exception
   {
      ClientConsumer messageConsumer = session.createConsumer(QUEUE_NAME);

      // delivery only begins once the session has been started
      session.start();

      System.out.println("Consumer:");
      System.out.println("StartDate: " + new Date());

      int received = 0;
      while (true)
      {
         ClientMessage messageReceived = messageConsumer.receive(RECEIVE_TIMEOUT_MS);
         if (messageReceived == null)
         {
            // nothing arrived within the timeout: the queue is drained, so stop
            // instead of spinning forever and never reaching the cleanup code
            break;
         }

         messageReceived.acknowledge();
         received++;
         if (received % PROGRESS_INTERVAL == 0)
         {
            System.out.println("EndDate: " + new Date());
         }
      }
   }
}

I want to create a persistent queue using the HornetQ core client. The problem is that when I stop the server, the queue and its data are lost. How can I make the queue persistent?
My code is:

import java.util.Date;
import org.hornetq.api.core.TransportConfiguration;
import org.hornetq.api.core.client.ClientConsumer;
import org.hornetq.api.core.client.ClientMessage;
import org.hornetq.api.core.client.ClientProducer;
import org.hornetq.api.core.client.ClientSession;
import org.hornetq.api.core.client.ClientSessionFactory;
import org.hornetq.api.core.client.HornetQClient;
import org.hornetq.api.core.client.ServerLocator;
import org.hornetq.core.config.Configuration;
import org.hornetq.core.config.impl.ConfigurationImpl;
import org.hornetq.core.remoting.impl.invm.InVMAcceptorFactory;
import org.hornetq.core.remoting.impl.invm.InVMConnectorFactory;
import org.hornetq.core.server.HornetQServer;
import org.hornetq.core.server.HornetQServers;

/**
 * Embedded HornetQ example built on the core client API.
 *
 * <p>Starts an in-VM server, creates the core queue {@code queue.exampleQueue},
 * sends {@code MESSAGE_COUNT} messages to it and then consumes them until the
 * queue has been drained. All client resources and the server are released on
 * every exit path.
 */
public class EmbeddedExample
{
   /** Used both as the address and as the name of the queue bound to it. */
   private static final String QUEUE_NAME = "queue.exampleQueue";

   /** String property that carries the payload of every message. */
   private static final String PROP_NAME = "myprop";

   /** Number of messages the producer sends. */
   private static final int MESSAGE_COUNT = 100000;

   /** A timestamp is printed each time this many messages have been consumed. */
   private static final int PROGRESS_INTERVAL = 10000;

   /** How long the consumer waits for a message before treating the queue as empty. */
   private static final long RECEIVE_TIMEOUT_MS = 5000;

   /**
    * Configures and starts the embedded server, runs the client part of the
    * example and stops the server again.
    *
    * @param args ignored
    */
   public static void main(final String[] args)
   {
      try
      {
         // Step 1. Create the Configuration, and set the properties accordingly
         Configuration configuration = new ConfigurationImpl();
         // NOTE: persistence is switched off here. With this setting the queue and its
         // messages are not kept across a server restart; it has to be true for that.
         configuration.setPersistenceEnabled(false);
         configuration.setSecurityEnabled(false);

         configuration.getAcceptorConfigurations().add(new TransportConfiguration(InVMAcceptorFactory.class.getName()));

         // Step 2. Create and start the server
         HornetQServer server = HornetQServers.newHornetQServer(configuration);
         server.start();

         try
         {
            runClient();
         }
         finally
         {
            // Step 10. Stop the server, even when the client part failed
            server.stop();
         }
      }
      catch (Exception e)
      {
         e.printStackTrace();
         System.exit(-1);
      }
   }

   /**
    * Connects to the in-VM server, creates the queue, then produces and consumes
    * the messages. Locator, session factory and session are closed in reverse
    * order of creation.
    *
    * @throws Exception if any HornetQ call fails
    */
   private static void runClient() throws Exception
   {
      // Step 3. As we are not using a JNDI environment we instantiate the objects directly
      ServerLocator serverLocator = HornetQClient.createServerLocatorWithoutHA(new TransportConfiguration(InVMConnectorFactory.class.getName()));
      try
      {
         ClientSessionFactory sf = serverLocator.createSessionFactory();
         try
         {
            createQueue(sf);

            // Step 5. Create the session used by both producer and consumer
            ClientSession session = sf.createSession();
            try
            {
               produce(session);
               consume(session);
            }
            finally
            {
               // Step 9. Be sure to close our resources!
               session.close();
            }
         }
         finally
         {
            sf.close();
         }
      }
      finally
      {
         serverLocator.close();
      }
   }

   /**
    * Step 4. Creates the core queue on a short-lived session of its own.
    *
    * @param sf factory used to open the temporary session
    * @throws Exception if the session or the queue cannot be created
    */
   private static void createQueue(final ClientSessionFactory sf) throws Exception
   {
      ClientSession coreSession = sf.createSession(false, false, false);
      try
      {
         // third argument asks for a durable queue
         coreSession.createQueue(QUEUE_NAME, QUEUE_NAME, true);
      }
      finally
      {
         coreSession.close();
      }
   }

   /**
    * Step 6. Sends {@code MESSAGE_COUNT} messages and prints the start and end time.
    *
    * @param session session the producer and the messages are created from
    * @throws Exception if creating the producer or sending fails
    */
   private static void produce(final ClientSession session) throws Exception
   {
      ClientProducer producer = session.createProducer(QUEUE_NAME);

      System.out.println("Producer:");
      System.out.println("StartDate: " + new Date());
      for (int i = 0; i < MESSAGE_COUNT; i++)
      {
         // a fresh message per send; "true" asks for a durable message
         ClientMessage message = session.createMessage(true);
         message.putStringProperty(PROP_NAME, "Message: " + i);
         producer.send(message);
      }
      System.out.println("EndDate: " + new Date());
   }

   /**
    * Steps 7 and 8. Consumes and acknowledges messages until none arrives within
    * {@code RECEIVE_TIMEOUT_MS}, printing a timestamp every
    * {@code PROGRESS_INTERVAL} messages.
    *
    * @param session session the consumer is created from; it is started here
    * @throws Exception if creating the consumer, receiving or acknowledging fails
    */
   private static void consume(final ClientSession session) throws Exception
   {
      ClientConsumer messageConsumer = session.createConsumer(QUEUE_NAME);

      // delivery only begins once the session has been started
      session.start();

      System.out.println("Consumer:");
      System.out.println("StartDate: " + new Date());

      int received = 0;
      while (true)
      {
         ClientMessage messageReceived = messageConsumer.receive(RECEIVE_TIMEOUT_MS);
         if (messageReceived == null)
         {
            // nothing arrived within the timeout: the queue is drained, so stop
            // instead of spinning forever and never reaching the cleanup code
            break;
         }

         messageReceived.acknowledge();
         received++;
         if (received % PROGRESS_INTERVAL == 0)
         {
            System.out.println("EndDate: " + new Date());
         }
      }
   }
}

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(1)

一影成城 2024-12-09 18:37:16

如果禁用持久性,则不会有持久性:

Configuration configuration = new ConfigurationImpl();
configuration.setPersistenceEnabled(true); // <<<<  Make this true

出现 UnsatisfiedLinkError 很可能是因为您选择了 AIO,而 LD_LIBRARY_PATH 中没有对应的本机库。请将 Journal 类型设置为 NIO(如下所示),或者在 Linux 系统中提供该本机库。

configuration.setJournalType(JournalType.NIO);

If you disable persistence, you won't have persistence:

Configuration configuration = new ConfigurationImpl();
configuration.setPersistenceEnabled(true); // <<<<  Make this true

The UnsatisfiedLinkError is probably because you selected AIO and don't have the native library on the LD_LIBRARY_PATH. Either set the journal type to NIO (as shown below) or make the native library available on the Linux system.

configuration.setJournalType(JournalType.NIO);
~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文