
Flume Collection Examples


1. Collecting a Directory into HDFS

Data collection requirement: new files are regularly generated in a designated collection directory on one of the servers; whenever a new file appears there, it should be picked up and shipped into HDFS.

Based on this requirement, first define the following three key components:

  1. Source, which acquires the data stream: a spooldir source that monitors the collection directory
  2. Sink, the destination the data sinks into: an hdfs sink writing to a path in HDFS
  3. Channel, the conduit between source and sink: either a file channel or a memory channel can be used

Writing the configuration file:

# Name the three components of the agent
agent1.sources = source1
agent1.sinks = sink1
agent1.channels = channel1

# Configure the source: watch a spooling directory for new files
agent1.sources.source1.type = spooldir
agent1.sources.source1.spoolDir = /home/hadoop/logs/
agent1.sources.source1.fileHeader = false

# Configure an interceptor that stamps each event with the agent's hostname
agent1.sources.source1.interceptors = i1
agent1.sources.source1.interceptors.i1.type = host
agent1.sources.source1.interceptors.i1.hostHeader = hostname

# Configure the sink: write events into time-partitioned HDFS paths
agent1.sinks.sink1.type = hdfs
agent1.sinks.sink1.hdfs.path = hdfs://hdp-node-01:9000/weblog/flume-collection/%y-%m-%d/%H-%M
agent1.sinks.sink1.hdfs.filePrefix = access_log
agent1.sinks.sink1.hdfs.maxOpenFiles = 5000
agent1.sinks.sink1.hdfs.batchSize = 100
agent1.sinks.sink1.hdfs.fileType = DataStream
agent1.sinks.sink1.hdfs.writeFormat = Text
# Roll to a new file every 100 KB, 1,000,000 events, or 60 seconds, whichever comes first
agent1.sinks.sink1.hdfs.rollSize = 102400
agent1.sinks.sink1.hdfs.rollCount = 1000000
agent1.sinks.sink1.hdfs.rollInterval = 60
#agent1.sinks.sink1.hdfs.round = true
#agent1.sinks.sink1.hdfs.roundValue = 10
#agent1.sinks.sink1.hdfs.roundUnit = minute
# Use local time (instead of a timestamp header) to resolve the %y-%m-%d/%H-%M escapes
agent1.sinks.sink1.hdfs.useLocalTimeStamp = true

# Use a channel which buffers events in memory
agent1.channels.channel1.type = memory
agent1.channels.channel1.keep-alive = 120
agent1.channels.channel1.capacity = 500000
agent1.channels.channel1.transactionCapacity = 600

# Bind the source and sink to the channel
agent1.sources.source1.channels = channel1
agent1.sinks.sink1.channel = channel1
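
Save the configuration and start the agent. A minimal launch sketch follows; the configuration file name (spooldir-hdfs.conf) and the test file path are assumptions, so adjust them to your installation:

# Start the agent (run from the Flume installation directory);
# --name must match the agent name used in the configuration (agent1)
bin/flume-ng agent \
  --conf conf \
  --conf-file conf/spooldir-hdfs.conf \
  --name agent1 \
  -Dflume.root.logger=INFO,console

# In another terminal, move a finished file into the spooled directory,
# then verify the result in HDFS (paths taken from the configuration above)
echo "hello flume" > /tmp/test.log && mv /tmp/test.log /home/hadoop/logs/
hdfs dfs -ls /weblog/flume-collection/

Note that the spooldir source expects complete, immutable files: move files into the directory only after they are fully written, and never edit them in place afterwards; processed files are renamed with a .COMPLETED suffix.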

Channel parameter explanation:

capacity: the maximum number of events the channel can store at any one time

transactionCapacity: the maximum number of events the channel takes from a source or gives to a sink per transaction; it must be no smaller than the sink's batchSize (600 >= 100 here)

keep-alive: the timeout, in seconds, for adding an event to or removing an event from the channel

2. Collecting a File into HDFS

Data collection requirement: a business system writes its logs with log4j, so the log file grows continuously; the newly appended data must be shipped to HDFS in near real time.

Based on this requirement, first define the following three key components:

  1. Source, which watches the file for newly appended content: an exec source running the command `tail -F <file>` (a test generator for such a file is sketched right after this list)
  2. Sink, the destination: an hdfs sink writing to a path in HDFS
  3. Channel: either a file channel (disk-backed) or a memory channel (faster, but volatile) can be used
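
To have a continuously growing file to test against, a simple generator loop can stand in for the log4j application (the path matches the exec source configured below):

# Append one line every half second to simulate a growing access log
while true; do
  echo "$(date) access log entry" >> /home/hadoop/logs/access_log
  sleep 0.5
done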

Writing the configuration file:

# Name the three components of the agent
agent1.sources = source1
agent1.sinks = sink1
agent1.channels = channel1

# Describe/configure the source: tail the growing log file
agent1.sources.source1.type = exec
agent1.sources.source1.command = tail -F /home/hadoop/logs/access_log
agent1.sources.source1.channels = channel1

# Configure an interceptor that stamps each event with the agent's hostname
agent1.sources.source1.interceptors = i1
agent1.sources.source1.interceptors.i1.type = host
agent1.sources.source1.interceptors.i1.hostHeader = hostname

# Describe the sink: write events into time-partitioned HDFS paths
agent1.sinks.sink1.type = hdfs
agent1.sinks.sink1.hdfs.path = hdfs://hdp-node-01:9000/weblog/flume-collection/%y-%m-%d/%H-%M
agent1.sinks.sink1.hdfs.filePrefix = access_log
agent1.sinks.sink1.hdfs.maxOpenFiles = 5000
agent1.sinks.sink1.hdfs.batchSize = 100
agent1.sinks.sink1.hdfs.fileType = DataStream
agent1.sinks.sink1.hdfs.writeFormat = Text
# Roll to a new file every 100 KB, 1,000,000 events, or 60 seconds, whichever comes first
agent1.sinks.sink1.hdfs.rollSize = 102400
agent1.sinks.sink1.hdfs.rollCount = 1000000
agent1.sinks.sink1.hdfs.rollInterval = 60
# Round the timestamp down to 10-minute buckets when resolving the path escapes
agent1.sinks.sink1.hdfs.round = true
agent1.sinks.sink1.hdfs.roundValue = 10
agent1.sinks.sink1.hdfs.roundUnit = minute
agent1.sinks.sink1.hdfs.useLocalTimeStamp = true

# Use a channel which buffers events in memory
agent1.channels.channel1.type = memory
agent1.channels.channel1.keep-alive = 120
agent1.channels.channel1.capacity = 500000
agent1.channels.channel1.transactionCapacity = 600

# Bind the source and sink to the channel
agent1.sources.source1.channels = channel1
agent1.sinks.sink1.channel = channel1
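
Starting this agent works the same way as in the first case; only the configuration file differs (exec-hdfs.conf is again an assumed name):

# Start the agent with the exec-source configuration
bin/flume-ng agent \
  --conf conf \
  --conf-file conf/exec-hdfs.conf \
  --name agent1 \
  -Dflume.root.logger=INFO,console

# While the log keeps growing, files accumulate under the time-partitioned paths
hdfs dfs -ls -R /weblog/flume-collection/

One caveat of the exec source: tail -F gives no delivery guarantee, because data written while the agent is down is not replayed. Newer Flume releases provide the Taildir source for more reliable file tailing.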

More source and sink components:

Flume supports many other source and sink types; see the official Flume User Guide for the detailed reference.
