flume的安装及使用及各种案例
1.下载flume-1.6.0(apache-flume-1.6.0-bin.tar.gz)
http://flume.apache.org/download.html
2.上传到集群(本文上传到hadoop04节点)
3.解压安装包
tar -zxvf apache-flume-1.6.0-bin.tar.gz -C apps/
并且修改文件夹名字
mv apache-flume-1.6.0-bin flume-1.6.0
4.进入conf文件夹修改配置信息
cd /home/hadoop/apps/flume-1.6.0/conf
5.修改配置文件
[hadoop@hadoop04 conf]$ cp flume-env.sh.template flume-env.sh
vim flume-env.sh

6.配置环境变量
vim .bashrc
增加下面的内容
export FLUME_HOME=/home/hadoop/apps/flume-1.6.0/bin
export PATH=PATH:FLUME_HOME/bin
source .bashrc
7.测试是否成功
[hadoop@hadoop04 ~]$ flume-ng version

8.flume的使用
官方帮助文档 :点击打开链接
Source:读数据
Avro Source:
watching an Avro port and catching events from an external Avro client to receive stream data(watching a specific port and capturing data on that port), in Linux you can use the nc command(to send traffic to a specific port, similar to how you would ping a domain name like baidu.com). The nc command allows you to send data packets to a specified port, functioning akin to sending probes akin to the ping command used for testing network connectivity.
Spooling Directory Source: (日志常用)
在存储设备上设置用于监控特定数据流向的目标专有Spooling目录,并通过该系统能够追踪并复制指向特定新文件的事件。特别地,请注意所选 directory 必须是专门没有子存储空间的设计目标——一旦该 directory 被系统识别并开始监控后,则会执行以下操作:每当往该 directory 下写入 files 时(即从 source 系统传入),这些 files 将会被 Spooling DirectorySource 读取其内容,并在完成处理后自动添加 .COMPLETED 后缀。
Exec Source:(日志常用)
在指定的Unix/Linux命令下运行,在该command被执行时,
执行Source会去读取该command处理的数据输入(例如 cat 或 tail 这类command)。
其本质上相当于利用Linux内置的一个command来获取数据。
从而可以通过Exec Source工具来捕获这些信息。
这也相当于生成日志记录。
tail-command 也用于捕获log information about the data flow。
获取最后多少行,获取最新的新增加的)
Exec Source 和Spool Source 比较
ExecSource能够实现实时日志数据的采集功能;然而在Flume服务不可用或操作指令执行出现问题的情况下,则会导致未能获取完整的日志记录;从而就无法验证这些日志数据的真实性与完整性
SpoolSource不具备即时数据采集功能,在处理数据时必须采用按分钟粒度划分文件存储策略,并因此能够实现接近实时的数据处理能力。
当应用无法实现按分钟粒度切割日志文件时,则可采取以下两种收集策略:一种是基于时间戳的记录方法;另一种则是采用滚动窗口采样的技术。
Channel:消息队列,缓存
Memory Channel:
将内存设为我们缓存的核心资源,并将所有事件数据存储于内存队列中。通过设置最大内存容量来优化系统性能。然而,在这种设计下虽然能够实现快速的数据吞吐能力的同时也存在一定的局限性即无法确保系统运行中的数据完整性与一致性。
JDBC Channel:
Kafaka Channel:
通道也可以用作文件存储以保证数据完整性与一致性。在配置FileChannel时应确保其目录与程序日志文件所在目录位于不同磁盘上这有助于提高运行效率
Spillable Memory Channel:可分割内存的Channel
MemoryRecoverChannel根据官方文档的指导应当采用FileChannel来进行替代
Sink:通过相应的存储文件系统、数据库以及上传至远程服务器完成数据处理。
该系统通过将事件记录至HDFS上实现管理功能。它具备生成新的文本文件、序列化数据以及采用压缩格式存储的特点。此外还具备回滚功能。其主要优势在于可以直接从HDFS中的分区导入至Hive系统
Hive Sink:
Logger Sink:将日志事件在信息的级别去显示(可以显示到控制台)
Avro Sink:写到指定的端口去写数据
Thrift Sink:
flume部署种类:
单一代理流程:

多代理流程:

流合并:(多代理的一种)

多路复用:从同一个地方收集信息后被分散记录到不同的位置上。(其核心特征在于接收的信息内容完全一致)

注意:一个sink只能指定一个channel
下面是模板:
#文件名后缀是.properties
#agent的名称可以自定义 a1 ,sources、sinks、channels的名称也可以自定义
a1.sources = s1
a1.sinks = k1
a1.channels = c1
#为source指定他的channel
a1.sources.s1.channels = c1
#为sink指定他的channel
a1.sinks.k1.channel = c1
流配置:
单一代理流配置:
Spooling Directory Source,Memory Channel,Logger Sink的案例:
#监控指定的目录,如果有新文件产生,那么将文件的内容显示到控制台(Spooling Directory Source,Memory Channel,Logger Sink)
#运行命令:flume-ng agent --conf conf --conf-file case_single.properties --name a1 -Dflume.hadoop.logger=INFO,console
#运行命令解释:flume-ng agent --conf conf --conf-file HDFS上的配置文件的位置 --name agent的名称 打印出来的日志打印到控制台当中
#agent的名称可以自定义 a1 ,sources、sinks、channels的名称也可以自定义
a1.sources = s1
a1.sinks = k1
a1.channels = c1
#配置source
a1.sources.s1.type = spooldir
a1.sources.s1.spoolDir = /home/hadoop/flumetest
#配置channel
a1.channels.c1.type = memory
#配置sink
a1.sinks.k1.type = logger
#为source指定他的channel
a1.sources.s1.channels = c1
#为sink指定他的channel
a1.sinks.k1.channel = c1
将上面的代码复制至case_single.properties文本文件中,并随后将其上传至hadoop04的根目录下;接着,在Hadoop用户目录下创建名为flumetest的文件夹(使用mkdir命令)
在[hadoop@hadoop04 ~]$ 执行命令 flume-ng agent,并设置其运行参数如下:首先指定配置文件名 'case_single.properties' 并加载 '-Dflume.hadoop.logger=INFO,console'。然后指定名称为 'a1'。这些设置将确保日志记录的详细性。

然后在Hadoop-04节点上创建一个新的配置界面;通过vim工具打开并输入'hello flume'字符串后完成存储。按照以下步骤操作:执行相应的命令以完成任务。
[hadoop@hadoop04 ~]$ mv t flumetest/
此时hadoop04的flume窗口会自动出现下图所示样子:

到此测试成功!
NetCat TCP Source,Memory Channel,Logger Sink的案例
#通过NetCat TCP Source读取指定端口的输入数据到控制台显示(NetCat TCP Source,Memory Channel,Logger Sink)
#往端口输入数据 可以通过nc命令,telnet命令也可以去发送数据的
#运行命令:flume-ng agent --conf conf --conf-file case_tcp.properties --name a1 -Dflume.hadoop.logger=INFO,console
#运行命令解释:flume-ng agent --conf conf --conf-file 配置文件的位置 --name agent的名称 打印出来的日志打印到控制台当中
a1.sources = s1
a1.sinks = k1
a1.channels = c1
#配置source
a1.sources.s1.type = netcat
a1.sources.s1.bind = 192.168.123.104
a1.sources.s1.port = 55555
#配置channel
a1.channels.c1.type = memory
#配置sink
a1.sinks.k1.type = logger
#为source指定他的channel
a1.sources.s1.channels = c1
#为sink指定他的channel
a1.sinks.k1.channel = c1
将上述代码复制至case_tcp.properties文本文件中,并将该文件从hadoop04节点转移至hadoop用户的根目录下。运行以下命令
[hadoop@hadoop04 ~]$ flume-ng agent --conf conf --conf-file case_tcp.properties --name a1 -Dflume.hadoop.logger=INFO,console

然后再复制hadoop04节点连接会话,输入telnet hadoop04 55555

[hadoop@hadoop04 ~]$ sudo yum install telnet

安装好telnet后使用下面命令发送数据:
[hadoop@hadoop04 ~]$ telnet hadoop04 55555

然后就可以输入数据了

另一端就能接收到:

Avro Source,Memory Channel,Logger Sink的案例
#通过Avro Source读取指定端口的输入数据到控制台显示(Avro Source,Memory Channel,Logger Sink)
#往端口输入数据 可以通过nc命令,telnet命令也可以去发送数据的
#运行命令:flume-ng agent --conf conf --conf-file case_avro.properties --name a1 -Dflume.hadoop.logger=INFO,console
#运行命令解释:flume-ng agent --conf conf --conf-file 配置文件的位置 --name agent的名称 打印出来的日志打印到控制台当中
a1.sources = s1
a1.sinks = k1
a1.channels = c1
#配置source
a1.sources.s1.type = avro
a1.sources.s1.bind = 192.168.123.104
a1.sources.s1.port = 55555
#配置channel
a1.channels.c1.type = memory
#配置sink
a1.sinks.k1.type = logger
#为source指定他的channel
a1.sources.s1.channels = c1
#为sink指定他的channel
a1.sinks.k1.channel = c1
将上述代码复制至case_avro.properties文本文件中;随后通过hadoop04节点将该文件传输至hadoop用户的根目录下;最后运行以下命令
在[hadoop@hadoop04 ~]目录下使用Flume-ng代理程序运行以下命令:flume-ng agent -Dflume.hadoop.logger=INFO,console -Djavaagent.name=a1 -Djavaagent.conf=file:case_avro.properties

然后再复制hadoop04节点连接会话,输入echo "hello avro" | nc hadoop04 55555

[hadoop@hadoop04 ~]$ sudo yum install nc

输入nc命令查看是否安装成功

再次输入
[hadoop@hadoop04 ~]$ echo "hello avro" | nc hadoop04 55555
先创建一个vim t.log 然后写qwert world
[hadoop@hadoop04 ~]$ flume-ng avro-client -c ./conf -H hadoop04 -p 55555 -F t.log

Exec Source,Memory Channel,hdfs-Sink的案例
准备:
模拟服务器上的业务系统持续不断地运行,在线定期记录日志信息并注入数据内容;最后通过Flume协议将数据传输至HDFS存储。

只要这个窗口在就不断的往里面写数据
注意这个窗口别关!!
复制一个新的窗口输入下面的命令:
[hadoop@hadoop04 tom]$ tail -F catalina.out

查看是不是正常状态,这个窗口可以结束。
正文:
首先启动整个集群,在HDFS下面建一个flumetest的文件夹
hadoop fs -mkdir /flumetest
然后新建一个case_hdfs.properties的文本文件复制下面的代码:
#使用Exec Source通过Linux命令去取数据,然后不断的写入HDFS中(Exec Source,Memory Channel,hdfs-Sink)
#运行命令:flume-ng agent --conf conf --conf-file case_hdfs.properties --name a1 -Dflume.hadoop.logger=INFO,console
#运行命令解释:flume-ng agent --conf conf --conf-file 配置文件的位置 --name agent的名称 打印出来的日志打印到控制台当中
a1.sources = s1
a1.sinks = k1
a1.channels = c1
#配置source
a1.sources.s1.type = exec
#tail -F 和-f的区别是 -F追踪的是文件名字。-f是追踪id。因为.out文件满了就变成了.out.1了,再会去重建一个.out文件
a1.sources.s1.command = tail -F /home/hadoop/tom/catalina.out
#配置channel
a1.channels.c1.type = memory
#配置sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = /flumetest/%Y-%m-%d/%H%M
#设置目录回滚(首先根据时间创建个文件夹,写了一分钟后我们就需要重新建文件夹了,所以我们要指定回滚为true)
a1.sinks.k1.hdfs.round = true
a1.sinks.k1.hdfs.roundValue = 1
a1.sinks.k1.hdfs.roundUnit = minute
a1.sinks.k1.hdfs.useLocalTimeStamp = true
a1.sinks.k1.hdfs.filePrefix = taobao
a1.sinks.k1.hdfs.fileSuffix = log
#设置文件的回滚(下面三个条件只要有一个满足都会进行回滚)
#每过多少秒回滚一次,所以一分钟内我们写6个文件(满足)
a1.sinks.k1.hdfs.rollInterval = 10
#文件多大的时候再回滚
a1.sinks.k1.hdfs.rollSize = 1024
#写了多少条就回滚(满足)
a1.sinks.k1.hdfs.rollCount = 10
#设置压缩格式为不压缩
a1.sinks.k1.hdfs.fileType = DataStream
#为source指定他的channel
a1.sources.s1.channels = c1
#为sink指定他的channel
a1.sinks.k1.channel = c1
然后上传到hadoop04的家目录下,执行命令:
[hadoop@hadoop04 ~]$ flume-ng agent --conf conf --conf-file case_hdfs.properties --name a1 -Dflume.hadoop.logger=INFO,console

flume将在执行写操作前的准备阶段持续不断地将数据存储到其指定的HDFS存储位置(即flumetest文件夹)中。
同时,在每个该存储位置下的目录中都包含六个数据项。



单代理多流配置
单一代理指代一个agent实体;系统中多路通信涉及多个源端口、传输通道及接收端口;特别需要注意的是单一代理的端口配置需确保唯一性
多代理流程

分别在hadoop04和hadoop05上跑代码
在hadoop04下面的代码:
#运行命令:flume-ng agent --conf conf --conf-file case_source.properties --name a1 -Dflume.hadoop.logger=INFO,console
#运行命令解释:flume-ng agent --conf conf --conf-file 配置文件的位置 --name agent的名称 打印出来的日志打印到控制台当中
a1.sources = s1
a1.sinks = k1
a1.channels = c1
a1.sources.s1.type = netcat
a1.sources.s1.bind = 192.168.123.104
a1.sources.s1.port = 44455
a1.channels.c1.type = memory
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = 192.168.123.105
a1.sinks.k1.port = 44466
a1.sources.s1.channels = c1
a1.sinks.k1.channel = c1
新建一个名为case_source.properties的文本文件
复制上面的代码并上传至hadoop04上面
在hadoop05上面执行下面的代码:
#运行命令:flume-ng agent --conf conf --conf-file case_sink.properties --name a1 -Dflume.hadoop.logger=INFO,console
#运行命令解释:flume-ng agent --conf conf --conf-file 配置文件的位置 --name agent的名称 打印出来的日志打印到控制台当中
a1.sources = s1
a1.sinks = k1
a1.channels = c1
a1.sources.s1.type = avro
a1.sources.s1.bind = 192.168.123.105
a1.sources.s1.port =44466
a1.channels.c1.type = memory
a1.sinks.k1.type = logger
a1.sources.s1.channels = c1
a1.sinks.k1.channel = c1
新建一个名为case_sink.properties的文本文件
复制上面的代码并上传至hadoop05上面
注意:先执行hadoop05上面的代码:
[hadoop@hadoop05 ~]$ flume-ng agent --conf conf --conf-file case_sink.properties --name a1 -Dflume.hadoop.logger=INFO,console

在执行hadoop04上面的代码:
[hadoop@hadoop04 ~]$ flume-ng agent --conf conf --conf-file case_source.properties --name a1 -Dflume.hadoop.logger=INFO,console

启动之后请回到hadoop05查看日志是不是增加了下图所示

现在开始写入数据:
复制hadoop04的连接并执行下面的命令:
telnet hadoop04 44455
然后输入hello

然后再hadoop05可以看到下图的消息(消息传输没想象中那么快):

多路复制流
目的:把hadoop04的数据一模一样的发送给hadoop05和hadoop03
把下面的代码上传到hadoop04上去:
新建 case_replicate.properties 文件:
#2个channel和2个sink的配置文件
a1.sources = s1
a1.sinks = k1 k2
a1.channels = c1 c2
# Describe/configure the source
a1.sources.s1.type = netcat
a1.sources.s1.port = 44455
a1.sources.s1.bind = 192.168.123.104
#默认就是replicating就是复制,multiplexing是复用
a1.sources.s1.selector.type = replicating
a1.sources.s1.channels = c1 c2
# Describe the sink
#发送给hadoop05
a1.sinks.k1.type = avro
a1.sinks.k1.channel = c1
a1.sinks.k1.hostname = 192.168.123.105
a1.sinks.k1.port = 44466
#发送给hadoop03
a1.sinks.k2.type = avro
a1.sinks.k2.channel = c2
a1.sinks.k2.hostname = 192.168.123.103
a1.sinks.k2.port = 44466
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.channels.c2.type = memory
a1.channels.c2.capacity = 1000
a1.channels.c2.transactionCapacity = 100
把下面的代码上传到hadoop05上面去:
新建 case_replicate_s1.properties 文件
# Name the components on this agent
a1.sources = s1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.s1.type = avro
a1.sources.s1.channels = c1
a1.sources.s1.bind = 192.168.123.105
a1.sources.s1.port = 44466
# Describe the sink
a1.sinks.k1.type = logger
a1.sinks.k1.channel = c1
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
把下面的代码放到hadoop03上面去:
新建 case_replicate_s2.properties 文件:
# Name the components on this agent
a1.sources = s1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.s1.type = avro
a1.sources.s1.channels = c1
a1.sources.s1.bind = 192.168.123.103
a1.sources.s1.port = 44466
# Describe the sink
a1.sinks.k1.type = logger
a1.sinks.k1.channel = c1
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
然后执行
在hadoop05上执行:
[hadoop@hadoop05 ~]$ flume-ng agent --conf conf --conf-file case_replicate_s1.properties --name a1 -Dflume.hadoop.logger=INFO,console

在hadoop03上执行:
[hadoop@hadoop03 ~]$ flume-ng agent --conf conf --conf-file case_replicate_s2.properties --name a1 -Dflume.hadoop.logger=INFO,console

在hadoop04上执行:
[hadoop@hadoop04 ~]$ flume-ng agent --conf conf --conf-file case_replicate.properties --name a1 -Dflume.hadoop.logger=INFO,console

同时hadoop03增加显示:

同时hadoop05增加显示:

然后在hadoop04上写数据:

然后观察hadoop03和hadoop05都接受到下图所示同一数据:

多路复用:
在hadoop04上新建 case_multi_sink.properties 文件
#2个channel和2个sink的配置文件
a1.sources = s1
a1.sinks = k1 k2
a1.channels = c1 c2
# Describe/configure the source
a1.sources.s1.type = org.apache.source.http.HTTPSource
a1.sources.s1.port = 44455
a1.sources.s1.host = 192.168.123.104
#默认就是replicating就是复制,multiplexing是复用
a1.sources.s1.selector.type = multiplexing
a1.sources.s1.channels = c1 c2
#state就是发送信息的头部信息,是CZ开头往c1走,默认往c1走
a1.sources.s1.selector.header = state
a1.sources.s1.selector.mapping.CZ = c1
a1.sources.s1.selector.mapping.US = c2
a1.sources.s1.selector.default = c1
# Describe the sink
#发送给hadoop05
a1.sinks.k1.type = avro
a1.sinks.k1.channel = c1
a1.sinks.k1.hostname = 192.168.123.105
a1.sinks.k1.port = 44466
#发送给hadoop03
a1.sinks.k2.type = avro
a1.sinks.k2.channel = c2
a1.sinks.k2.hostname = 192.168.123.103
a1.sinks.k2.port = 44466
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.channels.c2.type = memory
a1.channels.c2.capacity = 1000
a1.channels.c2.transactionCapacity = 100
在hadoop05上新建 case_multi_s1.properties 文件:
# Name the components on this agent
a1.sources = s1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.s1.type = avro
a1.sources.s1.channels = c1
a1.sources.s1.bind = 192.168.123.105
a1.sources.s1.port = 44466
# Describe the sink
a1.sinks.k1.type = logger
a1.sinks.k1.channel = c1
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
在hadoop03上新建 case_multi_s2.properties 文件
# Name the components on this agent
a1.sources = s1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.s1.type = avro
a1.sources.s1.channels = c1
a1.sources.s1.bind = 192.168.123.103
a1.sources.s1.port = 44466
# Describe the sink
a1.sinks.k1.type = logger
a1.sinks.k1.channel = c1
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
按顺序执行下面的命令:
[hadoop@hadoop03 ~]$ flume-ng agent --conf conf --conf-file case_multi_s2.properties --name a1 -Dflume.hadoop.logger=INFO,console
[hadoop@hadoop05 ~]$ flume-ng agent --conf conf --conf-file case_multi_s1.properties --name a1 -Dflume.hadoop.logger=INFO,console
在[hadoop@hadoop04 ~]目录下,在终端中执行以下命令:使用flume-ng代理并配置其运行参数。具体来说:执行--conf conf以指定默认配置文件;采用--conf-file case_multi_sink.properties以配置详细的参数;指定名称为a1;并设置日志级别为INFO且输出到控制台。
在hadoop04上发送数据
curl-X POST -d '[{ "headers" :{"state" :"CZ"},"body" : "TEST1"}]' http://localhost:44455
curl-X POST -d '[{ "headers" :{"state" :"US"},"body" : "TEST2"}]' http://localhost:44455
curl-X POST -d '[{ "headers" :{"state" :"cn"},"body" : "TEST2"}]' http://localhost:44455
