Flume初始篇之flume安装及简单测试
Flume安装及简单测试
一、Flume简介
flume 作为 cloudera 开发的一套实时日志收集系统,在行业内获得了广泛认可并得到了广泛应用。最初发布的版本被统一称为 flume og(original generation),由 cloudera 开发维护。然而随着 flume 功能的扩展,在 flume og 的最后一个发布版本 0.94.0 中日志传输稳定性问题逐渐显现出来。为了解决这些问题, cloudera 在 2011 年 10 月 22 日发布了 flume-728 改动:重构核心组件、核心配置以及代码架构,新发布版本则被统称为 flume ng(next generation)。此次改动不仅是为了优化架构,还有将 flume 吸入 apache 营业部并将 cloudera 的 flume 标称为 apache flume 的原因
1、Flume的特点
flume是一种分布式且稳定可靠的海量日志采集与分析平台。它通过提供灵活配置接口,在日志系统中实现对原始事件的数据抓取;同时具备高效的预处理功能,并通过多种输出接口将处理后的结果导出到文本文件或其他存储介质如HDFS和Hbase等位置的能力

数据流在Flume系统中贯穿始终的是事件(Event),它们作为核心数据单元承载着日志信息并包含元数据字段。这些日志数据采用字节数组形式存储,并伴随有元数据字段。这些Event由位于系统外围的Agent源(Source)生成,在捕获到特定事件后会被相应的源处理模块识别并提取关键信息。随后该源会将整理好的信息按照预设格式转换后发送至单个或多个目标通道(Channel)。将Channel视为一个临时存储区域,在此期间系统会等待接收方完成该事件处理流程。
在这种情况下,在接收端配置适当的收集器(Collector)即可实现有效的日志持久化功能或者将处理结果传递给其他来源继续处理。
2、Flume的可靠性
当节点发生故障时,在Flume系统中设计的机制能够确保日志得以传递到其他节点而不发生丢失问题。该系统提供三种层次的可靠性保障方案:从高到低依次为端到端机制(end-to-end)、存储于失败机制(Store on failure)以及最佳努力机制(Besteffort)。其中:
- 端到端机制通过将事件记录在磁盘上并完成传输后自动删除记录文件;若传输过程出现故障,则允许重传以解决不可预见的问题。
- 存储于失败机制则采用类似Scribe协议中的策略,在接收方发生故障的情况下立即保存数据文件,并待其恢复后再继续向其他节点传送。
- 最佳努力机制则假设一旦消息被发送出去就不会再次检查其是否已送达目标节点
3、Flume的可恢复性
仍然依赖传统的Channel机制。建议采用FileChannel作为存储方案,并利用其事件持久化特性(该机制的性能表现相对较差)。将事件存储在本地文件系统里。
4、Flume基本概念
| Agent | 一个Agent包含 source ,channel,sink 和其他组件。 |
|---|---|
| Source | Source 负责接收event或通过特殊机制产生event,并将events批量的放到一个或多个Channel |
| Channel | Channel位于Source和Sink之间,用于缓存进来的event |
| Sink | Sink负责将event传输到下一个source或最终目的地,成功后将event从channel移除 |
| Client | Client 是一个将原始log包装成events并且发送他们到一个或多个agent的实体,目的是从数据源系统中解耦Flume,在flume的拓扑结构中不是必须的。 |
| Events | Event是Flume数据传输的基本单元。可以是日志记录、 avro 对象等。 |
5、Flume三大组件
Flume主要由3个重要的组件购成:
源端接收并处理来自外部系统的日志数据流。将这些数据流根据其属性分类为transition和event类型后输入至channel中
Channel:主要功能是支持队列操作,并对源数据进行基本缓存。
Sink:获取Channel中的数据,并用于相应的存储介质、数据库或上传至远程服务器。
(1) Source

(2) Sink

(3)Channel

二、Flume的安装及测试
基于Linux操作系统,在虚拟化环境下采用的伪分布式集群架构下,并行部署了Hadoop 2.6.4版本的分布式计算框架以及Flume软件的版本号为1.6.0的流处理组件
1、网上下载flume安装包,并将安装包上传至安装目录(例:hadoop)
2、解压文件至指定目录(例:hadoo文件夹下)
tar -zxvf apache-flume-1.6.0-bin.tar.gz
注:为方便操作,解压后将文件改名为 flume-1.6.0
mv apache-flume-1.6.0-bin flume-1.6.0
3、配置环境变量
vi /etc/profile
export JAVA_HOME=/hadoop/jdk1.7.0_75
export HADOOP_HOME=/hadoop/hadoop-2.6.4
export FLUME_HOME=/hadoop/flume-1.6.0
export SQOOP_HOME=/hadoop/sqoop-1.4.4
export HIVE_HOME=/hadoop/hive-1.2.1
export PATH=$PATH:$JAVA_HOME/bin:$HADOOP_HOME/bin:$FLUME_HOME/bin:$HIVE_HOME/bin:$SQOOP_HOME/bin
保存退出后,刷新profile
source /etc/profile
4、验证
#查看版本
[root@hadoop bin]# ./flume-ng version
flume-ng flume-ng.cmd flume-ng.ps1<strong>
</strong>
出现以下信息,表示安装成功
Flume 1.6.0
Source code repository: https://git-wip-us.apache.org/repos/asf/flume.git
Revision: 2561a23240a71ba20bf288c7c2cda88f443c2080
Compiled by hshreedharan on Mon May 11 11:15:44 PDT 2015
From source with checksum b29e416802ce9ece3269d34233baf43f
三、测试:实时采集案例
1、创建测试文件夹,用以测试数据(例:/hadoop/testdata/flume)
2、创建配置文件(例:f2.conf)
vi f2.conf
#定义agent名, source、channel、sink的名称
f2.sources = r1
f2.channels = c1
f2.sinks = k1
#具体定义source
f2.sources.r1.type = exec
f2.sources.r1.command = tail -f /hadoop/testdata/flume/server.log #指定采集目录
#具体定义channel
f2.channels.c1.type = memory
f2.channels.c1.capacity = 1000
f2.channels.c1.transactionCapacity = 100
#具体定义sink
f2.sinks.k1.type = logger
#组装source、channel、sink
f2.sources.r1.channels = c1
f2.sinks.k1.channel = c1
执行agent:
flume-ng agent -n f2 -c conf -f /wishedu/testdata/flume/f2.conf -Dflume.root.logger=INFO,console

执行成功等待数据ing...
3、创建文本,采集数据(复制一个窗口)
vi server.log
添加信息

保存退出,查看数据,采集成功

