Flume 部署及相关案例
一、Flume 简介
1.Flume 提供了一个分布式的,可靠的,对大量数据的日志高效收集,聚集,移动的服务,Flume 只能在 Unix 环境下运行。
2.Flume 基于流式架构,容错性强,也结构简单。
3.Flume,Kafka 用来实时数据收集,Spark,Flume 用来实时处理数据,impala 用来实时查询。
二、Flume 角色

1.source
用于采集数据,source 是产生数据流的地方,同时 source 会将产生的数据流传输到 Channel ,这个类似 java IO 部分的 Channel。
2.Channel
用于桥接 source 和Sink ,类似于一个队列。
3.Sink
从 Channel 收集数据,将数据写到目标源(可以是下一个 source ,也可以是 HDFS 或者 HBase。
4.Event
传输单元,Flume 传输的基本单元,以事件的形式将数据从源头传输到目的地。
三、Flume 传输过程
source 监控某个文件或者数据流,数据源产生新的数据,拿到该数据后,将数据封装到一个 Event 中,并 put 到 Channel 后 commit 提交,Channel 的队列先进先出,Sink 去 Channel 队列拉取数据,然后写入到 HDFS 中。
四、Flume 部署及使用
1.文件配置
查询JAVA_HOME: echo $JAVA_HOME
显示/opt/module/jdk1.8.0_144 /opt/module/jdk1.8.0_144
安装Flume
[itstar@bigdata113 software]$ tar -zxvf apache-flume1.8.0-bin.tar.gz -C /opt/module/
改名:
[itstar@bigdata113 conf]$ mv flume-env.sh.template flume-env.sh

flume-env.sh涉及修改项:
export JAVA_HOME=/opt/module/jdk1.8.0_144

配置环境变量并 source


Linux中文件上传命令是:rz,下载:sz 如果没有这两个命令使用yum下载:yum install lrzsz

五、案例一
监控数据端口
目标:Flume监控一端Console,另一端Console发送消息,使被监控端实时显示。
示意图

分步实现:
1.安装 telnet 工具
【联网状态】yum -y install telnet

【安装完毕】
2.创建 Flume Agent 配置文件 flume-telnet.conf
#定义Agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
#定义netcatsource netcat端口源
a1.sources.r1.type = netcat
a1.sources.r1.bind = bigdata111
a1.sources.r1.port = 44445
# 定义sink (logger 日志)
a1.sinks.k1.type = logger
# 定义channel (memory 内存)
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# 双向链接
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
**判断44445端口是否被占用
$ netstat -tunlp | grep 44445
启动flume配置文件
/opt/module/flume-1.8.0/bin/flume-ng agent \
--conf /opt/module/flume1.8.0/conf/ \
--name a1 \
--conf-file /opt/module/flume-1.8.0/myconf/flume-telnet.conf \
-Dflume.root.logger==INFO,console

另一种比较简便的代码格式
bin/flume-ng agent --conf conf/ --name a1 --conf-file myconf/flume-telnet.conf -Dflume.root.logger==INFO,console
使用telnet工具向本机的44444端口发送内容
$ telnet bigdata111 44445


六、案例二:实时读取本地文件到HDFS
示意图

1.创建flume-hdfs.conf文件
# 1 agent
a2.sources = r2
a2.sinks = k2
a2.channels = c2
# 2 source (exec 源 tail -F 监控)
a2.sources.r2.type = exec
a2.sources.r2.command = tail -F /opt/wind
a2.sources.r2.shell = /bin/bash -c
# 3 sink (/%Y%m%d/%H 时间)
a2.sinks.k2.type = hdfs
a2.sinks.k2.hdfs.path = hdfs://bigdata111:9000/flume/%Y%m%d/%H
#上传文件的前缀
a2.sinks.k2.hdfs.filePrefix = logs-
#是否按照时间滚动文件夹
a2.sinks.k2.hdfs.round = true
#多少时间单位创建一个新的文件夹
a2.sinks.k2.hdfs.roundValue = 1
#重新定义时间单位
a2.sinks.k2.hdfs.roundUnit = hour
#是否使用本地时间戳
a2.sinks.k2.hdfs.useLocalTimeStamp = true
#积攒多少个Event才flush到HDFS一次 (capacity > transactionCapacity > batchSize)
a2.sinks.k2.hdfs.batchSize = 1000
#设置文件类型,可支持压缩 (DataStream 流输出)
a2.sinks.k2.hdfs.fileType = DataStream
#多久生成一个新的文件
a2.sinks.k2.hdfs.rollInterval = 600
#设置每个文件的滚动大小
a2.sinks.k2.hdfs.rollSize = 134217700
#文件的滚动与Event数量无关
a2.sinks.k2.hdfs.rollCount = 0
#最小副本数
a2.sinks.k2.hdfs.minBlockReplicas = 1
# Use a channel which buffers events in memory
a2.channels.c2.type = memory
a2.channels.c2.capacity = 1000
a2.channels.c2.transactionCapacity = 1000
# Bind the source and sink to the channel
a2.sources.r2.channels = c2
a2.sinks.k2.channel = c2
2.执行监控配置
/opt/module/flume1.8.0/bin/flume-ng agent \
--conf /opt/module/flume1.8.0/conf/ \
--name a2 \
--conf-file /opt/module/flume1.8.0/jobconf/flume-hdfs.conf
七、案例三:实时读取目录文件到HDFS
目标:使用 flume 监听整个目录的文件
示意图

分步实现:
创建配置文件 flume-dir.conf
#1 Agent
a3.sources = r3
a3.sinks = k3
a3.channels = c3
#2 source
#监控目录的类型
a3.sources.r3.type = spooldir
#监控目录的路径
a3.sources.r3.spoolDir = /opt/log
#哪个文件上传hdfs,然后给这个文件添加一个后缀
a3.sources.r3.fileSuffix = .COMPLETED
a3.sources.r3.fileHeader = true
#忽略所有以.tmp结尾的文件,不上传(可选)
a3.sources.r3.ignorePattern = ([^ ]*\.tmp)
# 3 sink
a3.sinks.k3.type = hdfs
a3.sinks.k3.hdfs.path = hdfs://bigdata111:9000/flume/%H
#上传文件的前缀
a3.sinks.k3.hdfs.filePrefix = upload-
#是否按照时间滚动文件夹
a3.sinks.k3.hdfs.round = true
#多少时间单位创建一个新的文件夹
a3.sinks.k3.hdfs.roundValue = 1
#重新定义时间单位
a3.sinks.k3.hdfs.roundUnit = hour
#是否使用本地时间戳
a3.sinks.k3.hdfs.useLocalTimeStamp = true
#积攒多少个Event才flush到HDFS一次
a3.sinks.k3.hdfs.batchSize = 100
#设置文件类型,可支持压缩
a3.sinks.k3.hdfs.fileType = DataStream
#多久生成一个新的文件
a3.sinks.k3.hdfs.rollInterval = 600
#设置每个文件的滚动大小大概是128M
a3.sinks.k3.hdfs.rollSize = 134217700
#文件的滚动与Event数量无关
a3.sinks.k3.hdfs.rollCount = 0
#最小副本数
a3.sinks.k3.hdfs.minBlockReplicas = 1
# Use a channel which buffers events in memory
a3.channels.c3.type = memory
a3.channels.c3.capacity = 1000
a3.channels.c3.transactionCapacity = 100
# Bind the source and sink to the channel
a3.sources.r3.channels = c3
a3.sinks.k3.channel = c3
执行测试:执行如下脚本后,请向upload文件夹中添加文件试试
/opt/module/flume1.8.0/bin/flume-ng agent \
--conf /opt/module/flume1.8.0/conf/ \
--name a3 \
--conf-file /opt/module/flume1.8.0/jobconf/flume-dir.conf
尖叫提示: 在使用Spooling Directory Source时
(1) 不要在监控目录中创建并持续修改文件
(2) 上传完成的文件会以.COMPLETED结尾
(3) 被监控文件夹每500毫秒扫描一次文件变动
