Flume Installation and Deployment

Flume Setup, Installation, and Testing

1: Prepare the installation package

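The official download page is https://www.flume.apache.org/download.html.
Stable official releases are also available from this mirror: http://mirrors.hust.edu.cn/apache/flume/
Download the package apache-flume-1.7.0-bin.tar.gz, which is the version used in the rest of this guide.

Alternatively:

Link: https://pan.baidu.com/s/13-kV4SO4bh3YZVsASEIKNQ
Extraction code: 02lc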

2: Upload and extract (use whatever path you prefer)

    # Upload the archive to /export/software/ with rz
    cd /export/software/
    rz
    
    # Extract
    tar -zxvf apache-flume-1.7.0-bin.tar.gz -C /export/servers/
    
    # Rename
    cd /export/servers
    mv apache-flume-1.7.0-bin flume-1.7

3 Configure /etc/profile

    vim /etc/profile
    # Append the following at the end of the file to declare Flume's home
    # directory and add its bin directory to PATH:
    #FLUME_HOME
    export FLUME_HOME=/export/servers/flume-1.7
    export FLUME_CONF_DIR=$FLUME_HOME/conf
    export PATH=$PATH:$FLUME_HOME/bin
    
    # Save and quit vim with :wq, then reload /etc/profile and confirm the change took effect
    source /etc/profile
    echo $PATH

4 Configure flume-env.sh

    # In $FLUME_HOME/conf, copy flume-env.sh.template to flume-env.sh, then edit conf/flume-env.sh
    cd /export/servers/flume-1.7/conf
    cp flume-env.sh.template flume-env.sh
    vim flume-env.sh
    
    # Set the following in the file:
    JAVA_HOME=/usr/java/jdk1.8.0_291
    JAVA_OPTS="-Xms100m -Xmx200m -Dcom.sun.management.jmxremote"

5 Verify the installation

1 Create the flume-conf.properties configuration file
    # In $FLUME_HOME/conf, copy flume-conf.properties.template to flume-conf.properties
    cd /export/servers/flume-1.7/conf
    cp flume-conf.properties.template flume-conf.properties
    sudo vi flume-conf.properties
    
2 Replace the contents of flume-conf.properties with the following
    # The configuration file needs to define the sources, the channels and the sinks.
    # Sources, channels and sinks are defined per agent, in this case called 'a1'
    a1.sources = r1
    a1.sinks = k1
    a1.channels = c1
    
    # For each one of the sources, the type is defined
    a1.sources.r1.type = netcat
    a1.sources.r1.bind = localhost
    a1.sources.r1.port = 44444
    
    #The channel can be defined as follows.
    a1.sources.r1.channels = c1
    
    # Each sink's type must be defined
    a1.sinks.k1.type = logger
    
    #Specify the channel the sink should use
    a1.sinks.k1.channel = c1
    
    # Each channel's type is defined.
    a1.channels.c1.type = memory
    
    # Other config values specific to each type of channel(sink or source)
    # can be defined as well
    # In this case, it specifies the capacity of the memory channel
    a1.channels.c1.capacity = 1000
    a1.channels.c1.transactionCapacity = 100
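
The configuration above only works if every source and sink points at a channel that the agent actually declares. The following is a minimal sketch of that consistency check, not part of Flume itself: the class name AgentConfigCheck and its methods are hypothetical, and it only assumes that component lists are space-separated, as in the file above.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;

public class AgentConfigCheck {

    /** Splits a space-separated component list such as "r1 r2" into names. */
    static List<String> names(String value) {
        if (value == null || value.trim().isEmpty()) {
            return new ArrayList<>();
        }
        return Arrays.asList(value.trim().split("\\s+"));
    }

    /** Returns a list of wiring problems for one agent; an empty list means none were found. */
    static List<String> check(String configText, String agent) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(configText));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        List<String> channels = names(props.getProperty(agent + ".channels"));
        List<String> problems = new ArrayList<>();

        // Every source must write to at least one declared channel.
        for (String source : names(props.getProperty(agent + ".sources"))) {
            List<String> used = names(props.getProperty(agent + ".sources." + source + ".channels"));
            if (used.isEmpty()) {
                problems.add("source " + source + " has no channels");
            }
            for (String channel : used) {
                if (!channels.contains(channel)) {
                    problems.add("source " + source + " uses undeclared channel " + channel);
                }
            }
        }
        // Every sink must read from one declared channel.
        for (String sink : names(props.getProperty(agent + ".sinks"))) {
            String channel = props.getProperty(agent + ".sinks." + sink + ".channel");
            if (channel == null || !channels.contains(channel.trim())) {
                problems.add("sink " + sink + " uses undeclared channel " + channel);
            }
        }
        return problems;
    }

    public static void main(String[] args) {
        // The wiring lines from the a1 configuration above.
        String config = String.join("\n",
                "a1.sources = r1",
                "a1.sinks = k1",
                "a1.channels = c1",
                "a1.sources.r1.channels = c1",
                "a1.sinks.k1.channel = c1");
        System.out.println(check(config, "a1")); // prints []
    }
}
```

Note the asymmetry the check relies on: a source uses the plural key `channels`, while a sink uses the singular key `channel`.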

3 Run the agent from the Flume installation directory /export/servers/flume-1.7
    cd /export/servers/flume-1.7
    ./bin/flume-ng agent --conf ./conf/ --conf-file ./conf/flume-conf.properties --name a1 -Dflume.root.logger=INFO,console

4 Open another terminal and enter the following:
    
    telnet localhost 44444
    hello Flume
    # If the shell reports "command not found", install telnet with: yum install telnet
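
If telnet is not available, the same test can be driven from a few lines of Java. This is only a sketch under assumptions: the class NetcatSourceClient and its exchange method are hypothetical names, the host and port are taken from the a1 configuration, and the reply is expected to be "OK" because the netcat source acknowledges each line by default.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.net.Socket;
import java.nio.charset.StandardCharsets;

public class NetcatSourceClient {

    /** Writes one newline-terminated line and returns the first line of the reply. */
    static String exchange(InputStream in, OutputStream out, String line) {
        try {
            out.write((line + "\n").getBytes(StandardCharsets.UTF_8));
            out.flush();
            BufferedReader reader =
                    new BufferedReader(new InputStreamReader(in, StandardCharsets.UTF_8));
            return reader.readLine();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        // Host and port match a1.sources.r1.bind and a1.sources.r1.port.
        try (Socket socket = new Socket("localhost", 44444)) {
            String reply = exchange(socket.getInputStream(), socket.getOutputStream(), "hello Flume");
            System.out.println("reply: " + reply);
        } catch (IOException e) {
            // The agent is not running or the port is wrong.
            System.out.println("could not reach the agent: " + e.getMessage());
        }
    }
}
```

Keeping the stream handling in exchange, separate from the socket setup, means the protocol part can be exercised without a running agent.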

    1. In $FLUME_HOME/conf, copy flume-conf.properties.template to flume-conf2.properties and replace its contents; this second test tails a Hadoop log file into HDFS
    cd /export/servers/flume-1.7/conf
    cp flume-conf.properties.template flume-conf2.properties
    vim flume-conf2.properties
    
    a1.sources = r1
    a1.sinks = k1
    a1.channels = c1
    a1.sources.r1.type = exec
    a1.sources.r1.channels = c1
    a1.sources.r1.command = tail -F  /usr/local/hadoop/logs/hadoop-root-datanode-node1.log
    a1.sinks.k1.type = hdfs
    a1.sinks.k1.channel = c1
    a1.sinks.k1.hdfs.path = hdfs://192.168.0.175:8888/test
    a1.sinks.k1.hdfs.filePrefix = events-
    a1.sinks.k1.hdfs.round = true
    a1.sinks.k1.hdfs.roundValue = 10
    a1.sinks.k1.hdfs.roundUnit = minute
    a1.sinks.k1.hdfs.rollSize = 4000000
    a1.sinks.k1.hdfs.rollCount = 0
    a1.sinks.k1.hdfs.writeFormat = Text
    a1.sinks.k1.hdfs.fileType = DataStream
    a1.sinks.k1.hdfs.batchSize = 10
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 1000
    a1.channels.c1.transactionCapacity = 100
    
    
    2. Run the agent from the Flume installation directory /export/servers/flume-1.7
    cd /export/servers/flume-1.7
    ./bin/flume-ng agent --conf ./conf/ --conf-file ./conf/flume-conf2.properties --name a1 -Dflume.root.logger=INFO,console
    
    3. The agent continuously collects new lines from hadoop-root-datanode-node1.log and writes them to HDFS
    
    4. List and view the files under /test in HDFS
    
    hadoop fs -ls /test
    hadoop fs -cat /test/events-.1623059478955
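
The number of files that appear under /test depends on the roll settings of the HDFS sink. The sketch below is a simplified model of that decision, not Flume's implementation: HdfsRollModel and shouldRoll are hypothetical names, and the 30-second interval is an assumption based on the documented default of hdfs.rollInterval, which the configuration above does not set.

```java
public class HdfsRollModel {

    /**
     * Simplified model: a file is rolled when any enabled threshold is reached.
     * A threshold of 0 disables that trigger.
     */
    static boolean shouldRoll(long rollSize, long rollCount, long rollIntervalSeconds,
                              long bytesWritten, long eventsWritten, long secondsOpen) {
        boolean bySize = rollSize > 0 && bytesWritten >= rollSize;
        boolean byCount = rollCount > 0 && eventsWritten >= rollCount;
        boolean byTime = rollIntervalSeconds > 0 && secondsOpen >= rollIntervalSeconds;
        return bySize || byCount || byTime;
    }

    public static void main(String[] args) {
        long rollSize = 4_000_000L; // hdfs.rollSize from the configuration
        long rollCount = 0L;        // hdfs.rollCount = 0 disables count-based rolling
        long rollInterval = 30L;    // assumption: hdfs.rollInterval unset, default 30 s

        // Small, young file with many events: nothing triggers.
        System.out.println(shouldRoll(rollSize, rollCount, rollInterval, 1_000, 50_000, 5));   // false
        // Size threshold reached.
        System.out.println(shouldRoll(rollSize, rollCount, rollInterval, 4_000_000, 10, 5));   // true
        // File has been open for the whole interval.
        System.out.println(shouldRoll(rollSize, rollCount, rollInterval, 1_000, 10, 30));      // true
    }
}
```

Under these assumptions a slowly growing log produces a new small file roughly every 30 seconds; setting hdfs.rollInterval to 0 would leave size as the only trigger.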

6 Writing data to MySQL with a custom Flume sink

Without further ado, here is the code.

Write MySink

    package com.flume.flume;
    
    import org.apache.flume.*;
    import org.apache.flume.conf.Configurable;
    import org.apache.flume.sink.AbstractSink;
    
    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.PreparedStatement;
    import java.sql.SQLException;
    
    public class MySink extends AbstractSink implements Configurable {
    
    private String msgPrefix;
    
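    /**
     * Takes one event from the channel and saves it; Flume calls this method repeatedly.
     * @return READY if an event was stored, BACKOFF otherwise
     * @throws EventDeliveryException
     */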
    @Override
    public Status process() throws EventDeliveryException {
        // Get the channel bound to this sink
        Channel channel = getChannel();
        Connection connection = null;
        PreparedStatement statement = null;
        // Get the transaction object
        Transaction transaction = channel.getTransaction();
        try{
            // Begin the transaction
            transaction.begin();
            // Take one event from the channel
            Event event = channel.take();
            if (event == null) {
                // The channel is empty: finish the empty transaction and back off
                transaction.commit();
                return Status.BACKOFF;
            }
    
            // Split the record, expected as id,name,age
            String data = new String(event.getBody()).trim();
            String[] arr = data.split(",");
            int id = Integer.parseInt(arr[0]);
            String name = arr[1];
            int age = Integer.parseInt(arr[2]);
    
            // Save to MySQL
            // 1. Get a connection
            connection = DriverManager.getConnection("jdbc:mysql://192.168.0.175:3306/dataxweb?useSSL=false","root","jnh139271");
            statement = connection.prepareStatement("insert into flume_test values(?,?,?)");
            saveToMysql(id,name,age,connection,statement);
            // Simulate saving the data
            //System.out.println(msgPrefix+":"+new String(event.getBody()));
            // Commit the transaction
            transaction.commit();
    
            return Status.READY;
        }catch (Exception e){
            // Roll back so the event stays in the channel, and print the cause
            transaction.rollback();
            e.printStackTrace();
        }finally {
            // Close the transaction
            transaction.close();
            // 5. Close the statement and the connection
            if(statement!=null)
            {
                try {
                    statement.close();
                } catch (SQLException e) {
                    e.printStackTrace();
                }
            }
            if(connection!=null) {
                try {
                    connection.close();
                } catch (SQLException e) {
                    e.printStackTrace();
                }
            }
        }
    
        return Status.BACKOFF;
    }
    
    public void saveToMysql(int id, String name, int age, Connection connection, PreparedStatement statement) throws SQLException {
    
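        // 2. The PreparedStatement is created by the caller; its bound parameters
        // guard against SQL injection such as [select * from table where name='zhangsan' or 1=1],
        // which string concatenation with connection.createStatement() would allow
    
        // 3. Bind the values
        statement.setInt(1,id);
        statement.setString(2,name);
        statement.setInt(3,age);
        System.out.println(id+","+name+","+age);
        // 4. Save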
        statement.executeUpdate();
    
    
    }
    /**
     * Reads the sink's configuration properties.
     * @param context the sink's configuration context
     */
    @Override
    public void configure(Context context) {
    
        msgPrefix = context.getString("msg.prefix");
    
    }
    }
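
The parsing step in process() trusts every line to be a well-formed id,name,age record. As a sketch of a stricter alternative, the following standalone class validates a line before anything is written to the database. RecordParser and Row are hypothetical names that do not appear in the sink, and the trailing carriage return in the example is an assumption about what a telnet client may send.

```java
public class RecordParser {

    /** One parsed row, matching the three columns MySink inserts. */
    static final class Row {
        final int id;
        final String name;
        final int age;

        Row(int id, String name, int age) {
            this.id = id;
            this.name = name;
            this.age = age;
        }
    }

    /**
     * Parses "id,name,age". Throws IllegalArgumentException (NumberFormatException
     * is a subclass) when the line does not have exactly three usable fields.
     */
    static Row parse(String line) {
        String[] parts = line.trim().split(",", -1);
        if (parts.length != 3) {
            throw new IllegalArgumentException("expected id,name,age but got: " + line);
        }
        int id = Integer.parseInt(parts[0].trim());
        String name = parts[1].trim();
        int age = Integer.parseInt(parts[2].trim());
        return new Row(id, name, age);
    }

    public static void main(String[] args) {
        // Surrounding whitespace, including a trailing \r, is removed before parsing.
        Row row = parse("1,test,666\r");
        System.out.println(row.id + "," + row.name + "," + row.age); // prints 1,test,666
    }
}
```

A sink could catch the IllegalArgumentException separately and decide whether to drop the malformed event or roll back; with a plain rollback, the same bad event is taken from the channel again on the next call.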
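
Package the class as a jar and upload it to the lib directory under the Flume installation path. The MySQL JDBC driver jar must be in the same lib directory, otherwise DriverManager cannot find a driver for the jdbc:mysql URL.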

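Write the Flume agent configuration (saved as job/mysik.config under the Flume installation directory, the path used by the test command)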

    # Define the agent
    a1.sources = r1
    a1.channels = c1
    a1.sinks = k1
    
    # Define the source
    a1.sources.r1.type = netcat
    a1.sources.r1.bind = 0.0.0.0
    a1.sources.r1.port = 9999
    
    # Define the channel
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 1000
    a1.channels.c1.transactionCapacity = 1000
    
    # Define the sink (type is the fully qualified class name of MySink)
    a1.sinks.k1.type = com.flume.flume.MySink
    a1.sinks.k1.msg.prefix = message
    
    # Bind the source and the sink to the channel
    a1.sources.r1.channels = c1
    a1.sinks.k1.channel = c1
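
For a custom sink, the value of a1.sinks.k1.type is a class name that Flume has to load from its classpath, so it must match the package and class name in the Java source exactly (com.flume.flume.MySink in the code above). The sketch below shows the same kind of lookup in isolation. SinkTypeCheck is a hypothetical helper, and com.example.wrong.MySink is a made-up name standing in for a mistyped package.

```java
public class SinkTypeCheck {

    /** Returns true when the class name can be loaded from the current classpath. */
    static boolean isLoadable(String className) {
        try {
            Class.forName(className);
            return true;
        } catch (ClassNotFoundException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // True only when the jar containing MySink is on the classpath.
        System.out.println(isLoadable("com.flume.flume.MySink"));
        // A name whose package does not match any compiled class cannot be loaded.
        System.out.println(isLoadable("com.example.wrong.MySink"));
    }
}
```

Running a check like this with the Flume lib directory on the classpath is a quick way to tell a packaging problem apart from a configuration typo.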

Test

    cd  /export/servers/flume-1.7
    
    bin/flume-ng agent -c conf/ -n a1 -f job/mysik.config -Dflume.root.logger=INFO,console
    
    # Open a new terminal
    Last login: Tue Jun  8 13:37:58 2021 from 192.168.0.15
    [root@node1 ~]# nc 192.168.0.175 9999
    -bash: nc: command not found
    [root@node1 ~]# yum install nc
    # After the installation completes
    [root@node1 ~]# nc 192.168.0.175 9999
    1,test,666
    OK
    # The window where the agent was started now prints
    1,test,666
    # Check the database
    1,test,666
    
    # Test complete

Reference:
https://www.jianshu.com/p/70911083784c
