1. 概述
本文主要分享 Eureka-Client 向 Eureka-Server 获取增量注册信息的过程。
前置阅读:《Eureka 源码解析 —— 应用实例注册发现(六)之全量获取》
FROM 《深度剖析服务发现组件Netflix Eureka》
Eureka-Client 获取注册信息,分成全量获取和增量获取。默认配置下,Eureka-Client 启动时,首先执行一次全量获取进行本地缓存注册信息,而后每 30 秒增量获取刷新本地缓存( 非“正常”情况下会是全量获取 )。
本文重点在于增量获取。
推荐 Spring Cloud 书籍:
推荐 Spring Cloud 视频:
Applications.appsHashCode ,应用集合一致性哈希码。
增量获取注册的应用集合( Applications ) 时,Eureka-Client 会获取到:
Eureka-Client 将变化的应用集合和本地缓存的应用集合进行合并后进行计算本地的应用集合一致性哈希码。若两个哈希码相等,意味着增量获取成功;若不相等,意味着增量获取失败,Eureka-Client 重新和 Eureka-Server 全量获取应用集合。
Eureka 比较应用集合一致性哈希码,和日常我们通过哈希码比较两个对象是否相等类似。
appsHashCode = ${status}_${count}_
status ) + 数量( count )拼接出一致性哈希码。若数量为 0 ,该应用实例状态不进行拼接。状态以字符串大小排序。appsHashCode = UP_8_ 。8 个 UP ,2 个 DOWN ,则 appsHashCode = DOWN_2_UP_8_ 。#populateInstanceCountMap() 方法,计算每个应用实例状态的数量。实现代码如下:
// Applications.java public void populateInstanceCountMap(Map<String, AtomicInteger> instanceCountMap) { for (Application app : this.getRegisteredApplications()) { for (InstanceInfo info : app.getInstancesAsIsFromEureka()) { // 计数 AtomicInteger instanceCount = instanceCountMap.computeIfAbsent(info.getStatus().name(), k -> new AtomicInteger(0)); instanceCount.incrementAndGet(); } } } public List<Application> getRegisteredApplications() { return new ArrayList<Application>(this.applications); } // Applications.java public List<InstanceInfo> getInstancesAsIsFromEureka() { synchronized (instances) { return new ArrayList<InstanceInfo>(this.instances); } }#getReconcileHashCode() 方法,计算 hashcode 。实现代码如下:
public static String getReconcileHashCode(Map<String, AtomicInteger> instanceCountMap) { StringBuilder reconcileHashCode = new StringBuilder(75); for (Map.Entry<String, AtomicInteger> mapEntry : instanceCountMap.entrySet()) { reconcileHashCode.append(mapEntry.getKey()).append(STATUS_DELIMITER) // status .append(mapEntry.getValue().get()).append(STATUS_DELIMITER); // count } return reconcileHashCode.toString(); }本小节,建议你理解完全文后,再回到此处 本小节,建议你理解完全文后,再回到此处 本小节,建议你理解完全文后,再回到此处
笔者刚看完应用集合一致性哈希算法的计算公式,处于一脸懵逼的状态。这么精简的方式真的能够校验出数据的一致性么?不晓得有多少读者跟笔者有一样的疑惑。下面我们来论证该算法的合理性( 一本正经的胡说八道 )。
一致性哈希值通过状态 + 数量来计算,那么是不是可能状态总数是一样多,实际分布在不同的应用?那么我们列举模型如下:
UP | |
|---|---|
应用A | m |
应用B | n |
如果此时应用A 下线了 c 个原应用实例,应用B 注册了 c 个信应用实例,那么处于 UP 状态的数量仍然是 m + n 个。
UP (server) | UP (client) | |
|---|---|---|
应用A | m - c | m - c |
应用B | n + c | n + c |
UP (server) | UP (client) | |
|---|---|---|
应用A | m - c | m |
应用B | n + c | n |
UP (server) | UP (client) | |
|---|---|---|
应用A | m - c | m - w |
应用B | n + c | n + w |
What ? 从异常情况【1】【2】可以看到,一致性哈希算法竟然是不合理的,那么我们手动来做一次最精简的实验。实验如下:
eureka.retentionTimeInMSInDeltaQueue = 1 ,变更记录队列每条记录存活时长 1 ms。用以实现 Eureka-Client 请求不到完整的增量变化。eureka.deltaRetentionTimerIntervalInMs = 1 ,变更记录队列每条记录过期定时任务执行频率 1 ms。用以实现 Eureka-Client 请求不到完整的增量变化。eureka.shouldUseReadOnlyResponseCache = false ,禁用响应缓存的只读缓存。用以避免等待缓存刷新。eureka.waitTimeInMsWhenSyncEmpty = 1 ,UP (server) | UP (client) | |
|---|---|---|
应用A | 0 | 1 |
应用B | 1 | 0 |
?结论?
当然排除掉特别极端的场景,Eureka-Client 从 Eureka-Server 因为网络异常导致一直同步不到增量变化,又恰好应用关闭和开启满足状态统计数量。另外,变更记录队列记录过期时长为 300 秒,增量获取频率为 30 秒,获取的次数有 10 次左右。所以,应用集合一致性哈希码在绝大多数场景是合理的。笔者的YY,解决这个极小场景有如下方式:
appsHashCode = MD5(${app_name}_${instance_id}_${status}_${count}_) ,增加对应用名和应用实例编号敏感。ps :笔者怀着忐忑的心写完了这个小节,如果有不合理的地方,又或者有不同观点的胖友,欢迎一起探讨。谢谢。
TODO[0027][反思]:应用集合一致性哈希算法。
在 《Eureka 源码解析 —— 应用实例注册发现(六)之全量获取》「2.4 发起获取注册信息」 里,调用 DiscoveryClient#getAndUpdateDelta(...) 方法,增量获取注册信息,并刷新本地缓存,实现代码如下:
1: private void getAndUpdateDelta(Applications applications) throws Throwable {
2: long currentUpdateGeneration = fetchRegistryGeneration.get();
3:
4: // 增量获取注册信息
5: Applications delta = null;
6: EurekaHttpResponse<Applications> httpResponse = eurekaTransport.queryClient.getDelta(remoteRegionsRef.get());
7: if (httpResponse.getStatusCode() == Status.OK.getStatusCode()) {
8: delta = httpResponse.getEntity();
9: }
10:
11: if (delta == null) {
12: // 增量获取为空,全量获取
13: logger.warn("The server does not allow the delta revision to be applied because it is not safe. "
14: + "Hence got the full registry.");
15: getAndStoreFullRegistry();
16: } else if (fetchRegistryGeneration.compareAndSet(currentUpdateGeneration, currentUpdateGeneration + 1)) {
17: logger.debug("Got delta update with apps hashcode {}", delta.getAppsHashCode());
18: String reconcileHashCode = "";
19: if (fetchRegistryUpdateLock.tryLock()) {
20: try {
21: // 将变化的应用集合和本地缓存的应用集合进行合并
22: updateDelta(delta);
23: // 计算本地的应用集合一致性哈希码
24: reconcileHashCode = getReconcileHashCode(applications);
25: } finally {
26: fetchRegistryUpdateLock.unlock();
27: }
28: } else {
29: logger.warn("Cannot acquire update lock, aborting getAndUpdateDelta");
30: }
31: // There is a diff in number of instances for some reason
32: if (!reconcileHashCode.equals(delta.getAppsHashCode()) // 一致性哈希值不相等
33: || clientConfig.shouldLogDeltaDiff()) { //
34: reconcileAndLogDifference(delta, reconcileHashCode); // this makes a remoteCall
35: }
36: } else {
37: logger.warn("Not updating application delta as another thread is updating it already");
38: logger.debug("Ignoring delta update with apps hashcode {}, as another thread is updating it already", delta.getAppsHashCode());
39: }
40: }AbstractJerseyEurekaHttpClient#getApplicationsInternal(...) 方法,GET 请求 Eureka-Server 的 apps/detla 接口,参数为 regions ,返回格式为 JSON ,实现增量获取注册信息。#getAndStoreFullRegistry() 方法,全量获取注册信息,并设置到本地缓存。该方法在 《Eureka 源码解析 —— 应用实例注册发现(六)之全量获取》「2.4.1 全量获取注册信息,并设置到本地缓存」 有详细解析。eureka.printDeltaFullDiff ,是否打印增量和全量差异。默认值 :false 。从目前代码实现上来看,暂时没有生效。注意 :开启该参数会导致每次增量获取后又发起全量获取,不要开启。#updateDelta(...) 方法,将变化的应用集合和本地缓存的应用集合进行合并。#reconcileAndLogDifference() 方法,全量获取注册信息,并设置到本地缓存,和 #getAndStoreFullRegistry() 基本类似。调用 #updateDelta(...) 方法,将变化的应用集合和本地缓存的应用集合进行合并。实现代码如下:
1: private void updateDelta(Applications delta) {
2: int deltaCount = 0;
3: for (Application app : delta.getRegisteredApplications()) { // 循环增量(变化)应用集合
4: for (InstanceInfo instance : app.getInstances()) {
5: Applications applications = getApplications();
6: // TODO[0009]:RemoteRegionRegistry
7: String instanceRegion = instanceRegionChecker.getInstanceRegion(instance);
8: if (!instanceRegionChecker.isLocalRegion(instanceRegion)) {
9: Applications remoteApps = remoteRegionVsApps.get(instanceRegion);
10: if (null == remoteApps) {
11: remoteApps = new Applications();
12: remoteRegionVsApps.put(instanceRegion, remoteApps);
13: }
14: applications = remoteApps;
15: }
16:
17: ++deltaCount;
18: if (ActionType.ADDED.equals(instance.getActionType())) { // 添加
19: Application existingApp = applications.getRegisteredApplications(instance.getAppName());
20: if (existingApp == null) {
21: applications.addApplication(app);
22: }
23: logger.debug("Added instance {} to the existing apps in region {}", instance.getId(), instanceRegion);
24: applications.getRegisteredApplications(instance.getAppName()).addInstance(instance);
25: } else if (ActionType.MODIFIED.equals(instance.getActionType())) { // 修改
26: Application existingApp = applications.getRegisteredApplications(instance.getAppName());
27: if (existingApp == null) {
28: applications.addApplication(app);
29: }
30: logger.debug("Modified instance {} to the existing apps ", instance.getId());
31:
32: applications.getRegisteredApplications(instance.getAppName()).addInstance(instance);
33: } else if (ActionType.DELETED.equals(instance.getActionType())) { // 删除
34: Application existingApp = applications.getRegisteredApplications(instance.getAppName());
35: if (existingApp == null) {
36: applications.addApplication(app);
37: }
38: logger.debug("Deleted instance {} to the existing apps ", instance.getId());
39: applications.getRegisteredApplications(instance.getAppName()).removeInstance(instance);
40: }
41: }
42: }
43: logger.debug("The total number of instances fetched by the delta processor : {}", deltaCount);
44:
45: getApplications().setVersion(delta.getVersion());
46: // 过滤、打乱应用集合
47: getApplications().shuffleInstances(clientConfig.shouldFilterOnlyUpInstances());
48:
49: // TODO[0009]:RemoteRegionRegistry
50: for (Applications applications : remoteRegionVsApps.values()) {
51: applications.setVersion(delta.getVersion());
52: applications.shuffleInstances(clientConfig.shouldFilterOnlyUpInstances());
53: }
54: }Application#addInstance(...) 方法,实现代码如下:
// Application.java public void addInstance(InstanceInfo i) { // 添加到 应用实例映射 instancesMap.put(i.getId(), i); synchronized (instances) { // 移除原有实例 instances.remove(i); // 添加新实例 instances.add(i); // 设置 isDirty ,目前只用于 `#toString()` 方法打印,无业务逻辑 isDirty = true; } } // InstanceInfo.java @Override public int hashCode() { // 只使用 ID 计算 hashcode String id = getId(); return (id == null) ? 31 : (id.hashCode() + 31); } @Override public boolean equals(Object obj) { // 只对比 ID if (this == obj) { return true; } if (obj == null) { return false; } if (getClass() != obj.getClass()) { return false; } InstanceInfo other = (InstanceInfo) obj; String id = getId(); if (id == null) { if (other.getId() != null) { return false; } } else if (!id.equals(other.getId())) { return false; } return true; }Application#addInstance(...) 方法。Application#removeInstance(...) 方法,实现代码如下:
public void removeInstance(InstanceInfo i) { removeInstance(i, true); } private void removeInstance(InstanceInfo i, boolean markAsDirty) { // 移除 应用实例映射 instancesMap.remove(i.getId()); synchronized (instances) { // 移除 应用实例 instances.remove(i); if (markAsDirty) { // 设置 isDirty ,目前只用于 `#toString()` 方法打印,无业务逻辑 isDirty = true; } } }Applications#shuffleInstances(...) 方法,根据配置 eureka.shouldFilterOnlyUpInstances = true ( 默认值 :true ) 过滤只保留状态为开启( UP )的应用实例,并随机打乱应用实例顺序。打乱后,实现调用应用服务的随机性。代码比较易懂,点击链接查看方法实现。com.netflix.eureka.resources.ApplicationsResource,处理所有应用的请求操作的 Resource ( Controller )。
接收增量获取请求,映射 ApplicationsResource#getContainers() 方法。
AbstractInstanceRegistry.recentlyChangedQueue,最近租约变更记录队列。实现代码如下:
// AbstractInstanceRegistry.java
/**
* 最近租约变更记录队列
*/
private ConcurrentLinkedQueue<RecentlyChangedItem> recentlyChangedQueue = new ConcurrentLinkedQueue<RecentlyChangedItem>();
/**
* 最近租约变更记录
*/
private static final class RecentlyChangedItem {
/**
* 最后更新时间戳
*/
private long lastUpdateTime;
/**
* 租约
*/
private Lease<InstanceInfo> leaseInfo;
public RecentlyChangedItem(Lease<InstanceInfo> lease) {
this.leaseInfo = lease;
lastUpdateTime = System.currentTimeMillis();
}
public long getLastUpdateTime() {
return this.lastUpdateTime;
}
public Lease<InstanceInfo> getLeaseInfo() {
return this.leaseInfo;
}
}lastUpdateTime 超过一定时长后进行移除。实现代码如下:
// AbstractInstanceRegistry.java this.deltaRetentionTimer.schedule(getDeltaRetentionTask(), serverConfig.getDeltaRetentionTimerIntervalInMs(), serverConfig.getDeltaRetentionTimerIntervalInMs()); private TimerTask getDeltaRetentionTask() { return new TimerTask() { @Override public void run() { Iterator<RecentlyChangedItem> it = recentlyChangedQueue.iterator(); while (it.hasNext()) { if (it.next().getLastUpdateTime() < System.currentTimeMillis() - serverConfig.getRetentionTimeInMSInDeltaQueue()) { it.remove(); } else { break; } } } }; }eureka.deltaRetentionTimerIntervalInMs, 移除队列里过期的租约变更记录的定时任务执行频率,单位:毫秒。默认值 :30 * 1000 毫秒。eureka.retentionTimeInMSInDeltaQueue,租约变更记录过期时长,单位:毫秒。默认值 : 3 * 60 * 1000 毫秒。在 《Eureka 源码解析 —— 应用实例注册发现(六)之全量获取》「3.3 缓存读取」 里,在 #generatePayload() 方法里,调用 AbstractInstanceRegistry#getApplicationDeltas(...) 方法,获取近期变化的应用集合,实现代码如下:
// AbstractInstanceRegistry.java
1: public Applications getApplicationDeltas() {
2: // 添加 增量获取次数 到 监控
3: GET_ALL_CACHE_MISS_DELTA.increment();
4: // 初始化 变化的应用集合
5: Applications apps = new Applications();
6: apps.setVersion(responseCache.getVersionDelta().get());
7: Map<String, Application> applicationInstancesMap = new HashMap<String, Application>();
8: try {
9: // 获取写锁
10: write.lock();
11: // 获取 最近租约变更记录队列
12: Iterator<RecentlyChangedItem> iter = this.recentlyChangedQueue.iterator();
13: logger.debug("The number of elements in the delta queue is :" + this.recentlyChangedQueue.size());
14: // 拼装 变化的应用集合
15: while (iter.hasNext()) {
16: Lease<InstanceInfo> lease = iter.next().getLeaseInfo();
17: InstanceInfo instanceInfo = lease.getHolder();
18: Object[] args = {instanceInfo.getId(), instanceInfo.getStatus().name(), instanceInfo.getActionType().name()};
19: logger.debug("The instance id %s is found with status %s and actiontype %s", args);
20: Application app = applicationInstancesMap.get(instanceInfo.getAppName());
21: if (app == null) {
22: app = new Application(instanceInfo.getAppName());
23: applicationInstancesMap.put(instanceInfo.getAppName(), app);
24: apps.addApplication(app);
25: }
26: app.addInstance(decorateInstanceInfo(lease));
27: }
28:
29: // TODO[0009]:RemoteRegionRegistry
30: boolean disableTransparentFallback = serverConfig.disableTransparentFallbackToOtherRegion();
31: if (!disableTransparentFallback) {
32: Applications allAppsInLocalRegion = getApplications(false);
33:
34: for (RemoteRegionRegistry remoteRegistry : this.regionNameVSRemoteRegistry.values()) {
35: Applications applications = remoteRegistry.getApplicationDeltas();
36: for (Application application : applications.getRegisteredApplications()) {
37: Application appInLocalRegistry =
38: allAppsInLocalRegion.getRegisteredApplications(application.getName());
39: if (appInLocalRegistry == null) {
40: apps.addApplication(application);
41: }
42: }
43: }
44: }
45:
46: // 获取全量应用集合,通过它计算一致性哈希值
47: Applications allApps = getApplications(!disableTransparentFallback);
48: apps.setAppsHashCode(allApps.getReconcileHashCode());
49: return apps;
50: } finally {
51: write.unlock();
52: }
53: }apps )。最近租约变更记录队列 )。apps )。#getApplications(...) 方法,获取全量应用集合( allApps ),在 《Eureka 源码解析 —— 应用实例注册发现(六)之全量获取》「3.3.1 获得注册的应用集合」 有详细解析。后通过 allApps 计算一致性哈希值。通过这个全量应用集合的哈希值,Eureka-Client 获取到增量应用集合并合并后,就可以比对啦。