Dubbo 源码分析 四种负载均衡
Dubbo 选择在客户端做负载均衡,提供了四种选择:随机、轮询、最少活跃调用数、一致性Hash。四种算法简单且有趣,可以稍微研究以下。
首先 LoadBalance
接口是一个 SPI,select()
方法被声明为 Adaptive,通过 URL 参数找到对应的负载均衡方式,如果以下四种无法满足,可以自行拓展。
AbstractLoadBalance 的模版方法
抽象类只提供了一个模版方法,具体的均衡策略由 doSelect 实现,getWeight
是一个公用的计算权重方法。
public abstract class AbstractLoadBalance implements LoadBalance {
static int calculateWarmupWeight(int uptime, int warmup, int weight) {
//uptime:运行时间
//warmup:预热时间
//ww = 权重 * 运行时间/预热时间
//在预热时间内,ww与运行时间成正比,uptime越短,权重越小
//超出预热时间,返回预设权重
int ww = (int) ((float) uptime / ((float) warmup / (float) weight));
return ww < 1 ? 1 : (ww > weight ? weight : ww);
}
//模版方法
@Override
public <T> Invoker<T> select(List<Invoker<T>> invokers, URL url, Invocation invocation) {
if (invokers == null || invokers.isEmpty())
return null;
if (invokers.size() == 1)
return invokers.get(0);
return doSelect(invokers, url, invocation);
}
protected abstract <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation);
protected int getWeight(Invoker<?> invoker, Invocation invocation) {
int weight = invoker.getUrl().getMethodParameter(invocation.getMethodName(), Constants.WEIGHT_KEY, Constants.DEFAULT_WEIGHT);
if (weight > 0) {
//获取启动时间
long timestamp = invoker.getUrl().getParameter(Constants.REMOTE_TIMESTAMP_KEY, 0L);
if (timestamp > 0L) {
//获取运行时间
int uptime = (int) (System.currentTimeMillis() - timestamp);
//预热时间
int warmup = invoker.getUrl().getParameter(Constants.WARMUP_KEY, Constants.DEFAULT_WARMUP);
//未达到预热时间,则根据运行时间长短重新计算权重
if (uptime > 0 && uptime < warmup) {
weight = calculateWarmupWeight(uptime, warmup, weight);
}
}
}
return weight;
}
}
RandomLoadBalance 随机访问
@Override
protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
int length = invokers.size(); //调用者数量
int totalWeight = 0; //权重总额
boolean sameWeight = true; //是否每个调用者的权重都是相等的
for (int i = 0; i < length; i++) {
int weight = getWeight(invokers.get(i), invocation);
totalWeight += weight; //求和
//出现相邻的调用者权重不等,则sameWeight为否
if (sameWeight && i > 0
&& weight != getWeight(invokers.get(i - 1), invocation)) {
sameWeight = false;
}
}
//以下的逻辑需要列一个表,可能会清晰一些
if (totalWeight > 0 && !sameWeight) {
//得到一个【0, totalWeight)的随机数
int offset = random.nextInt(totalWeight);
//循环依次减去调用者权重,直到offset<0
for (int i = 0; i < length; i++) {
offset -= getWeight(invokers.get(i), invocation);
if (offset < 0) {
return invokers.get(i);
}
}
}
return invokers.get(random.nextInt(length));
}
上面代码什么意思呢?我们稍微列一下。
假设,ABC 三个节点,权重分别是 1,2,3,那 totalWeight = 6,则 offset 取值范围是 [0,6)
。 那么,
offset A:1 B:2 C:3
0 0-1<0 选A
1 1-1=0 0-2<0 选B
2 2-1=1 1-2<0 选B
3 3-1=2 2-2=0 0-3<0 选C
4 4-1=3 3-2=1 1-3<0 选C
5 5-1=4 4-2=2 2-3<0 选C
可见ABC被分配到的概率也是 1、2、3。
RoundRobinLoadBalance 权重比例轮训
按照权重轮训,这个负载均衡的方式可以说是十分坑爹,按照文档里的说法是
存在慢的提供者累积请求的问题,比如:第二台机器很慢,但没挂,当请求调到第二台时就卡在那,久而久之,所有请求都卡在调到第二台上。
就是会出现某台机子的活跃度非常高,而且还不断的把请求丢给这台机器。
@Override
protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
String key = invokers.get(0).getUrl().getServiceKey() + "." + invocation.getMethodName();
int length = invokers.size(); //调用者数量
int maxWeight = 0; //最大最小权重
int minWeight = Integer.MAX_VALUE;
final LinkedHashMap<Invoker<T>, IntegerWrapper> invokerToWeightMap = new LinkedHashMap<Invoker<T>, IntegerWrapper>();
int weightSum = 0;
for (int i = 0; i < length; i++) {
int weight = getWeight(invokers.get(i), invocation);
maxWeight = Math.max(maxWeight, weight);
minWeight = Math.min(minWeight, weight);//找出最大最小权重
if (weight > 0) {
invokerToWeightMap.put(invokers.get(i), new IntegerWrapper(weight));
weightSum += weight;
}
}
AtomicPositiveInteger sequence = sequences.get(key);
if (sequence == null) {
sequences.putIfAbsent(key, new AtomicPositiveInteger());
sequence = sequences.get(key);
}
//下面通过mod和weight不断递减1,依次轮训调用
//A:1 B:2 C:3 D:2,调用量:7
//则轮训调用A1次,B2次,C3次,D1次
int currentSequence = sequence.getAndIncrement();
if (maxWeight > 0 && minWeight < maxWeight) {
int mod = currentSequence % weightSum;
for (int i = 0; i < maxWeight; i++) {
for (Map.Entry<Invoker<T>, IntegerWrapper> each : invokerToWeightMap.entrySet()) {
final Invoker<T> k = each.getKey();
final IntegerWrapper v = each.getValue();
if (mod == 0 && v.getValue() > 0) {
return k;
}
if (v.getValue() > 0) {
v.decrement();
mod--;
}
}
}
}
return invokers.get(currentSequence % length);
}
LeastActiveLoadBalance 最低活跃量
这种方式需要配合 ActiveLimitFilter
一起使用,有一点要注意,这个活跃度是针对客户端而言,它并没有通过zk同步每个终端调用量来得到服务端的真实负载,这个拦截器仅仅作为客户端限流用的。
@Override
protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
int length = invokers.size(); //调用者数量
int leastActive = -1; //最低活跃度
int leastCount = 0; //多少相同最低活跃度
int[] leastIndexs = new int[length]; //记录相等最低活跃度的调用者下标
int totalWeight = 0; //相同最低活跃度的权重总额
int firstWeight = 0; // Initial value, used for comparision
boolean sameWeight = true; //相同最低活跃度的调用者权重是否相等
for (int i = 0; i < length; i++) {
Invoker<T> invoker = invokers.get(i);
int active = RpcStatus.getStatus(invoker.getUrl(), invocation.getMethodName()).getActive(); // Active number
int weight = invoker.getUrl().getMethodParameter(invocation.getMethodName(), Constants.WEIGHT_KEY, Constants.DEFAULT_WEIGHT); // Weight
if (leastActive == -1 || active < leastActive) { // Restart, when find a invoker having smaller least active value.
leastActive = active; //记录新的最低活跃度
leastCount = 1;
leastIndexs[0] = i; //重置
totalWeight = weight; //重置
firstWeight = weight; //重置
sameWeight = true; // 重置
} else if (active == leastActive) { // If current invoker's active value equals with leaseActive, then accumulating.
leastIndexs[leastCount++] = i;
totalWeight += weight; //累加权重
if (sameWeight && i > 0
&& weight != firstWeight) {
sameWeight = false;
}
}
}
//最低活跃度只有一个,那那肯定是它
if (leastCount == 1) {
return invokers.get(leastIndexs[0]);
}
//下面这一步不用多说了,与RandomLoadBalance一摸一样
//最低活跃度相同的调用这,以权重比例随机访问
if (!sameWeight && totalWeight > 0) {
int offsetWeight = random.nextInt(totalWeight);
for (int i = 0; i < leastCount; i++) {
int leastIndex = leastIndexs[i];
offsetWeight -= getWeight(invokers.get(leastIndex), invocation);
if (offsetWeight <= 0)
return invokers.get(leastIndex);
}
}
return invokers.get(leastIndexs[random.nextInt(leastCount)]);
}
ConsistentHashLoadBalance 一致性哈希
这个其实没什么好说的,源码只是计算 hash 值
一致性 Hash,相同参数的请求总是发到同一提供者。
当某一台提供者挂时,原本发往该提供者的请求,基于虚拟节点,平摊到其它提供者,不会引起剧烈变动。
如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。
绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论