Advertisement

Flink实时数仓项目—业务数据采集

阅读量:

Flink实时数仓项目—业务数据采集

  • 前言

  • 一、采集工具选型

  • 二、Flink-CDC学习

  • 三、MySQL数据准备

    • 1.binlog的配置
    • 2.模拟生成数据
  • 四、业务数据采集模块

    • 1.配置文件
    • 2.代码实现

前言

前面完成了日志数据的采集,下面进行业务数据的采集。


一、采集工具选型

实时数仓相关采集工具的选型在这篇文章中有提到:Flink实时数仓各种CDC对比

业务数据采集选取了Flink-CDC,Flink-CDC的学习及使用在这里:Flink-CDC 2.0学习及使用

三、MySQL数据准备

前提条件:业务数据存放在MySQL中,首先要在MySQL中创建名为gmall2022的数据库,然后执行相关sql文件导入建表语句。

1.binlog的配置

修改/etc/my.cnf文件,内容如下:

复制代码
    server-id = 1
    log-bin=mysql-bin
    binlog_format=row
    binlog-do-db=gmall2022
    
    
    AI写代码bash

这里开启了gmall2022这个数据库的binlog,并且格式为row。
配置完成后,重启MySQL使配置生效 sudo systemctl restart mysqld
可以在/var/lib/mysql目录下查看binlog文件:
在这里插入图片描述

2.模拟生成数据

业务数据是用户真实的下单数据等等,所以也是无法拿到的,所以也使用脚本模拟生成真实的数据。
1)先将对应生成数据的脚本上传到/opt/module/gmall-flink/rt_db 目录下
2)修改application.properties中数据库连接信息,内容如下:

复制代码
    logging.level.root=info
    
    spring.datasource.driver-class-name=com.mysql.jdbc.Driver
    spring.datasource.url=jdbc:mysql://hadoop102:3306/gmall-flink-2022?characterEncoding=utf-8& useSSL=false&serverTimezone=GMT%2B8
    spring.datasource.username=root spring.datasource.password=000000
    
    logging.pattern.console=%m%n
    mybatis-plus.global-config.db-config.field-strategy=not_null #业务日期
    mock.date=2021-03-06
    #是否重置
    mock.clear=1
    #是否重置用户
    mock.clear.user=0
    … …
    
    
    AI写代码bash
![](https://ad.itadn.com/c/weblog/blog-img/images/2025-05-31/AXNYz0KBIwpaRn6me84bt391kHcS.png)

3)运行jar包:java -jar gmall2020-mock-db-2020-11-27.jar
在这里插入图片描述
再次到/var/lib/mysql目录下,可以看到index文件发生了改变。

四、业务数据采集模块

在IDEA中新建模块gmall2021-realtime,创建如下的包结构:
在这里插入图片描述

目录 作用
app 处理各层数据的Flink程序
bean 数据对象
common 公共常量
utils 工具类

1.配置文件

在pom.xml中导入以下依赖:

复制代码
    <properties>
        <java.version>1.8</java.version>
        <maven.compiler.source>${java.version}</maven.compiler.source>
        <maven.compiler.target>${java.version}</maven.compiler.target>
        <flink.version>1.13.0</flink.version>
        <scala.version>2.12</scala.version>
        <hadoop.version>3.1.3</hadoop.version>
    </properties>
    
    <dependencies>
    
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    
    
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
    
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_${scala.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
    
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka_${scala.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
    
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_${scala.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
    
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-cep_${scala.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
    
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-json</artifactId>
            <version>${flink.version}</version>
        </dependency>
    
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.75</version>
        </dependency>
    
        <!--如果保存检查点到 hdfs 上,需要引入此依赖-->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>${hadoop.version}</version>
        </dependency>
    
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>5.1.49</version>
        </dependency>
    
        <dependency>
            <groupId>com.ververica</groupId>
            <artifactId>flink-connector-mysql-cdc</artifactId>
            <version>2.0.0</version>
        </dependency>
    
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner-blink_2.12</artifactId>
            <version>${flink.version}</version>
        </dependency>
    
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.20</version>
        </dependency>
    
        <!--Flink 默认使用的是 slf4j 记录日志,相当于一个日志的接口,我们这里使用log4j 作为具体的日志实现-->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>1.7.25</version>
        </dependency>
    
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>1.7.25</version>
        </dependency>
    
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-to-slf4j</artifactId>
            <version>2.14.0</version>
        </dependency>
    </dependencies>
    
    
    AI写代码xml
![](https://ad.itadn.com/c/weblog/blog-img/images/2025-05-31/exmFGdq1WhARnQU8Xsl0cMfNruIK.png)

在resources目录下创建log4j.properties 配置文件:

复制代码
    log4j.rootLogger=error,stdout
    log4j.appender.stdout=org.apache.log4j.ConsoleAppender log4j.appender.stdout.target=System.out
    log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
    log4j.appender.stdout.layout.ConversionPattern=%d %p [%c] - %m%n
    
    
    AI写代码bash

2.代码实现

实现目标:使用Flink-CDC实时监控MySQL中的数据,然后将变化的数据进行封装,然后发送到Kafka中。
主程序代码如下:

复制代码
    public class Flink_CDCWithCustomerSchema {
    public static void main(String[] args) throws Exception {
        //1、创建流式执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
    
    
        //设置检查点和状态后端
        env.setStateBackend(new HashMapStateBackend());
        env.getCheckpointConfig().setCheckpointStorage(new FileSystemCheckpointStorage("hdfs://hadoop102:8020/gmall-flink-20220410/ck"));
    
        env.enableCheckpointing(5000L);
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        env.getCheckpointConfig().setCheckpointTimeout(10000L);
        env.getCheckpointConfig().setMaxConcurrentCheckpoints(2);
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(3000L);
    
    
        //2、创建Flink-CDC的Source
        DebeziumSourceFunction<String> mysqlSource = MySqlSource.<String>builder()
                .hostname("hadoop102")
                .port(3306)
                .username("root")
                .password("000000")
                .databaseList("gmall-flink-2022")
                .startupOptions(StartupOptions.latest())
                .deserializer(new MyDeserializationSchema())
                .build();
    
        //3、使用CDC Source从MySQL中读取数据
        DataStreamSource<String> mysqlDataStream = env.addSource(mysqlSource);
    
        //4、将从MySQL中读取到并序列化后的数据发送到Kafka中
        mysqlDataStream.addSink(MyKafkaUtil.getKafkaSink("ods_base_db"));
    
        //5、执行任务
        env.execute();
    }
    }
    
    
    
    AI写代码java
    
    运行
![](https://ad.itadn.com/c/weblog/blog-img/images/2025-05-31/l7dLABGYvZXNIKwCrot6yE58ms9p.png)

自定义序列化器如下:

复制代码
    public class MyDeserializationSchema implements DebeziumDeserializationSchema<String> {
    
    /*
    数据包装格式:
    {
        "database":"",
        "tableName":"",
        "operation":"",
        "before":{"id":"","tm_name":""...},
        "after":{"id":"","tm_name":""...},
     */
    @Override
    public void deserialize(SourceRecord sourceRecord, Collector<String> collector) throws Exception {
    
        //创建JSON对象,用于封装最终的返回值数据信息
        JSONObject result = new JSONObject();
    
        //获取数据库名和表名
        //topic中包含了数据库名和表名,格式为: mysql_binlog_source.gmall-flink.z_user_info
        String[] fields = sourceRecord.topic().split("\ .");
        String database=fields[1];
        String tableName = fields[2];
        result.put("database",database);
        result.put("tableName",tableName);
    
        //获取操作类型
        Envelope.Operation operation = Envelope.operationFor(sourceRecord);
        //把create类型的操作转化为insert类型,方便使用
        String s = operation.toString().toLowerCase();
        if("create".equals(s)){
            s="insert";
        }
        result.put("operation",s);
    
        //拿到before和after的数据
        Struct value = (Struct) sourceRecord.value();
    
        //获取操作前数据
        result.put("before",getValue(value.getStruct("before")));
    
        //获取操作后数据
        result.put("after",getValue(value.getStruct("after")));
    
        //输出到下游
        collector.collect(result.toString());
    }
    
    //提取出来了一个方法
    //将before和after的数据封装到JSON对象里
    public JSONObject getValue(Struct struct){
        JSONObject valueJSON = new JSONObject();
        //如果里面有数据,就获取对应的元数据信息,即列名,然后依次将数据放入到JSON对象里
        if(struct!=null){
            Schema schema = struct.schema();
            for (Field field : schema.fields()) {
                valueJSON.put(field.name(),struct.get(field));
            }
        }
        return valueJSON;
    }
    
    @Override
    public TypeInformation<String> getProducedType() {
        return BasicTypeInfo.STRING_TYPE_INFO;
    }
    }
    
    
    
    AI写代码java
    
    运行
![](https://ad.itadn.com/c/weblog/blog-img/images/2025-05-31/kenUQ6a1TO9iRb5wAg04PNEL2tpM.png)

公共类如下:

复制代码
    public class MyKafkaUtil {
    //Kafka链接地址
    private static String KAFKA_SERVE="hadoop102:9092,hadoop103:9092,hadoop104:9092";
    
    //Kafka相关配置信息
    private static Properties properties=new Properties();
    
    static{
        properties.setProperty("bootstrap.servers",KAFKA_SERVE);
    }
    
    public static FlinkKafkaProducer<String> getKafkaSink(String topic){
        return new FlinkKafkaProducer<String>(topic,new SimpleStringSchema(),properties);
    }
    }
    
    
    AI写代码java
    
    运行
![](https://ad.itadn.com/c/weblog/blog-img/images/2025-05-31/oSLJDz7qupxtYITjKiAgW5rFUCM2.png)

这样,就完成了MySQL中业务数据的采集。

全部评论 (0)

还没有任何评论哟~