微信公众号:[中间件兴趣圈] 作者简介:《RocketMQ技术内幕》作者
Dubbo支持在服务调用方对服务提供者采用负载均衡算法,LoadBalance接口定义如下:
1@SPI(RandomLoadBalance.NAME)
2public interface LoadBalance {
3
4 /**
5 * select one invoker in list.
6 *
7 * @param invokers invokers.
8 * @param url refer url
9 * @param invocation invocation.
10 * @return selected invoker.
11 */
12 @Adaptive("loadbalance")
13 <T> Invoker<T> select(List<Invoker<T>> invokers, URL url, Invocation invocation) throws RpcException;
14}
从中透露出如下几个信息: 默认如果不配置,使用RandomLoadBalance策略(加权随机负载算法)。整个Dubbo的负载均衡类图如下所示:
上述各种路由负载策略,对应的配置值如下:dubbo-cluster\src\main\resources\META-INF\dubbo\internal\com.alibaba.dubbo.rpc.cluster.LoadBalance
一致Hash算法,通常用在缓存领域,主要解决的问题是当数据节点数量发送变化后,尽量减少数据的迁移,在负责算法领域,个人不建议使用。Dubbo一致性Hash算法的实现逻辑主要分布在ConsistentHashLoadBalance的内部类ConsistentHashSelector中。
1private final TreeMap<Long, Invoker<T>> virtualInvokers;
2private final int replicaNumber;
3private final int identityHashCode;
4private final int[] argumentIndex;
接下来看一下其构造方法:
1public ConsistentHashSelector(List<Invoker<T>> invokers, String methodName, int identityHashCode) {
2 this.virtualInvokers = new TreeMap<Long, Invoker<T>>();
3 this.identityHashCode = System.identityHashCode(invokers); // @1
4 URL url = invokers.get(0).getUrl();
5 this.replicaNumber = url.getMethodParameter(methodName, "hash.nodes", 160); // @2
6 String[] index = Constants.COMMA_SPLIT_PATTERN.split(url.getMethodParameter(methodName, "hash.arguments", "0")); // @3 start
7 argumentIndex = new int[index.length];
8 for (int i = 0; i < index.length; i ++) {
9 argumentIndex[i] = Integer.parseInt(index[i]);
10 } // @3 end
11 for (Invoker<T> invoker : invokers) { // @4
12 for (int i = 0; i < replicaNumber / 4; i++) {
13 byte[] digest = md5(invoker.getUrl().toFullString() + i);
14 for (int h = 0; h < 4; h++) {
15 long m = hash(digest, h);
16 virtualInvokers.put(m, invoker);
17 }
18 }
19 } // @4 end
20 }
代码@1:根据所有的调用者生成一个HashCode,用该HashCode值来判断服务提供者是否发生了变化。
代码@2:获取服务提供者< dubbo:method/>标签的hash.nodes属性,如果为空,默认为160,表示一致性hash算法中虚拟节点数量。其配置方式如下:
1< dubbo:method ... >
2 < dubbo:parameter key="hash.nodes" value="160" />
3 < dubbo:parameter key="hash.arguments" value="0,1" />
4< /dubbo:method/>
代码@3:一致性Hash算法,在dubbo中,相同的服务调用参数走固定的节点,hash.arguments表示哪些参数参与hashcode,默认值“0”,表示第一个参数。
代码@4:为每一个Invoker创建replicaNumber 个虚拟节点,每一个节点的Hashcode不同。同一个Invoker不同hashcode的创建逻辑为: invoker.getUrl().toFullString() + i (0-39)的值,对其md5,然后用该值+h(0-3)的值取hash。一致性hash实现的一个关键是如果将一个Invoker创建的replicaNumber个虚拟节点(hashcode)能够均匀分布在Hash环上,Dubbo给出的实现如下,由于能力有限,目前并未真正理解如下方法的实现依据:
1private long hash(byte[] digest, int number) {
2 return (((long) (digest[3 + number * 4] & 0xFF) << 24)
3 | ((long) (digest[2 + number * 4] & 0xFF) << 16)
4 | ((long) (digest[1 + number * 4] & 0xFF) << 8)
5 | (digest[number * 4] & 0xFF))
6 & 0xFFFFFFFFL;
7 }
综上所述,构造函数主要完成一致性Hash算法Hash环的构建,利用了TreeMap的有序性来实现。
public Invoker< T> select(Invocation invocation)
1public Invoker<T> select(Invocation invocation) {
2 String key = toKey(invocation.getArguments()); // @1
3 byte[] digest = md5(key); // @2
4 return selectForKey(hash(digest, 0)); // @3
5}
代码@1:根据调用参数,并根据hash.arguments配置值,获取指定的位置的参数值,追加一起返回。
代码@2:对Key进行md5签名。
代码@3:根据key进行选择调用者。
ConsistentHashSelector#selectForKey
1private Invoker<T> selectForKey(long hash) {
2 Map.Entry<Long, Invoker<T>> entry = virtualInvokers.tailMap(hash, true).firstEntry(); // @1
3 if (entry == null) { // @2
4 entry = virtualInvokers.firstEntry();
5 }
6 return entry.getValue(); // @3
7}
代码@1,对虚拟节点,从virtualInvokers中选取一个子集,subMap(hash,ture,lastKey,true),其实就是实现根据待查找hashcode(key)顺时针,选中大于等于指定key的第一个key。
代码@2,如果未找到,则返回virtualInvokers第一个key。
代码@3:根据key返回指定的Invoker即可。
这里实现,应该可以不使用tailMap,代码修改如下:
1private Invoker<T> selectForKey(long hash) {
2 Map.Entry<Long, Invoker<T>> entry = virtualInvokers.ceilingEntry(hash);
3 if(entry == null ) {
4 entry = virtualInvokers.firstEntry();
5 }
6 return entry.getValue();
7}
如果想要了解TreeMap关于这一块的特性(tailMap、ceillingEntry、headMap)等API的详细解释,可以查看我的另外一篇博文:https://blog.csdn.net/prestigeding/article/details/80821576
由于roundrobin(加权轮询)、random(加权随机)、leastactive(最小活跃连接数)都与权重有关系,在介绍这两种负载均衡算法之前,我们首先看一下Dubbo关于权重的获取逻辑,代码见AbstractLoadBalance#getWeigh方法:
1protected int getWeight(Invoker<?> invoker, Invocation invocation) {
2 int weight = invoker.getUrl().getMethodParameter(invocation.getMethodName(), Constants.WEIGHT_KEY, Constants.DEFAULT_WEIGHT); // @1
3 if (weight > 0) {
4 long timestamp = invoker.getUrl().getParameter(Constants.REMOTE_TIMESTAMP_KEY, 0L); // @2
5 if (timestamp > 0L) {
6 int uptime = (int) (System.currentTimeMillis() - timestamp);
7 int warmup = invoker.getUrl().getParameter(Constants.WARMUP_KEY, Constants.DEFAULT_WARMUP); // @3
8 if (uptime > 0 && uptime < warmup) {
9 weight = calculateWarmupWeight(uptime, warmup, weight); // @4
10 }
11 }
12 }
13 return weight;
14 }
代码@1:首先获取服务提供者的权重(weight)。
代码@2:获取服务提供者的启动时间,在服务提供者启动时,会将启动时间戳存储在服务提供者的URL中,在服务发现(RegistryDirecotry)服务发现时,会将服务提供者的时间戳KEY,换成REMOTE_TIMESTAMP_KEY,避免与服务消费者的启动时间戳冲突。
代码@3:获取服务提供者是否开启预热机制,通过服务提供者< dubbo:service warmup=""/>参数来设置,如果未设置,去默认值10 * 60 * 1000(10分钟)。
代码@4:如果服务提供者启动时间小于预热时间(预热期间),需要根据启动时间,来计算预热期间服务提供者的权重。
AbstractLoadBalance#calculateWarmupWeight
1static int calculateWarmupWeight(int uptime, int warmup, int weight) { // @1
2 int ww = (int) ((float) uptime / ((float) warmup / (float) weight));
3 return ww < 1 ? 1 : (ww > weight ? weight : ww);
4 }
代码@1:参数说明,uptime:服务提供者启动时间;warmup:设置的预热时间;weight:服务提供者的权重,该方法在uptime < warmup时被调用 该方法的实现,就是在预热期间,根据启动时间,动态返回该服务提供者的权重,并且启动时间越长,返回的权重越接近weight,启动时间超过预热时间,则直接返回weight。 该方法单元测试:
其输出结果:这里写图片描述
1protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
2 int length = invokers.size(); // Number of invokers
3 int totalWeight = 0; // The sum of weights // @1 start
4 boolean sameWeight = true; // Every invoker has the same weight?
5 for (int i = 0; i < length; i++) {
6 int weight = getWeight(invokers.get(i), invocation);
7 totalWeight += weight; // Sum
8 if (sameWeight && i > 0
9 && weight != getWeight(invokers.get(i - 1), invocation)) {
10 sameWeight = false;
11 }
12 } // @1 end
13 if (totalWeight > 0 && !sameWeight) { // @2
14 // If (not every invoker has the same weight & at least one invoker's weight>0), select randomly based on totalWeight.
15 int offset = random.nextInt(totalWeight);
16 // Return a invoker based on the random value.
17 for (int i = 0; i < length; i++) {
18 offset -= getWeight(invokers.get(i), invocation);
19 if (offset < 0) {
20 return invokers.get(i);
21 }
22 }
23 }
24 // If all invokers have the same weight value or totalWeight=0, return evenly.
25 return invokers.get(random.nextInt(length)); // @3
26 }
代码@1:首先求所有服务提供者的总权重,并判断每个服务提供者的权重是否相同。
代码@2:如果提供者之间的权重不相同,则产生一个随机数(0-totalWeight),视为offset,然后依次用offset减去服务提供者的权重,如果减去(offset - provider.weight < 0),则该invoker命中。
代码@3:如果服务提供者的权重相同,则随机产生[0-invoker.size)即可。
加权轮询算法的核心算法是按权重轮询,一个基本点是应该是一个当前序号与服务提供者数量取模,需要结合权重。Dubbo使用如下数据结构存储当前序号:
1private final ConcurrentMap<String, AtomicPositiveInteger> sequences = new ConcurrentHashMap<String, AtomicPositiveInteger>();键值:serviceKey(<dubbo:service interface=""/>+ methodname),每个方法采用不同的计数器。
2RoundRobinLoadBalance #doSelect
3protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
4 String key = invokers.get(0).getUrl().getServiceKey() + "." + invocation.getMethodName(); // @1
5 int length = invokers.size(); // Number of invokers
6 int maxWeight = 0; // The maximum weight
7 int minWeight = Integer.MAX_VALUE; // The minimum weight
8 final LinkedHashMap<Invoker<T>, IntegerWrapper> invokerToWeightMap = new LinkedHashMap<Invoker<T>, IntegerWrapper>(); // @2 start
9 int weightSum = 0;
10 for (int i = 0; i < length; i++) {
11 int weight = getWeight(invokers.get(i), invocation);
12 maxWeight = Math.max(maxWeight, weight); // Choose the maximum weight
13 minWeight = Math.min(minWeight, weight); // Choose the minimum weight
14 if (weight > 0) {
15 invokerToWeightMap.put(invokers.get(i), new IntegerWrapper(weight));
16 weightSum += weight;
17 }
18 } // @2 end
19 AtomicPositiveInteger sequence = sequences.get(key);
20 if (sequence == null) {
21 sequences.putIfAbsent(key, new AtomicPositiveInteger());
22 sequence = sequences.get(key);
23 }
24 int currentSequence = sequence.getAndIncrement(); // @3
25 if (maxWeight > 0 && minWeight < maxWeight) { // @4
26 int mod = currentSequence % weightSum;
27 for (int i = 0; i < maxWeight; i++) {
28 for (Map.Entry<Invoker<T>, IntegerWrapper> each : invokerToWeightMap.entrySet()) {
29 final Invoker<T> k = each.getKey();
30 final IntegerWrapper v = each.getValue();
31 if (mod == 0 && v.getValue() > 0) {
32 return k;
33 }
34 if (v.getValue() > 0) {
35 v.decrement();
36 mod--;
37 }
38 }
39 }
40 }
41 // Round robin
42 return invokers.get(currentSequence % length); // @5
43 }
代码@1:构建ConcurrentMap< String, AtomicPositiveInteger> sequences中的key,以interface+methodname为键,里面存储的是当前序号(轮询)。
代码@2:构建LinkedHashMap< Invoker< T>, IntegerWrapper>存储结构,通过遍历所有Invoker,构建每个Invoker的权重,与此同时算出总权重,并且得出所有服务提供者权重是否相同。
代码@3:获取当前的轮询序号,用于取模。
代码@4:如果服务提供者之间的权重有差别,需要按权重轮询,实现方式是: 1)用当前轮询序号与服务提供者总权重取模,余数为mod。 2)然后从0循环直到最大权重,针对每一次循环,按同一顺序遍历所有服务提供者,如果mod等于0并且对应的Invoker的权重计算器大于0,则选择该服务提供者;否则,mod--,invoker对应的权重减一,权重是临时比那里LinkedHashMap< Invoker< T>, IntegerWrapper>。由于外层循环的次数为所有服务提供者的最大权重,内层循环当mod等于0时,肯定会有一个服务提供者的权重计数器大于0,而返回对应的服务提供者。返回的服务提供者是第一个满足的服务提供者,后续的服务提供者在下一次就会有机会, 因为下一次mod会增大1,后续的服务提供者通过轮询会被选择,选择的机会,取决于权重的大小。
代码@5:如果各服务提供者权重相同,则直接对服务提供者取模即可,轮询后递增。
最小活跃连接数,其核心实现就是,首先找到服务提供者当前最小的活跃连接数,如果一个服务提供者的服务连接数比其他的都要小,则选择这个活跃连接数最小的服务提供者发起调用,如果存在多个服务提供者的活跃连接数,并且是最小的,则在这些服务提供者之间选择加权随机算法选择一个服务提供者。
1protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
2 int length = invokers.size(); // Number of invokers // @1 start
3 int leastActive = -1; // The least active value of all invokers
4 int leastCount = 0; // The number of invokers having the same least active value (leastActive)
5 int[] leastIndexs = new int[length]; // The index of invokers having the same least active value (leastActive)
6 int totalWeight = 0; // The sum of weights
7 int firstWeight = 0; // Initial value, used for comparision
8 boolean sameWeight = true; // Every invoker has the same weight value? // @1 end
9 for (int i = 0; i < length; i++) { // @2
10 Invoker<T> invoker = invokers.get(i);
11 int active = RpcStatus.getStatus(invoker.getUrl(), invocation.getMethodName()).getActive(); // Active number
12 int weight = invoker.getUrl().getMethodParameter(invocation.getMethodName(), Constants.WEIGHT_KEY, Constants.DEFAULT_WEIGHT); //
13 // Weight
14 if (leastActive == -1 || active < leastActive) { // Restart, when find a invoker having smaller least active value. // @3
15 leastActive = active; // Record the current least active value
16 leastCount = 1; // Reset leastCount, count again based on current leastCount
17 leastIndexs[0] = i; // Reset
18 totalWeight = weight; // Reset
19 firstWeight = weight; // Record the weight the first invoker
20 sameWeight = true; // Reset, every invoker has the same weight value?
21 } else if (active == leastActive) { // If current invoker's active value equals with leaseActive, then accumulating. // @4
22 leastIndexs[leastCount++] = i; // Record index number of this invoker
23 totalWeight += weight; // Add this invoker's weight to totalWeight.
24 // If every invoker has the same weight?
25 if (sameWeight && i > 0
26 && weight != firstWeight) {
27 sameWeight = false;
28 }
29 }
30 }
31 // assert(leastCount > 0)
32 if (leastCount == 1) { // @5
33 // If we got exactly one invoker having the least active value, return this invoker directly.
34 return invokers.get(leastIndexs[0]);
35 }
36 if (!sameWeight && totalWeight > 0) { // @6
37 // If (not every invoker has the same weight & at least one invoker's weight>0), select randomly based on totalWeight.
38 int offsetWeight = random.nextInt(totalWeight);
39 // Return a invoker based on the random value.
40 for (int i = 0; i < leastCount; i++) {
41 int leastIndex = leastIndexs[i];
42 offsetWeight -= getWeight(invokers.get(leastIndex), invocation);
43 if (offsetWeight <= 0)
44 return invokers.get(leastIndex);
45 }
46 }
47 // If all invokers have the same weight value or totalWeight=0, return evenly.
48 return invokers.get(leastIndexs[random.nextInt(leastCount)]);
49 }
代码@1:解释相关局部变量。
代码@2:遍历所有的服务提供者,计算上述变量的值。
代码@3:如果leastActive (最小活跃连接数为-1,表示第一次遍历)或最新连接数大于当前遍历的Invoker的活跃连接数,需要reset如下值,重新计算:
1 leastActive = active; // Record the current least active value
2 leastCount = 1; // Reset leastCount, count again based on current leastCount
3 leastIndexs[0] = i; // Reset
4 totalWeight = weight; // Reset
5 firstWeight = weight; // Record the weight the first invoker
6 sameWeight = true; // Reset, every invoker has the same weight value?
代码@4:如果当前遍历的服务提供者的活跃数等于leastActive ,则将总权重想加,并在leastIndexs中记录服务提供者序号。
代码@5,如果最小活跃连接数的服务提供者数量只有一个,则直接返回该服务提供者。
代码@6,如果最小活跃连接数的服务提供者有多个,则使用加权随机算法选取服务提供者。
关于Dubbo的4种负载均衡算法的实现细节就分析到这里了。