本文主要基于 SkyWalking 3.2.6 正式版
本文主要分享 应用与应用实例的注册。先来简单了解下注册的整体流程:
下面,我们分成两个小节,分别从 API 的实现与调用,分享代码的具体实现。
友情提示:推荐阅读 《探针与Collector间通讯协议》 。
Collector 注册相关 API 相关有四个接口:
API 处理的流程大体如下:

我们先来看看 API 的定义,ApplicationRegisterService.proto ,如下图所示:

KeyWithIntegerValue.proto 中定义。ApplicationRegisterServiceHandler#register(Application, StreamObserver<ApplicationMapping>), 代码如下:
applicationCode )数组。IApplicationIDService#getOrCreate(applicationCode) 方法,获取或创建应用,最终获得应用编号( applicationId )。applicationId != 0 ),则添加到响应。为什么会存在获得不到的情况呢?在下文中,我们会看到,实际异步保存应用,所以会存在获取失败的情况。当获取失败,调用方( 例如 Agent )可以重新发起该请求进行注册应用,从而在异步保存应用,获取到应用编号。org.skywalking.apm.collector.agent.stream.service.register.IApplicationIDService ,继承 Service 接口,应用编号服务接口。
#getOrCreate(applicationCode) 接口方法,根据应用编码获取或创建应用,并获得应用编号。org.skywalking.apm.collector.agent.stream.worker.register.ApplicationIDService ,实现 IApplicationIDService 接口,应用编号服务实现类。
#getOrCreate(applicationCode) 方法,代码如下:ApplicationCacheService#get(applicationCode) 方法,从缓存中获取应用编号。在 《SkyWalking 源码分析 —— Collector Cache 缓存组件》 有详细解析。Graph<Application> 对象,调用 Graph#start(application) 方法,进行流式处理。在这过程中,会保存应用到存储器。在 #createApplicationRegisterGraph() 方法中,我们可以看到 Application 对应的 Graph<Application> 对象的创建。
org.skywalking.apm.collector.agent.stream.worker.register.ApplicationRegisterRemoteWorker ,继承 AbstractRemoteWorker 抽象类,应用注册远程 Worker 。#id() 实现方法,返回 10006 。#selector 实现方法,返回 Selector.ForeverFirst 。在 《SkyWalking 源码分析 —— Collector Remote 远程通信服务》 有详细解析。#onWork(Application) 实现方法,调用 Next#execute(message) 方法,提交数据给下面的节点,继续流式处理。ip 排序 ) 进行后续的流式处理,即,保存应用。org.skywalking.apm.collector.agent.stream.worker.register.ApplicationRegisterSerialWorker ,继承 AbstractLocalAsyncWorker 抽象类,异步保存应用 Worker 。ApplicationCacheService#get(applicationCode) 方法,从缓存中获取应用编号。applicationCode 创建应用并保存。ApplicationH2RegisterDAO#getMinApplicationId() 方法,获得 Application 记录的应用编号的最小值。min == 0 时,说明没有 Application 记录。applicationId = 1 ,applicationCode = User ,用于表示用户发起请求。在 SkyWaling UI 中,我们可以看到该条 Application 记录如下图:
applicationId = -1 。ApplicationH2RegisterDAO#getMaxApplicationId() 方法,获得 Application 记录的应用编号的最大值。IdAutoIncrement#increment(min, max) 方法,获得应用编号。该方法较为有趣,在下文详细解析。ApplicationEsRegisterDAO#save(Application) 方法,保存应用。#id() 实现方法,返回 101 。#onWork(Application) 实现方法,保存应用( Application )。代码如下( 以 ES 作为 DAO 实现为例子 ):IdAutoIncrement#increment(min, max) 方法,双向均匀自增。可能看起来比较奇怪,以上文 Application 的调用举例子:
min | max | result | applicationCode |
|---|---|---|---|
0 | / | 1 | User |
0 | / | -1 | 应用 A |
-1 | 1 | 2 | 应用 B |
-1 | 2 | -2 | 应用 C |
-2 | 2 | 3 | 应用 D |
result ,不调用 #increment(min, max) 方法。min + max = 0 为中心点( 实际以 0 为中心点), 双向均匀自增。TODO 【4007】
org.skywalking.apm.collector.storage.table.register.Application ,应用。例如记录在 ES 如下图:

我们先来看看 API 的定义,InstanceDiscoveryService ,如下图所示:

整体代码和 「2.1 应用的注册 API」 非常相似,所以本小节,更多的是提供代码的链接地址。
InstanceDiscoveryServiceHandler#register(ApplicationInstance, StreamObserver<ApplicationInstanceMapping>),注册应用实例。
org.skywalking.apm.collector.agent.stream.service.register.IInstanceIDService ,继承 Service 接口,应用实例编号服务接口。
#getOrCreate(applicationCode) 接口方法,根据应用编号 + AgentUUID,获取或创建应用实例,并获得应用编号。org.skywalking.apm.collector.agent.stream.worker.register.InstanceIDService ,实现 IInstanceIDService 接口,应用编号服务实现类。
#getOrCreate(applicationCode) 方法。在 #createInstanceRegisterGraph() 方法中,我们可以看到 Instance 对应的 Graph<Instance> 对象的创建。
org.skywalking.apm.collector.agent.stream.worker.register.InstanceRegisterRemoteWorker ,继承 AbstractRemoteWorker 抽象类,应用实例注册远程 Worker 。org.skywalking.apm.collector.agent.stream.worker.register.ApplicationRegisterSerialWorker ,继承 AbstractLocalAsyncWorker 抽象类,异步保存应用 Worker 。"1" 正向递增。org.skywalking.apm.collector.storage.table.register.Instance ,应用实例。例如记录在 ES 如下图:

我们先来看看 API 的定义,InstanceDiscoveryService.proto ,如下图所示:

Downstream.proto 中定义。一般情况下,Agent 在注册应用时候成功后,如果因为各种原因原因和 Collector 断开了 gRPC Channel 连接( 例如,网络 ),恢复连接后,需要调用该 API ,进行恢复注册。
InstanceDiscoveryServiceHandler#recover(ApplicationInstanceRecover, StreamObserver<Downstream>), 代码如下:
TimeBucketUtils#getSecondTimeBucket(time) 方法,将 registerTime 转成 timeBucket 。IInstanceIDService#recover(instanceId, applicationId, registerTime, osInfo) 方法,恢复注册应用实例。InstanceIDService#recover(instanceId, applicationId, registerTime, osInfo) 实现方法,恢复注册。代码如下:
我们先来看看 API 的定义,InstanceDiscoveryService.proto ,如下图所示:

Downstream.proto 中定义。一般情况下,Agent 在注册应用时候成功后,定时向 Collector 发送心跳,记录应用存活。
InstanceDiscoveryServiceHandler#heartbeat(ApplicationInstanceHeartbeat, StreamObserver<org.skywalking.apm.network.proto.Downstream>) ,目前该方法暂未实现。实现后,会首先调用一个 Service 方法,而后调用 InstanceEsRegisterDAO#updateHeartbeatTime(instanceId, heartbeatTime) 方法,记录应用实例的心跳时间。
org.skywalking.apm.agent.core.remote.AppAndServiceRegisterClient ,实现 BootService 、GRPCChannelListener 、Runnable 、TracingContextListener 接口,注册应用与实例的客户端。该客户端会调用上述所有 API 。
PROCESS_UUID 静态属性,Agent UUID ,使用 UUID 算法生成,去除多余 "-" 。status 属性,gRPC 连接状态。applicationRegisterServiceBlockingStub / instanceDiscoveryServiceBlockingStub / serviceNameDiscoveryServiceBlockingStub 属性,对应 gRPC 提供 API 的阻塞 Stub 。needRegisterRecover 属性,是否需要发起恢复的注册。#statusChanged(GRPCChannelStatus) 实现方法,根据 gRPC 连接状态的变更,创建或销毁 Stub 。#boot() 实现方法,将自己作为监听器( 因为实现了 GRPCChannelListener 接口 )添加到 GRPCChannelManager 中,从而监听 gRPC Channel 的状态。在 《SkyWalking 源码分析 —— Agent Remote 远程通信服务》 有详细解析。applicationRegisterFuture 属性,注册应用与实例的定时任务。#boot() 实现方法,创建 applicationRegisterFuture 。该定时任务无初始化延迟,每 Config.Collector.APP_AND_SERVICE_REGISTER_CHECK_INTERVAL ( 默认:3 s ) 执行一次 #run() 方法。lastSegmentTime 属性,最后记录 Segment 的时间。#afterFinished() 实现方法,记录 Segment 最后的时间。#afterBoot() 实现方法,将自己作为监听器( 因为实现了 TracingContextListener 接口 )添加到 GRPCChannelManager 中,从而监听 Segment 的记录。在 《SkyWalking 源码分析 —— Agent 收集 Trace 数据》 有详细解析。#run() 实现方法,执行应用的注册,应用实例的正常注册、恢复注册、心跳的逻辑。
shouldTry )。可能对 shouldTry 会比较疑惑?该变量用于,应用的注册成功后,重新标记 shouldTry = true ,继续执行应用实例的注册。lastSegmentTime 一分钟,调用 「2.4 应用实例的心跳 API」 。GRPCChannelManager#reportError(t) 方法,处理异常,例如请求超时。