A Look at Storm's LoggingClusterMetricsConsumer
Published: 2019-06-22


This article takes a look at Storm's LoggingClusterMetricsConsumer.

LoggingClusterMetricsConsumer

storm-2.0.0/storm-server/src/main/java/org/apache/storm/metric/LoggingClusterMetricsConsumer.java

public class LoggingClusterMetricsConsumer implements IClusterMetricsConsumer {
    public static final Logger LOG = LoggerFactory.getLogger(LoggingClusterMetricsConsumer.class);

    static private String padding = "                       ";

    @Override
    public void prepare(Object registrationArgument) {
    }

    @Override
    public void handleDataPoints(ClusterInfo clusterInfo, Collection<DataPoint> dataPoints) {
        StringBuilder sb = new StringBuilder();
        String header = String.format("%d\t%15s\t%40s\t",
                                      clusterInfo.getTimestamp(), "<cluster>", "<cluster>");
        sb.append(header);
        logDataPoints(dataPoints, sb, header);
    }

    @Override
    public void handleDataPoints(SupervisorInfo supervisorInfo, Collection<DataPoint> dataPoints) {
        StringBuilder sb = new StringBuilder();
        String header = String.format("%d\t%15s\t%40s\t",
                                      supervisorInfo.getTimestamp(),
                                      supervisorInfo.getSrcSupervisorHost(),
                                      supervisorInfo.getSrcSupervisorId());
        sb.append(header);
        for (DataPoint p : dataPoints) {
            sb.delete(header.length(), sb.length());
            sb.append(p.getName())
              .append(padding).delete(header.length() + 23, sb.length()).append("\t")
              .append(p.getValue());
            LOG.info(sb.toString());
        }
    }

    @Override
    public void cleanup() {
    }

    private void logDataPoints(Collection<DataPoint> dataPoints, StringBuilder sb, String header) {
        for (DataPoint p : dataPoints) {
            sb.delete(header.length(), sb.length());
            sb.append(p.getName())
              .append(padding).delete(header.length() + 23, sb.length()).append("\t")
              .append(p.getValue());
            LOG.info(sb.toString());
        }
    }
}
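  • This is a cluster-level metrics consumer, and it can only be configured in storm.yaml
  • Its handleDataPoints methods are called back by ClusterMetricsConsumerExecutor
  • handleDataPoints here simply writes each data point to the log file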
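To make the column layout concrete, the sketch below reproduces the string handling of logDataPoints with plain JDK types: a tab-separated header built from `%d\t%15s\t%40s\t`, then the metric name padded with spaces and cut to exactly 23 characters, then the value. `MetricsLineFormat` is a hypothetical name, and a Map entry stands in for Storm's DataPoint; it only mirrors the formatting, not the logging.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical, Storm-free stand-in that reproduces the StringBuilder logic of
// LoggingClusterMetricsConsumer.logDataPoints; a Map entry plays the role of a DataPoint.
final class MetricsLineFormat {
    // 23 spaces, the same width as the padding field in the Storm class
    private static final String PADDING = "                       ";

    private MetricsLineFormat() {
    }

    static List<String> format(long timestamp, String host, String id, Map<String, Object> points) {
        // Fixed header: timestamp, host right-aligned in 15 chars, id right-aligned in 40 chars
        String header = String.format("%d\t%15s\t%40s\t", timestamp, host, id);
        StringBuilder sb = new StringBuilder(header);
        List<String> lines = new ArrayList<>();
        for (Map.Entry<String, Object> p : points.entrySet()) {
            // Drop the previous data point, keeping only the header
            sb.delete(header.length(), sb.length());
            sb.append(p.getKey())
              .append(PADDING)
              // Cut back to header + 23 chars: pads short names, truncates long ones
              .delete(header.length() + 23, sb.length())
              .append("\t")
              .append(p.getValue());
            lines.add(sb.toString()); // Storm passes this string to LOG.info
        }
        return lines;
    }
}
```

Because the name column is cut back to 23 characters, a metric name longer than that shows up truncated in the log.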

storm.yaml configuration

## Cluster Metrics Consumers
storm.cluster.metrics.consumer.register:
  - class: "org.apache.storm.metric.LoggingClusterMetricsConsumer"
#   - class: "com.example.demo.metric.FixedLoggingClusterMetricsConsumer"
#     argument:
#       - endpoint: "metrics-collector.mycompany.org"
#
storm.cluster.metrics.consumer.publish.interval.secs: 5
  • This registers LoggingClusterMetricsConsumer as the consumer class and sets the publish interval to 5 seconds
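Each entry under storm.cluster.metrics.consumer.register carries a `class` and an optional `argument`, which are exactly the two values the ClusterMetricsConsumerExecutor constructor takes. A minimal sketch, assuming the YAML has already been loaded into plain Java lists and maps; `ConsumerRegistration` and `parse` are hypothetical names, not the code Nimbus uses.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;

// Hypothetical holder for one storm.cluster.metrics.consumer.register entry.
final class ConsumerRegistration {
    final String className;
    final Object argument; // null when the entry has no "argument" key

    ConsumerRegistration(String className, Object argument) {
        this.className = className;
        this.argument = argument;
    }

    // Converts the already-parsed YAML list into (class, argument) pairs.
    static List<ConsumerRegistration> parse(Object registerValue) {
        if (registerValue == null) {
            return Collections.emptyList(); // key absent: no cluster consumers configured
        }
        List<ConsumerRegistration> result = new ArrayList<>();
        for (Object entry : (List<?>) registerValue) {
            Map<?, ?> map = (Map<?, ?>) entry;
            Object clazz = map.get("class");
            if (clazz == null) {
                throw new IllegalArgumentException("missing 'class' in " + map);
            }
            result.add(new ConsumerRegistration(clazz.toString(), map.get("argument")));
        }
        return result;
    }
}
```

Each resulting pair maps onto one `new ClusterMetricsConsumerExecutor(className, argument)`.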

cluster.xml

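<properties>
    <property name="pattern">%d{yyyy-MM-dd HH:mm:ss.SSS} %c{1.} %t [%p] %msg%n</property>
    <property name="patternMetrics">%d %-8r %m%n</property>
</properties>
...
<pattern>${pattern}</pattern>
<pattern>${pattern}</pattern>
<pattern>${pattern}</pattern>
<pattern>${patternMetrics}</pattern>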
  • cluster.xml holds the logging configuration for metrics. It uses the METRICS appender, a RollingFile whose file name is ${sys:storm.log.dir}/${sys:logfile.name}.metrics. For example, the default logfile.name is nimbus.log for nimbus and supervisor.log for supervisor, so the files written are nimbus.log.metrics and supervisor.log.metrics
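log4j2's `sys:` lookup reads JVM system properties, so the METRICS file name is simply the log directory joined with logfile.name plus a `.metrics` suffix. The hypothetical helper below computes the same path, which can be handy when a script or tool needs to locate the file; `MetricsLogPath` is an illustrative name, not part of Storm.

```java
import java.nio.file.Path;
import java.nio.file.Paths;

// Hypothetical helper mirroring the METRICS appender's fileName pattern
// ${sys:storm.log.dir}/${sys:logfile.name}.metrics
final class MetricsLogPath {
    private MetricsLogPath() {
    }

    static Path resolve(String stormLogDir, String logfileName) {
        return Paths.get(stormLogDir, logfileName + ".metrics");
    }

    // Reads the same two system properties the appender pattern refers to.
    static Path fromSystemProperties() {
        String dir = System.getProperty("storm.log.dir");
        String name = System.getProperty("logfile.name");
        if (dir == null || name == null) {
            throw new IllegalStateException("storm.log.dir or logfile.name is not set");
        }
        return resolve(dir, name);
    }
}
```

For a nimbus process, whose logfile.name is nimbus.log, resolve returns a path ending in nimbus.log.metrics.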

Sample log output

2018-11-06 07:51:51,488 18628    1541490711       <cluster>                                <cluster> supervisors             1
2018-11-06 07:51:51,488 18628    1541490711       <cluster>                                <cluster> topologies              0
2018-11-06 07:51:51,489 18629    1541490711       <cluster>                                <cluster> slotsTotal              4
2018-11-06 07:51:51,489 18629    1541490711       <cluster>                                <cluster> slotsUsed               0
2018-11-06 07:51:51,489 18629    1541490711       <cluster>                                <cluster> slotsFree               4
2018-11-06 07:51:51,489 18629    1541490711       <cluster>                                <cluster> executorsTotal          0
2018-11-06 07:51:51,489 18629    1541490711       <cluster>                                <cluster> tasksTotal              0
2018-11-06 07:51:51,496 18636    1541490711  192.168.99.100     5bbd576d-218c-4365-ac5e-865b1f6e9b29 slotsTotal              4
2018-11-06 07:51:51,497 18637    1541490711  192.168.99.100     5bbd576d-218c-4365-ac5e-865b1f6e9b29 slotsUsed               0
2018-11-06 07:51:51,497 18637    1541490711  192.168.99.100     5bbd576d-218c-4365-ac5e-865b1f6e9b29 totalMem                3072.0
2018-11-06 07:51:51,497 18637    1541490711  192.168.99.100     5bbd576d-218c-4365-ac5e-865b1f6e9b29 totalCpu                400.0
2018-11-06 07:51:51,498 18638    1541490711  192.168.99.100     5bbd576d-218c-4365-ac5e-865b1f6e9b29 usedMem                 0.0
2018-11-06 07:51:51,498 18638    1541490711  192.168.99.100     5bbd576d-218c-4365-ac5e-865b1f6e9b29 usedCpu                 0.0
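When post-processing these files, each line splits cleanly on whitespace into eight fields: date, time, the `%-8r` elapsed milliseconds, the data point timestamp in epoch seconds (1541490711 is 2018-11-06 07:51:51 UTC), host, id, metric name and value. The parser below is a hypothetical sketch, not part of Storm; it assumes metric names contain no spaces and that cluster-level rows carry the `<cluster>` placeholder in both the host and id columns.

```java
// Hypothetical parser for one line of *.log.metrics, assuming the layout
// patternMetrics "%d %-8r %m%n" plus the consumer's tab-separated message.
final class MetricsLogLine {
    final String date;
    final String time;
    final long relativeMillis; // %r: elapsed milliseconds
    final long timestampSecs;  // ClusterInfo/SupervisorInfo timestamp
    final String host;         // "<cluster>" for cluster-level rows (assumed)
    final String id;           // supervisor id, or "<cluster>"
    final String name;
    final String value;        // kept as text: "1", "3072.0", ...

    private MetricsLogLine(String[] f) {
        date = f[0];
        time = f[1];
        relativeMillis = Long.parseLong(f[2]);
        timestampSecs = Long.parseLong(f[3]);
        host = f[4];
        id = f[5];
        name = f[6];
        value = f[7];
    }

    static MetricsLogLine parse(String line) {
        // Tabs and padding spaces are both treated as separators
        String[] f = line.trim().split("\\s+");
        if (f.length != 8) {
            throw new IllegalArgumentException("unexpected field count " + f.length + ": " + line);
        }
        return new MetricsLogLine(f);
    }

    boolean isClusterLevel() {
        return "<cluster>".equals(host);
    }
}
```

Values stay strings because cluster counts print as integers while supervisor resources print as doubles such as 3072.0.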

ClusterMetricsConsumerExecutor

storm-2.0.0/storm-server/src/main/java/org/apache/storm/metric/ClusterMetricsConsumerExecutor.java

public class ClusterMetricsConsumerExecutor {
    public static final Logger LOG = LoggerFactory.getLogger(ClusterMetricsConsumerExecutor.class);
    private static final String ERROR_MESSAGE_PREPARATION_CLUSTER_METRICS_CONSUMER_FAILED =
        "Preparation of Cluster Metrics Consumer failed. " +
        "Please check your configuration and/or corresponding systems and relaunch Nimbus. " +
        "Skipping handle metrics.";

    private IClusterMetricsConsumer metricsConsumer;
    private String consumerClassName;
    private Object registrationArgument;

    public ClusterMetricsConsumerExecutor(String consumerClassName, Object registrationArgument) {
        this.consumerClassName = consumerClassName;
        this.registrationArgument = registrationArgument;
    }

    public void prepare() {
        try {
            metricsConsumer = (IClusterMetricsConsumer) Class.forName(consumerClassName).newInstance();
            metricsConsumer.prepare(registrationArgument);
        } catch (Exception e) {
            LOG.error("Could not instantiate or prepare Cluster Metrics Consumer with fully qualified name " +
                      consumerClassName, e);

            if (metricsConsumer != null) {
                metricsConsumer.cleanup();
            }
            metricsConsumer = null;
        }
    }

    public void handleDataPoints(final IClusterMetricsConsumer.ClusterInfo clusterInfo,
                                 final Collection<DataPoint> dataPoints) {
        if (metricsConsumer == null) {
            LOG.error(ERROR_MESSAGE_PREPARATION_CLUSTER_METRICS_CONSUMER_FAILED);
            return;
        }

        try {
            metricsConsumer.handleDataPoints(clusterInfo, dataPoints);
        } catch (Throwable e) {
            LOG.error("Error while handling cluster data points, consumer class: " + consumerClassName, e);
        }
    }

    public void handleDataPoints(final IClusterMetricsConsumer.SupervisorInfo supervisorInfo,
                                 final Collection<DataPoint> dataPoints) {
        if (metricsConsumer == null) {
            LOG.error(ERROR_MESSAGE_PREPARATION_CLUSTER_METRICS_CONSUMER_FAILED);
            return;
        }

        try {
            metricsConsumer.handleDataPoints(supervisorInfo, dataPoints);
        } catch (Throwable e) {
            LOG.error("Error while handling cluster data points, consumer class: " + consumerClassName, e);
        }
    }

    public void cleanup() {
        if (metricsConsumer != null) {
            metricsConsumer.cleanup();
        }
    }
}
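  • In prepare, ClusterMetricsConsumerExecutor instantiates the IClusterMetricsConsumer implementation named by consumerClassName via reflection, then calls metricsConsumer.prepare(registrationArgument) so it can initialize itself. If either step fails, the consumer is cleaned up and set to null, and later handleDataPoints calls only log an error
  • ClusterMetricsConsumerExecutor's handleDataPoints methods simply delegate to metricsConsumer's handleDataPoints, logging (rather than propagating) any Throwable the consumer throws
  • There are two handleDataPoints overloads. Both take dataPoints; the other parameter is a ClusterInfo in one and a SupervisorInfo in the other, used for cluster-wide metrics and per-supervisor metrics respectively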
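The executor's pattern (reflective instantiation, a null consumer when preparation fails, and delegation that never lets a consumer's exception escape) can be reduced to plain JDK types. A minimal sketch under that assumption: `SimpleClusterConsumer`, `SimpleConsumerExecutor` and the two example consumers are hypothetical stand-ins rather than Storm classes, and `handle` returns a boolean where Storm would log an error.

```java
import java.util.Collection;

// Hypothetical stand-in for IClusterMetricsConsumer (not Storm's interface).
interface SimpleClusterConsumer {
    void prepare(Object registrationArgument);

    void handle(String source, Collection<String> dataPoints);

    void cleanup();
}

// Example consumer that counts data points; needs a no-arg constructor.
final class CountingConsumer implements SimpleClusterConsumer {
    static int received;

    @Override
    public void prepare(Object registrationArgument) {
    }

    @Override
    public void handle(String source, Collection<String> dataPoints) {
        received += dataPoints.size();
    }

    @Override
    public void cleanup() {
    }
}

// Example consumer whose handle always fails.
final class FailingConsumer implements SimpleClusterConsumer {
    @Override
    public void prepare(Object registrationArgument) {
    }

    @Override
    public void handle(String source, Collection<String> dataPoints) {
        throw new IllegalStateException("boom");
    }

    @Override
    public void cleanup() {
    }
}

final class SimpleConsumerExecutor {
    private final String consumerClassName;
    private final Object registrationArgument;
    private SimpleClusterConsumer consumer;

    SimpleConsumerExecutor(String consumerClassName, Object registrationArgument) {
        this.consumerClassName = consumerClassName;
        this.registrationArgument = registrationArgument;
    }

    void prepare() {
        try {
            consumer = (SimpleClusterConsumer) Class.forName(consumerClassName)
                .getDeclaredConstructor().newInstance();
            consumer.prepare(registrationArgument);
        } catch (Exception e) {
            // Same policy as the Storm executor: clean up and disable the consumer
            if (consumer != null) {
                consumer.cleanup();
            }
            consumer = null;
        }
    }

    /** Returns false when the data points were dropped. */
    boolean handle(String source, Collection<String> dataPoints) {
        if (consumer == null) {
            return false; // preparation failed, so skip
        }
        try {
            consumer.handle(source, dataPoints);
            return true;
        } catch (Throwable t) {
            return false; // a broken consumer must not break the caller
        }
    }
}
```

The sketch calls getDeclaredConstructor().newInstance() because Class.newInstance(), which the Storm code uses, has been deprecated since Java 9.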

Nimbus.launchServer

storm-2.0.0/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java

public void launchServer() throws Exception {
    try {
        BlobStore store = blobStore;
        IStormClusterState state = stormClusterState;
        NimbusInfo hpi = nimbusHostPortInfo;

        LOG.info("Starting Nimbus with conf {}", ConfigUtils.maskPasswords(conf));
        validator.prepare(conf);

        //......

        timer.scheduleRecurring(0, ObjectReader.getInt(conf.get(DaemonConfig.STORM_CLUSTER_METRICS_CONSUMER_PUBLISH_INTERVAL_SECS)),
                                () -> {
                                    try {
                                        if (isLeader()) {
                                            sendClusterMetricsToExecutors();
                                        }
                                    } catch (Exception e) {
                                        throw new RuntimeException(e);
                                    }
                                });

        timer.scheduleRecurring(5, 5, clusterMetricSet);
    } catch (Exception e) {
        if (Utils.exceptionCauseIsInstanceOf(InterruptedException.class, e)) {
            throw e;
        }

        if (Utils.exceptionCauseIsInstanceOf(InterruptedIOException.class, e)) {
            throw e;
        }
        LOG.error("Error on initialization of nimbus", e);
        Utils.exitProcess(13, "Error on initialization of nimbus");
    }
}

private boolean isLeader() throws Exception {
    return leaderElector.isLeader();
}
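  • Nimbus.launchServer creates a recurring task that, when the current node is the leader, calls sendClusterMetricsToExecutors to send the metrics to the metrics consumers
  • The task runs every DaemonConfig.STORM_CLUSTER_METRICS_CONSUMER_PUBLISH_INTERVAL_SECS (storm.cluster.metrics.consumer.publish.interval.secs) seconds; this defaults to 60 in defaults.yaml and can be overridden in storm.yaml
  • Besides publishing metrics to the consumers, launchServer schedules a second recurring task that runs the ClusterSummaryMetricSet Runnable (clusterMetricSet) every 5 seconds, after an initial 5-second delay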
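The two scheduling rules above (publish every interval but only on the leader, and run the metric set every 5 seconds) can be sketched with a JDK ScheduledExecutorService standing in for Storm's timer. `LeaderGatedPublisher`, its methods and the 60-second fallback constant are illustrative assumptions, and scheduleWithFixedDelay is only an approximation of scheduleRecurring.

```java
import java.util.Map;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.BooleanSupplier;

// Hypothetical sketch of the two recurring tasks set up in launchServer.
final class LeaderGatedPublisher implements Runnable {
    static final String INTERVAL_KEY = "storm.cluster.metrics.consumer.publish.interval.secs";
    static final int DEFAULT_INTERVAL_SECS = 60; // the defaults.yaml value

    private final BooleanSupplier isLeader;
    private final Runnable sendClusterMetrics;

    LeaderGatedPublisher(BooleanSupplier isLeader, Runnable sendClusterMetrics) {
        this.isLeader = isLeader;
        this.sendClusterMetrics = sendClusterMetrics;
    }

    @Override
    public void run() {
        // Every Nimbus schedules the task, but only the current leader publishes
        if (isLeader.getAsBoolean()) {
            sendClusterMetrics.run();
        }
    }

    // Simplified lookup: only numeric values are handled here
    static int intervalSecs(Map<String, Object> conf) {
        Object v = conf.get(INTERVAL_KEY);
        return v == null ? DEFAULT_INTERVAL_SECS : ((Number) v).intValue();
    }

    static void start(ScheduledExecutorService timer, int intervalSecs,
                      Runnable publisher, Runnable clusterMetricSet) {
        // Like scheduleRecurring(0, interval, ...): start immediately, then repeat
        timer.scheduleWithFixedDelay(publisher, 0, intervalSecs, TimeUnit.SECONDS);
        // Like scheduleRecurring(5, 5, clusterMetricSet)
        timer.scheduleWithFixedDelay(clusterMetricSet, 5, 5, TimeUnit.SECONDS);
    }
}
```

One behavioural difference to keep in mind: if a task submitted to a ScheduledExecutorService throws, its later runs are silently suppressed, so a real port would need to catch and log exceptions inside run().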

Nimbus.sendClusterMetricsToExecutors

storm-2.0.0/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java

private void sendClusterMetricsToExecutors() throws Exception {
    ClusterInfo clusterInfo = mkClusterInfo();
    ClusterSummary clusterSummary = getClusterInfoImpl();
    List<DataPoint> clusterMetrics = extractClusterMetrics(clusterSummary);
    Map<IClusterMetricsConsumer.SupervisorInfo, List<DataPoint>> supervisorMetrics = extractSupervisorMetrics(clusterSummary);
    for (ClusterMetricsConsumerExecutor consumerExecutor : clusterConsumerExceutors) {
        consumerExecutor.handleDataPoints(clusterInfo, clusterMetrics);
        for (Entry<IClusterMetricsConsumer.SupervisorInfo, List<DataPoint>> entry : supervisorMetrics.entrySet()) {
            consumerExecutor.handleDataPoints(entry.getKey(), entry.getValue());
        }
    }
}
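  • sendClusterMetricsToExecutors extracts the metrics with extractClusterMetrics and extractSupervisorMetrics, then hands them to every ClusterMetricsConsumerExecutor through consumerExecutor.handleDataPoints: first the cluster-wide data points, then one call per supervisor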
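The loop in sendClusterMetricsToExecutors fixes a delivery order: each consumer executor receives the cluster-wide data points first, then one call per supervisor, before the next executor is served. The hypothetical types below record that order; a String supervisor id stands in for SupervisorInfo and strings stand in for DataPoints, so this is a sketch of the control flow only.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for the two handleDataPoints overloads.
interface FanOutTarget {
    void handleCluster(long timestamp, List<String> points);

    void handleSupervisor(String supervisorId, List<String> points);
}

final class ClusterMetricsFanOut {
    private ClusterMetricsFanOut() {
    }

    // Same nesting as sendClusterMetricsToExecutors: cluster points, then each supervisor
    static void send(List<? extends FanOutTarget> consumers, long timestamp,
                     List<String> clusterPoints, Map<String, List<String>> supervisorPoints) {
        for (FanOutTarget consumer : consumers) {
            consumer.handleCluster(timestamp, clusterPoints);
            for (Map.Entry<String, List<String>> e : supervisorPoints.entrySet()) {
                consumer.handleSupervisor(e.getKey(), e.getValue());
            }
        }
    }
}

// Records the call sequence so the ordering is visible.
final class RecordingTarget implements FanOutTarget {
    final List<String> calls = new ArrayList<>();

    @Override
    public void handleCluster(long timestamp, List<String> points) {
        calls.add("cluster@" + timestamp);
    }

    @Override
    public void handleSupervisor(String supervisorId, List<String> points) {
        calls.add("supervisor:" + supervisorId);
    }
}
```

Because every executor gets the same ClusterInfo timestamp in one pass, all rows written during a publish cycle share that timestamp column.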

ClusterSummaryMetricSet

storm-2.0.0/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java

private class ClusterSummaryMetricSet implements Runnable {
    private static final int CACHING_WINDOW = 5;

    private final ClusterSummaryMetrics clusterSummaryMetrics = new ClusterSummaryMetrics();

    private final Function<String, Histogram> registerHistogram = (name) -> {
        //This histogram reflects the data distribution across only one ClusterSummary, i.e.,
        // data distribution across all entities of a type (e.g., data from all nimbus/topologies) at one moment.
        // Hence we use half of the CACHING_WINDOW time to ensure it retains only data from the most recent update
        final Histogram histogram = new Histogram(new SlidingTimeWindowReservoir(CACHING_WINDOW / 2, TimeUnit.SECONDS));
        clusterSummaryMetrics.put(name, histogram);
        return histogram;
    };
    private volatile boolean active = false;

    //Nimbus metrics distribution
    private final Histogram nimbusUptime = registerHistogram.apply("nimbuses:uptime-secs");

    //Supervisor metrics distribution
    private final Histogram supervisorsUptime = registerHistogram.apply("supervisors:uptime-secs");
    private final Histogram supervisorsNumWorkers = registerHistogram.apply("supervisors:num-workers");
    private final Histogram supervisorsNumUsedWorkers = registerHistogram.apply("supervisors:num-used-workers");
    private final Histogram supervisorsUsedMem = registerHistogram.apply("supervisors:used-mem");
    private final Histogram supervisorsUsedCpu = registerHistogram.apply("supervisors:used-cpu");
    private final Histogram supervisorsFragmentedMem = registerHistogram.apply("supervisors:fragmented-mem");
    private final Histogram supervisorsFragmentedCpu = registerHistogram.apply("supervisors:fragmented-cpu");

    //Topology metrics distribution
    private final Histogram topologiesNumTasks = registerHistogram.apply("topologies:num-tasks");
    private final Histogram topologiesNumExecutors = registerHistogram.apply("topologies:num-executors");
    private final Histogram topologiesNumWorker = registerHistogram.apply("topologies:num-workers");
    private final Histogram topologiesUptime = registerHistogram.apply("topologies:uptime-secs");
    private final Histogram topologiesReplicationCount = registerHistogram.apply("topologies:replication-count");
    private final Histogram topologiesRequestedMemOnHeap = registerHistogram.apply("topologies:requested-mem-on-heap");
    private final Histogram topologiesRequestedMemOffHeap = registerHistogram.apply("topologies:requested-mem-off-heap");
    private final Histogram topologiesRequestedCpu = registerHistogram.apply("topologies:requested-cpu");
    private final Histogram topologiesAssignedMemOnHeap = registerHistogram.apply("topologies:assigned-mem-on-heap");
    private final Histogram topologiesAssignedMemOffHeap = registerHistogram.apply("topologies:assigned-mem-off-heap");
    private final Histogram topologiesAssignedCpu = registerHistogram.apply("topologies:assigned-cpu");
    private final StormMetricsRegistry metricsRegistry;

    /**
     * Constructor to put all items in ClusterSummary in MetricSet as a metric.
     * All metrics are derived from a cached ClusterSummary object,
     * expired {@link ClusterSummaryMetricSet#CACHING_WINDOW} seconds after first query in a while from reporters.
     * In case of {@link com.codahale.metrics.ScheduledReporter}, CACHING_WINDOW should be set shorter than
     * reporting interval to avoid outdated reporting.
     */
    ClusterSummaryMetricSet(StormMetricsRegistry metricsRegistry) {
        this.metricsRegistry = metricsRegistry;
        //Break the code if out of sync to thrift protocol
        assert ClusterSummary._Fields.values().length == 3
            && ClusterSummary._Fields.findByName("supervisors") == ClusterSummary._Fields.SUPERVISORS
            && ClusterSummary._Fields.findByName("topologies") == ClusterSummary._Fields.TOPOLOGIES
            && ClusterSummary._Fields.findByName("nimbuses") == ClusterSummary._Fields.NIMBUSES;

        final CachedGauge<ClusterSummary> cachedSummary = new CachedGauge<ClusterSummary>(CACHING_WINDOW, TimeUnit.SECONDS) {
            @Override
            protected ClusterSummary loadValue() {
                try {
                    ClusterSummary newSummary = getClusterInfoImpl();
                    LOG.debug("The new summary is {}", newSummary);
                    /*
                     * Update histograms based on the new summary. Most common implementation of Reporter reports Gauges before
                     * Histograms. Because DerivativeGauge will trigger cache refresh upon reporter's query, histogram will also be
                     * updated before query
                     */
                    updateHistogram(newSummary);
                    return newSummary;
                } catch (Exception e) {
                    LOG.warn("Get cluster info exception.", e);
                    throw new RuntimeException(e);
                }
            }
        };

        clusterSummaryMetrics.put("cluster:num-nimbus-leaders", new DerivativeGauge<ClusterSummary, Long>(cachedSummary) {
            @Override
            protected Long transform(ClusterSummary clusterSummary) {
                return clusterSummary.get_nimbuses().stream()
                                     .filter(NimbusSummary::is_isLeader)
                                     .count();
            }
        });
        clusterSummaryMetrics.put("cluster:num-nimbuses", new DerivativeGauge<ClusterSummary, Integer>(cachedSummary) {
            @Override
            protected Integer transform(ClusterSummary clusterSummary) {
                return clusterSummary.get_nimbuses_size();
            }
        });
        clusterSummaryMetrics.put("cluster:num-supervisors", new DerivativeGauge<ClusterSummary, Integer>(cachedSummary) {
            @Override
            protected Integer transform(ClusterSummary clusterSummary) {
                return clusterSummary.get_supervisors_size();
            }
        });
        clusterSummaryMetrics.put("cluster:num-topologies", new DerivativeGauge<ClusterSummary, Integer>(cachedSummary) {
            @Override
            protected Integer transform(ClusterSummary clusterSummary) {
                return clusterSummary.get_topologies_size();
            }
        });
        clusterSummaryMetrics.put("cluster:num-total-workers", new DerivativeGauge<ClusterSummary, Integer>(cachedSummary) {
            @Override
            protected Integer transform(ClusterSummary clusterSummary) {
                return clusterSummary.get_supervisors().stream()
                                     .mapToInt(SupervisorSummary::get_num_workers)
                                     .sum();
            }
        });
        clusterSummaryMetrics.put("cluster:num-total-used-workers", new DerivativeGauge<ClusterSummary, Integer>(cachedSummary) {
            @Override
            protected Integer transform(ClusterSummary clusterSummary) {
                return clusterSummary.get_supervisors().stream()
                                     .mapToInt(SupervisorSummary::get_num_used_workers)
                                     .sum();
            }
        });
        clusterSummaryMetrics.put("cluster:total-fragmented-memory-non-negative", new DerivativeGauge<ClusterSummary, Double>(cachedSummary) {
            @Override
            protected Double transform(ClusterSummary clusterSummary) {
                return clusterSummary.get_supervisors().stream()
                                     //Filtered negative value
                                     .mapToDouble(supervisorSummary -> Math.max(supervisorSummary.get_fragmented_mem(), 0))
                                     .sum();
            }
        });
        clusterSummaryMetrics.put("cluster:total-fragmented-cpu-non-negative", new DerivativeGauge<ClusterSummary, Double>(cachedSummary) {
            @Override
            protected Double transform(ClusterSummary clusterSummary) {
                return clusterSummary.get_supervisors().stream()
                                     //Filtered negative value
                                     .mapToDouble(supervisorSummary -> Math.max(supervisorSummary.get_fragmented_cpu(), 0))
                                     .sum();
            }
        });
    }

    private void updateHistogram(ClusterSummary newSummary) {
        for (NimbusSummary nimbusSummary : newSummary.get_nimbuses()) {
            nimbusUptime.update(nimbusSummary.get_uptime_secs());
        }
        for (SupervisorSummary summary : newSummary.get_supervisors()) {
            supervisorsUptime.update(summary.get_uptime_secs());
            supervisorsNumWorkers.update(summary.get_num_workers());
            supervisorsNumUsedWorkers.update(summary.get_num_used_workers());
            supervisorsUsedMem.update(Math.round(summary.get_used_mem()));
            supervisorsUsedCpu.update(Math.round(summary.get_used_cpu()));
            supervisorsFragmentedMem.update(Math.round(summary.get_fragmented_mem()));
            supervisorsFragmentedCpu.update(Math.round(summary.get_fragmented_cpu()));
        }
        for (TopologySummary summary : newSummary.get_topologies()) {
            topologiesNumTasks.update(summary.get_num_tasks());
            topologiesNumExecutors.update(summary.get_num_executors());
            topologiesNumWorker.update(summary.get_num_workers());
            topologiesUptime.update(summary.get_uptime_secs());
            topologiesReplicationCount.update(summary.get_replication_count());
            topologiesRequestedMemOnHeap.update(Math.round(summary.get_requested_memonheap()));
            topologiesRequestedMemOffHeap.update(Math.round(summary.get_requested_memoffheap()));
            topologiesRequestedCpu.update(Math.round(summary.get_requested_cpu()));
            topologiesAssignedMemOnHeap.update(Math.round(summary.get_assigned_memonheap()));
            topologiesAssignedMemOffHeap.update(Math.round(summary.get_assigned_memoffheap()));
            topologiesAssignedCpu.update(Math.round(summary.get_assigned_cpu()));
        }
    }

    void setActive(final boolean active) {
        if (this.active != active) {
            this.active = active;
            if (active) {
                metricsRegistry.registerAll(clusterSummaryMetrics);
            } else {
                metricsRegistry.removeAll(clusterSummaryMetrics);
            }
        }
    }

    @Override
    public void run() {
        try {
            setActive(isLeader());
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }
}
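  • This Runnable's run method just calls setActive(isLeader()). Each time it runs, it checks whether the node's leadership has changed: if this node has become the leader, it registers clusterSummaryMetrics with metricsRegistry; if it is no longer the leader, it removes them
  • clusterSummaryMetrics holds the gauges cluster:num-nimbus-leaders, cluster:num-nimbuses, cluster:num-supervisors, cluster:num-topologies, cluster:num-total-workers, cluster:num-total-used-workers, cluster:total-fragmented-memory-non-negative and cluster:total-fragmented-cpu-non-negative, plus the nimbuses:*, supervisors:* and topologies:* histograms that registerHistogram puts into it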
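setActive only acts on a change in leadership, so running it every 5 seconds is cheap: the metrics are registered once when a node becomes leader and removed once when it loses leadership. A minimal sketch of that toggle, assuming a plain Map in place of StormMetricsRegistry and Supplier values in place of Dropwizard metrics; `LeaderOnlyMetrics` is a hypothetical name.

```java
import java.util.Map;
import java.util.function.BooleanSupplier;
import java.util.function.Supplier;

// Hypothetical sketch of ClusterSummaryMetricSet.setActive/run without Storm or Dropwizard.
final class LeaderOnlyMetrics implements Runnable {
    private final Map<String, Supplier<Object>> registry; // shared, like StormMetricsRegistry
    private final Map<String, Supplier<Object>> metrics;  // owned, like clusterSummaryMetrics
    private final BooleanSupplier isLeader;
    private volatile boolean active = false;

    LeaderOnlyMetrics(Map<String, Supplier<Object>> registry,
                      Map<String, Supplier<Object>> metrics,
                      BooleanSupplier isLeader) {
        this.registry = registry;
        this.metrics = metrics;
        this.isLeader = isLeader;
    }

    void setActive(boolean active) {
        // Only a transition changes the registry; repeated runs are no-ops
        if (this.active != active) {
            this.active = active;
            if (active) {
                registry.putAll(metrics);
            } else {
                registry.keySet().removeAll(metrics.keySet());
            }
        }
    }

    @Override
    public void run() {
        setActive(isLeader.getAsBoolean());
    }
}
```

Because only the leader exposes these metrics, a reporter attached to a follower Nimbus simply sees none of the cluster:* names.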

Summary

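  • LoggingClusterMetricsConsumer consumes cluster-level metrics and writes them to a log file. Its log4j2 configuration comes from cluster.xml, and the files it writes are nimbus.log.metrics and supervisor.log.metrics. LoggingMetricsConsumer, by contrast, works at the level of a topology's workers: its log4j2 configuration comes from worker.xml, and it writes worker.log.metrics
  • In launchServer, Nimbus sets up a recurring task that, while the current node is the leader, periodically sends metrics to ClusterMetricsConsumerExecutor, which in turn calls LoggingClusterMetricsConsumer's handleDataPoints to log the data. LoggingMetricsConsumer works differently: the Executor schedules a timer that emits a metricsTickTuple, which makes SpoutExecutor and BoltExecutor send their metrics to the topology's built-in MetricsConsumerBolt; MetricsConsumerBolt then calls LoggingMetricsConsumer.handleDataPoints to consume the data and write it to the log
  • handleDataPoints handles two kinds of info, ClusterInfo and SupervisorInfo. Note that the recurring task only calls sendClusterMetricsToExecutors when the current node is the leader, so the supervisor-level data points are also sent from the leader Nimbus process (the sample output above shows both kinds together). Since nimbus and supervisor normally do not run on the same node, supervisor.log.metrics may well be empty
  • LoggingMetricsConsumer depends on the old IMetric interface, whereas LoggingClusterMetricsConsumer does not; it gets its metrics from IStormClusterState
  • Storm 1.2 introduced a new metrics system based on Dropwizard Metrics, and TopologyContext's registerMetric methods, which return IMetric, were marked @Deprecated in 1.2. LoggingMetricsConsumer may therefore need to be reworked later to obtain its metrics from the metrics2 MetricRegistry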


Reposted from: http://jagpa.baihongyu.com/
