- 推荐序一
- 推荐序二
- 推荐序三
- 推荐语
- 前言
- 第1章 基础知识
- 第2章 微服务构建:Spring Boot
- 第3章 服务治理:Spring Cloud Eureka
- 第4章 客户端负载均衡:Spring Cloud Ribbon
- 第5章 服务容错保护:Spring Cloud Hystrix
- 第6章 声明式服务调用:Spring Cloud Feign
- 第7章 API网关服务:Spring Cloud Zuul
- 第8章 分布式配置中心:Spring Cloud Config
- 第9章 消息总线:Spring Cloud Bus
- 第10章 消息驱动的微服务:Spring Cloud Stream
- 附录 A Starter POMs
- 后记
绑定器详解
在“核心概念”一节中,我们已经简单介绍过Binder绑定器的基本概念和作用:它是定义在应用程序与消息中间件之间的抽象层,用来屏蔽消息中间件对应用的复杂性,并提供简单而统一的操作接口给应用程序使用。在本节中,我们将详细介绍绑定器背后的细节和行为。
绑定器SPI
绑定器 SPI 涵盖了一套可插拔的用于连接外部中间件的实现机制,其中包含了许多接口、开箱即用的实现类以及发现策略等内容。其中,最为关键的就是Binder接口,它是用来将输入和输出连接到外部中间件的抽象:
public interface Binder<T,C extends ConsumerProperties,P extends
ProducerProperties> {
Binding<T> bindConsumer(String name,String group,T inboundBindTarget,C
consumerProperties);
Binding<T> bindProducer(String name,T outboundBindTarget,P
producerProperties);
}
当应用程序对输入和输出通道进行绑定的时候,实际上就是通过该接口的实现来完成的。
- 向消息通道发送数据的生产者调用 bindProducer 方法来绑定输出通道时,第一个参数代表了发往消息中间件的目标名称,第二个参数代表了发送消息的本地通道实例,第三个参数是用来创建通道时使用的属性配置(比如分区键的表达式等)。
- 从消息通道接收数据的消费者调用 bindConsumer 方法来绑定输入通道时,第一个参数代表了接收消息中间件的目标名称,第二个参数代表了消费组的名称(如果多个消费者实例使用相同的组名,则消息将对这些消费者实例实现负载均衡,每个生产者发出的消息只会被组内一个消费者实例接收和处理),第三个参数代表了接收消息的本地通道实例,第四个参数是用来创建通道时使用的属性配置。
另外,从Binder的定义中,我们还可以知道Binder是一个参数化并且可扩展的接口。
- 对于输入与输出的绑定类型,在1.0版本中仅支持MessageChannel,但是在接口中通过泛型定义,所以在未来可以对其进行扩展。
- 对于属性配置也提供了可扩展的定义,我们可以为特定的Binder以类型安全的方式来补充一些特有的属性。
一个典型的Binder绑定器实现一般包含以下内容。
- 一个实现Binder接口的类。
- 一个Spring配置加载类,用来创建连接消息中间件的基础结构使用的实例。
- 一个或多个能够在classpath下的META-INF/spring.binders路径找到的绑定器定义文件。比如我们可以在spring-cloud-starter-stream-rabbit中找到该文件,该文件中存储了当前绑定器要使用的自动化配置类的路径:
rabbit:\
org.springframework.cloud.stream.binder.rabbit.config.RabbitServiceAutoConfigur ation
自动化配置
Spring Cloud Stream通过绑定器SPI的实现将应用程序逻辑上的输入输出通道连接到物理上的消息中间件。消息中间件之间通常都会有或多或少的差异性,所以为了适配不同的消息中间件,需要为它们实现各自独有的绑定器。目前,Spring Cloud Stream中默认实现了对RabbitMQ、Kafka的绑定器,在上面的示例中我们引入的spring-cloud-starterstream-rabbit依赖中就包含了RabbitMQ的绑定器spring-cloud-stream-binderrabbit。
默认情况下,Spring Cloud Stream 也遵循 Spring Boot 自动化配置的特性。如果在classpath中能够找到单个绑定器的实现,那么Spring Cloud Stream会自动加载它。而我们在classpath中引入绑定器的方法也非常简单,只需要在pom.xml中增加对应消息中间件的绑定器依赖即可,比如:
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-rabbit</artifactId>
</dependency>
如果使用Kafka,则引入:
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka</artifactId>
</dependency>
多绑定器配置
当应用程序的classpath下存在多个绑定器时,Spring Cloud Stream在为消息通道做绑定操作时,无法判断应该使用哪个具体的绑定器,所以我们需要为每个输入或输出通道指定具体的绑定器。
我们在一个应用程序中使用多个绑定器时,往往其中一个绑定器会是主要使用的,而第二个可能是为了适应一些特殊要求(比如性能等原因)。我们可以先通过设置默认绑定器来为大部分的通道设置绑定器。比如,使用RabbitMQ设置默认绑定器:
spring.cloud.stream.defaultBinder=rabbit
在设置了默认绑定器之后,再为其他一些少数的消息通道单独设置绑定器,比如:
spring.cloud.stream.bindings.input.binder=kafka
需要注意的是,上面我们设置参数时用来指定具体绑定器的值并不是消息中间件的名称,而是在每个绑定器实现的META-INF/spring.binders文件中定义的标识(一个绑定器实现的标识可以定义多个,以逗号分隔),所以上面配置的rabbit和kafka分别来自于各自的配置定义,它们的具体内容如下所示:
rabbit:\
org.springframework.cloud.stream.binder.rabbit.config.RabbitServiceAutoConfigur ation
kafka:\
org.springframework.cloud.stream.binder.kafka.config.KafkaBinderConfiguration
另外,当需要在一个应用程序中使用同一类型不同环境的绑定器时,我们也可以通过配置轻松实现通道绑定。比如,当需要连接两个不同的 RabbitMQ 实例的时候,可以参照如下配置:
spring.cloud.stream.bindings.input.binder=rabbit1
spring.cloud.stream.bindings.output.binder=rabbit2
spring.cloud.stream.binders.rabbit1.type=rabbit
spring.cloud.stream.binders.rabbit1.environment.spring.rabbitmq.host=192.168.0.101
spring.cloud.stream.binders.rabbit1.environment.spring.rabbitmq.port=5672
spring.cloud.stream.binders.rabbit1.environment.spring.rabbitmq.username=springcloud
spring.cloud.stream.binders.rabbit1.environment.spring.rabbitmq.password=123456
spring.cloud.stream.binders.rabbit2.type=rabbit
spring.cloud.stream.binders.rabbit2.environment.spring.rabbitmq.host=192.168.0.102
spring.cloud.stream.binders.rabbit2.environment.spring.rabbitmq.port=5672
spring.cloud.stream.binders.rabbit2.environment.spring.rabbitmq.username=springcloud
spring.cloud.stream.binders.rabbit2.environment.spring.rabbitmq.password=123456
从上面的配置中,我们可以看到对于输入输出通道指定的绑定器采用了显式别名的配置方式,其中input通道的绑定指定了名为rabbit1的配置,而output通道的绑定指定了名为rabbit2的配置。当采用显式配置方式时会自动禁用默认的绑定器配置,所以当定义了显式配置别名后,对于这些绑定器的配置需要通过 spring.cloud.stream.binders.<configurationName>属性来进行设置。对于绑定器的配置主要有下面4个参数。
- spring.cloud.stream.binders.<configurationName>.type 指定了绑定器的类型,可以是rabbit、kafak或者其他自定义绑定器的标识名,绑定器标识名的定义位于绑定器的META-INF/spring.binders文件中。
- spring.cloud.stream.binders.<configurationName>.environment
参数可以直接用来设置各绑定器的属性,默认为空。
- spring.cloud.stream.binders.<configurationName>.inheritEnvir onment参数用来配置当前绑定器是否继承应用程序自身的环境配置,默认为true。
- spring.cloud.stream.binders.<configurationName>.defaultCandi date参数用来设置当前绑定器配置是否被视为默认绑定器的候选项,默认为true。
当需要让当前配置不影响默认配置时,可以将该属性设置为false。
RabbitMQ与Kafka绑定器
在之前的章节中我们多次提到,Spring Cloud Stream自身就提供了对RabbitMQ和Kafka的绑定器实现。由于 RabbitMQ 和 Kafka 自身的实现结构有所不同,理解绑定器实现与消息中间件自有概念之间的对应关系,对于正确使用绑定器和消息中间件会有非常大的帮助。下面就来分别说说 RabbitMQ 与 Kafka 的绑定器是如何使用消息中间件中不同概念来实现消息的生产与消费的。
- RabbitMQ绑定器:在RabbitMQ中,通过Exchange交换器来实现Spring Cloud Stream的主题概念,所以消息通道的输入输出目标映射了一个具体的Exchange交换器。而对于每个消费组,则会为对应的Exchange交换器绑定一个Queue队列进行消息收发。
- Kafka绑定器:由于Kafka自身就有Topic概念,所以Spring Cloud Stream的主题直接采用了Kafka的Topic主题概念,每个消费组的通道目标都会直接连接Kafka的主题进行消息收发。
如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。
绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论