Flume Collection Examples
1. Collecting a directory into HDFS
Data collection requirement: new files are continuously generated in a designated directory on a server in the Hadoop cluster. Whenever a new file appears in that directory, it must be collected into HDFS.
Based on this requirement, first define the following three key elements:
- Collection source (source): a spooldir source that monitors the directory for newly created files
- Sink target (sink): an hdfs sink that writes the collected data to a path in HDFS
- Transfer channel between source and sink (channel): either a file channel or a memory channel can be used
Configuration file:
# Name the three components of the agent
agent1.sources = source1
agent1.sinks = sink1
agent1.channels = channel1
# Configure the source component
agent1.sources.source1.type = spooldir
agent1.sources.source1.spoolDir = /home/hadoop/logs/
agent1.sources.source1.fileHeader = false
# Configure an interceptor that stamps each event header with the host name
agent1.sources.source1.interceptors = i1
agent1.sources.source1.interceptors.i1.type = host
agent1.sources.source1.interceptors.i1.hostHeader = hostname
# Configure the sink component
agent1.sinks.sink1.type = hdfs
agent1.sinks.sink1.hdfs.path = hdfs://hdp-node-01:9000/weblog/flume-collection/%y-%m-%d/%H-%M
agent1.sinks.sink1.hdfs.filePrefix = access_log
agent1.sinks.sink1.hdfs.maxOpenFiles = 5000
agent1.sinks.sink1.hdfs.batchSize = 100
agent1.sinks.sink1.hdfs.fileType = DataStream
agent1.sinks.sink1.hdfs.writeFormat = Text
agent1.sinks.sink1.hdfs.rollSize = 102400
agent1.sinks.sink1.hdfs.rollCount = 1000000
agent1.sinks.sink1.hdfs.rollInterval = 60
#agent1.sinks.sink1.hdfs.round = true
#agent1.sinks.sink1.hdfs.roundValue = 10
#agent1.sinks.sink1.hdfs.roundUnit = minute
agent1.sinks.sink1.hdfs.useLocalTimeStamp = true
# Use a channel which buffers events in memory
agent1.channels.channel1.type = memory
agent1.channels.channel1.keep-alive = 120
agent1.channels.channel1.capacity = 500000
agent1.channels.channel1.transactionCapacity = 600
# Bind the source and sink to the channel
agent1.sources.source1.channels = channel1
agent1.sinks.sink1.channel = channel1
Channel parameter notes:
capacity: the maximum number of events the channel can hold
transactionCapacity: the maximum number of events handled in a single transaction when taking events from the source or delivering them to the sink
keep-alive: the time (in seconds) allowed for adding an event to, or removing one from, the channel before the operation times out
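Once the configuration is saved, the agent can be started with the standard flume-ng command. A minimal sketch follows; the conf directory and the file name spooldir-hdfs.conf are illustrative assumptions, not values given above:
# Start the agent (the --name value must match "agent1" used in the config)
bin/flume-ng agent --conf conf --conf-file conf/spooldir-hdfs.conf --name agent1 -Dflume.root.logger=INFO,console
# Copy a finished file into the monitored directory; Flume renames it to *.COMPLETED after collecting it
cp /tmp/test.log /home/hadoop/logs/
# Check the result on HDFS
hdfs dfs -ls /weblog/flume-collection/
Note that the spooling directory source expects files to be complete and unchanging once they are placed in the directory, and file names must not be reused.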
2. Collecting a file into HDFS
Data collection requirement: a business system writes its log file with log4j, so the log content grows continuously, and the newly appended data needs to be collected into HDFS in near real time.
Based on this requirement, first define the following three key elements:
- Collection source (source): monitors the content appended to the file, implemented with the command 'tail -F file' (an exec source)
- Sink target (sink): an hdfs sink that writes the collected data to HDFS
- Transfer channel (channel): either a file channel or a memory channel can be used
Configuration file:
agent1.sources = source1
agent1.sinks = sink1
agent1.channels = channel1
# Describe/configure the tail -F source
agent1.sources.source1.type = exec
agent1.sources.source1.command = tail -F /home/hadoop/logs/access_log
agent1.sources.source1.channels = channel1
#configure host for source
agent1.sources.source1.interceptors = i1
agent1.sources.source1.interceptors.i1.type = host
agent1.sources.source1.interceptors.i1.hostHeader = hostname
# Describe sink1
agent1.sinks.sink1.type = hdfs
agent1.sinks.sink1.hdfs.path = hdfs://hdp-node-01:9000/weblog/flume-collection/%y-%m-%d/%H-%M
agent1.sinks.sink1.hdfs.filePrefix = access_log
agent1.sinks.sink1.hdfs.maxOpenFiles = 5000
agent1.sinks.sink1.hdfs.batchSize = 100
agent1.sinks.sink1.hdfs.fileType = DataStream
agent1.sinks.sink1.hdfs.writeFormat = Text
agent1.sinks.sink1.hdfs.rollSize = 102400
agent1.sinks.sink1.hdfs.rollCount = 1000000
agent1.sinks.sink1.hdfs.rollInterval = 60
agent1.sinks.sink1.hdfs.round = true
agent1.sinks.sink1.hdfs.roundValue = 10
agent1.sinks.sink1.hdfs.roundUnit = minute
agent1.sinks.sink1.hdfs.useLocalTimeStamp = true
# Use a channel which buffers events in memory
agent1.channels.channel1.type = memory
agent1.channels.channel1.keep-alive = 120
agent1.channels.channel1.capacity = 500000
agent1.channels.channel1.transactionCapacity = 600
# Bind the source and sink to the channel
agent1.sources.source1.channels = channel1
agent1.sinks.sink1.channel = channel1
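To try the flow end to end, append data to the monitored log file while the agent is running. The sketch below assumes the configuration is saved as conf/tail-hdfs.conf (an illustrative name); the log path matches the config above:
# Start the agent (the --name value must match "agent1")
bin/flume-ng agent --conf conf --conf-file conf/tail-hdfs.conf --name agent1 -Dflume.root.logger=INFO,console
# In another shell, keep appending lines to simulate a growing log
while true; do echo "$(date) test log line" >> /home/hadoop/logs/access_log; sleep 0.5; done
# Verify that time-bucketed directories and files appear on HDFS
hdfs dfs -ls -R /weblog/flume-collection/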
More source and sink components:
Flume supports many other source and sink types; the official Flume User Guide provides the detailed reference for each of them.
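As one illustration (not part of the two cases above), newer Flume releases (1.7+) include a Taildir source, which tails files like the exec source but records read positions so it can resume after an agent restart. A minimal sketch, with illustrative component names a1/r1 and an assumed position-file path:
# Taildir source sketch (names and paths are examples only)
a1.sources.r1.type = TAILDIR
a1.sources.r1.positionFile = /home/hadoop/flume/taildir_position.json
a1.sources.r1.filegroups = f1
a1.sources.r1.filegroups.f1 = /home/hadoop/logs/access_log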
