返回介绍

Spring Boot 中使用 RocketMQ

发布于 2025-02-18 23:57:56 字数 8104 浏览 0 评论 0 收藏 0

本文快速入门,RocketMQ 消息系统的安装部署,发送,和接收消息,监控消息,的详细说明。

环境需要

64 位操作系统,建议使用 Linux / Unix /

  • CentOs7.3
  • 64bit JDK 1.8+
  • Maven 3.2.x
  • Git 1.8.3.1

环境安装

请参考我的另一篇文章

搭建 Apache RocketMQ 单机环境

http://www.ymq.io/2018/02/01/RocketMQ-install

新加项目

新建一个 maven 项目,这里就不详细操作了,大家都会的

不过也可以下载我的示例源码,下载地址如下

GitHub 源码: https://github.com/souyunku/spring-boot-examples/tree/master/spring-boot-rocketmq

添加依赖

在 POM 中添加如下依赖

<!-- RocketMq 客户端相关依赖 -->
<dependency>
	<groupId>org.apache.rocketmq</groupId>
	<artifactId>rocketmq-client</artifactId>
	<version>4.1.0-incubating</version>
</dependency>

<dependency>
	<groupId>org.apache.rocketmq</groupId>
	<artifactId>rocketmq-common</artifactId>
	<version>4.1.0-incubating</version>
</dependency>

配置文件

在配置文件 application.properties 添加一下内容

# 消费者的组名
apache.rocketmq.consumer.PushConsumer=PushConsumer

# 生产者的组名
apache.rocketmq.producer.producerGroup=Producer

# NameServer 地址
apache.rocketmq.namesrvAddr=192.168.252.121:9876

消息生产者

@Component
public class Producer {

  /**
   * 生产者的组名
   */
  @Value("${apache.rocketmq.producer.producerGroup}")
  private String producerGroup;

  /**
   * NameServer 地址
   */
  @Value("${apache.rocketmq.namesrvAddr}")
  private String namesrvAddr;

  @PostConstruct
  public void defaultMQProducer() {

    //生产者的组名
    DefaultMQProducer producer = new DefaultMQProducer(producerGroup);

    //指定 NameServer 地址,多个地址以 ; 隔开
    producer.setNamesrvAddr(namesrvAddr);

    try {

      /**
       * Producer 对象在使用之前必须要调用 start 初始化,初始化一次即可
       * 注意:切记不可以在每次发送消息时,都调用 start 方法
       */
      producer.start();

      for (int i = 0; i < 100; i++) {

        String messageBody = "我是消息内容:" + i;

        String message = new String(messageBody.getBytes(), "utf-8");

        //构建消息
        Message msg = new Message("PushTopic" /* PushTopic */, "push"/* Tag  */, "key_" + i /* Keys */, message.getBytes());

        //发送消息
        SendResult result = producer.send(msg);

        System.out.println("发送响应:MsgId:" + result.getMsgId() + ",发送状态:" + result.getSendStatus());

      }

    } catch (Exception e) {
      e.printStackTrace();
    } finally {
      producer.shutdown();
    }

  }
}

消息消费者

@Component
public class Consumer {

  /**
   * 消费者的组名
   */
  @Value("${apache.rocketmq.consumer.PushConsumer}")
  private String consumerGroup;

  /**
   * NameServer 地址
   */
  @Value("${apache.rocketmq.namesrvAddr}")
  private String namesrvAddr;

  @PostConstruct
  public void defaultMQPushConsumer() {

    //消费者的组名
    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(consumerGroup);

    //指定 NameServer 地址,多个地址以 ; 隔开
    consumer.setNamesrvAddr(namesrvAddr);
    try {
      //订阅 PushTopic 下 Tag 为 push 的消息
      consumer.subscribe("PushTopic", "push");

      //设置 Consumer 第一次启动是从队列头部开始消费还是队列尾部开始消费
      //如果非第一次启动,那么按照上次消费的位置继续消费
      consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
      consumer.registerMessageListener(new MessageListenerConcurrently() {

        @Override
        public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext context) {
          try {
            for (MessageExt messageExt : list) {

              System.out.println("messageExt: " + messageExt);//输出消息内容

              String messageBody = new String(messageExt.getBody(), "utf-8");

              System.out.println("消费响应:Msg: " + messageExt.getMsgId() + ",msgBody: " + messageBody);//输出消息内容

            }
          } catch (Exception e) {
            e.printStackTrace();
            return ConsumeConcurrentlyStatus.RECONSUME_LATER; //稍后再试
          }
          return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; //消费成功
        }


      });
      consumer.start();
    } catch (Exception e) {
      e.printStackTrace();
    }
  }

}

启动服务

@SpringBootApplication
public class SpringBootRocketmqApplication {

	public static void main(String[] args) {
		SpringApplication.run(SpringBootRocketmqApplication.class, args);
	}
}

控制台会有响应

发送响应:MsgId:0AFF015E556818B4AAC208A0504F0063,发送状态:SEND_OK

messageExt: MessageExt [queueId=0, storeSize=195, queueOffset=113824, sysFlag=0, bornTimestamp=1517559124047, bornHost=/192.168.252.1:62165, storeTimestamp=1517559135052, storeHost=/192.168.252.121:10911, msgId=C0A8FC7900002A9F00000000056F499C, commitLogOffset=91179420, bodyCRC=1687852546, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=PushTopic, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=113825, KEYS=key_99, CONSUME_START_TIME=1517559124049, UNIQ_KEY=0AFF015E556818B4AAC208A0504F0063, WAIT=true, TAGS=push}, body=21]]

消费响应:Msg: 0AFF015E556818B4AAC208A0504F0063,msgBody: 我是消息内容:99

...

监控服务

RocketMQ web 界面监控 RocketMQ-Console-Ng 部署

https://github.com/apache/rocketmq-externals/tree/master/rocketmq-console

下载并且编译

下载并且 maven 编译

git clone https://github.com/apache/rocketmq-externals.git
cd rocketmq-externals/rocketmq-console/
mvn clean package -Dmaven.test.skip=true

启动监控服务

rocketmq.config.namesrvAddr NameServer 地址,默认启动端口 8080

nohup java -jar target/rocketmq-console-ng-1.0.0.jar --rocketmq.config.namesrvAddr=127.0.0.1:9876  > /dev/null 2>&1 &

关于报错

关闭防火墙

org.apache.rocketmq.client.exception.MQClientException: No route info of this topic, PushTopic
See http://rocketmq.apache.org/docs/faq/ for further details.

开启服务器自动创建 Topic 功能

autoCreateTopicEnable=true

Caused by: org.apache.rocketmq.remoting.exception.RemotingConnectException: connect to <192.168.0.4:10911> failed

这个错,主要是启动的时候指定的 ip 是 -n localhost:9876

在服务器使用,不能使用连接 rocketmq 解决步骤

步骤一,启动 Name Server
nohup sh bin/mqnamesrv > /dev/null 2>&1 &

步骤二,指定 Broker 外网 IP
添加
vi /opt/apache-rocketmq/conf/broker.conf
brokerIP1=116.196.97.159

输入终端执行
export NAMESRV_ADDR=116.196.97.159:9876

步骤三,启动 Broker
nohup sh bin/mqbroker -n 116.196.97.159:9876 > autoCreateTopicEnable=true -c /opt/apache-rocketmq/conf/broker.conf /dev/null 2>&1 &

步骤四,启动监控页面
nohup java -jar target/rocketmq-console-ng-1.0.0.jar --rocketmq.config.namesrvAddr=116.196.97.159:9876  > /dev/null 2>&1 &

访问监控服务

GitHub 源码: https://github.com/souyunku/spring-boot-examples/tree/master/spring-boot-rocketmq

Gitee 源码: https://gitee.com/souyunku/spring-boot-examples/tree/master/spring-boot-rabbitmq

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。
列表为空,暂无数据
    我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
    原文