返回介绍

RabbitMQ 实现消息总线

发布于 2024-08-18 11:12:34 字数 16028 浏览 0 评论 0 收藏 0

RabbitMQ是实现了高级消息队列协议(AMQP)的开源消息代理软件,也称为面向消息的中间件。RabbitMQ 服务器是用高性能、可伸缩而闻名的 Erlang 语言编写而成的,其集群和故障转移是构建在开放电信平台框架上的。

AMQP是Advanced Message Queuing Protocol的简称,它是一个面向消息中间件的开放式标准应用层协议。它定义了以下这些特性:

- 消息方向

- 消息队列

- 消息路由(包括点到点和发布-订阅模式)

- 可靠性

- 安全性

AMQP要求消息的提供者和客户端接收者的行为要实现对不同供应商可以用相同的方式(比如SMTP、HTTP、FTP等)进行互相操作。在以往的中间件标准中,主要还是建立在API级别,比如JMS,集中于通过不同的中间件实现来建立标准化的程序间的互操作性,而不是在多个中间件产品间实现互操作性。

AMQP与JMS不同,JMS定义了一个API和一组消息收发必须实现的行为,而 AMQP 是一个线路级协议。线路级协议描述的是通过网络发送的数据传输格式。因此,任何符合该数据格式的消息发送和接收工具都能互相兼容和进行操作,这样就能轻易实现跨技术平台的架构方案。

RabbitMQ以AMQP协议实现,所以它可以支持多种操作系统、多种编程语言,几乎可以覆盖所有主流的企业级技术平台。在微服务架构消息中间件的选型中,它是一个非常适合且优秀的选择。因此,在Spring Cloud Bus中包含了对Rabbit的自动化默认配置,在下面的章节中,我们将先从 RabbitMQ 的基础安装和使用开始,循序渐进地学习如何与Spring Cloud Bus进行整合实现消息总线。

基本概念

在开始具体实践之前,我们先介绍一些关于 RabbitMQ 的基本概念,为后续的讲解做一些必要铺垫(如果对于RabbitMQ已经很熟悉的读者可以跳过本节,直接从“快速入门”一节开始阅读)。

- Broker:可以理解为消息队列服务器的实体,它是一个中间件应用,负责接收消息生产者的消息,然后将消息发送至消息接收者或者其他的Broker。

- Exchange:消息交换机,是消息第一个到达的地方,消息通过它指定的路由规则,分发到不同的消息队列中去。

- Queue:消息队列,消息通过发送和路由之后最终到达的地方,到达 Queue 的消息即进入逻辑上等待消费的状态。每个消息都会被发送到一个或多个队列。

- Binding:绑定,它的作用就是把Exchange和Queue按照路由规则绑定起来,也就是Exchange和Queue之间的虚拟连接。

- Routing Key:路由关键字,Exchange根据这个关键字进行消息投递。

- Virtual host:虚拟主机,它是对Broker的虚拟划分,将消费者、生产者和它们依赖的AMQP相关结构进行隔离,一般都是为了安全考虚。比如,我们可以在一个Broker中设置多个虚拟主机,对不同用户进行权限的分离。

- Connection:连接,代表生产者、消费者、Broker之间进行通信的物理网络。

- Channel:消息通道,用于连接生产者和消费者的逻辑结构。在客户端的每个连接里,可建立多个Channel,每个Channel代表一个会话任务,通过Channel可以隔离同一连接中的不同交互内容。

- Producer:消息生产者,制造消息并发送消息的程序。

- Consumer:消息消费者,接收消息并处理消息的程序。

消息投递到队列的整个过程大致如下:

1.客户端连接到消息队列服务器,打开一个Channel。

2.客户端声明一个Exchange,并设置相关属性。

3.客户端声明一个Queue,并设置相关属性。

4.客户端使用Routing Key,在Exchange和Queue之间建立好绑定关系。

5.客户端投递消息到Exchange。

6.Exchange接收到消息后,根据消息的Key和已经设置的Binding,进行消息路由,将消息投递到一个或多个Queue里。

Exchange也有几种类型。

1.Direct交换机:完全根据Key进行投递。比如,绑定时设置了Routing Key为abc,那么客户端提交的消息,只有设置了Key为abc的才会被投递到队列。

2.Topic交换机:对Key进行模式匹配后进行投递,可以使用符号#匹配一个或多个词,符号*匹配正好一个词。比如,abc.#匹配abc.def.ghi,abc.*只匹配abc.def。

3.Fanout交换机:不需要任何Key,它采取广播的模式,一个消息进来时,投递到与该交换机绑定的所有队列。

RabbitMQ支持消息的持久化,也就是将数据写在磁盘上。为了数据安全考虑,大多数情况下都会选择持久化。消息队列持久化包括3个部分:

1.Exchange持久化,在声明时指定durable=> 1。

2.Queue持久化,在声明时指定durable=> 1。

3.消息持久化,在投递时指定delivery_mode=> 2(1是非持久化)。

如果Exchange和Queue都是持久化的,那么它们之间的Binding也是持久化的。如果Exchange和Queue两者之间有一个是持久化的,一个是非持久化的,就不允许建立绑定。

安装与使用

在RabbitMQ官网的下载页面https://www.rabbitmq.com/download.html中,我们可以获取到针对各种不同操作系统的安装包和说明文档。这里,我们将对几个常用的平台进行一一说明。

下面我们采用的是Erlang和RabbitMQ Server版本说明:

- Erlang/OTP 19.1

- RabbitMQ Server 3.6.5

在Windows系统中安装

1.安装 Erlang,通过官方下载页面http://www.erlang.org/downloads 获取exe安装包,直接打开并完成安装。

2.安装 RabbitMQ,通过官方下载页面https://www.rabbitmq.com/download.html获取exe安装包。

3.下载完成后,直接运行安装程序。

4.RabbitMQ Server安装完成之后,会自动注册为服务,并以默认配置进行启动。

在Windows的安装过程中,有时候会碰到服务启动失败的情况,通常都是由于用户名为中文,导致默认的db和log目录访问出现问题。要解决该问题,需要先卸载RabbitMQ Server,然后设置环境变量 RABBITMQ_BASE 为一个不含中文的路径,比如E:\server\rabbitmq。最后,重新安装RabbitMQ即可。

在Mac OS X中安装

在Mac OS X中使用brew工具,可以很容易地安装RabbitMQ的服务端,只需按如下命令操作即可:

1.将brew更新到最新版本,执行brew update命令。

2.安装Erlang,执行brew install erlang命令。

3.安装RabbitMQ Server,执行brew install rabbitmq命令。

通过执行上面的命令,RabbitMQ Server会被安装到/usr/local/sbin,并不会自动加到用户的环境变量中去,所以我们需要在.bash_profile或.profile文件中增加下面的内容:

PATH=$PATH:/usr/local/sbin

这样,就可以通过rabbitmq-server命令来启动RabbitMQ的服务端了。

在Ubuntu中安装

在Ubuntu中,我们可以使用APT仓库来进行安装:

1.安装Erlang,执行apt-get install erlang命令。

2.执行下面的命令,新增APT仓库到/etc/apt/sources.list.d:

echo 'debhttp://www.rabbitmq.com/debian/testing main' |

sudo tee /etc/apt/sources.list.d/rabbitmq.list

3.更新APT仓库的package list,执行sudo apt-get update命令。

4.安装Rabbit Server,执行sudo apt-get install rabbitmq-server命令。

Rabbit管理

我们可以直接通过访问配置文件进行管理,也可以通过访问Web进行管理。下面将介绍如何通过Web进行管理。

- 执行 rabbitmq-plugins enable rabbitmq_management 命令,开启 Web管理插件,这样就可以通过浏览器来进行管理了。

> rabbitmq-plugins enable rabbitmq_management

The following plugins have been enabled:

mochiweb

webmachine

rabbitmq_web_dispatch

amqp_client

rabbitmq_management_agent

rabbitmq_management

Applying plugin configuration to rabbit@PC-201602152056...started 6 plugins.

- 打开浏览器并访问http://localhost:15672/,并使用默认用户guest登录,密码也为guest。可以看到如下图所示的管理页面:

从图中我们可以看到之前章节中提到的一些基本概念,比如 Connections、Channels、Exchanges、Queues等。第一次使用的读者,可以点开各项看看都有些什么内容,熟悉一下RabbitMQ Server的服务端。

- 单击Admin选项卡,如下图所示,可以尝试创建一个名为springcloud的用户。

其中,Tags标签是RabbitMQ中的角色分类,共有下面几种。

- none:不能访问management plugin。

- management:用户可以通过AMQP做的任何事外加如下内容。

- 列出自己可以通过AMQP登入的virtual hosts。

- 查看自己的virtual hosts中的queues、exchanges 和 bindings。

- 查看和关闭自己的channels 和 connections。

- 查看有关自己的virtual hosts的“全局”统计信息,包含其他用户在这些virtual hosts中的活动。

- policymaker:management可以做的任何事外加如下内容。

- 查看、创建和删除自己的virtual hosts所属的policies和parameters。

- monitoring:management可以做的任何事外加如下内容。

- 列出所有virtual hosts,包括它们不能登录的virtual hosts。

- 查看其他用户的connections和channels。

- 查看节点级别的数据,如clustering和memory的使用情况。

- 查看真正的关于所有virtual hosts的全局的统计信息。

- administrator:policymaker和monitoring可以做的任何事外加如下内容。

- 创建和删除virtual hosts。

- 查看、创建和删除users。

- 查看、创建和删除permissions。

- 关闭其他用户的connections。

快速入门

接下来,我们通过在Spring Boot应用中整合RabbitMQ,实现一个简单的发送、接收消息的例子来对RabbitMQ有一个直观的感受和理解。

在Spring Boot中整合RabbitMQ是一件非常容易的事,因为之前我们已经介绍过Starter POMs,其中的AMQP模块就可以很好地支持RabbitMQ,下面我们就来详细说说整合过程。

- 新建一个Spring Boot工程,命名为rabbitmq-hello。

- 在 pom.xml 中引入如下依赖内容,其中 spring-boot-starter-amqp 用于支持RabbitMQ。

<parent>

<groupId>org.springframework.boot</groupId>

<artifactId>spring-boot-starter-parent</artifactId>

<version>1.3.7.RELEASE</version>

<relativePath/> <!--lookup parent from repository-->

</parent>

<dependencies>

<dependency>

<groupId>org.springframework.boot</groupId>

<artifactId>spring-boot-starter-amqp</artifactId>

</dependency>

<dependency>

<groupId>org.springframework.boot</groupId>

<artifactId>spring-boot-starter-test</artifactId>

<scope>test</scope>

</dependency>

</dependencies>

- 在application.properties中配置关于RabbitMQ的连接和用户信息,这里我们使用之前安装时创建的 springcloud。若没有自己的用户,可以回到上面的安装内容,在管理页面中创建用户。

spring.application.name=rabbitmq-hello

spring.rabbitmq.host=localhost

spring.rabbitmq.port=5672

spring.rabbitmq.username=springcloud

spring.rabbitmq.password=123456

- 创建消息生产者 Sender。通过注入 AmqpTemplate 接口的实例来实现消息的发送,AmqpTemplate接口定义了一套针对AMQP协议的基础操作。在Spring Boot中会根据配置来注入其具体实现。在该生产者中,我们会产生一个字符串,并发送到名为hello的队列中。

@Component

public class Sender {

@Autowired

private AmqpTemplate rabbitTemplate;

public void send(){

String context="hello "+new Date();

System.out.println("Sender : "+context);

this.rabbitTemplate.convertAndSend("hello",context);

}

}

- 创建消息消费者Receiver。通过@RabbitListener注解定义该类对hello队列的监听,并用@RabbitHandler 注解来指定对消息的处理方法。所以,该消费者实现了对hello队列的消费,消费操作为输出消息的字符串内容。

@Component

@RabbitListener(queues="hello")

public class Receiver {

@RabbitHandler

public void process(String hello){

System.out.println("Receiver : "+hello);

}

}

- 创建RabbitMQ的配置类RabbitConfig,用来配置队列、交换器、路由等高级信息。这里我们以入门为主,先以最小化的配置来定义,以完成一个基本的生产和消费过程。

@Configuration

public class RabbitConfig {

@Bean

public Queue helloQueue(){

return new Queue("hello");

}

}

- 创建应用主类。

@SpringBootApplication

public class HelloApplication {

public static void main(String[]args){

SpringApplication.run(HelloApplication.class,args);

}

}

- 创建单元测试类,用来调用消息生产。

@RunWith(SpringJUnit4ClassRunner.class)

@SpringApplicationConfiguration(classes=HelloApplication.class)

public class HelloApplicationTests {

@Autowired

private Sender sender;

@Test

public void hello()throws Exception {

sender.send();

}

}

完成程序编写之后,下面开始尝试运行。首先确保RabbitMQ Server已经启动,然后进行下面的操作。

- 启动应用主类,从控制台中,我们可看到如下内容,程序创建了一个访问127.0.0.1:5672中springcloud的连接。

o.s.a.r.c.CachingConnectionFactory : Created new connection: SimpleConnection@29836d32

[delegate=amqp://springcloud@127.0.0.1:5672/]

同时,我们通过RabbitMQ的控制面板,可以看到Connections和Channels中包含当前连接的条目。

- 运行单元测试类,我们可以在控制台中看到下面的输出内容,消息被发送到了RabbitMQ Server的hello队列中。

Sender : hello Sun Sep 25 11:06:11 CST 2016

- 切换到应用主类的控制台,我们可以看到类似如下的输出,消费者对hello队列的监听程序执行了,并输出了接收到的消息信息。

Receiver : hello Sun Sep 25 11:06:11 CST 2016

通过上面的示例,我们在Spring Boot应用中引入spring-boot-starter-amqp模块,进行简单配置就完成了对 RabbitMQ 的消息生产和消费的开发内容。然而在实际应用中,还有很多内容没有演示,比如之前提到的一些概念:交换机、路由关键字、绑定、虚拟主机等,这里不做更多的讲解,读者可以自行查阅 RabbitMQ 的官方教程,其中有更全面的讲解。在这里,我们需要重点理解的是,在整个生产消费过程中,生产和消费是一个异步操作,这也是在分布式系统中要使用消息代理的重要原因,以此我们可以使用通信来解耦业务逻辑。在这个例子中,读者可以进一步做一些测试,比如,不运行消费者,先运行生产者,此时可以看到在RabbitMQ Server管理页面的Queues选项卡下多了一些待处理的消息,这时我们再启动消费者,它就会处理这些消息,所以通过生产消费模式的异步操作,系统间调用就没有同步调用需要那么高的实时性要求,同时也更容易控制处理的吞吐量以保证系统的正常运行等。

在上一节中,我们已经介绍了关于消息代理、AMQP以及RabbitMQ的基础知识和使用方法。在下面的内容中,我们开始具体介绍Spring Cloud Bus的配置,并以一个Spring Cloud Bus与Spring Cloud Config结合的例子来实现配置内容的实时更新。

先回顾一下,在上一章Spring Cloud Config的介绍中,我们留了一个悬念:如何实现对配置信息的实时更新。虽然我们已经能够通过/refresh 接口和 Git 仓库的 Web Hook来实现 Git 仓库中的内容修改触发应用程序的属性更新。但是,若所有触发操作均需要我们手工去维护Web Hook中的应用配置的话,随着系统的不断扩展,会变得越来越难以维护,而消息代理中间件是解决该问题最为合适的方案。是否还记得我们在介绍消息代理中的特点时提到过这样一个功能:消息代理中间件可以将消息路由到一个或多个目的地。利用这个功能,我们就能完美地解决该问题,下面来说说Spring Cloud Bus中的具体实现方案。

整合Spring Cloud Bus

因为Spring Cloud 基于 Spring Boot,在上一节中我们已经体验了 Spring Boot 与RabbitMQ的整合,所以在Spring Cloud Bus中使用RabbitMQ也是非常容易配置的。

下面我们来具体动手尝试整个配置过程。

- 准备工作:这里我们不创建新的应用,但需要用到上一章中已经实现的关于 Spring Cloud Config的几个工程,若读者对其还不了解,建议先阅读第8章的内容。

- config-repo:定义在Git仓库中的一个目录,其中存储了应用名为didispace的多环境配置文件,配置文件中有一个from参数。

- config-server-eureka:配置了Git仓库,并注册到了Eureka的服务端。

- config-client-eureka:通过Eureka发现Config Server的客户端,应用名为didispace,用来访问配置服务器以获取配置信息。该应用中提供了一个/from 接口,它会获取 config-repo/didispace-dev.properties 中的from属性并返回。

- 扩展config-client-eureka应用。

- 修改pom.xml,增加spring-cloud-starter-bus-amqp模块(注意springboot-starter-actuator模块也是必需的,用来提供刷新端点)。

<dependency>

<groupId>org.springframework.cloud</groupId>

<artifactId>spring-cloud-starter-bus-amqp</artifactId>

</dependency>

- 在配置文件中增加关于RabbitMQ的连接和用户信息。

spring.rabbitmq.host=localhost

spring.rabbitmq.port=5672

spring.rabbitmq.username=springcloud

spring.rabbitmq.password=123456

- 启动config-server-eureka,再启动两个config-client-eureka(分别在不同的端口上,比如7002、7003)。我们可以在config-client-eureka中的控制台中看到如下内容,在启动时,客户端程序多了一个/bus/refresh请求。

o.s.b.a.e.mvc.EndpointHandlerMapping  : Mapped

"{[/bus/refresh],methods=[POST]}" onto public void

org.springframework.cloud.bus.endpoint.RefreshBusEndpoint.refresh(java.lang.String)

- 先访问两个 config-client-eureka 的/from 请求,会返回当前 configrepo/didispace-dev.properties中的from属性。

- 接着,修改config-repo/didispace-dev.properties中的from属性值,并发送POST请求到其中的一个/bus/refresh。

- 最后,再分别访问启动的两个config-client-eureka的/from请求,此时这两个请求都会返回最新的 config-repo/didispace-dev.properties 中的from属性。

到这里,我们已经能够通过Spring Cloud Bus来实时更新总线上的属性配置了。

原理分析

上一节中,我们通过使用 Spring Cloud Bus 与 Spring Cloud Config 的整合,并以RabbitMQ作为消息代理,实现了应用配置的动态更新。

整个方案的架构如下图所示,其中包含了Git仓库、Config Server以及几个微服务应用的实例,这些微服务应用的实例中都引入了 Spring Cloud Bus,所以它们都连接到了RabbitMQ的消息总线上。

当我们将系统启动起来之后,图中“Service A”的三个实例会请求Config Server以获取配置信息,Config Server根据应用配置的规则从Git仓库中获取配置信息并返回。

此时,若我们需要修改“Service A”的属性。首先,通过Git管理工具去仓库中修改对应的属性值,但是这个修改并不会触发“Service A”实例的属性更新。我们向“Service A”的实例3发送POST请求,访问/bus/refresh接口。此时,“Service A”的实例3就会将刷新请求发送到消息总线中,该消息事件会被“Service A”的实例1和实例2从总线中获取到,并重新从Config Server中获取它们的配置信息,从而实现配置信息的动态更新。

而从Git仓库中配置的修改到发起/bus/refresh的POST请求这一步可以通过Git仓库的Web Hook来自动触发。由于所有连接到消息总线上的应用都会接收到更新请求,所以在Web Hook中就不需要维护所有节点内容来进行更新,从而解决了上一章中仅通过Web Hook来逐个进行刷新的问题。

指定刷新范围

在上面的例子中,我们通过向服务实例请求Spring Cloud Bus的/bus/refresh接口,从而触发总线上其他服务实例的/refresh。但是在一些特殊场景下,我们希望可以刷新微服务中某个具体实例的配置。

Spring Cloud Bus 对这种场景也有很好的支持,/bus/refresh 接口提供了一个destination 参数,用来定位具体要刷新的应用程序。比如,我们可以请求/bus/refresh? destination=customers:9000,此时总线上的各应用实例会根据destination 属性的值来判断是否为自己的实例名,若符合才进行配置刷新,若不符合就忽略该消息。

关于应用的实例名,我们在之前介绍Spring Cloud Netflix的Eureka时有过详细的介绍,它的默认命名按此规则生成:${spring.cloud.client.hostname}:${spring.application.name}:${spring.application.instance_id:${server.port}}},想了解更多内容可查看第3章的相关介绍。

destination 参数除了可以定位具体的实例之外,还可以用来定位具体的服务。定位服务的原理是通过使用 Spring 的 PathMatecher(路径匹配)来实现的,比如/bus/refresh? destination=customers:**,该请求会触发customers服务的所有实例进行刷新。

架构优化

既然Spring Cloud Bus的/bus/refresh接口提供了针对服务和实例进行配置更新的参数,那么我们的架构也可以相应做出一些调整。在之前的架构中,服务的配置更新需要通过向具体服务中的某个实例发送请求,再触发对整个服务集群的配置更新。虽然能实现功能,但是这样的结果是,我们指定的应用实例会不同于集群中的其他应用实例,这样会增加集群内部的复杂度,不利于将来的运维工作。比如,需要对服务实例进行迁移,那么我们不得不修改Web Hook中的配置等。所以要尽可能地让服务集群中的各个节点是对等的。

因此,我们将之前的架构做了一些调整,如下图所示。

我们主要做了以下这些改动:

1.在Config Server中也引入Spring Cloud Bus,将配置服务端也加入到消息总线中来。

2./bus/refresh请求不再发送到具体服务实例上,而是发送给Config Server,并通过destination参数来指定需要更新配置的服务或实例。

通过上面的改动,我们的服务实例不需要再承担触发配置更新的职责。同时,对于Git的触发等配置都只需要针对Config Server即可,从而简化了集群上的一些维护工作。

RabbitMQ配置

Spring Cloud Bus中的RabbitMQ整合使用了Spring Boot的ConnectionFactory,所以在Spring Cloud Bus中支持使用以spring.rabbit.mq为前缀的Spring Boot配置属性,具体的配置属性、说明以及默认值如下表所示。

续表

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。
列表为空,暂无数据
    我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
    原文