IBM MQ 消息监听器

发布于 2024-08-07 01:35:08 字数 89 浏览 11 评论 0原文

您好,有谁知道如何使用 IBM MQ 创建消息监听器?我知道如何使用 JMS 规范来做到这一点,但我不确定如何为 IBM MQ 做到这一点。非常感谢任何链接或指针。

Hi does anyone know how to create a message listener using IBM MQ? I know how to do it using the JMS spec but I am not sure how to do it for IBM MQ. Any links or pointers are greatly appreciated.

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(7

吹泡泡o 2024-08-14 01:35:08

尽管前面的响应者指出有一个 WMQ Java API,但 WMQ 也支持 JMS,因此这里有一些资源可以帮助您入门。

请查看这篇文章:IBM WebSphere Developer Technical Journal:运行独立的WebSphere MQ V6.0 上的 Java 应用程序

另外,如果您已经安装了完整的 WMQ 客户端并且不仅仅是获取 jar,那么您将安装大量示例代码。默认情况下,这些文件将位于 C:\Program Files\IBM\WebSphere MQ\tools\jms 或 /opt/mqm/samp 中,具体取决于您的平台。

如果您需要 WMQ 客户端安装介质,请获取它 此处。请注意,这是 WMQ v7 客户端,而不是 v6 客户端。它与 v6 QMgr 兼容,但由于 v6 已于 2011 年 9 月终止,您应该在 v7 客户端上进行新的开发,如果可能的话,还应该在 v7 QMgr 上进行新的开发。如果双方都是 v7,则可以提供许多功能和性能增强。

您可以获取产品手册 此处

最后,请确保在收到 JMS 异常时打印链接的异常。这不是 WMQ 的事情,而是 JMS 的事情。 Sun 为 JMS 异常提供了多级数据结构,真正有趣的部分通常在嵌套级别中。这不是什么大问题,可以用几行代码来实现:

try {
  .
  . code that might throw a JMSException
  .
} catch (JMSException je) {
  System.err.println("caught "+je);
  Exception e = je.getLinkedException();
  if (e != null) {
    System.err.println("linked exception: "+e);
  } else {
    System.err.println("No linked exception found.");
  }
}

这有助于确定 JMS 错误与传输错误之间的区别。例如,JMS 安全错误可能是 WMQ 2035,也可能是 JSSE 配置,或者应用程序可能无法访问文件系统中的某些内容。其中只有一个值得花费大量时间挖掘 WMQ 错误日志,并且只有通过打印链接的异常,您才能判断是否是那个。

Although there is a WMQ Java API as noted by the previous responders, WMQ supports JMS as well so here are some resources to get you started there.

Take a look at this article: IBM WebSphere Developer Technical Journal: Running a standalone Java application on WebSphere MQ V6.0

Also, if you have installed the full WMQ client and not just grabbed the jars then you will have lots of sample code installed. By default, these will live in C:\Program Files\IBM\WebSphere MQ\tools\jms or /opt/mqm/samp depending on your platform.

If you need the WMQ Client install media, get it here. Note that this is the WMQ v7 client and not the v6 client. It is compatible with the v6 QMgr but since v6 is end-of-life as of September 2011 you should be doing new development on the v7 client and, if possible, a v7 QMgr. There are a lot of functional and performance enhancements available if both sides are v7.

You can get the product manual here if you need it.

Finally, please be sure when you get a JMS exception to print the linked exception. This is not a WMQ thing, rather it's a JMS thing. Sun provided a multi-level data structure for JMS exceptions and the really interesting parts are often in the nested level. This is not a big deal and can be implemented in a few lines:

try {
  .
  . code that might throw a JMSException
  .
} catch (JMSException je) {
  System.err.println("caught "+je);
  Exception e = je.getLinkedException();
  if (e != null) {
    System.err.println("linked exception: "+e);
  } else {
    System.err.println("No linked exception found.");
  }
}

This helps to determine the difference between a JMS error versus a transport error. For example a JMS security error might be a WMQ 2035, or it might be the JSSE configuration, or the app might not have access to something in the file system. Only one of these is worth spending a lot of time digging through the WMQ error logs for and only by printing the linked exception will you be able to tell if it's that one.

单身情人 2024-08-14 01:35:08

查看 IBM 帮助:编写 WebSphere MQ 基础 Java 应用程序

IBM 有一个用于与队列交互的 API。这是他们的示例:

import com.ibm.mq.*;            // Include the WebSphere MQ classes for Java package


public class MQSample
{
  private String qManager = "your_Q_manager";  // define name of queue
                                               // manager to connect to.
  private MQQueueManager qMgr;                 // define a queue manager
                                               // object
  public static void main(String args[]) {
     new MQSample();
  }

  public MQSample() {
   try {

      // Create a connection to the queue manager

      qMgr = new MQQueueManager(qManager);

      // Set up the options on the queue we wish to open...
      // Note. All WebSphere MQ Options are prefixed with MQC in Java.

      int openOptions = MQC.MQOO_INPUT_AS_Q_DEF |
                        MQC.MQOO_OUTPUT ;

      // Now specify the queue that we wish to open,
      // and the open options...

      MQQueue system_default_local_queue =
              qMgr.accessQueue("SYSTEM.DEFAULT.LOCAL.QUEUE",
                               openOptions);

      // Define a simple WebSphere MQ message, and write some text in UTF format..

      MQMessage hello_world = new MQMessage();
      hello_world.writeUTF("Hello World!");

      // specify the message options...

      MQPutMessageOptions pmo = new MQPutMessageOptions(); // accept the // defaults,
                                                           // same as MQPMO_DEFAULT

      // put the message on the queue

      system_default_local_queue.put(hello_world,pmo);

      // get the message back again...
      // First define a WebSphere MQ message buffer to receive the message into..

      MQMessage retrievedMessage = new MQMessage();
      retrievedMessage.messageId = hello_world.messageId;

      // Set the get message options...

      MQGetMessageOptions gmo = new MQGetMessageOptions(); // accept the defaults
                                                           // same as  MQGMO_DEFAULT
      // get the message off the queue...

      system_default_local_queue.get(retrievedMessage, gmo);

      // And prove we have the message by displaying the UTF message text

      String msgText = retrievedMessage.readUTF();
      System.out.println("The message is: " + msgText);
      // Close the queue...
      system_default_local_queue.close();
      // Disconnect from the queue manager

      qMgr.disconnect();
    }
      // If an error has occurred in the above, try to identify what went wrong
      // Was it a WebSphere MQ error?
    catch (MQException ex)
    {
      System.out.println("A WebSphere MQ error occurred : Completion code " +
                         ex.completionCode + " Reason code " + ex.reasonCode);
    }
      // Was it a Java buffer space error?
    catch (java.io.IOException ex)
    {
      System.out.println("An error occurred whilst writing to the message buffer: " + ex);
    }
  }
} // end of sample

我不确定 IBM jar 是否位于基本 Maven 存储库中。我知道过去我必须从本地 IBM 安装中提取它们并将它们放入本地 SVN 存储库中。我正在使用以下罐子:

<dependency>
    <groupId>com.ibm</groupId>
    <artifactId>com.ibm.mq</artifactId>
    <version>5.3.00</version>
    <scope>compile</scope>
</dependency>
    <dependency>
    <groupId>com.ibm</groupId>
    <artifactId>com.ibm.mq.pcf</artifactId>
    <version>5.3.00</version>
    <scope>compile</scope>
</dependency>
    <dependency>
    <groupId>com.ibm</groupId>
    <artifactId>com.ibm.mqbind</artifactId>
    <version>5.3.00</version>
    <scope>compile</scope>
</dependency>
    <dependency>
    <groupId>com.ibm</groupId>
    <artifactId>com.ibm.mqjms</artifactId>
    <version>5.3.00</version>
    <scope>compile</scope>
</dependency>

Take a look at IBM Help: Writing WebSphere MQ base Java applications

IBM has an API for interacting with queues. Here's their sample:

import com.ibm.mq.*;            // Include the WebSphere MQ classes for Java package


public class MQSample
{
  private String qManager = "your_Q_manager";  // define name of queue
                                               // manager to connect to.
  private MQQueueManager qMgr;                 // define a queue manager
                                               // object
  public static void main(String args[]) {
     new MQSample();
  }

  public MQSample() {
   try {

      // Create a connection to the queue manager

      qMgr = new MQQueueManager(qManager);

      // Set up the options on the queue we wish to open...
      // Note. All WebSphere MQ Options are prefixed with MQC in Java.

      int openOptions = MQC.MQOO_INPUT_AS_Q_DEF |
                        MQC.MQOO_OUTPUT ;

      // Now specify the queue that we wish to open,
      // and the open options...

      MQQueue system_default_local_queue =
              qMgr.accessQueue("SYSTEM.DEFAULT.LOCAL.QUEUE",
                               openOptions);

      // Define a simple WebSphere MQ message, and write some text in UTF format..

      MQMessage hello_world = new MQMessage();
      hello_world.writeUTF("Hello World!");

      // specify the message options...

      MQPutMessageOptions pmo = new MQPutMessageOptions(); // accept the // defaults,
                                                           // same as MQPMO_DEFAULT

      // put the message on the queue

      system_default_local_queue.put(hello_world,pmo);

      // get the message back again...
      // First define a WebSphere MQ message buffer to receive the message into..

      MQMessage retrievedMessage = new MQMessage();
      retrievedMessage.messageId = hello_world.messageId;

      // Set the get message options...

      MQGetMessageOptions gmo = new MQGetMessageOptions(); // accept the defaults
                                                           // same as  MQGMO_DEFAULT
      // get the message off the queue...

      system_default_local_queue.get(retrievedMessage, gmo);

      // And prove we have the message by displaying the UTF message text

      String msgText = retrievedMessage.readUTF();
      System.out.println("The message is: " + msgText);
      // Close the queue...
      system_default_local_queue.close();
      // Disconnect from the queue manager

      qMgr.disconnect();
    }
      // If an error has occurred in the above, try to identify what went wrong
      // Was it a WebSphere MQ error?
    catch (MQException ex)
    {
      System.out.println("A WebSphere MQ error occurred : Completion code " +
                         ex.completionCode + " Reason code " + ex.reasonCode);
    }
      // Was it a Java buffer space error?
    catch (java.io.IOException ex)
    {
      System.out.println("An error occurred whilst writing to the message buffer: " + ex);
    }
  }
} // end of sample

I'm not sure if the IBM jars are located at the base Maven repo. I know in the past I've had to extract them from a local IBM install and put them in an local SVN repo. I'm using the following jars:

<dependency>
    <groupId>com.ibm</groupId>
    <artifactId>com.ibm.mq</artifactId>
    <version>5.3.00</version>
    <scope>compile</scope>
</dependency>
    <dependency>
    <groupId>com.ibm</groupId>
    <artifactId>com.ibm.mq.pcf</artifactId>
    <version>5.3.00</version>
    <scope>compile</scope>
</dependency>
    <dependency>
    <groupId>com.ibm</groupId>
    <artifactId>com.ibm.mqbind</artifactId>
    <version>5.3.00</version>
    <scope>compile</scope>
</dependency>
    <dependency>
    <groupId>com.ibm</groupId>
    <artifactId>com.ibm.mqjms</artifactId>
    <version>5.3.00</version>
    <scope>compile</scope>
</dependency>
眼眸印温柔 2024-08-14 01:35:08

查看上面提供的示例。

具体来说,

MQGetMessageOptions gmo = new MQGetMessageOptions();       
system_default_local_queue.get(retrievedMessage, gmo);

您可以将 get 配置为在抛出 MQRC_NO_MSG_AVAILABLE 异常之前等待指定的时间。或者你可以永远等待。

gmo.waitInterval= qTimeout;
gmo.options = MQC.MQGMO_WAIT

因此,您可以创建一个线程来不断寻找新消息,然后将它们传递给处理程序。获取和放置不需要在同一线程甚至应用程序中。

我希望这有助于回答您的问题。

Take a look at the sample provided above.

Specifically at the lines

MQGetMessageOptions gmo = new MQGetMessageOptions();       
system_default_local_queue.get(retrievedMessage, gmo);

You can configure the get to wait for a specified time before throwing a MQRC_NO_MSG_AVAILABLE exception. Or you can wait forever.

gmo.waitInterval= qTimeout;
gmo.options = MQC.MQGMO_WAIT

So you can create a thread that keeps looking for new messages then passes them off to a handler. The getting and putting do not need to be in the same thread or even application.

I hope this helps answer your question.

情感失落者 2024-08-14 01:35:08

在获取消息之前的循环中,您可以指定如下,

gmo.options = MQC.MQGMO_WAIT
gmo.waitInterval = MQConstants.MQWI_UNLIMITED;

这使得循环将等待,直到队列中有消息。
对我来说,它类似于 MessageListerner

in the loop before getting the message you can specify as following

gmo.options = MQC.MQGMO_WAIT
gmo.waitInterval = MQConstants.MQWI_UNLIMITED;

this makes the loop will wait until there is a message in the queue.
To me, it is similar to MessageListerner

时光磨忆 2024-08-14 01:35:08

以防万一有人会像我一样在 stackoverflow 上搜索 MQ Listener...
由于 JMS 实现,这可能不是答案,但这正是我正在寻找的。
像这样的事情:

MQQueueConnectionFactory cf = new MQQueueConnectionFactory();
MQQueueConnection conn = (MQQueueConnection)cf.createQueueConnection();
MQQueueSession session = (MQQueueSession)conn.createSession(false, 1);

Queue queue = session.createQueue("QUEUE");

MQQueueReceiver receiver = (MQQueueReceiver)session.createReceiver(queue);

receiver.setMessageListener(new YourListener());

conn.start();

YourListener 应该实现 MessageListener 接口,并且您将在 onMessage(Message msg) 方法中接收消息。

Just in case anyone will google stackoverflow for MQ Listener like i did...
It might be not the answer due to JMS realization, but this is what I was looking for.
Something like this:

MQQueueConnectionFactory cf = new MQQueueConnectionFactory();
MQQueueConnection conn = (MQQueueConnection)cf.createQueueConnection();
MQQueueSession session = (MQQueueSession)conn.createSession(false, 1);

Queue queue = session.createQueue("QUEUE");

MQQueueReceiver receiver = (MQQueueReceiver)session.createReceiver(queue);

receiver.setMessageListener(new YourListener());

conn.start();

YourListener should implement MessageListener interface and you will receive you messages into onMessage(Message msg) method.

坏尐絯℡ 2024-08-14 01:35:08

您好,这是使用 IBM MQ 的消息侦听器的工作示例。在这里我还使用 spring 来创建 bean 等...

package queue.app;

import javax.annotation.PostConstruct;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.Queue;
import javax.jms.QueueConnection;
import javax.jms.QueueReceiver;
import javax.jms.QueueSession;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.log4j.Logger;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

import com.ibm.mq.jms.MQQueue;
import com.ibm.mq.jms.MQQueueConnectionFactory;
import com.ibm.msg.client.wmq.WMQConstants;


@Component
public class QueueConsumer implements MessageListener{

    private Logger logger = Logger.getLogger(getClass());

    MQQueueConnectionFactory qcf = new MQQueueConnectionFactory();
    QueueConnection qc;
    Queue queue;
    QueueSession queueSession;
    QueueReceiver qr;

    @Value("${jms.hostName}")
    String jmsHost;
    @Value("${jms.port}")
    String jmsPort;
    @Value("${jms.queue.name}")
    String QUEUE_NAME;
    @Value("${jms.queueManager}")
    String jmsQueueMgr;
    @Value("${jms.username}")
    String jmsUserName;
    @Value("${jms.channel}")
    String jmsChannel;

    @PostConstruct
    public void init() throws Exception{
        qcf.setHostName (jmsHost);
        qcf.setPort (Integer.parseInt(jmsPort));
        qcf.setQueueManager (jmsQueueMgr);
        qcf.setChannel (jmsChannel);
        qcf.setTransportType (WMQConstants.WMQ_CM_CLIENT);
        qc = qcf.createQueueConnection ();

        queue = new MQQueue(QUEUE_NAME);
        qc.createQueueSession (false, Session.AUTO_ACKNOWLEDGE);
        queueSession = qc.createQueueSession (false, Session.AUTO_ACKNOWLEDGE);
        qr = queueSession.createReceiver(queue);
        qr.setMessageListener(this);
        qc.start();

    }


    @Override
    public void onMessage(Message message) {
        logger.info("Inside On Message...");
        long t1 = System.currentTimeMillis();
        logger.info("Message consumed at ...."+t1);

        try{
            if(message instanceof TextMessage) {
                logger.info("String message recieved >> "+((TextMessage) message).getText());
            }

        }catch(Exception e){
            e.printStackTrace();
        }

    }
}

下面是我拥有的依赖项..

<dependency>
            <groupId>com.sun.messaging.mq</groupId>
            <artifactId>fscontext</artifactId>
            <version>4.2</version>
            <scope>test</scope>
        </dependency>

        <dependency>
            <groupId>com.ibm</groupId>
            <artifactId>jms</artifactId>
            <version>1.0</version>
        </dependency>

        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-jms</artifactId>
            <version>3.2.17.RELEASE</version>
        </dependency>


        <dependency>
            <groupId>com.ibm</groupId>
            <artifactId>com.ibm.mq</artifactId>
            <version>1.0</version>
        </dependency>
        <dependency>
            <groupId>com.ibm</groupId>
            <artifactId>com.ibm.mq.allclient</artifactId>
            <version>1.0</version>
        </dependency>
        <dependency>
            <groupId>com.ibm</groupId>
            <artifactId>com.ibm.mq.jmqi</artifactId>
            <version>1.0</version>
        </dependency>
        <dependency>
            <groupId>com.ibm</groupId>
            <artifactId>com.ibm.mqjms</artifactId>
            <version>1.0</version>
        </dependency>

Hello, here is the working example of message listener with IBM MQ. Here I used spring also to create beans etc...

package queue.app;

import javax.annotation.PostConstruct;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.Queue;
import javax.jms.QueueConnection;
import javax.jms.QueueReceiver;
import javax.jms.QueueSession;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.log4j.Logger;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

import com.ibm.mq.jms.MQQueue;
import com.ibm.mq.jms.MQQueueConnectionFactory;
import com.ibm.msg.client.wmq.WMQConstants;


@Component
public class QueueConsumer implements MessageListener{

    private Logger logger = Logger.getLogger(getClass());

    MQQueueConnectionFactory qcf = new MQQueueConnectionFactory();
    QueueConnection qc;
    Queue queue;
    QueueSession queueSession;
    QueueReceiver qr;

    @Value("${jms.hostName}")
    String jmsHost;
    @Value("${jms.port}")
    String jmsPort;
    @Value("${jms.queue.name}")
    String QUEUE_NAME;
    @Value("${jms.queueManager}")
    String jmsQueueMgr;
    @Value("${jms.username}")
    String jmsUserName;
    @Value("${jms.channel}")
    String jmsChannel;

    @PostConstruct
    public void init() throws Exception{
        qcf.setHostName (jmsHost);
        qcf.setPort (Integer.parseInt(jmsPort));
        qcf.setQueueManager (jmsQueueMgr);
        qcf.setChannel (jmsChannel);
        qcf.setTransportType (WMQConstants.WMQ_CM_CLIENT);
        qc = qcf.createQueueConnection ();

        queue = new MQQueue(QUEUE_NAME);
        qc.createQueueSession (false, Session.AUTO_ACKNOWLEDGE);
        queueSession = qc.createQueueSession (false, Session.AUTO_ACKNOWLEDGE);
        qr = queueSession.createReceiver(queue);
        qr.setMessageListener(this);
        qc.start();

    }


    @Override
    public void onMessage(Message message) {
        logger.info("Inside On Message...");
        long t1 = System.currentTimeMillis();
        logger.info("Message consumed at ...."+t1);

        try{
            if(message instanceof TextMessage) {
                logger.info("String message recieved >> "+((TextMessage) message).getText());
            }

        }catch(Exception e){
            e.printStackTrace();
        }

    }
}

Below are the dependencies i have..

<dependency>
            <groupId>com.sun.messaging.mq</groupId>
            <artifactId>fscontext</artifactId>
            <version>4.2</version>
            <scope>test</scope>
        </dependency>

        <dependency>
            <groupId>com.ibm</groupId>
            <artifactId>jms</artifactId>
            <version>1.0</version>
        </dependency>

        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-jms</artifactId>
            <version>3.2.17.RELEASE</version>
        </dependency>


        <dependency>
            <groupId>com.ibm</groupId>
            <artifactId>com.ibm.mq</artifactId>
            <version>1.0</version>
        </dependency>
        <dependency>
            <groupId>com.ibm</groupId>
            <artifactId>com.ibm.mq.allclient</artifactId>
            <version>1.0</version>
        </dependency>
        <dependency>
            <groupId>com.ibm</groupId>
            <artifactId>com.ibm.mq.jmqi</artifactId>
            <version>1.0</version>
        </dependency>
        <dependency>
            <groupId>com.ibm</groupId>
            <artifactId>com.ibm.mqjms</artifactId>
            <version>1.0</version>
        </dependency>
北斗星光 2024-08-14 01:35:08

除了现有答案之外,还有一点很重要:JMS 提供了 MessageListener,这是一个允许您以异步回调方式接收消息的类。

本机 API 没有等效功能!您必须根据需要重复调​​用 get(...)

An important point in addition to the existing answers: JMS provides MessageListener, a class that allows you to receive messages as asynchronous callbacks.

The native API has no equivalent feature! You have to repeatedly call get(...) as appropriate.

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文