还记得这个我们zipkin系列的第一篇文章中提到过的架构图么,服务端组件就这么几个,很简单。
其中稍微有点内涵的也就是collector和storage分别提供了多种不同的实现,collector支持http、rabbitMq、kafka而storage支持内存、mysql、es
现在我们来了解一下服务端的实现。服务端开启自动装配使用的是@EnableZipkinServer
注解
@Target(ElementType.TYPE)@Retention(RetentionPolicy.RUNTIME)@Documented@Import(InternalZipkinConfiguration.class)public @interface EnableZipkinServer {
}
接着深入InternalZipkinConfiguration
这个类
@Configuration@Import({ ZipkinServerConfiguration.class, TracingConfiguration.class, ZipkinQueryApiV2.class, ZipkinHttpCollector.class, MetricsHealthController.class})public class InternalZipkinConfiguration {}
可以看到这个类一共又引入了这么多的装配类,一个一个来分析吧
这个类中,主要是配置了Zipkin的健康检查、web容器以及、默认的存储的方式的配置
其中健康检查的bean包括以下几个:
ZipkinHealthIndicator
关于SpringBoot中健康检查的原理可以参考这篇文章:SpringBoot健康检查实现原理MetricsHealthController
,其中暴露了两个端口health和metricsWeb容器主要是由这个类负责的:UndertowServletWebServerFactory
InMemoryConfiguration
这个类则是存储的自动装配类,如果没有选择使用MySQL或者ES的话则会使用内存进行存储
Zipkin服务端其实也会存储trace信息,这个时候就需要一些Client的配置。这个类里面就是这些配置 这里就不详细展开了
Zipkin V2版本的查询API,这个API是供Zipkin的UI界面使用的,其中包含这几个接口
Zipkin默认的Collector使用http协议里收集Trace信息,这里就是本文的重点
核心接受请求的方法是handleRequest
public void handleRequest(HttpServerExchange exchange) throws Exception { boolean v2 = exchange.getRelativePath().equals("/api/v2/spans"); boolean v1 = !v2 && exchange.getRelativePath().equals("/api/v1/spans"); if (!v2 && !v1) { next.handleRequest(exchange); return; }
if (!POST.equals(exchange.getRequestMethod())) { next.handleRequest(exchange); return; }
String contentTypeValue = exchange.getRequestHeaders().getFirst(CONTENT_TYPE); boolean json = contentTypeValue == null || contentTypeValue.startsWith("application/json"); boolean thrift = !json && contentTypeValue.startsWith("application/x-thrift"); boolean proto = v2 && !json && contentTypeValue.startsWith("application/x-protobuf"); if (!json && !thrift && !proto) { exchange .setStatusCode(400) .getResponseSender() .send("unsupported content type " + contentTypeValue + "\n"); return; }
HttpCollector collector = v2 ? (json ? JSON_V2 : PROTO3) : thrift ? THRIFT : JSON_V1; metrics.incrementMessages(); exchange.getRequestReceiver().receiveFullBytes(collector, errorCallback); }
这里一共有这么几个逻辑:
默认情况下使用的是异步方式处理数据的,这个处理的实现类是这个AsyncReceiverImpl
在receiveFullBytes
方法中首先会对这次请求的参数进行处理,处理完成之后的json格式如下:
[{ "traceId": "e840c83e65d10358", "id": "e840c83e65d10358", "kind": "SERVER", "name": "get /user/getuser/{id}", "timestamp": 1574174031876439, "duration": 5911123, "localEndpoint": { "serviceName": "consumer-demo-feign", "ipv4": "192.168.0.15" }, "remoteEndpoint": { "ipv6": "::1", "port": 60716 }, "tags": { "http.method": "GET", "http.path": "/user/getUser/3", "mvc.controller.class": "UserController", "mvc.controller.method": "getUser" }}]
当json数据格式化之后,接下来就是具体的处理
public synchronized Call<Void> accept(List<Span> spans) { int delta = spans.size(); int spansToRecover = this.spansByTraceIdTimeStamp.size() + delta - this.maxSpanCount; this.evictToRecoverSpans(spansToRecover); Iterator var4 = spans.iterator();
while(var4.hasNext()) { Span span = (Span)var4.next(); long timestamp = span.timestampAsLong(); String lowTraceId = lowTraceId(span.traceId()); InMemoryStorage.TraceIdTimestamp traceIdTimeStamp = new InMemoryStorage.TraceIdTimestamp(lowTraceId, timestamp); this.spansByTraceIdTimeStamp.put(traceIdTimeStamp, span); this.traceIdToTraceIdTimeStamps.put(lowTraceId, traceIdTimeStamp); ++this.acceptedSpanCount; if (this.searchEnabled) { String spanName = span.name(); if (span.localServiceName() != null) { this.serviceToTraceIds.put(span.localServiceName(), lowTraceId); if (spanName != null) { this.serviceToSpanNames.put(span.localServiceName(), spanName); } }
if (span.remoteServiceName() != null) { this.serviceToTraceIds.put(span.remoteServiceName(), lowTraceId); if (spanName != null) { this.serviceToSpanNames.put(span.remoteServiceName(), spanName); } } } }
return Call.create((Object)null); }
实际上,最终的span数据都是在这么几个对象中存在的
private final InMemoryStorage.SortedMultimap<InMemoryStorage.TraceIdTimestamp, Span> spansByTraceIdTimeStamp;private final InMemoryStorage.SortedMultimap<String, InMemoryStorage.TraceIdTimestamp> traceIdToTraceIdTimeStamps;private final InMemoryStorage.ServiceNameToTraceIds serviceToTraceIds;private final InMemoryStorage.SortedMultimap<String, String> serviceToSpanNames;
扫码关注腾讯云开发者
领取腾讯云代金券
Copyright © 2013 - 2025 Tencent Cloud. All Rights Reserved. 腾讯云 版权所有
深圳市腾讯计算机系统有限公司 ICP备案/许可证号:粤B2-20090059 深公网安备号 44030502008569
腾讯云计算(北京)有限责任公司 京ICP证150476号 | 京ICP备11018762号 | 京公网安备号11010802020287
Copyright © 2013 - 2025 Tencent Cloud.
All Rights Reserved. 腾讯云 版权所有