摘要: 原创出处 http://www.iocoder.cn/SkyWalking/agent-send-trace/ 「芋道源码」欢迎转载,保留摘要,谢谢!
本文主要基于 SkyWalking 3.2.6 正式版
分布式链路追踪系统,链路的追踪大体流程如下:
本文主要分享【第二部分】 SkyWalking Agent 发送 Trace 数据。
考虑到减少外部组件的依赖,Agent 收集到 Trace 数据后,不是写入外部消息队列( 例如,Kafka )或者日志文件,而是 Agent 写入内存消息队列,后台线程【异步】发送给 Collector 。
本文涉及的类非常少,如下图所示:

org.skywalking.apm.agent.core.remote.TraceSegmentServiceClient ,TraceSegment 发送服务客户端。它是一个服务,也是一个客户端,负责将 TraceSegment 异步发送到 Collector 。
我们先来看看 TraceSegmentServiceClient 的属性:
TIMEOUT 静态属性,发送等待超时时长,单位:毫秒。lastLogTime 属性,最后打印日志时间。该属性主要用于开发调试。segmentUplinkedCounter 属性,TraceSegment 发送数量。segmentAbandonedCounter 属性,TraceSegment 被丢弃数量。在 Agent 未连接上 Collector 时,产生的 TraceSegment 将被丢弃。carrier 属性,内存队列。在 《SkyWalking 源码分析 —— DataCarrier 异步处理库》 有对 DataCarrier 的详细解析。serviceStub 属性,非阻塞 Stub 。status 属性,连接状态。下面,我们来介绍 TraceSegmentServiceClient 实现的接口以及对应的方法。
#beforeBoot() 方法,代码如下:
GRPCChannelManager#addChannelListener(this) 方法,将自己添加到 GRPCChannelManager 中,作为一个监听器,从而调用 #statusChanged(GRPCChannelStatus) 方法,实现对连接状态( status )的监听处理。#boot() 方法,代码如下:
#consume(List<TraceSegment> ) 方法,实现异步发送 TraceSegment 到 Collector 。#afterBoot() 方法,代码如下:
TracingContext.ListenerManager#add(this) 方法,将自己添加到 ListenerManager 中,作为一个监听器,从而调用 #afterFinished(TraceSegment) 方法,实现收集到新的 TraceSegment ,添加到内存队列。#shutdown() 方法,代码如下:
DataCarrier#shutdownConsumers() 方法,停止消费。#statusChanged(GRPCChannelStatus) 方法,代码如下:
#afterFinished(TraceSegment) 方法,代码如下:
TraceSegment.ignore = true 时,忽略该 TraceSegment 。#consume(List<TraceSegment>) 方法,代码如下:
org.skywalking.apm.agent.core.remote。GRPCStreamServiceStatus 对象。GRPCStreamServiceStatus#finished() 方法,标记完成。为什么呢?下面会看到。GRPCChannelManager#reportError(Throwable) 方法,汇报错误。如果是连接错误,GRPCChannelManager 会负责断开重连。DistributedTraceId#toUniqueId()ID#transform()AbstractTracingSpan#transform()ExitSpan#transform()LogDataEntity#transform()TraceSegmentRef#transform()KeyValuePair#transform()TraceSegment#transform() 方法,将 TraceSegment 转换成 org.skywalking.apm.network.proto.UpstreamSegment 对象,用于 gRPC 传输,参见 TraceSegmentService.proto 的数据结构定义。StreamObserver#onCompleted() 方法,标记全部请求发送完成。GRPCStreamServiceStatus#wait4Finish(maxTimeout) 方法,等待 Collector 处理完成。这就是为什么上面需要调用 GRPCStreamServiceStatus#finished() 方法。完成后,记录数量到 segmentUplinkedCounter 。segmentAbandonedCounter 。#printUplinkStatus() 方法,每三十秒,打印一次 segmentUplinkedCounter 和 segmentAbandonedCounter 数据。主要用于开发调试。另外,该方法会重置 segmentUplinkedCounter 和 segmentAbandonedCounter 计数。ps:目前 DataCarrier 最长每 20 秒消费一次。
#onError(List<TraceSegment>, Throwable) 方法,当消费发生异常时,打印日志。