Installing and Using Flume

Table of Contents

- Apache Flume
  - I. Overview
  - II. Environment Setup
    - Installation
    - Configuration Syntax
    - Simple Example
      - Prepare the configuration file
      - Start the Flume agent instance
      - Test
  - III. Flume Components in Detail
    - Source
      - Netcat
      - Exec
      - Spooling Directory
      - Avro
      - Kafka
    - Channel
      - Memory
      - JDBC (rarely used)
      - Kafka
      - File
      - Spillable Memory
    - Sink
      - Logger
      - HDFS
      - Avro
      - File Roll
      - HBase Sink
      - Kafka Sink
    - Interceptor
      - Timestamp
      - Host
      - Static
      - UUID
      - Regex Extractor
      - Regex Filter
    - Channel Selector
      - Replicating channel selector
      - Multiplexing channel selector
    - Sink Group
      - LoadBalance
  - IV. Assignment: A Data Analysis Platform Based on Nginx Access Logs
    - Nginx
      - Installation
      - Log file format
      - Log rotation
    - Flume data collection
    - Data cleaning
      - Regular expressions
      - Extracting key metrics from Nginx access logs with a regex
      - Custom Mapper for data cleaning
      - Driver class for the data-cleaning job
      - Results after data cleaning
Apache Flume
I. Overview
Big data involves three problems: collection, storage, and computation.
Another widely used big-data stack, the Elastic Stack (ELK), also consists of three main components: Elasticsearch provides efficient storage and search over massive data sets, Kibana visualizes the computed results, and Logstash collects logs from various sources and performs initial cleaning and transformation.
Apache Flume is a distributed, reliable, high-performance platform for collecting and aggregating data: it efficiently gathers massive amounts of log data from many nodes into a single centralized store. In one sentence: Flume does not produce data itself; it acts as middleware between the data sources and the storage system.

II. Environment Setup
Installation
[root@HadoopNode00 home]# mkdir flume
[root@HadoopNode00 home]# tar -zxf /root/apache-flume-1.7.0-bin.tar.gz -C /home/flume/
[root@HadoopNode00 home]# cd flume/
[root@HadoopNode00 flume]# ll apache-flume-1.7.0-bin/
total 148
drwxr-xr-x. 2 root root 4096 Sep 27 07:30 bin
-rw-r--r--. 1 root root 77387 Oct 11 2016 CHANGELOG
drwxr-xr-x. 2 root root 4096 Sep 27 07:30 conf
-rw-r--r--. 1 root root 6172 Sep 26 2016 DEVNOTES
-rw-r--r--. 1 root root 2873 Sep 26 2016 doap_Flume.rdf
drwxr-xr-x. 10 root root 4096 Oct 13 2016 docs
drwxr-xr-x. 2 root root 4096 Sep 27 07:30 lib
-rw-r--r--. 1 root root 27625 Oct 13 2016 LICENSE
-rw-r--r--. 1 root root 249 Sep 26 2016 NOTICE
-rw-r--r--. 1 root root 2520 Sep 26 2016 README.md
-rw-r--r--. 1 root root 1585 Oct 11 2016 RELEASE-NOTES
drwxr-xr-x. 2 root root 4096 Sep 27 07:30 tools
Configuration Syntax
A Flume agent is described in a Java properties file: first name the agent's sources, sinks, and channels (the agent here is called a1), then configure each component, and finally bind the sources and sinks to channels:

# example.conf: A single-node Flume configuration
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444
# Describe the sink
a1.sinks.k1.type = logger
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
Simple Example
Collect data arriving on a network port and print it to the agent's console.
Prepare the configuration file
[root@HadoopNode00 flume]# cd apache-flume-1.7.0-bin/
[root@HadoopNode00 apache-flume-1.7.0-bin]# vi conf/simple.properties
# example.conf: A single-node Flume configuration
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = HadoopNode00
a1.sources.r1.port = 44444
# Describe the sink
a1.sinks.k1.type = logger
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
Start the Flume agent instance
[root@HadoopNode00 apache-flume-1.7.0-bin]# bin/flume-ng agent --conf conf --conf-file conf/simple.properties --name a1 -Dflume.root.logger=INFO,console
Test
Microsoft Windows [Version 6.1.7601]
Copyright (c) 2009 Microsoft Corporation. All rights reserved.
C:\Users\Administrator>telnet HadoopNode00 44444
'telnet' is not recognized as an internal or external command,
operable program or batch file.
If you see this message, first enable the Telnet client under "Turn Windows features on or off" (or test with nc from a Linux host) and connect again; every line typed in the telnet session is then printed by the logger sink on the agent's console.

III. Flume Components in Detail
Source
Main role: read or receive real-time data from an external data source.
Netcat
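Listens on a TCP port and turns every received line of text into an event; this is exactly the source used in the Simple Example above:
# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = HadoopNode00
a1.sources.r1.port = 44444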
Exec
Runs a Unix command and treats its continuous output as the source data.
# Describe/configure the source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -f /home/flume/apache-flume-1.7.0-bin/Hello.log
Test by appending data
[root@HadoopNode00 apache-flume-1.7.0-bin]# echo Hello Hadoop2 >> Hello.log
Spooling Directory
Collects text files from a specified spooling directory.
Note:
- Once a file has been fully ingested, it is automatically renamed with a .COMPLETED suffix.
- Files that already carry the .COMPLETED suffix are not collected again.
a1.sources.r1.type = spooldir
a1.sources.r1.spoolDir = /home/flume/apache-flume-1.7.0-bin/data
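The rename suffix and the set of ignored files can be adjusted with the source's optional properties; a sketch (the values shown are illustrative):
# suffix appended to fully ingested files (default .COMPLETED)
a1.sources.r1.fileSuffix = .COMPLETED
# files whose names match this regex are ignored
a1.sources.r1.ignorePattern = ^.*\.tmp$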
Avro
Similar to Netcat in that it receives data on a specific network port, but it uses the Avro RPC protocol; it is the usual way to chain Flume agents together when building a multi-agent (cluster) collection topology.

# Describe/configure the source
a1.sources.r1.type = avro
a1.sources.r1.bind = HadoopNode00
a1.sources.r1.port = 55555
Test with the dedicated Avro client
[root@HadoopNode00 apache-flume-1.7.0-bin]# bin/flume-ng avro-client --host HadoopNode00 --port 55555 --filename README.md
Kafka
Reads data from a Kafka topic and uses it as the source of events.
tier1.sources.source1.type = org.apache.flume.source.kafka.KafkaSource
tier1.sources.source1.channels = channel1
# address list of the Kafka cluster
tier1.sources.source1.kafka.bootstrap.servers = localhost:9092
# regex for the Kafka topics to subscribe to
tier1.sources.source1.kafka.topics.regex = ^topic[0-9]$
# the default kafka.consumer.group.id=flume is used
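Instead of a topic regex, a fixed, comma-separated list of topics can be subscribed to via the kafka.topics property; a sketch with illustrative topic names:
# subscribe to a fixed list of topics (alternative to kafka.topics.regex)
tier1.sources.source1.kafka.topics = topic1, topic2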
Channel
An event queue that temporarily buffers the collected data between source and sink.
Memory
Events are buffered in memory.
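A minimal sketch of a memory channel, matching the one in the Simple Example (the capacity values are illustrative):
a1.channels.c1.type = memory
# maximum number of events the channel can hold
a1.channels.c1.capacity = 1000
# maximum number of events per transaction
a1.channels.c1.transactionCapacity = 100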
JDBC (rarely used)
Events are stored in a persistent embedded database (Derby); other database products are currently not supported.
a1.channels = c1
a1.channels.c1.type = jdbc
Kafka
Events are stored in a Kafka cluster, whose replication and high-availability design makes the channel both reliable and durable.
a1.channels.channel1.type = org.apache.flume.channel.kafka.KafkaChannel
a1.channels.channel1.kafka.bootstrap.servers = kafka-1:9092,kafka-2:9092,kafka-3:9092
a1.channels.channel1.kafka.topic = channel1
a1.channels.channel1.kafka.consumer.group.id = flume-consumer
File
Events are persisted to the local file system.
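A minimal sketch of a file channel; checkpointDir and dataDirs are optional (they default to directories under ~/.flume/file-channel), and the paths below are illustrative:
a1.channels.c1.type = file
# where the channel stores its checkpoint
a1.channels.c1.checkpointDir = /home/flume/filechannel/checkpoint
# comma-separated list of directories for the event data files
a1.channels.c1.dataDirs = /home/flume/filechannel/data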
Spillable Memory
A memory channel that spills to disk: events exceeding the in-memory capacity are written to disk.
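A hedged sketch of a spillable memory channel; the capacities and paths are illustrative, and the on-disk overflow reuses the file channel's checkpointDir/dataDirs properties:
a1.channels.c1.type = SPILLABLEMEMORY
# number of events kept in memory before spilling to disk
a1.channels.c1.memoryCapacity = 10000
# number of events that may overflow to disk
a1.channels.c1.overflowCapacity = 1000000
a1.channels.c1.checkpointDir = /home/flume/spillchannel/checkpoint
a1.channels.c1.dataDirs = /home/flume/spillchannel/data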
Sink
Main role: write the collected data to its final, centralized store.
Logger
Prints the collected data to the agent's console.
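Note that the logger sink truncates the event body in its log output, 16 bytes by default; a minimal sketch with the optional maxBytesToLog property (the value shown is illustrative):
a1.sinks.k1.type = logger
# maximum number of body bytes to print per event (default 16)
a1.sinks.k1.maxBytesToLog = 256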
HDFS
Stores the collected data on HDFS; supports plain text and sequence file formats.
Note: make sure the HDFS service is running properly.
# Describe the sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = /flume/events/%y-%m-%d/%H%M/%S
a1.sinks.k1.hdfs.filePrefix = events-
a1.sinks.k1.hdfs.round = true
a1.sinks.k1.hdfs.roundValue = 10
a1.sinks.k1.hdfs.roundUnit = minute
org.apache.flume.EventDeliveryException: java.lang.NullPointerException: Expected timestamp in the Flume event headers, but it was null
Solution:
The HDFS sink needs a timestamp in each event header (the path above contains time escape sequences), so configure an additional timestamp interceptor on the source, which automatically adds a timestamp to every event header:
#--------------------------------------------
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = timestamp
- The HDFS sink writes Sequence Files by default; to store the raw data as plain text, set a1.sinks.k1.hdfs.fileType = DataStream.
Avro
Sends the collected data to a specified Avro Source, typically on another Flume agent.
a1.sinks.k1.type = avro
# IP address and port of the other Flume agent instance
a1.sinks.k1.hostname = 10.10.10.10
a1.sinks.k1.port = 4545
File Roll
Stores the collected data in the local file system.
a1.sinks.k1.type = file_roll
a1.sinks.k1.sink.directory = /root/data
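By default the file roll sink starts a new output file every 30 seconds; the optional sink.rollInterval property tunes this (0 disables rolling). The value below is illustrative:
# roll to a new file every 60 seconds (0 = never roll)
a1.sinks.k1.sink.rollInterval = 60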
HBase Sink
Stores the collected data in HBase, a distributed, non-relational database.
# Describe the sink
a1.sinks.k1.type = hbase
a1.sinks.k1.table = baizhi:t_data
a1.sinks.k1.columnFamily = cf1
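If no serializer is configured, the HBase sink uses SimpleHbaseEventSerializer; the line below only makes that default explicit (and assumes the table baizhi:t_data with column family cf1 already exists in HBase):
# event-to-Put serializer (this is the default)
a1.sinks.k1.serializer = org.apache.flume.sink.hbase.SimpleHbaseEventSerializer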
Kafka Sink
Publishes the collected data to a Kafka cluster.
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.topic = mytopic
a1.sinks.k1.kafka.bootstrap.servers = localhost:9092
a1.sinks.k1.kafka.flumeBatchSize = 20
a1.sinks.k1.kafka.producer.acks = 1
a1.sinks.k1.kafka.producer.linger.ms = 1
a1.sinks.k1.kafka.producer.compression.type = snappy
Interceptor

Note: an interceptor is an optional Flume component that sits between the source and the channel; it receives events from the source and can modify, enrich, or drop them before they enter the channel.
Timestamp
Adds a timestamp to each event's headers.
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = timestamp
Host
Adds the agent's host name/IP to each event's headers.
a1.sources.r1.interceptors = i1 i2
a1.sources.r1.interceptors.i1.type = host
a1.sources.r1.interceptors.i2.type = timestamp
{ headers:{host=192.168.11.20, timestamp=1569627250085}
Static
Adds a fixed key/value pair to every event's headers.
a1.sources.r1.interceptors = i1 i2 i3
a1.sources.r1.interceptors.i1.type = host
a1.sources.r1.interceptors.i2.type = timestamp
a1.sources.r1.interceptors.i3.type = static
a1.sources.r1.interceptors.i3.key = datecenter
a1.sources.r1.interceptors.i3.value = bj
{ headers:{host=192.168.11.20, datecenter=bj, timestamp=1569627595541}
UUID
Adds a unique UUID to each event's headers.
a1.sources.r1.interceptors = i1 i2 i3 i4
a1.sources.r1.interceptors.i1.type = host
a1.sources.r1.interceptors.i2.type = timestamp
a1.sources.r1.interceptors.i3.type = static
a1.sources.r1.interceptors.i3.key = datecenter
a1.sources.r1.interceptors.i3.value = bj
a1.sources.r1.interceptors.i4.type = org.apache.flume.sink.solr.morphline.UUIDInterceptor$Builder
Event: { headers:{host=192.168.11.20, id=44a46be3-6f32-453d-afb5-5ba592f237cd, datecenter=bj, timestamp=1569627806093}
Regex Extractor
Applies a regular expression to the event body and stores the captured groups as event header fields.
ERROR 2018-10-14 14:37:24 com.baizhi.HelloWorld.sayHello() ...
The regex needed here is ^(\w*).*$, which captures the leading word (the log level); the trailing "..." in the sample line simply stands for the rest of the message.
a1.sources.r1.interceptors.i4.type = regex_extractor
a1.sources.r1.interceptors.i4.regex = ^(\w*).*$
a1.sources.r1.interceptors.i4.serializers = s1
a1.sources.r1.interceptors.i4.serializers.s1.name = level
After the interceptors run, the event looks roughly like this:
{
headers: {
level=<the extracted log level, e.g. ERROR>,
host=<the agent's IP address>,
datecenter=bj,
timestamp=<the event timestamp>
},
body: <the raw bytes of the log line>
}
Regex Filter
Filters events against a given regular expression and drops those that do not match.
Example: keep only ERROR-level log lines, e.g. to feed a risk-alerting system.
a1.sources.r1.interceptors = i1 i2 i3
a1.sources.r1.interceptors.i1.type = host
a1.sources.r1.interceptors.i2.type = timestamp
a1.sources.r1.interceptors.i3.type = regex_filter
a1.sources.r1.interceptors.i3.regex = ^ERROR.*$
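By default, events that match the regex are kept and all others are dropped; the optional excludeEvents property inverts this so that matching events are discarded instead:
# false (default): keep matching events; true: drop matching events
a1.sources.r1.interceptors.i3.excludeEvents = false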
Channel Selector

Replicating channel selector
[root@HadoopNode00 apache-flume-1.7.0-bin]# vi conf/replicating.properties
# example.conf: A single-node Flume configuration
# Name the components on this agent
a1.sources = r1
a1.sinks = k1 k2
a1.channels = c1 c2
# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = HadoopNode00
a1.sources.r1.port = 44444
a1.sources.r1.selector.type = replicating
# Describe the sink
a1.sinks.k1.type = logger
a1.sinks.k2.type = file_roll
a1.sinks.k2.sink.directory = /root/data2
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.channels.c2.type = file
# Bind the source and sink to the channel
a1.sources.r1.channels = c1 c2
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c2
Multiplexing channel selector
The source, sinks, and channels are declared exactly as in the replicating example above (netcat source, logger and file_roll sinks, memory and file channels); the source additionally declares the interceptor chain i1 i2 i3 i4 from the Regex Extractor section, and the multiplexing selector then routes events by the extracted level header:
a1.sources.r1.interceptors.i1.type = host
a1.sources.r1.interceptors.i2.type = timestamp
a1.sources.r1.interceptors.i3.type = static
a1.sources.r1.interceptors.i3.key = datecenter
a1.sources.r1.interceptors.i3.value = bj
a1.sources.r1.interceptors.i4.type = regex_extractor
a1.sources.r1.interceptors.i4.regex = ^(\w*).*$
a1.sources.r1.interceptors.i4.serializers = s1
a1.sources.r1.interceptors.i4.serializers.s1.name = level
a1.sources.r1.selector.type = multiplexing
a1.sources.r1.selector.header = level
a1.sources.r1.selector.mapping.ERROR = c1
a1.sources.r1.selector.mapping.INFO = c2
a1.sources.r1.selector.mapping.DEBUG = c2
a1.sources.r1.selector.default = c2
# Describe the sink
a1.sinks.k1.type = logger
a1.sinks.k2.type = file_roll
a1.sinks.k2.sink.directory = /root/data3
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.channels.c2.type = file
# Bind the source and sink to the channel
a1.sources.r1.channels = c1 c2
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c2
Sink Group

LoadBalance
Distributes the events of one channel across several sinks in a roughly load-balanced fashion.
# example.conf: A single-node Flume configuration
# Name the components on this agent
a1.sources = r1
a1.sinks = k1 k2
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = HadoopNode00
a1.sources.r1.port = 44444
# Describe the sink
a1.sinks.k1.type = file_roll
a1.sinks.k1.sink.directory = /root/data4
a1.sinks.k2.type = file_roll
a1.sinks.k2.sink.directory = /root/data5
#--------------------------------------
a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2
a1.sinkgroups.g1.processor.type = load_balance
a1.sinkgroups.g1.processor.backoff = true
a1.sinkgroups.g1.processor.selector = random
#--------------------------------------
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c1
IV. Assignment: A Data Analysis Platform Based on Nginx Access Logs
Nginx
Installation
# Step 1: upload the source package and install the build dependencies
[root@HadoopNode00 nginx-1.11.1]# yum install gcc-c++ perl-devel pcre-devel openssl-devel zlib-devel wget
[root@HadoopNode00 ~]# tar -zxf nginx-1.11.1.tar.gz
[root@HadoopNode00 ~]# cd nginx-1.11.1
[root@HadoopNode00 nginx-1.11.1]# ./configure --prefix=/usr/local/nginx
[root@HadoopNode00 nginx-1.11.1]# make && make install
[root@HadoopNode00 nginx-1.11.1]# cd /usr/local/nginx/
[root@HadoopNode00 nginx]# ll
total 16
drwxr-xr-x. 2 root root 4096 Sep 28 13:00 conf
drwxr-xr-x. 2 root root 4096 Sep 28 13:00 html
drwxr-xr-x. 2 root root 4096 Sep 28 13:00 logs
drwxr-xr-x. 2 root root 4096 Sep 28 13:00 sbin
# Step 2: start Nginx
[root@HadoopNode00 nginx]# sbin/nginx -c conf/nginx.conf
# Step 3: verify the service is running
# Option 1: check the nginx processes
[root@HadoopNode00 nginx]# ps -ef | grep nginx
root 38219 1 0 13:02 ? 00:00:00 nginx: master process sbin/nginx -c conf/nginx.conf
nobody 38220 38219 0 13:02 ? 00:00:00 nginx: worker process
root 38429 20552 0 13:02 pts/0 00:00:00 grep nginx
# Option 2: open the page below in a browser
# http://HadoopNode00
Log file format
# client IP  # request time  # request method  # requested URI  # HTTP protocol version  # response status code  # response size (bytes)
27.19.74.143 - - [30/May/2013:17:38:20 +0800] "GET /static/image/common/faq.gif HTTP/1.1" 200 1127
192.168.11.1 - - [28/Sep/2019:13:06:29 +0800] "GET / HTTP/1.1" 200 612 "-" "Mozilla/5.0 (Windows NT 6.1; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/76.0.3809.132 Safari/537.36"
Log rotation
Rotation script
#!/bin/bash
# directory where the nginx log files live
logs_path="/usr/local/nginx/logs/"
# nginx pid file
pid_path="/usr/local/nginx/logs/nginx.pid"
# rename (rotate) the current access log into the data directory
mv ${logs_path}access.log /usr/local/nginx/data/access_$(date -d "yesterday" +"%Y-%m-%d%I:%M:%S").log
# signal the nginx master process to reopen its log files
kill -USR1 `cat ${pid_path}`
[root@HadoopNode00 ~]# vi nginx.sh
#!/bin/bash
logs_path="/usr/local/nginx/logs/"
pid_path="/usr/local/nginx/logs/nginx.pid"
mv ${logs_path}access.log /usr/local/nginx/data/access_$(date -d "yesterday" +"%Y-%m-%d%I:%M:%S").log
kill -USR1 `cat ${pid_path}`
[root@HadoopNode00 ~]# chmod u+x nginx.sh
Cron job
[root@HadoopNode00 nginx]# crontab -e
# rotate the log once every minute
*/1 * * * * /root/nginx.sh
[root@HadoopNode00 ~]# yum install -y vixie-cron
[root@HadoopNode00 ~]# crontab -e
Define the schedule and the shell script to be executed:
*/1 * * * * /root/nginx.sh
[root@HadoopNode00 ~]# service crond restart
[root@HadoopNode00 ~]# service crond reload
Flume data collection
Configuration file
# example.conf: A single-node Flume configuration
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = spooldir
a1.sources.r1.spoolDir = /usr/local/nginx/data
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = timestamp
# Describe the sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = /flume/events/%y-%m-%d/%H%M/%S
a1.sinks.k1.hdfs.filePrefix = events-
a1.sinks.k1.hdfs.fileType = DataStream
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
Data cleaning
Regular expressions
A pattern language used mainly to match, extract, and replace strings.
Syntax
Rules
| Rule | Meaning |
|---|---|
| \d | matches a digit |
| \D | matches a non-digit |
| \w | matches a word character (letter, digit, or underscore) |
| \W | matches a non-word character |
| \s | matches a whitespace character (space or tab) |
| . | matches any character |
| ^ | matches the start of the string |
| $ | matches the end of the string |
Quantifiers
| Syntax | Meaning |
|---|---|
| * | match the preceding rule 0 to N times |
| + | match the preceding rule 1 to N times |
| {m,n} | match the preceding rule at least m and at most n times |
Capture groups
(wrap the content to extract in parentheses)
Extracting the key metrics from an Nginx access log with a regex

Test string
192.168.11.1 - - [28/Sep/2019:13:06:29 +0800] "GET /favicon.ico HTTP/1.1" 404 571 "http://hadoopnode00/" "Mozilla/5.0 (Windows NT 6.1; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/76.0.3809.132 Safari/537.36"
Regex to use
^(\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}).*\[(.*)\]\s"(\w*)\s(.*)\sHTTP\/1\.1"\s(\d{3}).*$
Extract the five key metrics from an access log line with a Java program:
IP, request time, request method, requested resource, and response status code
import java.util.regex.Matcher;
import java.util.regex.Pattern;
final String regex = "^(\\d{1,3}\\.\\d{1,3}\\.\\d{1,3}\\.\\d{1,3}).*\\[(.*)\\]\\s\"(\\w*)\\s(.*)\\sHTTP/1\\.1\"\\s(\\d{3}).*$"; // the regex from above, with backslashes doubled for the Java string literal
final String string = "192.168.11.1 - - [28/Sep/2019:13:06:29 +0800] \"GET /favicon.ico HTTP/1.1\" 404 571 \"http://hadoopnode00/\" \"Mozilla/5.0 (Windows NT 6.1; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/76.0.3809.132 Safari/537.36\"\n";
final Pattern pattern = Pattern.compile(regex, Pattern.MULTILINE);
final Matcher matcher = pattern.matcher(string);
while (matcher.find()) {
System.out.println("Full match: " + matcher.group(0));
for (int i = 1; i <= matcher.groupCount(); i++) {
System.out.println("Group " + i + ": " + matcher.group(i));
}
}
Custom Mapper for data cleaning
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Locale;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.io.IOException;
public class MyMapper extends Mapper<LongWritable, Text, NullWritable, Text> {
/**
 * @param key
 * @param value one line of log data
* @param context
* @throws IOException
* @throws InterruptedException
*/
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
final String regex = "^(\\d{1,3}\\.\\d{1,3}\\.\\d{1,3}\\.\\d{1,3}).*\\[(.*)\\]\\s\"(\\w*)\\s(.*)\\sHTTP/1\\.1\"\\s(\\d{3}).*$";
final Pattern pattern = Pattern.compile(regex, Pattern.MULTILINE);
final Matcher matcher = pattern.matcher(value.toString());
while (matcher.find()) {
String ip = matcher.group(1);
String accessTime = matcher.group(2);
String method = matcher.group(3);
String resource = matcher.group(4);
String status = matcher.group(5);
SimpleDateFormat sdf = new SimpleDateFormat("dd/MMM/yyyy:HH:mm:ss", Locale.ENGLISH);
Date date = null;
try {
date = sdf.parse(accessTime);
SimpleDateFormat sdf2 = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss");
String formatAccessTime = sdf2.format(date);
context.write(NullWritable.get(), new Text(ip + " " + formatAccessTime + " " + method + " " + resource + " " + status));
} catch (ParseException e) {
e.printStackTrace();
}
}
}
}
Driver class for the data-cleaning job
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import java.io.IOException;
public class MyJob {
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
Configuration conf = new Configuration();
Job job = Job.getInstance(conf,"nginx");
job.setJarByClass(MyJob.class);
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);
TextInputFormat.setInputPaths(job, new Path("file:///D:\\Downloads\\events-.1569650629563"));
TextOutputFormat.setOutputPath(job, new Path("file:///e:\\result"));
job.setMapperClass(MyMapper.class);
// Note: data cleaning has only a map task, no reduce task
job.setNumReduceTasks(0);
job.setMapOutputKeyClass(NullWritable.class);
job.setMapOutputValueClass(Text.class);
job.waitForCompletion(true);
}
}
Results after data cleaning


