This article takes a look at Storm's LoggingMetricsConsumer.
storm-2.0.0/storm-client/src/jvm/org/apache/storm/metric/LoggingMetricsConsumer.java
public class LoggingMetricsConsumer implements IMetricsConsumer {
    public static final Logger LOG = LoggerFactory.getLogger(LoggingMetricsConsumer.class);
    static private String padding = "                       "; // run of spaces used to pad metric names to a fixed-width column

    @Override
    public void prepare(Map<String, Object> topoConf, Object registrationArgument, TopologyContext context, IErrorReporter errorReporter) {
    }

    @Override
    public void handleDataPoints(TaskInfo taskInfo, Collection<DataPoint> dataPoints) {
        StringBuilder sb = new StringBuilder();
        String header = String.format("%d\t%15s:%-4d\t%3d:%-11s\t",
                                      taskInfo.timestamp,
                                      taskInfo.srcWorkerHost, taskInfo.srcWorkerPort,
                                      taskInfo.srcTaskId,
                                      taskInfo.srcComponentId);
        sb.append(header);
        for (DataPoint p : dataPoints) {
            // reset the builder back to just the header, then append the metric name
            // padded/truncated to a fixed width, a tab, and the metric value
            sb.delete(header.length(), sb.length());
            sb.append(p.name)
              .append(padding).delete(header.length() + 23, sb.length()).append("\t")
              .append(p.value);
            LOG.info(sb.toString());
        }
    }

    @Override
    public void cleanup() {
    }
}
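LoggingMetricsConsumer therefore just formats each DataPoint behind a fixed-width header and writes it via SLF4J. Any other consumer only needs to implement the same IMetricsConsumer contract; as a minimal illustrative sketch (this class is not part of Storm), a consumer that prints to standard output could look like this:

import java.util.Collection;
import java.util.Map;
import org.apache.storm.metric.api.IMetricsConsumer;
import org.apache.storm.task.IErrorReporter;
import org.apache.storm.task.TopologyContext;

public class StdoutMetricsConsumer implements IMetricsConsumer {

    @Override
    public void prepare(Map<String, Object> topoConf, Object registrationArgument,
                        TopologyContext context, IErrorReporter errorReporter) {
        // registrationArgument is whatever was configured under "argument" for this consumer
    }

    @Override
    public void handleDataPoints(TaskInfo taskInfo, Collection<DataPoint> dataPoints) {
        for (DataPoint p : dataPoints) {
            // same information the LoggingMetricsConsumer logs, just printed to stdout
            System.out.printf("%d %s:%d %d:%s %s = %s%n",
                    taskInfo.timestamp, taskInfo.srcWorkerHost, taskInfo.srcWorkerPort,
                    taskInfo.srcTaskId, taskInfo.srcComponentId, p.name, p.value);
        }
    }

    @Override
    public void cleanup() {
    }
}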
The worker's log4j2 configuration routes the org.apache.storm.metric.LoggingMetricsConsumer logger to a dedicated METRICS appender, so its output ends up in the worker's ${sys:logfile.name}.metrics file:
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<configuration monitorInterval="60" shutdownHook="disable">
<properties>
<property name="pattern">%d{yyyy-MM-dd HH:mm:ss.SSS} %c{1.} %t [%p] %msg%n</property>
<property name="patternNoTime">%msg%n</property>
<property name="patternMetrics">%d %-8r %m%n</property>
</properties>
<appenders>
<RollingFile name="A1"
fileName="${sys:workers.artifacts}/${sys:storm.id}/${sys:worker.port}/${sys:logfile.name}"
filePattern="${sys:workers.artifacts}/${sys:storm.id}/${sys:worker.port}/${sys:logfile.name}.%i.gz">
<PatternLayout>
<pattern>${pattern}</pattern>
</PatternLayout>
<Policies>
<SizeBasedTriggeringPolicy size="100 MB"/> <!-- Or every 100 MB -->
</Policies>
<DefaultRolloverStrategy max="9"/>
</RollingFile>
<RollingFile name="STDOUT"
fileName="${sys:workers.artifacts}/${sys:storm.id}/${sys:worker.port}/${sys:logfile.name}.out"
filePattern="${sys:workers.artifacts}/${sys:storm.id}/${sys:worker.port}/${sys:logfile.name}.out.%i.gz">
<PatternLayout>
<pattern>${patternNoTime}</pattern>
</PatternLayout>
<Policies>
<SizeBasedTriggeringPolicy size="100 MB"/> <!-- Or every 100 MB -->
</Policies>
<DefaultRolloverStrategy max="4"/>
</RollingFile>
<RollingFile name="STDERR"
fileName="${sys:workers.artifacts}/${sys:storm.id}/${sys:worker.port}/${sys:logfile.name}.err"
filePattern="${sys:workers.artifacts}/${sys:storm.id}/${sys:worker.port}/${sys:logfile.name}.err.%i.gz">
<PatternLayout>
<pattern>${patternNoTime}</pattern>
</PatternLayout>
<Policies>
<SizeBasedTriggeringPolicy size="100 MB"/> <!-- Or every 100 MB -->
</Policies>
<DefaultRolloverStrategy max="4"/>
</RollingFile>
<RollingFile name="METRICS"
fileName="${sys:workers.artifacts}/${sys:storm.id}/${sys:worker.port}/${sys:logfile.name}.metrics"
filePattern="${sys:workers.artifacts}/${sys:storm.id}/${sys:worker.port}/${sys:logfile.name}.metrics.%i.gz">
<PatternLayout>
<pattern>${patternMetrics}</pattern>
</PatternLayout>
<Policies>
<SizeBasedTriggeringPolicy size="2 MB"/>
</Policies>
<DefaultRolloverStrategy max="9"/>
</RollingFile>
<Syslog name="syslog" format="RFC5424" charset="UTF-8" host="localhost" port="514"
protocol="UDP" appName="[${sys:storm.id}:${sys:worker.port}]" mdcId="mdc" includeMDC="true"
facility="LOCAL5" enterpriseNumber="18060" newLine="true" exceptionPattern="%rEx{full}"
messageId="[${sys:user.name}:${sys:logging.sensitivity}]" id="storm" immediateFail="true" immediateFlush="true"/>
</appenders>
<loggers>
<root level="info"> <!-- We log everything -->
<appender-ref ref="A1"/>
<appender-ref ref="syslog"/>
</root>
<Logger name="org.apache.storm.metric.LoggingMetricsConsumer" level="info" additivity="false">
<appender-ref ref="METRICS"/>
</Logger>
<Logger name="STDERR" level="INFO">
<appender-ref ref="STDERR"/>
<appender-ref ref="syslog"/>
</Logger>
<Logger name="STDOUT" level="INFO">
<appender-ref ref="STDOUT"/>
<appender-ref ref="syslog"/>
</Logger>
</loggers>
</configuration>
A topology can register this consumer programmatically through its Config:
conf.registerMetricsConsumer(org.apache.storm.metric.LoggingMetricsConsumer.class, 1);
It can also be registered in storm.yaml or the topology configuration, for example:
topology.metrics.consumer.register:
  - class: "org.apache.storm.metric.LoggingMetricsConsumer"
    max.retain.metric.tuples: 100
    parallelism.hint: 1
  - class: "org.apache.storm.metric.HttpForwardingMetricsConsumer"
    parallelism.hint: 1
    argument: "http://example.com:8080/metrics/my-topology/"
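For reference, here is a minimal submission sketch using the programmatic registration; the topology name and the builder wiring are placeholders, while Config.registerMetricsConsumer and StormSubmitter.submitTopology are actual Storm APIs:

import org.apache.storm.Config;
import org.apache.storm.StormSubmitter;
import org.apache.storm.metric.LoggingMetricsConsumer;
import org.apache.storm.topology.TopologyBuilder;

public class MetricsDemoTopology {
    public static void main(String[] args) throws Exception {
        TopologyBuilder builder = new TopologyBuilder();
        // builder.setSpout(...) / builder.setBolt(...) wiring omitted - placeholders only

        Config conf = new Config();
        // adds an entry under topology.metrics.consumer.register (Config.TOPOLOGY_METRICS_CONSUMER_REGISTER);
        // an overload also accepts a registration argument: registerMetricsConsumer(clazz, argument, parallelismHint)
        conf.registerMetricsConsumer(LoggingMetricsConsumer.class, 1);

        StormSubmitter.submitTopology("metrics-demo", conf, builder.createTopology());
    }
}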
storm-2.0.0/storm-client/src/jvm/org/apache/storm/metric/MetricsConsumerBolt.java
public class MetricsConsumerBolt implements IBolt {
    public static final Logger LOG = LoggerFactory.getLogger(MetricsConsumerBolt.class);
    private final int _maxRetainMetricTuples;
    private final Predicate<IMetricsConsumer.DataPoint> _filterPredicate;
    private final DataPointExpander _expander;
    private final BlockingQueue<MetricsTask> _taskQueue;
    IMetricsConsumer _metricsConsumer;
    String _consumerClassName;
    OutputCollector _collector;
    Object _registrationArgument;
    private Thread _taskExecuteThread;
    private volatile boolean _running = true;

    public MetricsConsumerBolt(String consumerClassName, Object registrationArgument, int maxRetainMetricTuples,
                               Predicate<IMetricsConsumer.DataPoint> filterPredicate, DataPointExpander expander) {
        _consumerClassName = consumerClassName;
        _registrationArgument = registrationArgument;
        _maxRetainMetricTuples = maxRetainMetricTuples;
        _filterPredicate = filterPredicate;
        _expander = expander;
        // a non-positive max.retain.metric.tuples means an unbounded queue
        if (_maxRetainMetricTuples > 0) {
            _taskQueue = new LinkedBlockingDeque<>(_maxRetainMetricTuples);
        } else {
            _taskQueue = new LinkedBlockingDeque<>();
        }
    }

    @Override
    public void prepare(Map<String, Object> topoConf, TopologyContext context, OutputCollector collector) {
        try {
            _metricsConsumer = (IMetricsConsumer) Class.forName(_consumerClassName).newInstance();
        } catch (Exception e) {
            throw new RuntimeException("Could not instantiate a class listed in config under section " +
                                       Config.TOPOLOGY_METRICS_CONSUMER_REGISTER + " with fully qualified name " + _consumerClassName, e);
        }
        _metricsConsumer.prepare(topoConf, _registrationArgument, context, collector);
        _collector = collector;
        // the actual consumer work runs on a daemon thread so execute() never blocks on a slow consumer
        _taskExecuteThread = new Thread(new MetricsHandlerRunnable());
        _taskExecuteThread.setDaemon(true);
        _taskExecuteThread.start();
    }

    @Override
    public void execute(Tuple input) {
        IMetricsConsumer.TaskInfo taskInfo = (IMetricsConsumer.TaskInfo) input.getValue(0);
        Collection<IMetricsConsumer.DataPoint> dataPoints = (Collection) input.getValue(1);
        Collection<IMetricsConsumer.DataPoint> expandedDataPoints = _expander.expandDataPoints(dataPoints);
        List<IMetricsConsumer.DataPoint> filteredDataPoints = getFilteredDataPoints(expandedDataPoints);
        MetricsTask metricsTask = new MetricsTask(taskInfo, filteredDataPoints);
        // drop-oldest policy: if the queue is full, discard the head until the new task fits
        while (!_taskQueue.offer(metricsTask)) {
            _taskQueue.poll();
        }
        _collector.ack(input);
    }

    private List<IMetricsConsumer.DataPoint> getFilteredDataPoints(Collection<IMetricsConsumer.DataPoint> dataPoints) {
        return Lists.newArrayList(Iterables.filter(dataPoints, _filterPredicate));
    }

    @Override
    public void cleanup() {
        _running = false;
        _metricsConsumer.cleanup();
        _taskExecuteThread.interrupt();
    }

    static class MetricsTask {
        private IMetricsConsumer.TaskInfo taskInfo;
        private Collection<IMetricsConsumer.DataPoint> dataPoints;

        public MetricsTask(IMetricsConsumer.TaskInfo taskInfo, Collection<IMetricsConsumer.DataPoint> dataPoints) {
            this.taskInfo = taskInfo;
            this.dataPoints = dataPoints;
        }

        public IMetricsConsumer.TaskInfo getTaskInfo() {
            return taskInfo;
        }

        public Collection<IMetricsConsumer.DataPoint> getDataPoints() {
            return dataPoints;
        }
    }

    class MetricsHandlerRunnable implements Runnable {
        @Override
        public void run() {
            while (_running) {
                try {
                    MetricsTask task = _taskQueue.take();
                    _metricsConsumer.handleDataPoints(task.getTaskInfo(), task.getDataPoints());
                } catch (InterruptedException e) {
                    break;
                } catch (Throwable t) {
                    LOG.error("Exception occurred during handle metrics", t);
                }
            }
        }
    }
}
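Note the drop-oldest policy in execute(): when the bounded queue already holds max.retain.metric.tuples entries, offer() fails, the head (oldest) MetricsTask is polled away and the offer is retried, so a slow consumer loses the oldest metrics rather than blocking the bolt. A tiny standalone sketch of the same pattern (capacity and values are made up for illustration):

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingDeque;

public class DropOldestDemo {
    public static void main(String[] args) {
        BlockingQueue<Integer> queue = new LinkedBlockingDeque<>(3); // capacity 3, like maxRetainMetricTuples

        for (int i = 1; i <= 5; i++) {
            // same pattern as MetricsConsumerBolt.execute(): drop the oldest entry until the new one fits
            while (!queue.offer(i)) {
                queue.poll();
            }
        }
        System.out.println(queue); // prints [3, 4, 5] - the oldest entries were discarded
    }
}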
storm-2.0.0/storm-client/src/jvm/org/apache/storm/daemon/StormCommon.java
protected StormTopology systemTopologyImpl(Map<String, Object> topoConf, StormTopology topology) throws InvalidTopologyException {
    validateBasic(topology);
    StormTopology ret = topology.deepCopy();
    addAcker(topoConf, ret);
    if (hasEventLoggers(topoConf)) {
        addEventLogger(topoConf, ret);
    }
    addMetricComponents(topoConf, ret);
    addSystemComponents(topoConf, ret);
    addMetricStreams(ret);
    addSystemStreams(ret);
    validateStructure(ret);
    return ret;
}

public static void addMetricComponents(Map<String, Object> conf, StormTopology topology) {
    Map<String, Bolt> metricsConsumerBolts = metricsConsumerBoltSpecs(conf, topology);
    for (Map.Entry<String, Bolt> entry : metricsConsumerBolts.entrySet()) {
        topology.put_to_bolts(entry.getKey(), entry.getValue());
    }
}

public static void addMetricStreams(StormTopology topology) {
    for (Object component : allComponents(topology).values()) {
        ComponentCommon common = getComponentCommon(component);
        StreamInfo streamInfo = Thrift.outputFields(Arrays.asList("task-info", "data-points"));
        common.put_to_streams(Constants.METRICS_STREAM_ID, streamInfo);
    }
}

public static Map<String, Bolt> metricsConsumerBoltSpecs(Map<String, Object> conf, StormTopology topology) {
    Map<String, Bolt> metricsConsumerBolts = new HashMap<>();

    Set<String> componentIdsEmitMetrics = new HashSet<>();
    componentIdsEmitMetrics.addAll(allComponents(topology).keySet());
    componentIdsEmitMetrics.add(Constants.SYSTEM_COMPONENT_ID);

    Map<GlobalStreamId, Grouping> inputs = new HashMap<>();
    for (String componentId : componentIdsEmitMetrics) {
        inputs.put(Utils.getGlobalStreamId(componentId, Constants.METRICS_STREAM_ID), Thrift.prepareShuffleGrouping());
    }

    List<Map<String, Object>> registerInfo = (List<Map<String, Object>>) conf.get(Config.TOPOLOGY_METRICS_CONSUMER_REGISTER);
    if (registerInfo != null) {
        Map<String, Integer> classOccurrencesMap = new HashMap<String, Integer>();
        for (Map<String, Object> info : registerInfo) {
            String className = (String) info.get(TOPOLOGY_METRICS_CONSUMER_CLASS);
            Object argument = info.get(TOPOLOGY_METRICS_CONSUMER_ARGUMENT);
            Integer maxRetainMetricTuples = ObjectReader.getInt(info.get(
                TOPOLOGY_METRICS_CONSUMER_MAX_RETAIN_METRIC_TUPLES), 100);
            Integer phintNum = ObjectReader.getInt(info.get(TOPOLOGY_METRICS_CONSUMER_PARALLELISM_HINT), 1);
            Map<String, Object> metricsConsumerConf = new HashMap<String, Object>();
            metricsConsumerConf.put(Config.TOPOLOGY_TASKS, phintNum);
            List<String> whitelist = (List<String>) info.get(
                TOPOLOGY_METRICS_CONSUMER_WHITELIST);
            List<String> blacklist = (List<String>) info.get(
                TOPOLOGY_METRICS_CONSUMER_BLACKLIST);
            FilterByMetricName filterPredicate = new FilterByMetricName(whitelist, blacklist);
            Boolean expandMapType = ObjectReader.getBoolean(info.get(
                TOPOLOGY_METRICS_CONSUMER_EXPAND_MAP_TYPE), false);
            String metricNameSeparator = ObjectReader.getString(info.get(
                TOPOLOGY_METRICS_CONSUMER_METRIC_NAME_SEPARATOR), ".");
            DataPointExpander expander = new DataPointExpander(expandMapType, metricNameSeparator);
            MetricsConsumerBolt boltInstance = new MetricsConsumerBolt(className, argument,
                maxRetainMetricTuples, filterPredicate, expander);
            Bolt metricsConsumerBolt = Thrift.prepareSerializedBoltDetails(inputs,
                boltInstance, null, phintNum, metricsConsumerConf);

            String id = className;
            if (classOccurrencesMap.containsKey(className)) {
                // e.g. ["a", "b", "a"] => ["a", "b", "a#2"]
                int occurrenceNum = classOccurrencesMap.get(className);
                occurrenceNum++;
                classOccurrencesMap.put(className, occurrenceNum);
                id = Constants.METRICS_COMPONENT_ID_PREFIX + className + "#" + occurrenceNum;
            } else {
                id = Constants.METRICS_COMPONENT_ID_PREFIX + className;
                classOccurrencesMap.put(className, 1);
            }
            metricsConsumerBolts.put(id, metricsConsumerBolt);
        }
    }
    return metricsConsumerBolts;
}
storm-2.0.0/storm-client/src/jvm/org/apache/storm/executor/Executor.java
// interval (timeBucketSizeInSecs) -> taskId -> metric name -> IMetric
protected final Map<Integer, Map<Integer, Map<String, IMetric>>> intervalToTaskToMetricToRegistry;

protected void setupMetrics() {
    for (final Integer interval : intervalToTaskToMetricToRegistry.keySet()) {
        StormTimer timerTask = workerData.getUserTimer();
        timerTask.scheduleRecurring(interval, interval,
            () -> {
                TupleImpl tuple =
                    new TupleImpl(workerTopologyContext, new Values(interval), Constants.SYSTEM_COMPONENT_ID,
                                  (int) Constants.SYSTEM_TASK_ID, Constants.METRICS_TICK_STREAM_ID);
                AddressedTuple metricsTickTuple = new AddressedTuple(AddressedTuple.BROADCAST_DEST, tuple);
                try {
                    receiveQueue.publish(metricsTickTuple);
                    receiveQueue.flush(); // avoid buffering
                } catch (InterruptedException e) {
                    LOG.warn("Thread interrupted when publishing metrics. Setting interrupt flag.");
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        );
    }
}

public Map<Integer, Map<Integer, Map<String, IMetric>>> getIntervalToTaskToMetricToRegistry() {
    return intervalToTaskToMetricToRegistry;
}
storm-2.0.0/storm-client/src/jvm/org/apache/storm/daemon/Task.java
private TopologyContext mkTopologyContext(StormTopology topology) throws IOException {
    Map<String, Object> conf = workerData.getConf();
    return new TopologyContext(
        topology,
        workerData.getTopologyConf(),
        workerData.getTaskToComponent(),
        workerData.getComponentToSortedTasks(),
        workerData.getComponentToStreamToFields(),
        // This is updated by the Worker and the topology has shared access to it
        workerData.getBlobToLastKnownVersion(),
        workerData.getTopologyId(),
        ConfigUtils.supervisorStormResourcesPath(
            ConfigUtils.supervisorStormDistRoot(conf, workerData.getTopologyId())),
        ConfigUtils.workerPidsRoot(conf, workerData.getWorkerId()),
        taskId,
        workerData.getPort(), workerData.getLocalTaskIds(),
        workerData.getDefaultSharedResources(),
        workerData.getUserSharedResources(),
        executor.getSharedExecutorData(),
        executor.getIntervalToTaskToMetricToRegistry(),
        executor.getOpenOrPrepareWasCalled());
}
storm-2.0.0/storm-client/src/jvm/org/apache/storm/task/TopologyContext.java
public class TopologyContext extends WorkerTopologyContext implements IMetricsContext {
    private Integer _taskId;
    private Map<String, Object> _taskData = new HashMap<>();
    private List<ITaskHook> _hooks = new ArrayList<>();
    private Map<String, Object> _executorData;
    private Map<Integer, Map<Integer, Map<String, IMetric>>> _registeredMetrics;

    public <T extends IMetric> T registerMetric(String name, T metric, int timeBucketSizeInSecs) {
        if (_openOrPrepareWasCalled.get()) {
            throw new RuntimeException("TopologyContext.registerMetric can only be called from within overridden " +
                                       "IBolt::prepare() or ISpout::open() method.");
        }
        if (metric == null) {
            throw new IllegalArgumentException("Cannot register a null metric");
        }
        if (timeBucketSizeInSecs <= 0) {
            throw new IllegalArgumentException("TopologyContext.registerMetric can only be called with timeBucketSizeInSecs " +
                                               "greater than or equal to 1 second.");
        }
        if (getRegisteredMetricByName(name) != null) {
            throw new RuntimeException("The same metric name `" + name + "` was registered twice.");
        }

        Map<Integer, Map<Integer, Map<String, IMetric>>> m1 = _registeredMetrics;
        if (!m1.containsKey(timeBucketSizeInSecs)) {
            m1.put(timeBucketSizeInSecs, new HashMap<Integer, Map<String, IMetric>>());
        }

        Map<Integer, Map<String, IMetric>> m2 = m1.get(timeBucketSizeInSecs);
        if (!m2.containsKey(_taskId)) {
            m2.put(_taskId, new HashMap<String, IMetric>());
        }

        Map<String, IMetric> m3 = m2.get(_taskId);
        if (m3.containsKey(name)) {
            throw new RuntimeException("The same metric name `" + name + "` was registered twice.");
        } else {
            m3.put(name, metric);
        }

        return metric;
    }

    //......
}
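For comparison with the registration checks above, here is a minimal sketch of a user bolt registering a metric; the bolt class, the metric name and the bucket size are illustrative, while CountMetric and registerMetric are actual Storm APIs:

import java.util.Map;
import org.apache.storm.metric.api.CountMetric;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Tuple;

public class CountingBolt extends BaseRichBolt {
    private transient CountMetric executeCount;
    private OutputCollector collector;

    @Override
    public void prepare(Map<String, Object> topoConf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
        // registerMetric must be called here (open/prepare) with a bucket size >= 1 second;
        // every timeBucketSizeInSecs (here 60) seconds the executor's metricsTick drains it via
        // getValueAndReset() and sends the value on Constants.METRICS_STREAM_ID to the registered consumers
        this.executeCount = context.registerMetric("execute-count", new CountMetric(), 60);
    }

    @Override
    public void execute(Tuple input) {
        executeCount.incr();
        collector.ack(input);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // no output streams for this illustration
    }
}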
The _registeredMetrics map used above is the executor's intervalToTaskToMetricToRegistry, which Task.mkTopologyContext passes into the TopologyContext, so registered metrics are keyed by their timeBucketSizeInSecs. For the built-in metrics that interval comes from the topology.builtin.metrics.bucket.size.secs value, which defaults to 60 in defaults.yaml; in other words, the Executor emits a metricsTickTuple every 60 seconds, with stream id Constants.METRICS_TICK_STREAM_ID.
storm-2.0.0/storm-client/src/jvm/org/apache/storm/executor/Executor.java
public void metricsTick(Task task, TupleImpl tuple) {
    try {
        Integer interval = tuple.getInteger(0);
        int taskId = task.getTaskId();
        Map<Integer, Map<String, IMetric>> taskToMetricToRegistry = intervalToTaskToMetricToRegistry.get(interval);
        Map<String, IMetric> nameToRegistry = null;
        if (taskToMetricToRegistry != null) {
            nameToRegistry = taskToMetricToRegistry.get(taskId);
        }
        if (nameToRegistry != null) {
            IMetricsConsumer.TaskInfo taskInfo = new IMetricsConsumer.TaskInfo(
                hostname, workerTopologyContext.getThisWorkerPort(),
                componentId, taskId, Time.currentTimeSecs(), interval);
            List<IMetricsConsumer.DataPoint> dataPoints = new ArrayList<>();
            for (Map.Entry<String, IMetric> entry : nameToRegistry.entrySet()) {
                IMetric metric = entry.getValue();
                Object value = metric.getValueAndReset();
                if (value != null) {
                    IMetricsConsumer.DataPoint dataPoint = new IMetricsConsumer.DataPoint(entry.getKey(), value);
                    dataPoints.add(dataPoint);
                }
            }
            if (!dataPoints.isEmpty()) {
                task.sendUnanchored(Constants.METRICS_STREAM_ID,
                                    new Values(taskInfo, dataPoints), executorTransfer, pendingEmits);
                executorTransfer.flush();
            }
        }
    } catch (Exception e) {
        throw Utils.wrapInRuntime(e);
    }
}
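Because metricsTick drains each metric with getValueAndReset(), a metric reports per-bucket values rather than lifetime totals, and a null return value produces no DataPoint at all. A minimal custom IMetric as an illustrative sketch (this class is not part of Storm):

import org.apache.storm.metric.api.IMetric;

// Reports the latest observed value once per bucket, then clears it so that
// buckets with no observations produce no DataPoint (metricsTick skips null values).
public class LastValueMetric implements IMetric {
    private Object value;

    public void update(Object newValue) {
        this.value = newValue;
    }

    @Override
    public Object getValueAndReset() {
        Object ret = value;
        value = null;
        return ret;
    }
}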
Note that everything above is the legacy metrics mechanism, not the metrics V2 API. To summarize: the Executor emits a metricsTickTuple at the interval given by the topology.builtin.metrics.bucket.size.secs parameter (60 by default, i.e. every 60 seconds); metricsTick then calls getValueAndReset() on every IMetric registered for the task and sends the resulting DataPoints on Constants.METRICS_STREAM_ID; the MetricsConsumerBolt that StormCommon wires into the system topology receives them and hands them to the registered IMetricsConsumer, which for LoggingMetricsConsumer means writing them to the worker's metrics log.