- spring-cloud-sleuth快速上手(https://cloud.tencent.com/developer/article/1884423)
- zipkin-brave的demo及源码
- spring-cloud-sleuth源码(https://cloud.tencent.com/developer/article/1886833)
spring-cloud-sleuth的quick-start 上手极快 ; 但是看代码的时候有点懵逼,所以就先对brave进行梳理,梳理后再看spring-cloud对zipkin的整合,瞬间清晰了
测试类全部依赖于Brave-quickstart, 用于熟悉下api
note: 只是对brave是如何进行日志链路追踪的进行梳理,包括spring-cloud-sleuth的源码也只梳理相关类
<properties>
<spring-cloud.version>2020.0.3</spring-cloud.version>
<zipkin-reporter.version>2.16.3</zipkin-reporter.version>
</properties>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-sleuth</artifactId>
</dependency>
<dependency>
<groupId>io.zipkin.reporter2</groupId>
<artifactId>zipkin-reporter-brave</artifactId>
<version>${zipkin-reporter.version}</version>
</dependency>
<dependency>
<groupId>io.zipkin.reporter2</groupId>
<artifactId>zipkin-sender-okhttp3</artifactId>
</dependency>
public class GlobalContext {
static OkHttpSender sender;
static AsyncZipkinSpanHandler spanHandler;
static Tracing tracing;
static{
//配置一个提交器, 控制提交span
sender = OkHttpSender.create("http://127.0.0.1:9411/api/v2/spans");
spanHandler = AsyncZipkinSpanHandler.create(sender);
tracing = Tracing.newBuilder()
.localServiceName("trace-demo")
.addSpanHandler(spanHandler)
.build();
}
public static Tracer tracer() {
return tracing.tracer();
}
public static void close() {
tracing.close();
spanHandler.close();
sender.close();
}
Tracer tracer = GlobalContext.tracer();
//root span
Span root = tracer.newTrace().name("root");
logger.info("[root]");
Span root_sub_first = tracer.newChild(root.context()).name("root_sub_first");
logger.info("[root_sub_first]");
Span root_sub_second = tracer.newChild(root.context()).name("root_sub_second").start();
logger.info("[root_sub_second]");
Span root_sub_sub_first = tracer.newChild(root_sub_first.context()).name("root_sub_sub_first").start();
logger.info("[root_sub_sub_first]");
root_sub_sub_first.finish();
root_sub_first.finish();
root_sub_second.finish();
root.finish();
GlobalContext.close();
Thread.currentThread().join();
private static Logger logger = LoggerFactory.getLogger(TagDemo.class);
public static void main(String[] args) throws InterruptedException {
// method1();
method2();
Thread.currentThread().join();
}
/**
* 一个简单的全局增长统计
* @throws InterruptedException
*/
private static void method2() throws InterruptedException {
Tracer tracer = GlobalContext.tracer();
Span start = tracer.nextSpan().name("tag-test-02").start();
try (Tracer.SpanInScope inScope = tracer.withSpanInScope(start)) {
logger.info("tag-test-02");
SUMMARY_TAG.tag(Summarizer.summarizer, start);
Span span = tracer.nextSpan().name("tag-test-02-sub").start();
SUMMARY_TAG.tag(Summarizer.summarizer, span);
logger.info("tag-test-02-sub");
span.finish();
}finally {
start.finish();
}
}
/**
* 在zipkin客户端可以通过 tagQuery=client/finagle.version 来进行查询的过滤
*
* @throws InterruptedException
*/
private static void method1() throws InterruptedException {
Tracer tracer = GlobalContext.tracer();
Span start = tracer.nextSpan().name("tag-test").start();
try (Tracer.SpanInScope inScope = tracer.withSpanInScope(start)) {
start.tag("client/finagle.version", "6.36.0");
logger.info("tag-test");
}finally {
start.finish();
}
}
static class Summarizer{
static Summarizer summarizer = new Summarizer();
private int sum = 0;
public int increase() {
return ++sum;
}
}
static Tag SUMMARY_TAG = new Tag<Summarizer>("abc") {
@Override
protected String parseValue(Summarizer input, TraceContext context) {
return input.increase() + "次";
}
};
提供链路装配的必要功能,
static final class Default extends Tracing {
//链路追踪器; Spring中这个是一个单例的bean
final Tracer tracer;
//模式是 B3Propagation.FACTORY
final Propagation.Factory propagationFactory;
//上一个factory产生的实际传播对象
final Propagation<String> stringPropagation;
//默认是: brave.propagation.CurrentTraceContext.Default类, 里面有一个 InheritableThreadLocal 的静态字段
final CurrentTraceContext currentTraceContext;
//取样器
final Sampler sampler;
final Clock clock;
//Tag的子类,name=error , 比如异常了把error转成指定格式传给zipkin服务端
final ErrorParser errorParser;
final AtomicBoolean noop;
}
链路追踪器, 链路追踪中重要的Span就是由该类来进行创建, 按照官方demo来中使用到#newTracer
和#nextSpan
#newChild(TraceContext)
底层全部是通过_toSpan
来生成的span
public class Tracer {
//根据不同的jdk版本来获取对应的Clock; 比如
final Clock clock;
//传播工厂,生成的传播类在不同的调用中增加调用过程中的附加信息
final Propagation.Factory propagationFactory;
//虽然注释用来调用tostring方法的,但也只是在这个类里做这个事情, 这个类是有其他功能的
final SpanHandler spanHandler; // only for toString
//所有挂起的span
final PendingSpans pendingSpans;
//取样器
final Sampler sampler;
//当前追踪的上下文,用于整合其他上下文,如mdc
final CurrentTraceContext currentTraceContext;
final boolean traceId128Bit, supportsJoin, alwaysSampleLocal;
final AtomicBoolean noop;
}
#newTrace()
public Span newTrace() {
return _toSpan(null, newRootContext(0));
}
TraceContext newRootContext(int flags) {
flags &= ~FLAG_SHARED; // cannot be shared if we aren't reusing the span ID
// tracer中有一个propagationFactory类,创建上下文的时候可以扩展一下
return decorateContext(flags, 0L, 0L, 0L, 0L, 0L, Collections.emptyList());
}
newChild(TraceContext)
public Span newChild(TraceContext parent) {
if (parent == null) throw new NullPointerException("parent == null");
return _toSpan(parent, decorateContext(parent, parent.spanId()));
}
nextSpan()
//通过线程变量来获取当前链路追踪的上下文; 如果有就以这个上下文为root创建子span ; 没有就创建一个rootspan再根据这个rootspan创建子span
public Span nextSpan() {
TraceContext parent = currentTraceContext.get();
return parent != null ? newChild(parent) : newTrace();
}
_toSpan()
Span _toSpan(@Nullable TraceContext parent, TraceContext context) {
if (isNoop(context)) return new NoopSpan(context);
//pendingSpans携带了所有挂起的span ; getOrCreate或将创建好的span保存到map中去, 并且SpanHandle处理begin信号
PendingSpan pendingSpan = pendingSpans.getOrCreate(parent, context, false);
TraceContext pendingContext = pendingSpan.context();
if (pendingContext != null) context = pendingContext;
//pendingSpan.state() -> MutableSpan 和 pendingSpans中携带的PendingSpan 是同一个对象
return new RealSpan(context, pendingSpans, pendingSpan.state(), pendingSpan.clock());
}
链路追踪的上下文,携带了链路追踪定义的信息
//@Immutable
public final class TraceContext extends SamplingFlags {
final long traceIdHigh, traceId, localRootId, parentId, spanId;
final List<Object> extraList;
TraceContext(
int flags,
long traceIdHigh,
long traceId,
long localRootId,
long parentId,
long spanId,
//用于扩展时增加附加的信息
List<Object> extraList
) {
super(flags);
this.traceIdHigh = traceIdHigh;
this.traceId = traceId;
this.localRootId = localRootId;
this.parentId = parentId;
this.spanId = spanId;
this.extraList = extraList;
}
}
这个和TraceContext没有继承关系, 是持有TraceContext的地方, 默认的实现类是: Default,通过本地线程变量来存储上下文
public static final class Default extends ThreadLocalCurrentTraceContext {
// Inheritable as Brave 3's ThreadLocalServerClientAndLocalSpanState was inheritable
static final InheritableThreadLocal<TraceContext> INHERITABLE = new InheritableThreadLocal<>();
super{ //父类
static final ThreadLocal<TraceContext> DEFAULT = new ThreadLocal<>();
final ThreadLocal<TraceContext> local;
final RevertToNullScope revertToNull; //root - scope
super{
//Scope的装饰器
final ScopeDecorator[] scopeDecorators;
}
}
}
在看源码中Span一直有点梳理不清楚,因为代码中创建的span都是RealSpan, 所以依RealSpan为入口对Span进行了梳理,结论如下:
final class RealSpan extends Span {
final TraceContext context;
final PendingSpans pendingSpans;
final MutableSpan state;
final Clock clock;
/**
* 个人理解annotate是一种特殊tag的来源方法
*/
@Override public Span annotate(long timestamp, String value) {
// Modern instrumentation should not send annotations such as this, but we leniently
// accept them rather than fail. This for example allows old bridges like to Brave v3 to work
if ("cs".equals(value)) {
synchronized (state) {
state.kind(Span.Kind.CLIENT);
state.startTimestamp(timestamp);
}
} else if ("sr".equals(value)) {
synchronized (state) {
state.kind(Span.Kind.SERVER);
state.startTimestamp(timestamp);
}
} else if ("cr".equals(value)) {
synchronized (state) {
state.kind(Span.Kind.CLIENT);
}
finish(timestamp);
} else if ("ss".equals(value)) {
synchronized (state) {
state.kind(Span.Kind.SERVER);
}
finish(timestamp);
} else {
synchronized (state) {
state.annotate(timestamp, value);
}
}
return this;
}
}
字面意思: 挂起的span, 比如start一个span时, PendingSpans中增加PendingSpan对象, 调用end/flush时,移出这个对象
public final class PendingSpan extends WeakReference<TraceContext> {
final MutableSpan span;
final TickClock clock;
final TraceContext handlerContext;
}
前面我们看Tracer类的时候也有这个类; 首先Tracer是单例的; 那么单例里面有这么一个对象; 所以这个对象也是单例的; 我们对span执行start/end/flush/finish等操作实际上都是调入到这个类上, 这个类继承了WeakConcurrentMap类,里面有所有PendingSpan 类 比如finish方法会溢出PendingSpan ,然后通过spanHandler将数据提交给了zipkin服务器
public final class PendingSpans extends WeakConcurrentMap<TraceContext, PendingSpan> {
final MutableSpan defaultSpan;
final Clock clock;
//brave.Tracing.Builder#build 这个spanHandler是 zipkinSpanReporter 专门用来提交数据的
final SpanHandler spanHandler;
final AtomicBoolean noop;
public void finish(TraceContext context, long timestamp) {
PendingSpan last = remove(context);
if (last == null) return;
last.span.finishTimestamp(timestamp != 0L ? timestamp : last.clock.currentTimeMicroseconds());
spanHandler.end(last.handlerContext, last.span, Cause.FINISHED);
}
}
名称上说明该对象是可变的; 外部暴露的span为RealSpan,当调用tag/annotae方法时实际数据保存的地方就是该类
public final class MutableSpan implements Cloneable {
/*
* One of these objects is allocated for each in-flight span, so we try to be parsimonious on
* things like array allocation and object reference size.
*/
String traceId, localRootId, parentId, id;
Kind kind;
int flags;
long startTimestamp, finishTimestamp;
String name, localServiceName, localIp, remoteServiceName, remoteIp;
int localPort, remotePort;
Throwable error;
//
// The below use object arrays instead of ArrayList. The intent is not for safe sharing
// (copy-on-write), as this type is externally synchronized. In other words, this isn't
// copy-on-write. We just grow arrays as we need to similar to how ArrayList does it.
//
// tags [(key, value)] annotations [(timestamp, value)]
Object[] tags = EMPTY_ARRAY, annotations = EMPTY_ARRAY;
int tagCount, annotationCount;
}
对应前面的PendingSpans类,这个类里面Reporter见名知意是提交器,通过这个handler来将数据提交到zipkin服务器上
public class ZipkinSpanHandler extends SpanHandler implements Closeable {
final Reporter<MutableSpan> spanReporter;
final Tag<Throwable> errorTag; // for toBuilder()
final boolean alwaysReportSpans;
}
只会有一条线程进行发送; 如果线程已经在工作的话会丢入pending队列中之前的线程会处理掉
public abstract class AsyncReporter<S> extends Component implements Reporter<S>, Flushable {
//通过内部的Build类可以得到,这个reporter就是该类,
static final class BoundedAsyncReporter<S> extends AsyncReporter<S> {
static final Logger logger = Logger.getLogger(BoundedAsyncReporter.class.getName());
//线程是否已经在工作了;
final AtomicBoolean started, closed;
final BytesEncoder<S> encoder;
//缓冲队列
final ByteBoundedQueue<S> pending;
final Sender sender;
final int messageMaxBytes;
final long messageTimeoutNanos, closeTimeoutNanos;
final CountDownLatch close;
final ReporterMetrics metrics;
//生成线程的工厂类
final ThreadFactory threadFactory;
/** Tracks if we should log the first instance of an exception in flush(). */
private boolean shouldWarnException = true;
//特意粘贴这个方法,是想解释异步的原因在这个类里面,启动新线程处理
void startFlusherThread() {
BufferNextMessage<S> consumer =
BufferNextMessage.create(encoder.encoding(), messageMaxBytes, messageTimeoutNanos);
Thread flushThread = threadFactory.newThread(new Flusher<>(this, consumer));
flushThread.setName("AsyncReporter{" + sender + "}");
flushThread.setDaemon(true);
flushThread.start();
}
}
}
发送器,这个类是将数据如何发送到zipkin服务的, rabbitmq/kafka/rest/okhttp等方式, 这个类比较好理解,不同实现也不粘代码了
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。