第十八篇:如何进行生产环境作业监控
你好,欢迎来到第 18 课时,本课时主要讲解如何进行生产环境作业监控。
在第15课时的学习中涉及了。Flink的后台页面是首要考虑的平台以用于识别任务是否遇到反压。该平台能够提供一目了然的任务运行状态信息。
在实际生产环境中使用Flink时,在线生产过程中可以通过该系统的后台管理界面快速完成对Flink JobManager、TaskManager、执行计划以及资源分配情况等关键参数的实时监控与分析,并针对单个Flink任务的具体情况进行问题排查和优化配置。
在许多大型企业和中型企业中,在进行进群作业管理时,则更加注重对作业精细化的实时监控状态。例如,在当前与上期及同期相比的吞吐量指标、整个集群的整体任务运行概况以及集群负载情况等维度进行评估;此时就需要专门针对集群任务作业设计相应的监控系统来分析通过Flink框架实现的数据处理流水线运行状态。
Flink Metrics
就遇到了一个问题,并且我们现在已经实现了Flink Metrics。
Flink Metrics 是 Flink 开发团队打造的一套运行信息采集工具。它不仅能够采集 Flink 提供的核心运行参数包括但不限于 CPU 使用率、内存占用率、线程活跃度等基础运行参数,并且允许配置采集用户自定义的个性化监控参数。
通过使用 Flink Metrics 我们可以轻松地做到:
- 使用Flink实时获取其metric data(即Metrics)或根据需求定制用户所需的关键指标数据;
- 利用Flink提供的REST API获取这些数据,并将其与第三方系统对接以展示。
Flink Metrics 分类
Flink 支持了四个主要的监控度量工具包括以下几种:计数器工具用于记录事件数量、计数值采集器用于获取统计数据、直方图分析工具用于显示数据分布情况以及流量计数器用于追踪网络流量。
Counter
我们称 Counter 为计数器,并主要用作统计工具来计算数据流中的关键指标值。例如统计某数据流的输入量和输出量等关键参数信息。
public class MyMapper extends RichMapFunction<String, String> {
private transient Counter counter;
@Override
public void open(Configuration config) {
this.counter = getRuntimeContext()
.getMetricGroup()
.counter("MyCounter");
}
@Override
public String map(String value) throws Exception {
this.counter.inc();
return value;
}
}
AI助手
Gauge
Gauge 用于测量某个指标在瞬间的数据量。例如,在实时监控 Flink 的某个节点时所关注的内存使用情况以及通过 map 运算产生的输出数据数量。
public class MyMapper extends RichMapFunction<String, String> {
private transient int valueNumber = 0L;
@Override
public void open(Configuration config) {
getRuntimeContext()
.getMetricGroup()
.gauge("MyGauge", new Gauge<Long>() {
@Override
public Long getValue() {
return valueNumber;
}
});
}
@Override
public String map(String value) throws Exception {
valueNumber++;
return value;
}
}
AI助手
Meter
Meter 被用来计算一个指标的平均值。
public class MyMapper extends RichMapFunction<Long, Integer> {
private Meter meter;
@Override
public void open(Configuration config) {
this.meter = getRuntimeContext()
.getMetricGroup()
.meter("myMeter", new MyMeter());
}
@public Integer map(Long value) throws Exception {
this.meter.markEvent();
}
}
AI助手
Histogram
Histogram 被称为直方图,在Flink中与之相关的指标数量有限。该方法主要用于计算数据分布的关键统计参数如最大值、最小值以及中位数等。
public class MyMapper extends RichMapFunction<Long, Integer> {
private Histogram histogram;
@Override
public void open(Configuration config) {
this.histogram = getRuntimeContext()
.getMetricGroup()
.histogram("myHistogram", new MyHistogram());
}
@public Integer map(Long value) throws Exception {
this.histogram.update(value);
}
}
AI助手
这里需要特别强调的是,在Flink系统中设置的Metrics数据呈现出层级化结构特征,并按照某种Group分类方式进行组织存储。为了精确识别并锁定单一的Metrics数据,在系统中我们采用了独特的标识方法即通过Metric Group名称与具体Metric名称相结合的方式进行定位标识。
源码分析
基于 org.apache.flink.metrics.Metric 类的 Flink Metrics 实现采用了该类作为基础构建。整体架构图如下所示:

为提高对 Metrics 的管理与分类效率,
Flink 为 Metrics 分组提供了支持;
该功能具体实现了于下图所示的 MetricGroup 中,
从图中可以看出其子类间的继承关系。

此外还提供了一个方便的对外披露 Metric 监测结果的接口。具体来说,该接口属于 org.apache.flink.metrics.reporter 包,并命名为 MetricReporter。该接口的实现类通过 Metrics 类型进行注册和移除。
public abstract class AbstractReporter implements MetricReporter, CharacterFilter {
protected final Logger log = LoggerFactory.getLogger(this.getClass());
protected final Map<Gauge<?>, String> gauges = new HashMap();
protected final Map<Counter, String> counters = new HashMap();
protected final Map<Histogram, String> histograms = new HashMap();
protected final Map<Meter, String> meters = new HashMap();
public AbstractReporter() {
}
public void notifyOfAddedMetric(Metric metric, String metricName, MetricGroup group) {
String name = group.getMetricIdentifier(metricName, this);
synchronized(this) {
if(metric instanceof Counter) {
this.counters.put((Counter)metric, name);
} else if(metric instanceof Gauge) {
this.gauges.put((Gauge)metric, name);
} else if(metric instanceof Histogram) {
this.histograms.put((Histogram)metric, name);
} else if(metric instanceof Meter) {
this.meters.put((Meter)metric, name);
} else {
this.log.warn("Cannot add unknown metric type {}. This indicates that the reporter does not support this metric type.", metric.getClass().getName());
}
}
}
public void notifyOfRemovedMetric(Metric metric, String metricName, MetricGroup group) {
synchronized(this) {
if(metric instanceof Counter) {
this.counters.remove(metric);
} else if(metric instanceof Gauge) {
this.gauges.remove(metric);
} else if(metric instanceof Histogram) {
this.histograms.remove(metric);
} else if(metric instanceof Meter) {
this.meters.remove(metric);
} else {
this.log.warn("Cannot remove unknown metric type {}. This indicates that the reporter does not support this metric type.", metric.getClass().getName());
}
}
}
}
AI助手
获取 Metrics
查看 Metrics 的方式多种多样。首先可以通过 Flink 的后台管理系统查看部分指标;其次可以利用 Flink 提供的 Http 接口获取关于 Flink 任务状态的信息(由于 Flink Http 接口返回的数据均为 Json 格式),便于我们将 Json 数据进行解析处理;最后一种途径是通过 Metric Reporter系统进行数据获取。
Flink HTTP 接口
Flink 涵盖了多种功能模块以辅助管理 Flink 任务的状态信息。所有请求均可通过访问 http://hostname:8081/ 并附加特定 URI 的方式进行查询。该服务端点提供全面的 HTTP 接口供您获取所需信息。
Flink 支持的接口包括:
/config
/overview
/jobs
/joboverview/running
/joboverview/completed
/jobs/<jobid>
/jobs/<jobid>/vertices
/jobs/<jobid>/config
/jobs/<jobid>/exceptions
/jobs/<jobid>/accumulators
/jobs/<jobid>/vertices/<vertexid>
/jobs/<jobid>/vertices/<vertexid>/subtasktimes
/jobs/<jobid>/vertices/<vertexid>/taskmanagers
/jobs/<jobid>/vertices/<vertexid>/accumulators
/jobs/<jobid>/vertices/<vertexid>/subtasks/accumulators
/jobs/<jobid>/vertices/<vertexid>/subtasks/<subtasknum>
/jobs/<jobid>/vertices/<vertexid>/subtasks/<subtasknum>/attempts/<attempt>
/jobs/<jobid>/vertices/<vertexid>/subtasks/<subtasknum>/attempts/<attempt>/accumulators
/jobs/<jobid>/plan
/jars/upload
/jars
/jars/:jarid
/jars/:jarid/plan
/jars/:jarid/run
AI助手
例如,在集群中运行以下命令可以访问所有任务概览:/joboverview;其结果通常呈现以下形式:
{
"running":[],
"finished":[
{
"jid": "7684be6004e4e955c2a558a9bc463f65",
"name": "Flink Java Job at Wed Sep 16 18:08:21 CEST 2015",
"state": "FINISHED",
"start-time": 1442419702857,
"end-time": 1442419975312,
"duration":272455,
"last-modification": 1442419975312,
"tasks": {
"total": 6,
"pending": 0,
"running": 0,
"finished": 6,
"canceling": 0,
"canceled": 0,
"failed": 0
}
},
{
"jid": "49306f94d0920216b636e8dd503a6409",
"name": "Flink Java Job at Wed Sep 16 18:16:39 CEST 2015",
...
}]
}
AI助手
Flink Reporter
Flink集成了一组内置Reporter功能,这些Reporter均可在Flink官方网站上查阅获取详细信息
例如,在Flink中提供了Graphite、InfluxDB和Prometheus等多种内置Reporter功能。我们能够较为简便地将这些Reporter集成到外部系统中使用,并且其具体配置信息可在Flink官方网站的详细页面查阅。
在此案例中, 我们展示了 Flink 与 InfluxDB、Grafana 的集成应用, 其中特别值得一提的是该系统的任务监控功能. 在这一系统中, InfluxDB 负责记录和存储 Flink 产生的实时数据, 而 Grafana 则承担着将这些数据以直观的方式展示给运维团队的任务.
- InfluxDB 的安装
作为开源时序数据库项目的一部分,在性能和可扩展性方面均有出色表现。
该数据库采用Go语言开发,并专为高效支持时间序列查询和存储功能而设计。
它在企业级系统监控中的时间序列数据库应用中表现突出,
并在物联网设备运行状态及实时采集的数据分析中也被广泛应用。
InfluxDB 的安装步骤相对简单,在此不做详细说明;需要关注的是修改 InuxdB 的配置文件 /etc/inuxdB/inuxdB.conf:为了避免潜在问题,请确保合理配置相关参数设置。
[admin]
enabled = true
bind-address = “:8083”
我们就可以通过 8083 端口打开 InfluxDB 的控制台了。
- Grafana 的安装
安装可以直接点击此处查看官方文档,Grafana 提供了一个简单的配置选项,默认账号为 admin 和密码均为 admin,默认可以通过 3000 端口访问服务。
- 修改 flink-conf.yaml
我们需要在 flink 的配置文件中新增以下配置:
metrics.reporter.influxdb.class: org.apache.flink.metrics.influxdb.InfluxdbReporter
metrics.reporter.influxdb.host: xxx.xxx.xxx.xxx
metrics.reporter.influxdb.port: 8086
metrics.reporter.influxdb.db: flink
AI助手
在执行过程中,在完成Java包的复制操作后(即将flink-metrics-influxdb-1.10.0.jar这个包移动至Flink指定的lib目录中),随后启动Flink服务;即可通过Grafana平台实时查看收集到的Metrics数据。
实际上
总结
本次课程重点阐述了 Flink Metrics 指标的分类及其相关特性,并深入剖析了其源码实现机制。同时通过实例展示了如何利用 Flink 监控这些指标的有效途径。在实际应用中建议根据具体情况灵活选择监控方案,并建立基于 Flink 的任务监控系统。
本次课程内容已圆满结束。下期课程中我们将深入探讨Flink在元数据关联方面的具体操作,请期待 next class!
