Advertisement

Flume入门到实践--Flume的安装与基础概念与安装实战

阅读量:

随着信息技术的发展, 高效管理海量日志数据对企业的信息挖掘与运营效率提升具有重要意义。Apache Flume 是专为实现高效日志处理而设计的重要工具之一。本文旨在全面展示 Apache Flume 在日志处理中的强大功能及其操作流程,并通过实际案例深入探讨其实现效果。

Flume简介

Apache Flume 作为一个分布式的、可靠且易于访问的服务系统,在集成并管理来自散源端的数据方面发挥着关键作用。该系统的主要目标是高效地整合并传输自不同来源的大规模日志数据至集中存储位置。其核心组件设计基于消息队列模型,并通过处理管道机制实现对实时数据流的快速响应与传递。

Flume 通常用作"水管"的类比来帮助理解其功能。其中一端连接着数据源(水源),另一端连接着数据存储(水桶)。它专为实时日志数据的采集和传输而设计。

Flume的核心组件

Agent :Flume的核心功能模块,在配置文件中可设置单个或多个Agent实例来实现系统的不同功能需求。
Source :接收数据的数据输入源,支持日志文件、网络接口等多种类型的数据流。
Channel :临时存储中间处理结果的数据缓冲区,在数据传输过程中起到中转作用。
Sink :数据最终归宿的目标存储系统或数据库位置(如HDFS/HBase等)。
Event :构成整个事件流的基本数据传输颗粒单位。

Flume的关键特性

  1. 稳定的数据传输: Flume通过可靠的机制保证所有数据在传输过程中的完整性与持续可用性。
  2. 长期保存的能力: 该系统依赖通道技术实现文件的持久化存储,在任何故障情况下都不会导致信息丢失。
  3. 高度可配置性: Flume能够根据不同的数据来源、传输路径以及接收设备的需求灵活配置工作流程。
  4. 应对大数据挑战的能力: 该系统设计时就考虑了随着业务规模扩大而自动调整资源分配的能力,并能有效处理海量数据流。

安装和配置

通过网盘分享的文件

该资源的下载路径为apache-flume-1.9.0-bin.tar.gz

安装Flume

Flume的安装相对简单,以Flume 1.9.0版本为例

下载并解压

把安装包拉到/opt/modules目录下

进入/opt/modules目录下

复制代码
    tar -zxvf apache-flume-1.9.0-bin.tar.gz -C /opt/installs/
    
    bash

进入/opt/installs目录下

复制代码
    mv apache-flume-1.9.0-bin/ flume
    
    bash

配置环境变量

进入/etc/profile配置环境变量

复制代码
 export FLUME_HOME=/opt/installs/flume

    
 export PATH=$PATH:$FLUME_HOME/bin
    
    
    
    
    bash

使用命令配置环境变量

复制代码
 echo 'export FLUME_HOME=/opt/installs/flume' >> /etc/profile

    
 echo 'export PATH=$PATH:$FLUME_HOME/bin' >> /etc/profile
    
    
    
    
    bash

刷新环境变量

复制代码
    source /etc/profile
    
    bash

修改配置文件

在/opt/installs/flume/conf路径下复制并修改flume-env.sh文件。

复制代码
    cp flume-env.sh.template flume-env.sh
    
    bash

修改 JAVA_HOME 的路径为自己的 jdk 路径。

复制代码
    export JAVA_HOME=/opt/installs/jdk

Flume的数据模型

Flume支持单一数据流和多数据流模型,可以根据实际需求灵活配置。

单一数据流模型

单一数据流模型包含一个Agent,适用于简单的日志收集场景。

多数据流模型

多个 Agent 可以参与的数据流模型能够集成复杂的系统组件,并协调 Agent 间的高效数据流动与处理

Flume的使用

Flume的使用显著地基于配置文件;通过配置Source、Channel和Sink组件及其关联关系,使数据得以流动。

编写 conf文件

flume 的使用是编写 conf文件的,的时候指定该文件

复制代码
 # 定义组件的名字

    
 <Agent>.sources = <Source>
    
 a1.sources=s1
    
 <Agent>.channels = <Channel1> <Channel2>
    
 a1.channels=c1
    
 <Agent>.sinks = <Sink>
    
 a1.sinks=sink1
    
  
    
 # 设置source 和 channel 之间的关系
    
 <Agent>.sources.<Source>.channels = <Channel1> <Channel2> ...
    
 a1.sources.s1.channels=c1
    
  
    
 # 设置sink 和 channel 之间的关系
    
 <Agent>.sinks.<Sink>.channel = <Channel1>
    
 a1.sinks.sink1.channel=c1
    
  
    
 先定义agent的名字,再定义agent中三大组件的名字
    
 接着定义各个组件之间的关联关系
    
    
    
    
    bash
![](https://ad.itadn.com/c/weblog/blog-img/images/2025-08-16/n5J8XG7o39eul4CKHcQSRzjLyvId.png)
复制代码
 # list the sources, sinks and channels for the agent

    
 agent_foo.sources = avro-appserver-src-1
    
 agent_foo.channels = mem-channel-1
    
 agent_foo.sinks = hdfs-sink-1
    
  
    
 # set channel for source
    
 agent_foo.sources.avro-appserver-src-1.channels = mem-channel-1
    
  
    
 # set channel for sink
    
 agent_foo.sinks.hdfs-sink-1.channel = mem-channel-1
    
    
    
    
    bash
![](https://ad.itadn.com/c/weblog/blog-img/images/2025-08-16/xLTsVJ5YtfaA0KwonSCiNgl9By1U.png)

常见的组件

参考链接:Flume 1.9用户手册中文版(据称是目前翻译最详尽的修订版)

常见的Source

常见的channel

常见的sink

Kafka系统能够扮演Flume中的各个角色。三个关键组件任意两个结合使用时效果最佳。

案例展示

1.)Avro+Memory+Logger(Avro + 内存 + 日志)

该配置旨在用于演示目的。该程序能够监控指定端口的活动,并从内存中捕获数据;随后将这些数据输出至控制台界面进行显示。

先找source 中的avro看需要设置什么参数

复制代码
 #编写s1的类型是什么

    
 a1.sources.s1.type = avro
    
 a1.sources.s1.bind = 192.168.32.128
    
 a1.sources.s1.port = 4141
    
 a1.sources.s1.channels = c1
    
    
    
    
    bash

找到channel中的memory类型,再设置一下

复制代码
 a1.channels.c1.type = memory

    
 a1.channels.c1.capacity = 10000
    
 #source 或者 sink 每个事务中存取 Event 的操作数量
    
 a1.channels.c1.transactionCapacity = 10000
    
    
    
    
    bash

接着查找sink,sink的类型是logger

复制代码
 a1.sinks.s2.channel = c1

    
 a1.sinks.s2.type = logger
    
    
    
    
    bash

最终合并起来的文件就是:

配置

复制代码
 a1.sources = r1

    
 a1.channels = c1
    
 a1.sources.r1.type = avro
    
 a1.sources.r1.bind = localhost
    
 a1.sources.r1.port = 4141
    
 a1.sources.r1.channels = c1
    
 a1.channels.c1.type = memory
    
 a1.sinks = k1
    
 a1.sinks.k1.type = logger
    
 a1.sinks.k1.channel = c1
![](https://ad.itadn.com/c/weblog/blog-img/images/2025-08-16/QovOuaxrHY2WfbT10ZjVyqDm7583.png)

在flume文件夹中创建一个名为myconf的子目录,并用于存储我们准备好的所有配置信息。
随后,在执行操作时生成avro-memory-log.conf。
最后,在指定位置将配置信息进行复制。

启动命令

复制代码
    flume-ng agent -c ../ -f avro-memory-log.conf -n a1 -Dflume.root.logger=INFO,console
    
    bash

-c 后面跟上 配置文件的路径
-f 跟上自己编写的conf文件
-n agent的名字
-Dflume.root.logger=INFO,console INFO 日志输出级别 Debug,INFO,warn,error 等

接着向端口中发送数据:

复制代码
>     flume-ng avro-client -c /opt/installs/flume/conf/ -H bigdata01 -p 4141 -F /home/hivedata/arr1.txt
>  
>     bash

给avro发消息,使用avro-client

Flume没有终止的时间限制,在其配置好的端口上持续监听连接。当收到一条新消息时会立即处理该连接;如果没有新消息时会等待下一条可能的消息到达,并且不会自行终止。

如果想停止,可以使用ctrl + c 终止flume。

2)Exec + Memory + HDFS(执行命令 + 内存 + HDFS)

此配置用于持续监控日志文件,并将它们存储在HDFS中。

配置

以下版本演示的是没有时间语义的案例:

复制代码
 a1.sources = r1

    
 a1.channels = c1
    
 a1.sources.r1.type = exec
    
 a1.sources.r1.command = tail -F /home/hivedata/arr1.txt
    
 a1.sources.r1.channels = c1
    
  
    
 a1.channels.c1.type = memory
    
 a1.channels.c1.capacity = 10000
    
 a1.channels.c1.transactionCapacity = 10000
    
 a1.channels.c1.byteCapacityBufferPercentage = 20
    
 a1.channels.c1.byteCapacity = 800000
    
  
    
  
    
 a1.sinks = k1
    
 a1.sinks.k1.type = hdfs
    
 a1.sinks.k1.channel = c1
    
 a1.sinks.k1.hdfs.path = /flume/event/
    
    
    
    
    bash
![](https://ad.itadn.com/c/weblog/blog-img/images/2025-08-16/Y6KQlgWUXB1T2n0GLEOdyvi8t5kR.png)

接着我们演示hdfs文件中含有时间转义字符怎么办?

复制代码
 a1.sources = r1

    
 a1.channels = c1
    
 a1.sources.r1.type = exec
    
 a1.sources.r1.command = tail -F /opt/data/c.txt
    
  
    
 a1.sources.r1.channels = c1
    
  
    
 a1.channels.c1.type = memory
    
 a1.channels.c1.capacity = 10000
    
 a1.channels.c1.transactionCapacity = 10000
    
  
    
 a1.sinks = k1
    
 a1.sinks.k1.type = hdfs
    
 a1.sinks.k1.channel = c1
    
 a1.sinks.k1.hdfs.path = /flume/events/%y-%m-%d/%H%M/
    
 # round 用于控制含有时间转义符的文件夹的生成规则
    
 a1.sinks.k1.hdfs.round = true
    
 a1.sinks.k1.hdfs.roundValue = 10
    
 a1.sinks.k1.hdfs.roundUnit = minute
    
 # 使用本地时间,否则会报时间戳是null的异常
    
 a1.sinks.k1.hdfs.useLocalTimeStamp=true
    
    
    
    
    bash
![](https://ad.itadn.com/c/weblog/blog-img/images/2025-08-16/a3omwUsVZBv2uNHxhp87jSO6WqtE.png)

将myconf文件夹复制到flume目录中,并用于存储我们编写的各项配置内容

启动命令

复制代码
    flume-ng agent -c ./ -f exec-memory-hdfs.conf -n a1 -Dflume.root.logger=INFO,console
    
    bash

假如hdfs中存在此类情况时,则需要指定相应的具体时间和日期。具体来说,则可以通过Header字段提供一个完整的日期信息字段,并在必要时附加具体的小时数值。

如果HDFS配置中采用了特殊字符表示时间,则需注意以下几点:

  1. 配置文件必须选择其中之一:

    a. 使用$hdfs.time.useLocalTimeStamp = true

    b. 配置时确保time_stamp转化器被启用

  2. 时间字符串需要进行转义处理:

    %d 类型的时间字符串将无法直接解析为具体的时间值

实现不断的向a.txt中存入数据的效果

复制代码
    echo "Hello World" >> a.txt
    
    bash

3)Spool +File + HDFS(Spooldir + 文件 + HDFS)

此设置非常适合处理包含多个不断更新的日志文件的目录。

配置

复制代码
 a1.channels = ch-1

    
 a1.sources = src-1
    
  
    
 a1.sources.src-1.type = spooldir
    
 a1.sources.src-1.channels = ch-1
    
 a1.sources.src-1.spoolDir = /home/scripts/
    
 a1.sources.src-1.fileHeader = true
    
  
    
 a1.channels.ch-1.type = file
    
  
    
 a1.sinks = k1
    
 a1.sinks.k1.type = hdfs
    
 a1.sinks.k1.channel = ch-1
    
 a1.sinks.k1.hdfs.path = /flume/
    
    
    
    
    bash
![](https://ad.itadn.com/c/weblog/blog-img/images/2025-08-16/ETmIdZ4k8ehlNDJjzwpYg3vtcCrB.png)

在flume目录下新建一个名为myconf的子目录,并将其用于专门保存我们编写的配置数据。
在其中生成并保存名为Spool_File_hdfs.conf的配置文件。
将现有配置内容复制到该位置。

启动命令

复制代码
    flume-ng agent -c ./ -f Spool_File_hdfs.conf -n a1 -Dflume.root.logger=INFO,console
    
    bash

数据采集的本质:不论是一个单独的文件还是一个文件夹的形式存在,在实际应用场景中都会持续不断地产生这些数据。特别是在生产环境中,则会更加频繁地产生大量数据。

以上的方式仅限于收集文件夹中是否存在新增文件的状态,并不支持捕获变动的情况

首先获取该目录下的所有文件,并将这些文件进行记录;其次,在子目录中不进行操作;最后,在已经处理过但数据变化的情况下也不再进行操作

假如你的channel是 file类型,必定会有临时文件产生,产生的文件在

总结:

通常情况下,在处理数据传输时会使用到两种主要的通道类型:一个是memory通道和file通道;其中效率最高的为memory通道,并且也是应用中最常见的选择。

4)TailDir+Memory+HDFS(TailDir+日志+HDFS)

这个案例展示了如何跟踪文件内容的变化情况,并实现了对变化内容的实时上传到HDFS。

配置

复制代码
 a1.sources = r1

    
 a1.channels = c1
    
 a1.sources.r1.type = TAILDIR
    
 a1.sources.r1.channels = c1
    
 a1.sources.r1.filegroups = f1
    
 # . 代表的意思是一个任意字符   * 代表前面的字符出现0到多次
    
 a1.sources.r1.filegroups.f1 = /home/scripts/datas/.*txt.*
    
  
    
  
    
 a1.channels.c1.type = memory
    
 a1.channels.c1.capacity = 10000
    
 a1.channels.c1.transactionCapacity = 10000
    
  
    
 a1.sinks = k1
    
 a1.sinks.k1.type = hdfs
    
 a1.sinks.k1.channel = c1
    
 a1.sinks.k1.hdfs.path = /flume3/logs
    
    
    
    
    bash
![](https://ad.itadn.com/c/weblog/blog-img/images/2025-08-16/sBThVbEAvGrfUteNXmYOLn6PaM3w.png)

在flume目录下设置myconf子目录用于存放所需的配置文件
切换到该目录后建立taildir_memory_hdfs.conf存储配置信息
复制配置文件的内容至该位置完成配置操作

启动命令

复制代码
    flume-ng agent -c ./ -f taildir_memory_hdfs.conf -n a1 -Dflume.root.logger=INFO,console
    
    bash

数据不断的发生变化,每发生一次变化,就传递一次,源源不断的抽取出来。

为什么在之前的操作中收集了这么多文件仍然未完成?

根据以下参数设置的数据特性可以实现高效的文件管理:

hdfs.rollInterval 30 当前文件的写入时间达到该值后会自动创建新的文件(单位:秒)
hdfs.rollSize 1024 当前文件的大小达到该值后会触发滚动创建新文件(单位:字节)
hdfs.rollCount 10 当前事件的数量达到该值后会开始滚动创建新文件

在再次执行操作时发现不再继续因为存在重复记录导致的数据冲突问题:
需要清空/root/.flume/taildir_position.json目录中的所有内容

修改flume配置以允许处理修改过的日志:
修改1.txt中的内容之后flume系统会重新收集数据

操作示例:
将'hello'添加到1.txt中

视频链接

《flume》第一集:解密这款有趣的工具

02-flume的配置流程_哔哩哔哩_bilibili

该视频在哔哩哔哩平台上的字段结构为:字段名称对应视频ID BV1iVpuevEGu,并包含相关的信息字段

视频链接:B站链接

内容:B站视频《Flume协议中关于conf命令的具体规则解析》

详细描述:哔哩哔哩_B站视频《Flume协议中关于conf命令的具体规则解析》

完整信息:B站详细讲解Flume协议中conf命令语法与应用技巧

视频编号为 05-avro+mem+log 的展示

在创建文件之后有何不同

07-exec+mem+hdfs的演示_哔哩哔哩_bilibili

08-spool+file+hdfs_哔哩哔哩_bilibili

第九期《taildir+mem+hdfs》-哔哩哔哩-B站

结论

Apache Flume是一个功能全面的开源工具;它实现了对海量日志数据的高效收集、整合与传输过程;其灵活多样的配置能力和稳健的数据处理机制使其成为应对大数据挑战的理想解决方案;无论是实时监控单个日志文件还是整合来自多个来源的数据流;Flume都能提供一个可扩展且高度可靠的平台来处理这些信息;本文旨在介绍Flmute的核心概念及其基本安装方法;并通过几个实际应用案例展示了其功能与应用场景;通过这些介绍与分析,默认情况下读者能够更好地掌握并灵活运用这一工具

全部评论 (0)

还没有任何评论哟~