
spring-cloud-sleuth source code study (part 2)

Original
eeaters
Modified 2021-10-11 18:06:58
Included in the column: 阿杰

This series has three parts:

- spring-cloud-sleuth quick start (https://cloud.tencent.com/developer/article/1884423)

- zipkin-brave demo and source code

- spring-cloud-sleuth source code (https://cloud.tencent.com/developer/article/1886833)

Walking through the zipkin-brave source code

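The spring-cloud-sleuth quick start gets you going very fast, but reading its code left me confused. So I first worked through Brave itself; after that, Spring Cloud's integration with Zipkin became clear right away.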

All test classes are based on Brave-quickstart and are only meant for getting familiar with the API.

Note: this only covers how Brave does distributed tracing; for spring-cloud-sleuth, too, only the relevant classes are covered.

Brave-quickstart:

Test dependencies

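    <properties>
        <spring-cloud.version>2020.0.3</spring-cloud.version>
        <zipkin-reporter.version>2.16.3</zipkin-reporter.version>
    </properties>

    <!-- Import the Spring Cloud BOM so spring-cloud-starter-sleuth needs no explicit version -->
    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-dependencies</artifactId>
                <version>${spring-cloud.version}</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>

    <dependencies>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-sleuth</artifactId>
        </dependency>

        <dependency>
            <groupId>io.zipkin.reporter2</groupId>
            <artifactId>zipkin-reporter-brave</artifactId>
            <version>${zipkin-reporter.version}</version>
        </dependency>

        <dependency>
            <groupId>io.zipkin.reporter2</groupId>
            <artifactId>zipkin-sender-okhttp3</artifactId>
            <version>${zipkin-reporter.version}</version>
        </dependency>
    </dependencies>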

Test classes

Tracer factory class

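import brave.Tracer;
import brave.Tracing;
import zipkin2.reporter.brave.AsyncZipkinSpanHandler;
import zipkin2.reporter.okhttp3.OkHttpSender;

public class GlobalContext {

    static OkHttpSender sender;

    static AsyncZipkinSpanHandler spanHandler;

    static Tracing tracing;

    static {
        // Configure a sender and an async span handler that control how spans are submitted to Zipkin
        sender = OkHttpSender.create("http://127.0.0.1:9411/api/v2/spans");
        spanHandler = AsyncZipkinSpanHandler.create(sender);
        tracing = Tracing.newBuilder()
                .localServiceName("trace-demo")
                .addSpanHandler(spanHandler)
                .build();
    }

    public static Tracer tracer() {
        return tracing.tracer();
    }

    // Close in reverse order of creation: tracing, then the span handler, then the sender
    public static void close() {
        tracing.close();
        spanHandler.close();
        sender.close();
    }
}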

Test 1

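import brave.Span;
import brave.Tracer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// Class name chosen for this example
public class SpanTreeDemo {
    private static final Logger logger = LoggerFactory.getLogger(SpanTreeDemo.class);

    public static void main(String[] args) throws InterruptedException {
        Tracer tracer = GlobalContext.tracer();
        // root span; start() records the start timestamp
        Span root = tracer.newTrace().name("root").start();
        logger.info("[root]");

        Span root_sub_first = tracer.newChild(root.context()).name("root_sub_first").start();
        logger.info("[root_sub_first]");

        Span root_sub_second = tracer.newChild(root.context()).name("root_sub_second").start();
        logger.info("[root_sub_second]");

        Span root_sub_sub_first = tracer.newChild(root_sub_first.context()).name("root_sub_sub_first").start();
        logger.info("[root_sub_sub_first]");

        // finish children before their parents
        root_sub_sub_first.finish();
        root_sub_first.finish();
        root_sub_second.finish();
        root.finish();

        GlobalContext.close();
        // Joining the current thread blocks forever; stop the process manually
        Thread.currentThread().join();
    }
}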

Test 2

import brave.Span;
import brave.Tag;
import brave.Tracer;
import brave.propagation.TraceContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class TagDemo {
    private static final Logger logger = LoggerFactory.getLogger(TagDemo.class);

    public static void main(String[] args) throws InterruptedException {
//        method1();
        method2();
        // Keep the JVM alive so the asynchronous reporter can send the spans
        Thread.currentThread().join();
    }

    /**
     * A simple global counter, recorded as a tag on every span
     */
    private static void method2() {
        Tracer tracer = GlobalContext.tracer();
        Span start = tracer.nextSpan().name("tag-test-02").start();
        try (Tracer.SpanInScope inScope = tracer.withSpanInScope(start)) {
            logger.info("tag-test-02");
            SUMMARY_TAG.tag(Summarizer.summarizer, start);

            // "start" is in scope, so nextSpan() creates a child of it
            Span span = tracer.nextSpan().name("tag-test-02-sub").start();
            SUMMARY_TAG.tag(Summarizer.summarizer, span);
            logger.info("tag-test-02-sub");
            span.finish();
        } finally {
            start.finish();
        }
    }

    /**
     * In the Zipkin UI you can filter on this tag with    tagQuery=client/finagle.version
     */
    private static void method1() {
        Tracer tracer = GlobalContext.tracer();
        Span start = tracer.nextSpan().name("tag-test").start();
        try (Tracer.SpanInScope inScope = tracer.withSpanInScope(start)) {
            start.tag("client/finagle.version", "6.36.0");
            logger.info("tag-test");
        } finally {
            start.finish();
        }
    }

    static class Summarizer {
        static final Summarizer summarizer = new Summarizer();
        private int sum = 0;

        public int increase() {
            return ++sum;
        }
    }

    // Tag whose value is computed from its input: "1次", "2次", ... (次 means "times")
    static final Tag<Summarizer> SUMMARY_TAG = new Tag<Summarizer>("abc") {
        @Override
        protected String parseValue(Summarizer input, TraceContext context) {
            return input.increase() + "次";
        }
    };
}

Tracing

Provides the components needed to assemble tracing. Its default implementation:

  static final class Default extends Tracing {
    // the tracer; in Spring this is a singleton bean
    final Tracer tracer;
    // default: B3Propagation.FACTORY
    final Propagation.Factory propagationFactory;
    // the actual Propagation object produced by the factory above
    final Propagation<String> stringPropagation;
    // default: brave.propagation.CurrentTraceContext.Default, which has a static InheritableThreadLocal field
    final CurrentTraceContext currentTraceContext;
    // sampler
    final Sampler sampler;
    final Clock clock;
    // a Tag subclass with key "error"; e.g. on an exception it formats the error before it is sent to the Zipkin server
    final ErrorParser errorParser;
    final AtomicBoolean noop;
  }

Tracer

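The tracer. Spans, the central objects of tracing, are created by this class. The methods used in the official demo, #newTrace(), #nextSpan() and #newChild(TraceContext), all create their spans through _toSpan.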

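public class Tracer {
  // Clock implementation chosen according to the JDK version
  final Clock clock;
  // propagation factory; the Propagation it creates carries extra information along the calls
  final Propagation.Factory propagationFactory;
  // The upstream comment says "only for toString", and that is all Tracer uses it for;
  // the handler itself does real work elsewhere (it receives span begin/end events)
  final SpanHandler spanHandler; // only for toString
  // all pending (in-flight) spans
  final PendingSpans pendingSpans;
  // sampler
  final Sampler sampler;
  // the current trace context; also where other contexts such as MDC are integrated
  final CurrentTraceContext currentTraceContext;
  final boolean traceId128Bit, supportsJoin, alwaysSampleLocal;
  final AtomicBoolean noop;
}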

#newTrace()

public Span newTrace() {
    return _toSpan(null, newRootContext(0));
}

TraceContext newRootContext(int flags) {
    flags &= ~FLAG_SHARED; // cannot be shared if we aren't reusing the span ID
    // the tracer's propagationFactory gets a chance to extend (decorate) the context as it is created
    return decorateContext(flags, 0L, 0L, 0L, 0L, 0L, Collections.emptyList());
}

newChild(TraceContext)

 public Span newChild(TraceContext parent) {
    if (parent == null) throw new NullPointerException("parent == null");
    return _toSpan(parent, decorateContext(parent, parent.spanId()));
  }
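To make the difference between newTrace() and newChild(parent) concrete, here is a minimal JDK-only sketch. SimpleContext and its methods are hypothetical stand-ins for TraceContext and the tracer's decorateContext logic, reduced to the three ids; the sketch assumes 0 means "absent", matching the 0L arguments that newRootContext passes.

```java
import java.util.concurrent.ThreadLocalRandom;

// Hypothetical, simplified stand-in for TraceContext id handling (no flags, no extra list).
final class SimpleContext {
  final long traceId, parentId, spanId; // parentId == 0 means "no parent", i.e. a root span

  private SimpleContext(long traceId, long parentId, long spanId) {
    this.traceId = traceId;
    this.parentId = parentId;
    this.spanId = spanId;
  }

  // Random non-zero id, since 0 is reserved for "absent" in this sketch.
  static long nextId() {
    long id;
    do {
      id = ThreadLocalRandom.current().nextLong();
    } while (id == 0L);
    return id;
  }

  // Like newTrace(): no parent, so a brand-new trace id.
  static SimpleContext newRoot() {
    return new SimpleContext(nextId(), 0L, nextId());
  }

  // Like newChild(parent): same trace, parent id = parent's span id, fresh span id.
  static SimpleContext newChild(SimpleContext parent) {
    if (parent == null) throw new NullPointerException("parent == null");
    return new SimpleContext(parent.traceId, parent.spanId, nextId());
  }
}
```

Test 1's tree maps onto these calls: root is newRoot(), root_sub_first and root_sub_second are both newChild(root), and root_sub_sub_first is newChild(root_sub_first), so all four share one traceId.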

nextSpan()

 // Reads the current trace context from a thread-local. If there is one, the new span is created as its child; if not, a new trace (a root span) is started.
 public Span nextSpan() {
    TraceContext parent = currentTraceContext.get();
    return parent != null ? newChild(parent) : newTrace();
  }
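The decision in nextSpan() depends only on what the thread-local currently holds. The following hypothetical model (MiniTracer, Ctx and Scope are illustrative names, not Brave's API) pairs that decision with a scope that sets the thread-local and restores the previous value on close, roughly what Tracer#withSpanInScope does in Test 2.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical model of Tracer#nextSpan on top of a thread-local "current context".
final class MiniTracer {
  // Immutable identifiers, a stripped-down stand-in for TraceContext.
  static final class Ctx {
    final long traceId, parentId, spanId; // parentId == 0 marks a root

    Ctx(long traceId, long parentId, long spanId) {
      this.traceId = traceId;
      this.parentId = parentId;
      this.spanId = spanId;
    }
  }

  // Restores the previous context on close, so scopes nest like try-with-resources blocks.
  interface Scope extends AutoCloseable {
    @Override
    void close();
  }

  private final ThreadLocal<Ctx> current = new ThreadLocal<>();
  // Sequential ids keep the sketch deterministic; Brave generates random ids.
  private final AtomicLong ids = new AtomicLong();

  Ctx newTrace() {
    long id = ids.incrementAndGet();
    return new Ctx(id, 0L, id);
  }

  Ctx newChild(Ctx parent) {
    if (parent == null) throw new NullPointerException("parent == null");
    return new Ctx(parent.traceId, parent.spanId, ids.incrementAndGet());
  }

  // Same decision as nextSpan(): child of the in-scope context, otherwise a new trace.
  Ctx nextSpan() {
    Ctx parent = current.get();
    return parent != null ? newChild(parent) : newTrace();
  }

  // Rough analogue of Tracer#withSpanInScope.
  Scope withSpanInScope(Ctx ctx) {
    Ctx previous = current.get();
    current.set(ctx);
    return () -> current.set(previous);
  }
}
```

This is why, in Test 2, the span created inside the try block becomes a child of tag-test-02: the parent is in scope when nextSpan() runs.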

_toSpan()

Span _toSpan(@Nullable TraceContext parent, TraceContext context) {
    if (isNoop(context)) return new NoopSpan(context);
    // pendingSpans holds all in-flight spans; getOrCreate stores the newly created span in its map and lets the SpanHandler process the begin signal
    PendingSpan pendingSpan = pendingSpans.getOrCreate(parent, context, false);
    TraceContext pendingContext = pendingSpan.context();
    if (pendingContext != null) context = pendingContext;
    // pendingSpan.state() returns the MutableSpan, the very same object held by the PendingSpan inside pendingSpans
    return new RealSpan(context, pendingSpans, pendingSpan.state(), pendingSpan.clock());
}
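The key point of _toSpan is that a RealSpan is only a handle: the data lives in the MutableSpan that PendingSpans already tracks for that context. A hypothetical JDK-only model of that sharing, using String keys instead of TraceContext and a ConcurrentHashMap instead of Brave's weak-keyed map:

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical model: one shared mutable state per context, many thin handles on it.
final class PendingModel {
  // Stand-in for MutableSpan.
  static final class State {
    String name;
  }

  // Stand-in for RealSpan: holds the context key and a reference to the shared State.
  static final class Handle {
    final String context;
    final State state;

    Handle(String context, State state) {
      this.context = context;
      this.state = state;
    }

    Handle name(String name) {
      synchronized (state) { // RealSpan also synchronizes on its state
        state.name = name;
      }
      return this;
    }
  }

  final ConcurrentMap<String, State> pending = new ConcurrentHashMap<>();

  // Like _toSpan: look up (or create) the pending state, then wrap it in a new handle.
  Handle toSpan(String context) {
    State state = pending.computeIfAbsent(context, k -> new State());
    return new Handle(context, state);
  }
}
```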

TraceContext

The trace context; it carries the identifiers that tracing defines.

//@Immutable    
public final class TraceContext extends SamplingFlags { 
    final long traceIdHigh, traceId, localRootId, parentId, spanId;
    final List<Object> extraList;
    TraceContext(
        int flags,
        long traceIdHigh,
        long traceId,
        long localRootId,
        long parentId,
        long spanId,
        // extra information attached by extensions
        List<Object> extraList
    ) {
        super(flags);
        this.traceIdHigh = traceIdHigh;
        this.traceId = traceId;
        this.localRootId = localRootId;
        this.parentId = parentId;
        this.spanId = spanId;
        this.extraList = extraList;
    }
}
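traceIdHigh holds the upper 64 bits of a 128-bit trace id and is 0 for a 64-bit id (compare the traceId128Bit flag on Tracer). The sketch below shows how the two halves can be rendered as the lower-hex trace id string Zipkin displays; TraceIds and its methods are hypothetical helpers that model, rather than call, TraceContext#traceIdString().

```java
// Hypothetical helper (not Brave's code) rendering trace ids as lower-hex strings:
// 16 hex chars for a 64-bit id, 32 when traceIdHigh (the upper 64 bits) is non-zero.
final class TraceIds {
  // %016x treats a negative long as unsigned, so every id becomes exactly 16 hex chars.
  static String toLowerHex(long v) {
    return String.format("%016x", v);
  }

  static String traceIdString(long traceIdHigh, long traceId) {
    return traceIdHigh != 0L ? toLowerHex(traceIdHigh) + toLowerHex(traceId) : toLowerHex(traceId);
  }
}
```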

CurrentTraceContext

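Despite the name, this class does not extend TraceContext; it is the place where the TraceContext is held. The default implementation is Default, which stores the context in a thread-local variable.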

  public static final class Default extends ThreadLocalCurrentTraceContext {
    // Inheritable as Brave 3's ThreadLocalServerClientAndLocalSpanState was inheritable
    static final InheritableThreadLocal<TraceContext> INHERITABLE = new InheritableThreadLocal<>();
  }

  // Fields inherited from the parent class:
  public class ThreadLocalCurrentTraceContext extends CurrentTraceContext {
    static final ThreadLocal<TraceContext> DEFAULT = new ThreadLocal<>();
    final ThreadLocal<TraceContext> local;
    final RevertToNullScope revertToNull;     // the root scope
  }

  // ... and from its parent:
  public abstract class CurrentTraceContext {
    // decorators applied to each Scope
    final ScopeDecorator[] scopeDecorators;
  }
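Why an InheritableThreadLocal matters can be shown with the JDK alone. This demo is not Brave code (InheritDemo and its method are hypothetical names): a thread created after the parent set both values sees the inheritable one but not the plain ThreadLocal one.

```java
import java.util.concurrent.atomic.AtomicReference;

// JDK-only demo: InheritableThreadLocal values are copied into threads created afterwards,
// plain ThreadLocal values are not.
final class InheritDemo {
  static final ThreadLocal<String> PLAIN = new ThreadLocal<>();
  static final InheritableThreadLocal<String> INHERITABLE = new InheritableThreadLocal<>();

  // Returns {value seen via PLAIN, value seen via INHERITABLE} inside a freshly created thread.
  static String[] readFromChildThread(String value) {
    PLAIN.set(value);
    INHERITABLE.set(value);
    AtomicReference<String> plainSeen = new AtomicReference<>();
    AtomicReference<String> inheritedSeen = new AtomicReference<>();
    try {
      // The inheritable value is copied when this Thread object is constructed.
      Thread child = new Thread(() -> {
        plainSeen.set(PLAIN.get());
        inheritedSeen.set(INHERITABLE.get());
      });
      child.start();
      child.join();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    } finally {
      // Clean up so the calling thread does not keep the demo values.
      PLAIN.remove();
      INHERITABLE.remove();
    }
    return new String[] {plainSeen.get(), inheritedSeen.get()};
  }
}
```

Because the copy happens only when a thread is constructed, threads in a pool that were created earlier do not pick up a context set later.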

RealSpan

While reading the source I kept getting confused about Span. Since every span created in code is a RealSpan, I used RealSpan as the entry point. My conclusions:

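  • A span's data, such as tags and annotations, is stored in a MutableSpan.
  • PendingSpans (which extends WeakConcurrentMap) holds every in-flight span. When we call finish/flush/abandon, RealSpan forwards the call to PendingSpans, which handles it centrally (for example, sending the data to the Zipkin server); start only records the start timestamp on the MutableSpan.
  • annotate means "to mark": it records the point in time at which an event ("sr", "cr", "ss", "cs") happened. I think of it as a special kind of tag (only as a mental model; annotations are actually stored separately from tags). annotate also explained what Clock is for, which puzzled me at first: it supplies those timestamps.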
final class RealSpan extends Span {
  final TraceContext context;
  final PendingSpans pendingSpans;
  final MutableSpan state;
  final Clock clock;

    /**
    * My reading: annotate is a way of recording a special kind of tag
    */
  @Override public Span annotate(long timestamp, String value) {
    // Modern instrumentation should not send annotations such as this, but we leniently
    // accept them rather than fail. This for example allows old bridges like to Brave v3 to work
    if ("cs".equals(value)) {
      synchronized (state) {
        state.kind(Span.Kind.CLIENT);
        state.startTimestamp(timestamp);
      }
    } else if ("sr".equals(value)) {
      synchronized (state) {
        state.kind(Span.Kind.SERVER);
        state.startTimestamp(timestamp);
      }
    } else if ("cr".equals(value)) {
      synchronized (state) {
        state.kind(Span.Kind.CLIENT);
      }
      finish(timestamp);
    } else if ("ss".equals(value)) {
      synchronized (state) {
        state.kind(Span.Kind.SERVER);
      }
      finish(timestamp);
    } else {
      synchronized (state) {
        state.annotate(timestamp, value);
      }
    }
    return this;
  }   
}
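The branching in annotate can be condensed into a small, hypothetical model (AnnotateModel is not Brave's RealSpan): the legacy values "cs"/"sr" set the kind and the start time, "cr"/"ss" set the kind and the finish time, and anything else becomes an ordinary timestamped annotation. In Brave the finishing cases call finish(timestamp), which goes through PendingSpans; this sketch only records the timestamp.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical condensed model of RealSpan#annotate's handling of legacy annotation values.
final class AnnotateModel {
  enum Kind { CLIENT, SERVER }

  Kind kind;
  long startTimestamp, finishTimestamp;
  // "timestamp:value" strings, kept apart from tags as in MutableSpan
  final List<String> annotations = new ArrayList<>();

  // synchronized plays the role of synchronized (state) in the excerpt
  synchronized AnnotateModel annotate(long timestamp, String value) {
    switch (value) {
      case "cs":
        kind = Kind.CLIENT;
        startTimestamp = timestamp;
        break;
      case "sr":
        kind = Kind.SERVER;
        startTimestamp = timestamp;
        break;
      case "cr":
        kind = Kind.CLIENT;
        finishTimestamp = timestamp; // Brave calls finish(timestamp) here instead
        break;
      case "ss":
        kind = Kind.SERVER;
        finishTimestamp = timestamp; // likewise finish(timestamp) in Brave
        break;
      default:
        annotations.add(timestamp + ":" + value);
    }
    return this;
  }
}
```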

PendingSpan

Literally, a "pending" span. A PendingSpan is added to PendingSpans when a span is created (getOrCreate in _toSpan) and removed again on finish/flush/abandon.

  • PendingSpans contains a map TraceContext -> PendingSpan.
  • For a given span, the state field of RealSpan and the span field of this class are the same MutableSpan object.
public final class PendingSpan extends WeakReference<TraceContext> {
  final MutableSpan span;
  final TickClock clock;
  final TraceContext handlerContext;
}

PendingSpans

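We already met this class as a field of Tracer. Tracer is effectively a singleton, so the PendingSpans inside it is one too. When we call finish/flush/abandon on a span, the call ends up in this class. It extends WeakConcurrentMap and holds every PendingSpan. For example, finish removes the PendingSpan and then passes the data to spanHandler, which submits it to the Zipkin server.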

public final class PendingSpans extends WeakConcurrentMap<TraceContext, PendingSpan> {
  final MutableSpan defaultSpan;
  final Clock clock;
  // set up in brave.Tracing.Builder#build; in this demo it is the Zipkin span handler, which submits the data
  final SpanHandler spanHandler;
  final AtomicBoolean noop;
    
 public void finish(TraceContext context, long timestamp) {
    PendingSpan last = remove(context);
    if (last == null) return;
    last.span.finishTimestamp(timestamp != 0L ? timestamp : last.clock.currentTimeMicroseconds());
    spanHandler.end(last.handlerContext, last.span, Cause.FINISHED);
  }
}
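The important detail in finish is the order: the entry is removed first, and only the caller that actually removed it reports the span, so finishing the same span twice reports it once. A hypothetical model (FinishModel is an illustrative name; String keys stand in for TraceContext and a BiConsumer for SpanHandler#end):

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.BiConsumer;

// Hypothetical model of PendingSpans#finish; a strong ConcurrentHashMap stands in
// for Brave's WeakConcurrentMap.
final class FinishModel {
  // Stand-in for the MutableSpan held by a PendingSpan.
  static final class PendingState {
    final String name;
    long finishTimestamp;

    PendingState(String name) {
      this.name = name;
    }
  }

  final ConcurrentHashMap<String, PendingState> pending = new ConcurrentHashMap<>();
  final BiConsumer<String, PendingState> handler; // stand-in for SpanHandler#end

  FinishModel(BiConsumer<String, PendingState> handler) {
    this.handler = handler;
  }

  // Registers an in-flight span (Brave does this in getOrCreate).
  void track(String context, String name) {
    pending.putIfAbsent(context, new PendingState(name));
  }

  // Same shape as the excerpt: remove first, so a second finish for the same context is a no-op.
  void finish(String context, long timestamp) {
    PendingState last = pending.remove(context);
    if (last == null) return;
    // Epoch microseconds (millisecond precision here); Brave asks the span's clock instead.
    last.finishTimestamp = timestamp != 0L ? timestamp : System.currentTimeMillis() * 1000L;
    handler.accept(context, last);
  }
}
```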

MutableSpan

As the name says, this object is mutable. The span exposed to callers is a RealSpan; when tag/annotate is called, the data is actually stored here.

public final class MutableSpan implements Cloneable {
      /*
   * One of these objects is allocated for each in-flight span, so we try to be parsimonious on
   * things like array allocation and object reference size.
   */
  String traceId, localRootId, parentId, id;
  Kind kind;
  int flags;
  long startTimestamp, finishTimestamp;
  String name, localServiceName, localIp, remoteServiceName, remoteIp;
  int localPort, remotePort;
  Throwable error;

  //
  // The below use object arrays instead of ArrayList. The intent is not for safe sharing
  // (copy-on-write), as this type is externally synchronized. In other words, this isn't
  // copy-on-write. We just grow arrays as we need to similar to how ArrayList does it.
  //
  // tags [(key, value)] annotations [(timestamp, value)]
  Object[] tags = EMPTY_ARRAY, annotations = EMPTY_ARRAY;
  int tagCount, annotationCount;
}
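The comment above describes tags as key/value pairs packed into one growable Object[]. A hypothetical sketch of that layout (FlatTags is an illustrative name); the doubling growth policy and the replace-on-duplicate-key behavior are assumptions of this sketch, not a description of MutableSpan's exact code:

```java
import java.util.Arrays;

// Hypothetical model of flat tag storage: [key0, value0, key1, value1, ...] plus a count.
final class FlatTags {
  static final Object[] EMPTY_ARRAY = {};
  Object[] tags = EMPTY_ARRAY;
  int tagCount;

  // Overwrites an existing key's value in this sketch, otherwise appends a new pair.
  void tag(String key, String value) {
    int i = indexOf(key);
    if (i != -1) {
      tags[i + 1] = value;
      return;
    }
    int needed = (tagCount + 1) * 2;
    if (tags.length < needed) {
      // Assumption: double the capacity, roughly as ArrayList grows.
      tags = Arrays.copyOf(tags, Math.max(needed, tags.length * 2));
    }
    tags[tagCount * 2] = key;
    tags[tagCount * 2 + 1] = value;
    tagCount++;
  }

  String tag(String key) {
    int i = indexOf(key);
    return i == -1 ? null : (String) tags[i + 1];
  }

  // Linear scan over the key slots (even indexes).
  private int indexOf(String key) {
    for (int i = 0; i < tagCount * 2; i += 2) {
      if (key.equals(tags[i])) return i;
    }
    return -1;
  }
}
```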

ZipkinSpanHandler

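This is the spanHandler that PendingSpans calls. Its Reporter is, as the name says, the component that submits data; through this handler the data reaches the Zipkin server.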

public class ZipkinSpanHandler extends SpanHandler implements Closeable { 
    final Reporter<MutableSpan> spanReporter;
    final Tag<Throwable> errorTag; // for toBuilder()
    final boolean alwaysReportSpans;
}

AsyncReporter

Only one thread does the sending. If that thread is already running, new spans go into the pending queue and are handled by the existing thread.

public abstract class AsyncReporter<S> extends Component implements Reporter<S>, Flushable {
    
  // This is the class the nested Builder returns; it is the actual reporter
  static final class BoundedAsyncReporter<S> extends AsyncReporter<S> {
    static final Logger logger = Logger.getLogger(BoundedAsyncReporter.class.getName());
     // whether the flusher thread has already been started / the reporter closed
    final AtomicBoolean started, closed;
    final BytesEncoder<S> encoder;
    // buffer queue
    final ByteBoundedQueue<S> pending;
    final Sender sender;
    final int messageMaxBytes;
    final long messageTimeoutNanos, closeTimeoutNanos;
    final CountDownLatch close;
    final ReporterMetrics metrics;
    // factory that creates the flusher thread
    final ThreadFactory threadFactory;

    /** Tracks if we should log the first instance of an exception in flush(). */
    private boolean shouldWarnException = true;
      
    // Included to show where the asynchrony comes from: this method starts a new thread that does the sending
   void startFlusherThread() {
      BufferNextMessage<S> consumer =
          BufferNextMessage.create(encoder.encoding(), messageMaxBytes, messageTimeoutNanos);
      Thread flushThread = threadFactory.newThread(new Flusher<>(this, consumer));
      flushThread.setName("AsyncReporter{" + sender + "}");
      flushThread.setDaemon(true);
      flushThread.start();
    }
  }
}
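The "one sending thread plus a queue" design can be reduced to a short, hypothetical model (MiniAsyncReporter is not zipkin-reporter's code). Callers only enqueue; the first report starts a single daemon thread, guarded by compareAndSet on started, that drains the queue in batches and hands each batch to a stand-in for Sender.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;

// Hypothetical, heavily simplified model of BoundedAsyncReporter.
final class MiniAsyncReporter<S> {
  final AtomicBoolean started = new AtomicBoolean(), closed = new AtomicBoolean();
  final BlockingQueue<S> pending;  // bounded by element count only in this sketch
  final Consumer<List<S>> sender;  // stand-in for zipkin2.reporter.Sender
  final int maxBatch;
  private volatile Thread flusher;

  MiniAsyncReporter(int capacity, int maxBatch, Consumer<List<S>> sender) {
    if (maxBatch < 1) throw new IllegalArgumentException("maxBatch < 1");
    this.pending = new ArrayBlockingQueue<>(capacity);
    this.maxBatch = maxBatch;
    this.sender = sender;
  }

  // Never blocks the caller: returns false if closed or if the queue is full (span dropped).
  boolean report(S span) {
    if (closed.get()) return false;
    // Only the first caller starts the thread; later callers just enqueue.
    if (started.compareAndSet(false, true)) startFlusherThread();
    return pending.offer(span);
  }

  private void startFlusherThread() {
    Thread t = new Thread(this::drainLoop, "MiniAsyncReporter");
    t.setDaemon(true); // as in the excerpt: the reporter thread must not keep the JVM alive
    flusher = t;
    t.start();
  }

  private void drainLoop() {
    List<S> batch = new ArrayList<>();
    try {
      while (!closed.get() || !pending.isEmpty()) {
        S first = pending.poll(100, TimeUnit.MILLISECONDS); // wait briefly for work
        if (first == null) continue;
        batch.add(first);
        pending.drainTo(batch, maxBatch - 1); // take whatever else is already queued
        sender.accept(new ArrayList<>(batch));
        batch.clear();
      }
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
  }

  // Stops accepting spans and waits (up to 5 s) for the thread to drain what was queued.
  // Simplification: a report() racing with close() may be lost without being counted.
  void close() {
    if (!closed.compareAndSet(false, true)) return;
    Thread t = flusher;
    if (t == null) return;
    try {
      t.join(5000);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
  }
}
```

Batching is the reason for the queue: one network call per batch instead of one per span, while the application thread that finished the span never waits on I/O.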

Sender

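The sender: this class determines how the data is transported to the Zipkin service, e.g. RabbitMQ, Kafka, REST or OkHttp. It is easy to understand, so I won't paste the individual implementations.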

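Original content statement: this article is published on the Tencent Cloud Developer Community with the author's authorization. Do not reprint without permission.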

In case of infringement, contact cloudcommunity@tencent.com for removal.

