Dubbo 源码分析 四种负载均衡

发布于 2023-05-02 22:47:25 字数 7727 浏览 81 评论 0

Dubbo 选择在客户端做负载均衡,提供了四种选择:随机、轮询、最少活跃调用数、一致性Hash。四种算法简单且有趣,可以稍微研究以下。


首先 LoadBalance 接口是一个 SPI,select() 方法被声明为 Adaptive,通过 URL 参数找到对应的负载均衡方式,如果以下四种无法满足,可以自行拓展。

AbstractLoadBalance 的模版方法

抽象类只提供了一个模版方法,具体的均衡策略由 doSelect 实现,getWeight 是一个公用的计算权重方法。

public abstract class AbstractLoadBalance implements LoadBalance {
  static int calculateWarmupWeight(int uptime, int warmup, int weight) {
    //uptime:运行时间
    //warmup:预热时间
    //ww = 权重 * 运行时间/预热时间
    //在预热时间内,ww与运行时间成正比,uptime越短,权重越小
    //超出预热时间,返回预设权重
    int ww = (int) ((float) uptime / ((float) warmup / (float) weight));
    return ww < 1 ? 1 : (ww > weight ? weight : ww);
  }
  //模版方法
  @Override
  public <T> Invoker<T> select(List<Invoker<T>> invokers, URL url, Invocation invocation) {
    if (invokers == null || invokers.isEmpty())
      return null;
    if (invokers.size() == 1)
      return invokers.get(0);
    return doSelect(invokers, url, invocation);
  }
  protected abstract <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation);

  protected int getWeight(Invoker<?> invoker, Invocation invocation) {
    int weight = invoker.getUrl().getMethodParameter(invocation.getMethodName(), Constants.WEIGHT_KEY, Constants.DEFAULT_WEIGHT);
    if (weight > 0) {
      //获取启动时间
      long timestamp = invoker.getUrl().getParameter(Constants.REMOTE_TIMESTAMP_KEY, 0L);
      if (timestamp > 0L) {
        //获取运行时间
        int uptime = (int) (System.currentTimeMillis() - timestamp);
        //预热时间
        int warmup = invoker.getUrl().getParameter(Constants.WARMUP_KEY, Constants.DEFAULT_WARMUP);
        //未达到预热时间,则根据运行时间长短重新计算权重
        if (uptime > 0 && uptime < warmup) {
          weight = calculateWarmupWeight(uptime, warmup, weight);
        }
      }
    }
    return weight;
  }

}

RandomLoadBalance 随机访问

@Override
protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
  int length = invokers.size(); //调用者数量
  int totalWeight = 0; //权重总额
  boolean sameWeight = true; //是否每个调用者的权重都是相等的
  for (int i = 0; i < length; i++) {
    int weight = getWeight(invokers.get(i), invocation);
    totalWeight += weight; //求和
    //出现相邻的调用者权重不等,则sameWeight为否
    if (sameWeight && i > 0
          && weight != getWeight(invokers.get(i - 1), invocation)) {
      sameWeight = false;
    }
  }
  //以下的逻辑需要列一个表,可能会清晰一些
  if (totalWeight > 0 && !sameWeight) {
    //得到一个【0, totalWeight)的随机数
    int offset = random.nextInt(totalWeight);
    //循环依次减去调用者权重,直到offset<0
    for (int i = 0; i < length; i++) {
      offset -= getWeight(invokers.get(i), invocation);
      if (offset < 0) {
        return invokers.get(i);
      }
    }
  }
  return invokers.get(random.nextInt(length));
}

上面代码什么意思呢?我们稍微列一下。

假设,ABC 三个节点,权重分别是 1,2,3,那 totalWeight = 6,则 offset 取值范围是 [0,6) 。 那么,

offset  A:1    B:2    C:3
     0  0-1<0                选A
     1  1-1=0     0-2<0           选B
     2  2-1=1     1-2<0           选B
     3  3-1=2     2-2=0    0-3<0  选C
     4  4-1=3     3-2=1    1-3<0  选C
     5  5-1=4  4-2=2    2-3<0  选C

可见ABC被分配到的概率也是 1、2、3。

RoundRobinLoadBalance 权重比例轮训

按照权重轮训,这个负载均衡的方式可以说是十分坑爹,按照文档里的说法是

存在慢的提供者累积请求的问题,比如:第二台机器很慢,但没挂,当请求调到第二台时就卡在那,久而久之,所有请求都卡在调到第二台上。

就是会出现某台机子的活跃度非常高,而且还不断的把请求丢给这台机器。

@Override
protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
  String key = invokers.get(0).getUrl().getServiceKey() + "." + invocation.getMethodName();
  int length = invokers.size(); //调用者数量
  int maxWeight = 0; //最大最小权重
  int minWeight = Integer.MAX_VALUE;
  final LinkedHashMap<Invoker<T>, IntegerWrapper> invokerToWeightMap = new LinkedHashMap<Invoker<T>, IntegerWrapper>();
  int weightSum = 0;
  for (int i = 0; i < length; i++) {
    int weight = getWeight(invokers.get(i), invocation);
    maxWeight = Math.max(maxWeight, weight);
    minWeight = Math.min(minWeight, weight);//找出最大最小权重
    if (weight > 0) {
      invokerToWeightMap.put(invokers.get(i), new IntegerWrapper(weight));
      weightSum += weight;
    }
  }
  AtomicPositiveInteger sequence = sequences.get(key);
  if (sequence == null) {
    sequences.putIfAbsent(key, new AtomicPositiveInteger());
    sequence = sequences.get(key);
  }
  //下面通过mod和weight不断递减1,依次轮训调用
  //A:1 B:2 C:3 D:2,调用量:7
  //则轮训调用A1次,B2次,C3次,D1次
  int currentSequence = sequence.getAndIncrement();
  if (maxWeight > 0 && minWeight < maxWeight) {
    int mod = currentSequence % weightSum;
    for (int i = 0; i < maxWeight; i++) {
      for (Map.Entry<Invoker<T>, IntegerWrapper> each : invokerToWeightMap.entrySet()) {
        final Invoker<T> k = each.getKey();
        final IntegerWrapper v = each.getValue();
        if (mod == 0 && v.getValue() > 0) {
          return k;
        }
        if (v.getValue() > 0) {
          v.decrement();
          mod--;
        }
      }
    }
  }
  return invokers.get(currentSequence % length);
}

LeastActiveLoadBalance 最低活跃量

这种方式需要配合 ActiveLimitFilter 一起使用,有一点要注意,这个活跃度是针对客户端而言,它并没有通过zk同步每个终端调用量来得到服务端的真实负载,这个拦截器仅仅作为客户端限流用的。

@Override
protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
  int length = invokers.size(); //调用者数量
  int leastActive = -1; //最低活跃度
  int leastCount = 0; //多少相同最低活跃度
  int[] leastIndexs = new int[length]; //记录相等最低活跃度的调用者下标
  int totalWeight = 0; //相同最低活跃度的权重总额
  int firstWeight = 0; // Initial value, used for comparision
  boolean sameWeight = true; //相同最低活跃度的调用者权重是否相等
  for (int i = 0; i < length; i++) {
    Invoker<T> invoker = invokers.get(i);
    int active = RpcStatus.getStatus(invoker.getUrl(), invocation.getMethodName()).getActive(); // Active number
    int weight = invoker.getUrl().getMethodParameter(invocation.getMethodName(), Constants.WEIGHT_KEY, Constants.DEFAULT_WEIGHT); // Weight
    if (leastActive == -1 || active < leastActive) { // Restart, when find a invoker having smaller least active value.
      leastActive = active; //记录新的最低活跃度
      leastCount = 1;
      leastIndexs[0] = i; //重置
      totalWeight = weight; //重置
      firstWeight = weight; //重置
      sameWeight = true; // 重置
    } else if (active == leastActive) { // If current invoker's active value equals with leaseActive, then accumulating.
      leastIndexs[leastCount++] = i;
      totalWeight += weight; //累加权重
      if (sameWeight && i > 0
            && weight != firstWeight) {
        sameWeight = false;
      }
    }
  }
  //最低活跃度只有一个,那那肯定是它
  if (leastCount == 1) {
    return invokers.get(leastIndexs[0]);
  }
  //下面这一步不用多说了,与RandomLoadBalance一摸一样
  //最低活跃度相同的调用这,以权重比例随机访问
  if (!sameWeight && totalWeight > 0) {
    int offsetWeight = random.nextInt(totalWeight);
    for (int i = 0; i < leastCount; i++) {
      int leastIndex = leastIndexs[i];
      offsetWeight -= getWeight(invokers.get(leastIndex), invocation);
      if (offsetWeight <= 0)
        return invokers.get(leastIndex);
    }
  }
  return invokers.get(leastIndexs[random.nextInt(leastCount)]);
}

ConsistentHashLoadBalance 一致性哈希

这个其实没什么好说的,源码只是计算 hash 值

一致性 Hash,相同参数的请求总是发到同一提供者。
当某一台提供者挂时,原本发往该提供者的请求,基于虚拟节点,平摊到其它提供者,不会引起剧烈变动。

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。
列表为空,暂无数据

关于作者

深巷少女

暂无简介

0 文章
0 评论
382 人气
更多

推荐作者

爱人如己

文章 0 评论 0

萧瑟寒风

文章 0 评论 0

云雾

文章 0 评论 0

倒带

文章 0 评论 0

浮世清欢

文章 0 评论 0

撩起发的微风

文章 0 评论 0

    我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
    原文