Flume入门到实践--Flume的安装与基础概念与安装实战
随着信息技术的发展, 高效管理海量日志数据对企业的信息挖掘与运营效率提升具有重要意义。Apache Flume 是专为实现高效日志处理而设计的重要工具之一。本文旨在全面展示 Apache Flume 在日志处理中的强大功能及其操作流程,并通过实际案例深入探讨其实现效果。
Flume简介
Apache Flume 作为一个分布式的、可靠且易于访问的服务系统,在集成并管理来自散源端的数据方面发挥着关键作用。该系统的主要目标是高效地整合并传输自不同来源的大规模日志数据至集中存储位置。其核心组件设计基于消息队列模型,并通过处理管道机制实现对实时数据流的快速响应与传递。
Flume 通常用作"水管"的类比来帮助理解其功能。其中一端连接着数据源(水源),另一端连接着数据存储(水桶)。它专为实时日志数据的采集和传输而设计。
Flume的核心组件
Agent :Flume的核心功能模块,在配置文件中可设置单个或多个Agent实例来实现系统的不同功能需求。
Source :接收数据的数据输入源,支持日志文件、网络接口等多种类型的数据流。
Channel :临时存储中间处理结果的数据缓冲区,在数据传输过程中起到中转作用。
Sink :数据最终归宿的目标存储系统或数据库位置(如HDFS/HBase等)。
Event :构成整个事件流的基本数据传输颗粒单位。
Flume的关键特性
- 稳定的数据传输: Flume通过可靠的机制保证所有数据在传输过程中的完整性与持续可用性。
- 长期保存的能力: 该系统依赖通道技术实现文件的持久化存储,在任何故障情况下都不会导致信息丢失。
- 高度可配置性: Flume能够根据不同的数据来源、传输路径以及接收设备的需求灵活配置工作流程。
- 应对大数据挑战的能力: 该系统设计时就考虑了随着业务规模扩大而自动调整资源分配的能力,并能有效处理海量数据流。
安装和配置
通过网盘分享的文件
该资源的下载路径为apache-flume-1.9.0-bin.tar.gz
安装Flume
Flume的安装相对简单,以Flume 1.9.0版本为例
下载并解压
把安装包拉到/opt/modules目录下
进入/opt/modules目录下
tar -zxvf apache-flume-1.9.0-bin.tar.gz -C /opt/installs/
bash
进入/opt/installs目录下
mv apache-flume-1.9.0-bin/ flume
bash
配置环境变量
进入/etc/profile配置环境变量
export FLUME_HOME=/opt/installs/flume
export PATH=$PATH:$FLUME_HOME/bin
bash
使用命令配置环境变量
echo 'export FLUME_HOME=/opt/installs/flume' >> /etc/profile
echo 'export PATH=$PATH:$FLUME_HOME/bin' >> /etc/profile
bash

刷新环境变量
source /etc/profile
bash

修改配置文件
在/opt/installs/flume/conf路径下复制并修改flume-env.sh文件。
cp flume-env.sh.template flume-env.sh
bash

修改 JAVA_HOME 的路径为自己的 jdk 路径。
export JAVA_HOME=/opt/installs/jdk
Flume的数据模型
Flume支持单一数据流和多数据流模型,可以根据实际需求灵活配置。
单一数据流模型
单一数据流模型包含一个Agent,适用于简单的日志收集场景。

多数据流模型
多个 Agent 可以参与的数据流模型能够集成复杂的系统组件,并协调 Agent 间的高效数据流动与处理




Flume的使用
Flume的使用显著地基于配置文件;通过配置Source、Channel和Sink组件及其关联关系,使数据得以流动。
编写 conf文件
flume 的使用是编写 conf文件的,的时候指定该文件
# 定义组件的名字
<Agent>.sources = <Source>
a1.sources=s1
<Agent>.channels = <Channel1> <Channel2>
a1.channels=c1
<Agent>.sinks = <Sink>
a1.sinks=sink1
# 设置source 和 channel 之间的关系
<Agent>.sources.<Source>.channels = <Channel1> <Channel2> ...
a1.sources.s1.channels=c1
# 设置sink 和 channel 之间的关系
<Agent>.sinks.<Sink>.channel = <Channel1>
a1.sinks.sink1.channel=c1
先定义agent的名字,再定义agent中三大组件的名字
接着定义各个组件之间的关联关系
bash

# list the sources, sinks and channels for the agent
agent_foo.sources = avro-appserver-src-1
agent_foo.channels = mem-channel-1
agent_foo.sinks = hdfs-sink-1
# set channel for source
agent_foo.sources.avro-appserver-src-1.channels = mem-channel-1
# set channel for sink
agent_foo.sinks.hdfs-sink-1.channel = mem-channel-1
bash

常见的组件
参考链接:Flume 1.9用户手册中文版(据称是目前翻译最详尽的修订版)
常见的Source

常见的channel

常见的sink

Kafka系统能够扮演Flume中的各个角色。三个关键组件任意两个结合使用时效果最佳。
案例展示
1.)Avro+Memory+Logger(Avro + 内存 + 日志)
该配置旨在用于演示目的。该程序能够监控指定端口的活动,并从内存中捕获数据;随后将这些数据输出至控制台界面进行显示。

先找source 中的avro看需要设置什么参数

#编写s1的类型是什么
a1.sources.s1.type = avro
a1.sources.s1.bind = 192.168.32.128
a1.sources.s1.port = 4141
a1.sources.s1.channels = c1
bash
找到channel中的memory类型,再设置一下
a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
#source 或者 sink 每个事务中存取 Event 的操作数量
a1.channels.c1.transactionCapacity = 10000
bash
接着查找sink,sink的类型是logger
a1.sinks.s2.channel = c1
a1.sinks.s2.type = logger
bash
最终合并起来的文件就是:
配置
a1.sources = r1
a1.channels = c1
a1.sources.r1.type = avro
a1.sources.r1.bind = localhost
a1.sources.r1.port = 4141
a1.sources.r1.channels = c1
a1.channels.c1.type = memory
a1.sinks = k1
a1.sinks.k1.type = logger
a1.sinks.k1.channel = c1

在flume文件夹中创建一个名为myconf的子目录,并用于存储我们准备好的所有配置信息。
随后,在执行操作时生成avro-memory-log.conf。
最后,在指定位置将配置信息进行复制。
启动命令
flume-ng agent -c ../ -f avro-memory-log.conf -n a1 -Dflume.root.logger=INFO,console
bash
-c 后面跟上 配置文件的路径
-f 跟上自己编写的conf文件
-n agent的名字
-Dflume.root.logger=INFO,console INFO 日志输出级别 Debug,INFO,warn,error 等接着向端口中发送数据:
> flume-ng avro-client -c /opt/installs/flume/conf/ -H bigdata01 -p 4141 -F /home/hivedata/arr1.txt
>
> bash
给avro发消息,使用avro-client
Flume没有终止的时间限制,在其配置好的端口上持续监听连接。当收到一条新消息时会立即处理该连接;如果没有新消息时会等待下一条可能的消息到达,并且不会自行终止。

如果想停止,可以使用ctrl + c 终止flume。
2)Exec + Memory + HDFS(执行命令 + 内存 + HDFS)
此配置用于持续监控日志文件,并将它们存储在HDFS中。
配置
以下版本演示的是没有时间语义的案例:
a1.sources = r1
a1.channels = c1
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /home/hivedata/arr1.txt
a1.sources.r1.channels = c1
a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 10000
a1.channels.c1.byteCapacityBufferPercentage = 20
a1.channels.c1.byteCapacity = 800000
a1.sinks = k1
a1.sinks.k1.type = hdfs
a1.sinks.k1.channel = c1
a1.sinks.k1.hdfs.path = /flume/event/
bash

接着我们演示hdfs文件中含有时间转义字符怎么办?
a1.sources = r1
a1.channels = c1
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /opt/data/c.txt
a1.sources.r1.channels = c1
a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 10000
a1.sinks = k1
a1.sinks.k1.type = hdfs
a1.sinks.k1.channel = c1
a1.sinks.k1.hdfs.path = /flume/events/%y-%m-%d/%H%M/
# round 用于控制含有时间转义符的文件夹的生成规则
a1.sinks.k1.hdfs.round = true
a1.sinks.k1.hdfs.roundValue = 10
a1.sinks.k1.hdfs.roundUnit = minute
# 使用本地时间,否则会报时间戳是null的异常
a1.sinks.k1.hdfs.useLocalTimeStamp=true
bash

将myconf文件夹复制到flume目录中,并用于存储我们编写的各项配置内容
启动命令
flume-ng agent -c ./ -f exec-memory-hdfs.conf -n a1 -Dflume.root.logger=INFO,console
bash
假如hdfs中存在此类情况时,则需要指定相应的具体时间和日期。具体来说,则可以通过Header字段提供一个完整的日期信息字段,并在必要时附加具体的小时数值。

如果HDFS配置中采用了特殊字符表示时间,则需注意以下几点:
配置文件必须选择其中之一:
a. 使用$hdfs.time.useLocalTimeStamp = true
b. 配置时确保time_stamp转化器被启用
时间字符串需要进行转义处理:
%d 类型的时间字符串将无法直接解析为具体的时间值


实现不断的向a.txt中存入数据的效果
echo "Hello World" >> a.txt
bash
3)Spool +File + HDFS(Spooldir + 文件 + HDFS)
此设置非常适合处理包含多个不断更新的日志文件的目录。
配置
a1.channels = ch-1
a1.sources = src-1
a1.sources.src-1.type = spooldir
a1.sources.src-1.channels = ch-1
a1.sources.src-1.spoolDir = /home/scripts/
a1.sources.src-1.fileHeader = true
a1.channels.ch-1.type = file
a1.sinks = k1
a1.sinks.k1.type = hdfs
a1.sinks.k1.channel = ch-1
a1.sinks.k1.hdfs.path = /flume/
bash

在flume目录下新建一个名为myconf的子目录,并将其用于专门保存我们编写的配置数据。
在其中生成并保存名为Spool_File_hdfs.conf的配置文件。
将现有配置内容复制到该位置。
启动命令
flume-ng agent -c ./ -f Spool_File_hdfs.conf -n a1 -Dflume.root.logger=INFO,console
bash


数据采集的本质:不论是一个单独的文件还是一个文件夹的形式存在,在实际应用场景中都会持续不断地产生这些数据。特别是在生产环境中,则会更加频繁地产生大量数据。
以上的方式仅限于收集文件夹中是否存在新增文件的状态,并不支持捕获变动的情况
首先获取该目录下的所有文件,并将这些文件进行记录;其次,在子目录中不进行操作;最后,在已经处理过但数据变化的情况下也不再进行操作
假如你的channel是 file类型,必定会有临时文件产生,产生的文件在


总结:
通常情况下,在处理数据传输时会使用到两种主要的通道类型:一个是memory通道和file通道;其中效率最高的为memory通道,并且也是应用中最常见的选择。

4)TailDir+Memory+HDFS(TailDir+日志+HDFS)
这个案例展示了如何跟踪文件内容的变化情况,并实现了对变化内容的实时上传到HDFS。
配置
a1.sources = r1
a1.channels = c1
a1.sources.r1.type = TAILDIR
a1.sources.r1.channels = c1
a1.sources.r1.filegroups = f1
# . 代表的意思是一个任意字符 * 代表前面的字符出现0到多次
a1.sources.r1.filegroups.f1 = /home/scripts/datas/.*txt.*
a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 10000
a1.sinks = k1
a1.sinks.k1.type = hdfs
a1.sinks.k1.channel = c1
a1.sinks.k1.hdfs.path = /flume3/logs
bash

在flume目录下设置myconf子目录用于存放所需的配置文件
切换到该目录后建立taildir_memory_hdfs.conf存储配置信息
复制配置文件的内容至该位置完成配置操作
启动命令
flume-ng agent -c ./ -f taildir_memory_hdfs.conf -n a1 -Dflume.root.logger=INFO,console
bash
数据不断的发生变化,每发生一次变化,就传递一次,源源不断的抽取出来。
为什么在之前的操作中收集了这么多文件仍然未完成?
根据以下参数设置的数据特性可以实现高效的文件管理:
hdfs.rollInterval 30 当前文件的写入时间达到该值后会自动创建新的文件(单位:秒)
hdfs.rollSize 1024 当前文件的大小达到该值后会触发滚动创建新文件(单位:字节)
hdfs.rollCount 10 当前事件的数量达到该值后会开始滚动创建新文件在再次执行操作时发现不再继续因为存在重复记录导致的数据冲突问题:
需要清空/root/.flume/taildir_position.json目录中的所有内容修改flume配置以允许处理修改过的日志:
修改1.txt中的内容之后flume系统会重新收集数据操作示例:
将'hello'添加到1.txt中
视频链接
《flume》第一集:解密这款有趣的工具
该视频在哔哩哔哩平台上的字段结构为:字段名称对应视频ID BV1iVpuevEGu,并包含相关的信息字段
视频链接:B站链接
内容:B站视频《Flume协议中关于conf命令的具体规则解析》
详细描述:哔哩哔哩_B站视频《Flume协议中关于conf命令的具体规则解析》
完整信息:B站详细讲解Flume协议中conf命令语法与应用技巧
视频编号为 05-avro+mem+log 的展示
在创建文件之后有何不同
07-exec+mem+hdfs的演示_哔哩哔哩_bilibili
08-spool+file+hdfs_哔哩哔哩_bilibili
结论
Apache Flume是一个功能全面的开源工具;它实现了对海量日志数据的高效收集、整合与传输过程;其灵活多样的配置能力和稳健的数据处理机制使其成为应对大数据挑战的理想解决方案;无论是实时监控单个日志文件还是整合来自多个来源的数据流;Flume都能提供一个可扩展且高度可靠的平台来处理这些信息;本文旨在介绍Flmute的核心概念及其基本安装方法;并通过几个实际应用案例展示了其功能与应用场景;通过这些介绍与分析,默认情况下读者能够更好地掌握并灵活运用这一工具
