【Flume】Flume 简单理解及使用实例
一、Flume简介
flune 是一个由 cloudera 提供的 高可用性和高可靠性相结合 的 分布式 海量日志 收集与传输 系统。 其原始名称为 flune og(original generation)。 然而,在功能扩展后发现其代码体系庞大、核心组件设计不够合理以及配置不够规范等问题逐渐显现。 在 特别是在 最后一个发行版本 0.94.0 中 出现 日志传输 不稳定 的问题。 为了 解决这些问题 在 2011 年 10 月 22 日 cloudera 完成了一个名为 flune-728 的里程碑式 修改 对 flune 进行了彻底的重构。 构建后的版本 统称为 flune ng(next generation)。 改动的原因有两个方面;一是将 fl une 吸入 apache 营业部后;二是 clouder a-fl une 被重命名为 apache-fl une。
二、Flume OG 到 Flume NG
FLUM OG:
- FLUM OG 包含三类功能角色的节点:代理型(agent)、信息聚合型(collector)与核心管理型(master)。
- agent 负责从多个数据源捕获日志信息,并经 collector 整合后存入 HDFS存储系统。随后 master 对 agent 的运行情况进行监控管理,并指挥 collector 进行持续的数据汇总工作。
- agent 与 collector 统称为 node,在配置参数的不同设置下可划分为逻辑型(logical node)或物理型(physical node)两类。
- agent 及 collector 均由接收源与发送端构成。其中 agent 主要负责接收外部输入并通过发送端传递出去的数据。
窃取网上一张图用以说明:

Flume NG去除了对Master和Zookeeper的集中配置管理,并成为一个专注于传输功能的核心组件。该系统通过独立的工作线程进行数据读取与写入操作(统称为Runner)。在Flume OG架构中,读取操作也会执行相应的写入任务(除故障重试外),当写入速度较慢但并未完全失败时(并非完全停止),该机制会暂时阻塞接收端的数据处理能力。这种异步机制使得读取操作能够独立运行而不受后续处理环节的影响。
FLUME NG:
- NG 仅具有单一角色的代理节点(agent)。
- 核心组件未包含 collector 和 master 节点,这是其最本质的变化。
- 去除了物理节点(physical nodes)和逻辑节点(logical nodes)的概念及相关内容。
- 作为结果,在 agent 节点的组成结构上也进行了调整。Flume NG 中的 agent 由源节点(source)、汇节点(sink)以及通道(Channel)构成。
flume ng 节点组成图:

多 Agent 并联下的架构图:

三、Flume 的特性
flume 具备在日志系统中定制不同数据源的能力,并且能够收集和汇总相关信息;同时能够对收集到的数据进行初步整理和处理,并输出至多种目标平台(如文本文件、HDFS存储、HBase数据库等)的功能模块
flume的数据流贯穿始终的是一个不断流动的Event序列。每个Event都是Flume系统处理的核心数据单元,在其生命周期中包含了日志数据(采用字节数组的形式)以及附加的信息字段。这些Event由Flume外部的Agent生成,并在被捕获后 undergoes特定格式化的处理步骤之后被发送到相应的Channel中传输出去(单个或多个)。为了方便管理这些Event流,在接收端可以将Channel视为一个暂存区域,在此期间所有的Event都将被暂时存储直至Sink处理完毕。
Sink 负责持久化日志或者把事件推向另一个 Source。
flume 具备高可靠性:
当节点出现问题时,日志能够被成功地传输至其他节点而不发生丢失。Flume系统包含三种级别的可靠性保障措施,依次分为最高级别、中等级别和最低级别。
- end-to-end策略下,在处理事件时首先会将event记录到磁盘上。
- 在发生故障时存储本地并恢复传输:这也是cribe所采用的策略。
- best-effort机制下,在完成发送后不会进行确认。
四、Flume 架构组成和核心概念
- 生产者节点(Client):负责生成数据并运行在独立线程上的生产者节点。
- 事件(Event):负责生成事件或数据包的对象或资源。这些事件或数据包可以是日志记录、AVRO对象等格式的数据包;如果是基于文本文件,则通常表示为单个记录。
- 传输流程(Flow):指Event从源点到目的地转移过程的抽象表示。
- Agent:指Flume的核心组件。该核心组件将Event进行处理和传输,并由源端(Source)、传输通道(Channel)和汇端(Sink)三个部分构成。
agent 由 source, channel, sink 等构建而成:
4.1 Source:从 Client 收集数据,传递给 Channel
单一的来源模块能够支持多种数据格式;例如通过监视外部源–目录池(spooling directory)的形式实时追踪指定文件夹中新增文件的变化情况;只要一旦目录中出现新的文件就会立即获取其内容;而该组件具备处理不同日志数据格式的能力;例如:avro Sources, thrift Sources, exec, JMS, spooling directory, netcat, sequence generator, syslog, HTTP, legacy以及自定义形式等。
Source类型 说明
Avro Source 支持Avro协议(实际上是Avro RPC),内置支持|
Thrift Source 支持Thrift协议,内置支持
Exec Source 基于Unix的command在标准输出上生产数据
JMS Source 从JMS系统(消息、主题)中读取数据
Spooling Directory Source 监控指定目录内数据变更
Twitter 1% firehose Source 通过API持续下载Twitter数据,试验性质
Netcat Source 监控某个端口,将流经端口的每一个文本行数据作为Event输入
Sequence Generator Source 序列生成器数据源,生产序列数据
Syslog Sources 读取syslog数据,产生Event,支持UDP和TCP两种协议
HTTP Source 基于HTTP POST或GET方式的数据源,支持JSON、BLOB表示形式
Legacy Sources 兼容老的Flume OG中Source(0.9.x版本)
代码解读
详细参考官网:http://flume.apache.org/FlumeUserGuide.html#flume-sources
4.2、Channel:连接 sources 和 sinks
类似于一个队列结构的缓存池系统中,该组件能够接收来自源端的输出数据,直至目标端设备消费完毕为止,该组件中的数据将在以下两种情况下被移除:要么传递到下一个处理环节,要么被目标端设备消费完毕;当目标端设备因连接错误导致写入操作失败时,该组件会通过自动重启机制将系统带回活跃状态,从而避免数据丢失;在组件处于休眠状态时,仍能够持续提供服务;此外,该组件还支持将临时存储的数据分配到不同的存储位置,包括但不限于memory通道、jdbc通道、file通道以及自定义化的专用通道等
Channel类型 说明
Memory Channel Event数据存储在内存中
JDBC Channel Event数据存储在持久化存储中,当前Flume Channel内置支持Derby
File Channel Event数据存储在磁盘文件中
Spillable Memory Channel Event数据存储在内存中和磁盘上,当内存队列满了,会持久化到磁盘文件
Pseudo Transaction Channel 测试用途
Custom Channel 自定义Channel实现
代码解读
详细参考官网:这里
4.3、Sink:从Channel收集数据,运行在一个独立线程
用于将数据发送至目的组件包含 hdfs、logger、avro 和 thrift 等 IPC 标准协议接口组件;同时还包括 ipc 和 file 格式的本地传输组件;此外还有 null 类型字段(如 null 指南针)以及 hbase 和 solr 等自定义的组件。
详细参考官网:http://flume.apache.org/FlumeUserGuide.html#flume-sinks
flume可以支持:
- 多级 flume 系统能够相互衔接(即一个 flume 将数据传递给下一个 flume 继续处理)。
- 该系统具备扇入功能(即 source 能够接收并整合来自多处的数据流)。
- 同时该系统也具备扇出功能(即 sink 能够将处理后的数据扩散至多个目的地)。
五、Flume 使用实例
模拟使用 Flume 监听日志变化,并且把增量的日志文件写入到 hdfs 中。
根据需求,首先定义一下3大要素:
- Source:持续更新日志文件内容;执行命令为
Exec Source(tail -f “file”) - Sink:基于HDFS的文件存储系统;操作为"hdfs sink"
- Channel:数据传输通道;支持类型包括"file channel" 和 "Memory channel"
5.1、编写配置文件
确定好了 Source/Sink/Channel 之后,开始编写配置文件。
gedit exec_tail.conf
代码解读
内容如下:
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
## exec表示flume调用给的命令,然后从给的命令的结果中去拿数据
a1.sources.r1.type = exec
## 使用tail这个命令来读数据
a1.sources.r1.command = tail -F /home/hadoop/testdir/flumedata/test.log
a1.sources.r1.channels = c1
# Describe the sink
## 表示下沉到hdfs,类型决定了下面的参数
a1.sinks.k1.type = hdfs
## sinks.k1只能连接一个channel,source可以配置多个
a1.sinks.k1.channel = c1
## 下面的配置告诉用hdfs去写文件的时候写到什么位置,下面的表示不是写死的,而是可以动态的变化的。表示输出的目录名称是可变的
a1.sinks.k1.hdfs.path = hdfs://master:9000/flume/tailout/%y-%m-%d/%H%M/
##表示最后的文件的前缀
a1.sinks.k1.hdfs.filePrefix = events-
## 表示到了需要触发的时间时,是否要更新文件夹,true:表示要
a1.sinks.k1.hdfs.round = true
## 表示每隔1分钟改变一次
a1.sinks.k1.hdfs.roundValue = 1
## 切换文件的时候的时间单位是分钟
a1.sinks.k1.hdfs.roundUnit = minute
## 表示只要过了3秒钟,就切换生成一个新的文件
a1.sinks.k1.hdfs.rollInterval = 3
## 如果记录的文件大于20字节时切换一次
a1.sinks.k1.hdfs.rollSize = 20
## 当写了5个事件时触发
a1.sinks.k1.hdfs.rollCount = 5
## 收到了多少条消息往dfs中追加内容
a1.sinks.k1.hdfs.batchSize = 10
## 使用本地时间戳
a1.sinks.k1.hdfs.useLocalTimeStamp = true
#生成的文件类型,默认是Sequencefile,可用DataStream:为普通文本
a1.sinks.k1.hdfs.fileType = DataStream
# Use a channel which buffers events in memory
##使用内存的方式
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
代码解读
5.2、启动 flume 监控
cd /opt/flume-1.8.0
bin/flume-ng agent -c conf -f conf/exec_tail.conf -n a1 -Dflume.root.logger=INFO,console
代码解读
命令参数解释:
agent:启动一个 Flume 代理--conf / -c <conf>:配置文件设置于何处--conf-file / -f <file>:配置文件必须位于指定目录--name / -n <name>: 设置代理名称
5.3、模拟日志
hadoop@master:/opt/flume-1.8.0$ cd /home/hadoop/testdir/flumedata/
hadoop@master:~/testdir/flumedata$ while true
> do
> date >> test.log
> sleep 2
> done
代码解读
查看日志变化:
hadoop@master:~/testdir/flumedata$ tail -f test.log
Mon Aug 20 20:09:31 PDT 2018
Mon Aug 20 20:09:33 PDT 2018
Mon Aug 20 20:09:35 PDT 2018
Mon Aug 20 20:09:37 PDT 2018
Mon Aug 20 20:09:39 PDT 2018
Mon Aug 20 20:09:41 PDT 2018
Mon Aug 20 20:09:43 PDT 2018
Mon Aug 20 20:09:45 PDT 2018
代码解读
通过 tail 命令,可以看到test.log在不停的追加数据。
到hdfs中进行查看,效果如下:

也可以到页面查看:

