Advertisement

Flume安装及使用

阅读量:

目录

    • 1、Flume安装

    • 2、演示示例

      • 2.1 netcat source
      • 2.2 exec source
      • 2.3 spooldir source
      • 2.4 http source
      • 2.5 taildir source
      • 2.6 avro sink and source
      • 2.7 HDFS sink
    • 3、练习

      • 练习1

官网: http://flume.apache.org/

1、Flume安装

1.上传安装包

2.解压

复制代码
    tar zxvf /opt/software/apache-flume-1.8.0-bin.tar.gz -C /opt/install/
    
    
      
    

3.配置Java环境变量

复制代码
    cd $FLUME_HOME/conf
    mv flume-env.sh.template flume-env.sh
    vi flume-env.sh
    -----------------------------------------
    export JAVA_HOME=/opt/install/java
    
    
      
      
      
      
      
    

4.配置Flume环境变量

复制代码
    vi /etc/profile
    -----------------------------------------
    # FLUME
    export FLUME_HOME=/opt/install/apache-flume-1.8.0-bin
    export PATH=$FLUME_HOME/bin:$PATH
    -----------------------------------------
    source /etc/profile
    
    
      
      
      
      
      
      
      
    

5.查看版本

复制代码
    [root@singleNode ~]# flume-ng version
    Flume 1.8.0
    Source code repository: https://git-wip-us.apache.org/repos/asf/flume.git
    Revision: 99f591994468633fc6f8701c5fc53e0214b6da4f
    Compiled by denes on Fri Sep 15 14:58:00 CEST 2017
    From source with checksum fbb44c8c8fb63a49be0a59e27316833d
    
    
      
      
      
      
      
      
    

2、演示示例

2.1 netcat source

image-20210115085602523

1.编写Flume的配置文件

复制代码
    vi source_netcat.conf
    --------------------------------
    # 定义 source, channel, 和sink的名字
    a1.sources = s1
    a1.channels = c1
    a1.sinks = sk1
    # 对source的一些设置
    a1.sources.s1.type = netcat
    a1.sources.s1.bind = localhost
    a1.sources.s1.port = 5678
    a1.sources.s1.channels = c1
    # 对channel的一些设置
    a1.channels.c1.type = memory
    # 对sink的一些设置
    a1.sinks.sk1.type = logger
    a1.sinks.sk1.channel = c1
    
    
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
    

2.启动Flume【路径容易存在问题:①配置文件的路径, ②Flume脚本的路径】

复制代码
    flume-ng agent --name a1 -f source_netcat.conf -Dflume.root.logger=INFO,console
    
    
      
    

3.启动telnet发送数据

复制代码
    # 安装telnet
    yum -y install telnet.x86_64
    # 启动telnet
    telnet localhost 5678
    Trying 127.0.0.1...
    Connected to localhost.
    Escape character is '^]'.
    # 发送数据
    hello world
    OK
    
    
      
      
      
      
      
      
      
      
      
      
    

4.查看Flume日志输出
image-20210115091858853

2.2 exec source

1.编写Flume的配置文件

复制代码
    vi source_exec.conf
    --------------------------------
    # 定义 source, channel, 和sink的名字
    a1.sources = s1
    a1.channels = c1
    a1.sinks = sk1
    # 对source的一些设置
    # 设置source的类型为exec, 代表需要执行一条命令, 所以需要给定一个command
    a1.sources.s1.type = exec
    a1.sources.s1.command = tail -F /root/flume_conf/xxx.log
    a1.sources.s1.channels = c1
    # 对channel的一些设置
    a1.channels.c1.type = memory
    # 对sink的一些设置
    a1.sinks.sk1.type = logger
    a1.sinks.sk1.channel = c1
    
    
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
    

2.启动Flume, 直接启动Flume, 由于没有/root/flume_conf/xxx.log这个文件, 程序会显示exited with 1

所以应该先创建/root/flume_conf/xxx.log, 然后在启动Flume

复制代码
    flume-ng agent --name a1 -f source_exec.conf -Dflume.root.logger=INFO,console
    
    
      
    
image-20210115100505835

3.对文件追加数据. 我们在这个文件当中添加一些数据, 验证Flume是否检测到并采集
image-20210115101114003

2.3 spooldir source

1.编写Flume的配置文件

复制代码
    vi source_spooldir.conf
    --------------------------------
    # 定义 source, channel, 和sink的名字
    a1.sources = s1
    a1.channels = c1
    a1.sinks = sk1
    # 对source的一些设置
    # 设置source的类型为spooldir, 代表监控一个文件夹里边是否有新文件产生, 所以需要给定一个文件夹
    a1.sources.s1.type = spooldir
    a1.sources.s1.spoolDir = /root/test
    a1.sources.s1.channels = c1
    # 对channel的一些设置
    a1.channels.c1.type = memory
    # 对sink的一些设置
    a1.sinks.sk1.type = logger
    a1.sinks.sk1.channel = c1
    --------------------------------
    # 创建文件夹
    mkdir /root/test
    
    
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
    

2.启动Flume, 直接启动Flume

复制代码
    flume-ng agent --name a1 -f source_spooldir.conf -Dflume.root.logger=INFO,console
    
    
      
    
image-20210115105005199

3.向/root/test拷贝文件, 查看Flume的输出

复制代码
    cp flume_conf/*.conf test/
    
    
      
    
image-20210115105124746

4.查看/root/test下的文件名
image-20210115105256417

2.4 http source

1.编写Flume的配置文件

复制代码
    vi source_http.conf
    --------------------------------
    # 定义 source, channel, 和sink的名字
    a1.sources = s1
    a1.channels = c1
    a1.sinks = sk1
    # 对source的一些设置
    # 设置source的类型为spooldir, 代表监控一个文件夹里边是否有新文件产生, 所以需要给定一个文件夹
    a1.sources.s1.type = http
    a1.sources.s1.port = 5678
    a1.sources.s1.channels = c1
    # 对channel的一些设置
    a1.channels.c1.type = memory
    # 对sink的一些设置
    a1.sinks.sk1.type = logger
    a1.sinks.sk1.channel = c1
    --------------------------------
    
    
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
    

2.启动Flume, 直接启动Flume

复制代码
    flume-ng agent --name a1 -f source_http.conf -Dflume.root.logger=INFO,console
    
    
      
    
image-20210115105832796

3.发送post请求, 查看Flume输出

复制代码
    curl -XPOST localhost:5678 -d'[{"headers":{"h1":"v1","h2":"v2"},"body":"hello body"}]'
    
    
      
    
image-20210115110012034

2.5 taildir source

可以同时监控一个或多个文件, 并带有偏移量存储文件来记录上次读到的位置, 下次可以接着读

复制代码
    # 设置sources和channels的名字
    a1.sources = r1
    a1.channels = c1
    # 配置source
    a1.sources.r1.type = TAILDIR # 指定类型为TAILDIR
    a1.sources.r1.channels = c1
    a1.sources.r1.positionFile = /var/log/flume/taildir_position.json # 定义偏移量存储路径
    a1.sources.r1.filegroups = f1 f2 # 定义文件组, 多个文件 f1,f2
    a1.sources.r1.filegroups.f1 = /var/log/test1/example.log # 对f1指定绝对路径
    a1.sources.r1.headers.f1.headerKey1 = value1 # 向f1的header添加kv对
    a1.sources.r1.filegroups.f2 = /var/log/test2/.*log.* # 对f2指定绝对路径
    a1.sources.r1.headers.f2.headerKey1 = value2 # 向f2的header添加kv对
    a1.sources.r1.headers.f2.headerKey2 = value2-2 # 向f2的header添加kv对
    a1.sources.r1.fileHeader = true # 	是否添加一个头信息来存储文件的绝对路径, 默认是false
    
    
      
      
      
      
      
      
      
      
      
      
      
      
      
      
    

2.6 avro sink and source

1.编写Flume的配置文件

复制代码
    vi sink_avro.conf
    --------------------------------
    # 定义 source, channel, 和sink的名字
    a1.sources = s1
    a1.channels = c1
    a1.sinks = sk1
    # 对source的一些设置
    # 设置source的类型为spooldir, 代表监控一个文件夹里边是否有新文件产生, 所以需要给定一个文件夹
    a1.sources.s1.type = http
    a1.sources.s1.port = 5678
    a1.sources.s1.channels = c1
    # 对channel的一些设置
    a1.channels.c1.type = memory
    # 对sink的一些设置, 设置格式为avro, 主机和端口号
    a1.sinks.sk1.type = avro
    a1.sinks.sk1.hostname = localhost
    a1.sinks.sk1.port = 4444
    a1.sinks.sk1.channel = c1
    --------------------------------
    
    vi source_avro.conf
    --------------------------------
    # 定义 source, channel, 和sink的名字
    a1.sources = s1
    a1.channels = c1
    a1.sinks = sk1
    # 对source的一些设置
    # 设置格式为avro, 主机和端口号 
    a1.sources.s1.type = avro
    a1.sources.s1.bind = localhost
    a1.sources.s1.port = 4444
    a1.sources.s1.channels = c1
    # 对channel的一些设置
    a1.channels.c1.type = memory
    # 对sink的一些设置
    a1.sinks.sk1.type = logger
    a1.sinks.sk1.channel = c1
    --------------------------------
    
    
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
    

2.启动Flume, 直接启动Flume, 先启动sink_avro, 在启动source_avro

复制代码
    flume-ng agent --name a1 -f source_avro.conf -Dflume.root.logger=INFO,console
    flume-ng agent --name a1 -f sink_avro.conf -Dflume.root.logger=INFO,console
    
    
      
      
    

image-20210115112325136
image-20210115112339649

3.发送post请求, 查看Flume输出

复制代码
    curl -XPOST localhost:5678 -d'[{"headers":{"h1":"v1","h2":"v2"},"body":"hello body"}]'
    
    
      
    
image-20210115112426324

2.7 HDFS sink

1.编写Flume的配置文件

复制代码
    vi sink_hdfs.conf
    --------------------------------
    # 定义 source, channel, 和sink的名字
    a1.sources = s1
    a1.channels = c1
    a1.sinks = sk1
    # 对source的一些设置
    a1.sources.s1.type = netcat
    a1.sources.s1.bind = localhost
    a1.sources.s1.port = 5678
    a1.sources.s1.channels = c1
    # 对channel的一些设置
    a1.channels.c1.type = memory
    # 对sink的一些设置
    a1.sinks.sk1.type = hdfs
    a1.sinks.sk1.hdfs.path = /data/20210115
    a1.sinks.sk1.channel = c1
    
    
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
    

2.启动Flume

复制代码
    flume-ng agent --name a1 -f sink_hdfs.conf -Dflume.root.logger=INFO,console
    
    
      
    
image-20210115114644690

3.启动telnet发送数据

复制代码
    # 启动telnet
    telnet localhost 5678
    Trying 127.0.0.1...
    Connected to localhost.
    Escape character is '^]'.
    # 发送数据
    hello world
    OK
    hello spark
    OK
    hello hadoop scala
    OK
    
    
      
      
      
      
      
      
      
      
      
      
      
      
    

4.查看Flume日志输出
image-20210115114808621

5.查看HDFS上的文件

复制代码
    hdfs dfs -text /data/20210115/FlumeData.1610682426351
    
    
      
    
image-20210115115009833

【扩展】实际开发中常用参数
image-20210115115644466

3、练习

遍历文件夹:使用Exec Source遍历Linux的文件目录,并在控制台输出.

练习1

1.编写Flume的配置文件

复制代码
    vi /root/flume_conf/echoPaths.sh
    --------------------------------
    #!/bin/bash
    for i in /root/*
    do
      echo $i
    done
    --------------------------------
    vi practise1.conf
    --------------------------------
    # 定义 source, channel, 和sink的名字
    a1.sources = s1
    a1.channels = c1
    a1.sinks = sk1
    # 对source的一些设置
    # 设置source的类型为exec, 代表需要执行一条命令, 所以需要给定一个command
    a1.sources.s1.type = exec
    a1.sources.s1.command = bash /root/flume_conf/echoPaths.sh
    a1.sources.s1.channels = c1
    # 对channel的一些设置
    a1.channels.c1.type = memory
    # 对sink的一些设置
    a1.sinks.sk1.type = logger
    a1.sinks.sk1.channel = c1
    
    
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
    

2.启动Flume

复制代码
    flume-ng agent --name a1 -f practise1.conf -Dflume.root.logger=INFO,console
    
    
      
    
image-20210115104221911

全部评论 (0)

还没有任何评论哟~