返回介绍

源码分析

发布于 2024-08-18 11:12:34 字数 54614 浏览 0 评论 0 收藏 0

相信很多熟悉 Spring 的读者看到这里一定会产生这样的疑问:RestTemplate 不是Spring自己就提供的吗?跟Ribbon的客户端负载均衡又有什么关系呢?在本节中,我们将透过现象看本质,探索一下Ribbon是如何通过RestTemplate实现客户端负载均衡的。

首先,回顾一下之前的消费者示例:我们是如何实现客户端负载均衡的?仔细观察一下之前的实现代码,可以发现在消费者的例子中,可能就@LoadBalanced这个注解是之前没有接触过的,并且从命名上来看也与负载均衡相关。我们不妨以此为线索来看看Spring Cloud Ribbon的源码实现。

从@LoadBalanced 注解源码的注释中可以知道,该注解用来给 RestTemplate 做标记,以使用负载均衡的客户端(LoadBalancerClient)来配置它。

通过搜索LoadBalancerClient可以发现,这是Spring Cloud中定义的一个接口:

public interface LoadBalancerClient {

ServiceInstance choose(String serviceId);

<T> T execute(String serviceId,LoadBalancerRequest<T> request)throws IOException;

URI reconstructURI(ServiceInstance instance,URI original);

}

从该接口中,我们可以通过定义的抽象方法来了解客户端负载均衡器中应具备的几种能力。

- ServiceInstance choose(String serviceId):根据传入的服务名serviceId,从负载均衡器中挑选一个对应服务的实例。

- T execute(String serviceId,LoadBalancerRequest request)throws IOException:使用从负载均衡器中挑选出的服务实例来执行请求内容。

- URI reconstructURI(ServiceInstance instance,URI original):为系统构建一个合适的host:port形式的URI。在分布式系统中,我们使用逻辑上的服务名称作为host来构建URI(替代服务实例的host:port形式)进行请求,比如http://myservice/path/to/service。在该操作的定义中,前者ServiceInstance对象是带有host和port的具体服务实例,而后者URI对象则是使用逻辑服务名定义为host 的 URI,而返回的 URI 内容则是通过ServiceInstance的服务实例详情拼接出的具体host:post形式的请求地址。

顺着 LoadBalancerClient 接口的所属包 org.springframework.cloud.client.loadbalancer,我们对其内容进行整理,可以得出如下图所示的关系。

从类的命名上可初步判断 LoadBalancerAutoConfiguration 为实现客户端负载均衡器的自动化配置类。通过查看源码,我们可以验证这一点假设:

@Configuration

@ConditionalOnClass(RestTemplate.class)

@ConditionalOnBean(LoadBalancerClient.class)

public class LoadBalancerAutoConfiguration {

@LoadBalanced

@Autowired(required=false)

private List<RestTemplate> restTemplates=Collections.emptyList();

@Bean

public SmartInitializingSingleton loadBalancedRestTemplateInitializer(

final List<RestTemplateCustomizer> customizers){

return new SmartInitializingSingleton(){

@Override

public void afterSingletonsInstantiated(){

for(RestTemplate restTemplate : LoadBalancerAutoConfiguration.

this.restTemplates){

for(RestTemplateCustomizer customizer : customizers){

customizer.customize(restTemplate);

}

}

}

};

}

@Bean

@ConditionalOnMissingBean

public RestTemplateCustomizer restTemplateCustomizer(

final LoadBalancerInterceptor loadBalancerInterceptor){

return new RestTemplateCustomizer(){

@Override

public void customize(RestTemplate restTemplate){

List<ClientHttpRequestInterceptor> list=new ArrayList<>(

restTemplate.getInterceptors());

list.add(loadBalancerInterceptor);

restTemplate.setInterceptors(list);

}

};

}

@Bean

public LoadBalancerInterceptor ribbonInterceptor(

LoadBalancerClient loadBalancerClient){

return new LoadBalancerInterceptor(loadBalancerClient);

}

}

从LoadBalancerAutoConfiguration类头上的注解可以知道,Ribbon实现的负载均衡自动化配置需要满足下面两个条件。

- @ConditionalOnClass(RestTemplate.class):RestTemplate 类必须存在于当前工程的环境中。

- @ConditionalOnBean(LoadBalancerClient.class):在Spring的Bean工程中必须有LoadBalancerClient的实现Bean。

在该自动化配置类中,主要做了下面三件事:

- 创建了一个 LoadBalancerInterceptor 的 Bean,用于实现对客户端发起请求时进行拦截,以实现客户端负载均衡。

- 创建了一个RestTemplateCustomizer的Bean,用于给RestTemplate增加LoadBalancerInterceptor拦截器。

- 维护了一个被@LoadBalanced注解修饰的RestTemplate对象列表,并在这里进行初始化,通过调用 RestTemplateCustomizer 的实例来给需要客户端负载均衡的RestTemplate增加LoadBalancerInterceptor拦截器。

接下来,我们看看 LoadBalancerInterceptor 拦截器是如何将一个普通的RestTemplate变成客户端负载均衡的:

通过源码以及之前的自动化配置类,我们可以看到在拦截器中注入了LoadBalancerClient的实现。当一个被@LoadBalanced注解修饰的RestTemplate对象向外发起HTTP请求时,会被LoadBalancerInterceptor类的intercept函数所拦截。由于我们在使用RestTemplate时采用了服务名作为host,所以直接从HttpRequest的URI对象中通过getHost()就可以拿到服务名,然后调用execute函数去根据服务名来选择实例并发起实际的请求。

分析到这里,LoadBalancerClient还只是一个抽象的负载均衡器接口,所以我们还需要找到它的具体实现类来进一步进行分析。通过查看Ribbon的源码,可以很容易地在org.springframework.cloud.netflix.ribbon 包下找到对应的实现类 RibbonLoadBalancerClient。

可以看到,在execute函数的实现中,第一步做的就是通过getServer根据传入的服务名serviceId去获得具体的服务实例:

protected Server getServer(ILoadBalancer loadBalancer){

if(loadBalancer==null){

return null;

}

return loadBalancer.chooseServer("default");

}

通过 getServer 函数的实现源码,我们可以看到这里获取具体服务实例的时候并没有使用LoadBalancerClient接口中的choose函数,而是使用了Netflix Ribbon自身的ILoadBalancer接口中定义的chooseServer函数。

我们先来认识一下这个ILoadBalancer接口:

public interface ILoadBalancer {

public void addServers(List<Server> newServers);

public Server chooseServer(Object key);

public void markServerDown(Server server);

public List<Server> getReachableServers();

public List<Server> getAllServers();

}

可以看到,在该接口中定义了一个客户端负载均衡器需要的一系列抽象操作(未列举过期函数)。

- addServers:向负载均衡器中维护的实例列表增加服务实例。

- chooseServer:通过某种策略,从负载均衡器中挑选出一个具体的服务实例。

- markServerDown:用来通知和标识负载均衡器中某个具体实例已经停止服务,不然负载均衡器在下一次获取服务实例清单前都会认为服务实例均是正常服务的。

- getReachableServers:获取当前正常服务的实例列表。

- getAllServers:获取所有已知的服务实例列表,包括正常服务和停止服务的实例。

在该接口定义中涉及的Server对象定义是一个传统的服务端节点,在该类中存储了服务端节点的一些元数据信息,包括host、port以及一些部署信息等。

而对于该接口的实现,我们整理出如下图所示的结构。可以看到,BaseLoadBalancer类实现了基础的负载均衡,而DynamicServerListLoadBalancer和ZoneAwareLoadBalancer在负载均衡的策略上做了一些功能的扩展。

那么在整合 Ribbon 的时候 Spring Cloud 默认采用了哪个具体实现呢?我们通过RibbonClientConfiguration 配置类,可以知道在整合时默认采用了 ZoneAwareLoadBalancer来实现负载均衡器。

@Bean

@ConditionalOnMissingBean

public ILoadBalancer ribbonLoadBalancer(IClientConfig config,

ServerList<Server> serverList,ServerListFilter<Server> serverListFilter,

IRule rule,IPing ping){

ZoneAwareLoadBalancer<Server> balancer=LoadBalancerBuilder.newBuilder()

.withClientConfig(config).withRule(rule).withPing(ping)

.withServerListFilter(serverListFilter).withDynamicServerList(serverList)

.buildDynamicServerListLoadBalancer();

return balancer;

}

下面,我们再回到 RibbonLoadBalancerClient 的 execute 函数逻辑,在通过ZoneAwareLoadBalancer 的 chooseServer 函数获取了负载均衡策略分配到的服务实例对象Server之后,将其内容包装成RibbonServer对象(该对象除了存储了服务实例的信息之外,还增加了服务名serviceId、是否需要使用HTTPS等其他信息),然后使用该对象再回调LoadBalancerInterceptor请求拦截器中LoadBalancerRequest的apply(final ServiceInstance instance)函数,向一个实际的具体服务实例发起请求,从而实现一开始以服务名为host的URI请求到host:post形式的实际访问地址的转换。

在apply(final ServiceInstance instance)函数中传入的ServiceInstance接口对象是对服务实例的抽象定义。在该接口中暴露了服务治理系统中每个服务实例需要提供的一些基本信息,比如serviceId、host、port等,具体定义如下:

public interface ServiceInstance {

String getServiceId();

String getHost();

int getPort();

boolean isSecure();

URI getUri();

Map<String,String> getMetadata();

}

而上面提到的具体包装 Server 服务实例的 RibbonServer 对象就是ServiceInstance接口的实现,可以看到它除了包含Server对象之外,还存储了服务名、是否使用HTTPS标识以及一个Map类型的元数据集合。

protected static class RibbonServer implements ServiceInstance {

private final String serviceId;

private final Server server;

private final boolean secure;

private Map<String,String> metadata;

protected RibbonServer(String serviceId,Server server){

this(serviceId,server,false,Collections.<String,String> emptyMap());

}

protected RibbonServer(String serviceId,Server server,boolean secure,

Map<String,String> metadata){

this.serviceId=serviceId;

this.server=server;

this.secure=secure;

this.metadata=metadata;

}

//省略实现ServiceInstance的一些获取Server信息的get函数

...

}

那么 apply(final ServiceInstance instance)函数在接收到了具体ServiceInstance实例后,是如何通过LoadBalancerClient接口中的reconstructURI操作来组织具体请求地址的呢?

@Override

public ClientHttpResponse apply(final ServiceInstance instance)

throws Exception {

HttpRequest serviceRequest=new ServiceRequestWrapper(request,instance);

return execution.execute(serviceRequest,body);

}

从 apply 的实现中,可以看到它具体执行的时候,还传入了 ServiceRequestWrapper对象,该对象继承了HttpRequestWrapper并重写了getURI函数,重写后的getURI通过调用LoadBalancerClient接口的reconstructURI函数来重新构建一个URI来进行访问。

private class ServiceRequestWrapper extendshttpRequestWrapper {

private final ServiceInstance instance;

...

@Override

public URI getURI(){

URI uri=LoadBalancerInterceptor.this.loadBalancer.reconstructURI(

this.instance,getRequest().getURI());

return uri;

}

}

在LoadBalancerInterceptor拦截器中,ClientHttpRequestExecution的实例具体执行 execution.execute(serviceRequest,body)时,会调用 InterceptingClientHttpRequest下InterceptingRequestExecution类的execute函数,具体实现如下:

public ClientHttpResponse execute(HttpRequest request,byte[]body)throws IOException {

if(this.iterator.hasNext()){

ClientHttpRequestInterceptor nextInterceptor=this.iterator.next();

return nextInterceptor.intercept(request,body,this);

}

else {

ClientHttpRequest delegate=requestFactory.createRequest(request.getURI(),request.getMethod());

delegate.getHeaders().putAll(request.getHeaders());

if(body.length > 0){

StreamUtils.copy(body,delegate.getBody());

}

return delegate.execute();

}

}

可以看到,在创建请求的时候requestFactory.createRequest(request.getURI(),request.getMethod());,这里的 request.getURI()会调用之前介绍的ServiceRequestWrapper对象中重写的getURI函数。此时,它就会使用RibbonLoad BalancerClient中实现的reconstructURI来组织具体请求的服务实例地址。

public URI reconstructURI(ServiceInstance instance,URI original){

Assert.notNull(instance,"instance can not be null");

String serviceId=instance.getServiceId();

RibbonLoadBalancerContext context=this.clientFactory

.getLoadBalancerContext(serviceId);

Server server=new Server(instance.getHost(),instance.getPort());

boolean secure=isSecure(server,serviceId);

URI uri=original;

if(secure){

uri=UriComponentsBuilder.fromUri(uri).scheme("https").build().toUri();

}

return context.reconstructURIWithServer(server,uri);

}

从reconstructURI函数中我们可以看到,它通过ServiceInstance实例对象的serviceId,从 SpringClientFactory 类的 clientFactory 对象中获取对应serviceId 的负载均衡器的上下文 RibbonLoadBalancerContext 对象。然后根据ServiceInstance 中的信息来构建具体服务实例信息的 Server 对象,并使用RibbonLoadBalancerContext对象的reconstructURIWithServer函数来构建服务实例的URI。

为了帮助理解,简单介绍一下上面提到的SpringClientFactory和RibbonLoadBalancerContext:

- SpringClientFactory 类是一个用来创建客户端负载均衡器的工厂类,该工厂类会为每一个不同名的Ribbon客户端生成不同的Spring上下文。

- RibbonLoadBalancerContext类是LoadBalancerContext的子类,该类用于存储一些被负载均衡器使用的上下文内容和 API 操作(reconstructURIWithServer就是其中之一)。

从reconstructURIWithServer的实现中我们可以看到,它同reconstructURI的定义类似。只是 reconstructURI 的第一个保存具体服务实例的参数使用了 Spring Cloud定义的ServiceInstance,而reconstructURIWithServer中使用了Netflix中定义的Server,所以在RibbonLoadBalancerClient实现reconstructURI 的时候,做了一次转换,使用ServiceInstance的host和port信息构建了一个Server对象来给reconstructURIWithServer使用。从reconstructURIWithServer的实现逻辑中,我们可以看到,它从Server对象中获取host和port信息,然后根据以服务名为host的URI对象original中获取其他请求信息,将两者内容进行拼接整合,形成最终要访问的服务实例的具体地址。

public class LoadBalancerContext implements IClientConfigAware {

...

public URI reconstructURIWithServer(Server server,URI original){

String host=server.getHost();

int port=server.getPort();

if(host.equals(original.getHost())

&& port==original.getPort()){

return original;

}

String scheme=original.getScheme();

if(scheme==null){

scheme=deriveSchemeAndPortFromPartialUri(original).first();

}

try {

StringBuilder sb=new StringBuilder();

sb.append(scheme).append("://");

if(! Strings.isNullOrEmpty(original.getRawUserInfo())){

sb.append(original.getRawUserInfo()).append("@");

}

sb.append(host);

if(port >=0){

sb.append(":").append(port);

}

sb.append(original.getRawPath());

if(! Strings.isNullOrEmpty(original.getRawQuery())){

sb.append("? ").append(original.getRawQuery());

}

if(! Strings.isNullOrEmpty(original.getRawFragment())){

sb.append("#").append(original.getRawFragment());

}

URI newURI=new URI(sb.toString());

return newURI;

} catch(URISyntaxException e){

throw new RuntimeException(e);

}

}

...

}

另外,从RibbonLoadBalancerClient的execute函数逻辑中,我们还能看到在回调拦截器中,执行具体的请求之后,Ribbon还通过RibbonStatsRecorder对象对服务的请求进行了跟踪记录,这里不再展开说明,有兴趣的读者可以继续研究。

分析到这里,我们已经可以大致理清Spring Cloud Ribbon中实现客户端负载均衡的基本脉络,了解了它是如何通过 LoadBalancerInterceptor 拦截器对 RestTemplate的请求进行拦截,并利用Spring Cloud的负载均衡器LoadBalancerClient将以逻辑服务名为host 的 URI 转换成具体的服务实例地址的过程。同时通过分析LoadBalancerClient的Ribbon实现RibbonLoadBalancerClient,可以知道在使用Ribbon实现负载均衡器的时候,实际使用的还是Ribbon中定义的ILoadBalancer接口的实现,自动化配置会采用ZoneAwareLoadBalancer的实例来实现客户端负载均衡。

负载均衡器

通过之前的分析,我们已经对 Spring Cloud 如何使用 Ribbon 有了基本的了解。虽然Spring Cloud 中定义了 LoadBalancerClient 作为负载均衡器的通用接口,并且针对Ribbon 实现了 RibbonLoadBalancerClient,但是它在具体实现客户端负载均衡时,是通过 Ribbon 的 ILoadBalancer 接口实现的。在上一节进行分析时候,我们对该接口的实现结构已经做了一些简单的介绍,下面我们根据ILoadBalancer接口的实现类逐个看看它是如何实现客户端负载均衡的。

AbstractLoadBalancer

AbstractLoadBalancer是ILoadBalancer接口的抽象实现。在该抽象类中定义了一个关于服务实例的分组枚举类ServerGroup,它包含以下三种不同类型。

- ALL:所有服务实例。

- STATUS_UP:正常服务的实例。

- STATUS_NOT_UP:停止服务的实例。

另外,还实现了一个 chooseServer()函数,该函数通过调用接口中的chooseServer(Object key)实现,其中参数key为null,表示在选择具体服务实例时忽略key的条件判断。

最后,还定义了两个抽象函数。

- getServerList(ServerGroup serverGroup):定义了根据分组类型来获取不同的服务实例的列表。

- getLoadBalancerStats():定义了获取LoadBalancerStats对象的方法,LoadBalancerStats 对象被用来存储负载均衡器中各个服务实例当前的属性和统计信息。这些信息非常有用,我们可以利用这些信息来观察负载均衡器的运行情况,同时这些信息也是用来制定负载均衡策略的重要依据。

public abstract class AbstractLoadBalancer implements ILoadBalancer {

public enum ServerGroup{

ALL,

STATUS_UP,

STATUS_NOT_UP

}

public Server chooseServer(){

return chooseServer(null);

}

public abstract List<Server> getServerList(ServerGroup serverGroup);

public abstract LoadBalancerStats getLoadBalancerStats();

}

BaseLoadBalancer

BaseLoadBalancer类是Ribbon负载均衡器的基础实现类,在该类中定义了很多关于负载均衡器相关的基础内容。

- 定义并维护了两个存储服务实例Server对象的列表。一个用于存储所有服务实例的清单,一个用于存储正常服务的实例清单。

@Monitor(name=PREFIX+"AllServerList",type=DataSourceType.INFORMATIONAL)

protected volatile List<Server> allServerList=Collections

.synchronizedList(new ArrayList<Server>());

@Monitor(name=PREFIX+"UpServerList",type=DataSourceType.INFORMATIONAL)

protected volatile List<Server> upServerList=Collections

.synchronizedList(new ArrayList<Server>());

- 定义了之前我们提到的用来存储负载均衡器各服务实例属性和统计信息的LoadBalancerStats对象。

- 定义了检查服务实例是否正常服务的 IPing 对象,在 BaseLoadBalancer 中默认为null,需要在构造时注入它的具体实现。

- 定义了检查服务实例操作的执行策略对象IPingStrategy,在BaseLoadBalancer中默认使用了该类中定义的静态内部类SerialPingStrategy实现。根据源码,我们可以看到该策略采用线性遍历 ping 服务实例的方式实现检查。该策略在当IPing的实现速度不理想,或是Server列表过大时,可能会影响系统性能,这时候需要通过实现 IPingStrategy 接口并重写 pingServers(IPing ping,Server[]servers)函数去扩展ping的执行策略。

- 定义了负载均衡的处理规则 IRule 对象,从 BaseLoadBalancer 中chooseServer(Object key)的实现源码,我们可以知道,负载均衡器实际将服务实例选择任务委托给了 IRule 实例中的 choose 函数来实现。而在这里,默认初始化了RoundRobinRule为IRule的实现对象。RoundRobinRule实现了最基本且常用的线性负载均衡规则。

public Server chooseServer(Object key){

if(counter==null){

counter=createCounter();

}

counter.increment();

if(rule==null){

return null;

} else {

try {

return rule.choose(key);

} catch(Throwable t){

return null;

}

}

}

- 启动ping任务:在BaseLoadBalancer的默认构造函数中,会直接启动一个用于定时检查Server是否健康的任务。该任务默认的执行间隔为10秒。

- 实现了ILoadBalancer接口定义的负载均衡器应具备以下一系列基本操作。

- addServers(List newServers):向负载均衡器中增加新的服务实例列表,该实现将原本已经维护着的所有服务实例清单 allServerList 和新传入的服务实例清单 newServers 都加入到 newList 中,然后通过调用setServersList函数对newList进行处理,在BaseLoadBalancer中实现的时候会使用新的列表覆盖旧的列表。而之后介绍的几个扩展实现类对于服务实例清单更新的优化都是通过对setServersList函数的重写来实现的。

public void addServers(List<Server> newServers){

if(newServers !=null && newServers.size()> 0){

try {

ArrayList<Server> newList=new ArrayList<Server>();

newList.addAll(allServerList);

newList.addAll(newServers);

setServersList(newList);

} catch(Exception e){

logger.error("Exception while adding Servers",e);

}

}

}

- chooseServer(Object key):挑选一个具体的服务实例,在上面介绍 IRule的时候,已经做了说明,这里不再赘述。

- markServerDown(Server server):标记某个服务实例暂停服务。

public void markServerDown(Server server){

if(server==null){

return;

}

if(! server.isAlive()){

return;

}

logger.error("LoadBalancer: markServerDown called on["

+server.getId()+"]");

server.setAlive(false);

notifyServerStatusChangeListener(singleton(server));

}

- getReachableServers():获取可用的服务实例列表。由于BaseLoadBalancer中单独维护了一个正常服务的实例清单,所以直接返回即可。

public List<Server> getReachableServers(){

return Collections.unmodifiableList(upServerList);

}

- getAllServers():获取所有的服务实例列表。由于BaseLoadBalancer中单独维护了一个所有服务的实例清单,所以也直接返回它即可。

public List<Server> getAllServers(){

return Collections.unmodifiableList(allServerList);

}

DynamicServerListLoadBalancer

DynamicServerListLoadBalancer 类继承于 BaseLoadBalancer 类,它是对基础负载均衡器的扩展。在该负载均衡器中,实现了服务实例清单在运行期的动态更新能力;同时,它还具备了对服务实例清单的过滤功能,也就是说,我们可以通过过滤器来选择性地获取一批服务实例清单。下面我们具体来看看在该类中增加了一些什么内容。

ServerList

从 DynamicServerListLoadBalancer 的成员定义中,我们马上可以发现新增了一个关于服务列表的操作对象ServerList<T> serverListImpl。其中泛型T从类名中对于T的限定DynamicServerListLoadBalancer<T extends Server>可以获知它是一个Server的子类,即代表了一个具体的服务实例的扩展类。而ServerList接口定义如下所示:

public interface ServerList<T extends Server> {

public List<T> getInitialListOfServers();

public List<T> getUpdatedListOfServers();

}

它定义了两个抽象方法:getInitialListOfServers用于获取初始化的服务实例清单,而getUpdatedListOfServers用于获取更新的服务实例清单。那么该接口的实现有哪些呢?通过搜索源码,我们可以整理出如下图所示的结构。

从上图中我们可以看到有多个 ServerList 的实现类,那么在 DynamicServerListLoadBalancer中的ServerList默认配置到底使用了哪个具体实现呢?既然在该负载均衡器中需要实现服务实例的动态更新,那么势必需要Ribbon具备访问Eureka来获取服务实例的能力,所以我们从 Spring Cloud 整合 Ribbon 与 Eureka 的包org.springframework.cloud.netflix.ribbon.eureka下进行探索,可以找到配置类 EurekaRibbonClientConfiguration,在该类中可以找到如下创建ServerList实例的内容:

@Bean

@ConditionalOnMissingBean

public ServerList<? > ribbonServerList(IClientConfig config){

DiscoveryEnabledNIWSServerList discoveryServerList=new

DiscoveryEnabledNIWSServerList(

config);

DomainExtractingServerList serverList=new DomainExtractingServerList(

discoveryServerList,config,this.approximateZoneFromHostname);

return serverList;

}

可以看到,这里创建的是一个DomainExtractingServerList实例,从下面它的源码中我们可以看到,在它内部还定义了一个 ServerList list。同时,DomainExtractingServerList 类中对 getInitialListOfServers 和 getUpdatedListOfServers的具体实现,其实委托给了内部定义的ServerList list对象,而该对象是通过创建 DomainExtractingServerList 时,由构造函数传入的DiscoveryEnabledNIWSServerList实现的。

那么 DiscoveryEnabledNIWSServerList 是如何实现这两个服务实例获取的呢?我们从源码中可以看到这两个方法都是通过该类中的一个私有函数obtainServersViaDiscovery通过服务发现机制来实现服务实例的获取的。

@Override

public List<DiscoveryEnabledServer> getInitialListOfServers(){

return obtainServersViaDiscovery();

}

@Override

public List<DiscoveryEnabledServer> getUpdatedListOfServers(){

return obtainServersViaDiscovery();

}

而obtainServersViaDiscovery的实现逻辑如下所示,主要依靠EurekaClient从服务注册中心中获取到具体的服务实例InstanceInfo列表(EurekaClient的具体实现,我们在分析Eureka的源码时已经做了详细的介绍,这里传入的vipAddress可以理解为逻辑上的服务名,比如 USER-SERVICE)。接着,对这些服务实例进行遍历,将状态为UP(正常服务)的实例转换成DiscoveryEnabledServer对象,最后将这些实例组织成列表返回。

在DiscoveryEnabledNIWSServerList中通过EurekaClient从服务注册中心获取到最新的服务实例清单后,返回的List到了DomainExtractingServerList类中,将继续通过setZones函数进行处理。而这里的处理具体内容如下所示,主要完成将DiscoveryEnabledNIWSServerList返回的List列表中的元素,转换成内部定义的DiscoveryEnabledServer 的子类对象 DomainExtractingServer,在该对象的构造函数中将为服务实例对象设置一些必要的属性,比如 id、zone、isAliveFlag、readyToServe等信息。

private List<DiscoveryEnabledServer> setZones(List<DiscoveryEnabledServer>

servers){

List<DiscoveryEnabledServer> result=new ArrayList<>();

boolean isSecure=this.clientConfig.getPropertyAsBoolean(

CommonClientConfigKey.IsSecure,Boolean.TRUE);

boolean shouldUseIpAddr=this.clientConfig.getPropertyAsBoolean(

CommonClientConfigKey.UseIPAddrForServer,Boolean.FALSE);

for(DiscoveryEnabledServer server : servers){

result.add(new DomainExtractingServer(server,isSecure,shouldUseIpAddr,

this.approximateZoneFromHostname));

}

return result;

}

ServerListUpdater

通过上面的分析我们已经知道了Ribbon与Eureka整合后,如何实现从Eureka Server中获取服务实例清单。那么它又是如何触发向Eureka Server去获取服务实例清单以及如何在获取到服务实例清单后更新本地的服务实例清单的呢?继续来看 DynamicServerListLoadBalancer 中的实现内容,我们可以很容易地找到下面定义的关于ServerListUpdater的内容:

protected final ServerListUpdater.UpdateAction updateAction=new ServerListUpdater.UpdateAction(){

@Override

public void doUpdate(){

updateListOfServers();

}

};

protected volatile ServerListUpdater serverListUpdater;

根据该接口的命名,我们基本就能猜到,这个对象实现的是对ServerList的更新,所以可以称它为“服务更新器”。从下面的源码中可以看到,在ServerListUpdater内部还定义了一个UpdateAction接口,上面定义的updateAction对象就是以匿名内部类的方式创建了一个它的具体实现,其中doUpdate实现的内容就是对ServerList的具体更新操作。除此之外,“服务更新器”中还定义了一系列控制它和获取它的信息的操作。

public interface ServerListUpdater {

public interface UpdateAction {

void doUpdate();

}

//启动服务更新器,传入的UpdateAction对象为更新操作的具体实现。

void start(UpdateAction updateAction);

//停止服务更新器

void stop();

//获取最近的更新时间戳

String getLastUpdate();

//获取上一次更新到现在的时间间隔,单位为毫秒

long getDurationSinceLastUpdateMs();

//获取错过的更新周期数

int getNumberMissedCycles();

//获取核心线程数

int getCoreThreads();

}

而ServerListUpdater的实现类不多,具体如下图所示。

根据两个类的注释,我们可以很容易地知道它们的作用。

- PollingServerListUpdater:动态服务列表更新的默认策略,也就是说,DynamicServerListLoadBalancer 负载均衡器中的默认实现就是它,它通过定时任务的方式进行服务列表的更新。

- EurekaNotificationServerListUpdater:该更新器也可服务于DynamicServerListLoadBalancer负载均衡器,但是它的触发机制与PollingServerListUpdater不同,它需要利用Eureka的事件监听器来驱动服务列表的更新操作。

下面我们来详细看看它默认实现的 PollingServerListUpdater。先从用于启动“服务更新器”的start函数源码看起,具体如下。我们可以看到start函数的实现内容验证了之前提到的:以定时任务的方式进行服务列表的更新。它先创建了一个 Runnable的线程实现,在该实现中调用了上面提到的具体更新服务实例列表的方法updateAction.doUpdate(),最后再为这个Runnable线程实现启动了一个定时任务来执行。

继续看 PollingServerListUpdater 的其他内容,我们可以找到用于启动定时任务的两个重要参数initialDelayMs和refreshIntervalMs的默认定义分别为1000和30*1000,单位为毫秒。也就是说,更新服务实例在初始化之后延迟1秒后开始执行,并以30秒为周期重复执行。除了这些内容之外,还能看到它还会记录最后更新时间、是否存活等信息,同时也实现了 ServerListUpdater 中定义的一些其他操作内容,这些操作相对比较简单,这里不再具体说明,有兴趣的读者可以自己查看源码了解其实现原理。

ServerListFilter

在了解了更新服务实例的定时任务是如何启动的之后,我们回到 updateAction.doUpdate()调用的具体实现位置,在 DynamicServerListLoadBalancer 中,它的实际实现委托给了updateListOfServers函数,具体实现如下:

public void updateListOfServers(){

List<T> servers=new ArrayList<T>();

if(serverListImpl !=null){

servers=serverListImpl.getUpdatedListOfServers();

LOGGER.debug("List of Servers for {} obtained from Discovery client: {}",

getIdentifier(),servers);

if(filter !=null){

servers=filter.getFilteredListOfServers(servers);

LOGGER.debug("Filtered List of Servers for {} obtained from Discovery

client: {}",

getIdentifier(),servers);

}

}

updateAllServerList(servers);

}

可以看到,这里终于用到了之前提到的ServerList的getUpdatedListOfServers(),通过之前的介绍已经知道这一步实现了从Eureka Server中获取服务可用实例的列表。在获得了服务实例列表之后,这里又将引入一个新的对象 filter,追溯该对象的定义,我们可以找到它是ServerListFilter定义的。

ServerListFilter接口非常简单,该接口中定义了一个方法List getFilteredListOfServers(List servers),主要用于实现对服务实例列表的过滤,通过传入的服务实例清单,根据一些规则返回过滤后的服务实例清单。该接口的实现如下图所示。

其中,除了 ZonePreferenceServerListFilter 的实现是 Spring Cloud Ribbon中对Netflix Ribbon的扩展实现外,其他均是Netflix Ribbon中的原生实现类。下面,我们可以分别看看这些过滤器实现都有什么特点。

- AbstractServerListFilter:这是一个抽象过滤器,在这里定义了过滤时需要的一个重要依据对象LoadBalancerStats,我们在之前介绍过,该对象存储了关于负载均衡器的一些属性和统计信息等。

public abstract class AbstractServerListFilter<T extends Server> implements ServerListFilter<T> {

private volatile LoadBalancerStats stats;

public void setLoadBalancerStats(LoadBalancerStats stats){

this.stats=stats;

}

public LoadBalancerStats getLoadBalancerStats(){

return stats;

}

}

- ZoneAffinityServerListFilter:该过滤器基于“区域感知(Zone Affinity)”的方式实现服务实例的过滤,也就是说,它会根据提供服务的实例所处的区域(Zone)与消费者自身的所处区域(Zone)进行比较,过滤掉那些不是同处一个区域的实例。

public List<T> getFilteredListOfServers(List<T> servers){

if(zone !=null &&(zoneAffinity || zoneExclusive)&& servers !=null &&

servers.size()> 0){

List<T> filteredServers=Lists.newArrayList(Iterables.filter(

servers,this.zoneAffinityPredicate.getServerOnlyPredicate()));

if(shouldEnableZoneAffinity(filteredServers)){

return filteredServers;

} else if(zoneAffinity){

overrideCounter.increment();

}

}

return servers;

}

从上面的源码中我们可以看到,对于服务实例列表的过滤是通过 Iterables.filter(servers,this.zoneAffinityPredicate.getServerOnlyPredicate())来实现的,其中判断依据由 ZoneAffinityPredicate 实现服务实例与消费者的 Zone比较。而在过滤之后,这里并不会马上返回过滤的结果,而是通过shouldEnableZoneAffinity函数来判断是否要启用“区域感知”的功能。从下面shouldEnableZoneAffinity的实现中,我们可以看到,它使用了LoadBalancerStats的getZoneSnapshot方法来获取这些过滤后的同区域实例的基础指标(包含实例数量、断路器断开数、活动请求数、实例平均负载等),根据一系列的算法求出下面的几个评价值并与设置的阈值进行对比(下面的为默认值),若有一个条件符合,就不启用“区域感知”过滤的服务实例清单。这一算法实现为集群出现区域故障时,依然可以依靠其他区域的实例进行正常服务提供了完善的高可用保障。同时,通过这里的介绍,我们也可以关联着来理解之前介绍Eureka的时候提到的对于区域分配设计来保证跨区域故障的高可用问题。

- blackOutServerPercentage:故障实例百分比(断路器断开数/实例数量)>=0.8。

- activeReqeustsPerServer:实例平均负载 >=0.6。

- availableServers:可用实例数(实例数量-断路器断开数)< 2。

private boolean shouldEnableZoneAffinity(List<T> filtered){

if(! zoneAffinity && ! zoneExclusive){

return false;

}

if(zoneExclusive){

return true;

}

LoadBalancerStats stats=getLoadBalancerStats();

if(stats==null){

return zoneAffinity;

} else {

logger.debug("Determining if zone affinity should be enabled with given server list: {}",filtered);

ZoneSnapshot snapshot=stats.getZoneSnapshot(filtered);

double loadPerServer=snapshot.getLoadPerServer();

int instanceCount=snapshot.getInstanceCount();

int circuitBreakerTrippedCount=snapshot.getCircuitTrippedCount();

if(((double)circuitBreakerTrippedCount)/instanceCount >=

blackOutServerPercentageThreshold.get()

|| loadPerServer >=activeReqeustsPerServerThreshold.get()

||(instanceCount-circuitBreakerTrippedCount)<

availableServersThreshold.get()){

logger.debug("zoneAffinity is overriden.blackOutServerPercentage: {},

activeReqeustsPerServer: {},availableServers: {}",

new Object[]{(double)circuitBreakerTrippedCount /instanceCount,

loadPerServer,instanceCount-circuitBreakerTrippedCount});

return false;

} else {

return true;

}

}

}

- DefaultNIWSServerListFilter:该过滤器完全继承自 ZoneAffinityServerListFilter,是默认的NIWS(Netflix Internal Web Service)过滤器。

- ServerListSubsetFilter:该过滤器也继承自 ZoneAffinityServerListFilter,它非常适用于拥有大规模服务器集群(上百或更多)的系统。因为它可以产生一个“区域感知”结果的子集列表,同时它还能够通过比较服务实例的通信失败数量和并发连接数来判定该服务是否健康来选择性地从服务实例列表中剔除那些相对不够健康的实例。该过滤器的实现主要分为以下三步:

1.获取“区域感知”的过滤结果,作为候选的服务实例清单。

2.从当前消费者维护的服务实例子集中剔除那些相对不够健康的实例(同时也将这些实例从候选清单中剔除,防止第三步的时候又被选入),不够健康的标准如下所示。

a.服务实例的并发连接数超过客户端配置的值,默认为0,配置参数为

<clientName>.<nameSpace>.ServerListSubsetFilter.eliminat

ionConnectionThresold。

b.服务实例的失败数超过客户端配置的值,默认为0,配置参数为

<clientName>.<nameSpace>.ServerListSubsetFilter.eliminat

ionFailureThresold。

c.如果按符合上面任一规则的服务实例剔除后,剔除比例小于客户端默认配置的百分比,默认为0.1(10%),配置参数为<clientName>.<nameSpace>.ServerListSubsetFilter.forceEliminatePercent,那么就先对剩下的实例列表进行健康排序,再从最不健康的实例进行剔除,直到达到配置的剔除百分比。

3.在完成剔除后,清单已经少了至少10%(默认值)的服务实例,最后通过随机的方式从候选清单中选出一批实例加入到清单中,以保持服务实例子集与原来的数量一致,而默认的实例子集数量为20,其配置参数为<clientName>.<nameSpace>.ServerListSubsetFilter.size。

- ZonePreferenceServerListFilter:Spring Cloud 整合时新增的过滤器。若使用Spring Cloud整合Eureka和Ribbon时会默认使用该过滤器。它实现了通过配置或者Eureka实例元数据的所属区域(Zone)来过滤出同区域的服务实例。如下面的源码所示,它的实现非常简单,首先通过父类ZoneAffinityServerListFilter的过滤器来获得“区域感知”的服务实例列表,然后遍历这个结果,取出根据消费者配置预设的区域 Zone 来进行过滤,如果过滤的结果是空就直接返回父类获取的结果,如果不为空就返回通过消费者配置的Zone过滤后的结果。

@Override

public List<Server> getFilteredListOfServers(List<Server> servers){

List<Server> output=super.getFilteredListOfServers(servers);

if(this.zone !=null && output.size()==servers.size()){

List<Server> local=new ArrayList<Server>();

for(Server server : output){

if(this.zone.equalsIgnoreCase(server.getZone())){

local.add(server);

}

}

if(! local.isEmpty()){

return local;

}

}

return output;

}

ZoneAwareLoadBalancer

ZoneAwareLoadBalancer 负载均衡器是对 DynamicServerListLoadBalancer的扩展。在 DynamicServerListLoadBalancer 中,我们可以看到它并没有重写选择具体服务实例的chooseServer函数,所以它依然会采用在BaseLoadBalancer中实现的算法。使用 RoundRobinRule 规则,以线性轮询的方式来选择调用的服务实例,该算法实现简单并没有区域(Zone)的概念,所以它会把所有实例视为一个Zone下的节点来看待,这样就会周期性地产生跨区域(Zone)访问的情况,由于跨区域会产生更高的延迟,这些实例主要以防止区域性故障实现高可用为目的而不能作为常规访问的实例,所以在多区域部署的情况下会有一定的性能问题,而该负载均衡器则可以避免这样的问题。那么它是如何实现的呢?

首先,在ZoneAwareLoadBalancer中,我们可以发现,它并没有重写setServersList,说明实现服务实例清单的更新主逻辑没有修改。但是我们可以发现它重写了这个函数setServerListForZones(Map<String,List<Server>>zoneServersMap)。看到这里可能会有一些陌生,因为它并不是接口中定义的必备函数,所以我们不妨去父类DynamicServerListLoadBalancer中寻找一下该函数,我们可以找到下面的定义:

public void setServersList(List lsrv){

super.setServersList(lsrv);

List<T> serverList=(List<T>)lsrv;

Map<String,List<Server>> serversInZones=new HashMap<String,List<Server>>();

...

setServerListForZones(serversInZones);

}

protected void setServerListForZones(Map<String,List<Server>> zoneServersMap){

LOGGER.debug("Setting server list for zones: {}",zoneServersMap);

getLoadBalancerStats().updateZoneServerMapping(zoneServersMap);

}

setServerListForZones 函数的调用位于更新服务实例清单函数 setServersList的最后,同时从其实现的内容来看,它在父类DynamicServerListLoadBalancer中的作用是根据按区域Zone分组的实例列表,为负载均衡器中的LoadBalancerStats对象创建ZoneStats并放入Map zoneStatsMap集合中,每一个区域Zone对应一个ZoneStats,它用于存储每个Zone的一些状态和统计信息。

在ZoneAwareLoadBalancer中对setServerListForZones的重写如下:

protected void setServerListForZones(Map<String,List<Server>> zoneServersMap){

super.setServerListForZones(zoneServersMap);

if(balancers==null){

balancers=new ConcurrentHashMap<String,BaseLoadBalancer>();

}

for(Map.Entry<String,List<Server>> entry: zoneServersMap.entrySet()){

String zone=entry.getKey().toLowerCase();

getLoadBalancer(zone).setServersList(entry.getValue());

}

for(Map.Entry<String,BaseLoadBalancer> existingLBEntry: balancers.entrySet()){

if(! zoneServersMap.keySet().contains(existingLBEntry.getKey())){

existingLBEntry.getValue().setServersList(Collections.emptyList());

}

}

}

可以看到,在该实现中创建了一个 ConcurrentHashMap()类型的 balancers 对象,它将用来存储每个Zone区域对应的负载均衡器。而具体的负载均衡器的创建则是通过在下面的第一个循环中调用getLoadBalancer函数来完成,同时在创建负载均衡器的时候会创建它的规则(如果当前实现中没有 IRule 的实例,就创建一个 AvailabilityFilteringRule规则;如果已经有具体实例,就克隆一个)。在创建完负载均衡器后又马上调用setServersList函数为其设置对应Zone区域的实例清单。而第二个循环则是对Zone 区域中实例清单的检查,看看是否有 Zone 区域下已经没有实例了,是的话就将balancers中对应Zone区域的实例列表清空,该操作的作用是为了后续选择节点时,防止过时的Zone区域统计信息干扰具体实例的选择算法。

在了解了该负载均衡器是如何扩展服务实例清单的实现后,我们来具体看看它是如何挑选服务实例,来实现对区域的识别的:

从源码中我们可以看到,只有当负载均衡器中维护的实例所属的Zone区域的个数大于1的时候才会执行这里的选择策略,否则还是将使用父类的实现。当Zone区域的个数大于1的时候,它的实现步骤如下所示。

- 调用ZoneAvoidanceRule中的静态方法createSnapshot(lbStats),为当前负载均衡器中所有的Zone区域分别创建快照,保存在Map zoneSnapshot中,这些快照中的数据将用于后续的算法。

- 调用ZoneAvoidanceRule中的静态方法getAvailableZones(zoneSnapshot,triggeringLoad.get(),triggeringBlackoutPercentage.get()),来获取可用的Zone区域集合,在该函数中会通过Zone区域快照中的统计数据来实现可用区的挑选。

- 首先它会剔除符合这些规则的 Zone 区域:所属实例数为零的 Zone 区域;Zone区域内实例的平均负载小于零,或者实例故障率(断路器断开次数/实例数)大于等于阈值(默认为0.99999)。

- 然后根据Zone区域的实例平均负载计算出最差的Zone区域,这里的最差指的是实例平均负载最高的Zone区域。

- 如果在上面的过程中没有符合剔除要求的区域,同时实例最大平均负载小于阈值(默认为20%),就直接返回所有Zone区域为可用区域。否则,从最坏Zone区域集合中随机选择一个,将它从可用Zone区域集合中剔除。

- 当获得的可用Zone区域集合不为空,并且个数小于Zone区域总数,就随机选择一个Zone区域。

- 在确定了某个 Zone 区域后,则获取了对应 Zone 区域的服务均衡器,并调用chooseServer 来选择具体的服务实例,而在 chooseServer 中将使用 IRule接口的 choose 函数来选择具体的服务实例。在这里,IRule 接口的实现会使用ZoneAvoidanceRule来挑选出具体的服务实例。

关于 ZoneAvoidanceRule 的策略以及其他一些还未提到的负载均衡策略,我们将在下一节做更加详细的解读。

负载均衡策略

通过上面的源码解读,我们已经对Ribbon实现的负载均衡器以及其中包含的服务实例过滤器、服务实例信息的存储对象、区域的信息快照等都有了深入的认识和理解,但是对于负载均衡器中的服务实例选择策略只是讲解了几个默认实现的内容,而对于IRule的其他实现还没有详细解读,下面我们来看看在Ribbon中都提供了哪些负载均衡的策略实现。

如下图所示,可以看到在Ribbon中实现了非常多的选择策略,其中也包含了我们在前面内容中提到过的RoundRobinRule和ZoneAvoidanceRule。下面我们来详细解读一下IRule接口的各个实现。

AbstractLoadBalancerRule

负载均衡策略的抽象类,在该抽象类中定义了负载均衡器ILoadBalancer对象,该对象能够在具体实现选择服务策略时,获取到一些负载均衡器中维护的信息来作为分配依据,并以此设计一些算法来实现针对特定场景的高效策略。

public abstract class AbstractLoadBalancerRule implements IRule,IClientConfigAware {

private ILoadBalancer lb;

@Override

public void setLoadBalancer(ILoadBalancer lb){

this.lb=lb;

}

@Override

public ILoadBalancer getLoadBalancer(){

return lb;

}

}

RandomRule

该策略实现了从服务实例清单中随机选择一个服务实例的功能。它的具体实现如下,可以看到 IRule 接口的 choose(Object key)函数实现,委托给了该类中的choose(ILoadBalancer lb,Object key),该方法增加了一个负载均衡器对象的参数。从具体的实现上看,它会使用传入的负载均衡器来获得可用实例列表upList和所有实例列表 allList,并通过 rand.nextInt(serverCount)函数来获取一个随机数,并将该随机数作为upList的索引值来返回具体实例。同时,具体的选择逻辑在一个while(server==null)循环之内,而根据选择逻辑的实现,正常情况下每次选择都应该选出一个服务实例,如果出现死循环获取不到服务实例时,则很有可能存在并发的Bug。

RoundRobinRule

该策略实现了按照线性轮询的方式依次选择每个服务实例的功能。它的具体实现如下,其详细结构与RandomRule非常类似。除了循环条件不同外,就是从可用列表中获取所谓的逻辑不同。从循环条件中,我们可以看到增加了一个count计数变量,该变量会在每次循环之后累加,也就是说,如果一直选择不到server超过10次,那么就会结束尝试,并打印一个警告信息No available alive servers after 10 tries from load balancer:...。而线性轮询的实现则是通过AtomicInteger nextServerCyclicCounter对象实现,每次进行实例选择时通过调用incrementAndGetModulo函数实现递增。

RetryRule

该策略实现了一个具备重试机制的实例选择功能。从下面的实现中我们可以看到,在其内部还定义了一个IRule对象,默认使用了RoundRobinRule实例。而在choose方法中则实现了对内部定义的策略进行反复尝试的策略,若期间能够选择到具体的服务实例就返回,若选择不到就根据设置的尝试结束时间为阈值(maxRetryMillis参数定义的值+choose方法开始执行的时间戳),当超过该阈值后就返回null。

public class RetryRule extends AbstractLoadBalancerRule {

IRule subRule=new RoundRobinRule();

long maxRetryMillis=500;

...

public Server choose(ILoadBalancer lb,Object key){

long requestTime=System.currentTimeMillis();

long deadline=requestTime+maxRetryMillis;

Server answer=null;

answer=subRule.choose(key);

if(((answer==null)||(! answer.isAlive()))

&&(System.currentTimeMillis()< deadline)){

InterruptTask task=new InterruptTask(deadline

-System.currentTimeMillis());

while(! Thread.interrupted()){

answer=subRule.choose(key);

if(((answer==null)||(! answer.isAlive()))

&&(System.currentTimeMillis()< deadline)){

Thread.yield();

} else {

break;

}

}

task.cancel();

}

if((answer==null)||(! answer.isAlive())){

return null;

} else {

return answer;

}

}

...

}

WeightedResponseTimeRule

该策略是对 RoundRobinRule 的扩展,增加了根据实例的运行情况来计算权重,并根据权重来挑选实例,以达到更优的分配效果,它的实现主要有三个核心内容。

定时任务

WeightedResponseTimeRule策略在初始化的时候会通过serverWeightTimer.schedule(new DynamicServerWeightTask(),0,serverWeightTaskTimerInterval)启动一个定时任务,用来为每个服务实例计算权重,该任务默认30秒执行一次。

class DynamicServerWeightTask extends TimerTask {

public void run(){

ServerWeight serverWeight=new ServerWeight();

try {

serverWeight.maintainWeights();

} catch(Throwable t){

logger.error("Throwable caught while running DynamicServerWeightTask for"+name,t);

}

}

}

权重计算

在源码中我们可以轻松找到用于存储权重的对象 List<Double> accumulatedWeights=new ArrayList<Double>(),该 List 中每个权重值所处的位置对应了负载均衡器维护的服务实例清单中所有实例在清单中的位置。

维护实例权重的计算过程通过maintainWeights函数实现,具体如下面的代码所示:

public void maintainWeights(){

ILoadBalancer lb=getLoadBalancer();

...

try {

logger.info("Weight adjusting job started");

AbstractLoadBalancer nlb=(AbstractLoadBalancer)lb;

LoadBalancerStats stats=nlb.getLoadBalancerStats();

...

//计算所有实例的平均响应时间的总和:totalResponseTime

double totalResponseTime=0;

for(Server server : nlb.getAllServers()){

//如果服务实例的状态快照不在缓存中,那么这里会进行自动加载

ServerStats ss=stats.getSingleServerStat(server);

totalResponseTime+=ss.getResponseTimeAvg();

}

//逐个计算每个实例的权重:weightSoFar+totalResponseTime-实例的平均响应时间

Double weightSoFar=0.0;

List<Double> finalWeights=new ArrayList<Double>();

for(Server server : nlb.getAllServers()){

ServerStats ss=stats.getSingleServerStat(server);

double weight=totalResponseTime-ss.getResponseTimeAvg();

weightSoFar+=weight;

finalWeights.add(weightSoFar);

}

setWeights(finalWeights);

} catch(Throwable t){

logger.error("Exception while dynamically calculating server weights",t);

} finally {

serverWeightAssignmentInProgress.set(false);

}

}

该函数的实现主要分为两个步骤:

- 根据 LoadBalancerStats 中记录的每个实例的统计信息,累加所有实例的平均响应时间,得到总平均响应时间totalResponseTime,该值会用于后续的计算。

- 为负载均衡器中维护的实例清单逐个计算权重(从第一个开始),计算规则为weightSoFar+totalResponseTime-实例的平均响应时间,其中weightSoFar初始化为零,并且每计算好一个权重需要累加到weightSoFar上供下一次计算使用。

举个简单的例子来理解这个计算过程,假设有4个实例A、B、C、D,它们的平均响应时间为10、40、80、100,所以总响应时间是10+40+80+100=230,每个实例的权重为总响应时间与实例自身的平均响应时间的差的累积所得,所以实例A、B、C、D的权重分别如下所示。

- 实例A:230-10=220

- 实例B:220+(230-40)=410

- 实例C:410+(230-80)=560

- 实例D:560+(230-100)=690

需要注意的是,这里的权重值只是表示了各实例权重区间的上限,并非某个实例的优先级,所以不是数值越大被选中的概率就越大。那么什么是权重区间呢?以上面例子的计算结果为例,它实际上是为这4个实例构建了4个不同的区间,每个实例的区间下限是上一个实例的区间上限,而每个实例的区间上限则是我们上面计算并存储于 List accumulatedWeights中的权重值,其中第一个实例的下限默认为零。所以,根据上面示例的权重计算结果,我们可以得到每个实例的权重区间。

- 实例A:[0,220]

- 实例B:(220,410]

- 实例C:(410,560]

- 实例D:(560,690)

不难发现,实际上每个区间的宽度就是:总的平均响应时间-实例的平均响应时间,所以实例的平均响应时间越短、权重区间的宽度越大,而权重区间的宽度越大被选中的概率就越高。可能很多读者会问,这些区间边界的开闭是如何确定的呢?为什么不那么规则?下面我们会通过实例选择算法的解读来解释。

实例选择

WeightedResponseTimeRule选择实例的实现与之前介绍的算法结构类似,下面是它主体的算法(省略了循环体和一些判断等处理):

从源码中我们可以看到,选择实例的核心过程就两步:

- 生成一个[0,最大权重值)区间内的随机数。

- 遍历权重列表,比较权重值与随机数的大小,如果权重值大于等于随机数,就拿当前权重列表的索引值去服务实例列表中获取具体的实例。这就是在上一节中提到的服务实例会根据权重区间挑选的原理,而权重区间边界的开闭原则根据算法,正常每个区间为(x,y]的形式,但是第一个实例和最后一个实例为什么不同呢?由于随机数的最小取值可以为0,所以第一个实例的下限是闭区间,同时随机数的最大值取不到最大权重值,所以最后一个实例的上限是开区间。

若继续以上面的数据为例进行服务实例的选择,则该方法会从[0,690)区间中选出一个随机数,比如选出的随机数为230,由于该值位于第二个区间,所以此时就会选择实例B来进行请求。

ClientConfigEnabledRoundRobinRule

该策略较为特殊,我们一般不直接使用它。因为它本身并没有实现什么特殊的处理逻辑,正如下面的源码所示,在它的内部定义了一个 RoundRobinRule 策略,而 choose函数的实现也正是使用了 RoundRobinRule 的线性轮询机制,所以它实现的功能实际上与RoundRobinRule相同,那么定义它有什么特殊的用处呢?

虽然我们不会直接使用该策略,但是通过继承该策略,默认的choose就实现了线性轮询机制,在子类中做一些高级策略时通常有可能会存在一些无法实施的情况,那么就可以用父类的实现作为备选。在后文中我们将继续介绍的高级策略均是基于ClientConfigEnabledRoundRobinRule的扩展。

public class ClientConfigEnabledRoundRobinRule extends AbstractLoadBalancerRule {

RoundRobinRule roundRobinRule=new RoundRobinRule();

...

@Override

public Server choose(Object key){

if(roundRobinRule !=null){

return roundRobinRule.choose(key);

} else {

throw new IllegalArgumentException(

"This class has not been initialized with the RoundRobinRule class");

}

}

}

BestAvailableRule

该策略继承自ClientConfigEnabledRoundRobinRule,在实现中它注入了负载均衡器的统计对象 LoadBalancerStats,同时在具体的 choose 算法中利用LoadBalancerStats保存的实例统计信息来选择满足要求的实例。从如下源码中我们可以看到,它通过遍历负载均衡器中维护的所有服务实例,会过滤掉故障的实例,并找出并发请求数最小的一个,所以该策略的特性是可选出最空闲的实例。

同时,由于该算法的核心依据是统计对象loadBalancerStats,当其为空的时候,该策略是无法执行的。所以从源码中我们可以看到,当 loadBalancerStats 为空的时候,它会采用父类的线性轮询策略,正如我们在介绍 ClientConfigEnabledRoundRobinRule时那样,它的子类在无法满足实现高级策略的时候,可以使用线性轮询策略的特性。后面将要介绍的策略因为也都继承自ClientConfigEnabledRoundRobinRule,所以它们都会具有这样的特性。

PredicateBasedRule

这是一个抽象策略,它也继承了ClientConfigEnabledRoundRobinRule,从其命名中可以猜出这是一个基于 Predicate 实现的策略,Predicate 是 Google Guava Collection工具对集合进行过滤的条件接口。

如下面的源码所示,它定义了一个抽象函数getPredicate来获取AbstractServerPredicate对象的实现,而在choose函数中,通过AbstractServerPredicate的chooseRoundRobinAfterFiltering函数来选出具体的服务实例。从该函数的命名我们也大致能猜出它的基础逻辑:先通过子类中实现的 Predicate 逻辑来过滤一部分服务实例,然后再以线性轮询的方式从过滤后的实例清单中选出一个。

public abstract class PredicateBasedRule extends ClientConfigEnabledRoundRobinRule {

public abstract AbstractServerPredicate getPredicate();

@Override

public Server choose(Object key){

ILoadBalancer lb=getLoadBalancer();

Optional<Server> server=getPredicate().chooseRoundRobinAfterFiltering

(lb.getAllServers(),key);

if(server.isPresent()){

return server.get();

} else {

return null;

}

}

}

通过下面AbstractServerPredicate的源码片段,可以证实我们上面所做的猜测。在上面choose函数中调用的chooseRoundRobinAfterFiltering方法先通过内部定义的 getEligibleServers 函数来获取备选的实例清单(实现了过滤),如果返回的清单为空,则用Optional.absent()来表示不存在,反之则以线性轮询的方式从备选清单中获取一个实例。

public abstract class AbstractServerPredicate implements Predicate<PredicateKey> {

...

public Optional<Server> chooseRoundRobinAfterFiltering(List<Server> servers,Object loadBalancerKey){

List<Server> eligible=getEligibleServers(servers,loadBalancerKey);

if(eligible.size()==0){

return Optional.absent();

}

return Optional.of(eligible.get(nextIndex.getAndIncrement()% eligible.

size()));

}

public List<Server> getEligibleServers(List<Server> servers,Object loadBalancerKey){

if(loadBalancerKey==null){

return ImmutableList.copyOf(Iterables.filter(servers,this.getServerOnlyPredicate()));

} else {

List<Server> results=Lists.newArrayList();

for(Server server: servers){

if(this.apply(new PredicateKey(loadBalancerKey,server))){

results.add(server);

}

}

return results;

}

}

}

在了解了整体逻辑之后,我们来详细看看实现过滤功能的getEligibleServers函数。从源码上看,它的实现结构简单清晰,通过遍历服务清单,使用this.apply方法来判断实例是否需要保留,如果是就添加到结果列表中。

可能到这里,不熟悉Google Guava Collections集合工具的读者会感到困惑,这个apply在AbstractServerPredicate中找不到它的定义,那么它是如何实现过滤的呢?实际上,AbstractServerPredicate 实现了 com.google.common.base.Predicate接口,而apply方法是该接口中的定义,主要用来实现过滤条件的判断逻辑,它输入的参数则是过滤条件需要用到的一些信息(比如源码中的 new PredicateKey(loadBalancerKey,server)),它传入了关于实例的统计信息和负载均衡器的选择算法传递过来的key)。既然在AbstractServerPredicate中我们未能找到apply的实现,所以这里的chooseRoundRobinAfterFiltering函数只是定义了一个模板策略:“先过滤清单,再轮询选择”。对于如何过滤,需要我们在AbstractServerPredicate的子类中实现apply方法来确定具体的过滤策略。

后面我们将要介绍的两个策略就是基于此抽象策略实现,只是它们使用了不同的Predicate实现来完成过滤逻辑以达到不同的实例选择效果。

Google Guava Collections是一个对Java Collections Framework增强和扩展的开源项目。虽然Java Collections Framework已经能够满足我们大多数情况下使用集合的要求,但是当遇到一些特殊的情况时我们的代码会比较冗长且容易出错。Guava Collections 可以帮助我们让集合操作代码更为简短精练并大大增强代码的可读性。

AvailabilityFilteringRule

该策略继承自上面介绍的抽象策略PredicateBasedRule,所以它也继承了“先过滤清单,再轮询选择”的基本处理逻辑,其中过滤条件使用了AvailabilityPredicate:

public class AvailabilityPredicate extends AbstractServerPredicate {

...

public boolean apply(@Nullable PredicateKey input){

LoadBalancerStats stats=getLBStats();

if(stats==null){

return true;

}

return ! shouldSkipServer(stats.getSingleServerStat(input.getServer()));

}

private boolean shouldSkipServer(ServerStats stats){

if((CIRCUIT_BREAKER_FILTERING.get()&& stats.isCircuitBreakerTripped())

|| stats.getActiveRequestsCount()>=activeConnectionsLimit.get()){

return true;

}

return false;

}

}

从上述源码中,我们可以知道它的主要过滤逻辑位于shouldSkipServer方法中,它主要判断服务实例的两项内容:

- 是否故障,即断路器是否生效已断开。

- 实例的并发请求数大于阈值,默认值为232 -1,该配置可通过参数<clientName>.<nameSpace>.ActiveConnectionsLimit来修改。

这两项内容中只要有一个满足 apply 就返回 false(代表该节点可能存在故障或负载过高),都不满足就返回true。

在该策略中,除了实现了上面的过滤方法之外,对于choose的策略也做了一些改进优化,所以父类的实现对于它来说只是一个备用选项,其具体实现如下所示:

public Server choose(Object key){

int count=0;

Server server=roundRobinRule.choose(key);

while(count++<=10){

if(predicate.apply(new PredicateKey(server))){

return server;

}

server=roundRobinRule.choose(key);

}

return super.choose(key);

}

可以看到,它并没有像在父类中那样,先遍历所有的节点进行过滤,然后在过滤后的集合中选择实例。而是先以线性的方式选择一个实例,接着用过滤条件来判断该实例是否满足要求,若满足就直接使用该实例,若不满足要求就再选择下一个实例,并检查是否满足要求,如此循环进行,当这个过程重复了10次还是没有找到符合要求的实例,就采用父类的实现方案。

简单地说,该策略通过线性抽样的方式直接尝试寻找可用且较空闲的实例来使用,优化了父类每次都要遍历所有实例的开销。

ZoneAvoidanceRule

该策略我们在介绍负载均衡器 ZoneAwareLoadBalancer 时已经提到过,它也是PredicateBasedRule的具体实现类。在之前的介绍中主要针对ZoneAvoidanceRule中用于选择Zone区域策略的一些静态函数,比如createSnapshot、getAvailableZones。在这里我们将详细看看 ZoneAvoidanceRule 作为服务实例过滤条件的实现原理。从下面ZoneAvoidanceRule的源码片段中可以看到,它使用了CompositePredicate来进行服务实例清单的过滤。这是一个组合过滤条件,在其构造函数中,它以ZoneAvoidancePredicate为主过滤条件,AvailabilityPredicate为次过滤条件初始化了组合过滤条件的实例。

public class ZoneAvoidanceRule extends PredicateBasedRule {

...

private CompositePredicate compositePredicate;

public ZoneAvoidanceRule(){

super();

ZoneAvoidancePredicate zonePredicate=new ZoneAvoidancePredicate(this);

AvailabilityPredicate availabilityPredicate=new AvailabilityPredicate(this);

compositePredicate=createCompositePredicate(zonePredicate,

availabilityPredicate);

}

...

}

ZoneAvoidanceRule在实现的时候并没有像AvailabilityFilteringRule那样重写choose函数来优化,所以它完全遵循了父类的过滤主逻辑:“先过滤清单,再轮询选择”。其中过滤清单的条件就是我们上面提到的以ZoneAvoidancePredicate为主过滤条件、AvailabilityPredicate 为次过滤条件的组合过滤条件 CompositePredicate。从CompositePredicate的源码片段中,我们可以看到它定义了一个主过滤条件 AbstractServerPredicate delegate 以及一组次过滤条件列表 List fallbacks,所以它的次过滤列表是可以拥有多个的,并且由于它采用了List存储所以次过滤条件是按顺序执行的。

在获取过滤结果的实现函数getEligibleServers中,它的处理逻辑如下所示。

- 使用主过滤条件对所有实例过滤并返回过滤后的实例清单。

- 依次使用次过滤条件列表中的过滤条件对主过滤条件的结果进行过滤。

- 每次过滤之后(包括主过滤条件和次过滤条件),都需要判断下面两个条件,只要有一个符合就不再进行过滤,将当前结果返回供线性轮询算法选择:

- 过滤后的实例总数 >=最小过滤实例数(minimalFilteredServers,默认为1)。

- 过滤后的实例比例 > 最小过滤百分比(minimalFilteredPercentage,默认为0)。

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。
列表为空,暂无数据
    我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
    原文