Advertisement

Introduction to Flink Streaming Platform for Big Data

阅读量:

作者:禅与计算机程序设计艺术

1.简介

Flink 是一个开源分布式流处理框架,它能够实现快速而轻松的数据流处理,并提供了全面的数据处理方案.该框架不仅支持低延迟实时计算能力和高吞吐量的数据传输能力;此外还具备复杂事件处理(CEP)的能力.Flink 在Apache 项目的排名中位列第二,广泛应用于构建各种实时分析系统、实时报告系统以及机器学习平台.近年来,Flink 社区发展迅速,已成为了最热门的大数据分析平台之一;同时,作为一个具有独特优势的开源分布式流处理框架,Flink 在架构设计上也展现了显著的技术亮点.

本教程的主要目标是帮助读者了解Flink是什么,并深入分析其在实时数据处理中的应用和价值。

2.基本概念术语说明

Flink的研究成果进行了详尽阐述,并对其中的核心术语和基本概念进行了深入解析。我们计划系统地向大家介绍这些核心内容。

Stream processing, also known as data stream processing, refers to a computational model based on data streams. Data traverses from the source to the destination, undergoing sequential transformations and successive filtering and conversion processes before producing outputs. Typically, stream processing employs unbounded datasets that continuously incorporate new data. As a result, stream processing systems are designed to manage large volumes of data efficiently.

Dataflow programming model, 知称为流处理编程模型(英语:dataflow programming model),是一种旨在描述和管理数据流处理任务的编程模式。该模式基于离散的数据流动机制设计,并定义了数据在流程中的传输过程。其核心理念是通过定义数据流动来实现任务执行逻辑的变化与优化,在实际应用中通常被应用于构建和运行分布式计算系统. 例如,在Apache Hadoop和Apache Spark等系统中广泛应用。

3.Task scheduling: 任务调度(英语:task scheduling)是负责将作业分配给可用执行资源的一种机制。它使得多个作业在同一时间段内能够并行执行,并从而提高资源利用率

Stateful stream processing: 基于状态的数据流处理(全称为stateful stream processing)是指对具有固定时长且具备确定输入输出关系的数据流进行处理的过程。这种数据流处理方法依赖于对历史数据记录的信息来确保正确执行相关操作。

5.Event time: 即为数据记录的时间戳。在流处理体系中,可定义为数据记录的入队时刻。能够精准地体现事件发生的具体时间节点。从体系结构来看,事件时间构成了流处理的基础抽象概念。

Watermarking, also referred to as watermarks in computing, serves as a crucial mechanism in stream processing. This ensures timely data processing. By establishing a designated deadline or cutoff time, watermarks dictate the latest point at which data must be processed by receivers. Data is required to reach receivers by this specified time.

Timely sending(英语:timely emission)是指在流处理中的一种关键机制。它表明每个数据元素应立即传递给下一层处理单元。及时性传输意味着每个数据元素应立即传递给下一层处理单元。通过这种方式,可以显著缩短数据处理的时间,并降低系统延迟。

数据处理保障(英语:processing guarantees)表明了在系统发生故障或崩溃时所应采取的具体措施。对于需要极低延迟的实时处理场景而言,系统必须具备至少一次处理保证。

Scalability: 可扩展性能(英语:scalability)指系统在数据量增加时处理效率随之提升的能力。Flink支持集群根据需求自动调整规模,应对不同规模的数据量.

10.Fault tolerance: 容错能力(英语:fault tolerance)是指系统能够容忍一定程度的故障。Flink提供了高可用性的部署策略,能够最大限度地防止数据丢失和数据损坏。

3.核心算法原理和具体操作步骤以及数学公式讲解

在阐述Flink的核心算法原理之前,在复习数据分析的基本概念时:

  1. 数据被视为一个持续不断的数据流序列,
  2. 包括实时传感器输出、日志记录以及网络流量等实例;
  3. 这些信息会通过一系列转换流程被提取出来;
  4. 例如,
  5. 我们可以通过机器学习模型对这些信息进行分析,
  6. 并据此预测系统的未来行为;
  7. 最后,
  8. 我们希望实时监控并跟踪这些信息处理的结果

Batch Processing

批处理(Batch processing)是一种一次性完成大规模数据运算的技术。这种技术能够高效地分析海量数据并生成详尽的报告。然而,在实际应用中存在以下两个主要缺陷:

  1. 无法即时反馈用户查询:批处理仅当完成全部处理任务之后才会产生结果。
  2. 效率低下:批处理需耗时较长以完成大规模数据集的处理任务。

Stream Processing

流处理(Stream processing)是一种针对持续数据流进行实时处理的技术体系。该系统能够即时响应用户的查询请求,并且其响应速度远超传统方法,在实时数据分析方面具有显著优势。作为数据分析的重要组成部分之一,在多个领域均展现出广泛的应用潜力:如日志追踪系统可监控并记录服务器运行状态;网络监控平台能实时跟踪并分析网络流量特征;异常检测算法能够及时发现潜在风险;风险评估模型能为决策者提供可靠的数据支持;推荐系统可根据用户行为动态调整个性化服务;公共意见分析工具可挖掘社交媒体中的情感倾向信息;金融市场交易系统则能在第一时间捕捉市场波动趋势

流处理的核心概念在于将海量数据划分为更小、更易管理的数据块。每个数据块仅包含指定时间段内的相关信息,并支持独立运算;整个流程由一系列运算节点构成,在线执行相应操作;各节点间通过消息传递连接,在线完成信息交互。

流处理主要由以下几个部分构成:

  1. 数据源:作为数据分析的基础,在实际应用中起到关键作用。它可以分为两种主要类型:一种是可以立即获取的数据流(如实时传感器输出),另一种则是经过处理后才能使用的离线资料(如历史记录)。其中,实时性较高的来源包括但不限于工业设备运行产生的信号信息、通过网络传输的包报文以及系统运行日志等信息存储结构。

  2. 算子链条: 算子链条是一种流处理的工作流程。它由多个算子构成, 每个负责特定操作。不同环节之间的连接关系决定了数据传输的具体路径。

  3. 数据存储:这是一个中间层结构。它专门负责暂时存放当前处理阶段产生的所有数据。相比主存而言,在规模上稍显较小,在运行速度上却远超主存水平。

  4. 窗口:Window: 窗口代表一个逻辑概念,它将数据流划分为可管理且有限的时间片段.窗宽决定了操作元的数据收集频率,即每隔多长时间会将数据一次性收集在一起.窗宽的目标是减少操作元的工作负担,提高处理效率.

  5. 状态:状态指的是通过持久化存储保存的数据结构形式。该状态下包含已处理的数据信息,并且可以被反复调用或调用多次。其主要功能是为了实现容错机制并支持系统重启。

  6. 数据追踪系统:该系统作为流处理的核心模块,在实时数据传输中发挥着关键作用。它通过精确监测每个分块的数据更新状态,在完成当前传输任务后及时通知相关接收节点后续可用的数据时间窗口。

7.容错机制:该机制旨在应对部分故障情况下的持续运行能力。当错误出现时(而不是发生),系统将自动切换至备用组件(而不是备份组件),从而降低中断的影响(而不是影响)。

Exactly Once and At Least Once Delivery

Flink提供两种类型的流处理保证:

Each message must be processed at least once to ensure that every message is processed a minimum of once, but may also be processed multiple times. This ensures that messages are handled adequately without data loss or duplication issues.

精确一次处理保证(英语:exactly once delivery)指的是每条消息均会被接收且只接收一次。换言之,在一条消息已被正确接收的情况下,则无需对其再次进行接收操作。

这两类措施都与增加系统复杂性相关联;由于它们涉及消息的重复处理而增加了复杂性。这些措施保障了数据既完整又一致。

4.具体代码实例和解释说明

当下,请我们通过实际案例分析探究Flink代码实现数据处理的技术。设想中存在一个实时流处理系统,在该系统下需完成对用户的实时点击日志进行采集与管理,并完成统计用户的点击次数,并完成相应的实时数据分析与汇总过程。

首先,创建一个Flink程序。在pom.xml中添加依赖:

复制代码
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-streaming-java_2.11</artifactId>
      <version>${flink.version}</version>
    </dependency>
    
    <!-- Add this dependency if you are using the Elasticsearch connector -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-elasticsearch7_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    
      
      
      
      
      
      
      
      
      
      
      
    
    代码解读

然后,编写程序入口类,创建StreamExecutionEnvironment对象:

复制代码
    import org.apache.flink.api.common.functions.*;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    
    public class ClickCount {
    public static void main(String[] args) throws Exception {
    
        // set up the streaming execution environment
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
    
        // read user clicks from a Kafka topic or other source 
        DataStream<ClickEvent> clickEvents =
                env.addSource(new FlinkKafkaConsumer011<>(
                        "clicks", new ClickEventSchema(),...));
    
        // count clicks per user ID
        DataStream<Tuple2<Long, Integer>> clickCounts = 
                clickEvents.keyBy("userId")
                          .map(new MapFunction<ClickEvent, Tuple2<Long, Integer>>() {
                               @Override
                               public Tuple2<Long, Integer> map(ClickEvent value) throws Exception {
                                   return new Tuple2<>(value.getUserId(), 1);
                               }
                           })
                          .window(TumblingWindowAssigner.of(Duration.minutes(1)))
                          .reduce(new ReduceFunction<Tuple2<Long, Integer>>() {
                               @Override
                               public Tuple2<Long, Integer> reduce(Tuple2<Long, Integer> v1,
                                                                   Tuple2<Long, Integer> v2) throws Exception {
                                   return new Tuple2<>(v1.f0, v1.f1 + v2.f1);
                               }
                           });
    
        // write counts back to an Elasticsearch index
        String host = "...";
        int port = 9200;
        String index = "counts";
        String documentType = "_doc";
        Properties properties = new Properties();
        properties.setProperty("behavior.type", "mapping");
        ElasticsearchSinkBuilder<Tuple2<Long, Integer>> esSink =
            ElasticSearchSink.<Tuple2<Long, Integer>>newBuilder()
                             .setHostAddresses(host + ":" + port)
                             .setDefaultIndex(index)
                             .setDefaultTypeName(documentType)
                             .setBulkFlushMaxActions(1)
                             .setBulkFlushInterval(Duration.seconds(5))
                             .setDocKeyField("userId")
                             .setDocValueFields("count")
                             .setElasticsearchProperties(properties);
    
        clickCounts.addSink(esSink).name("Write to ES").disableChaining();
    
        // execute the program
        env.execute("User Click Count Example");
    }
    }
    
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
    
    代码解读

接下来,我们将逐行详细介绍代码:

1.导入必要的包和类:

复制代码
       import java.util.Properties;
    
       import org.apache.flink.api.java.tuple.Tuple2;
       import org.apache.flink.streaming.api.datastream.DataStream;
       import org.apache.flink.streaming.api.datastream.keyby.KeyedStream;
       import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
       import org.apache.flink.streaming.api.functions.sink.OutputFormatSinkFunction;
       import org.apache.flink.streaming.api.functions.source.SourceFunction;
       import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase;
       import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
       import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;
    
      
      
      
      
      
      
      
      
      
      
    
    代码解读

2.初始化Streaming Environment:

复制代码
       StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
       env.setParallelism(1);
    
      
    
    代码解读

设置并行度。这里设置成1是为了方便调试。

3.定义Kafka消费者或其他数据源读取用户点击事件:

复制代码
       DataStream<ClickEvent> clickEvents =
       env.addSource(new FlinkKafkaConsumer011<>(
               "clicks", new ClickEventSchema(), props));
    
      
      
    
    代码解读

使用FlinkKafkaConsumer011来读取数据,props参数用来指定连接配置。

4.按照用户ID进行分组,并计算点击次数:

复制代码
       KeyedStream<ClickEvent, Long> keyedClickEvents = 
       clickEvents.keyBy("userId", "url");
    
       SingleOutputStreamOperator<Tuple2<Long, Integer>> clickCounts = 
       keyedClickEvents
          .map(new MapFunction<ClickEvent, Tuple2<Long, Integer>>() {
               @Override
               public Tuple2<Long, Integer> map(ClickEvent value) throws Exception {
                   return new Tuple2<>(value.getUserId(), 1);
               }
           })
          .window(TumblingWindowAssigner.of(Duration.minutes(1)))
          .reduce(new ReduceFunction<Tuple2<Long, Integer>>() {
               @Override
               public Tuple2<Long, Integer> reduce(Tuple2<Long, Integer> v1,
                                                   Tuple2<Long, Integer> v2) throws Exception {
                   return new Tuple2<>(v1.f0, v1.f1 + v2.f1);
               }
           });
    
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
    
    代码解读

基于用户的ID字段以及URL字段进行数据分组。接着利用map函数计算每分钟的点击频率。通过window函数构建每分钟的时间窗,并结合reduce函数对各时间段内的点击数据进行汇总统计。

5.写入Elasticsearch索引:

复制代码
       OutputFormatSinkFunction<Tuple2<Long, Integer>> outputFormatSinkFunction =
          new OutputFormatSinkFunction<>(
                  new ElasticsearchClickCountOutputFormat(
                          host, port, index, documentType), clickCounts.getType());
    
       RequestIndexer indexer =
           (element, requestTracker) -> {
               List<ActionRequest> requests =
                       Collections.singletonList(
                               new IndexRequest(index, documentType, element._1.toString())
                                      .source(XContentFactory.jsonBuilder()
                                                       .startObject()
                                                       .field("userId", element._1)
                                                       .field("count", element._2)
                                                       .endObject()));
               requestTracker.add(requests);
           };
    
       elasticsearchSink.setRestClientFactory(() -> RestClient.builder(HttpHost.create(host)).build());
    
       clickCounts.addSink(outputFormatSinkFunction)
              .setParallelism(1)
              .name("Write to ES")
              .setuid("uid");
    
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
    
    代码解读

实现了将数据输出至Elasticsearch的功能。
因为ElasticsearchSinkBase并未提供RequestIndexer接口。
因此我们自行开发了相应的RequestIndexer。
在开发的该接口中,
我们构造了一条插入请求,
并将数据发送至Elasticsearch系统进行处理。

6.执行程序:

复制代码
       env.execute("User Click Count Example");
    
    
    代码解读

全部评论 (0)

还没有任何评论哟~