Overview
This article takes a look at Storm's LoggingClusterMetricsConsumer.
LoggingClusterMetricsConsumer
storm-2.0.0/storm-server/src/main/java/org/apache/storm/metric/LoggingClusterMetricsConsumer.java
```java
public class LoggingClusterMetricsConsumer implements IClusterMetricsConsumer {
    public static final Logger LOG = LoggerFactory.getLogger(LoggingClusterMetricsConsumer.class);

    static private String padding = "                       ";

    @Override
    public void prepare(Object registrationArgument) {
    }

    @Override
    public void handleDataPoints(ClusterInfo clusterInfo, Collection<DataPoint> dataPoints) {
        StringBuilder sb = new StringBuilder();
        String header = String.format("%d\t%15s\t%40s\t",
            clusterInfo.getTimestamp(), " ", " ");
        sb.append(header);
        logDataPoints(dataPoints, sb, header);
    }

    @Override
    public void handleDataPoints(SupervisorInfo supervisorInfo, Collection<DataPoint> dataPoints) {
        StringBuilder sb = new StringBuilder();
        String header = String.format("%d\t%15s\t%40s\t",
            supervisorInfo.getTimestamp(),
            supervisorInfo.getSrcSupervisorHost(),
            supervisorInfo.getSrcSupervisorId());
        sb.append(header);
        for (DataPoint p : dataPoints) {
            sb.delete(header.length(), sb.length());
            sb.append(p.getName())
              .append(padding).delete(header.length() + 23, sb.length()).append("\t")
              .append(p.getValue());
            LOG.info(sb.toString());
        }
    }

    @Override
    public void cleanup() {
    }

    private void logDataPoints(Collection<DataPoint> dataPoints, StringBuilder sb, String header) {
        for (DataPoint p : dataPoints) {
            sb.delete(header.length(), sb.length());
            sb.append(p.getName())
              .append(padding).delete(header.length() + 23, sb.length()).append("\t")
              .append(p.getValue());
            LOG.info(sb.toString());
        }
    }
}
```
- This is a cluster-level metrics consumer; it can only be configured in storm.yaml
- Its handleDataPoints methods are invoked as callbacks by ClusterMetricsConsumerExecutor
- Here handleDataPoints merely prints the data points to a log file; a sketch of a custom consumer follows below
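For illustration, here is a minimal sketch of a custom IClusterMetricsConsumer. The class name matches the commented-out entry in the storm.yaml example in the next section; the log format is arbitrary, and a real implementation would likely push the data points to an external collector instead:

```java
package com.example.demo.metric;

import java.util.Collection;

import org.apache.storm.metric.api.DataPoint;
import org.apache.storm.metric.api.IClusterMetricsConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// Hypothetical consumer: logs cluster- and supervisor-level data points with a fixed format.
public class FixedLoggingClusterMetricsConsumer implements IClusterMetricsConsumer {
    private static final Logger LOG = LoggerFactory.getLogger(FixedLoggingClusterMetricsConsumer.class);
    private Object registrationArgument;

    @Override
    public void prepare(Object registrationArgument) {
        // registrationArgument carries the `argument` list from storm.yaml (e.g. the endpoint entry)
        this.registrationArgument = registrationArgument;
    }

    @Override
    public void handleDataPoints(ClusterInfo clusterInfo, Collection<DataPoint> dataPoints) {
        for (DataPoint p : dataPoints) {
            LOG.info("[cluster] ts={} {}={}", clusterInfo.getTimestamp(), p.getName(), p.getValue());
        }
    }

    @Override
    public void handleDataPoints(SupervisorInfo supervisorInfo, Collection<DataPoint> dataPoints) {
        for (DataPoint p : dataPoints) {
            LOG.info("[supervisor {}] ts={} {}={}", supervisorInfo.getSrcSupervisorHost(),
                     supervisorInfo.getTimestamp(), p.getName(), p.getValue());
        }
    }

    @Override
    public void cleanup() {
        // release any connections opened in prepare()
    }
}
```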
storm.yaml configuration
```yaml
## Cluster Metrics Consumers
storm.cluster.metrics.consumer.register:
  - class: "org.apache.storm.metric.LoggingClusterMetricsConsumer"
#  - class: "com.example.demo.metric.FixedLoggingClusterMetricsConsumer"
#    argument:
#      - endpoint: "metrics-collector.mycompany.org"
#
storm.cluster.metrics.consumer.publish.interval.secs: 5
```
- This registers LoggingClusterMetricsConsumer as the consumer class and sets the publish interval to 5 seconds
cluster.xml
```xml
<?xml version="1.0" encoding="UTF-8"?>
<configuration monitorInterval="60" shutdownHook="disable">
    <properties>
        <property name="pattern">%d{yyyy-MM-dd HH:mm:ss.SSS} %c{1.} %t [%p] %msg%n</property>
        <property name="patternMetrics">%d %-8r %m%n</property>
    </properties>
    <appenders>
        <!-- the regular RollingFile appenders using ${pattern} are elided here -->
        <RollingFile name="METRICS"
                     fileName="${sys:storm.log.dir}/${sys:logfile.name}.metrics"
                     filePattern="${sys:storm.log.dir}/${sys:logfile.name}.metrics.%i.gz">
            <PatternLayout>
                <pattern>${patternMetrics}</pattern>
            </PatternLayout>
            <Policies>
                <SizeBasedTriggeringPolicy size="2 MB"/>
            </Policies>
            <DefaultRolloverStrategy max="9"/>
        </RollingFile>
    </appenders>
    <loggers>
        <Logger name="org.apache.storm.metric.LoggingClusterMetricsConsumer" level="info" additivity="false">
            <AppenderRef ref="METRICS"/>
        </Logger>
        <!-- ... -->
    </loggers>
</configuration>
```
- cluster.xml holds the log4j2 configuration for metrics logging. The METRICS appender used here is a RollingFile whose fileName is ${sys:storm.log.dir}/${sys:logfile.name}.metrics; since logfile.name defaults to nimbus.log for nimbus and supervisor.log for the supervisor, the metrics end up in nimbus.log.metrics and supervisor.log.metrics
Sample log output
```
2018-11-06 07:51:51,488 18628 1541490711                 supervisors     1
2018-11-06 07:51:51,488 18628 1541490711                 topologies      0
2018-11-06 07:51:51,489 18629 1541490711                 slotsTotal      4
2018-11-06 07:51:51,489 18629 1541490711                 slotsUsed       0
2018-11-06 07:51:51,489 18629 1541490711                 slotsFree       4
2018-11-06 07:51:51,489 18629 1541490711                 executorsTotal  0
2018-11-06 07:51:51,489 18629 1541490711                 tasksTotal      0
2018-11-06 07:51:51,496 18636 1541490711  192.168.99.100  5bbd576d-218c-4365-ac5e-865b1f6e9b29  slotsTotal  4
2018-11-06 07:51:51,497 18637 1541490711  192.168.99.100  5bbd576d-218c-4365-ac5e-865b1f6e9b29  slotsUsed   0
2018-11-06 07:51:51,497 18637 1541490711  192.168.99.100  5bbd576d-218c-4365-ac5e-865b1f6e9b29  totalMem    3072.0
2018-11-06 07:51:51,497 18637 1541490711  192.168.99.100  5bbd576d-218c-4365-ac5e-865b1f6e9b29  totalCpu    400.0
2018-11-06 07:51:51,498 18638 1541490711  192.168.99.100  5bbd576d-218c-4365-ac5e-865b1f6e9b29  usedMem     0.0
2018-11-06 07:51:51,498 18638 1541490711  192.168.99.100  5bbd576d-218c-4365-ac5e-865b1f6e9b29  usedCpu     0.0
```
ClusterMetricsConsumerExecutor
storm-2.0.0/storm-server/src/main/java/org/apache/storm/metric/ClusterMetricsConsumerExecutor.java
```java
public class ClusterMetricsConsumerExecutor {
    public static final Logger LOG = LoggerFactory.getLogger(ClusterMetricsConsumerExecutor.class);
    private static final String ERROR_MESSAGE_PREPARATION_CLUSTER_METRICS_CONSUMER_FAILED =
        "Preparation of Cluster Metrics Consumer failed. "
        + "Please check your configuration and/or corresponding systems and relaunch Nimbus. "
        + "Skipping handle metrics.";

    private IClusterMetricsConsumer metricsConsumer;
    private String consumerClassName;
    private Object registrationArgument;

    public ClusterMetricsConsumerExecutor(String consumerClassName, Object registrationArgument) {
        this.consumerClassName = consumerClassName;
        this.registrationArgument = registrationArgument;
    }

    public void prepare() {
        try {
            metricsConsumer = (IClusterMetricsConsumer) Class.forName(consumerClassName).newInstance();
            metricsConsumer.prepare(registrationArgument);
        } catch (Exception e) {
            LOG.error("Could not instantiate or prepare Cluster Metrics Consumer with fully qualified name "
                + consumerClassName, e);
            if (metricsConsumer != null) {
                metricsConsumer.cleanup();
            }
            metricsConsumer = null;
        }
    }

    public void handleDataPoints(final IClusterMetricsConsumer.ClusterInfo clusterInfo,
                                 final Collection<DataPoint> dataPoints) {
        if (metricsConsumer == null) {
            LOG.error(ERROR_MESSAGE_PREPARATION_CLUSTER_METRICS_CONSUMER_FAILED);
            return;
        }
        try {
            metricsConsumer.handleDataPoints(clusterInfo, dataPoints);
        } catch (Throwable e) {
            LOG.error("Error while handling cluster data points, consumer class: " + consumerClassName, e);
        }
    }

    public void handleDataPoints(final IClusterMetricsConsumer.SupervisorInfo supervisorInfo,
                                 final Collection<DataPoint> dataPoints) {
        if (metricsConsumer == null) {
            LOG.error(ERROR_MESSAGE_PREPARATION_CLUSTER_METRICS_CONSUMER_FAILED);
            return;
        }
        try {
            metricsConsumer.handleDataPoints(supervisorInfo, dataPoints);
        } catch (Throwable e) {
            LOG.error("Error while handling cluster data points, consumer class: " + consumerClassName, e);
        }
    }

    public void cleanup() {
        if (metricsConsumer != null) {
            metricsConsumer.cleanup();
        }
    }
}
```
- In prepare, ClusterMetricsConsumerExecutor instantiates the IClusterMetricsConsumer implementation named by consumerClassName, then calls metricsConsumer.prepare(registrationArgument) for any setup
- The executor's handleDataPoints methods simply delegate to the underlying metricsConsumer's handleDataPoints
- There are two handleDataPoints overloads; both take a dataPoints collection and differ in the other parameter, which is a ClusterInfo in one case and a SupervisorInfo in the other, used for nimbus and supervisors respectively (a sketch of how these executors are built from configuration follows below)
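Nimbus builds one ClusterMetricsConsumerExecutor per registration entry in storm.yaml. As a hedged sketch of that wiring (the helper class and method below are hypothetical, not Storm's actual code), using the same "class"/"argument" keys as the storm.yaml example above:

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;

import org.apache.storm.DaemonConfig;
import org.apache.storm.metric.ClusterMetricsConsumerExecutor;

// Hypothetical helper: builds one executor per entry under storm.cluster.metrics.consumer.register.
public final class ClusterMetricsConsumers {
    @SuppressWarnings("unchecked")
    public static List<ClusterMetricsConsumerExecutor> fromConf(Map<String, Object> conf) {
        Collection<Map<String, Object>> registered = (Collection<Map<String, Object>>)
            conf.getOrDefault(DaemonConfig.STORM_CLUSTER_METRICS_CONSUMER_REGISTER, Collections.emptyList());

        List<ClusterMetricsConsumerExecutor> executors = new ArrayList<>();
        for (Map<String, Object> entry : registered) {
            ClusterMetricsConsumerExecutor executor =
                new ClusterMetricsConsumerExecutor((String) entry.get("class"), entry.get("argument"));
            executor.prepare(); // instantiates the consumer class and calls its prepare()
            executors.add(executor);
        }
        return executors;
    }
}
```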
Nimbus.launchServer
storm-2.0.0/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java
```java
public void launchServer() throws Exception {
    try {
        BlobStore store = blobStore;
        IStormClusterState state = stormClusterState;
        NimbusInfo hpi = nimbusHostPortInfo;

        LOG.info("Starting Nimbus with conf {}", ConfigUtils.maskPasswords(conf));
        validator.prepare(conf);

        //......

        timer.scheduleRecurring(0,
            ObjectReader.getInt(conf.get(DaemonConfig.STORM_CLUSTER_METRICS_CONSUMER_PUBLISH_INTERVAL_SECS)),
            () -> {
                try {
                    if (isLeader()) {
                        sendClusterMetricsToExecutors();
                    }
                } catch (Exception e) {
                    throw new RuntimeException(e);
                }
            });

        timer.scheduleRecurring(5, 5, clusterMetricSet);

    } catch (Exception e) {
        if (Utils.exceptionCauseIsInstanceOf(InterruptedException.class, e)) {
            throw e;
        }
        if (Utils.exceptionCauseIsInstanceOf(InterruptedIOException.class, e)) {
            throw e;
        }
        LOG.error("Error on initialization of nimbus", e);
        Utils.exitProcess(13, "Error on initialization of nimbus");
    }
}

private boolean isLeader() throws Exception {
    return leaderElector.isLeader();
}
```
- Nimbus's launchServer method sets up a recurring task that, when the current node is the leader, calls sendClusterMetricsToExecutors to push metrics to the metrics consumers
- The task's interval is DaemonConfig.STORM_CLUSTER_METRICS_CONSUMER_PUBLISH_INTERVAL_SECS (storm.cluster.metrics.consumer.publish.interval.secs), which defaults to 60 in defaults.yaml and can be overridden in storm.yaml
- Besides pushing metrics to the consumers, launchServer schedules a second recurring task that runs the ClusterSummaryMetricSet runnable every 5 seconds; a scheduling sketch follows this list
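To make the timer semantics concrete, here is a rough equivalent of the two recurring tasks using a plain ScheduledExecutorService. This is a sketch only: Nimbus schedules them on its own timer, and the private methods below are placeholders standing in for the real Nimbus logic:

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Sketch of launchServer's two recurring tasks, expressed with JDK scheduling primitives.
public class MetricsSchedulingSketch {
    public static void main(String[] args) {
        // storm.cluster.metrics.consumer.publish.interval.secs, 60 in defaults.yaml
        int publishIntervalSecs = 60;
        ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();

        // Task 1: push cluster metrics to the registered consumers, leader only.
        timer.scheduleAtFixedRate(() -> {
            if (isLeader()) {
                sendClusterMetricsToExecutors();
            }
        }, 0, publishIntervalSecs, TimeUnit.SECONDS);

        // Task 2: every 5 seconds re-check leadership and (de)register the cluster
        // summary metrics, which is what ClusterSummaryMetricSet.run() does.
        timer.scheduleAtFixedRate(() -> runClusterSummaryMetricSet(), 5, 5, TimeUnit.SECONDS);
    }

    private static boolean isLeader() { return true; }       // placeholder
    private static void sendClusterMetricsToExecutors() { }  // placeholder
    private static void runClusterSummaryMetricSet() { }     // placeholder
}
```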
Nimbus.sendClusterMetricsToExecutors
storm-2.0.0/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java
```java
private void sendClusterMetricsToExecutors() throws Exception {
    ClusterInfo clusterInfo = mkClusterInfo();
    ClusterSummary clusterSummary = getClusterInfoImpl();
    List<DataPoint> clusterMetrics = extractClusterMetrics(clusterSummary);
    Map<IClusterMetricsConsumer.SupervisorInfo, List<DataPoint>> supervisorMetrics =
        extractSupervisorMetrics(clusterSummary);
    for (ClusterMetricsConsumerExecutor consumerExecutor : clusterConsumerExceutors) {
        consumerExecutor.handleDataPoints(clusterInfo, clusterMetrics);
        for (Entry<IClusterMetricsConsumer.SupervisorInfo, List<DataPoint>> entry : supervisorMetrics.entrySet()) {
            consumerExecutor.handleDataPoints(entry.getKey(), entry.getValue());
        }
    }
}
```
- Nimbus's sendClusterMetricsToExecutors method extracts the metrics via extractClusterMetrics and extractSupervisorMetrics, then hands the data over through consumerExecutor.handleDataPoints; a sketch of the extraction follows below
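extractClusterMetrics itself is not shown above. As a sketch, a version consistent with the metric names seen in the sample log output (supervisors, topologies, slotsTotal, slotsUsed, slotsFree, executorsTotal, tasksTotal) might look like the following; the actual method in Nimbus.java may differ in detail:

```java
import java.util.ArrayList;
import java.util.List;

import org.apache.storm.generated.ClusterSummary;
import org.apache.storm.generated.SupervisorSummary;
import org.apache.storm.generated.TopologySummary;
import org.apache.storm.metric.api.DataPoint;

// Sketch: derive cluster-level data points from a ClusterSummary.
public final class ClusterMetricsExtraction {
    static List<DataPoint> extractClusterMetrics(ClusterSummary summ) {
        List<DataPoint> ret = new ArrayList<>();
        ret.add(new DataPoint("supervisors", summ.get_supervisors_size()));
        ret.add(new DataPoint("topologies", summ.get_topologies_size()));

        // Slot totals are aggregated across all supervisors.
        int totalSlots = 0;
        int usedSlots = 0;
        for (SupervisorSummary sup : summ.get_supervisors()) {
            totalSlots += sup.get_num_workers();
            usedSlots += sup.get_num_used_workers();
        }
        ret.add(new DataPoint("slotsTotal", totalSlots));
        ret.add(new DataPoint("slotsUsed", usedSlots));
        ret.add(new DataPoint("slotsFree", totalSlots - usedSlots));

        // Executor/task totals are aggregated across all topologies.
        int totalExecutors = 0;
        int totalTasks = 0;
        for (TopologySummary topo : summ.get_topologies()) {
            totalExecutors += topo.get_num_executors();
            totalTasks += topo.get_num_tasks();
        }
        ret.add(new DataPoint("executorsTotal", totalExecutors));
        ret.add(new DataPoint("tasksTotal", totalTasks));
        return ret;
    }
}
```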
ClusterSummaryMetricSet
storm-2.0.0/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java
```java
private class ClusterSummaryMetricSet implements Runnable {
    private static final int CACHING_WINDOW = 5;

    private final ClusterSummaryMetrics clusterSummaryMetrics = new ClusterSummaryMetrics();

    private final Function<String, Histogram> registerHistogram = (name) -> {
        //This histogram reflects the data distribution across only one ClusterSummary, i.e.,
        // data distribution across all entities of a type (e.g., data from all nimbus/topologies) at one moment.
        // Hence we use half of the CACHING_WINDOW time to ensure it retains only data from the most recent update
        final Histogram histogram = new Histogram(new SlidingTimeWindowReservoir(CACHING_WINDOW / 2, TimeUnit.SECONDS));
        clusterSummaryMetrics.put(name, histogram);
        return histogram;
    };
    private volatile boolean active = false;

    //NImbus metrics distribution
    private final Histogram nimbusUptime = registerHistogram.apply("nimbuses:uptime-secs");
    //Supervisor metrics distribution
    private final Histogram supervisorsUptime = registerHistogram.apply("supervisors:uptime-secs");
    private final Histogram supervisorsNumWorkers = registerHistogram.apply("supervisors:num-workers");
    private final Histogram supervisorsNumUsedWorkers = registerHistogram.apply("supervisors:num-used-workers");
    private final Histogram supervisorsUsedMem = registerHistogram.apply("supervisors:used-mem");
    private final Histogram supervisorsUsedCpu = registerHistogram.apply("supervisors:used-cpu");
    private final Histogram supervisorsFragmentedMem = registerHistogram.apply("supervisors:fragmented-mem");
    private final Histogram supervisorsFragmentedCpu = registerHistogram.apply("supervisors:fragmented-cpu");
    //Topology metrics distribution
    private final Histogram topologiesNumTasks = registerHistogram.apply("topologies:num-tasks");
    private final Histogram topologiesNumExecutors = registerHistogram.apply("topologies:num-executors");
    private final Histogram topologiesNumWorker = registerHistogram.apply("topologies:num-workers");
    private final Histogram topologiesUptime = registerHistogram.apply("topologies:uptime-secs");
    private final Histogram topologiesReplicationCount = registerHistogram.apply("topologies:replication-count");
    private final Histogram topologiesRequestedMemOnHeap = registerHistogram.apply("topologies:requested-mem-on-heap");
    private final Histogram topologiesRequestedMemOffHeap = registerHistogram.apply("topologies:requested-mem-off-heap");
    private final Histogram topologiesRequestedCpu = registerHistogram.apply("topologies:requested-cpu");
    private final Histogram topologiesAssignedMemOnHeap = registerHistogram.apply("topologies:assigned-mem-on-heap");
    private final Histogram topologiesAssignedMemOffHeap = registerHistogram.apply("topologies:assigned-mem-off-heap");
    private final Histogram topologiesAssignedCpu = registerHistogram.apply("topologies:assigned-cpu");
    private final StormMetricsRegistry metricsRegistry;

    /**
     * Constructor to put all items in ClusterSummary in MetricSet as a metric.
     * All metrics are derived from a cached ClusterSummary object,
     * expired {@link ClusterSummaryMetricSet#CACHING_WINDOW} seconds after first query in a while from reporters.
     * In case of {@link com.codahale.metrics.ScheduledReporter}, CACHING_WINDOW should be set shorter than
     * reporting interval to avoid outdated reporting.
     */
    ClusterSummaryMetricSet(StormMetricsRegistry metricsRegistry) {
        this.metricsRegistry = metricsRegistry;
        //Break the code if out of sync to thrift protocol
        assert ClusterSummary._Fields.values().length == 3
            && ClusterSummary._Fields.findByName("supervisors") == ClusterSummary._Fields.SUPERVISORS
            && ClusterSummary._Fields.findByName("topologies") == ClusterSummary._Fields.TOPOLOGIES
            && ClusterSummary._Fields.findByName("nimbuses") == ClusterSummary._Fields.NIMBUSES;

        final CachedGauge<ClusterSummary> cachedSummary = new CachedGauge<ClusterSummary>(CACHING_WINDOW, TimeUnit.SECONDS) {
            @Override
            protected ClusterSummary loadValue() {
                try {
                    ClusterSummary newSummary = getClusterInfoImpl();
                    LOG.debug("The new summary is {}", newSummary);
                    /*
                     * Update histograms based on the new summary. Most common implementation of Reporter reports
                     * Gauges before Histograms. Because DerivativeGauge will trigger cache refresh upon reporter's
                     * query, histogram will also be updated before query
                     */
                    updateHistogram(newSummary);
                    return newSummary;
                } catch (Exception e) {
                    LOG.warn("Get cluster info exception.", e);
                    throw new RuntimeException(e);
                }
            }
        };

        clusterSummaryMetrics.put("cluster:num-nimbus-leaders", new DerivativeGauge<ClusterSummary, Long>(cachedSummary) {
            @Override
            protected Long transform(ClusterSummary clusterSummary) {
                return clusterSummary.get_nimbuses().stream()
                    .filter(NimbusSummary::is_isLeader)
                    .count();
            }
        });
        clusterSummaryMetrics.put("cluster:num-nimbuses", new DerivativeGauge<ClusterSummary, Integer>(cachedSummary) {
            @Override
            protected Integer transform(ClusterSummary clusterSummary) {
                return clusterSummary.get_nimbuses_size();
            }
        });
        clusterSummaryMetrics.put("cluster:num-supervisors", new DerivativeGauge<ClusterSummary, Integer>(cachedSummary) {
            @Override
            protected Integer transform(ClusterSummary clusterSummary) {
                return clusterSummary.get_supervisors_size();
            }
        });
        clusterSummaryMetrics.put("cluster:num-topologies", new DerivativeGauge<ClusterSummary, Integer>(cachedSummary) {
            @Override
            protected Integer transform(ClusterSummary clusterSummary) {
                return clusterSummary.get_topologies_size();
            }
        });
        clusterSummaryMetrics.put("cluster:num-total-workers", new DerivativeGauge<ClusterSummary, Integer>(cachedSummary) {
            @Override
            protected Integer transform(ClusterSummary clusterSummary) {
                return clusterSummary.get_supervisors().stream()
                    .mapToInt(SupervisorSummary::get_num_workers)
                    .sum();
            }
        });
        clusterSummaryMetrics.put("cluster:num-total-used-workers", new DerivativeGauge<ClusterSummary, Integer>(cachedSummary) {
            @Override
            protected Integer transform(ClusterSummary clusterSummary) {
                return clusterSummary.get_supervisors().stream()
                    .mapToInt(SupervisorSummary::get_num_used_workers)
                    .sum();
            }
        });
        clusterSummaryMetrics.put("cluster:total-fragmented-memory-non-negative", new DerivativeGauge<ClusterSummary, Double>(cachedSummary) {
            @Override
            protected Double transform(ClusterSummary clusterSummary) {
                return clusterSummary.get_supervisors().stream()
                    //Filtered negative value
                    .mapToDouble(supervisorSummary -> Math.max(supervisorSummary.get_fragmented_mem(), 0))
                    .sum();
            }
        });
        clusterSummaryMetrics.put("cluster:total-fragmented-cpu-non-negative", new DerivativeGauge<ClusterSummary, Double>(cachedSummary) {
            @Override
            protected Double transform(ClusterSummary clusterSummary) {
                return clusterSummary.get_supervisors().stream()
                    //Filtered negative value
                    .mapToDouble(supervisorSummary -> Math.max(supervisorSummary.get_fragmented_cpu(), 0))
                    .sum();
            }
        });
    }

    private void updateHistogram(ClusterSummary newSummary) {
        for (NimbusSummary nimbusSummary : newSummary.get_nimbuses()) {
            nimbusUptime.update(nimbusSummary.get_uptime_secs());
        }
        for (SupervisorSummary summary : newSummary.get_supervisors()) {
            supervisorsUptime.update(summary.get_uptime_secs());
            supervisorsNumWorkers.update(summary.get_num_workers());
            supervisorsNumUsedWorkers.update(summary.get_num_used_workers());
            supervisorsUsedMem.update(Math.round(summary.get_used_mem()));
            supervisorsUsedCpu.update(Math.round(summary.get_used_cpu()));
            supervisorsFragmentedMem.update(Math.round(summary.get_fragmented_mem()));
            supervisorsFragmentedCpu.update(Math.round(summary.get_fragmented_cpu()));
        }
        for (TopologySummary summary : newSummary.get_topologies()) {
            topologiesNumTasks.update(summary.get_num_tasks());
            topologiesNumExecutors.update(summary.get_num_executors());
            topologiesNumWorker.update(summary.get_num_workers());
            topologiesUptime.update(summary.get_uptime_secs());
            topologiesReplicationCount.update(summary.get_replication_count());
            topologiesRequestedMemOnHeap.update(Math.round(summary.get_requested_memonheap()));
            topologiesRequestedMemOffHeap.update(Math.round(summary.get_requested_memoffheap()));
            topologiesRequestedCpu.update(Math.round(summary.get_requested_cpu()));
            topologiesAssignedMemOnHeap.update(Math.round(summary.get_assigned_memonheap()));
            topologiesAssignedMemOffHeap.update(Math.round(summary.get_assigned_memoffheap()));
            topologiesAssignedCpu.update(Math.round(summary.get_assigned_cpu()));
        }
    }

    void setActive(final boolean active) {
        if (this.active != active) {
            this.active = active;
            if (active) {
                metricsRegistry.registerAll(clusterSummaryMetrics);
            } else {
                metricsRegistry.removeAll(clusterSummaryMetrics);
            }
        }
    }

    @Override
    public void run() {
        try {
            setActive(isLeader());
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }
}
```
- This runnable's work is the setActive call: on each run it re-checks the node's leadership state; when this node becomes the leader it registers clusterSummaryMetrics, and when it stops being the leader it removes clusterSummaryMetrics
- clusterSummaryMetrics contributes the gauges cluster:num-nimbus-leaders, cluster:num-nimbuses, cluster:num-supervisors, cluster:num-topologies, cluster:num-total-workers, cluster:num-total-used-workers, cluster:total-fragmented-memory-non-negative and cluster:total-fragmented-cpu-non-negative; a standalone example of the gauge pattern follows below
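The CachedGauge/DerivativeGauge combination above is the key pattern: one expensive ClusterSummary load feeds many derived gauges, refreshed at most once per caching window. Here is a minimal, self-contained Dropwizard Metrics example of the same pattern (class and metric names are illustrative, not Storm's):

```java
import java.util.concurrent.TimeUnit;

import com.codahale.metrics.CachedGauge;
import com.codahale.metrics.DerivativeGauge;
import com.codahale.metrics.MetricRegistry;

public class CachedGaugeDemo {
    // Stand-in for ClusterSummary: a snapshot that is expensive to compute.
    static class Snapshot {
        final int supervisors;
        final int topologies;
        Snapshot(int supervisors, int topologies) {
            this.supervisors = supervisors;
            this.topologies = topologies;
        }
    }

    public static void main(String[] args) {
        MetricRegistry registry = new MetricRegistry();

        // Loaded at most once per 5-second window, no matter how many gauges read it.
        CachedGauge<Snapshot> cached = new CachedGauge<Snapshot>(5, TimeUnit.SECONDS) {
            @Override
            protected Snapshot loadValue() {
                return new Snapshot(1, 0); // the expensive cluster query would go here
            }
        };

        // Each DerivativeGauge projects one field out of the shared snapshot.
        registry.register("cluster:num-supervisors", new DerivativeGauge<Snapshot, Integer>(cached) {
            @Override
            protected Integer transform(Snapshot s) {
                return s.supervisors;
            }
        });
        registry.register("cluster:num-topologies", new DerivativeGauge<Snapshot, Integer>(cached) {
            @Override
            protected Integer transform(Snapshot s) {
                return s.topologies;
            }
        });

        System.out.println(registry.getGauges().get("cluster:num-supervisors").getValue()); // 1
    }
}
```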
Summary
- LoggingClusterMetricsConsumer consumes cluster-level metrics: it receives the metric data and prints it to a log file; its log4j2 configuration is read from cluster.xml, and the output files are nimbus.log.metrics and supervisor.log.metrics. LoggingMetricsConsumer, in contrast, operates at the topology worker level; its log4j2 configuration is read from worker.xml and it writes to worker.log.metrics
- When Nimbus runs launchServer, it sets up a recurring task that, while the current node is the leader, periodically sends metrics to ClusterMetricsConsumerExecutor, which indirectly calls back LoggingClusterMetricsConsumer's handleDataPoints method to print the data to the log. LoggingMetricsConsumer takes a different route: the Executor schedules a timer that emits metricsTickTuples, prompting SpoutExecutor and BoltExecutor to send their metrics to the topology's built-in MetricsConsumerBolt, which then calls back LoggingMetricsConsumer.handleDataPoints to consume the data and print it to the log
- handleDataPoints handles two kinds of info, ClusterInfo and SupervisorInfo; note that the recurring task only calls sendClusterMetricsToExecutors when the current node is the leader, and since nimbus and the supervisors normally run on different nodes, supervisor.log.metrics may well be empty
- LoggingMetricsConsumer's implementation depends on the legacy IMetric, while LoggingClusterMetricsConsumer does not depend on IMetric at all; it obtains its metrics from IStormClusterState
- Storm 1.2 introduced a new metrics system based on Dropwizard Metrics; TopologyContext's registerMetric, which returns an IMetric, has been marked Deprecated since 1.2, so LoggingMetricsConsumer may later need to be reworked to obtain its metrics through the metrics2 MetricRegistry