
Installing and Using Flume


Contents

  • Apache Flume
    • I. Overview
    • II. Environment Setup
      • Installation
      • Configuration Syntax
      • Simple Example
        • Prepare the configuration file
        • Start the Flume Agent instance
        • Test
    • III. Flume Components in Detail
      • Source
        • Netcat
        • Exec
        • Spooling Directory
        • Avro
        • Kafka
      • Channel
        • Memory
        • JDBC (not important)
        • Kafka
        • File
        • Spillable Memory
      • Sink
        • Logger
        • HDFS
        • Avro
        • File Roll
        • HBase Sink
        • Kafka Sink
      • Interceptor
        • Timestamp
        • Host
        • Static
        • UUID
        • Regex Extractor
        • Regex Filter
      • Channel Selector
        • Replicating channel selector
        • Multiplexing channel selector
      • Sink Group
        • LoadBalance
  • IV. Assignment: A Data Analysis Platform Based on Nginx Access Logs
    • Nginx
      • Installation
      • Log file format
      • Log rotation
    • Flume Data Collection
    • Data Cleaning
      • Regular Expressions
      • Extracting key metrics from Nginx access logs
      • Custom data-cleaning Mapper
      • Custom data-cleaning driver (Job) class
      • Results after data cleaning

Apache Flume

I. Overview

Big data has to solve three problems: collection, storage, and computation.

Another widely used big-data stack, the Elastic Stack (ELK), also consists of three main components: Elasticsearch provides efficient storage and retrieval of massive data sets, Kibana visualizes the computed results, and Logstash collects logs from many sources and performs initial cleaning and transformation.

Apache Flume is a distributed, reliable, high-performance platform for collecting and aggregating data; it efficiently moves massive volumes of log data from many nodes into a single centralized store. In one sentence: Flume does not produce data itself, it acts as middleware (a data pipeline).


II. Environment Setup

Installation

    [root@HadoopNode00 home]# mkdir flume
    [root@HadoopNode00 home]# tar -zxf /root/apache-flume-1.7.0-bin.tar.gz -C /home/flume/
    [root@HadoopNode00 home]# cd flume/
    [root@HadoopNode00 flume]# ll apache-flume-1.7.0-bin/
    total 148
    drwxr-xr-x.  2 root root  4096 Sep 27 07:30 bin
    -rw-r--r--.  1 root root 77387 Oct 11  2016 CHANGELOG
    drwxr-xr-x.  2 root root  4096 Sep 27 07:30 conf
    -rw-r--r--.  1 root root  6172 Sep 26  2016 DEVNOTES
    -rw-r--r--.  1 root root  2873 Sep 26  2016 doap_Flume.rdf
    drwxr-xr-x. 10 root root  4096 Oct 13  2016 docs
    drwxr-xr-x.  2 root root  4096 Sep 27 07:30 lib
    -rw-r--r--.  1 root root 27625 Oct 13  2016 LICENSE
    -rw-r--r--.  1 root root   249 Sep 26  2016 NOTICE
    -rw-r--r--.  1 root root  2520 Sep 26  2016 README.md
    -rw-r--r--.  1 root root  1585 Oct 11  2016 RELEASE-NOTES
    drwxr-xr-x.  2 root root  4096 Sep 27 07:30 tools

Configuration Syntax

    # example.conf: A single-node Flume configuration
    
    # Name the components on this agent
    a1.sources = r1
    a1.sinks = k1
    a1.channels = c1
    
    # Describe/configure the source
    a1.sources.r1.type = netcat
    a1.sources.r1.bind = localhost
    a1.sources.r1.port = 44444
    
    # Describe the sink
    a1.sinks.k1.type = logger
    
    # Use a channel which buffers events in memory
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 1000
    a1.channels.c1.transactionCapacity = 100
    
    # Bind the source and sink to the channel
    a1.sources.r1.channels = c1
    a1.sinks.k1.channel = c1

Simple Example

Collect access data arriving on a network port and print it to the agent's console.

Prepare the configuration file
    [root@HadoopNode00 flume]# cd apache-flume-1.7.0-bin/
    [root@HadoopNode00 apache-flume-1.7.0-bin]# vi conf/simple.properties
    # example.conf: A single-node Flume configuration
    
    # Name the components on this agent
    a1.sources = r1
    a1.sinks = k1
    a1.channels = c1
    
    # Describe/configure the source
    a1.sources.r1.type = netcat
    a1.sources.r1.bind = HadoopNode00
    a1.sources.r1.port = 44444
    
    # Describe the sink
    a1.sinks.k1.type = logger
    
    # Use a channel which buffers events in memory
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 1000
    a1.channels.c1.transactionCapacity = 100
    
    # Bind the source and sink to the channel
    a1.sources.r1.channels = c1
    a1.sinks.k1.channel = c1
Start the Flume Agent instance
    [root@HadoopNode00 apache-flume-1.7.0-bin]# bin/flume-ng agent --conf conf --conf-file conf/simple.properties --name a1 -Dflume.root.logger=INFO,console
Test
    Microsoft Windows [Version 6.1.7601]
    Copyright (c) 2009 Microsoft Corporation. All rights reserved.
    
    C:\Users\Administrator>telnet HadoopNode00 44444
    'telnet' is not recognized as an internal or external command,
    operable program or batch file.

If the telnet client is not available on Windows, enable the "Telnet Client" feature first and rerun the command; every line typed into the telnet session should then appear as an Event in the Flume agent's console.

III. Flume Components in Detail

Source

Main role: read/receive real-time data from external data sources.

Netcat
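
The Netcat source listens on a TCP port and turns each line of text it receives into an Event; this is the source used in the Simple Example above. A minimal sketch, reusing the host and port from that example:

    # Describe/configure the source
    a1.sources.r1.type = netcat
    a1.sources.r1.bind = HadoopNode00
    a1.sources.r1.port = 44444
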
Exec

Runs a Unix command and treats the command's ongoing output as the source data.

    # Describe/configure the source
    a1.sources.r1.type = exec
    a1.sources.r1.command = tail -f /home/flume/apache-flume-1.7.0-bin/Hello.log
Test by appending data
    [root@HadoopNode00 apache-flume-1.7.0-bin]# echo Hello Hadoop2 >> Hello.log
Spooling Directory

Collects text files placed in a specific directory (the spooling directory).
Note:

  • After a file has been fully ingested, it is automatically renamed with a .COMPLETED suffix.
  • Files that already carry the .COMPLETED suffix are not ingested again.
    a1.sources.r1.type = spooldir
    a1.sources.r1.spoolDir = /home/flume/apache-flume-1.7.0-bin/data
Avro

Like Netcat, it listens on a network port for incoming data, but uses the Avro RPC protocol; it is typically used to connect Flume agents together, e.g. to build a multi-agent collection tier (a Flume "cluster").

    # Describe/configure the source
    a1.sources.r1.type = avro
    a1.sources.r1.bind = HadoopNode00
    a1.sources.r1.port = 55555
Test with the dedicated Avro client
    [root@HadoopNode00 apache-flume-1.7.0-bin]# bin/flume-ng avro-client --host HadoopNode00 --port 55555 --filename README.md
Kafka

Reads data from a Kafka message queue and uses it as the source's input.

    tier1.sources.source1.type = org.apache.flume.source.kafka.KafkaSource
    tier1.sources.source1.channels = channel1
    # List of Kafka broker addresses (bootstrap servers)
    tier1.sources.source1.kafka.bootstrap.servers = localhost:9092 
    # Topics to subscribe to (matched by regex)
    tier1.sources.source1.kafka.topics.regex = ^topic[0-9]$
    # the default kafka.consumer.group.id=flume is used

Channel

A queue-like data structure of Events; the channel temporarily buffers the collected data between source and sink.

Memory

Events are buffered in memory.
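
A minimal sketch, mirroring the settings used in the Simple Example above (capacity is the maximum number of Events the channel can hold, transactionCapacity the maximum number of Events per transaction):

    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 1000
    a1.channels.c1.transactionCapacity = 100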

JDBC (not important)

Events are stored in a persistent embedded database (Derby); no other database products are currently supported.

    a1.channels = c1
    a1.channels.c1.type = jdbc
Kafka

Events are stored in a Kafka cluster, whose replication and high-availability design keeps the queued data safe and reliable.

    a1.channels.channel1.type = org.apache.flume.channel.kafka.KafkaChannel
    a1.channels.channel1.kafka.bootstrap.servers = kafka-1:9092,kafka-2:9092,kafka-3:9092
    a1.channels.channel1.kafka.topic = channel1
    a1.channels.channel1.kafka.consumer.group.id = flume-consumer
File

Events are persisted to the local file system.
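
A minimal sketch; the checkpoint and data directories below are placeholder paths (assumptions) and should point at disks with enough free space:

    a1.channels.c1.type = file
    # both directories are placeholders (assumptions)
    a1.channels.c1.checkpointDir = /home/flume/filechannel/checkpoint
    a1.channels.c1.dataDirs = /home/flume/filechannel/data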

Spillable Memory

A channel that spills to disk: Events that exceed the in-memory capacity overflow to disk storage.
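
A sketch based on the Flume user guide; the capacities and directories here are assumptions and should be checked against the Flume version in use:

    a1.channels.c1.type = SPILLABLEMEMORY
    # Events kept in memory before spilling to disk (assumed value)
    a1.channels.c1.memoryCapacity = 10000
    # Events allowed to overflow to disk (assumed value)
    a1.channels.c1.overflowCapacity = 1000000
    # the on-disk overflow reuses the file channel's storage settings (placeholder paths)
    a1.channels.c1.checkpointDir = /home/flume/spillable/checkpoint
    a1.channels.c1.dataDirs = /home/flume/spillable/data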

Sink

Main role: write the collected data to its final, central storage system.

Logger

Writes the collected data to the agent's console (log output).
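
A minimal sketch, as used in the Simple Example; the Events only show up on the console when the agent is started with -Dflume.root.logger=INFO,console:

    # Describe the sink
    a1.sinks.k1.type = logger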

HDFS

Stores the collected data on HDFS; both plain-text and sequence-file formats are supported.

Note: make sure the HDFS service is up and healthy.

    # Describe the sink
    a1.sinks.k1.type = hdfs
    a1.sinks.k1.hdfs.path = /flume/events/%y-%m-%d/%H%M/%S
    a1.sinks.k1.hdfs.filePrefix = events-
    a1.sinks.k1.hdfs.round = true
    a1.sinks.k1.hdfs.roundValue = 10
    a1.sinks.k1.hdfs.roundUnit = minute
    org.apache.flume.EventDeliveryException: java.lang.NullPointerException: Expected timestamp in the Flume event headers, but it was null

Solution: the HDFS sink (whose path here uses time-based escape sequences) requires a timestamp in the Event headers; configure a Timestamp interceptor on the source so one is added automatically:

    a1.sources.r1.interceptors = i1
    a1.sources.r1.interceptors.i1.type = timestamp

  • The HDFS sink writes Sequence Files by default; to keep the raw data as plain text, set a1.sinks.k1.hdfs.fileType = DataStream
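
Putting these notes together, a minimal sketch of a plain-text HDFS sink configuration, reusing the path and prefix from the example above:

    # timestamp interceptor on the source, so %y-%m-%d/%H%M/%S can be resolved
    a1.sources.r1.interceptors = i1
    a1.sources.r1.interceptors.i1.type = timestamp

    # HDFS sink writing plain text instead of Sequence Files
    a1.sinks.k1.type = hdfs
    a1.sinks.k1.hdfs.path = /flume/events/%y-%m-%d/%H%M/%S
    a1.sinks.k1.hdfs.filePrefix = events-
    a1.sinks.k1.hdfs.fileType = DataStream
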
Avro

Sends the collected data to a specified Avro source, typically on another Flume agent.

    a1.sinks.k1.type = avro
    # IP address and port of the other Flume agent's Avro source
    a1.sinks.k1.hostname = 10.10.10.10
    a1.sinks.k1.port = 4545
File Roll

Saves the collected data to the local file system, rolling over to a new file periodically.

    a1.sinks.k1.type = file_roll
    a1.sinks.k1.sink.directory = /root/data
HBase Sink

Saves the collected data to HBase, a distributed non-relational database.

    # Describe the sink
    a1.sinks.k1.type = hbase
    a1.sinks.k1.table = baizhi:t_data
    a1.sinks.k1.columnFamily = cf1
Kafka Sink

Publishes the collected data to a Kafka cluster.

    a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
    a1.sinks.k1.kafka.topic = mytopic
    a1.sinks.k1.kafka.bootstrap.servers = localhost:9092
    a1.sinks.k1.kafka.flumeBatchSize = 20
    a1.sinks.k1.kafka.producer.acks = 1
    a1.sinks.k1.kafka.producer.linger.ms = 1
    a1.sinks.k1.kafka.producer.compression.type = snappy

Interceptor


Note: in Flume, the interceptor is an optional component that sits between the Source and the Channel; it receives Events from the source and pre-processes them (e.g. adds headers or filters them) before they enter the channel.

Timestamp

Adds a timestamp to the headers of each Event.

    a1.sources.r1.interceptors = i1
    a1.sources.r1.interceptors.i1.type = timestamp
Host

Adds the host name/IP of the agent to the headers of each Event.

    a1.sources.r1.interceptors = i1 i2
    a1.sources.r1.interceptors.i1.type = host
    a1.sources.r1.interceptors.i2.type = timestamp

{ headers:{host=192.168.11.20, timestamp=1569627250085}

Static

Adds a fixed key/value pair to the headers of every Event.

    a1.sources.r1.interceptors = i1 i2 i3
    a1.sources.r1.interceptors.i1.type = host
    a1.sources.r1.interceptors.i2.type = timestamp
    a1.sources.r1.interceptors.i3.type = static
    a1.sources.r1.interceptors.i3.key = datecenter
    a1.sources.r1.interceptors.i3.value = bj

{ headers:{host=192.168.11.20, datecenter=bj, timestamp=1569627595541}

UUID

Adds a unique UUID identifier to the headers of each Event.

    a1.sources.r1.interceptors = i1 i2 i3 i4
    a1.sources.r1.interceptors.i1.type = host
    a1.sources.r1.interceptors.i2.type = timestamp
    a1.sources.r1.interceptors.i3.type = static
    a1.sources.r1.interceptors.i3.key = datecenter
    a1.sources.r1.interceptors.i3.value = bj
    a1.sources.r1.interceptors.i4.type = org.apache.flume.sink.solr.morphline.UUIDInterceptor$Builder

Event: { headers:{host=192.168.11.20, id=44a46be3-6f32-453d-afb5-5ba592f237cd, datecenter=bj, timestamp=1569627806093}

Regex Extractor

Uses a regular expression with capture groups to extract values from the Event body into the Event headers.
For a log line such as: ERROR 2018-10-14 14:37:24 com.baizhi.HelloWorld.sayHello() ...
the regex ^(\w*).*$ captures the leading word (the log level) into a header; the trailing "..." simply stands for the rest of the message.

    a1.sources.r1.interceptors.i4.type = regex_extractor
    a1.sources.r1.interceptors.i4.regex = ^(\w*).*$
    a1.sources.r1.interceptors.i4.serializers = s1
    a1.sources.r1.interceptors.i4.serializers.s1.name = level

{
  headers: {
    level=WARN (the log level extracted by the regex),
    host=<the agent's IP address>,
    datecenter=bj,
    timestamp=<epoch milliseconds>
  },
  body: the original log line (log level, timestamp, and message text)
}

Regex Filter

Filters out Events whose body does not match the given regular expression.

For example: keep only ERROR-level log entries, e.g. to build a risk-alerting system.

    a1.sources.r1.interceptors = i1 i2 i3
    a1.sources.r1.interceptors.i1.type = host
    a1.sources.r1.interceptors.i2.type = timestamp
    a1.sources.r1.interceptors.i3.type = regex_filter
    a1.sources.r1.interceptors.i3.regex = ^ERROR.*$

Channel Selector

Replicating channel selector
    [root@HadoopNode00 apache-flume-1.7.0-bin]# vi conf/replicating.properties
    # example.conf: A single-node Flume configuration
    
    # Name the components on this agent
    a1.sources = r1
    a1.sinks = k1 k2
    a1.channels = c1 c2
    
    # Describe/configure the source
    a1.sources.r1.type = netcat
    a1.sources.r1.bind = HadoopNode00
    a1.sources.r1.port = 44444
    a1.sources.r1.selector.type = replicating
    
    
    # Describe the sink
    a1.sinks.k1.type = logger
    a1.sinks.k2.type = file_roll
    a1.sinks.k2.sink.directory = /root/data2
    
    # Use a channel which buffers events in memory
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 1000
    a1.channels.c1.transactionCapacity = 100
    
    a1.channels.c2.type = file
    
    
    # Bind the source and sink to the channel
    a1.sources.r1.channels = c1 c2
    a1.sinks.k1.channel = c1
    a1.sinks.k2.channel = c2
Multiplexing channel selector
    a1.sources.r1.interceptors.i1.type = host
    a1.sources.r1.interceptors.i2.type = timestamp
    a1.sources.r1.interceptors.i3.type = static
    a1.sources.r1.interceptors.i3.key = datecenter
    a1.sources.r1.interceptors.i3.value = bj
    
    a1.sources.r1.interceptors.i4.type = regex_extractor
    a1.sources.r1.interceptors.i4.regex = ^(\w*).*$
    a1.sources.r1.interceptors.i4.serializers = s1
    a1.sources.r1.interceptors.i4.serializers.s1.name = level
    
    a1.sources.r1.selector.type = multiplexing
    a1.sources.r1.selector.header = level
    a1.sources.r1.selector.mapping.ERROR = c1
    a1.sources.r1.selector.mapping.INFO = c2
    a1.sources.r1.selector.mapping.DEBUG = c2
    a1.sources.r1.selector.default = c2
    
    # Describe the sink
    a1.sinks.k1.type = logger
    a1.sinks.k2.type = file_roll
    a1.sinks.k2.sink.directory = /root/data3
    
    # Use a channel which buffers events in memory
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 1000
    a1.channels.c1.transactionCapacity = 100
    
    a1.channels.c2.type = file
    
    # Bind the source and sink to the channel
    a1.sources.r1.channels = c1 c2
    a1.sinks.k1.channel = c1
    a1.sinks.k2.channel = c2

Sink Group

LoadBalance

Distributes the Events from one channel across multiple sinks in a roughly load-balanced fashion.

    # example.conf: A single-node Flume configuration
    
    # Name the components on this agent
    a1.sources = r1
    a1.sinks = k1 k2
    a1.channels = c1
    
    # Describe/configure the source
    a1.sources.r1.type = netcat
    a1.sources.r1.bind = HadoopNode00
    a1.sources.r1.port = 44444
    
    # Describe the sink
    a1.sinks.k1.type = file_roll
    a1.sinks.k1.sink.directory = /root/data4
    
    a1.sinks.k2.type = file_roll
    a1.sinks.k2.sink.directory = /root/data5
    
    #--------------------------------------
    a1.sinkgroups = g1
    a1.sinkgroups.g1.sinks = k1 k2
    a1.sinkgroups.g1.processor.type = load_balance
    a1.sinkgroups.g1.processor.backoff = true
    a1.sinkgroups.g1.processor.selector = random
    #--------------------------------------
    
    # Use a channel which buffers events in memory
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 1000
    a1.channels.c1.transactionCapacity = 100
    
    # Bind the source and sink to the channel
    a1.sources.r1.channels = c1
    a1.sinks.k1.channel = c1
    a1.sinks.k2.channel = c1

IV. Assignment: A Data Analysis Platform Based on Nginx Access Logs

Nginx

Installation
    # Step 1: upload and extract the source package, install build dependencies
    [root@HadoopNode00 nginx-1.11.1]# yum install gcc-c++ perl-devel pcre-devel openssl-devel zlib-devel wget
    [root@HadoopNode00 ~]# tar -zxf nginx-1.11.1.tar.gz
    [root@HadoopNode00 ~]# cd nginx-1.11.1
    [root@HadoopNode00 nginx-1.11.1]# ./configure --prefix=/usr/local/nginx
    [root@HadoopNode00 nginx-1.11.1]# make && make install
    [root@HadoopNode00 nginx-1.11.1]# cd /usr/local/nginx/
    [root@HadoopNode00 nginx]# ll
    total 16
    drwxr-xr-x. 2 root root 4096 Sep 28 13:00 conf
    drwxr-xr-x. 2 root root 4096 Sep 28 13:00 html
    drwxr-xr-x. 2 root root 4096 Sep 28 13:00 logs
    drwxr-xr-x. 2 root root 4096 Sep 28 13:00 sbin
    # Step 2: start Nginx
    [root@HadoopNode00 nginx]# sbin/nginx -c conf/nginx.conf
    # Step 3: verify the service is running
    # Option 1: check the processes
    [root@HadoopNode00 nginx]# ps -ef | grep nginx
    root     38219     1  0 13:02 ?        00:00:00 nginx: master process sbin/nginx -c conf/nginx.conf
    nobody   38220 38219  0 13:02 ?        00:00:00 nginx: worker process
    root     38429 20552  0 13:02 pts/0    00:00:00 grep nginx
    # Option 2: open the following URL in a browser
    # http://HadoopNode00
Log file format
    # fields: client IP | remote user | request time | "method URI HTTP-version" | status code | response size (bytes)
    27.19.74.143 - - [30/May/2013:17:38:20 +0800] "GET /static/image/common/faq.gif HTTP/1.1" 200 1127
    
    192.168.11.1 - - [28/Sep/2019:13:06:29 +0800] "GET / HTTP/1.1" 200 612 "-" "Mozilla/5.0 (Windows NT 6.1; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/76.0.3809.132 Safari/537.36"
Log rotation

Rotation script

    #!/bin/bash

    # directory where Nginx writes its logs
    logs_path="/usr/local/nginx/logs/"
    # path of the Nginx pid file
    pid_path="/usr/local/nginx/logs/nginx.pid"
    
    # rename (rotate) the current access log
    mv ${logs_path}access.log /usr/local/nginx/data/access_$(date -d "yesterday" +"%Y-%m-%d%I:%M:%S").log
    
    # signal the Nginx master process to reopen its log files
    kill -USR1 `cat ${pid_path}`
    [root@HadoopNode00 ~]# vi nginx.sh

    #!/bin/bash
    
    logs_path="/usr/local/nginx/logs/"
    
    pid_path="/usr/local/nginx/logs/nginx.pid"
    
    mv ${logs_path}access.log /usr/local/nginx/data/access_$(date -d "yesterday" +"%Y-%m-%d%I:%M:%S").log
    
    kill -USR1 `cat ${pid_path}`
    
    [root@HadoopNode00 ~]# chmod u+x nginx.sh

Scheduled task (crontab)

    [root@HadoopNode00 nginx]# crontab -e

    # rotate the logs once every minute
    */1 * * * * /root/nginx.sh
    
    [root@HadoopNode00 ~]# yum install -y vixie-cron
    [root@HadoopNode00 ~]# crontab -e 
    # define the cron schedule and point it at the shell script to execute
    */1 * * * * /root/nginx.sh
    [root@HadoopNode00 ~]# service crond restart
    [root@HadoopNode00 ~]# service crond reload
Flume Data Collection

Configuration file
    # example.conf: A single-node Flume configuration
    
    # Name the components on this agent
    a1.sources = r1
    a1.sinks = k1
    a1.channels = c1
    
    # Describe/configure the source
    a1.sources.r1.type = spooldir
    a1.sources.r1.spoolDir = /usr/local/nginx/data
    a1.sources.r1.interceptors = i1
    a1.sources.r1.interceptors.i1.type = timestamp
    
    # Describe the sink
    a1.sinks.k1.type = hdfs
    a1.sinks.k1.hdfs.path = /flume/events/%y-%m-%d/%H%M/%S
    a1.sinks.k1.hdfs.filePrefix = events-
    a1.sinks.k1.hdfs.fileType = DataStream
    
    # Use a channel which buffers events in memory
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 1000
    a1.channels.c1.transactionCapacity = 100
    
    # Bind the source and sink to the channel
    a1.sources.r1.channels = c1
    a1.sinks.k1.channel = c1

Data Cleaning

Regular Expressions

Test site: https://regex101.com/

A regular expression is a pattern language used mainly for matching, extracting from, and replacing strings.

Syntax

Rules

Rule    Meaning
\d      matches a digit
\D      matches a non-digit
\w      matches a word character (letter, digit, or underscore)
\W      matches a non-word character
\s      matches a whitespace character (space or tab)
.       matches any character
^       matches the start of the string
$       matches the end of the string

Quantifiers

Syntax  Meaning
*       the preceding rule matches 0 or more times
+       the preceding rule matches 1 or more times
{m,n}   the preceding rule matches at least m and at most n times

Extraction

Wrap the part of the pattern you want to extract in parentheses ( ) to form a capture group, as in the small sketch below.
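
A small illustrative Java snippet (not from the original post; the pattern and input are made up for demonstration) showing how capture groups pull values out of a matched string:

    import java.util.regex.Matcher;
    import java.util.regex.Pattern;

    public class CaptureGroupDemo {
        public static void main(String[] args) {
            // capture the status code and response size at the end of a log fragment
            Matcher m = Pattern.compile("\\s(\\d{3})\\s(\\d+)$")
                               .matcher("\"GET / HTTP/1.1\" 200 612");
            if (m.find()) {
                System.out.println("status = " + m.group(1)); // 200
                System.out.println("bytes  = " + m.group(2)); // 612
            }
        }
    }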

Extracting key metrics from Nginx access logs with a regular expression
Test string
    192.168.11.1 - - [28/Sep/2019:13:06:29 +0800] "GET /favicon.ico HTTP/1.1" 404 571 "http://hadoopnode00/" "Mozilla/5.0 (Windows NT 6.1; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/76.0.3809.132 Safari/537.36"
Required regular expression
    ^(\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}).*\[(.*)\]\s"(\w*)\s(.*)\sHTTP\/1\.1"\s(\d{3}).*$
Extracting the five key metrics from the access log with a Java program

IP address, request time, request method, requested resource URI, and response status code

    import java.util.regex.Matcher;
    import java.util.regex.Pattern;
    
    // the full pattern shown above, with backslashes escaped for a Java string literal
    final String regex = "^(\\d{1,3}\\.\\d{1,3}\\.\\d{1,3}\\.\\d{1,3}).*\\[(.*)\\]\\s\"(\\w*)\\s(.*)\\sHTTP/1\\.1\"\\s(\\d{3}).*$";
    final String string = "192.168.11.1 - - [28/Sep/2019:13:06:29 +0800] \"GET /favicon.ico HTTP/1.1\" 404 571 \"http://hadoopnode00/\" \"Mozilla/5.0 (Windows NT 6.1; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/76.0.3809.132 Safari/537.36\"\n";
    
    final Pattern pattern = Pattern.compile(regex, Pattern.MULTILINE);
    final Matcher matcher = pattern.matcher(string);
    
    while (matcher.find()) {
        System.out.println("Full match: " + matcher.group(0));
        for (int i = 1; i <= matcher.groupCount(); i++) {
            System.out.println("Group " + i + ": " + matcher.group(i));
        }
    }
Custom data-cleaning Mapper
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;
    
    import java.text.ParseException;
    import java.text.SimpleDateFormat;
    import java.util.Date;
    import java.util.Locale;
    import java.util.regex.Matcher;
    import java.util.regex.Pattern;
    import java.io.IOException;
    
    public class MyMapper extends Mapper<LongWritable, Text, NullWritable, Text> {
    /**
     * @param key     byte offset of the line within the input file
     * @param value   one line of access-log data
     * @param context used to emit the cleaned record
     * @throws IOException
     * @throws InterruptedException
     */
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        final String regex = "^(\\d{1,3}\\.\\d{1,3}\\.\\d{1,3}\\.\\d{1,3}).*\\[(.*)\\]\\s\"(\\w*)\\s(.*)\\sHTTP/1\\.1\"\\s(\\d{3}).*$";
        final Pattern pattern = Pattern.compile(regex, Pattern.MULTILINE);
        final Matcher matcher = pattern.matcher(value.toString());
    
        while (matcher.find()) {
            String ip = matcher.group(1);
            String accessTime = matcher.group(2);
            String method = matcher.group(3);
            String resource = matcher.group(4);
            String status = matcher.group(5);
            SimpleDateFormat sdf = new SimpleDateFormat("dd/MMM/yyyy:HH:mm:ss", Locale.ENGLISH);
            Date date = null;
            try {
                date = sdf.parse(accessTime);
                SimpleDateFormat sdf2 = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss");
                String formatAccessTime = sdf2.format(date);
    
                context.write(NullWritable.get(), new Text(ip + " " + formatAccessTime + " " + method + " " + resource + " " + status));
            } catch (ParseException e) {
                e.printStackTrace();
            }
        }
    }
    }
Custom data-cleaning driver (Job) class
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
    
    import java.io.IOException;
    
    public class MyJob {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf,"nginx");
        job.setJarByClass(MyJob.class);
    
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);
    
        TextInputFormat.setInputPaths(job, new Path("file:///D:/Downloads/events-.1569650629563"));
        TextOutputFormat.setOutputPath(job, new Path("file:///e:/result"));
    
        job.setMapperClass(MyMapper.class);
    
        // Note: data cleaning is a map-only job; there is no reduce phase
        job.setNumReduceTasks(0);
        job.setMapOutputKeyClass(NullWritable.class);
        job.setMapOutputValueClass(Text.class);
    
        job.waitForCompletion(true);
    }
    }
Results after data cleaning
