- 推荐序一
- 推荐序二
- 推荐序三
- 推荐语
- 前言
- 第1章 基础知识
- 第2章 微服务构建:Spring Boot
- 第3章 服务治理:Spring Cloud Eureka
- 第4章 客户端负载均衡:Spring Cloud Ribbon
- 第5章 服务容错保护:Spring Cloud Hystrix
- 第6章 声明式服务调用:Spring Cloud Feign
- 第7章 API网关服务:Spring Cloud Zuul
- 第8章 分布式配置中心:Spring Cloud Config
- 第9章 消息总线:Spring Cloud Bus
- 第10章 消息驱动的微服务:Spring Cloud Stream
- 附录 A Starter POMs
- 后记
深入理解
在整合Kafka实现了消息总线之后,我们不妨继续使用Kafka提供的控制台消费者来看看,当执行/bus/refresh时,消息消费者都获得了什么。通过前文我们从控制台中获得的信息可以知道,Spring Cloud Bus使用了名为springCloudBus的Topic,所以我们可以使用命令kafka-console-consumer--zookeeper localhost:2181--topic springCloudBus,启动对springCloudBus的消费者控制台来进行观察。
启动消费者控制台之后,我们向config-server发送POST请求:/bus/refresh,此时在控制台中可以看到类似如下的内容:
contentType "application/json"
{
"type": "RefreshRemoteApplicationEvent",
"timestamp": 1475073160814,
"originService": "config-server:7001",
"destinationService": "*:**",
"id": "bbfbf495-39d8-4ff9-93d6-174873ff7299"
}
contentType "application/json"
{
"type": "AckRemoteApplicationEvent",
"timestamp": 1475073160821,
"originService": "config-server:7001",
"destinationService": "*:**",
"id": "1f794774-10d6-4140-a80d-470983c6c0ff",
"ackId": "bbfbf495-39d8-4ff9-93d6-174873ff7299",
"ackDestinationService": "*:**",
"event": "org.springframework.cloud.bus.event.RefreshRemoteApplicationEvent"
}
contentType "application/json"
{
"type": "AckRemoteApplicationEvent",
"timestamp": 1475075467554,
"originService": "didispace:7002",
"destinationService": "*:**",
"id": "7560151e-f60c-49cd-8167-b691e846ad08",
"ackId": "21502725-28f5-4d19-a98a-f8114fa4f1dc",
"ackDestinationService": "*:**",
"event": "org.springframework.cloud.bus.event.RefreshRemoteApplicationEvent"
}
...
下面,我们来详细理解消息中的信息内容。
- type:消息的事件类型。在上面的例子中,包含了RefreshRemoteApplicationEvent和AckRemoteApplicationEvent。其中,RefreshRemoteApplicationEvent事件就是我们用来刷新配置的事件,而AckRemoteApplicationEvent是响应消息已经正确接收的告知消息事件。
- timestamp:消息的时间戳。
- originService:消息的来源服务实例。
- destinationService:消息的目标服务实例。上面示例中的*:**代表了总线上的所有服务实例。如果想要指定服务或是实例,在之前介绍 RabbitMQ 实现消息总线时已经提过,只需要通过使用destination参数来定位具体要刷新的应用实例即可,比如发起/bus/refresh? destination=didispace 请求,就可以得到如下的刷新事件消息,其中destinationService为didispace:**,表示总线上所有didispace服务的实例。
contentType "application/json"
{
"type": "RefreshRemoteApplicationEvent",
"timestamp": 1475131215007,
"originService": "config-server:7001",
"destinationService": "didispace:**",
"id": "667fe948-e9b2-447f-be22-3c8acf647ead"
}
- id:消息的唯一标识。
上面的消息内容是RefreshRemoteApplicationEvent和AckRemoteApplicationEvent类型共有的,下面几个属性是AckRemoteApplicationEvent所特有的,分别表示如下含义。
- ackId:Ack消息对应的消息来源。我们可以看到第一条AckRemoteApplicationEvent的ackId对应了RefreshRemoteApplicationEvent的id,说明这条Ack是告知该RefreshRemoteApplicationEvent事件的消息已经被收到。
- ackDestinationService:Ack 消息的目标服务实例。可以看到这里使用的是*:**,所以消息总线上所有的实例都会收到该Ack消息。
- event:Ack 消息的来源事件。可以看到上例中的两个 Ack 均来源于刷新配置的RefreshRemoteApplicationEvent 事件,我们在测试的时候由于启动了两个config-client,所以有两个实例接收到了配置刷新事件,同时它们都会返回一个Ack消息。由于ackDestinationService为*:**,所以两个config-client都会收到对RefreshRemoteApplicationEvent事件的Ack消息。
源码分析
通过上面的分析,我们已经得到了两个非常重要的线索 RefreshRemoteApplicationEvent和AckRemoteApplicationEvent。我们不妨顺着这两个事件类来详细看看Spring Cloud Bus的源码,以帮助我们理解它的运行机制。
顺着RefreshRemoteApplicationEvent和AckRemoteApplicationEvent,我们可以整理出如下的事件关系类图。
可以看到,其中RefreshRemoteApplicationEvent和AckRemoteApplicationEvent 这些我们已经接触过的事件都继承了 RemoteApplicationEvent 抽象类,而RemoteApplicationEvent继承自Spring Framework的ApplicationEvent,可以断定,Spring Cloud Bus也采用了Spring的事件驱动模型。
事件驱动模型
如果读者对Spring的事件驱动模型已经非常了解,那么可以跳过这一小节,直接看后面的分析。如果你还不清楚它的原理,建议先通过本小节的内容来理解其基本原理,以帮助阅读和理解后续的源码分析内容。
Spring 的事件驱动模型中包含了三个基本概念:事件、事件监听者和事件发布者,如下图所示。
- 事件: Spring 中定义了事件的抽象类 ApplicationEvent,它继承自 JDK 的EventObject类。从图中我们可以看到,事件包含了两个成员变量:timestamp,该字段用于存储事件发生的时间戳,以及父类中的 source,该字段表示源事件对象。当我们需要自定义事件的时候,只需要继承 ApplicationEvent,比如RemoteApplicationEvent、RefreshRemoteApplicationEvent 等,可以在自定义的Event中增加一些事件的属性来给事件监听者处理。
- 事件监听者: Spring 中定义了事件监听者的接口 ApplicationListener,它继承自JDK的EventListener接口,同时ApplicationListener接口限定了ApplicationEvent子类作为该接口中onApplicationEvent(E event);函数的参数。所以,每一个ApplicationListener 都是针对某个ApplicationEvent子类的监听和处理者。
那么,事件与监听者是如何关联起来的呢?我们看下图:
- 事件发布者: Spring中定义了ApplicationEventPublisher和ApplicationEventMulticaster两个接口用来发布事件。其中ApplicationEventPublisher接口定义了发布事件的函数 publishEvent(ApplicationEvent event)和publishEvent(Object event);而ApplicationEventMulticaster接口中定义了对 ApplicationListener 的维护操作(比如新增、移除等)以及将ApplicationEvent多播给可用ApplicationListener的操作。
ApplicationEventPublisher 的publishEvent 实现在AbstractApplicationContext中,具体如下:
protected void publishEvent(Object event,ResolvableType eventType){
Assert.notNull(event,"Event must not be null");
...
if(this.earlyApplicationEvents !=null){
this.earlyApplicationEvents.add(applicationEvent);
}
else {
getApplicationEventMulticaster().multicastEvent(applicationEvent,eventType);
}
...
}
可以看到,它最终会调用ApplicationEventMulticaster的multicastEvent来具体实现发布事件给监听者的操作。而ApplicationEventMulticaster在Spring的默认实现位于SimpleApplicationEventMulticaster中,具体如下:
SimpleApplicationEventMulticaster通过遍历维护的ApplicationListener集合来找到对应ApplicationEvent的监听器,然后调用监听器的onApplicationEvent函数来对具体事件做出处理操作。
事件定义
在对Spring的事件模型有了一定的理解之后,下面我们来详细介绍Spring Cloud Bus中的事件定义。首先,从RemoteApplicationEvent抽象类开始:
先来看看RemoteApplicationEvent类上修饰的注解。
- @JsonTypeInfo(use=JsonTypeInfo.Id.NAME,property="type"):Jackson对多态类型的处理注解,当进行序列化时,会使用子类的名称作为type属性的值,比如之前示例中的"type": "RefreshRemoteApplicationEvent"。
- @JsonIgnoreProperties("source"):序列化的时候忽略 source 属性,source是ApplicationEvent的父类EventObject的属性,用来定义事件的发生源。
再来看看它的属性:originService、destinationService、id,这些内容都可以在RemoteApplicationEvent的子类事件消息中找到,比如:
{
"type": "RefreshRemoteApplicationEvent",
"timestamp": 1475073160814,
"originService": "config-server:7001",
"destinationService": "*:**",
"id": "bbfbf495-39d8-4ff9-93d6-174873ff7299"
}
{
"type": "AckRemoteApplicationEvent",
"timestamp": 1475075467554,
"originService": "didispace:7002",
"destinationService": "*:**",
"id": "7560151e-f60c-49cd-8167-b691e846ad08",
"ackId": "21502725-28f5-4d19-a98a-f8114fa4f1dc",
"ackDestinationService": "*:**",
"event": "org.springframework.cloud.bus.event.RefreshRemoteApplicationEvent"
}
下面,我们再来分别看看RemoteApplicationEvent的几个具体实现的事件类。
- RefreshRemoteApplicationEvent 事件类,该事件用于远程刷新应用的配置信息。它的实现非常简单,只是继承了RemoteApplicationEvent,并没有增加其他内容。从之前的示例中我们也能看到,消息中的内容与RemoteApplicationEvent中包含的属性完全一致。
@SuppressWarnings("serial")
public class RefreshRemoteApplicationEvent extends RemoteApplicationEvent {
@SuppressWarnings("unused")
private RefreshRemoteApplicationEvent(){
//for serializers
}
public RefreshRemoteApplicationEvent(Object source,String originService,
String destinationService){
super(source,originService,destinationService);
}
}
- AckRemoteApplicationEvent事件类,该事件用于告知某个事件消息已经被接收,通过该消息我们可以监控各个事件消息的响应。从其成员属性中,我们可以找到之前示例中所总结的,比RefreshRemoteApplicationEvent事件的消息多出的几个属性:ackId、ackDestinationService 以及 event。其中 event成员变量通过泛型限定了必须为RemoteApplicationEvent 的子类对象,该定义符合这样的逻辑:Ack 消息肯定有一个事件源头,而每一个事件都必须继承RemoteApplicationEvent抽象类,所以AckRemoteApplicationEvent的事件源头肯定是一个RemoteApplicationEvent的子类,比如示例中的Ack消息源头就是RemoteApplicationEvent的子类事件:RefreshRemoteApplicationEvent。
@SuppressWarnings("serial")
public class AckRemoteApplicationEvent extends RemoteApplicationEvent {
private final String ackId;
private final String ackDestinationService;
private final Class<? extends RemoteApplicationEvent> event;
@SuppressWarnings("unused")
private AckRemoteApplicationEvent(){
super();
this.ackDestinationService=null;
this.ackId=null;
this.event=null;
}
public AckRemoteApplicationEvent(Object source,String originService,
String destinationService,String ackDestinationService,String ackId,
Class<? extends RemoteApplicationEvent> type){
super(source,originService,destinationService);
this.ackDestinationService=ackDestinationService;
this.ackId=ackId;
this.event=type;
}
...
}
- EnvironmentChangeRemoteApplicationEvent 事件类,该事件用于动态更新消息总线上每个节点的Spring环境属性。可以看到,该类中定义了一个Map类型的成员变量,而接收消息的节点就是根据该Map对象中的属性来覆盖本地的Spring环境属性。
@SuppressWarnings("serial")
public class EnvironmentChangeRemoteApplicationEvent extends
RemoteApplicationEvent {
private final Map<String,String> values;
@SuppressWarnings("unused")
private EnvironmentChangeRemoteApplicationEvent(){
//for serializers
values=null;
}
public EnvironmentChangeRemoteApplicationEvent(Object source,String originService,
String destinationService,Map<String,String> values){
super(source,originService,destinationService);
this.values=values;
}
...
}
- SentApplicationEvent事件类,细心的读者可能已经发现,该类的结构和内容与 RemoteApplicationEvent 非常相似,不同的是:该类不是抽象类,并且多一个成员 Class<? extends RemoteApplicationEvent> type。SentApplicationEvent 事件较为特殊,它主要用于发送信号来表示一个远程的事件已经在系统中被发送到某些地方了,从它的继承关系中,我们可以知道它本身并不是一个远程的事件(不是继承自RemoteApplicationEvent),所以它不会被发送到消息总线上去,而是在本地产生(通常是由于响应了某个远程的事件)。由于该事件的 id 属性能够匹配消费者 AckRemoteApplicationEvent 消息中的ackId,所以应用程序可以通过监听这个事件来监控远程事件消息的消费情况。
@SuppressWarnings("serial")
@JsonTypeInfo(use=JsonTypeInfo.Id.NAME,property="type")
@JsonIgnoreProperties("source")
public class SentApplicationEvent extends ApplicationEvent {
private static final Object TRANSIENT_SOURCE=new Object();
private final String originService;
private final String destinationService;
private final String id;
private Class<? extends RemoteApplicationEvent> type;
protected SentApplicationEvent(){
//for serialization libs like jackson
this(TRANSIENT_SOURCE,null,null,null,RemoteApplicationEvent.class);
}
public SentApplicationEvent(Object source,String originService,
String destinationService,String id,
Class<? extends RemoteApplicationEvent> type){
super(source);
this.originService=originService;
this.type=type;
if(destinationService==null){
destinationService="*";
}
if(! destinationService.contains(":")){
//All instances of the destination unless specifically requested
destinationService=destinationService+":**";
}
this.destinationService=destinationService;
this.id=id;
}
...
}
事件监听器
在了解了Spring Cloud Bus中的事件类之后,我们来看看另外一个重要元素:事件监听器。通过整理源码,可以得到下面的类图关系。
其中,RefreshListener和EnvironmentChangeListener都继承了Spring事件模型中的监听器接口ApplicationListener。我们先来看看RefreshListener:
public class RefreshListener
implements ApplicationListener<RefreshRemoteApplicationEvent> {
private static Log log=LogFactory.getLog(RefreshListener.class);
private ContextRefresher contextRefresher;
public RefreshListener(ContextRefresher contextRefresher){
this.contextRefresher=contextRefresher;
}
@Override
public void onApplicationEvent(RefreshRemoteApplicationEvent event){
Set<String> keys=contextRefresher.refresh();
log.info("Received remote refresh request.Keys refreshed "+keys);
}
}
从泛型中我们可以看到该监听器就是针对我们之前所介绍的 RefreshRemoteApplicationEvent 事件的,其中 onApplicationEvent 函数中调用了ContextRefresher中的refresh()函数进行配置属性的刷新。
public class ContextRefresher {
...
private ConfigurableApplicationContext context;
...
public synchronized Set<String> refresh(){
Map<String,Object> before=extract(
this.context.getEnvironment().getPropertySources());
addConfigFilesToEnvironment();
Set<String> keys=changes(before,
extract(this.context.getEnvironment().getPropertySources())).keySet();
this.context.publishEvent(new EnvironmentChangeEvent(keys));
this.scope.refreshAll();
return keys;
}
...
}
再来看看EnvironmentChangeListener监听器。
public class EnvironmentChangeListener
implements ApplicationListener<EnvironmentChangeRemoteApplicationEvent> {
private static Log log=LogFactory.getLog(EnvironmentChangeListener.class);
@Autowired
private EnvironmentManager env;
@Override
public void onApplicationEvent(EnvironmentChangeRemoteApplicationEvent event){
Map<String,String> values=event.getValues();
log.info("Received remote environment change request.Keys/values to update "
+values);
for(Map.Entry<String,String> entry : values.entrySet()){
env.setProperty(entry.getKey(),entry.getValue());
}
}
}
它是针对 EnvironmentChangeRemoteApplicationEvent 事件的监听类,在处理类中,可以看到它从 EnvironmentChangeRemoteApplicationEvent 中获取了之前提到的事件中定义的Map对象,然后通过遍历来更新EnvironmentManager中的属性内容。
事件跟踪
除了上面介绍的RefreshListener和EnvironmentChangeListener监听器外,还有一个与它们都有点不同的TraceListener监听器。
public class TraceListener {
private static Log log=LogFactory.getLog(TraceListener.class);
private TraceRepository repository;
public TraceListener(TraceRepository repository){
this.repository=repository;
}
@EventListener
public void onAck(AckRemoteApplicationEvent event){
this.repository.add(getReceivedTrace(event));
}
@EventListener
public void onSend(SentApplicationEvent event){
this.repository.add(getSentTrace(event));
}
protected Map<String,Object> getSentTrace(SentApplicationEvent event){
...
}
protected Map<String,Object> getReceivedTrace(AckRemoteApplicationEvent event){
...
}
}
从之前整理的类图和源码中,我们都可以看到该监听器并没有实现 ApplicationListener接口,但可以看到这里使用了@EventListener注解。该注解是从Spring 4.2开始提供的新功能,通过它可以自动地将函数注册为一个ApplicationListener的实现。所以在该类中,实际上等价于实现了两个监听器,一个监听 AckRemoteApplicationEvent事件,一个监听SentApplicationEvent事件。
在这两个监听处理函数中调用了类似的方法:this.repository.add(getReceivedTrace(event));,其中TraceRepository是对Trace跟踪信息的操作接口,而它的默认实现是spring-boot-actuator模块的InMemoryTraceRepository,具体如下:
public class InMemoryTraceRepository implements TraceRepository {
private int capacity=100;
private boolean reverse=true;
private final List<Trace> traces=new LinkedList<Trace>();
public void setReverse(boolean reverse){
synchronized(this.traces){
this.reverse=reverse;
}
}
public void setCapacity(int capacity){
synchronized(this.traces){
this.capacity=capacity;
}
}
@Override
public List<Trace> findAll(){
synchronized(this.traces){
return Collections.unmodifiableList(new ArrayList<Trace>(this.traces));
}
}
@Override
public void add(Map<String,Object> map){
Trace trace=new Trace(new Date(),map);
synchronized(this.traces){
while(this.traces.size()>=this.capacity){
this.traces.remove(this.reverse ? this.capacity-1 : 0);
}
if(this.reverse){
this.traces.add(0,trace);
}
else {
this.traces.add(trace);
}
}
}
}
可以看到,默认的 Trace 跟踪信息存储并没有用到特别的数据库或消息系统,而是采用了内存存储的方式。如上代码所示,通过 LinkedList<Trace>集合和 capacity 属性的定义,在add(Map<String,Object> map)函数中进行循环存储,所以默认的Trace跟踪实现只能存储和查询最近的100条跟踪信息。
那么跟踪事件都记录了哪些内容呢?我们继续看 TraceListener 中 getSentTrace和getReceivedTrace的具体实现:
public class TraceListener {
...
protected Map<String,Object> getSentTrace(SentApplicationEvent event){
Map<String,Object> map=new LinkedHashMap<String,Object>();
map.put("signal","spring.cloud.bus.sent");
map.put("type",event.getType().getSimpleName());
map.put("id",event.getId());
map.put("origin",event.getOriginService());
map.put("destination",event.getDestinationService());
if(log.isDebugEnabled()){
log.debug(map);
}
return map;
}
protected Map<String,Object> getReceivedTrace(AckRemoteApplicationEvent event){
Map<String,Object> map=new LinkedHashMap<String,Object>();
map.put("signal","spring.cloud.bus.ack");
map.put("event",event.getEvent().getSimpleName());
map.put("id",event.getAckId());
map.put("origin",event.getOriginService());
map.put("destination",event.getAckDestinationService());
if(log.isDebugEnabled()){
log.debug(map);
}
return map;
}
}
可以看到,这两个函数会收集关于发送和接收到的Ack事件信息,并且两个函数获得的内容就是事件定义相关的一些属性,看到这里大家是否感觉似曾相识?是的,这些信息与之前我们通过Kafka的控制台工具获取的消息内容非常类似。既然Spring Cloud Bus已经提供了 Trace 跟踪信息的监听和记录,我们不妨尝试使用一下。要开启该功能非常简单,只需在配置文件中将下面的属性设置为true即可:
spring.cloud.bus.trace.enabled=true
通过请求配置主机的/trace接口,比如http://localhost:7002/trace,可以获得如下信息,
[
{
"timestamp": 1475129670494,
"info": {
"signal": "spring.cloud.bus.ack",
"event": "RefreshRemoteApplicationEvent",
"id": "84ecdf83-a904-41bc-a34d-62680ccf35d7",
"origin": "config-server:7001",
"destination": "*:**"
}
},
{
"timestamp": 1475129670475,
"info": {
"signal": "spring.cloud.bus.sent",
"type": "RefreshRemoteApplicationEvent",
"id": "84ecdf83-a904-41bc-a34d-62680ccf35d7",
"origin": "config-server:7001",
"destination": "*:**"
}
},{
"timestamp": 1475129670473,
"info": {
"signal": "spring.cloud.bus.ack",
"event": "RefreshRemoteApplicationEvent",
"id": "84ecdf83-a904-41bc-a34d-62680ccf35d7",
"origin": "didispace:7002",
"destination": "*:**"
}
}
]
与我们分析的内容一样,该请求返回了最近的Send和Ack消息内容。
如果希望针对AckRemoteApplicationEvent或是SentApplicationEvent做一些特殊处理,我们也可以通过@EventListener注解在应用程序中编写自己的处理逻辑,或者重写TraceRepository来改造跟踪的存储等。
原则上每一个消息总线上的应用都可以用来跟踪Ack消息,但是大多数情况下我们把这个任务交给更核心的服务(比如特定的监控服务),这样在该服务中我们就能在 Ack 消息中实现更复杂的逻辑进行预警和善后工作。
事件发布
通过上面的分析,我们已经了解了Spring Cloud Bus中事件以及监听器的定义,下面来看看这些事件是如何发布给监听器进行处理的。
在org.springframework.cloud.bus包下,我们可以找到关于Spring Cloud Bus启动时加载的一些基础类和接口,包括自动化配置类BusAutoConfiguration、属性定义类BusProperties等。我们可以从Spring Cloud Bus的自动化配置类中看看它在启动的时候都加载了什么内容:
@Configuration
@ConditionalOnBusEnabled
@EnableBinding(SpringCloudBusClient.class)
@EnableConfigurationProperties(BusProperties.class)
public class BusAutoConfiguration implements ApplicationEventPublisherAware {
public static final String BUS_PATH_MATCHER_NAME="busPathMatcher";
@Autowired
@Output(SpringCloudBusClient.OUTPUT)
private MessageChannel cloudBusOutboundChannel;
@Autowired
private ServiceMatcher serviceMatcher;
@Autowired
private ChannelBindingServiceProperties bindings;
@Autowired
private BusProperties bus;
private ApplicationEventPublisher applicationEventPublisher;
...
}
我们先来看看在该自动化配置类中,都定义了哪些成员。
- MessageChannel cloudBusOutboundChannel:该接口定义了发送消息的抽象方法。
- ServiceMatcher serviceMatcher:该对象中提供了下面两个重要函数,用来判断事件的来源服务是否为自己,以及判断目标是否为自己,以此作为依据是否要响应消息进行事件的处理。
public boolean isFromSelf(RemoteApplicationEvent event){
String originService=event.getOriginService();
String serviceId=getServiceId();
return this.matcher.match(originService,serviceId);
}
public boolean isForSelf(RemoteApplicationEvent event){
String destinationService=event.getDestinationService();
return(destinationService==null || destinationService.trim().isEmpty()||this.matcher.match(destinationService,getServiceId()));
}
- ChannelBindingServiceProperties bindings:定义了消息服务的绑定属性。
- BusProperties bus:该对象定义了Spring Cloud Bus的属性,具体如下所示。
@ConfigurationProperties("spring.cloud.bus")
public class BusProperties {
private Env env=new Env();
private Refresh refresh=new Refresh();
private Ack ack=new Ack();
private Trace trace=new Trace();
private String destination="springCloudBus";
private boolean enabled=true;
...
}
从中可以看到,Spring Cloud Bus 的属性前缀使用了 spring.cloud.bus。destination和enabled属性分别定义了默认的队列(Queue)或主题(Topic)是否连接到消息总线,所以我们可以通过spring.cloud.bus.destination来修改消息总线使用的队列或主题名称,以及使用 spring.cloud.bus.enabled 属性来设置应用是否要连接到消息总线上。
另外,在该配置类中为Env、Refresh、Ack、Trace 4种已经实现的事件分别创建了配置对象,这些配置类都是BusProperties的内部类。从下面的源码中,我们可以看到对于这4种事件,Env、Refresh、Ack均是默认开启的,只有Trace事件需要通过修改配置来开启,就如之前我们介绍“事件跟踪”的时候配置spring.cloud.bus.trace.enabled=true属性那样。
public static class Env {
private boolean enabled=true;
...
}
public static class Refresh {
private boolean enabled=true;
...
}
public static class Ack {
private boolean enabled=true;
private String destinationService;
...
}
public static class Trace {
private boolean enabled=false;
...
}
- ApplicationEventPublisher:Spring 事件模型中用来发布事件的接口,也就是我们之前介绍的事件以及监听的桥梁。
除了定义的这些成员变量之外,还能看到这里定义了两个监听方法acceptLocal和acceptRemote。
其中,acceptLocal 方法如下所示,它通过@EventListener(classes=RemoteApplicationEvent.class)注解修饰。之前已经介绍过该注解,可以将该函数理解为对 RemoteApplicationEvent 事件的监听器,但是在其实现中并非所有的RemoteApplicationEvent事件都会处理。根据if中的条件,可以看到在该监听处理中,只对事件来源是自己并且事件类型不是AckRemoteApplicationEvent的内容进行后续的处理,而后续的处理就是通过消息管道将该事件发送出去。所以,该监听器的功能就是监听本地事件来进行消息的发送。
@EventListener(classes=RemoteApplicationEvent.class)
public void acceptLocal(RemoteApplicationEvent event){
if(this.serviceMatcher.isFromSelf(event)
&& !(event instanceof AckRemoteApplicationEvent)){
this.cloudBusOutboundChannel.send(MessageBuilder.withPayload(event).
build());
}
}
再来看看acceptRemote方法。该方法中使用了@StreamListener注解修饰,该注解的作用是将该函数注册为消息代理上数据流的事件监听器,注解中的属性值SpringCloudBusClient.INPUT 指定了监听的通道名。同时,回头看该函数所在类的定义,使用了@EnableBinding注解,该注解用来实现与消息代理的连接,注解中的属性值 SpringCloudBusClient.class 声明了输入和输出通道的定义(这部分内容源自Spring Cloud Stream,在下一章中,我们会对这些内容做详细介绍,这里我们只需理解它用来绑定消息代理的输入与输出,以实现向消息总线上发送和接收消息即可)。
@StreamListener(SpringCloudBusClient.INPUT)
public void acceptRemote(RemoteApplicationEvent event){
if(event instanceof AckRemoteApplicationEvent){
if(this.bus.getTrace().isEnabled()&& ! this.serviceMatcher.isFromSelf(event)
&& this.applicationEventPublisher !=null){
this.applicationEventPublisher.publishEvent(event);
}//If it's an ACK we are finished processing at this point
return;
}
if(this.serviceMatcher.isForSelf(event)
&& this.applicationEventPublisher !=null){
if(! this.serviceMatcher.isFromSelf(event)){
this.applicationEventPublisher.publishEvent(event);
}
if(this.bus.getAck().isEnabled()){
AckRemoteApplicationEvent ack=new AckRemoteApplicationEvent(this,
this.serviceMatcher.getServiceId(),
this.bus.getAck().getDestinationService(),
event.getDestinationService(),event.getId(),event.getClass());
this.cloudBusOutboundChannel
.send(MessageBuilder.withPayload(ack).build());
this.applicationEventPublisher.publishEvent(ack);
}
}
if(this.bus.getTrace().isEnabled()&& this.applicationEventPublisher !=null){
//We are set to register sent events so publish it for local consumption,
//irrespective of the origin
this.applicationEventPublisher.publishEvent(new SentApplicationEvent(this,
event.getOriginService(),event.getDestinationService(),
event.getId(),event.getClass()));
}
}
通过上面的分析,我们已经可以知道Spring Cloud Bus通过acceptRemote方法来监听消息代理的输入通道,并根据事件类型和配置内容来确定是否要发布事件给我们之前分析的几个事件监听器来对事件做具体的处理;而acceptLocal方法用来监听本地的事件,针对事件来源是自己,并且事件类型不是AckRemoteApplicationEvent的内容通过消息代理的输出通道发送到总线上去。
控制端点
在介绍了Spring Cloud Bus中实现的事件模型之后,我们已经知道每个节点是如何响应消息总线上的事件了。那么这些发送到消息总线上用来触发各个节点的事件处理的动作是如何实现的呢?回想一下之前在实现配置属性刷新时,我们在修改了 Git 仓库上的配置信息之后,往总线上的某个节点发送了一个请求/bus/refresh来触发总线上的所有节点进行配置刷新;我们在连接到消息总线的应用启动时,也能在控制台中看到类似下面的输出:
2016-09-30 11:05:13.037 INFO 18720---[ main]
o.s.b.a.e.mvc.EndpointHandlerMapping : Mapped "{[/bus/refresh],methods=[POST]}"
onto public void
org.springframework.cloud.bus.endpoint.RefreshBusEndpoint.refresh(java.lang.String)
2016-09-30 11:05:13.045 INFO 18720---[ main]
o.s.b.a.e.mvc.EndpointHandlerMapping : Mapped "{[/bus/env],methods=[POST]}" onto
public void
org.springframework.cloud.bus.endpoint.EnvironmentBusEndpoint.env(java.util.Map<jav
a.lang.String,java.lang.String>,java.lang.String)
从上面的日志信息中可以看到,在org.springframework.cloud.bus.endpoint包下的RefreshBusEndpoint和EnvironmentBusEndpoint分别创建了两个控制端点:/bus/refresh 和/bus/env。通过整理 org.springframework.cloud.bus.endpoint包下的内容,我们可以得到如下类图:
从图中可以发现,Spring Cloud Bus中的Endpoint也是通过spring-boot-actuator模块来实现的。下面,简单介绍一下spring-boot-actuator模块中的几个重要元素。
- Endpoint:该接口中定义了监控端点需要暴露的一些有用信息,比如,id、是否开启标识、是否开启敏感信息标识等。
- AbstractEndPoint:该抽象类是对Endpoint的基础实现,在该抽象类中引入了Environment接口对象,从而对接口暴露信息的控制可以通过配置文件的方式来控制。
- MvcEndpoint接口:该接口定义了Endpoint接口在MVC层的策略。在这里可以通过使用Spring MVC的@RequestMapping注解来定义端点暴露的接口地址。
下面我们来看看Spring Cloud Bus是如何扩展Endpoint的。
- BusEndpoint:该类继承自AbstractEndPoint。从类上的注解@ConfigurationProperties 配置可以知道,Spring Cloud Bus 实现的端点配置属性需要以endpoints.bus开头,通过该类的构造函数(配合AbstractEndpoint中的构造函数),我们可以知道默认id为bus,并且端点默认敏感标识为true:
@ConfigurationProperties(prefix="endpoints.bus",ignoreUnknownFields=false)
public class BusEndpoint extends AbstractEndpoint<Collection<String>> {
public BusEndpoint(){
super("bus");
}
@Override
public Collection<String> invoke(){
return Collections.emptyList();
}
}
public abstract class AbstractEndpoint<T> implements Endpoint<T>,EnvironmentAware {
...
public AbstractEndpoint(String id){
this(id,true);
}
public AbstractEndpoint(String id,boolean sensitive){
this.id=id;
this.sensitiveDefault=sensitive;
}
...
}
- AbstractBusEndpoint类是实现Spring Cloud Bus中端点的重要基类,它实现了MvcEndpoint接口来暴露MVC层的接口,同时关联了BusEndpoint对象。通过下面的源码,我们可以看到,getPath、isSensitive和getEndpointType都是委托给BusEndpoint来获取的,从而实现通过Environment配置接口。
public class AbstractBusEndpoint implements MvcEndpoint {
private ApplicationEventPublisher context;
private BusEndpoint delegate;
private String appId;
public AbstractBusEndpoint(ApplicationEventPublisher context,String appId,
BusEndpoint busEndpoint){
this.context=context;
this.appId=appId;
this.delegate=busEndpoint;
}
protected String getInstanceId(){
return this.appId;
}
protected void publish(ApplicationEvent event){
context.publishEvent(event);
}
@Override
public String getPath(){
return "/"+this.delegate.getId();
}
@Override
public boolean isSensitive(){
return this.delegate.isSensitive();
}
@Override
@SuppressWarnings("rawtypes")
public Class<? extends Endpoint> getEndpointType(){
return this.delegate.getClass();
}
}
默认实现的几个端点都继承自 AbstractBusEndpoint 类来实现 MVC 层接口的暴露和配置,下面我们来看看具体的两个实现端点。
- 实现配置刷新的端点RefreshBusEndpoint类。通过下面的源码,我们可以看到,在该类中定义了refresh的POST请求,由于在BusEndpoint默认构造时id为bus,而AbstractBusEndpoint中getPath函数通过BusEndpoint中的id拼接而成,所以对于 RefreshBusEndpoint 中 refresh 请求的完整路径为/bus/refresh。同时,该请求通过@RequestParam注解还定义了一个可选的参数destination,正如在之前的示例中介绍的,该参数用于指定刷新的服务实例。在请求处理部分直接调用了父类中的 publish 函数将 RefreshRemoteApplicationEvent事件发布出来,实现在总线上发布消息的功能。
public class RefreshBusEndpoint extends AbstractBusEndpoint {
public RefreshBusEndpoint(ApplicationEventPublisher context,String id,
BusEndpoint delegate){
super(context,id,delegate);
}
@RequestMapping(value="refresh",method=RequestMethod.POST)
@ResponseBody
public void refresh(
@RequestParam(value="destination",required=false)String destination){
publish(new RefreshRemoteApplicationEvent(this,getInstanceId(),
destination));
}
}
- EnvironmentBusEndpoint的实现与RefreshBusEndpoint类似,通过暴露/bus/env的POST请求接口,并提供了Map类型的params参数设定需要更新的配置信息,以及同refresh接口一样的destination参数指定需要更新的服务实例,来触发环境参数更新的消息总线控制。
public class EnvironmentBusEndpoint extends AbstractBusEndpoint {
public EnvironmentBusEndpoint(ApplicationEventPublisher context,String id,
BusEndpoint delegate){
super(context,id,delegate);
}
@RequestMapping(value="env",method=RequestMethod.POST)
@ResponseBody
public void env(@RequestParam Map<String,String> params,
@RequestParam(value="destination",required=false)String destination){
publish(new EnvironmentChangeRemoteApplicationEvent(this,getInstanceId(),
destination,params));
}
}
其他消息代理的支持
由于目前版本的Spring Cloud Bus只实现了RabbitMQ和Kafka的封装,虽然大部分情况下,这两个产品的特性已经涵盖我们大部分的业务场景,但是由于一些特殊需求或是遗留系统等其他因素,有些团队不得不使用其他的消息代理,这个时候我们就需要扩展消息代理的支持。实际上,通过之前对源码的分析,我们可以看到,Spring Cloud Bus在绑定具体消息代理的输入与输出通道时均使用了抽象接口的方式,所以真正的实现来自于spring-cloud-starter-bus-amqp和spring-cloud-starter-bus-kafka的依赖。
我们可以查看spring-cloud-starter-bus-amqp和spring-cloud-starter bus-kafka 的依赖,可以看到它们分别依赖了 spring-cloud-starter-streamrabbit 和 spring-cloud-starter-stream-kafka,真正实现与这些消息代理进行交互操作的是Spring Cloud Stream。所以,我们在本章中使用的所有Spring Cloud Bus的消息通信基础实际上都是由Spring Cloud Stream所提供的。一定程度上,我们可以将Spring Cloud Bus理解为是一个使用了Spring Cloud Stream构建的上层应用。由于Spring Cloud Stream为了让开发者屏蔽各个消息代理之间的差异,将来能够方便地切换不同的消息代理而不影响业务程序,所以在业务程序与消息代理之间定义了一层抽象,称为绑定器(Binder)。我们在整合RabbitMQ和Kafka的时候就是分别引入了它们各自的绑定器实现,可以回想一下之前的实现内容,不论使用 RabbitMQ 还是 Kafka 实现,在程序上其实没有任何变化,变化的只是对绑定器的配置。所以,当我们要在其他消息代理上使用Spring Cloud Bus消息总线时,只需要去实现一套指定消息代理的绑定器即可。
如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。
绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论