Flink实时数仓项目—业务数据采集
Flink实时数仓项目—业务数据采集
-
前言
-
一、采集工具选型
-
二、Flink-CDC学习
-
三、MySQL数据准备
-
- 1.binlog的配置
- 2.模拟生成数据
-
四、业务数据采集模块
-
- 1.配置文件
- 2.代码实现
前言
前面完成了日志数据的采集,下面进行业务数据的采集。
一、采集工具选型
实时数仓相关采集工具的选型在这篇文章中有提到:Flink实时数仓各种CDC对比
二、Flink-CDC学习
业务数据采集选取了Flink-CDC,Flink-CDC的学习及使用在这里:Flink-CDC 2.0学习及使用
三、MySQL数据准备
前提条件:业务数据存放在MySQL中,首先要在MySQL中创建名为gmall2022的数据库,然后执行相关sql文件导入建表语句。
1.binlog的配置
修改/etc/my.cnf文件,内容如下:
server-id = 1
log-bin=mysql-bin
binlog_format=row
binlog-do-db=gmall2022
AI写代码bash
这里开启了gmall2022这个数据库的binlog,并且格式为row。
配置完成后,重启MySQL使配置生效 : sudo systemctl restart mysqld
可以在/var/lib/mysql目录下查看binlog文件:

2.模拟生成数据
业务数据是用户真实的下单数据等等,所以也是无法拿到的,所以也使用脚本模拟生成真实的数据。
1)先将对应生成数据的脚本上传到/opt/module/gmall-flink/rt_db 目录下
2)修改application.properties中数据库连接信息,内容如下:
logging.level.root=info
spring.datasource.driver-class-name=com.mysql.jdbc.Driver
spring.datasource.url=jdbc:mysql://hadoop102:3306/gmall-flink-2022?characterEncoding=utf-8& useSSL=false&serverTimezone=GMT%2B8
spring.datasource.username=root spring.datasource.password=000000
logging.pattern.console=%m%n
mybatis-plus.global-config.db-config.field-strategy=not_null #业务日期
mock.date=2021-03-06
#是否重置
mock.clear=1
#是否重置用户
mock.clear.user=0
… …
AI写代码bash

3)运行jar包:java -jar gmall2020-mock-db-2020-11-27.jar

再次到/var/lib/mysql目录下,可以看到index文件发生了改变。
四、业务数据采集模块
在IDEA中新建模块gmall2021-realtime,创建如下的包结构:

| 目录 | 作用 |
|---|---|
| app | 处理各层数据的Flink程序 |
| bean | 数据对象 |
| common | 公共常量 |
| utils | 工具类 |
1.配置文件
在pom.xml中导入以下依赖:
<properties>
<java.version>1.8</java.version>
<maven.compiler.source>${java.version}</maven.compiler.source>
<maven.compiler.target>${java.version}</maven.compiler.target>
<flink.version>1.13.0</flink.version>
<scala.version>2.12</scala.version>
<hadoop.version>3.1.3</hadoop.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_${scala.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka_${scala.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_${scala.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-cep_${scala.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-json</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.75</version>
</dependency>
<!--如果保存检查点到 hdfs 上,需要引入此依赖-->
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>${hadoop.version}</version>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.49</version>
</dependency>
<dependency>
<groupId>com.ververica</groupId>
<artifactId>flink-connector-mysql-cdc</artifactId>
<version>2.0.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner-blink_2.12</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.20</version>
</dependency>
<!--Flink 默认使用的是 slf4j 记录日志,相当于一个日志的接口,我们这里使用log4j 作为具体的日志实现-->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.25</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.25</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-to-slf4j</artifactId>
<version>2.14.0</version>
</dependency>
</dependencies>
AI写代码xml

在resources目录下创建log4j.properties 配置文件:
log4j.rootLogger=error,stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender log4j.appender.stdout.target=System.out
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d %p [%c] - %m%n
AI写代码bash
2.代码实现
实现目标:使用Flink-CDC实时监控MySQL中的数据,然后将变化的数据进行封装,然后发送到Kafka中。
主程序代码如下:
public class Flink_CDCWithCustomerSchema {
public static void main(String[] args) throws Exception {
//1、创建流式执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
//设置检查点和状态后端
env.setStateBackend(new HashMapStateBackend());
env.getCheckpointConfig().setCheckpointStorage(new FileSystemCheckpointStorage("hdfs://hadoop102:8020/gmall-flink-20220410/ck"));
env.enableCheckpointing(5000L);
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
env.getCheckpointConfig().setCheckpointTimeout(10000L);
env.getCheckpointConfig().setMaxConcurrentCheckpoints(2);
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(3000L);
//2、创建Flink-CDC的Source
DebeziumSourceFunction<String> mysqlSource = MySqlSource.<String>builder()
.hostname("hadoop102")
.port(3306)
.username("root")
.password("000000")
.databaseList("gmall-flink-2022")
.startupOptions(StartupOptions.latest())
.deserializer(new MyDeserializationSchema())
.build();
//3、使用CDC Source从MySQL中读取数据
DataStreamSource<String> mysqlDataStream = env.addSource(mysqlSource);
//4、将从MySQL中读取到并序列化后的数据发送到Kafka中
mysqlDataStream.addSink(MyKafkaUtil.getKafkaSink("ods_base_db"));
//5、执行任务
env.execute();
}
}
AI写代码java
运行

自定义序列化器如下:
public class MyDeserializationSchema implements DebeziumDeserializationSchema<String> {
/*
数据包装格式:
{
"database":"",
"tableName":"",
"operation":"",
"before":{"id":"","tm_name":""...},
"after":{"id":"","tm_name":""...},
*/
@Override
public void deserialize(SourceRecord sourceRecord, Collector<String> collector) throws Exception {
//创建JSON对象,用于封装最终的返回值数据信息
JSONObject result = new JSONObject();
//获取数据库名和表名
//topic中包含了数据库名和表名,格式为: mysql_binlog_source.gmall-flink.z_user_info
String[] fields = sourceRecord.topic().split("\ .");
String database=fields[1];
String tableName = fields[2];
result.put("database",database);
result.put("tableName",tableName);
//获取操作类型
Envelope.Operation operation = Envelope.operationFor(sourceRecord);
//把create类型的操作转化为insert类型,方便使用
String s = operation.toString().toLowerCase();
if("create".equals(s)){
s="insert";
}
result.put("operation",s);
//拿到before和after的数据
Struct value = (Struct) sourceRecord.value();
//获取操作前数据
result.put("before",getValue(value.getStruct("before")));
//获取操作后数据
result.put("after",getValue(value.getStruct("after")));
//输出到下游
collector.collect(result.toString());
}
//提取出来了一个方法
//将before和after的数据封装到JSON对象里
public JSONObject getValue(Struct struct){
JSONObject valueJSON = new JSONObject();
//如果里面有数据,就获取对应的元数据信息,即列名,然后依次将数据放入到JSON对象里
if(struct!=null){
Schema schema = struct.schema();
for (Field field : schema.fields()) {
valueJSON.put(field.name(),struct.get(field));
}
}
return valueJSON;
}
@Override
public TypeInformation<String> getProducedType() {
return BasicTypeInfo.STRING_TYPE_INFO;
}
}
AI写代码java
运行

公共类如下:
public class MyKafkaUtil {
//Kafka链接地址
private static String KAFKA_SERVE="hadoop102:9092,hadoop103:9092,hadoop104:9092";
//Kafka相关配置信息
private static Properties properties=new Properties();
static{
properties.setProperty("bootstrap.servers",KAFKA_SERVE);
}
public static FlinkKafkaProducer<String> getKafkaSink(String topic){
return new FlinkKafkaProducer<String>(topic,new SimpleStringSchema(),properties);
}
}
AI写代码java
运行

这样,就完成了MySQL中业务数据的采集。
