JMS - 从一个消费者到多个消费者

发布于 2024-10-10 12:16:45 字数 310 浏览 0 评论 0原文

我有一个 JMS 客户端,它生成消息并通过 JMS 队列发送给其唯一的使用者。

我想要的是不止一个消费者收到这些消息。我想到的第一件事是将队列转换为主题,以便当前和新的消费者可以订阅并获取传递给所有人的相同消息。

这显然涉及修改生产者和消费者方面的当前客户端代码。

我还想看看其他选项,例如创建第二个队列,这样我就不必修改现有的消费者。我相信这种方法有一些优点,例如(如果我错了,请纠正我)平衡两个不同队列之间的负载,而不是一个,这可能会对性能产生积极影响。

我想获得有关这些选项以及您可能会看到的缺点/优点的建议。任何反馈都将受到高度赞赏。

I have a JMS client which is producing messages and sending over a JMS queue to its unique consumer.

What I want is more than one consumer getting those messages. The first thing that comes to my mind is converting the queue to a topic, so current and new consumers can subscribe and get the same message delivered to all of them.

This will obviously involve modifying the current clients code in both producer and consumer side of things.

I would like to also look at other options like creating a second queue, so that I don't have to modify the existing consumer. I believe there are advantages in this approach like (correct me if I am wrong) balancing the load between two different queues rather than one, which might have a positive impact on performance.

I would like to get advise on these options and cons / pros that you might see. Any feedback is highly appreciated.

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(2

逆光下的微笑 2024-10-17 12:16:45

正如您所说,您有几个选择。

如果将其转换为主题以获得相同的效果,则需要使消费者成为持久消费者。如果您的消费者不存在,队列提供的一件事是持久性。这将取决于您使用的 MQ 系统。

如果您想坚持使用队列,您将为每个消费者创建一个队列,并创建一个侦听原始队列的调度程序。

Producer -> Queue_Original <- Dispatcher -> Queue_Consumer_1 <- Consumer_1
                                         -> Queue_Consumer_2 <- Consumer_2
                                         -> Queue_Consumer_3 <- Consumer_3

主题的优点

  • 更容易动态添加新消费者。所有消费者无需任何工作即可收到新消息。
  • 您可以创建循环主题,以便 Consumer_1 将获取消息,然后是 Consumer_2,然后 Consumer_3
  • 可以向消费者推送新消息,而不必查询队列,使它们成为反应性的。

主题的缺点

  • 除非您的代理支持此配置,否则消息不会持久。如果消费者离线并返回,则可能会丢失消息,除非设置了持久消费者。
  • 很难让Consumer_1和Consumer_2收到消息,但Consumer_3却收不到消息。使用调度程序和队列,调度程序无法将消息放入 Consumer_3 的队列中。

队列的优点

  • 消息是持久的,直到消费者删除它们。
  • 调度程序可以通过不将消息放入相应的消费者队列来过滤哪些消费者获取哪些消息。不过,这可以通过过滤器对主题来完成。

队列的缺点

  • 需要创建额外的队列来支持多个消费者。在动态环境中,这不会有效。

在开发消息系统时,我更喜欢主题,因为它给了我最大的力量,但是由于您已经在使用队列,因此需要您更改系统的工作方式来实现主题。

具有多个消费者的队列系统的设计和实现

Producer -> Queue_Original <- Dispatcher -> Queue_Consumer_1 <- Consumer_1
                                         -> Queue_Consumer_2 <- Consumer_2
                                         -> Queue_Consumer_3 <- Consumer_3

来源

请记住,您还需要注意其他事项,例如问题异常处理、重新连接到连接以及如果您失去连接,则会排队等。这只是为了让您了解如何完成我所描述的内容。

在真实的系统中,我可能不会在第一次异常时退出。我会让系统继续尽可能地运行并记录错误。正如这段代码所示,如​​果将消息放入单个消费者队列失败,整个调度程序将停止。

Dispatcher.java

/*
 * To change this template, choose Tools | Templates
 * and open the template in the editor.
 */
package stackoverflow_4615895;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.QueueConnection;
import javax.jms.QueueConnectionFactory;
import javax.jms.QueueSession;
import javax.jms.Session;

public class Dispatcher {

    private static long QUEUE_WAIT_TIME = 1000;
    private boolean mStop = false;
    private QueueConnectionFactory mFactory;
    private String mSourceQueueName;
    private String[] mConsumerQueueNames;

    /**
     * Create a dispatcher
     * @param factory
     *      The QueueConnectionFactory in which new connections, session, and consumers
     *      will be created. This is needed to ensure the connection is associated
     *      with the correct thread.
     * @param source
     *
     * @param consumerQueues
     */
    public Dispatcher(
        QueueConnectionFactory factory, 
        String sourceQueue, 
        String[] consumerQueues) {

        mFactory = factory;
        mSourceQueueName = sourceQueue;
        mConsumerQueueNames = consumerQueues;
    }

    public void start() {
        Thread thread = new Thread(new Runnable() {

            public void run() {
                Dispatcher.this.run();
            }
        });
        thread.setName("Queue Dispatcher");
        thread.start();
    }

    public void stop() {
        mStop = true;
    }

    private void run() {

        QueueConnection connection = null;
        MessageProducer producer = null;
        MessageConsumer consumer = null;
        QueueSession session = null;
        try {
            // Setup connection and queues for receiving the messages
            connection = mFactory.createQueueConnection();
            session = connection.createQueueSession(false, Session.DUPS_OK_ACKNOWLEDGE);
            Queue sourceQueue = session.createQueue(mSourceQueueName);
            consumer = session.createConsumer(sourceQueue);

            // Create a null producer allowing us to send messages
            // to any queue.
            producer = session.createProducer(null);

            // Create the destination queues based on the consumer names we
            // were given.
            Queue[] destinationQueues = new Queue[mConsumerQueueNames.length];
            for (int index = 0; index < mConsumerQueueNames.length; ++index) {
                destinationQueues[index] = session.createQueue(mConsumerQueueNames[index]);
            }

            connection.start();

            while (!mStop) {

                // Only wait QUEUE_WAIT_TIME in order to give
                // the dispatcher a chance to see if it should
                // quit
                Message m = consumer.receive(QUEUE_WAIT_TIME);
                if (m == null) {
                    continue;
                }

                // Take the message we received and put
                // it in each of the consumers destination
                // queues for them to process
                for (Queue q : destinationQueues) {
                    producer.send(q, m);
                }
            }

        } catch (JMSException ex) {
            // Do wonderful things here 
        } finally {
            if (producer != null) {
                try {
                    producer.close();
                } catch (JMSException ex) {
                }
            }
            if (consumer != null) {
                try {
                    consumer.close();
                } catch (JMSException ex) {
                }
            }
            if (session != null) {
                try {
                    session.close();
                } catch (JMSException ex) {
                }
            }
            if (connection != null) {
                try {
                    connection.close();
                } catch (JMSException ex) {
                }
            }
        }
    }
}

Main.java

    QueueConnectionFactory factory = ...;

    Dispatcher dispatcher =
            new Dispatcher(
            factory,
            "Queue_Original",
            new String[]{
                "Consumer_Queue_1",
                "Consumer_Queue_2",
                "Consumer_Queue_3"});
    dispatcher.start();

You have a few options as you stated.

If you convert it to a topic to get the same effect you will need to make the consumers persistent consumers. One thing the queue offers is persistence if your consumer isn't alive. This will depend on the MQ system you are using.

If you want to stick with queues, you will create a queue for each consumer and a dispatcher that will listen on the original queue.

Producer -> Queue_Original <- Dispatcher -> Queue_Consumer_1 <- Consumer_1
                                         -> Queue_Consumer_2 <- Consumer_2
                                         -> Queue_Consumer_3 <- Consumer_3

Pros of Topics

  • Easier to dynamically add new consumers. All consumers will get new messages without any work.
  • You can create round-robin topics, so that Consumer_1 will get a message, then Consumer_2, then Consumer_3
  • Consumers can be pushed new messages, instead of having to query a queue making them reactive.

Cons of Topics

  • Messages are not persistent unless your Broker supports this configuration. If a consumer goes off line and comes back it is possible to have missed messages unless Persistent consumers are setup.
  • Difficult to allow Consumer_1 and Consumer_2 to receive a message but not Consumer_3. With a Dispatcher and Queues, the Dispatcher can not put a message in Consumer_3's queue.

Pros of Queues

  • Messages are persistent until a Consumer removes them
  • A dispatcher can filter which consumers get which messages by not placing messages into the respective consumers queues. This can be done with topics through filters though.

Cons of Queues

  • Additional Queues need to be created to support multiple consumers. In a dynamic environment this wouldn't be efficient.

When developing a Messaging System I prefer topics as it gives me the most power, but seeing as you are already using Queues it would require you to change how your system works to implement Topics instead.

Design and Implementation of Queue System with multiple consumers

Producer -> Queue_Original <- Dispatcher -> Queue_Consumer_1 <- Consumer_1
                                         -> Queue_Consumer_2 <- Consumer_2
                                         -> Queue_Consumer_3 <- Consumer_3

Source

Keep in mind there are other things you'll need to take care of such as problem exception handling, reconnection to the connection and queues if you lose your connection, etc. This is just designed to give you an idea of how to accomplish what I described.

In a real system I probably wouldn't exit out at the first exception. I would allow the system to continue operating the best it could and log errors. As it stands in this code if putting a message in a single consumers queue fails, the whole dispatcher will stop.

Dispatcher.java

/*
 * To change this template, choose Tools | Templates
 * and open the template in the editor.
 */
package stackoverflow_4615895;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.QueueConnection;
import javax.jms.QueueConnectionFactory;
import javax.jms.QueueSession;
import javax.jms.Session;

public class Dispatcher {

    private static long QUEUE_WAIT_TIME = 1000;
    private boolean mStop = false;
    private QueueConnectionFactory mFactory;
    private String mSourceQueueName;
    private String[] mConsumerQueueNames;

    /**
     * Create a dispatcher
     * @param factory
     *      The QueueConnectionFactory in which new connections, session, and consumers
     *      will be created. This is needed to ensure the connection is associated
     *      with the correct thread.
     * @param source
     *
     * @param consumerQueues
     */
    public Dispatcher(
        QueueConnectionFactory factory, 
        String sourceQueue, 
        String[] consumerQueues) {

        mFactory = factory;
        mSourceQueueName = sourceQueue;
        mConsumerQueueNames = consumerQueues;
    }

    public void start() {
        Thread thread = new Thread(new Runnable() {

            public void run() {
                Dispatcher.this.run();
            }
        });
        thread.setName("Queue Dispatcher");
        thread.start();
    }

    public void stop() {
        mStop = true;
    }

    private void run() {

        QueueConnection connection = null;
        MessageProducer producer = null;
        MessageConsumer consumer = null;
        QueueSession session = null;
        try {
            // Setup connection and queues for receiving the messages
            connection = mFactory.createQueueConnection();
            session = connection.createQueueSession(false, Session.DUPS_OK_ACKNOWLEDGE);
            Queue sourceQueue = session.createQueue(mSourceQueueName);
            consumer = session.createConsumer(sourceQueue);

            // Create a null producer allowing us to send messages
            // to any queue.
            producer = session.createProducer(null);

            // Create the destination queues based on the consumer names we
            // were given.
            Queue[] destinationQueues = new Queue[mConsumerQueueNames.length];
            for (int index = 0; index < mConsumerQueueNames.length; ++index) {
                destinationQueues[index] = session.createQueue(mConsumerQueueNames[index]);
            }

            connection.start();

            while (!mStop) {

                // Only wait QUEUE_WAIT_TIME in order to give
                // the dispatcher a chance to see if it should
                // quit
                Message m = consumer.receive(QUEUE_WAIT_TIME);
                if (m == null) {
                    continue;
                }

                // Take the message we received and put
                // it in each of the consumers destination
                // queues for them to process
                for (Queue q : destinationQueues) {
                    producer.send(q, m);
                }
            }

        } catch (JMSException ex) {
            // Do wonderful things here 
        } finally {
            if (producer != null) {
                try {
                    producer.close();
                } catch (JMSException ex) {
                }
            }
            if (consumer != null) {
                try {
                    consumer.close();
                } catch (JMSException ex) {
                }
            }
            if (session != null) {
                try {
                    session.close();
                } catch (JMSException ex) {
                }
            }
            if (connection != null) {
                try {
                    connection.close();
                } catch (JMSException ex) {
                }
            }
        }
    }
}

Main.java

    QueueConnectionFactory factory = ...;

    Dispatcher dispatcher =
            new Dispatcher(
            factory,
            "Queue_Original",
            new String[]{
                "Consumer_Queue_1",
                "Consumer_Queue_2",
                "Consumer_Queue_3"});
    dispatcher.start();
゛清羽墨安 2024-10-17 12:16:45

您可能不必修改代码;这取决于你怎么写。

例如,如果您的代码使用 MessageProducer 而不是 QueueSender 发送消息,那么它将适用于主题和队列。同样,如果您使用 MessageConsumer 而不是 QueueReceiver

本质上,在 JMS 应用程序中使用非特定接口与 JMS 系统交互是一种很好的做法,例如 MessageProducerMessageConsumerDestination等等。如果是这种情况,那么这只是一个配置问题。

You may not have to modify the code; it depends on how you wrote it.

For example, if your code sends messages using MessageProducer rather than QueueSender, then it will work for topics as well as queues. Similarly if you used MessageConsumer rather than QueueReceiver.

Essentially, it is good practice in JMS applications to use non-specific interfaces to interact with the JMS system, such as MessageProducer, MessageConsumer, Destination, etc. If that's the case, it's a "mere" matter of configuration.

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文