
A Look at Flink's MetricQueryServiceGateway

code4it
Published 2019-03-17 03:55:52
Included in the column: 码匠的流水账

This article takes a look at Flink's MetricQueryServiceGateway.

MetricQueryServiceGateway

flink-1.7.2/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/retriever/MetricQueryServiceGateway.java

public interface MetricQueryServiceGateway {

    CompletableFuture<MetricDumpSerialization.MetricSerializationResult> queryMetrics(Time timeout);

    String getAddress();
}
  • MetricQueryServiceGateway defines two methods, queryMetrics and getAddress; it has one implementation class, AkkaQueryServiceGateway
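The interface is small enough to exercise with a plain stub. The sketch below is a minimal stand-in (QueryGateway and GatewayDemo are hypothetical names; no Flink or Akka dependency) showing the shape of the contract: an asynchronous metrics query plus a synchronous service address.

```java
import java.util.concurrent.CompletableFuture;

// Simplified stand-in for MetricQueryServiceGateway: the dump arrives
// asynchronously as a CompletableFuture, the address is available directly.
interface QueryGateway {
    CompletableFuture<String> queryMetrics(long timeoutMillis);
    String getAddress();
}

public class GatewayDemo {
    // A stub gateway that completes immediately with a fake dump.
    static QueryGateway stub() {
        return new QueryGateway() {
            @Override
            public CompletableFuture<String> queryMetrics(long timeoutMillis) {
                return CompletableFuture.completedFuture("dump");
            }

            @Override
            public String getAddress() {
                return "akka://flink/user/MetricQueryService";
            }
        };
    }

    public static void main(String[] args) {
        QueryGateway gw = stub();
        System.out.println(gw.getAddress());
        System.out.println(gw.queryMetrics(1000L).join());
    }
}
```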

AkkaQueryServiceGateway

flink-1.7.2/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/retriever/impl/AkkaQueryServiceGateway.java

public class AkkaQueryServiceGateway implements MetricQueryServiceGateway {

    private final ActorRef queryServiceActorRef;

    public AkkaQueryServiceGateway(ActorRef queryServiceActorRef) {
        this.queryServiceActorRef = Preconditions.checkNotNull(queryServiceActorRef);
    }

    @Override
    public CompletableFuture<MetricDumpSerialization.MetricSerializationResult> queryMetrics(Time timeout) {
        return FutureUtils.toJava(
            Patterns.ask(queryServiceActorRef, MetricQueryService.getCreateDump(), timeout.toMilliseconds())
                .mapTo(ClassTag$.MODULE$.apply(MetricDumpSerialization.MetricSerializationResult.class))
        );
    }

    @Override
    public String getAddress() {
        return queryServiceActorRef.path().toString();
    }
}
  • AkkaQueryServiceGateway implements the MetricQueryServiceGateway interface; its constructor requires a queryServiceActorRef; the queryMetrics method asks the actor with a MetricQueryService.CreateDump message; getAddress returns queryServiceActorRef.path()
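The essence of what AkkaQueryServiceGateway does is the ask pattern: send a message, get the eventual reply back as a future with a timeout. The sketch below illustrates that request/reply shape with plain CompletableFuture (hypothetical names, no Akka dependency; `orTimeout` requires Java 9+).

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

// Ask-style request/reply: the "service" computes a reply asynchronously,
// and the caller gets a future that fails if the timeout elapses first.
public class AskDemo {
    static CompletableFuture<String> ask(String message, long timeoutMillis) {
        return CompletableFuture
            .supplyAsync(() -> "reply-to:" + message)    // the "actor" side
            .orTimeout(timeoutMillis, TimeUnit.MILLISECONDS);
    }

    public static void main(String[] args) {
        System.out.println(ask("CreateDump", 1000L).join());
    }
}
```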

MetricQueryService

flink-1.7.2/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricQueryService.java

public class MetricQueryService extends UntypedActor {
    private static final Logger LOG = LoggerFactory.getLogger(MetricQueryService.class);

    public static final String METRIC_QUERY_SERVICE_NAME = "MetricQueryService";

    private static final String SIZE_EXCEEDED_LOG_TEMPLATE = "{} will not be reported as the metric dump would exceed the maximum size of {} bytes.";

    private static final CharacterFilter FILTER = new CharacterFilter() {
        @Override
        public String filterCharacters(String input) {
            return replaceInvalidChars(input);
        }
    };

    private final MetricDumpSerializer serializer = new MetricDumpSerializer();

    private final Map<Gauge<?>, Tuple2<QueryScopeInfo, String>> gauges = new HashMap<>();
    private final Map<Counter, Tuple2<QueryScopeInfo, String>> counters = new HashMap<>();
    private final Map<Histogram, Tuple2<QueryScopeInfo, String>> histograms = new HashMap<>();
    private final Map<Meter, Tuple2<QueryScopeInfo, String>> meters = new HashMap<>();

    private final long messageSizeLimit;

    //......

    @Override
    public void onReceive(Object message) {
        try {
            if (message instanceof AddMetric) {
                AddMetric added = (AddMetric) message;

                String metricName = added.metricName;
                Metric metric = added.metric;
                AbstractMetricGroup group = added.group;

                QueryScopeInfo info = group.getQueryServiceMetricInfo(FILTER);

                if (metric instanceof Counter) {
                    counters.put((Counter) metric, new Tuple2<>(info, FILTER.filterCharacters(metricName)));
                } else if (metric instanceof Gauge) {
                    gauges.put((Gauge<?>) metric, new Tuple2<>(info, FILTER.filterCharacters(metricName)));
                } else if (metric instanceof Histogram) {
                    histograms.put((Histogram) metric, new Tuple2<>(info, FILTER.filterCharacters(metricName)));
                } else if (metric instanceof Meter) {
                    meters.put((Meter) metric, new Tuple2<>(info, FILTER.filterCharacters(metricName)));
                }
            } else if (message instanceof RemoveMetric) {
                Metric metric = (((RemoveMetric) message).metric);
                if (metric instanceof Counter) {
                    this.counters.remove(metric);
                } else if (metric instanceof Gauge) {
                    this.gauges.remove(metric);
                } else if (metric instanceof Histogram) {
                    this.histograms.remove(metric);
                } else if (metric instanceof Meter) {
                    this.meters.remove(metric);
                }
            } else if (message instanceof CreateDump) {
                MetricDumpSerialization.MetricSerializationResult dump = serializer.serialize(counters, gauges, histograms, meters);

                dump = enforceSizeLimit(dump);

                getSender().tell(dump, getSelf());
            } else {
                LOG.warn("MetricQueryServiceActor received an invalid message. " + message.toString());
                getSender().tell(new Status.Failure(new IOException("MetricQueryServiceActor received an invalid message. " + message.toString())), getSelf());
            }
        } catch (Exception e) {
            LOG.warn("An exception occurred while processing a message.", e);
        }
    }

    public static Object getCreateDump() {
        return CreateDump.INSTANCE;
    }

    private static class CreateDump implements Serializable {
        private static final CreateDump INSTANCE = new CreateDump();
    }

    //......
}
  • MetricQueryService extends UntypedActor; its onReceive method dispatches on the message type, and for CreateDump it calls MetricDumpSerialization.MetricDumpSerializer.serialize(counters, gauges, histograms, meters) to serialize the metrics into a MetricDumpSerialization.MetricSerializationResult, then returns the data via getSender().tell(dump, getSelf())
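The AddMetric/RemoveMetric bookkeeping above boils down to routing each metric into a per-type map via an instanceof chain. A minimal sketch of that pattern, using stand-in metric types rather than Flink's interfaces:

```java
import java.util.HashMap;
import java.util.Map;

// Per-type metric bookkeeping: each metric lands in the map matching its
// runtime type, keyed by the metric object itself so removal is symmetric.
public class MetricRegistryDemo {
    interface Metric {}
    static class Counter implements Metric {}
    static class Gauge implements Metric {}

    final Map<Counter, String> counters = new HashMap<>();
    final Map<Gauge, String> gauges = new HashMap<>();

    void addMetric(String name, Metric metric) {
        if (metric instanceof Counter) {
            counters.put((Counter) metric, name);
        } else if (metric instanceof Gauge) {
            gauges.put((Gauge) metric, name);
        }
    }

    void removeMetric(Metric metric) {
        if (metric instanceof Counter) {
            counters.remove(metric);
        } else if (metric instanceof Gauge) {
            gauges.remove(metric);
        }
    }

    public static void main(String[] args) {
        MetricRegistryDemo reg = new MetricRegistryDemo();
        Counter c = new Counter();
        reg.addMetric("numRecordsIn", c);
        System.out.println(reg.counters.size());
        reg.removeMetric(c);
        System.out.println(reg.counters.size());
    }
}
```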

MetricDumpSerialization

flink-1.7.2/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricDumpSerialization.java

public class MetricDumpSerialization {
    //......

    public static class MetricSerializationResult implements Serializable {

        private static final long serialVersionUID = 6928770855951536906L;

        public final byte[] serializedCounters;
        public final byte[] serializedGauges;
        public final byte[] serializedMeters;
        public final byte[] serializedHistograms;

        public final int numCounters;
        public final int numGauges;
        public final int numMeters;
        public final int numHistograms;

        public MetricSerializationResult(
            byte[] serializedCounters,
            byte[] serializedGauges,
            byte[] serializedMeters,
            byte[] serializedHistograms,
            int numCounters,
            int numGauges,
            int numMeters,
            int numHistograms) {

            Preconditions.checkNotNull(serializedCounters);
            Preconditions.checkNotNull(serializedGauges);
            Preconditions.checkNotNull(serializedMeters);
            Preconditions.checkNotNull(serializedHistograms);
            Preconditions.checkArgument(numCounters >= 0);
            Preconditions.checkArgument(numGauges >= 0);
            Preconditions.checkArgument(numMeters >= 0);
            Preconditions.checkArgument(numHistograms >= 0);
            this.serializedCounters = serializedCounters;
            this.serializedGauges = serializedGauges;
            this.serializedMeters = serializedMeters;
            this.serializedHistograms = serializedHistograms;
            this.numCounters = numCounters;
            this.numGauges = numGauges;
            this.numMeters = numMeters;
            this.numHistograms = numHistograms;
        }
    }

    public static class MetricDumpSerializer {

        private DataOutputSerializer countersBuffer = new DataOutputSerializer(1024 * 8);
        private DataOutputSerializer gaugesBuffer = new DataOutputSerializer(1024 * 8);
        private DataOutputSerializer metersBuffer = new DataOutputSerializer(1024 * 8);
        private DataOutputSerializer histogramsBuffer = new DataOutputSerializer(1024 * 8);

        public MetricSerializationResult serialize(
            Map<Counter, Tuple2<QueryScopeInfo, String>> counters,
            Map<Gauge<?>, Tuple2<QueryScopeInfo, String>> gauges,
            Map<Histogram, Tuple2<QueryScopeInfo, String>> histograms,
            Map<Meter, Tuple2<QueryScopeInfo, String>> meters) {

            countersBuffer.clear();
            int numCounters = 0;
            for (Map.Entry<Counter, Tuple2<QueryScopeInfo, String>> entry : counters.entrySet()) {
                try {
                    serializeCounter(countersBuffer, entry.getValue().f0, entry.getValue().f1, entry.getKey());
                    numCounters++;
                } catch (Exception e) {
                    LOG.debug("Failed to serialize counter.", e);
                }
            }

            gaugesBuffer.clear();
            int numGauges = 0;
            for (Map.Entry<Gauge<?>, Tuple2<QueryScopeInfo, String>> entry : gauges.entrySet()) {
                try {
                    serializeGauge(gaugesBuffer, entry.getValue().f0, entry.getValue().f1, entry.getKey());
                    numGauges++;
                } catch (Exception e) {
                    LOG.debug("Failed to serialize gauge.", e);
                }
            }

            histogramsBuffer.clear();
            int numHistograms = 0;
            for (Map.Entry<Histogram, Tuple2<QueryScopeInfo, String>> entry : histograms.entrySet()) {
                try {
                    serializeHistogram(histogramsBuffer, entry.getValue().f0, entry.getValue().f1, entry.getKey());
                    numHistograms++;
                } catch (Exception e) {
                    LOG.debug("Failed to serialize histogram.", e);
                }
            }

            metersBuffer.clear();
            int numMeters = 0;
            for (Map.Entry<Meter, Tuple2<QueryScopeInfo, String>> entry : meters.entrySet()) {
                try {
                    serializeMeter(metersBuffer, entry.getValue().f0, entry.getValue().f1, entry.getKey());
                    numMeters++;
                } catch (Exception e) {
                    LOG.debug("Failed to serialize meter.", e);
                }
            }

            return new MetricSerializationResult(
                countersBuffer.getCopyOfBuffer(),
                gaugesBuffer.getCopyOfBuffer(),
                metersBuffer.getCopyOfBuffer(),
                histogramsBuffer.getCopyOfBuffer(),
                numCounters,
                numGauges,
                numMeters,
                numHistograms);
        }

        public void close() {
            countersBuffer = null;
            gaugesBuffer = null;
            metersBuffer = null;
            histogramsBuffer = null;
        }
    }

    //......
}
  • MetricDumpSerialization contains several static classes: MetricSerializationResult, MetricDumpSerializer, and MetricDumpDeserializer; MetricDumpSerializer provides a serialize method that serializes the counters, gauges, histograms, and meters into a MetricSerializationResult
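The serialize() pattern above, writing each metric into a reusable buffer, counting only successful writes, and skipping failures, can be sketched without Flink's types as follows (DumpSerializerDemo, its Result class, and the counter map are illustrative stand-ins only):

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.LinkedHashMap;
import java.util.Map;

// Serialize a batch of counters into one byte[], returning the buffer copy
// plus the number of metrics that actually made it in.
public class DumpSerializerDemo {
    static class Result {
        final byte[] serializedCounters;
        final int numCounters;
        Result(byte[] bytes, int n) { serializedCounters = bytes; numCounters = n; }
    }

    static Result serialize(Map<String, Long> counters) {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(buffer);
        int num = 0;
        for (Map.Entry<String, Long> e : counters.entrySet()) {
            try {
                out.writeUTF(e.getKey());
                out.writeLong(e.getValue());
                num++;                       // count only successful writes
            } catch (IOException ex) {
                // a failed metric is skipped, mirroring the LOG.debug branches
            }
        }
        return new Result(buffer.toByteArray(), num);
    }

    public static void main(String[] args) {
        Map<String, Long> counters = new LinkedHashMap<>();
        counters.put("numRecordsIn", 42L);
        counters.put("numRecordsOut", 7L);
        Result r = serialize(counters);
        System.out.println(r.numCounters);
    }
}
```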

Summary

  • MetricQueryServiceGateway defines two methods, queryMetrics and getAddress; it has one implementation class, AkkaQueryServiceGateway
  • AkkaQueryServiceGateway implements the MetricQueryServiceGateway interface; its constructor requires a queryServiceActorRef; the queryMetrics method asks the actor with a MetricQueryService.CreateDump message; getAddress returns queryServiceActorRef.path()
  • MetricQueryService extends UntypedActor; its onReceive method dispatches on the message type, and for CreateDump it calls MetricDumpSerialization.MetricDumpSerializer.serialize(counters, gauges, histograms, meters) to serialize the metrics into a MetricDumpSerialization.MetricSerializationResult, then returns the data via getSender().tell(dump, getSelf()); MetricDumpSerialization contains the static classes MetricSerializationResult, MetricDumpSerializer, and MetricDumpDeserializer, where MetricDumpSerializer provides the serialize method that turns the counters, gauges, histograms, and meters into a MetricSerializationResult

doc

Original statement: This article is published on the Tencent Cloud Developer Community with the author's authorization and may not be reproduced without permission.

For infringement concerns, please contact cloudcommunity@tencent.com for removal.

