Advertisement

Flume 部署及相关案例

阅读量:

一、Flume 简介

1.Flume 提供了一个分布式的,可靠的,对大量数据的日志高效收集,聚集,移动的服务,Flume 只能在 Unix 环境下运行。

2.Flume 基于流式架构,容错性强,也结构简单。

3.Flume,Kafka 用来实时数据收集,Spark,Flume 用来实时处理数据,impala 用来实时查询。

二、Flume 角色

1

1.source
用于采集数据,source 是产生数据流的地方,同时 source 会将产生的数据流传输到 Channel ,这个类似 java IO 部分的 Channel。

2.Channel
用于桥接 source 和Sink ,类似于一个队列。

3.Sink
从 Channel 收集数据,将数据写到目标源(可以是下一个 source ,也可以是 HDFS 或者 HBase。

4.Event
传输单元,Flume 传输的基本单元,以事件的形式将数据从源头传输到目的地。

三、Flume 传输过程

source 监控某个文件或者数据流,数据源产生新的数据,拿到该数据后,将数据封装到一个 Event 中,并 put 到 Channel 后 commit 提交,Channel 的队列先进先出,Sink 去 Channel 队列拉取数据,然后写入到 HDFS 中。

四、Flume 部署及使用

1.文件配置
查询JAVA_HOME: echo $JAVA_HOME

显示/opt/module/jdk1.8.0_144 /opt/module/jdk1.8.0_144

安装Flume
[itstar@bigdata113 software]$ tar -zxvf apache-flume1.8.0-bin.tar.gz -C /opt/module/

改名:
[itstar@bigdata113 conf]$ mv flume-env.sh.template flume-env.sh

在这里插入图片描述

flume-env.sh涉及修改项:
export JAVA_HOME=/opt/module/jdk1.8.0_144

在这里插入图片描述
配置环境变量并 source
在这里插入图片描述
在这里插入图片描述

Linux中文件上传命令是:rz,下载:sz 如果没有这两个命令使用yum下载:yum install lrzsz
在这里插入图片描述

五、案例一

监控数据端口
目标:Flume监控一端Console,另一端Console发送消息,使被监控端实时显示。
示意图

在这里插入图片描述
分步实现:
1.安装 telnet 工具
【联网状态】yum -y install telnet

在这里插入图片描述
【安装完毕】

2.创建 Flume Agent 配置文件 flume-telnet.conf

复制代码
    #定义Agent
    a1.sources = r1
    a1.sinks = k1
    a1.channels = c1
    
    #定义netcatsource netcat端口源
    a1.sources.r1.type = netcat
    a1.sources.r1.bind = bigdata111
    a1.sources.r1.port = 44445
    
    # 定义sink (logger 日志)
    a1.sinks.k1.type = logger
    
    # 定义channel (memory 内存)
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 1000
    a1.channels.c1.transactionCapacity = 100                  
    
    # 双向链接
    a1.sources.r1.channels = c1
    a1.sinks.k1.channel = c1

**判断44445端口是否被占用

复制代码
    $ netstat -tunlp | grep 44445

启动flume配置文件

复制代码
    /opt/module/flume-1.8.0/bin/flume-ng agent \
    --conf /opt/module/flume1.8.0/conf/ \
    --name a1 \
    --conf-file /opt/module/flume-1.8.0/myconf/flume-telnet.conf \
    -Dflume.root.logger==INFO,console

在这里插入图片描述
另一种比较简便的代码格式

复制代码
    bin/flume-ng agent --conf conf/ --name a1 --conf-file myconf/flume-telnet.conf -Dflume.root.logger==INFO,console

使用telnet工具向本机的44444端口发送内容

复制代码
    $ telnet bigdata111 44445

在这里插入图片描述
在这里插入图片描述

六、案例二:实时读取本地文件到HDFS

示意图
在这里插入图片描述
1.创建flume-hdfs.conf文件

复制代码
    	# 1 agent
    	a2.sources = r2
    	a2.sinks = k2
    	a2.channels = c2
    	
    	# 2 source (exec 源  tail -F 监控)
    	a2.sources.r2.type = exec
    	a2.sources.r2.command = tail -F /opt/wind
    	a2.sources.r2.shell = /bin/bash -c
    	
    	# 3 sink (/%Y%m%d/%H 时间)
    	a2.sinks.k2.type = hdfs
    	a2.sinks.k2.hdfs.path = hdfs://bigdata111:9000/flume/%Y%m%d/%H
    	#上传文件的前缀
    	a2.sinks.k2.hdfs.filePrefix = logs-
    	#是否按照时间滚动文件夹
    	a2.sinks.k2.hdfs.round = true
    	#多少时间单位创建一个新的文件夹
    	a2.sinks.k2.hdfs.roundValue = 1
    	#重新定义时间单位
    	a2.sinks.k2.hdfs.roundUnit = hour
    	#是否使用本地时间戳
    	a2.sinks.k2.hdfs.useLocalTimeStamp = true
    	#积攒多少个Event才flush到HDFS一次  (capacity > transactionCapacity > batchSize)
    	a2.sinks.k2.hdfs.batchSize = 1000
    	#设置文件类型,可支持压缩 (DataStream 流输出)
    	a2.sinks.k2.hdfs.fileType = DataStream
    	#多久生成一个新的文件
    	a2.sinks.k2.hdfs.rollInterval = 600
    	#设置每个文件的滚动大小
    	a2.sinks.k2.hdfs.rollSize = 134217700
    	#文件的滚动与Event数量无关
    	a2.sinks.k2.hdfs.rollCount = 0
    	#最小副本数
    	a2.sinks.k2.hdfs.minBlockReplicas = 1
    	
    	# Use a channel which buffers events in memory
    	a2.channels.c2.type = memory
    	a2.channels.c2.capacity = 1000
    	a2.channels.c2.transactionCapacity = 1000
    	
    	# Bind the source and sink to the channel
    	a2.sources.r2.channels = c2
    	a2.sinks.k2.channel = c2

2.执行监控配置

复制代码
    /opt/module/flume1.8.0/bin/flume-ng agent \
    --conf /opt/module/flume1.8.0/conf/ \
    --name a2 \
    --conf-file /opt/module/flume1.8.0/jobconf/flume-hdfs.conf

七、案例三:实时读取目录文件到HDFS

目标:使用 flume 监听整个目录的文件
示意图
在这里插入图片描述
分步实现:
创建配置文件 flume-dir.conf

复制代码
    #1 Agent
    a3.sources = r3
    a3.sinks = k3
    a3.channels = c3
    
    #2 source
    #监控目录的类型
    a3.sources.r3.type = spooldir
    #监控目录的路径
    a3.sources.r3.spoolDir = /opt/log
    #哪个文件上传hdfs,然后给这个文件添加一个后缀
    a3.sources.r3.fileSuffix = .COMPLETED
    a3.sources.r3.fileHeader = true
    #忽略所有以.tmp结尾的文件,不上传(可选)
    a3.sources.r3.ignorePattern = ([^ ]*\.tmp)
    
    # 3 sink
    a3.sinks.k3.type = hdfs
    a3.sinks.k3.hdfs.path = hdfs://bigdata111:9000/flume/%H
    #上传文件的前缀
    a3.sinks.k3.hdfs.filePrefix = upload-
    #是否按照时间滚动文件夹
    a3.sinks.k3.hdfs.round = true
    #多少时间单位创建一个新的文件夹
    a3.sinks.k3.hdfs.roundValue = 1
    #重新定义时间单位
    a3.sinks.k3.hdfs.roundUnit = hour
    #是否使用本地时间戳
    a3.sinks.k3.hdfs.useLocalTimeStamp = true
    #积攒多少个Event才flush到HDFS一次
    a3.sinks.k3.hdfs.batchSize = 100
    #设置文件类型,可支持压缩
    a3.sinks.k3.hdfs.fileType = DataStream
    #多久生成一个新的文件
    a3.sinks.k3.hdfs.rollInterval = 600
    #设置每个文件的滚动大小大概是128M
    a3.sinks.k3.hdfs.rollSize = 134217700
    #文件的滚动与Event数量无关
    a3.sinks.k3.hdfs.rollCount = 0
    #最小副本数
    a3.sinks.k3.hdfs.minBlockReplicas = 1
    
    # Use a channel which buffers events in memory
    a3.channels.c3.type = memory
    a3.channels.c3.capacity = 1000
    a3.channels.c3.transactionCapacity = 100
    
    # Bind the source and sink to the channel
    a3.sources.r3.channels = c3
    a3.sinks.k3.channel = c3

执行测试:执行如下脚本后,请向upload文件夹中添加文件试试

复制代码
    /opt/module/flume1.8.0/bin/flume-ng agent \
    --conf /opt/module/flume1.8.0/conf/ \
    --name a3 \
    --conf-file /opt/module/flume1.8.0/jobconf/flume-dir.conf

尖叫提示: 在使用Spooling Directory Source时
(1) 不要在监控目录中创建并持续修改文件
(2) 上传完成的文件会以.COMPLETED结尾
(3) 被监控文件夹每500毫秒扫描一次文件变动

全部评论 (0)

还没有任何评论哟~