首页
学习
活动
专区
工具
TVP
发布
精选内容/技术社群/优惠产品,尽在小程序
立即前往
24 篇文章
1
[享学Netflix] 三十七、源生Ribbon介绍 --- 客户端负载均衡器
2
[享学Netflix] 三十八、Ribbon核心API源码解析:ribbon-core(一)IClient请求客户端
3
[享学Netflix] 三十九、Ribbon核心API源码解析:ribbon-core(二)IClientConfig配置详解
4
[享学Netflix] 四十、Ribbon核心API源码解析:ribbon-core(三)RetryHandler重试处理器
5
[享学Netflix] 四十一、Ribbon核心API源码解析:ribbon-core(四)ClientException客户端异常
6
[享学Netflix] 四十二、Ribbon的LoadBalancer五大组件之:IPing心跳检测
7
[享学Netflix] 四十三、Ribbon的LoadBalancer五大组件之:ServerList服务列表
8
[享学Netflix] 四十四、netflix-statistics详解,手把手教你写个超简版监控系统
9
[享学Netflix] 四十五、Ribbon服务器状态:ServerStats及其断路器原理
10
[享学Netflix] 四十六、Ribbon负载均衡策略服务器状态总控:LoadBalancerStats
11
[享学Netflix] 四十七、Ribbon多区域选择
12
[享学Netflix] 四十八、Ribbon服务器过滤逻辑的基础组件:AbstractServerPredicate
13
[享学Netflix] 四十九、Ribbon的LoadBalancer五大组件之:服务列表过滤器
14
[享学Netflix] 五十、Ribbon的LoadBalancer五大组件之:ServerListUpdater
15
[享学Netflix] 五十一、Ribbon的LoadBalancer五大组件之:IRule(一)轮询和加权轮询
16
[享学Netflix] 五十二、Ribbon的LoadBalancer五大组件之:IRule(二)应用于大规模集群的可配置规则
17
[享学Netflix] 五十三、Ribbon的LoadBalancer五大组件之:IRule(三)随机和重试,所有IRule实现总结
18
[享学Netflix] 五十四、Ribbon启动连接操作:IPrimeConnection检测Server是否能够提供服务
19
[享学Netflix] 五十五、Ribbon负载均衡器执行上下文:LoadBalancerContext
20
[享学Netflix] 五十六、Ribbon负载均衡器ILoadBalancer(一):BaseLoadBalancer
21
[享学Netflix] 五十七、Ribbon负载均衡器ILoadBalancer(二):ZoneAwareLoadBalancer具备区域意识、动态服务列表的负载均衡器
22
[享学Netflix] 五十八、Ribbon负载均衡命令:LoadBalancerCommand(一)基础类打点
23
[享学Netflix] 五十九、Ribbon负载均衡命令:LoadBalancerCommand(二)执行目标请求
24
[享学Netflix] 六十、Ribbon具有负载均衡能力的客户端:AbstractLoadBalancerAwareClient
清单首页ribbon文章详情

[享学Netflix] 五十、Ribbon的LoadBalancer五大组件之:ServerListUpdater

如果你是房间里最聪明的人,那么你走错房间了。 代码下载地址:https://github.com/f641385712/netflix-learning

目录

前言

我们已经知道ServerList它用于提供Server列表,而ServerListFilter组件它用于对列表进行过滤,本文将介绍一个Action组件:ServerListUpdater服务列表更新器。它像是一个任务调度器,来定时触发相应的动作,它强调的是动作的开始/触发,具体实现它并不关心,所以在实现里你完全可以结合ServerListServerListFilter一起来完成服务列表的维护,实际上也确实是这么做的。


正文

ServerListUpdater组件只关心动作的开始,并不关心具体实现,所以它是比较简单理解的一个组件。


ServerListUpdater

列表更新器,被DynamicServerListLoadBalancer用于动态的更新服务列表。

代码语言:javascript
复制
// Server列表更新器。
public interface ServerListUpdater {
	// 内部接口:函数式接口 实际上执行服务器列表更新的接口 也就是实际的动作
	// 一般使用匿名内部类的形式实现
	public interface UpdateAction {
        void doUpdate();
    }

	// 使用给定的更新操作启动serverList更新程序这个调用应该是幂等的
	void start(UpdateAction updateAction);
	// 停止服务器列表更新程序。这个调用应该是幂等的
	void stop();

	// ============下面是一些获取执行过程中的信息方法==============
	// 最后更新的时间Date的String表示形式
	String getLastUpdate();
	// 自上次更新以来已经过的ms数
	long getDurationSinceLastUpdateMs();
	//错过更新周期的数量(如果有的话)
	int getNumberMissedCycles();
	// 使用的线程数
	int getCoreThreads();
}

从接口方法中能看出,它会通过任务调度去定时实现更新操作。所以它有个唯一实现子类:PollingServerListUpdater


PollingServerListUpdater

动态服务器列表更新器要更新的默认实现,使用一个任务调度器ScheduledThreadPoolExecutor完成定时更新。

它本作为DynamicServerListLoadBalancer的一个内部类,现单独拿出来成为一个public的类了


成员属性
代码语言:javascript
复制
public class PollingServerListUpdater implements ServerListUpdater {

    private static long LISTOFSERVERS_CACHE_UPDATE_DELAY = 1000;
    private static int LISTOFSERVERS_CACHE_REPEAT_INTERVAL = 30 * 1000;

	private final AtomicBoolean isActive = new AtomicBoolean(false);
	private volatile long lastUpdated = System.currentTimeMillis();
    private final long initialDelayMs;
    private final long refreshIntervalMs;
    
	// 继承自Futrue,在Futrue的基础上增加getDelay(TimeUnit unit)方法:还有多久执行任务
	// ScheduledExecutorService提交任务时返回它
	// ScheduledThreadPoolExecutor是带有线程池功能的执行器,实现了接口ScheduledExecutorService
	private volatile ScheduledFuture<?> scheduledFuture;
}
  • isActive:标记当前Scheduled任务是否是活跃状态中(已经开启就活跃状态)
  • lastUpdated:任务调用执行一次updateAction.doUpdate()后记录该时刻,表示最新的一次update的时间戳
  • initialDelayMs:线程池的initialDelay参数。默认值是LISTOFSERVERS_CACHE_UPDATE_DELAY也就是延迟1000ms开始执行
  • refreshIntervalMs:默认值是LISTOFSERVERS_CACHE_REPEAT_INTERVAL也就是30s执行一次
    • 因为该参数相对重要,所以不仅构造时可以指定其值,还可以通过外部化配置其值,对应的key是ServerListRefreshInterval,也就是说你可以这么配置它:
      • <clientName>.ribbon.ServerListRefreshInterval = 60000(当然一般配置一个全局的即可)
  • scheduledFuture:任务调度结果,方便做cancel()动作

初始化任务调度器

该类的任务调度器是通过自己的一个私有静态内部类LazyHolder来内聚实现的:

代码语言:javascript
复制
PollingServerListUpdater.LazyHolder:

	// 重点:core核心数是可**动态**可变的
	private final static String CORE_THREAD = "DynamicServerListLoadBalancer.ThreadPoolSize";
	private final static DynamicIntProperty poolSizeProp = new DynamicIntProperty(CORE_THREAD, 2);
	static ScheduledThreadPoolExecutor _serverListRefreshExecutor = null;

	// 使用静态代码块完成初始化
	static {
		// 注册一个callback以便能动态更新core核心数
		int coreSize = poolSizeProp.get();
        poolSizeProp.addCallback(new Runnable() {
            @Override
            public void run() {
                _serverListRefreshExecutor.setCorePoolSize(poolSizeProp.get());
            }
        });
        _serverListRefreshExecutor = new ScheduledThreadPoolExecutor(coreSize, factory);
        ...
	}

对于线程调度线程池的初始化,此处的核心要点是:默认的coreSize数是2,但是你可以通过配置动态实现更改,所以当你的ClientName实例较多时,可适当的调高此数值。它的实现原理同ScheduledThreadPoolExectuorWithDynamicSize


成员方法
代码语言:javascript
复制
PollingServerListUpdater:

	// 构造器,为initialDelayMs/refreshIntervalMs两个参数赋值
    public PollingServerListUpdater() {
        this(LISTOFSERVERS_CACHE_UPDATE_DELAY, LISTOFSERVERS_CACHE_REPEAT_INTERVAL);
    }
    // 从config里面拿值。对应的key是ServerListRefreshInterval
    public PollingServerListUpdater(IClientConfig clientConfig) {
        this(LISTOFSERVERS_CACHE_UPDATE_DELAY, getRefreshIntervalMs(clientConfig));
    }
    public PollingServerListUpdater(final long initialDelayMs, final long refreshIntervalMs) {
        this.initialDelayMs = initialDelayMs;
        this.refreshIntervalMs = refreshIntervalMs;
    }

	// 启动
    @Override
    public synchronized void start(final UpdateAction updateAction) {
    	// 保证原子性。如果已经启动了就啥都不做
    	if (isActive.compareAndSet(false, true)) {
			//定时任务每次执行的Task
    		Runnable wrapperRunnable = () -> {
				if (!isActive.get()) {
					if (scheduledFuture != null) {
						scheduledFuture.cancel(true);
						return;
					}
					// 每次执行更新操作时,记录下时间戳
					updateAction.doUpdate();
					lastUpdated = System.currentTimeMillis();
				}

				// 启动任务 默认30s执行一次
	            scheduledFuture = getRefreshExecutor().scheduleWithFixedDelay(
	                    wrapperRunnable, initialDelayMs,
	                    refreshIntervalMs, TimeUnit.MILLISECONDS
    		};
    	} 
    }

	// 停止任务
    @Override
    public synchronized void stop() {
    	scheduledFuture.cancel(true);
    }

	// 为何返回String?返回Date不香吗?
    @Override
    public String getLastUpdate() {
        return new Date(lastUpdated).toString();
    }
    // 距离上一次update更新已过去这么长时间了
    @Override
    public long getDurationSinceLastUpdateMs() {
        return System.currentTimeMillis() - lastUpdated;
    }

	// 因为coreSize是动态可以配的,所以提供方法供以访问
    @Override
    public int getCoreThreads() {
        if (isActive.get()) {
            if (getRefreshExecutor() != null) {
                return getRefreshExecutor().getCorePoolSize();
            }
        }
        return 0;
    }
	...

得到一个PollingServerListUpdater实例后,调用其start方法,便可实现定时的更新服务列表了,非常的方便。所以若你想要有一个定时的去更新服务列表的能力,可以使用此组件方便的实现。


哪里使用?

它的唯一使用处在DynamicServerListLoadBalancer里,它的实现动作是:ServerListUpdater.UpdateAction

代码语言:javascript
复制
ServerListUpdater.UpdateAction:

    public void updateListOfServers() {
        List<T> servers = new ArrayList<T>();
        if (serverListImpl != null) {
            servers = serverListImpl.getUpdatedListOfServers();
            if (filter != null) {
                servers = filter.getFilteredListOfServers(servers);
            }
        }
        updateAllServerList(servers);
    }

一句话解释:定时的更新服务器列表,这个更新动作是之前学过的两步:

  • 先用ServerList拿到所有的Server列表
  • 再用ServerListFilter完成过滤

代码示例

代码语言:javascript
复制
@Test
public void fun10() throws InterruptedException {
    ServerListUpdater serverListUpdater = new PollingServerListUpdater();

    serverListUpdater.start(() -> {
        int coreThreads = serverListUpdater.getCoreThreads();
        String lastUpdate = serverListUpdater.getLastUpdate();
        int numberMissedCycles = serverListUpdater.getNumberMissedCycles();
        long durationSinceLastUpdateMs = serverListUpdater.getDurationSinceLastUpdateMs();
        System.out.println("===========上次的执行时间是:" + lastUpdate);
        System.out.println("自上次更新以来已经过的ms数:" + durationSinceLastUpdateMs);
        System.out.println("线程核心数:" + coreThreads);
        System.out.println("错过更新周期的数量:" + numberMissedCycles);

        // .... 执行你对Server列表的更新动作,本处略
    });

    TimeUnit.SECONDS.sleep(500);
}

运行程序,控制台打印:

代码语言:javascript
复制
===========上次的执行时间是:Thu Mar 19 10:28:14 CST 2020
自上次更新以来已经过的ms数:30003
线程核心数:2
错过更新周期的数量:1
===========上次的执行时间是:Thu Mar 19 10:28:44 CST 2020
自上次更新以来已经过的ms数:30002
线程核心数:2
错过更新周期的数量:1
===========上次的执行时间是:Thu Mar 19 10:29:14 CST 2020
自上次更新以来已经过的ms数:30002
线程核心数:2
错过更新周期的数量:1

总结

关于Ribbon的LoadBalancer五大组件之:ServerListUpdater服务列表更新器就先介绍到这,实际上最终实现的都是使用的PollingServerListUpdater来实现定时更新

至此,LoadBalancer的五大组件已完成其四,剩下最为重要,也是相对较难的IRule了,下文继续予以详解。

下一篇
举报
领券