Quarkus 中消息队列连接错误
我正在尝试使用/写入 Quarkus 中的消息队列,但无法执行此操作。我有一个示例代码,可以使用它连接到队列,但它是使用基于 javax.jms 的 com.ibm.mq.allclient 库制作的,
<dependency>
<groupId>com.ibm.mq</groupId>
<artifactId>com.ibm.mq.allclient</artifactId>
<version>9.0.4.0</version>
</dependency>
并使用以下参数进行连接:主机名、端口、名称、通道、队列名称。
使用 com.ibm.mq.allclient 库创建消费和写入连接的示例代码如下:
import com.ibm.mq.jms.MQQueue;
import com.ibm.mq.jms.MQQueueConnection;
import com.ibm.mq.jms.MQQueueConnectionFactory;
import com.ibm.mq.jms.MQQueueSession;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.TextMessage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class MyClass {
private static Logger logger = LoggerFactory.getLogger(MyClass.class);
private MQQueueConnection connectionRq;
private MQQueueConnection connectionRs;
public void initConnection(){
try {
MQQueueConnectionFactory connectionFactoryRq = new MQQueueConnectionFactory();
connectionFactoryRq.setHostName("localhost");
connectionFactoryRq.setPort(5672);
connectionFactoryRq.setTransportType(1);
connectionFactoryRq.setQueueManager("QM_NAME");
connectionFactoryRq.setChannel("CHANNEL");
MQQueueConnectionFactory connectionFactoryRs = new MQQueueConnectionFactory();
connectionFactoryRs.setHostName("localhost");
connectionFactoryRs.setPort(5672);
connectionFactoryRs.setTransportType(1);
connectionFactoryRs.setQueueManager("QM_NAME");
connectionFactoryRs.setChannel("CHANNEL");
connectionRq = (MQQueueConnection) connectionFactoryRq.createQueueConnection();
connectionRs = (MQQueueConnection) connectionFactoryRs.createQueueConnection();
} catch (Exception e) {
logger.info(e.getMessage());
}
}
public String sendMessage(String msg, String correlativeId){
String corId = null;
MQQueueSession sessionRq = null;
MQQueueSession sessionRs = null;
MessageProducer producer = null;
try {
sessionRq = (MQQueueSession) connectionRq.createQueueSession(false, 1);
MQQueue queueRq = (MQQueue) sessionRq.createQueue("queue:///QUEUENAME.RQ");
queueRq.setMessageBodyStyle(1);
queueRq.setTargetClient(1);
sessionRs = (MQQueueSession) connectionRs.createQueueSession(false, 1);
MQQueue queueRs = (MQQueue) sessionRs.createQueue("queue:///QUEUENAME.RS");
producer = sessionRq.createProducer(queueRq);
Message messageRq = sessionRq.createTextMessage(msg);
messageRq.setJMSReplyTo(queueRs);
messageRq.setIntProperty("JMS_IBM_Character_Set", 819);
messageRq.setIntProperty("JMS_IBM_Encoding", 273);
messageRq.setIntProperty("JMS_IBM_MsgType", 8);
messageRq.setJMSMessageID(correlativeId);
messageRq.setJMSCorrelationIDAsBytes(correlativeId.getBytes());
messageRq.setJMSPriority(1);
messageRq.setJMSType("Datagram");
producer.send(messageRq);
corId = messageRq.getJMSCorrelationID();
} catch (Exception e) {
logger.info(e.getMessage());
} finally {
try {
sessionRq.close();
} catch (Exception e) {
logger.info(e.getMessage());
}
try {
sessionRs.close();
} catch (Exception e) {
logger.info(e.getMessage());
}
try {
producer.close();
} catch (Exception e) {
logger.info(e.getMessage());
}
}
return corId;
}
public String consumerMessage(String correlativeId){
String msg = null;
MQQueueSession sessionRs = null;
MessageConsumer consumer = null;
try {
sessionRs = (MQQueueSession) connectionRs.createQueueSession(false, 1);
MQQueue queueRs = (MQQueue) sessionRs.createQueue("queue:///QUEUENAME.RS");
consumer = sessionRs.createConsumer(queueRs, "JMSCorrelationID='" + correlativeId + "'");
connectionRs.start();
Message messageRs = consumer.receive(10000L);
msg = ((TextMessage) messageRs).getText();
} catch (Exception e) {
logger.info(e.getMessage());
} finally {
try {
sessionRs.close();
} catch (Exception e) {
logger.info(e.getMessage());
}
try {
consumer.close();
} catch (Exception e) {
logger.info(e.getMessage());
}
}
return msg;
}
public static void main(String[] args) {
MyClass myClass = new MyClass();
myClass.initConnection();
String corId = myClass.sendMessage("Test Message Send", "UNIQUE");
String message = myClass.consumerMessage(corId);
logger.info("The message: " + message);
}
}
上面的代码工作正常,问题是该库与本机 Quarkus 编译器不兼容。
为了与 Quarkus 中的 MQ 连接,我使用该库:
<dependency>
<groupId>org.amqphub.quarkus</groupId>
<artifactId>quarkus-qpid-jms</artifactId>
</dependency>
在 application.properties 中我分配:
quarkus.qpid-jms.url=amqp://localhost:5672
当尝试在以下位置运行示例项目时: Quarkus Qpid JMS Quickstart 向我抛出以下错误:
__ ____ __ _____ ___ __ ____ ______
--/ __ \/ / / / _ | / _ \/ //_/ / / / __/
-/ /_/ / /_/ / __ |/ , _/ ,< / /_/ /\ \
--\___\_\____/_/ |_/_/|_/_/|_|\____/___/
2022-03-17 12:02:31,489 INFO [org.acm.jms.PriceConsumer] (pool-11-thread-1) Writing MQ Client...
2022-03-17 12:02:31,486 INFO [org.acm.jms.PriceConsumer] (pool-10-thread-1) Reading MQ Client...
2022-03-17 12:02:31,757 INFO [io.quarkus] (Quarkus Main Thread) jms-quickstart 1.0.0-SNAPSHOT on JVM (powered by Quarkus 2.7.2.Final) started in 4.626s. Listening on: http://localhost:8080
2022-03-17 12:02:31,758 INFO [io.quarkus] (Quarkus Main Thread) Profile dev activated. Live Coding activated.
2022-03-17 12:02:31,759 INFO [io.quarkus] (Quarkus Main Thread) Installed features: [cdi, qpid-jms, resteasy, smallrye-context-propagation, vertx]
2022-03-17 12:02:34,068 ERROR [org.apa.qpi.jms.JmsConnection] (pool-11-thread-1) Failed to connect to remote at: amqp://localhost:5672
2022-03-17 12:02:34,068 ERROR [org.apa.qpi.jms.JmsConnection] (pool-10-thread-1) Failed to connect to remote at: amqp://localhost:5672
我知道这很可能是一个配置错误,我错过了一些东西,但我是新的夸库斯和我已经阅读并尝试了很多东西,以至于我已经崩溃了。
我很感激任何形式的帮助,因为这会很棒,或者至少也欢迎文档或指导我的东西。
参考文档:
I am trying to consume/write to a messaging queue in Quarkus, but have been unable to do so. I have an example code with which I can connect to the queue but it is made with the com.ibm.mq.allclient library based on javax.jms,
<dependency>
<groupId>com.ibm.mq</groupId>
<artifactId>com.ibm.mq.allclient</artifactId>
<version>9.0.4.0</version>
</dependency>
and to connect use the parameters: hostname, port, name, channel, queuename.
The example code, using the com.ibm.mq.allclient library to create the connections for consumption and writing, is as follows:
import com.ibm.mq.jms.MQQueue;
import com.ibm.mq.jms.MQQueueConnection;
import com.ibm.mq.jms.MQQueueConnectionFactory;
import com.ibm.mq.jms.MQQueueSession;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.TextMessage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class MyClass {
private static Logger logger = LoggerFactory.getLogger(MyClass.class);
private MQQueueConnection connectionRq;
private MQQueueConnection connectionRs;
public void initConnection(){
try {
MQQueueConnectionFactory connectionFactoryRq = new MQQueueConnectionFactory();
connectionFactoryRq.setHostName("localhost");
connectionFactoryRq.setPort(5672);
connectionFactoryRq.setTransportType(1);
connectionFactoryRq.setQueueManager("QM_NAME");
connectionFactoryRq.setChannel("CHANNEL");
MQQueueConnectionFactory connectionFactoryRs = new MQQueueConnectionFactory();
connectionFactoryRs.setHostName("localhost");
connectionFactoryRs.setPort(5672);
connectionFactoryRs.setTransportType(1);
connectionFactoryRs.setQueueManager("QM_NAME");
connectionFactoryRs.setChannel("CHANNEL");
connectionRq = (MQQueueConnection) connectionFactoryRq.createQueueConnection();
connectionRs = (MQQueueConnection) connectionFactoryRs.createQueueConnection();
} catch (Exception e) {
logger.info(e.getMessage());
}
}
public String sendMessage(String msg, String correlativeId){
String corId = null;
MQQueueSession sessionRq = null;
MQQueueSession sessionRs = null;
MessageProducer producer = null;
try {
sessionRq = (MQQueueSession) connectionRq.createQueueSession(false, 1);
MQQueue queueRq = (MQQueue) sessionRq.createQueue("queue:///QUEUENAME.RQ");
queueRq.setMessageBodyStyle(1);
queueRq.setTargetClient(1);
sessionRs = (MQQueueSession) connectionRs.createQueueSession(false, 1);
MQQueue queueRs = (MQQueue) sessionRs.createQueue("queue:///QUEUENAME.RS");
producer = sessionRq.createProducer(queueRq);
Message messageRq = sessionRq.createTextMessage(msg);
messageRq.setJMSReplyTo(queueRs);
messageRq.setIntProperty("JMS_IBM_Character_Set", 819);
messageRq.setIntProperty("JMS_IBM_Encoding", 273);
messageRq.setIntProperty("JMS_IBM_MsgType", 8);
messageRq.setJMSMessageID(correlativeId);
messageRq.setJMSCorrelationIDAsBytes(correlativeId.getBytes());
messageRq.setJMSPriority(1);
messageRq.setJMSType("Datagram");
producer.send(messageRq);
corId = messageRq.getJMSCorrelationID();
} catch (Exception e) {
logger.info(e.getMessage());
} finally {
try {
sessionRq.close();
} catch (Exception e) {
logger.info(e.getMessage());
}
try {
sessionRs.close();
} catch (Exception e) {
logger.info(e.getMessage());
}
try {
producer.close();
} catch (Exception e) {
logger.info(e.getMessage());
}
}
return corId;
}
public String consumerMessage(String correlativeId){
String msg = null;
MQQueueSession sessionRs = null;
MessageConsumer consumer = null;
try {
sessionRs = (MQQueueSession) connectionRs.createQueueSession(false, 1);
MQQueue queueRs = (MQQueue) sessionRs.createQueue("queue:///QUEUENAME.RS");
consumer = sessionRs.createConsumer(queueRs, "JMSCorrelationID='" + correlativeId + "'");
connectionRs.start();
Message messageRs = consumer.receive(10000L);
msg = ((TextMessage) messageRs).getText();
} catch (Exception e) {
logger.info(e.getMessage());
} finally {
try {
sessionRs.close();
} catch (Exception e) {
logger.info(e.getMessage());
}
try {
consumer.close();
} catch (Exception e) {
logger.info(e.getMessage());
}
}
return msg;
}
public static void main(String[] args) {
MyClass myClass = new MyClass();
myClass.initConnection();
String corId = myClass.sendMessage("Test Message Send", "UNIQUE");
String message = myClass.consumerMessage(corId);
logger.info("The message: " + message);
}
}
The code above works fine, the problem is that the library is not compatible with the native Quarkus compiler.
For the connection with the MQ in Quarkus I am using the library:
<dependency>
<groupId>org.amqphub.quarkus</groupId>
<artifactId>quarkus-qpid-jms</artifactId>
</dependency>
In the application.properties I assign:
quarkus.qpid-jms.url=amqp://localhost:5672
And when trying to run the sample project in: Quarkus Qpid JMS Quickstart throws me the following error:
__ ____ __ _____ ___ __ ____ ______
--/ __ \/ / / / _ | / _ \/ //_/ / / / __/
-/ /_/ / /_/ / __ |/ , _/ ,< / /_/ /\ \
--\___\_\____/_/ |_/_/|_/_/|_|\____/___/
2022-03-17 12:02:31,489 INFO [org.acm.jms.PriceConsumer] (pool-11-thread-1) Writing MQ Client...
2022-03-17 12:02:31,486 INFO [org.acm.jms.PriceConsumer] (pool-10-thread-1) Reading MQ Client...
2022-03-17 12:02:31,757 INFO [io.quarkus] (Quarkus Main Thread) jms-quickstart 1.0.0-SNAPSHOT on JVM (powered by Quarkus 2.7.2.Final) started in 4.626s. Listening on: http://localhost:8080
2022-03-17 12:02:31,758 INFO [io.quarkus] (Quarkus Main Thread) Profile dev activated. Live Coding activated.
2022-03-17 12:02:31,759 INFO [io.quarkus] (Quarkus Main Thread) Installed features: [cdi, qpid-jms, resteasy, smallrye-context-propagation, vertx]
2022-03-17 12:02:34,068 ERROR [org.apa.qpi.jms.JmsConnection] (pool-11-thread-1) Failed to connect to remote at: amqp://localhost:5672
2022-03-17 12:02:34,068 ERROR [org.apa.qpi.jms.JmsConnection] (pool-10-thread-1) Failed to connect to remote at: amqp://localhost:5672
I know that it is most likely a configuration error and I am missing something, but I am new to Quarkus and I have already read and tried so many things that I have already collapsed.
I appreciate any kind of help as it would be great, or at least documentation or something to guide me is also welcome.
Documentation consulted:
如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。
绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论
评论(2)
当您尝试连接到
localhost:5672
时,出现连接失败。这意味着您没有在该 TCP/IP 端口上侦听任何内容。您引用的指令有一个模板 MQSC 脚本,该脚本设置队列管理器上允许 AMQP 连接所需的资源。该脚本可以在 这里。此脚本中与您报告的具体问题最相关的命令可能是这些(尽管我建议您运行整个脚本):
You are getting a connection failure when you try to connect to
localhost:5672
. This implies that you do not have anything listening on that TCP/IP port. The instructions you reference have a template MQSC script which sets up the required resources needed on the queue manager to allow an AMQP connection. This script can be found here.The most pertinent commands within this script for your specific reported problem are likely these (although I suggest you run the whole script):
您的队列管理器使用哪个版本的 IBM MQ?
因为在 IBM MQ v9.2.1 之前,它仅支持通过 AMQP 的 Pub/Sub。如果您想使用点对点拓扑(获取/放入队列),那么您的队列管理器至少需要 MQ v9.2.1。请参阅 这里。
第一段有蓝色标签“v9.2.1”。这意味着该功能何时引入 IBM MQ。
现在,您可以通过设置指向队列的管理主题对象,然后使用 AMQP Pub/Sub 访问队列来伪造它,但将队列管理器升级到 MQ v9.2.1 或更高版本会简单得多。当前的 IBM MQ CD 版本是 v9.2.5。
注意:IBM MQ v9.2 的 LTS 版本尚不具备该功能。它将包含在 MQ 的下一个主要版本中(即 v9.3 或任何名称)。
What version of IBM MQ are you using for your queue manager?
Because before IBM MQ v9.2.1, it only supported Pub/Sub via AMQP. If you want to use point-to-point topology (getting/putting to a queue) then you need your queue manager to be at least MQ v9.2.1. See here.
The first paragraph has the blue label "v9.2.1". That means when the feature was introduced to IBM MQ.
Now you can fake it, by setting up an administrative topic object that points to a queue then use AMQP Pub/Sub to access the queue but it would be far simpler to upgrade your queue manager to the MQ v9.2.1 or higher. The current IBM MQ CD release is v9.2.5.
Note: The LTS release of IBM MQ v9.2 does not have the feature yet. It will be included in the next major release of MQ (i.e. v9.3 or whatever it will be called).