- 推荐序一
- 推荐序二
- 推荐序三
- 推荐语
- 前言
- 第1章 基础知识
- 第2章 微服务构建:Spring Boot
- 第3章 服务治理:Spring Cloud Eureka
- 第4章 客户端负载均衡:Spring Cloud Ribbon
- 第5章 服务容错保护:Spring Cloud Hystrix
- 第6章 声明式服务调用:Spring Cloud Feign
- 第7章 API网关服务:Spring Cloud Zuul
- 第8章 分布式配置中心:Spring Cloud Config
- 第9章 消息总线:Spring Cloud Bus
- 第10章 消息驱动的微服务:Spring Cloud Stream
- 附录 A Starter POMs
- 后记
使用详解
在“快速入门”一节中我们已经使用过Hystrix中的核心注解@HystrixCommand,通过它创建了HystrixCommand的实现,同时利用fallback属性指定了服务降级的实现方法。然而这些还只是Hystrix使用的一小部分,在实现一个大型分布式系统时,往往还需要更多高级的配置功能。接下来我们将详细介绍Hystrix各接口和注解的使用方法。
创建请求命令
Hystrix 命令就是我们之前所说的 HystrixCommand,它用来封装具体的依赖服务调用逻辑。
我们可以通过继承的方式来实现,比如:
public class UserCommand extends HystrixCommand<User> {
private RestTemplate restTemplate;
private Long id;
public UserCommand(Setter setter,RestTemplate restTemplate,Long id){
super(setter);
this.restTemplate=restTemplate;
this.id=id;
}
@Override
protected User run(){
return restTemplate.getForObject("http://USER-SERVICE/users/{1}",User.class,id);
}
}
通过上面实现的 UserCommand,我们既可以实现请求的同步执行也可以实现异步执行。
- 同步执行: User u=new UserCommand(restTemplate,1L).execute();。
- 异步执行: Future<User> futureUser=new UserCommand(restTemplate,1L).queue();。异步执行的时候,可以通过对返回的futureUser调用get方法来获取结果。
另外,也可以通过@HystrixCommand 注解来更为优雅地实现 Hystrix 命令的定义,比如:
public class UserService {
@Autowired
private RestTemplate restTemplate;
@HystrixCommand
public User getUserById(Long id){
return restTemplate.getForObject("http://USER-SERVICE/users/{1}",User.class,id);
}
}
虽然@HystrixCommand 注解可以非常优雅地定义 Hystrix 命令的实现,但是如上定义的getUserById方式只是同步执行的实现,若要实现异步执行则还需另外定义,比如:
@HystrixCommand
public Future<User> getUserByIdAsync(final String id){
return new AsyncResult<User>(){
@Override
public User invoke(){
return restTemplate.getForObject("http://USER-SERVICE/users/{1}",
User.class,id);
}
};
}
除了传统的同步执行与异步执行之外,我们还可以将 HystrixCommand 通过Observable来实现响应式执行方式。通过调用observe()和toObservable()方法可以返回Observable对象,比如:
Observable<String> ho=new UserCommand(restTemplate,1L).observe();
Observable<String> co=new UserCommand(restTemplate,1L).toObservable();
observe()和toObservable()虽然都返回了Observable,但是它们略有不同,前者返回的是一个 Hot Observable,该命令会在 observe()调用的时候立即执行,当Observable每次被订阅的时候会重放它的行为;而后者返回的是一个Cold Observable,toObservable()执行之后,命令不会被立即执行,只有当所有订阅者都订阅它之后才会执行。更多关于这两个方法的区别可见“原理分析”小节的内容,这里不做具体说明。
虽然HystrixCommand具备了observe()和toObservable()的功能,但是它的实现有一定的局限性,它返回的Observable只能发射一次数据,所以Hystrix还提供了另外一个特殊命令封装HystrixObservableCommand,通过它实现的命令可以获取能发射多次的Observable。
如果使用HystrixObservableCommand来实现命令封装,需要将命令的执行逻辑在construct方法中重载,这样Hystrix才能将具体逻辑包装到Observable内,如下所示:
而对此的注解实现依然是使用@HystrixCommand,只是方法定义需要做一些变化,具体内容与construct()的实现类似,如下所示:
@HystrixCommand
public Observable<User> getUserById(final String id){
return Observable.create(new Observable.OnSubscribe<User>(){
@Override
public void call(Subscriber<? super User> observer){
try {
if(! observer.isUnsubscribed()){
User user=restTemplate.getForObject("http://HELLO-SERVICE/
users/{1}",User.class,id);
observer.onNext(user);
observer.onCompleted();
}
} catch(Exception e){
observer.onError(e);
}
}
});
}
在使用@HystrixCommand 注解实现响应式命令时,可以通过 observableExecutionMode参数来控制是使用observe()还是toObservable()的执行方式。该参数有下面两种设置方式。
- @HystrixCommand(observableExecutionMode=ObservableExecutionMode.EAGER):EAGER是该参数的模式值,表示使用observe()执行方式。
- @HystrixCommand(observableExecutionMode=ObservableExecutionMode.LAZY):表示使用toObservable()执行方式。
定义服务降级
fallback是Hystrix命令执行失败时使用的后备方法,用来实现服务的降级处理逻辑。在HystrixCommand中可以通过重载getFallback()方法来实现服务降级逻辑,Hystrix会在 run()执行过程中出现错误、超时、线程池拒绝、断路器熔断等情况时,执行getFallback()方法内的逻辑,比如我们可以用如下方式实现服务降级逻辑:
public class UserCommand extends HystrixCommand<User> {
private RestTemplate restTemplate;
private Long id;
public UserCommand(Setter setter,RestTemplate restTemplate,Long id){
super(setter);
this.restTemplate=restTemplate;
this.id=id;
}
@Override
protected User run(){
return restTemplate.getForObject("http://USER-SERVICE/users/{1}",User.class,id);
}
@Override
protected User getFallback(){
return new User();
}
}
在 HystrixObservableCommand 实现的 Hystrix 命令中,我们可以通过重载resumeWithFallback 方法来实现服务降级逻辑。该方法会返回一个 Observable 对象,当命令执行失败的时候,Hystrix会将Observable中的结果通知给所有的订阅者。
若要通过注解实现服务降级只需要使用@HystrixCommand 中的 fallbackMethod参数来指定具体的服务降级实现方法,如下所示:
public class UserService {
@Autowired
private RestTemplate restTemplate;
@HystrixCommand(fallbackMethod="defaultUser")
public User getUserById(Long id){
return restTemplate.getForObject("http://USER-SERVICE/users/{1}",User.class,id);
}
public User defaultUser(){
return new User();
}
}
在使用注解来定义服务降级逻辑时,我们需要将具体的Hystrix命令与fallback实现函数定义在同一个类中,并且fallbackMethod的值必须与实现fallback方法的名字相同。由于必须定义在一个类中,所以对于 fallback 的访问修饰符没有特定的要求,定义为private、protected、public均可。
在上面的例子中,defaultUser方法将在getUserById执行时发生错误的情况下被执行。若defaultUser方法实现的并不是一个稳定逻辑,它依然可能会发生异常,那么我们也可以为它添加@HystrixCommand 注解以生成Hystrix 命令,同时使用fallbackMethod来指定服务降级逻辑,比如:
public class UserService {
@Autowired
private RestTemplate restTemplate;
@HystrixCommand(fallbackMethod="defaultUser")
public User getUserById(Long id){
return restTemplate.getForObject("http://USER-SERVICE/users/{1}",User.class,id);
}
@HystrixCommand(fallbackMethod="defaultUserSec")
public User defaultUser(){
//此处可能是另外一个网络请求来获取,所以也有可能失败
return new User("First Fallback");
}
public User defaultUserSec(){
return new User("Second Fallback");
}
}
在实际使用时,我们需要为大多数执行过程中可能会失败的Hystrix命令实现服务降级逻辑,但是也有一些情况可以不去实现降级逻辑,如下所示。
- 执行写操作的命令: 当Hystrix命令是用来执行写操作而不是返回一些信息的时候,通常情况下这类操作的返回类型是void或是为空的Observable,实现服务降级的意义不是很大。当写入操作失败的时候,我们通常只需要通知调用者即可。
- 执行批处理或离线计算的命令: 当Hystrix命令是用来执行批处理程序生成一份报告或是进行任何类型的离线计算时,那么通常这些操作只需要将错误传播给调用者,然后让调用者稍后重试而不是发送给调用者一个静默的降级处理响应。
不论Hystrix命令是否实现了服务降级,命令状态和断路器状态都会更新,并且我们可以由此了解到命令执行的失败情况。
异常处理
异常传播
在HystrixCommand实现的run()方法中抛出异常时,除了HystrixBadRequestException之外,其他异常均会被Hystrix认为命令执行失败并触发服务降级的处理逻辑,所以当需要在命令执行中抛出不触发服务降级的异常时来使用它。
而在使用注册配置实现Hystrix命令时,它还支持忽略指定异常类型功能,只需要通过设置@HystrixCommand注解的ignoreExceptions参数,比如:
@HystrixCommand(ignoreExceptions={BadRequestException.class})
public User getUserById(Long id){
return restTemplate.getForObject("http://USER-SERVICE/users/{1}",User.class,id);
}
如上面代码的定义,当 getUserById 方法抛出了类型为BadRequestException的异常时,Hystrix会将它包装在HystrixBadRequestException中抛出,这样就不会触发后续的fallback逻辑。
异常获取
当Hystrix命令因为异常(除了HystrixBadRequestException的异常)进入服务降级逻辑之后,往往需要对不同异常做针对性的处理,那么我们如何来获取当前抛出的异常呢?
在以传统继承方式实现的 Hystrix 命令中,我们可以用 getFallback()方法通过Throwable getExecutionException()方法来获取具体的异常,通过判断来进入不同的处理逻辑。
除了传统的实现方式之外,注解配置方式也同样可以实现异常的获取。它的实现也非常简单,只需要在fallback实现方法的参数中增加Throwable e对象的定义,这样在方法内部就可以获取触发服务降级的具体异常内容了,比如:
@HystrixCommand(fallbackMethod="fallback1")
User getUserById(String id){
throw new RuntimeException("getUserById command failed");
}
User fallback1(String id,Throwable e){
assert "getUserById command failed".equals(e.getMessage());
}
命令名称、分组以及线程池划分
以继承方式实现的Hystrix命令使用类名作为默认的命令名称,我们也可以在构造函数中通过Setter静态类来设置,比如:
public UserCommand(){
super(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("GroupName"))
.andCommandKey(HystrixCommandKey.Factory.asKey("CommandName")););
}
从上面 Setter 的使用中可以看到,我们并没有直接设置命令名称,而是先调用了withGroupKey 来设置命令组名,然后才通过调用 andCommandKey 来设置命令名。这是因为在Setter的定义中,只有withGroupKey静态函数可以创建Setter的实例,所以GroupKey是每个Setter必需的参数,而CommandKey则是一个可选参数。
通过设置命令组,Hystrix会根据组来组织和统计命令的告警、仪表盘等信息。那么为什么一定要设置命令组呢?因为除了根据组能实现统计之外,Hystrix命令默认的线程划分也是根据命令分组来实现的。默认情况下,Hystrix会让相同组名的命令使用同一个线程池,所以我们需要在创建Hystrix命令时为其指定命令组名来实现默认的线程池划分。
如果 Hystrix 的线程池分配仅仅依靠命令组来划分,那么它就显得不够灵活了,所以Hystrix 还提供了 HystrixThreadPoolKey 来对线程池进行设置,通过它我们可以实现更细粒度的线程池划分,比如:
public UserCommand(){
super(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey
("CommandGroupKey"))
.andCommandKey(HystrixCommandKey.Factory.asKey("CommandKey"))
.andThreadPoolKey(HystrixThreadPoolKey.Factory.asKey("ThreadPoolKey")));
}
如果在没有特别指定HystrixThreadPoolKey的情况下,依然会使用命令组的方式来划分线程池。通常情况下,尽量通过HystrixThreadPoolKey的方式来指定线程池的划分,而不是通过组名的默认方式实现划分,因为多个不同的命令可能从业务逻辑上来看属于同一个组,但是往往从实现本身上需要跟其他命令进行隔离。
上面已经介绍了如何为通过继承实现的 HystrixCommand 设置命令名称、分组以及线程池划分,那么当我们使用@HystrixCommand注解的时候,又该如何设置呢?只需设置@HystrixCommand注解的commandKey、groupKey以及threadPoolKey属性即可,它们分别表示了命令名称、分组以及线程池划分,比如我们可以像下面这样进行设置:
@HystrixCommand(commandKey="getUserById",groupKey="UserGroup",threadPoolKey="getUserByIdThread")
public User getUserById(Long id){
return restTemplate.getForObject("http://USER-SERVICE/users/{1}",User.class,id);
}
请求缓存
当系统用户不断增长时,每个微服务需要承受的并发压力也越来越大。在分布式环境下,通常压力来自于对依赖服务的调用,因为请求依赖服务的资源需要通过通信来实现,这样的依赖方式比起进程内的调用方式会引起一部分的性能损失,同时HTTP相比于其他高性能的通信协议在速度上没有任何优势,所以它有些类似于对数据库这样的外部资源进行读写操作,在高并发的情况下可能会成为系统的瓶颈。既然如此,我们很容易地可以联想到,类似数据访问的缓存保护是否也可以应用到依赖服务的调用上呢?
答案显而易见,在高并发的场景之下,Hystrix中提供了请求缓存的功能,我们可以方便地开启和使用请求缓存来优化系统,达到减轻高并发时的请求线程消耗、降低请求响应时间的效果。
开启请求缓存功能
Hystrix 请求缓存的使用非常简单,我们只需要在实现 HystrixCommand 或HystrixObservableCommand 时,通过重载 getCacheKey()方法来开启请求缓存,比如:
public class UserCommand extends HystrixCommand<User> {
private RestTemplate restTemplate;
private Long id;
public UserCommand(RestTemplate restTemplate,Long id){
super(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("UserGroup")));
this.restTemplate=restTemplate;
this.id=id;
}
@Override
protected User run(){
return restTemplate.getForObject("http://USER-SERVICE/users/{1}",User.class,id);
}
@Override
protected String getCacheKey(){
return String.valueOf(id);
}
}
在上面的例子中,我们通过在getCacheKey方法中返回的请求缓存key值(使用了传入的获取User对象的id值),就能让该请求命令具备缓存功能。此时,当不同的外部请求处理逻辑调用了同一个依赖服务时,Hystrix会根据getCacheKey方法返回的值来区分是否是重复的请求,如果它们的cacheKey相同,那么该依赖服务只会在第一个请求到达时被真实地调用一次,另外一个请求则是直接从请求缓存中返回结果,所以通过开启请求缓存可以让我们实现的Hystrix命令具备下面几项好处:
- 减少重复的请求数,降低依赖服务的并发度。
- 在同一用户请求的上下文中,相同依赖服务的返回数据始终保持一致。
- 请求缓存在run()和construct()执行之前生效,所以可以有效减少不必要的线程开销。
清理失效缓存功能
使用请求缓存时,如果只是读操作,那么不需要考虑缓存内容是否正确的问题,但是如果请求命令中还有更新数据的写操作,那么缓存中的数据就需要我们在进行写操作时进行及时处理,以防止读操作的请求命令获取到了失效的数据。
在Hystrix中,我们可以通过HystrixRequestCache.clear()方法来进行缓存的清理,具体示例如下:
public class UserGetCommand extends HystrixCommand<User> {
private static final HystrixCommandKey GETTER_KEY=HystrixCommandKey.Factory.asKey("CommandKey");
private RestTemplate restTemplate;
private Long id;
public UserGetCommand(RestTemplate restTemplate,Long id){
super(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("GetSetGet"))
.andCommandKey(GETTER_KEY));
this.restTemplate=restTemplate;
this.id=id;
}
@Override
protected User run(){
return restTemplate.getForObject("http://USER-SERVICE/users/{1}",User.class,id);
}
@Override
protected String getCacheKey(){
//根据id置入缓存
return String.valueOf(id);
}
public static void flushCache(Long id){
//刷新缓存,根据id进行清理
HystrixRequestCache.getInstance(GETTER_KEY,
HystrixConcurrencyStrategyDefault.getInstance()).clear(String.valueOf(id));
}
}
public class UserPostCommand extends HystrixCommand<User> {
private RestTemplate restTemplate;
private User user;
public UserPostCommand(RestTemplate restTemplate,User user){
super(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("GetSetGet")));
this.restTemplate=restTemplate;
this.user=user;
}
@Override
protected User run(){
//写操作
User r=restTemplate.postForObject("http://USER-SERVICE/users",user,User.class);
//刷新缓存,清理缓存中失效的User
UserGetCommand.flushCache(user.getId());
return r;
}
}
该示例中主要有两个请求命令:UserGetCommand 用于根据 id 获取 User 对象、而UserPostCommand用于更新User对象。当我们对UserGetCommand命令实现了请求缓存之后,那么势必需要为UserPostCommand命令实现缓存的清理,以保证User被更新之后,Hystrix请求缓存中相同缓存Key的结果被移除,这样在下一次获取User的时候不会从缓存中获取到未更新的结果。
我们可以看到,在上面UserGetCommand的实现中,增加了一个静态方法flushCache,该方法通过HystrixRequestCache.getInstance(GETTER_KEY,HystrixConcurrencyStrategyDefault.getInstance())方法从默认的 Hystrix 并发策略中根据GETTER_KEY获取到该命令的请求缓存对象HystrixRequestCache的实例,然后再调用该请求缓存对象实例的clear方法,对Key为更新User的id值的缓存内容进行清理。而在 UserPostCommand 的实现中,在 run 方法调用依赖服务之后,增加了对UserGetCommand中静态方法flushCache的调用,以实现对失效缓存的清理。
工作原理
通过上面的入门例子,我们已经能够体会到在Hystrix中实现请求缓存是非常方便的,那么它是如何做到的呢?我们不妨通过分析其源码来了解一下它的实现原理,对其有一个深入的理解,有助于指导我们正确使用和配置请求缓存。由于 getCacheKey 方法在AbstractCommand抽象命令类中实现,所以我们可以先从这个抽象命令类的实现中看起。
从下面AbstractCommand的源码片段中,我们可以看到,getCacheKey方法默认返回的是null,并且从isRequestCachingEnabled方法的实现逻辑中我们还可以知道,如果不重写getCacheKey方法,让它返回一个非null值,那么缓存功能是不会开启的;同时请求命令的缓存开启属性也需要设置为true才能开启(该属性默认为true,所以通常用该属性来控制请求缓存功能的强制关闭)。
abstract class AbstractCommand<R> implements HystrixInvokableInfo<R>,
HystrixObservable<R> {
...
protected final HystrixRequestCache requestCache;
...
protected String getCacheKey(){
return null;
}
protected boolean isRequestCachingEnabled(){
return properties.requestCacheEnabled().get()&& getCacheKey()!=null;
}
...
public Observable<R> toObservable(){
...
//尝试从缓存中获取结果
final boolean requestCacheEnabled=isRequestCachingEnabled();
final String cacheKey=getCacheKey();
final AbstractCommand<R> _cmd=this;
if(requestCacheEnabled){
HystrixCommandResponseFromCache<R> fromCache=
(HystrixCommandResponseFromCache<R>)requestCache.get(cacheKey);
if(fromCache !=null){
isResponseFromCache=true;
return handleRequestCacheHitAndEmitValues(fromCache,_cmd);
}
}
...
Observable<R> hystrixObservable=
Observable.defer(applyHystrixSemantics).map(wrapWithAllOnNextHooks);
Observable<R> afterCache;
//加入缓存
if(requestCacheEnabled && cacheKey !=null){
HystrixCachedObservable<R> toCache=
HystrixCachedObservable.from(hystrixObservable,this);
HystrixCommandResponseFromCache<R> fromCache=
(HystrixCommandResponseFromCache<R>)requestCache.putIfAbsent(cacheKey,toCache);
if(fromCache !=null){
toCache.unsubscribe();
isResponseFromCache=true;
return handleRequestCacheHitAndEmitValues(fromCache,_cmd);
} else {
afterCache=toCache.toObservable();
}
} else {
afterCache=hystrixObservable;
}
}
...
}
另外,从命令异步执行的核心方法toObservable()中,我们可以看到与缓存相关的主要执行步骤,它分为两部分内容:尝试获取请求缓存以及将请求结果加入缓存。
- 尝试获取请求缓存: Hystrix 命令在执行前会根据之前提到的 isRequestCachingEnabled 方法来判断当前命令是否启用了请求缓存。如果开启了请求缓存并且重写了getCacheKey方法,并返回了一个非null的缓存Key值,那么就使用 getCacheKey 返回的 Key 值去调用 HystrixRequestCache 中的get(String cacheKey)来获取缓存的HystrixCachedObservable对象。
- 将请求结果加入缓存: 在执行命令缓存操作之前,我们可以看到已经获得了一个延迟执行的命令结果对象hystrixObservable。接下来与尝试获取请求缓存操作一样,需要先判断当前命令是否开启了请求缓存功能,如果开启了请求缓存并且getCacheKey返回了具体的Key值,就将hystrixObservable对象包装成请求缓存结果HystrixCachedObservable的实例对象toCache,然后将其放入当前命令的缓存对象中。从调用的方法putIfAbsent中,我们大致可以猜到在请求缓存对象HystrixRequestCache中维护了一个线程安全的Map来保存请求缓存的响应,所以在调用putIfAbsent将包装的请求缓存放入缓存对象后,对其返回结果fromCache进行了判断,如果其不为null,说明当前缓存Key的请求命令缓存命中,直接对toCache执行取消订阅操作(即,不再发起真实请求),同时调用缓存命令的处理方法handleRequestCacheHitAndEmitValues来执行缓存命中的结果获取。如果返回的fromCache为null说明缓存没有命中,则将当前结果toCache缓存起来,并将其转换成Observable返回给调用者使用。
使用注解实现请求缓存
Hystrix的请求缓存除了可以通过上面传统的方式实现之外,还可以通过注解的方式进行配置实现。注解配置的定义实现同JSR 107的定义非常相似,但由于Hystrix不需要独立外置的缓存系统来支持,所以没有JSR 107的定义那么复杂,它只提供了三个专用于请求缓存的注解。
JSR 107是Java缓存API的定义,也被称为JCache。它定义了一系列开发人员使用的标准化Java缓存API和服务提供商使用的标准SPI。
下面我们从几个方面的实例来看看这几个注解的具体使用方法。
- 设置请求缓存: 通过注解为请求命令开启缓存功能非常简单,如下例所示,我们只需添加@CacheResult注解即可。当该依赖服务被调用并返回User对象时,由于该方法被@CacheResult注解修改,所以Hystrix会将该结果置入请求缓存中,而它的缓存Key值会使用所有的参数,也就是这里Long类型的id值。
@CacheResult
@HystrixCommand
public User getUserById(Long id){
return restTemplate.getForObject("http://USER-SERVICE/users/{1}",User.class,id);
}
- 定义缓存Key: 当使用注解来定义请求缓存时,若要为请求命令指定具体的缓存Key生成规则,我们可以使用@CacheResult和@CacheRemove注解的cacheKeyMethod方法来指定具体的生成函数;也可以通过使用@CacheKey注解在方法参数中指定用于组装缓存Key的元素。
使用 cacheKeyMethod 方法的示例如下,它通过在请求命令的同一个类中定义一个专门生成Key的方法,并用@CacheResult注解的cacheKeyMethod方法来指定它即可。它的配置方式类似于@HystrixCommand服务降级fallbackMethod的使用。
@CacheResult(cacheKeyMethod="getUserByIdCacheKey")
@HystrixCommand
public User getUserById(Long id){
return restTemplate.getForObject("http://USER-SERVICE/users/{1}",User.class,id);
}
private Long getUserByIdCacheKey(Long id){
return id;
}
通过@CacheKey 注解实现的方式更加简单,具体示例如下所示。但是在使用@CacheKey注解的时候需要注意,它的优先级比cacheKeyMethod的优先级低,如果已经使用了cacheKeyMethod指定缓存Key的生成函数,那么@CacheKey注解不会生效。
@CacheResult
@HystrixCommand
public User getUserById(@CacheKey("id")Long id){
return restTemplate.getForObject("http://USER-SERVICE/users/{1}",
User.class,id);
}
@CacheKey 注解除了可以指定方法参数作为缓存 Key 之外,它还允许访问参数对象的内部属性作为缓存Key。比如下面的例子,它指定了User对象的id属性作为缓存Key。
@CacheResult
@HystrixCommand
public User getUserById(@CacheKey("id")User user){
return restTemplate.getForObject("http://USER-SERVICE/users/{1}",User.class,user.getId());
}
- 缓存清理: 在之前的例子中,我们已经通过@CacheResult 注解将请求结果置入Hystrix的请求缓存之中。若该内容调用了update操作进行了更新,那么此时请求缓存中的结果与实际结果就会产生不一致(缓存中的结果实际上已经失效了),所以我们需要在update 类型的操作上对失效的缓存进行清理。在Hystrix 的注解配置中,可以通过@CacheRemove注解来实现失效缓存的清理,比如下面的例子所示:
@CacheResult
@HystrixCommand
public User getUserById(@CacheKey("id")Long id){
return restTemplate.getForObject("http://USER-SERVICE/users/{1}",User.class,id);
}
@CacheRemove(commandKey="getUserById")
@HystrixCommand
public void update(@CacheKey("id")User user){
return restTemplate.postForObject("http://USER-SERVICE/users",user,User.class);
}
需要注意的是,@CacheRemove注解的commandKey属性是必须要指定的,它用来指明需要使用请求缓存的请求命令,因为只有通过该属性的配置,Hystrix才能找到正确的请求命令缓存位置。
请求合并
微服务架构中的依赖通常通过远程调用实现,而远程调用中最常见的问题就是通信消耗与连接数占用。在高并发的情况之下,因通信次数的增加,总的通信时间消耗将会变得不那么理想。同时,因为依赖服务的线程池资源有限,将出现排队等待与响应延迟的情况。为了优化这两个问题,Hystrix提供了HystrixCollapser来实现请求的合并,以减少通信消耗和线程数的占用。
HystrixCollapser 实现了在 HystrixCommand 之前放置一个合并处理器,将处于一个很短的时间窗(默认10毫秒)内对同一依赖服务的多个请求进行整合并以批量方式发起请求的功能(服务提供方也需要提供相应的批量实现接口)。通过HystrixCollapser 的封装,开发者不需要关注线程合并的细节过程,只需关注批量化服务和处理。下面我们从HystrixCollapser的使用实例中对其合并请求的过程一探究竟。
public abstract class HystrixCollapser<BatchReturnType,ResponseType,RequestArgumentType> implements
HystrixExecutable<ResponseType>,HystrixObservable<ResponseType> {
...
public abstract RequestArgumentType getRequestArgument();
protected abstract HystrixCommand<BatchReturnType>createCommand(Collection<CollapsedRequest<ResponseType,RequestArgumentType>>requests);
protected abstract void mapResponseToRequests(BatchReturnType batchResponse,Collection<CollapsedRequest<ResponseType,RequestArgumentType>> requests);
...
}
从HystrixCollapser抽象类的定义中可以看到,它指定了三个不同的类型。
- BatchReturnType:合并后批量请求的返回类型。
- ResponseType:单个请求返回的类型。
- RequestArgumentType:请求参数类型。
而对于这三个类型的使用可以在它的三个抽象方法中看到。
- RequestArgumentType getRequestArgument():该函数用来定义获取请求参数的方法。
- HystrixCommand<BatchReturnType>createCommand(Collection<CollapsedRequest<ResponseType,RequestArgumentType>> requests):合并请求产生批量命令的具体实现方法。
- mapResponseToRequests(BatchReturnType batchResponse,Collection<CollapsedRequest<ResponseType,RequestArgumentType>> requests):批量命令结果返回后的处理,这里需要实现将批量结果拆分并传递给合并前的各个原子请求命令的逻辑。
接下来,我们通过一个简单的示例来直观理解实现请求合并的过程。
假设当前微服务USER-SERVICE提供了两个获取User的接口。
- /users/{id}:根据id返回User对象的GET请求接口。
- /users? ids={ids}:根据ids返回User对象列表的GET请求接口,其中ids为以逗号分隔的id集合。
而在服务消费端,已经为这两个远程接口通过 RestTemplate 实现了简单的调用,具体如下所示:
@Service
public class UserServiceImpl implements UserService {
@Autowired
private RestTemplate restTemplate;
@Override
public User find(Long id){
return restTemplate.getForObject("http://USER-SERVICE/users/{1}",User.class,id);
}
@Override
public List<User> findAll(List<Long> ids){
return restTemplate.getForObject("http://USER-SERVICE/users? ids={1}",List.class,StringUtils.join(ids,","));
}
}
接着,我们实现将短时间内多个获取单一User对象的请求命令进行合并。
- 第一步,为请求合并的实现准备一个批量请求命令的实现,具体如下:
public class UserBatchCommand extends HystrixCommand<List<User>> {
UserService userService;
List<Long> userIds;
public UserBatchCommand(UserService userService,List<Long> userIds){
super(Setter.withGroupKey(asKey("userServiceCommand")));
this.userIds=userIds;
this.userService=userService;
}
@Override
protected List<User> run()throws Exception {
return userService.findAll(userIds);
}
}
批量请求命令实际上就是一个简单的 HystrixCommand 实现,从上面的实现中可以看到它通过调用 userService.findAll 方法来访问/users? ids={ids}接口以返回User的列表结果。
- 第二步,通过继承HystrixCollapser实现请求合并器:
public class UserCollapseCommand extends HystrixCollapser<List<User>,User,Long> {
private UserService userService;
private Long userId;
public UserCollapseCommand(UserService userService,Long userId){
super(Setter.withCollapserKey(HystrixCollapserKey.Factory.asKey
("userCollapseCommand")).andCollapserPropertiesDefaults(
HystrixCollapserProperties.Setter().withTimerDelayInMilliseconds(100)));
this.userService=userService;
this.userId=userId;
}
@Override
public Long getRequestArgument(){
return userId;
}
@Override
protected HystrixCommand<List<User>>
createCommand(Collection<CollapsedRequest<User,Long>> collapsedRequests){
List<Long> userIds=new ArrayList<>(collapsedRequests.size());
userIds.addAll(collapsedRequests.stream().map(CollapsedRequest::getArgument).collec
t(Collectors.toList()));
return new UserBatchCommand(userService,userIds);
}
@Override
protected void mapResponseToRequests(List<User> batchResponse,
Collection<CollapsedRequest<User,Long>> collapsedRequests){
int count=0;
for(CollapsedRequest<User,Long> collapsedRequest : collapsedRequests){
User user=batchResponse.get(count++);
collapsedRequest.setResponse(user);
}
}
}
在上面的构造函数中,我们为请求合并器设置了时间延迟属性,合并器会在该时间窗内收集获取单个 User 的请求并在时间窗结束时进行合并组装成单个批量请求。getRequestArgument方法返回给定的单个请求参数userId,而createCommand和mapResponseToRequests是请求合并器的两个核心。
- createCommand:该方法的collapsedRequests参数中保存了延迟时间窗中收集到的所有获取单个User的请求。通过获取这些请求的参数来组织上面我们准备的批量请求命令 UserBatchCommand实例。
- mapResponseToRequests:在批量请求命令 UserBatchCommand 实例被触发执行完成之后,该方法开始执行,其中 batchResponse 参数保存了createCommand 中组织的批量请求命令的返回结果,而 collapsedRequests参数则代表了每个被合并的请求。在这里我们通过遍历批量结果batchResponse对象,为collapsedRequests 中每个合并前的单个请求设置返回结果,以此完成批量结果到单个请求结果的转换。
下图展示了在未使用HystrixCollapser请求合并器之前的线程使用情况。可以看到,当服务消费者同时对USER-SERVICE的/users/{id}接口发起了5个请求时,会向该依赖服务的独立线程池中申请5个线程来完成各自的请求操作。
而在使用了HystrixCollapser请求合并器之后,相同情况下的线程占用如下图所示。由于同一时间发生的5个请求处于请求合并器的一个时间窗内,这些发向/users/{id}接口的请求被请求合并器拦截下来,并在合并器中进行组合,然后将这些请求合并成一个请求发向USER-SERVICE的批量接口/users? ids={ids}。在获取到批量请求结果之后,通过请求合并器再将批量结果拆分并分配给每个被合并的请求。从图中我们可以看到,通过使用请求合并器有效减少了对线程池中资源的占用。所以在资源有效并且短时间内会产生高并发请求的时候,为避免连接不够用而引起的延迟可以考虑使用请求合并器的方式来处理和优化。
使用注解实现请求合并器
在快速入门的例子中,我们使用@HystrixCommand注解优雅地实现了HystrixCommand的定义,那么对于请求合并器是否也可以通过注解来定义呢?答案是肯定的!
以上面实现的请求合并器为例,还可以通过如下方式实现:
@Service
public class UserService {
@Autowired
private RestTemplate restTemplate;
@HystrixCollapser(batchMethod="findAll",collapserProperties={
@HystrixProperty(name="timerDelayInMilliseconds",value="100")
})
public User find(Long id){
return null;
}
@HystrixCommand
public List<User> findAll(List<Long> ids){
return restTemplate.getForObject("http://USER-SERVICE/users? ids={1}",List.class,StringUtils.join(ids,","));
}
}
我们之前已经介绍过@HystrixCommand 了,可以看到,这里通过它定义了两个Hystrix命令,一个用于请求/users/{id}接口,一个用于请求/users? ids={ids}接口。而在请求/users/{id}接口的方法上通过@HystrixCollapser注解为其创建了合并请求器,通过 batchMethod 属性指定了批量请求的实现方法为findAll 方法(即请求/users? ids={ids}接口的命令),同时通过collapserProperties属性为合并请求器设置了相关属性,这里使用@HystrixProperty(name="timerDelayInMilliseconds",value="100")将合并时间窗设置为100毫秒。这样通过@HystrixCollapser注解简单而又优雅地实现了在/users/{id}依赖服务之前设置了一个批量请求合并器。
请求合并的额外开销
虽然通过请求合并可以减少请求的数量以缓解依赖服务线程池的资源,但是在使用的时候也需要注意它所带来的额外开销:用于请求合并的延迟时间窗会使得依赖服务的请求延迟增高。比如,某个请求不通过请求合并器访问的平均耗时为5ms,请求合并的延迟时间窗为10ms(默认值),那么当该请求设置了请求合并器之后,最坏情况下(在延迟时间窗结束时才发起请求)该请求需要15ms才能完成。
由于请求合并器的延迟时间窗会带来额外开销,所以我们是否使用请求合并器需要根据依赖服务调用的实际情况来选择,主要考虑下面两个方面。
- 请求命令本身的延迟 。如果依赖服务的请求命令本身是一个高延迟的命令,那么可以使用请求合并器,因为延迟时间窗的时间消耗显得微不足道了。
- 延迟时间窗内的并发量 。如果一个时间窗内只有1~2个请求,那么这样的依赖服务不适合使用请求合并器。这种情况不但不能提升系统性能,反而会成为系统瓶颈,因为每个请求都需要多消耗一个时间窗才响应。相反,如果一个时间窗内具有很高的并发量,并且服务提供方也实现了批量处理接口,那么使用请求合并器可以有效减少网络连接数量并极大提升系统吞吐量,此时延迟时间窗所增加的消耗就可以忽略不计了。
如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。
绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论