当队列持久化时,HornetQ Producer 速度变慢

发布于 2024-12-02 16:55:47 字数 2009 浏览 2 评论 0原文

我尝试过在 horntQ 中使用持久队列。我做了两个单独的例子(生产者,消费者)。我的消费者运行良好,但生产者花费了太多时间来完成消息发送。我既单独运行过,也一起运行过。可能是什么问题? 我的代码是:

public  class HornetProducer implements Runnable{

    Context ic = null;
    ConnectionFactory cf = null;
    Connection connection =  null;
    Queue queue = null;
    Session session = null;
    MessageProducer publisher =  null;
    TextMessage message = null;
    int messageSent=0;

     public synchronized static Context getInitialContext()throws javax.naming.NamingException {

            Properties p = new Properties( );
            p.put(Context.INITIAL_CONTEXT_FACTORY,"org.jnp.interfaces.NamingContextFactory");
            p.put(Context.URL_PKG_PREFIXES," org.jboss.naming:org.jnp.interfaces");
            p.put(Context.PROVIDER_URL, "jnp://localhosts:1099");

            return new javax.naming.InitialContext(p);
        }  

    public HornetProducer()throws Exception{            

        ic = getInitialContext();
        cf = (ConnectionFactory)ic.lookup("/ConnectionFactory");
        queue = (Queue)ic.lookup("queue/testQueue2");
        connection = cf.createConnection();
        session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);        
        publisher = session.createProducer(queue);
        connection.start();

    }

    public void publish(){      
        try{        

            message = session.createTextMessage("Hello!");
            System.out.println("StartDate: "+new Date());

            for(int i=0;i<10000;i++){                   
                 messageSent++;              
                 publisher.send(message);                
            }
            System.out.println("EndDate: "+new Date());
        }catch(Exception e){
            System.out.println("Exception in Consume: "+ e.getMessage());
        }           
    }

    public void run(){
         publish();
    }

    public static void main(String[] args) throws Exception{

        new HornetProducer().publish();    
    }

}

I have tried with Persistent Queue in horntQ. I have made two separate examples (Producer, Consumer). My consumer is working well but the Producer is taking too much time to finish sending message. I have run both separately as well as together. What could be the problem?
my code is:

public  class HornetProducer implements Runnable{

    Context ic = null;
    ConnectionFactory cf = null;
    Connection connection =  null;
    Queue queue = null;
    Session session = null;
    MessageProducer publisher =  null;
    TextMessage message = null;
    int messageSent=0;

     public synchronized static Context getInitialContext()throws javax.naming.NamingException {

            Properties p = new Properties( );
            p.put(Context.INITIAL_CONTEXT_FACTORY,"org.jnp.interfaces.NamingContextFactory");
            p.put(Context.URL_PKG_PREFIXES," org.jboss.naming:org.jnp.interfaces");
            p.put(Context.PROVIDER_URL, "jnp://localhosts:1099");

            return new javax.naming.InitialContext(p);
        }  

    public HornetProducer()throws Exception{            

        ic = getInitialContext();
        cf = (ConnectionFactory)ic.lookup("/ConnectionFactory");
        queue = (Queue)ic.lookup("queue/testQueue2");
        connection = cf.createConnection();
        session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);        
        publisher = session.createProducer(queue);
        connection.start();

    }

    public void publish(){      
        try{        

            message = session.createTextMessage("Hello!");
            System.out.println("StartDate: "+new Date());

            for(int i=0;i<10000;i++){                   
                 messageSent++;              
                 publisher.send(message);                
            }
            System.out.println("EndDate: "+new Date());
        }catch(Exception e){
            System.out.println("Exception in Consume: "+ e.getMessage());
        }           
    }

    public void run(){
         publish();
    }

    public static void main(String[] args) throws Exception{

        new HornetProducer().publish();    
    }

}

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(1

随遇而安 2024-12-09 16:55:47

您正在持续地、非事务性地发送这些消息。这意味着,发送的每条消息都必须单独完成。

这意味着对于您发送的每条消息,您都必须与服务器进行网络往返,并等待其完成持久性,然后才能发送另一条消息。

如果在这种情况下你有多个生产者,hornetq 会对两个生产者进行批处理,这样你会节省很多时间。 (即服务器将批量处理许多写入请求)。

如果你想加快单个生产者的发送速度,你可能应该使用事务。

例如:

I - 将会话更改为事务处理:

session = connection.createSession(true, Session.SESSION_TRANSACTIONED); 

II - 提交每 N 条消息:

   for(int i=0;i<10000;i++){                   
         messageSent++;              
         publisher.send(message);  
         if (messageSent % 1000 == 0) session.commit();              
    }
    session.commit();

您还可以禁用持久消息的同步。 (异步发送)。

You are sending these messages persistently, and non transactionally. What means, each message sent has to be completed individually.

That means for each message you send, you have to make a network round trip to the server, and wait it finish persistency before you can send another message.

If you had multiple producers on this situation, hornetq would batch both producers and you would save a lot of time. (i.e. the server will batch many write requests).

If you want to speed up the sending of a single producer, you should use transactions probably.

for example:

I - Change your session to transactioned:

session = connection.createSession(true, Session.SESSION_TRANSACTIONED); 

II - commit every N messages:

   for(int i=0;i<10000;i++){                   
         messageSent++;              
         publisher.send(message);  
         if (messageSent % 1000 == 0) session.commit();              
    }
    session.commit();

You could also disable sync on Persistent messages. (Sending them asynchronously).

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文