Advertisement

Flume安装及基本使用

阅读量:

前置准备

Flume 需要依赖 JDK 1.8+,并且下面有案例需要存储数据到HDFS,所以需要Hadoop集群,教程如下:

Linux下jdk的安装

Hadoop单机伪分布式-视频教程

Hadoop完全分布式集群环境搭建-视频教程

HA(高可用)-Hadoop集群环境搭建视频+图文教程

一、概述

Apache Flume 是 Cloudera 公司开发,是一个分布式的、高可靠的、高可用的用于海量日志收集、聚合和传输的系统。它可以从不同的数据源收集数据,经过聚合后发送到存储系统中,通常用于日志数据的收集。Flume 分为 NG 和 OG (1.0 之前) 两个版本,NG 在 OG 的基础上进行了完全的重构,是目前使用最为广泛的版本。下面的介绍均以 NG 为基础。

下图为 Flume 的基本架构图:

二、安装

2.1 下载并解压

下载所需版本的 Flume ,这里我下载的是最新版本的 apache-flume-1.9.0 。下载地址为:http://flume.apache.org/download.html

复制代码
    # 下载后进行解压
    [xiaokang@hadoop ~]$ tar -zxvf apache-flume-1.9.0-bin.tar.gz -C /opt/software/

2.2 配置环境变量

复制代码
    [xiaokang@hadoop ~]$ sudo vim /etc/profile

添加环境变量:

复制代码
    export FLUME_HOME=/opt/software/flume-1.9.0
    export PATH=${JAVA_HOME}/bin:${HADOOP_HOME}/bin:${HADOOP_HOME}/sbin:${ZOOKEEPER_HOME}/bin:${HIVE_HOME}/bin:${ZEPPELIN_HOME}/bin:${HBASE_HOME}/bin:${SQOOP_HOME}/bin:${FLUME_HOME}/bin:$PATH

使得配置的环境变量立即生效:

复制代码
    [xiaokang@hadoop ~]$ source /etc/profile

2.3 修改配置

进入安装目录下的 conf 目录,拷贝 Flume 的环境配置模板 flume-env.sh.template

复制代码
    [xiaokang@hadoop conf]$ cp flume-env.sh.template flume-env.sh

修改 flume-env.sh,指定 JDK 的安装路径:

复制代码
    # Enviroment variables can be set here.
    export JAVA_HOME=/opt/moudle/jdk1.8.0_191

2.4 验证

由于已经将 Flume的 bin 目录配置到环境变量,直接使用以下命令验证是否配置成功:

复制代码
    [xiaokang@hadoop ~]$ flume-ng version

出现对应的版本信息则代表配置成功:

三、基本使用

3.1 Flume配置格式

Flume 配置通常需要以下两个步骤:

  1. 分别定义好 Agent 的 Sources,Sinks,Channels,然后将 Sources 和 Sinks 与通道进行绑定。需要注意的是一个 Source 可以配置多个 Channel,但一个 Sink 只能配置一个 Channel。基本格式如下:
复制代码
    <Agent>.sources = <Source>
    <Agent>.sinks = <Sink>
    <Agent>.channels = <Channel1> <Channel2>
    
    # set channel for source
    <Agent>.sources.<Source>.channels = <Channel1> <Channel2> ...
    
    # set channel for sink
    <Agent>.sinks.<Sink>.channel = <Channel1>
  1. 分别定义 Source,Sink,Channel 的具体属性。基本格式如下:
复制代码
    <Agent>.sources.<Source>.<someProperty> = <someValue>
    
    # properties for channels
    <Agent>.channel.<Channel>.<someProperty> = <someValue>
    
    # properties for sinks
    <Agent>.sources.<Sink>.<someProperty> = <someValue>

3.2 官方案例

监控端口数据

NetCat Source :监听一个指定端口,并接收监听到的数据(接收的数据是字符串形式)。

NetCat Source 配置项说明

复制代码
    channels	|	绑定的通道
    type	|	netcat
    bind	|	指定要监听的主机
    port	|	指定要监听的端口号
    【selector.*	|	选择器配置】
    【interceptors.*	|	拦截器列表配置】
1. 配置

新建配置文件telnet-logger.properties,其内容如下:

复制代码
    #指定agent的sources、channels、sinks
    a1.sources = s1
    a1.channels = c1
    a1.sinks = k1
    
    #配置source属性
    a1.sources.s1.type = netcat
    a1.sources.s1.bind = localhost
    a1.sources.s1.port = 44444
    
    #配置channel类型
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 10000
    a1.channels.c1.transactionCapacity = 100
    
    #配置sink
    a1.sinks.k1.type = logger
    
    #将sources和channels绑定
    a1.sources.s1.channels = c1
    
    #将sinks和channels绑定
    a1.sinks.k1.channel = c1
2. 启动
复制代码
    [xiaokang@hadoop flume]$ flume-ng agent -n a1 -c /opt/software/flume-1.9.0/conf/ -f ./telnet-logger.properties -Dflume.root.logger=INFO,console
3. 测试

使用 telnet 工具向本机的 44444 端口发送内容

复制代码
    [xiaokang@hadoop flume]$ telnet localhost 44444
    bash: telnet: 未找到命令
    #解决方案:使用yum命令安装即可
    [xiaokang@hadoop flume]$ sudo yum -y install telnet


3.3 案例一

使用 Flume 监听文件内容变动,将新添的内容输出到控制台

Exec Source :可以将命令产生的输出作为源来进行传递

Exec Source 配置项说明

复制代码
    channels	|	绑定的通道
    type	|	exec
    command	|	要执行的命令
    【selector.*	|	选择器配置】
    【interceptors.*	|	拦截器列表配置】

/home/xiaokang/log.txt这个文件要提前创建好,一会儿要往里面追加内容

1. 配置

新建配置文件execsource.properties,其内容如下:

复制代码
    #指定agent的sources、channels、sinks
    a1.sources = s1
    a1.channels = c1
    a1.sinks = k1
    
    #配置source属性 
    a1.sources.s1.type = exec
    a1.sources.s1.command = tail -f /home/xiaokang/log.txt
    
    #配置channel类型 
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 10000
    a1.channels.c1.transactionCapacity = 100
    
    #配置sink
    a1.sinks.k1.type = logger
    
    #将sources和channels绑定
    a1.sources.s1.channels = c1
    
    #将sinks和channels绑定
    a1.sinks.k1.channel = c1
2. 启动
复制代码
    [xiaokang@hadoop flume]$ flume-ng agent -n a1 -c /opt/software/flume-1.9.0/conf/ -f ./execsource.properties -Dflume.root.logger=INFO,console
    或
    [xiaokang@hadoop flume]$ flume-ng agent \
    --name a1 \
    --conf /opt/software/flume-1.9.0/conf/ \
    --conf-file ./execsource.properties \
    -Dflume.root.logger=INFO,console
3. 测试

log.txt文件中追加数据:

控制台显示如下:

3.4 案例二

监听指定目录,将目录下新增加的文件存储到 HDFS

Spooling Directory Source :flume会持续监听指定的目录,当目录下新添文件的时候会自动收集这个文件中的内容。当一个文件被收集完成之后,自动添加一个COMPLETED后缀。

Spooling Directory Source 配置项说明

复制代码
    channels	|	绑定的通道
    type	|	spooldir
    spoolDir	|	读取文件的路径,即“搜集目录”
    【selector.*	|	选择器配置】
    【interceptors.*	|	拦截器列表配置】
1. 配置

新建配置文件execsource.properties,其内容如下:

复制代码
    #指定agent的sources、channels、sinks
    a1.sources = s1
    a1.channels = c1
    a1.sinks = k1
    
    #配置source属性
    a1.sources.s1.type = spooldir
    a1.sources.s1.spoolDir = /home/xiaokang/flume
    a1.sources.s1.basenameHeader = true
    a1.sources.s1.basenameHeaderKey = fileName 
    
    #配置channel类型
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 10000
    a1.channels.c1.transactionCapacity = 100
    
    #配置sink
    a1.sinks.k1.type = hdfs
    a1.sinks.k1.hdfs.path = /flume/%y-%m-%d/%H/
    a1.sinks.k1.hdfs.filePrefix = %{fileName}
    #生成的文件类型,默认是Sequencefile,可用DataStream,则为普通文本
    a1.sinks.k1.hdfs.fileType = DataStream
    a1.sinks.k1.hdfs.useLocalTimeStamp = true
    
    #将sources和channels绑定
    a1.sources.s1.channels = c1
    
    #将sinks和channels绑定
    a1.sinks.k1.channel = c1
2. 启动
复制代码
    [xiaokang@hadoop flume]$ flume-ng agent -n a1 -c /opt/software/flume-1.9.0/conf/ -f ./spooldirsource.properties -Dflume.root.logger=INFO,console
3. 测试

拷贝任意文件到监听目录下,可以从日志看到文件上传到 HDFS 的路径:

复制代码
    [xiaokang@hadoop ~]$ cp user.txt flume/

查看上传到 HDFS 上的文件内容与本地是否一致:

复制代码
    [xiaokang@hadoop ~]$ hdfs dfs -cat /flume/20-04-07/15/user.txt.1586245585685

全部评论 (0)

还没有任何评论哟~