A look at Storm's LoggingMetricsConsumer

code4it · 2018-12-06

This post takes a look at Storm's LoggingMetricsConsumer.

LoggingMetricsConsumer

storm-2.0.0/storm-client/src/jvm/org/apache/storm/metric/LoggingMetricsConsumer.java

public class LoggingMetricsConsumer implements IMetricsConsumer {
    public static final Logger LOG = LoggerFactory.getLogger(LoggingMetricsConsumer.class);
    static private String padding = "                       ";

    @Override
    public void prepare(Map<String, Object> topoConf, Object registrationArgument, TopologyContext context, IErrorReporter errorReporter) {
    }

    @Override
    public void handleDataPoints(TaskInfo taskInfo, Collection<DataPoint> dataPoints) {
        StringBuilder sb = new StringBuilder();
        String header = String.format("%d\t%15s:%-4d\t%3d:%-11s\t",
                                      taskInfo.timestamp,
                                      taskInfo.srcWorkerHost, taskInfo.srcWorkerPort,
                                      taskInfo.srcTaskId,
                                      taskInfo.srcComponentId);
        sb.append(header);
        for (DataPoint p : dataPoints) {
            sb.delete(header.length(), sb.length());
            sb.append(p.name)
              .append(padding).delete(header.length() + 23, sb.length()).append("\t")
              .append(p.value);
            LOG.info(sb.toString());
        }
    }

    @Override
    public void cleanup() {
    }
}
  • LoggingMetricsConsumer implements the IMetricsConsumer interface; its handleDataPoints method writes the taskInfo and dataPoints to a log. Exactly which log file that is depends on Storm's log4j2 configuration.
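
For comparison, here is a minimal custom consumer against the same interface. It is a sketch only: the class name StdoutMetricsConsumer is hypothetical, while the interface and its inner TaskInfo/DataPoint types are exactly those used by the source above.

import java.util.Collection;
import java.util.Map;

import org.apache.storm.metric.api.IMetricsConsumer;
import org.apache.storm.task.IErrorReporter;
import org.apache.storm.task.TopologyContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// Hypothetical consumer: logs one line per data point via slf4j.
public class StdoutMetricsConsumer implements IMetricsConsumer {
    private static final Logger LOG = LoggerFactory.getLogger(StdoutMetricsConsumer.class);

    @Override
    public void prepare(Map<String, Object> topoConf, Object registrationArgument,
                        TopologyContext context, IErrorReporter errorReporter) {
        // registrationArgument is whatever was supplied at registration time (may be null)
    }

    @Override
    public void handleDataPoints(TaskInfo taskInfo, Collection<DataPoint> dataPoints) {
        for (DataPoint p : dataPoints) {
            LOG.info("{} {}:{} taskId={} {}={}", taskInfo.srcComponentId,
                     taskInfo.srcWorkerHost, taskInfo.srcWorkerPort,
                     taskInfo.srcTaskId, p.name, p.value);
        }
    }

    @Override
    public void cleanup() {
    }
}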

log4j2/worker.xml

<?xml version="1.0" encoding="UTF-8"?>
<!--
 Licensed to the Apache Software Foundation (ASF) under one or more
 contributor license agreements.  See the NOTICE file distributed with
 this work for additional information regarding copyright ownership.
 The ASF licenses this file to You under the Apache License, Version 2.0
 (the "License"); you may not use this file except in compliance with
 the License.  You may obtain a copy of the License at

     http://www.apache.org/licenses/LICENSE-2.0

 Unless required by applicable law or agreed to in writing, software
 distributed under the License is distributed on an "AS IS" BASIS,
 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 See the License for the specific language governing permissions and
 limitations under the License.
-->

<configuration monitorInterval="60" shutdownHook="disable">
<properties>
    <property name="pattern">%d{yyyy-MM-dd HH:mm:ss.SSS} %c{1.} %t [%p] %msg%n</property>
    <property name="patternNoTime">%msg%n</property>
    <property name="patternMetrics">%d %-8r %m%n</property>
</properties>
<appenders>
    <RollingFile name="A1"
        fileName="${sys:workers.artifacts}/${sys:storm.id}/${sys:worker.port}/${sys:logfile.name}"
        filePattern="${sys:workers.artifacts}/${sys:storm.id}/${sys:worker.port}/${sys:logfile.name}.%i.gz">
        <PatternLayout>
            <pattern>${pattern}</pattern>
        </PatternLayout>
        <Policies>
            <SizeBasedTriggeringPolicy size="100 MB"/> <!-- Or every 100 MB -->
        </Policies>
        <DefaultRolloverStrategy max="9"/>
    </RollingFile>
    <RollingFile name="STDOUT"
        fileName="${sys:workers.artifacts}/${sys:storm.id}/${sys:worker.port}/${sys:logfile.name}.out"
        filePattern="${sys:workers.artifacts}/${sys:storm.id}/${sys:worker.port}/${sys:logfile.name}.out.%i.gz">
        <PatternLayout>
            <pattern>${patternNoTime}</pattern>
        </PatternLayout>
        <Policies>
            <SizeBasedTriggeringPolicy size="100 MB"/> <!-- Or every 100 MB -->
        </Policies>
        <DefaultRolloverStrategy max="4"/>
    </RollingFile>
    <RollingFile name="STDERR"
        fileName="${sys:workers.artifacts}/${sys:storm.id}/${sys:worker.port}/${sys:logfile.name}.err"
        filePattern="${sys:workers.artifacts}/${sys:storm.id}/${sys:worker.port}/${sys:logfile.name}.err.%i.gz">
        <PatternLayout>
            <pattern>${patternNoTime}</pattern>
        </PatternLayout>
        <Policies>
            <SizeBasedTriggeringPolicy size="100 MB"/> <!-- Or every 100 MB -->
        </Policies>
        <DefaultRolloverStrategy max="4"/>
    </RollingFile>
    <RollingFile name="METRICS"
        fileName="${sys:workers.artifacts}/${sys:storm.id}/${sys:worker.port}/${sys:logfile.name}.metrics"
        filePattern="${sys:workers.artifacts}/${sys:storm.id}/${sys:worker.port}/${sys:logfile.name}.metrics.%i.gz">
        <PatternLayout>
            <pattern>${patternMetrics}</pattern>
        </PatternLayout>
        <Policies>
            <SizeBasedTriggeringPolicy size="2 MB"/>
        </Policies>
        <DefaultRolloverStrategy max="9"/>
    </RollingFile>
    <Syslog name="syslog" format="RFC5424" charset="UTF-8" host="localhost" port="514"
        protocol="UDP" appName="[${sys:storm.id}:${sys:worker.port}]" mdcId="mdc" includeMDC="true"
        facility="LOCAL5" enterpriseNumber="18060" newLine="true" exceptionPattern="%rEx{full}"
        messageId="[${sys:user.name}:${sys:logging.sensitivity}]" id="storm" immediateFail="true" immediateFlush="true"/>
</appenders>
<loggers>
    <root level="info"> <!-- We log everything -->
        <appender-ref ref="A1"/>
        <appender-ref ref="syslog"/>
    </root>
    <Logger name="org.apache.storm.metric.LoggingMetricsConsumer" level="info" additivity="false">
        <appender-ref ref="METRICS"/>
    </Logger>
    <Logger name="STDERR" level="INFO">
        <appender-ref ref="STDERR"/>
        <appender-ref ref="syslog"/>
    </Logger>
    <Logger name="STDOUT" level="INFO">
        <appender-ref ref="STDOUT"/>
        <appender-ref ref="syslog"/>
    </Logger>
</loggers>
</configuration>
  • Taking worker.xml as an example: the logger named org.apache.storm.metric.LoggingMetricsConsumer is configured at info level, with additivity set to false.
  • The METRICS appender writes to ${sys:workers.artifacts}/${sys:storm.id}/${sys:worker.port}/${sys:logfile.name}.metrics, for example workers-artifacts/tickDemo-1-1541070680/6700/worker.log.metrics.
  • METRICS is configured as a RollingFile appender whose SizeBasedTriggeringPolicy is 2 MB.

Configuration

Topology configuration

conf.registerMetricsConsumer(org.apache.storm.metric.LoggingMetricsConsumer.class, 1);
  • You can register LoggingMetricsConsumer on the conf when submitting the topology. Registered this way, it only takes effect for that topology's workers; when there is metrics data, it is written to the topology's worker.log.metrics file. A fuller submission sketch follows below.
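
Config.registerMetricsConsumer also has overloads that take a registration argument; a minimal submission sketch, assuming a hypothetical topology name and an already wired TopologyBuilder:

import org.apache.storm.Config;
import org.apache.storm.StormSubmitter;
import org.apache.storm.metric.LoggingMetricsConsumer;
import org.apache.storm.topology.TopologyBuilder;

public class SubmitWithLoggingMetrics {
    public static void main(String[] args) throws Exception {
        TopologyBuilder builder = new TopologyBuilder();
        // ... builder.setSpout(...) / builder.setBolt(...) ...

        Config conf = new Config();
        // parallelism hint 1 => one MetricsConsumerBolt task; another overload
        // also accepts a registration argument:
        // conf.registerMetricsConsumer(LoggingMetricsConsumer.class, someArgument, 1);
        conf.registerMetricsConsumer(LoggingMetricsConsumer.class, 1);

        // "demoTopology" is a placeholder name
        StormSubmitter.submitTopology("demoTopology", conf, builder.createTopology());
    }
}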

storm.yaml configuration

topology.metrics.consumer.register:
  - class: "org.apache.storm.metric.LoggingMetricsConsumer"
    max.retain.metric.tuples: 100
    parallelism.hint: 1
  - class: "org.apache.storm.metric.HttpForwardingMetricsConsumer"
    parallelism.hint: 1
    argument: "http://example.com:8080/metrics/my-topology/"
  • The storm.yaml configuration applies to all topologies. Note that the key configured here is topology.metrics.consumer.register, which is still topology level; the data is written to the worker.log.metrics file.
  • For the cluster level, the key is storm.cluster.metrics.consumer.register, and it can only be configured via storm.yaml; when enabled, metrics data is written to the nimbus.log.metrics and supervisor.log.metrics files (see the sketch after this list).
  • nimbus and supervisor are started with the log4j parameter -Dlog4j.configurationFile=/apache-storm/log4j2/cluster.xml, while workers are started with -Dlog4j.configurationFile=/apache-storm/log4j2/worker.xml; the components' -Dlogfile.name parameters are nimbus.log, supervisor.log and worker.log respectively.
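
A cluster-level registration would look roughly like this in storm.yaml; a sketch, assuming LoggingClusterMetricsConsumer (Storm's cluster-side counterpart of LoggingMetricsConsumer) is the consumer you want to run:

# Sketch: cluster-level metrics consumers are registered under a different key.
# With cluster.xml's logger configuration, the data lands in nimbus.log.metrics
# and supervisor.log.metrics.
storm.cluster.metrics.consumer.register:
  - class: "org.apache.storm.metric.LoggingClusterMetricsConsumer"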

MetricsConsumerBolt

storm-2.0.0/storm-client/src/jvm/org/apache/storm/metric/MetricsConsumerBolt.java

public class MetricsConsumerBolt implements IBolt {
    public static final Logger LOG = LoggerFactory.getLogger(MetricsConsumerBolt.class);
    private final int _maxRetainMetricTuples;
    private final Predicate<IMetricsConsumer.DataPoint> _filterPredicate;
    private final DataPointExpander _expander;
    private final BlockingQueue<MetricsTask> _taskQueue;
    IMetricsConsumer _metricsConsumer;
    String _consumerClassName;
    OutputCollector _collector;
    Object _registrationArgument;
    private Thread _taskExecuteThread;
    private volatile boolean _running = true;

    public MetricsConsumerBolt(String consumerClassName, Object registrationArgument, int maxRetainMetricTuples,
                               Predicate<IMetricsConsumer.DataPoint> filterPredicate, DataPointExpander expander) {

        _consumerClassName = consumerClassName;
        _registrationArgument = registrationArgument;
        _maxRetainMetricTuples = maxRetainMetricTuples;
        _filterPredicate = filterPredicate;
        _expander = expander;

        if (_maxRetainMetricTuples > 0) {
            _taskQueue = new LinkedBlockingDeque<>(_maxRetainMetricTuples);
        } else {
            _taskQueue = new LinkedBlockingDeque<>();
        }
    }

    @Override
    public void prepare(Map<String, Object> topoConf, TopologyContext context, OutputCollector collector) {
        try {
            _metricsConsumer = (IMetricsConsumer) Class.forName(_consumerClassName).newInstance();
        } catch (Exception e) {
            throw new RuntimeException("Could not instantiate a class listed in config under section " +
                                       Config.TOPOLOGY_METRICS_CONSUMER_REGISTER + " with fully qualified name " + _consumerClassName, e);
        }
        _metricsConsumer.prepare(topoConf, _registrationArgument, context, collector);
        _collector = collector;
        _taskExecuteThread = new Thread(new MetricsHandlerRunnable());
        _taskExecuteThread.setDaemon(true);
        _taskExecuteThread.start();
    }

    @Override
    public void execute(Tuple input) {
        IMetricsConsumer.TaskInfo taskInfo = (IMetricsConsumer.TaskInfo) input.getValue(0);
        Collection<IMetricsConsumer.DataPoint> dataPoints = (Collection) input.getValue(1);
        Collection<IMetricsConsumer.DataPoint> expandedDataPoints = _expander.expandDataPoints(dataPoints);
        List<IMetricsConsumer.DataPoint> filteredDataPoints = getFilteredDataPoints(expandedDataPoints);
        MetricsTask metricsTask = new MetricsTask(taskInfo, filteredDataPoints);

        while (!_taskQueue.offer(metricsTask)) {
            _taskQueue.poll();
        }

        _collector.ack(input);
    }

    private List<IMetricsConsumer.DataPoint> getFilteredDataPoints(Collection<IMetricsConsumer.DataPoint> dataPoints) {
        return Lists.newArrayList(Iterables.filter(dataPoints, _filterPredicate));
    }

    @Override
    public void cleanup() {
        _running = false;
        _metricsConsumer.cleanup();
        _taskExecuteThread.interrupt();
    }

    static class MetricsTask {
        private IMetricsConsumer.TaskInfo taskInfo;
        private Collection<IMetricsConsumer.DataPoint> dataPoints;

        public MetricsTask(IMetricsConsumer.TaskInfo taskInfo, Collection<IMetricsConsumer.DataPoint> dataPoints) {
            this.taskInfo = taskInfo;
            this.dataPoints = dataPoints;
        }

        public IMetricsConsumer.TaskInfo getTaskInfo() {
            return taskInfo;
        }

        public Collection<IMetricsConsumer.DataPoint> getDataPoints() {
            return dataPoints;
        }
    }

    class MetricsHandlerRunnable implements Runnable {

        @Override
        public void run() {
            while (_running) {
                try {
                    MetricsTask task = _taskQueue.take();
                    _metricsConsumer.handleDataPoints(task.getTaskInfo(), task.getDataPoints());
                } catch (InterruptedException e) {
                    break;
                } catch (Throwable t) {
                    LOG.error("Exception occurred during handle metrics", t);
                }
            }
        }
    }

}
  • MetricsConsumerBolt creates _taskQueue in its constructor: a bounded queue if _maxRetainMetricTuples is greater than 0, otherwise an unbounded one. The value is read from max.retain.metric.tuples under topology.metrics.consumer.register and defaults to 100 when absent.
  • In prepare, MetricsConsumerBolt starts the MetricsHandlerRunnable thread, which takes MetricsTask items off _taskQueue and calls _metricsConsumer.handleDataPoints(task.getTaskInfo(), task.getDataPoints()).
  • In execute, each received tuple is offered to _taskQueue; if the offer fails because the queue is full, one element is polled off and the offer retried (a drop-oldest policy; see the sketch after this list).
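
The offer/poll loop in execute amounts to a drop-oldest policy on a bounded queue. A self-contained sketch of the same pattern on a plain JDK queue (the class name and values are hypothetical):

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingDeque;

public class DropOldestQueueDemo {
    public static void main(String[] args) {
        // capacity 3, like _taskQueue with max.retain.metric.tuples = 3
        BlockingQueue<Integer> queue = new LinkedBlockingDeque<>(3);

        for (int i = 1; i <= 5; i++) {
            while (!queue.offer(i)) { // offer fails when the queue is full
                queue.poll();         // drop the oldest element, then retry
            }
        }

        System.out.println(queue); // [3, 4, 5]: the two oldest entries were dropped
    }
}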

StormCommon.systemTopologyImpl

storm-2.0.0/storm-client/src/jvm/org/apache/storm/daemon/StormCommon.java

    protected StormTopology systemTopologyImpl(Map<String, Object> topoConf, StormTopology topology) throws InvalidTopologyException {
        validateBasic(topology);

        StormTopology ret = topology.deepCopy();
        addAcker(topoConf, ret);
        if (hasEventLoggers(topoConf)) {
            addEventLogger(topoConf, ret);
        }
        addMetricComponents(topoConf, ret);
        addSystemComponents(topoConf, ret);
        addMetricStreams(ret);
        addSystemStreams(ret);

        validateStructure(ret);

        return ret;
    }

    public static void addMetricComponents(Map<String, Object> conf, StormTopology topology) {
        Map<String, Bolt> metricsConsumerBolts = metricsConsumerBoltSpecs(conf, topology);
        for (Map.Entry<String, Bolt> entry : metricsConsumerBolts.entrySet()) {
            topology.put_to_bolts(entry.getKey(), entry.getValue());
        }
    }

    public static void addMetricStreams(StormTopology topology) {
        for (Object component : allComponents(topology).values()) {
            ComponentCommon common = getComponentCommon(component);
            StreamInfo streamInfo = Thrift.outputFields(Arrays.asList("task-info", "data-points"));
            common.put_to_streams(Constants.METRICS_STREAM_ID, streamInfo);
        }
    }

    public static Map<String, Bolt> metricsConsumerBoltSpecs(Map<String, Object> conf, StormTopology topology) {
        Map<String, Bolt> metricsConsumerBolts = new HashMap<>();

        Set<String> componentIdsEmitMetrics = new HashSet<>();
        componentIdsEmitMetrics.addAll(allComponents(topology).keySet());
        componentIdsEmitMetrics.add(Constants.SYSTEM_COMPONENT_ID);

        Map<GlobalStreamId, Grouping> inputs = new HashMap<>();
        for (String componentId : componentIdsEmitMetrics) {
            inputs.put(Utils.getGlobalStreamId(componentId, Constants.METRICS_STREAM_ID), Thrift.prepareShuffleGrouping());
        }

        List<Map<String, Object>> registerInfo = (List<Map<String, Object>>) conf.get(Config.TOPOLOGY_METRICS_CONSUMER_REGISTER);
        if (registerInfo != null) {
            Map<String, Integer> classOccurrencesMap = new HashMap<String, Integer>();
            for (Map<String, Object> info : registerInfo) {
                String className = (String) info.get(TOPOLOGY_METRICS_CONSUMER_CLASS);
                Object argument = info.get(TOPOLOGY_METRICS_CONSUMER_ARGUMENT);
                Integer maxRetainMetricTuples = ObjectReader.getInt(info.get(
                    TOPOLOGY_METRICS_CONSUMER_MAX_RETAIN_METRIC_TUPLES), 100);
                Integer phintNum = ObjectReader.getInt(info.get(TOPOLOGY_METRICS_CONSUMER_PARALLELISM_HINT), 1);
                Map<String, Object> metricsConsumerConf = new HashMap<String, Object>();
                metricsConsumerConf.put(Config.TOPOLOGY_TASKS, phintNum);
                List<String> whitelist = (List<String>) info.get(
                    TOPOLOGY_METRICS_CONSUMER_WHITELIST);
                List<String> blacklist = (List<String>) info.get(
                    TOPOLOGY_METRICS_CONSUMER_BLACKLIST);
                FilterByMetricName filterPredicate = new FilterByMetricName(whitelist, blacklist);
                Boolean expandMapType = ObjectReader.getBoolean(info.get(
                    TOPOLOGY_METRICS_CONSUMER_EXPAND_MAP_TYPE), false);
                String metricNameSeparator = ObjectReader.getString(info.get(
                    TOPOLOGY_METRICS_CONSUMER_METRIC_NAME_SEPARATOR), ".");
                DataPointExpander expander = new DataPointExpander(expandMapType, metricNameSeparator);
                MetricsConsumerBolt boltInstance = new MetricsConsumerBolt(className, argument,
                                                                           maxRetainMetricTuples, filterPredicate, expander);
                Bolt metricsConsumerBolt = Thrift.prepareSerializedBoltDetails(inputs,
                                                                               boltInstance, null, phintNum, metricsConsumerConf);

                String id = className;
                if (classOccurrencesMap.containsKey(className)) {
                    // e.g. [\"a\", \"b\", \"a\"]) => [\"a\", \"b\", \"a#2\"]"
                    int occurrenceNum = classOccurrencesMap.get(className);
                    occurrenceNum++;
                    classOccurrencesMap.put(className, occurrenceNum);
                    id = Constants.METRICS_COMPONENT_ID_PREFIX + className + "#" + occurrenceNum;
                } else {
                    id = Constants.METRICS_COMPONENT_ID_PREFIX + className;
                    classOccurrencesMap.put(className, 1);
                }
                metricsConsumerBolts.put(id, metricsConsumerBolt);
            }
        }
        return metricsConsumerBolts;
    }
  • When StormCommon builds the system topology in systemTopologyImpl, it adds a number of system components; this is where addMetricComponents and addMetricStreams are called.
  • addMetricComponents creates the MetricsConsumerBolts from the conf and, via shuffle grouping on Constants.METRICS_STREAM_ID, wires every component in as an input source.
  • addMetricStreams makes every component declare an output stream on Constants.METRICS_STREAM_ID whose fields are Arrays.asList("task-info", "data-points").

Executor.setupMetrics

storm-2.0.0/storm-client/src/jvm/org/apache/storm/executor/Executor.java

    protected final Map<Integer, Map<Integer, Map<String, IMetric>>> intervalToTaskToMetricToRegistry;

    protected void setupMetrics() {
        for (final Integer interval : intervalToTaskToMetricToRegistry.keySet()) {
            StormTimer timerTask = workerData.getUserTimer();
            timerTask.scheduleRecurring(interval, interval,
                                        () -> {
                                            TupleImpl tuple =
                                                new TupleImpl(workerTopologyContext, new Values(interval), Constants.SYSTEM_COMPONENT_ID,
                                                              (int) Constants.SYSTEM_TASK_ID, Constants.METRICS_TICK_STREAM_ID);
                                            AddressedTuple metricsTickTuple = new AddressedTuple(AddressedTuple.BROADCAST_DEST, tuple);
                                            try {
                                                receiveQueue.publish(metricsTickTuple);
                                                receiveQueue.flush();  // avoid buffering
                                            } catch (InterruptedException e) {
                                                LOG.warn("Thread interrupted when publishing metrics. Setting interrupt flag.");
                                                Thread.currentThread().interrupt();
                                                return;
                                            }
                                        }
            );
        }
    }

    public Map<Integer, Map<Integer, Map<String, IMetric>>> getIntervalToTaskToMetricToRegistry() {
        return intervalToTaskToMetricToRegistry;
    }
  • In setupMetrics, the Executor schedules recurring tasks that periodically emit a metricsTickTuple to METRICS_TICK_STREAM_ID using BROADCAST_DEST (a rough JDK analogy follows after this list).
  • The scheduling is driven by intervalToTaskToMetricToRegistry, whose keys are the intervals.
  • intervalToTaskToMetricToRegistry is initialized in the Executor constructor: intervalToTaskToMetricToRegistry = new HashMap<>().
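
Conceptually, scheduleRecurring(interval, interval, task) fires the task after interval seconds and then every interval seconds. A rough JDK analogy (this is not Storm's StormTimer implementation, just the scheduling shape):

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class RecurringTickDemo {
    public static void main(String[] args) {
        int interval = 60; // seconds, cf. topology.builtin.metrics.bucket.size.secs
        ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();
        // initial delay = interval, then repeat every interval, like scheduleRecurring
        timer.scheduleAtFixedRate(
            () -> System.out.println("publish metricsTickTuple (broadcast to local tasks)"),
            interval, interval, TimeUnit.SECONDS);
    }
}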

Task.mkTopologyContext

storm-2.0.0/storm-client/src/jvm/org/apache/storm/daemon/Task.java

    private TopologyContext mkTopologyContext(StormTopology topology) throws IOException {
        Map<String, Object> conf = workerData.getConf();
        return new TopologyContext(
            topology,
            workerData.getTopologyConf(),
            workerData.getTaskToComponent(),
            workerData.getComponentToSortedTasks(),
            workerData.getComponentToStreamToFields(),
            // This is updated by the Worker and the topology has shared access to it
            workerData.getBlobToLastKnownVersion(),
            workerData.getTopologyId(),
            ConfigUtils.supervisorStormResourcesPath(
                ConfigUtils.supervisorStormDistRoot(conf, workerData.getTopologyId())),
            ConfigUtils.workerPidsRoot(conf, workerData.getWorkerId()),
            taskId,
            workerData.getPort(), workerData.getLocalTaskIds(),
            workerData.getDefaultSharedResources(),
            workerData.getUserSharedResources(),
            executor.getSharedExecutorData(),
            executor.getIntervalToTaskToMetricToRegistry(),
            executor.getOpenOrPrepareWasCalled());
    }
  • When mkTopologyContext creates the TopologyContext, it passes in executor.getIntervalToTaskToMetricToRegistry().

TopologyContext

storm-2.0.0/storm-client/src/jvm/org/apache/storm/task/TopologyContext.java

public class TopologyContext extends WorkerTopologyContext implements IMetricsContext {
    private Integer _taskId;
    private Map<String, Object> _taskData = new HashMap<>();
    private List<ITaskHook> _hooks = new ArrayList<>();
    private Map<String, Object> _executorData;
    private Map<Integer, Map<Integer, Map<String, IMetric>>> _registeredMetrics;

    public <T extends IMetric> T registerMetric(String name, T metric, int timeBucketSizeInSecs) {
        if (_openOrPrepareWasCalled.get()) {
            throw new RuntimeException("TopologyContext.registerMetric can only be called from within overridden " +
                                       "IBolt::prepare() or ISpout::open() method.");
        }

        if (metric == null) {
            throw new IllegalArgumentException("Cannot register a null metric");
        }

        if (timeBucketSizeInSecs <= 0) {
            throw new IllegalArgumentException("TopologyContext.registerMetric can only be called with timeBucketSizeInSecs " +
                                               "greater than or equal to 1 second.");
        }

        if (getRegisteredMetricByName(name) != null) {
            throw new RuntimeException("The same metric name `" + name + "` was registered twice.");
        }

        Map<Integer, Map<Integer, Map<String, IMetric>>> m1 = _registeredMetrics;
        if (!m1.containsKey(timeBucketSizeInSecs)) {
            m1.put(timeBucketSizeInSecs, new HashMap<Integer, Map<String, IMetric>>());
        }

        Map<Integer, Map<String, IMetric>> m2 = m1.get(timeBucketSizeInSecs);
        if (!m2.containsKey(_taskId)) {
            m2.put(_taskId, new HashMap<String, IMetric>());
        }

        Map<String, IMetric> m3 = m2.get(_taskId);
        if (m3.containsKey(name)) {
            throw new RuntimeException("The same metric name `" + name + "` was registered twice.");
        } else {
            m3.put(name, metric);
        }

        return metric;
    }

    //......
}
  • The Executor's intervalToTaskToMetricToRegistry is thus handed to TopologyContext as _registeredMetrics.
  • The registerMetric method adds entries to _registeredMetrics, keyed by timeBucketSizeInSecs (a usage sketch follows after this list).
  • For built-in metrics, timeBucketSizeInSecs is read from Config.TOPOLOGY_BUILTIN_METRICS_BUCKET_SIZE_SECS (topology.builtin.metrics.bucket.size.secs), which defaults to 60 in defaults.yaml; that is, the Executor emits a metricsTickTuple with streamId Constants.METRICS_TICK_STREAM_ID every 60 seconds.
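
From user code, entries get into _registeredMetrics through TopologyContext.registerMetric, which (per the check above) may only be called from IBolt::prepare or ISpout::open. A minimal bolt sketch, assuming a hypothetical CountingBolt and using Storm's built-in CountMetric:

import java.util.Map;

import org.apache.storm.metric.api.CountMetric;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Tuple;

public class CountingBolt extends BaseRichBolt {
    private CountMetric processedCounter;
    private OutputCollector collector;

    @Override
    public void prepare(Map<String, Object> topoConf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
        // 60-second bucket: the Executor calls getValueAndReset() on this metric
        // every 60 seconds and ships the value on METRICS_STREAM_ID
        this.processedCounter = context.registerMetric("processed-count", new CountMetric(), 60);
    }

    @Override
    public void execute(Tuple input) {
        processedCounter.incr();
        collector.ack(input);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // no output streams
    }
}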

Executor.metricsTick

storm-2.0.0/storm-client/src/jvm/org/apache/storm/executor/Executor.java

    public void metricsTick(Task task, TupleImpl tuple) {
        try {
            Integer interval = tuple.getInteger(0);
            int taskId = task.getTaskId();
            Map<Integer, Map<String, IMetric>> taskToMetricToRegistry = intervalToTaskToMetricToRegistry.get(interval);
            Map<String, IMetric> nameToRegistry = null;
            if (taskToMetricToRegistry != null) {
                nameToRegistry = taskToMetricToRegistry.get(taskId);
            }
            if (nameToRegistry != null) {
                IMetricsConsumer.TaskInfo taskInfo = new IMetricsConsumer.TaskInfo(
                    hostname, workerTopologyContext.getThisWorkerPort(),
                    componentId, taskId, Time.currentTimeSecs(), interval);
                List<IMetricsConsumer.DataPoint> dataPoints = new ArrayList<>();
                for (Map.Entry<String, IMetric> entry : nameToRegistry.entrySet()) {
                    IMetric metric = entry.getValue();
                    Object value = metric.getValueAndReset();
                    if (value != null) {
                        IMetricsConsumer.DataPoint dataPoint = new IMetricsConsumer.DataPoint(entry.getKey(), value);
                        dataPoints.add(dataPoint);
                    }
                }
                if (!dataPoints.isEmpty()) {
                    task.sendUnanchored(Constants.METRICS_STREAM_ID,
                                        new Values(taskInfo, dataPoints), executorTransfer, pendingEmits);
                    executorTransfer.flush();
                }
            }
        } catch (Exception e) {
            throw Utils.wrapInRuntime(e);
        }
    }
  • When SpoutExecutor and BoltExecutor receive a tuple whose streamId is Constants.METRICS_TICK_STREAM_ID in tupleActionFn, they call the parent class's Executor.metricsTick method.
  • metricsTick emits via task.sendUnanchored(Constants.METRICS_STREAM_ID, new Values(taskInfo, dataPoints), executorTransfer, pendingEmits): the data goes to Constants.METRICS_STREAM_ID with taskInfo and dataPoints as the values. The dataPoints are read from TopologyContext's _registeredMetrics (this is the legacy metrics API, not the V2 one).
  • When MetricsConsumerBolt receives the data it puts it on _taskQueue; meanwhile the MetricsHandlerRunnable thread blocks taking from _taskQueue and calls back _metricsConsumer.handleDataPoints to consume the data.

Summary

  • LoggingMetricsConsumer comes with storm metric (the legacy API); metrics2 has no equivalent. nimbus and supervisor use -Dlog4j.configurationFile=/apache-storm/log4j2/cluster.xml, workers use -Dlog4j.configurationFile=/apache-storm/log4j2/worker.xml; their -Dlogfile.name values are nimbus.log, supervisor.log and worker.log respectively.
  • When building a topology, Storm adds system components, among them the metricsConsumerBolt and the metric streams. The Executor calls setupMetrics in its init to periodically emit metricsTickTuple; when SpoutExecutor and BoltExecutor receive it in tupleActionFn, they call metricsTick to produce data and emit it to Constants.METRICS_STREAM_ID, from which MetricsConsumerBolt receives it and calls back _metricsConsumer.handleDataPoints to consume it.
  • Two parameters deserve attention. max.retain.metric.tuples, configured under topology.metrics.consumer.register (default 100), is the size of MetricsConsumerBolt's _taskQueue; setting it to 0 makes the queue unbounded. The built-in metrics interval is read from Config.TOPOLOGY_BUILTIN_METRICS_BUCKET_SIZE_SECS (topology.builtin.metrics.bucket.size.secs), default 60, i.e. a metricsTickTuple is emitted every 60 seconds.

doc

  • Storm Metrics
  • New Metrics Reporting API