前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >聊聊skywalking的metric-exporter

聊聊skywalking的metric-exporter

作者头像
code4it
发布2020-03-30 21:58:32
1.5K0
发布2020-03-30 21:58:32
举报
文章被收录于专栏:码匠的流水账

本文主要研究一下skywalking的metric-exporter

metric-exporter.proto

skywalking-6.6.0/oap-server/exporter/src/main/proto/metric-exporter.proto

代码语言:javascript
复制
syntax = "proto3";

option java_multiple_files = true;
option java_package = "org.apache.skywalking.oap.server.exporter.grpc";


service MetricExportService {
    rpc export (stream ExportMetricValue) returns (ExportResponse) {
    }

    rpc subscription (SubscriptionReq) returns (SubscriptionsResp) {
    }
}

message ExportMetricValue {
    string metricName = 1;
    string entityName = 2;
    string entityId = 3;
    ValueType type = 4;
    int64 timeBucket = 5;
    int64 longValue = 6;
    double doubleValue = 7;
}

message SubscriptionsResp {
    repeated string metricNames = 1;
}

enum ValueType {
    LONG = 0;
    DOUBLE = 1;
}

message SubscriptionReq {

}

message ExportResponse {
}
  • metric-exporter.proto定义了MetricExportService服务,它有export、subscription两个rpc方法

GRPCExporterSetting

skywalking-6.6.0/oap-server/exporter/src/main/java/org/apache/skywalking/oap/server/exporter/provider/grpc/GRPCExporterSetting.java

代码语言:javascript
复制
@Setter
@Getter
public class GRPCExporterSetting extends ModuleConfig {
    private String targetHost;
    private int targetPort;
    private int bufferChannelSize = 20000;
    private int bufferChannelNum = 2;
}
  • GRPCExporterSetting定义了targetHost、targetPort、bufferChannelSize、bufferChannelNum属性

GRPCExporterProvider

skywalking-6.6.0/oap-server/exporter/src/main/java/org/apache/skywalking/oap/server/exporter/provider/grpc/GRPCExporterProvider.java

代码语言:javascript
复制
public class GRPCExporterProvider extends ModuleProvider {
    private GRPCExporterSetting setting;
    private GRPCExporter exporter;

    @Override public String name() {
        return "grpc";
    }

    @Override public Class<? extends ModuleDefine> module() {
        return ExporterModule.class;
    }

    @Override public ModuleConfig createConfigBeanIfAbsent() {
        setting = new GRPCExporterSetting();
        return setting;
    }

    @Override public void prepare() throws ServiceNotProvidedException, ModuleStartException {
        exporter = new GRPCExporter(setting);
        this.registerServiceImplementation(MetricValuesExportService.class, exporter);
    }

    @Override public void start() throws ServiceNotProvidedException, ModuleStartException {

    }

    @Override public void notifyAfterCompleted() throws ServiceNotProvidedException, ModuleStartException {
        ModuleServiceHolder serviceHolder = getManager().find(CoreModule.NAME).provider();
        exporter.setServiceInventoryCache(serviceHolder.getService(ServiceInventoryCache.class));
        exporter.setServiceInstanceInventoryCache(serviceHolder.getService(ServiceInstanceInventoryCache.class));
        exporter.setEndpointInventoryCache(serviceHolder.getService(EndpointInventoryCache.class));

        exporter.initSubscriptionList();
    }

    @Override public String[] requiredModules() {
        return new String[] {CoreModule.NAME};
    }
}
  • GRPCExporterProvider继承了ModuleProvider,其prepare方法创建GRPCExporter,然后执行registerServiceImplementation;其notifyAfterCompleted方法主要是给exporter设置serviceInventoryCache、serviceInstanceInventoryCache、endpointInventoryCache,然后执行exporter.initSubscriptionList()

MetricFormatter

skywalking-6.6.0/oap-server/exporter/src/main/java/org/apache/skywalking/oap/server/exporter/provider/MetricFormatter.java

代码语言:javascript
复制
@Setter
public class MetricFormatter {
    private ServiceInventoryCache serviceInventoryCache;
    private ServiceInstanceInventoryCache serviceInstanceInventoryCache;
    private EndpointInventoryCache endpointInventoryCache;

    protected String getEntityName(MetricsMetaInfo meta) {
        int scope = meta.getScope();
        if (DefaultScopeDefine.inServiceCatalog(scope)) {
            int entityId = Integer.valueOf(meta.getId());
            return serviceInventoryCache.get(entityId).getName();
        } else if (DefaultScopeDefine.inServiceInstanceCatalog(scope)) {
            int entityId = Integer.valueOf(meta.getId());
            return serviceInstanceInventoryCache.get(entityId).getName();
        } else if (DefaultScopeDefine.inEndpointCatalog(scope)) {
            int entityId = Integer.valueOf(meta.getId());
            return endpointInventoryCache.get(entityId).getName();
        } else if (scope == DefaultScopeDefine.ALL) {
            return "";
        } else {
            return null;
        }
    }
}
  • MetricFormatter提供了getEntityName方法,用于从MetricsMetaInfo提取entityName

MetricValuesExportService

skywalking-6.6.0/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/exporter/MetricValuesExportService.java

代码语言:javascript
复制
public interface MetricValuesExportService extends Service {
    /**
     * This method is sync-mode export, the performance effects the persistence result. Queue mode is highly recommended.
     *
     * @param event value is only accurate when the method invokes. Don't cache it.
     */
    void export(ExportEvent event);
}
  • MetricValuesExportService继承了Service,它定义了export方法

GRPCExporter

skywalking-6.6.0/oap-server/exporter/src/main/java/org/apache/skywalking/oap/server/exporter/provider/grpc/GRPCExporter.java

代码语言:javascript
复制
public class GRPCExporter extends MetricFormatter implements MetricValuesExportService, IConsumer<GRPCExporter.ExportData> {
    private static final Logger logger = LoggerFactory.getLogger(GRPCExporter.class);

    private GRPCExporterSetting setting;
    private final MetricExportServiceGrpc.MetricExportServiceStub exportServiceFutureStub;
    private final MetricExportServiceGrpc.MetricExportServiceBlockingStub blockingStub;
    private final DataCarrier exportBuffer;
    private final Set<String> subscriptionSet;

    public GRPCExporter(GRPCExporterSetting setting) {
        this.setting = setting;
        GRPCClient client = new GRPCClient(setting.getTargetHost(), setting.getTargetPort());
        client.connect();
        ManagedChannel channel = client.getChannel();
        exportServiceFutureStub = MetricExportServiceGrpc.newStub(channel);
        blockingStub = MetricExportServiceGrpc.newBlockingStub(channel);
        exportBuffer = new DataCarrier<ExportData>(setting.getBufferChannelNum(), setting.getBufferChannelSize());
        exportBuffer.consume(this, 1, 200);
        subscriptionSet = new HashSet<>();
    }

    @Override public void export(ExportEvent event) {
        if (ExportEvent.EventType.TOTAL == event.getType()) {
            Metrics metrics = event.getMetrics();
            if (metrics instanceof WithMetadata) {
                MetricsMetaInfo meta = ((WithMetadata)metrics).getMeta();
                if (subscriptionSet.size() == 0 || subscriptionSet.contains(meta.getMetricsName())) {
                    exportBuffer.produce(new ExportData(meta, metrics));
                }
            }
        }
    }

    public void initSubscriptionList() {
        SubscriptionsResp subscription = blockingStub.withDeadlineAfter(10, TimeUnit.SECONDS).subscription(SubscriptionReq.newBuilder().build());
        subscription.getMetricNamesList().forEach(subscriptionSet::add);
        logger.debug("Get exporter subscription list, {}", subscriptionSet);
    }

    @Override public void init() {

    }

    @Override public void consume(List<ExportData> data) {
        if (data.size() == 0) {
            return;
        }

        ExportStatus status = new ExportStatus();
        StreamObserver<ExportMetricValue> streamObserver = exportServiceFutureStub.withDeadlineAfter(10, TimeUnit.SECONDS).export(
            new StreamObserver<ExportResponse>() {
                @Override public void onNext(ExportResponse response) {

                }

                @Override public void onError(Throwable throwable) {
                    status.done();
                }

                @Override public void onCompleted() {
                    status.done();
                }
            }
        );
        AtomicInteger exportNum = new AtomicInteger();
        data.forEach(row -> {
            ExportMetricValue.Builder builder = ExportMetricValue.newBuilder();

            Metrics metrics = row.getMetrics();
            if (metrics instanceof LongValueHolder) {
                long value = ((LongValueHolder)metrics).getValue();
                builder.setLongValue(value);
                builder.setType(ValueType.LONG);
            } else if (metrics instanceof IntValueHolder) {
                long value = ((IntValueHolder)metrics).getValue();
                builder.setLongValue(value);
                builder.setType(ValueType.LONG);
            } else if (metrics instanceof DoubleValueHolder) {
                double value = ((DoubleValueHolder)metrics).getValue();
                builder.setDoubleValue(value);
                builder.setType(ValueType.DOUBLE);
            } else {
                return;
            }

            MetricsMetaInfo meta = row.getMeta();
            builder.setMetricName(meta.getMetricsName());
            String entityName = getEntityName(meta);
            if (entityName == null) {
                return;
            }
            builder.setEntityName(entityName);
            builder.setEntityId(meta.getId());

            builder.setTimeBucket(metrics.getTimeBucket());

            streamObserver.onNext(builder.build());
            exportNum.getAndIncrement();
        });

        streamObserver.onCompleted();

        long sleepTime = 0;
        long cycle = 100L;
        /**
         * For memory safe of oap, we must wait for the peer confirmation.
         */
        while (!status.isDone()) {
            try {
                sleepTime += cycle;
                Thread.sleep(cycle);
            } catch (InterruptedException e) {
            }

            if (sleepTime > 2000L) {
                logger.warn("Export {} metrics to {}:{}, wait {} milliseconds.",
                    exportNum.get(), setting.getTargetHost(), setting.getTargetPort(), sleepTime);
                cycle = 2000L;
            }
        }

        logger.debug("Exported {} metrics to {}:{} in {} milliseconds.",
            exportNum.get(), setting.getTargetHost(), setting.getTargetPort(), sleepTime);
    }

    @Override public void onError(List<ExportData> data, Throwable t) {
        logger.error(t.getMessage(), t);
    }

    @Override public void onExit() {

    }

    @Getter(AccessLevel.PRIVATE)
    public class ExportData {
        private MetricsMetaInfo meta;
        private Metrics metrics;

        public ExportData(MetricsMetaInfo meta, Metrics metrics) {
            this.meta = meta;
            this.metrics = metrics;
        }
    }

    private class ExportStatus {
        private boolean done = false;

        private void done() {
            done = true;
        }

        public boolean isDone() {
            return done;
        }
    }
}
  • GRPCExporter继承了MetricFormatter,实现了MetricValuesExportService、IConsumer接口;其构造器根据GRPCExporterSetting实例化MetricExportServiceGrpc.MetricExportServiceStub以及MetricExportServiceGrpc.MetricExportServiceBlockingStub,并创建DataCarrier,然后注册自身的IConsumer到exportBuffer;其export方法主要是执行exportBuffer.produce(new ExportData(meta, metrics));其consume方法主要是构造ExportMetricValue,然后执行streamObserver.onNext

小结

metric-exporter.proto定义了MetricExportService服务,它有export、subscription两个rpc方法;GRPCExporterProvider继承了ModuleProvider,其prepare方法创建GRPCExporter,然后执行registerServiceImplementation;其notifyAfterCompleted方法主要是给exporter设置serviceInventoryCache、serviceInstanceInventoryCache、endpointInventoryCache,然后执行exporter.initSubscriptionList()

doc

  • metric-exporter
本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2020-03-25,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 码匠的流水账 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • metric-exporter.proto
  • GRPCExporterSetting
  • GRPCExporterProvider
  • MetricFormatter
  • MetricValuesExportService
  • GRPCExporter
  • 小结
  • doc
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档