Quarkus 中消息队列连接错误

发布于 2025-01-15 13:20:16 字数 7359 浏览 1 评论 0原文

我正在尝试使用/写入 Quarkus 中的消息队列,但无法执行此操作。我有一个示例代码,可以使用它连接到队列,但它是使用基于 javax.jms 的 com.ibm.mq.allclient 库制作的,

    <dependency>
      <groupId>com.ibm.mq</groupId>
      <artifactId>com.ibm.mq.allclient</artifactId>
      <version>9.0.4.0</version>
    </dependency>

并使用以下参数进行连接:主机名、端口、名称、通道、队列名称。

使用 com.ibm.mq.allclient 库创建消费和写入连接的示例代码如下:

import com.ibm.mq.jms.MQQueue;
import com.ibm.mq.jms.MQQueueConnection;
import com.ibm.mq.jms.MQQueueConnectionFactory;
import com.ibm.mq.jms.MQQueueSession;

import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.TextMessage;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MyClass {

    private static Logger logger = LoggerFactory.getLogger(MyClass.class);

    private MQQueueConnection connectionRq;
    private MQQueueConnection connectionRs;

    public void initConnection(){
        
        try {
            MQQueueConnectionFactory connectionFactoryRq = new MQQueueConnectionFactory();
            connectionFactoryRq.setHostName("localhost");
            connectionFactoryRq.setPort(5672);
            connectionFactoryRq.setTransportType(1);
            connectionFactoryRq.setQueueManager("QM_NAME");
            connectionFactoryRq.setChannel("CHANNEL");

            MQQueueConnectionFactory connectionFactoryRs = new MQQueueConnectionFactory();
            connectionFactoryRs.setHostName("localhost");
            connectionFactoryRs.setPort(5672);
            connectionFactoryRs.setTransportType(1);
            connectionFactoryRs.setQueueManager("QM_NAME");
            connectionFactoryRs.setChannel("CHANNEL");

            connectionRq = (MQQueueConnection) connectionFactoryRq.createQueueConnection();
            connectionRs = (MQQueueConnection) connectionFactoryRs.createQueueConnection();


        } catch (Exception e) {
            logger.info(e.getMessage());
        }
    }

    public String sendMessage(String msg, String correlativeId){
        
        String corId = null;
        MQQueueSession sessionRq = null;
        MQQueueSession sessionRs = null;
        MessageProducer producer = null;
        try {
            sessionRq = (MQQueueSession) connectionRq.createQueueSession(false, 1);
            MQQueue queueRq = (MQQueue) sessionRq.createQueue("queue:///QUEUENAME.RQ");
            queueRq.setMessageBodyStyle(1);
            queueRq.setTargetClient(1);

            sessionRs = (MQQueueSession) connectionRs.createQueueSession(false, 1);
            MQQueue queueRs = (MQQueue) sessionRs.createQueue("queue:///QUEUENAME.RS");

            producer = sessionRq.createProducer(queueRq);

            Message messageRq = sessionRq.createTextMessage(msg);
            messageRq.setJMSReplyTo(queueRs);
            messageRq.setIntProperty("JMS_IBM_Character_Set", 819);
            messageRq.setIntProperty("JMS_IBM_Encoding", 273);
            messageRq.setIntProperty("JMS_IBM_MsgType", 8);
            messageRq.setJMSMessageID(correlativeId);
            messageRq.setJMSCorrelationIDAsBytes(correlativeId.getBytes());
            messageRq.setJMSPriority(1);
            messageRq.setJMSType("Datagram");

            producer.send(messageRq);

            corId = messageRq.getJMSCorrelationID();
        } catch (Exception e) {
            logger.info(e.getMessage());
        }  finally {
            try {
                sessionRq.close();
            } catch (Exception e) {
                logger.info(e.getMessage());
            }
            try {
                sessionRs.close();
            } catch (Exception e) {
                logger.info(e.getMessage());
            }
            try {
                producer.close();
            } catch (Exception e) {
                logger.info(e.getMessage());
            }
        }
        return corId;
    }

    public String consumerMessage(String correlativeId){

        String msg = null;
        MQQueueSession sessionRs = null;
        MessageConsumer consumer = null;
        try {
            sessionRs = (MQQueueSession) connectionRs.createQueueSession(false, 1);
            MQQueue queueRs = (MQQueue) sessionRs.createQueue("queue:///QUEUENAME.RS");
            consumer = sessionRs.createConsumer(queueRs, "JMSCorrelationID='" + correlativeId + "'");
            connectionRs.start();
            Message messageRs = consumer.receive(10000L);
            msg = ((TextMessage) messageRs).getText();
        } catch (Exception e) {
            logger.info(e.getMessage());
        }  finally {
            try {
                sessionRs.close();
            } catch (Exception e) {
                logger.info(e.getMessage());
            }
            try {
                consumer.close();
            } catch (Exception e) {
                logger.info(e.getMessage());
            }
        }
        return msg;
    }

    public static void main(String[] args) {
        MyClass myClass = new MyClass();
        myClass.initConnection();
        String corId = myClass.sendMessage("Test Message Send", "UNIQUE");
        String message = myClass.consumerMessage(corId);
        logger.info("The message: " + message);
    }
}

上面的代码工作正常,问题是该库与本机 Quarkus 编译器不兼容。

为了与 Quarkus 中的 MQ 连接,我使用该库:

    <dependency>
      <groupId>org.amqphub.quarkus</groupId>
      <artifactId>quarkus-qpid-jms</artifactId>
    </dependency>

在 application.properties 中我分配:

quarkus.qpid-jms.url=amqp://localhost:5672

当尝试在以下位置运行示例项目时: Quarkus Qpid JMS Quickstart 向我抛出以下错误:

__  ____  __  _____   ___  __ ____  ______ 
 --/ __ \/ / / / _ | / _ \/ //_/ / / / __/ 
 -/ /_/ / /_/ / __ |/ , _/ ,< / /_/ /\ \   
--\___\_\____/_/ |_/_/|_/_/|_|\____/___/   
2022-03-17 12:02:31,489 INFO  [org.acm.jms.PriceConsumer] (pool-11-thread-1) Writing MQ Client...
2022-03-17 12:02:31,486 INFO  [org.acm.jms.PriceConsumer] (pool-10-thread-1) Reading MQ Client...
2022-03-17 12:02:31,757 INFO  [io.quarkus] (Quarkus Main Thread) jms-quickstart 1.0.0-SNAPSHOT on JVM (powered by Quarkus 2.7.2.Final) started in 4.626s. Listening on: http://localhost:8080
2022-03-17 12:02:31,758 INFO  [io.quarkus] (Quarkus Main Thread) Profile dev activated. Live Coding activated.
2022-03-17 12:02:31,759 INFO  [io.quarkus] (Quarkus Main Thread) Installed features: [cdi, qpid-jms, resteasy, smallrye-context-propagation, vertx]
2022-03-17 12:02:34,068 ERROR [org.apa.qpi.jms.JmsConnection] (pool-11-thread-1) Failed to connect to remote at: amqp://localhost:5672
2022-03-17 12:02:34,068 ERROR [org.apa.qpi.jms.JmsConnection] (pool-10-thread-1) Failed to connect to remote at: amqp://localhost:5672

我知道这很可能是一个配置错误,我错过了一些东西,但我是新的夸库斯和我已经阅读并尝试了很多东西,以至于我已经崩溃了。

我很感激任何形式的帮助,因为这会很棒,或者至少也欢迎文档或指导我的东西。

参考文档:

I am trying to consume/write to a messaging queue in Quarkus, but have been unable to do so. I have an example code with which I can connect to the queue but it is made with the com.ibm.mq.allclient library based on javax.jms,

    <dependency>
      <groupId>com.ibm.mq</groupId>
      <artifactId>com.ibm.mq.allclient</artifactId>
      <version>9.0.4.0</version>
    </dependency>

and to connect use the parameters: hostname, port, name, channel, queuename.

The example code, using the com.ibm.mq.allclient library to create the connections for consumption and writing, is as follows:

import com.ibm.mq.jms.MQQueue;
import com.ibm.mq.jms.MQQueueConnection;
import com.ibm.mq.jms.MQQueueConnectionFactory;
import com.ibm.mq.jms.MQQueueSession;

import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.TextMessage;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MyClass {

    private static Logger logger = LoggerFactory.getLogger(MyClass.class);

    private MQQueueConnection connectionRq;
    private MQQueueConnection connectionRs;

    public void initConnection(){
        
        try {
            MQQueueConnectionFactory connectionFactoryRq = new MQQueueConnectionFactory();
            connectionFactoryRq.setHostName("localhost");
            connectionFactoryRq.setPort(5672);
            connectionFactoryRq.setTransportType(1);
            connectionFactoryRq.setQueueManager("QM_NAME");
            connectionFactoryRq.setChannel("CHANNEL");

            MQQueueConnectionFactory connectionFactoryRs = new MQQueueConnectionFactory();
            connectionFactoryRs.setHostName("localhost");
            connectionFactoryRs.setPort(5672);
            connectionFactoryRs.setTransportType(1);
            connectionFactoryRs.setQueueManager("QM_NAME");
            connectionFactoryRs.setChannel("CHANNEL");

            connectionRq = (MQQueueConnection) connectionFactoryRq.createQueueConnection();
            connectionRs = (MQQueueConnection) connectionFactoryRs.createQueueConnection();


        } catch (Exception e) {
            logger.info(e.getMessage());
        }
    }

    public String sendMessage(String msg, String correlativeId){
        
        String corId = null;
        MQQueueSession sessionRq = null;
        MQQueueSession sessionRs = null;
        MessageProducer producer = null;
        try {
            sessionRq = (MQQueueSession) connectionRq.createQueueSession(false, 1);
            MQQueue queueRq = (MQQueue) sessionRq.createQueue("queue:///QUEUENAME.RQ");
            queueRq.setMessageBodyStyle(1);
            queueRq.setTargetClient(1);

            sessionRs = (MQQueueSession) connectionRs.createQueueSession(false, 1);
            MQQueue queueRs = (MQQueue) sessionRs.createQueue("queue:///QUEUENAME.RS");

            producer = sessionRq.createProducer(queueRq);

            Message messageRq = sessionRq.createTextMessage(msg);
            messageRq.setJMSReplyTo(queueRs);
            messageRq.setIntProperty("JMS_IBM_Character_Set", 819);
            messageRq.setIntProperty("JMS_IBM_Encoding", 273);
            messageRq.setIntProperty("JMS_IBM_MsgType", 8);
            messageRq.setJMSMessageID(correlativeId);
            messageRq.setJMSCorrelationIDAsBytes(correlativeId.getBytes());
            messageRq.setJMSPriority(1);
            messageRq.setJMSType("Datagram");

            producer.send(messageRq);

            corId = messageRq.getJMSCorrelationID();
        } catch (Exception e) {
            logger.info(e.getMessage());
        }  finally {
            try {
                sessionRq.close();
            } catch (Exception e) {
                logger.info(e.getMessage());
            }
            try {
                sessionRs.close();
            } catch (Exception e) {
                logger.info(e.getMessage());
            }
            try {
                producer.close();
            } catch (Exception e) {
                logger.info(e.getMessage());
            }
        }
        return corId;
    }

    public String consumerMessage(String correlativeId){

        String msg = null;
        MQQueueSession sessionRs = null;
        MessageConsumer consumer = null;
        try {
            sessionRs = (MQQueueSession) connectionRs.createQueueSession(false, 1);
            MQQueue queueRs = (MQQueue) sessionRs.createQueue("queue:///QUEUENAME.RS");
            consumer = sessionRs.createConsumer(queueRs, "JMSCorrelationID='" + correlativeId + "'");
            connectionRs.start();
            Message messageRs = consumer.receive(10000L);
            msg = ((TextMessage) messageRs).getText();
        } catch (Exception e) {
            logger.info(e.getMessage());
        }  finally {
            try {
                sessionRs.close();
            } catch (Exception e) {
                logger.info(e.getMessage());
            }
            try {
                consumer.close();
            } catch (Exception e) {
                logger.info(e.getMessage());
            }
        }
        return msg;
    }

    public static void main(String[] args) {
        MyClass myClass = new MyClass();
        myClass.initConnection();
        String corId = myClass.sendMessage("Test Message Send", "UNIQUE");
        String message = myClass.consumerMessage(corId);
        logger.info("The message: " + message);
    }
}

The code above works fine, the problem is that the library is not compatible with the native Quarkus compiler.

For the connection with the MQ in Quarkus I am using the library:

    <dependency>
      <groupId>org.amqphub.quarkus</groupId>
      <artifactId>quarkus-qpid-jms</artifactId>
    </dependency>

In the application.properties I assign:

quarkus.qpid-jms.url=amqp://localhost:5672

And when trying to run the sample project in: Quarkus Qpid JMS Quickstart throws me the following error:

__  ____  __  _____   ___  __ ____  ______ 
 --/ __ \/ / / / _ | / _ \/ //_/ / / / __/ 
 -/ /_/ / /_/ / __ |/ , _/ ,< / /_/ /\ \   
--\___\_\____/_/ |_/_/|_/_/|_|\____/___/   
2022-03-17 12:02:31,489 INFO  [org.acm.jms.PriceConsumer] (pool-11-thread-1) Writing MQ Client...
2022-03-17 12:02:31,486 INFO  [org.acm.jms.PriceConsumer] (pool-10-thread-1) Reading MQ Client...
2022-03-17 12:02:31,757 INFO  [io.quarkus] (Quarkus Main Thread) jms-quickstart 1.0.0-SNAPSHOT on JVM (powered by Quarkus 2.7.2.Final) started in 4.626s. Listening on: http://localhost:8080
2022-03-17 12:02:31,758 INFO  [io.quarkus] (Quarkus Main Thread) Profile dev activated. Live Coding activated.
2022-03-17 12:02:31,759 INFO  [io.quarkus] (Quarkus Main Thread) Installed features: [cdi, qpid-jms, resteasy, smallrye-context-propagation, vertx]
2022-03-17 12:02:34,068 ERROR [org.apa.qpi.jms.JmsConnection] (pool-11-thread-1) Failed to connect to remote at: amqp://localhost:5672
2022-03-17 12:02:34,068 ERROR [org.apa.qpi.jms.JmsConnection] (pool-10-thread-1) Failed to connect to remote at: amqp://localhost:5672

I know that it is most likely a configuration error and I am missing something, but I am new to Quarkus and I have already read and tried so many things that I have already collapsed.

I appreciate any kind of help as it would be great, or at least documentation or something to guide me is also welcome.

Documentation consulted:

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(2

我不会写诗 2025-01-22 13:20:16

当您尝试连接到 localhost:5672 时,出现连接失败。这意味着您没有在该 TCP/IP 端口上侦听任何内容。您引用的指令有一个模板 MQSC 脚本,该脚本设置队列管理器上允许 AMQP 连接所需的资源。该脚本可以在 这里

此脚本中与您报告的具体问题最相关的命令可能是这些(尽管我建议您运行整个脚本):

START SERVICE(SYSTEM.AMQP.SERVICE)
START CHANNEL(SYSTEM.DEF.AMQP)

You are getting a connection failure when you try to connect to localhost:5672. This implies that you do not have anything listening on that TCP/IP port. The instructions you reference have a template MQSC script which sets up the required resources needed on the queue manager to allow an AMQP connection. This script can be found here.

The most pertinent commands within this script for your specific reported problem are likely these (although I suggest you run the whole script):

START SERVICE(SYSTEM.AMQP.SERVICE)
START CHANNEL(SYSTEM.DEF.AMQP)
残花月 2025-01-22 13:20:16

您的队列管理器使用哪个版本的 IBM MQ?

因为在 IBM MQ v9.2.1 之前,它仅支持通过 AMQP 的 Pub/Sub。如果您想使用点对点拓扑(获取/放入队列),那么您的队列管理器至少需要 MQ v9.2.1。请参阅 这里

第一段有蓝色标签“v9.2.1”。这意味着该功能何时引入 IBM MQ。

现在,您可以通过设置指向队列的管理主题对象,然后使用 AMQP Pub/Sub 访问队列来伪造它,但将队列管理器升级到 MQ v9.2.1 或更高版本会简单得多。当前的 IBM MQ CD 版本是 v9.2.5。

注意:IBM MQ v9.2 的 LTS 版本尚不具备该功能。它将包含在 MQ 的下一个主要版本中(即 v9.3 或任何名称)。

What version of IBM MQ are you using for your queue manager?

Because before IBM MQ v9.2.1, it only supported Pub/Sub via AMQP. If you want to use point-to-point topology (getting/putting to a queue) then you need your queue manager to be at least MQ v9.2.1. See here.

The first paragraph has the blue label "v9.2.1". That means when the feature was introduced to IBM MQ.

Now you can fake it, by setting up an administrative topic object that points to a queue then use AMQP Pub/Sub to access the queue but it would be far simpler to upgrade your queue manager to the MQ v9.2.1 or higher. The current IBM MQ CD release is v9.2.5.

Note: The LTS release of IBM MQ v9.2 does not have the feature yet. It will be included in the next major release of MQ (i.e. v9.3 or whatever it will be called).

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文