Flume Installation and Deployment
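Setting up, installing, and testing Flume
1 Prepare the installation package
The official download page is https://www.flume.apache.org/download.html.
Stable official packages are also available from this mirror: http://mirrors.hust.edu.cn/apache/flume/.
The package used in the rest of this guide is apache-flume-1.7.0-bin.tar.gz.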

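Or
Link: https://pan.baidu.com/s/13-kV4SO4bh3YZVsASEIKNQ
Extraction code: 02lc
2 Upload and extract (choose whatever paths you prefer)
Upload the archive to /export/software/ with rz
Extract
cd /export/software
tar -zxvf apache-flume-1.7.0-bin.tar.gz -C /export/servers/
Rename
cd ../servers
mv apache-flume-1.7.0-bin flume-1.7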
3 Set environment variables in /etc/profile
vim /etc/profile
Append the following at the end of the file to declare Flume's home directory and add its bin directory to PATH:
#FLUME_HOME
export FLUME_HOME=/export/servers/flume-1.7
export FLUME_CONF_DIR=$FLUME_HOME/conf
export PATH=$PATH:$FLUME_HOME/bin
:wq
After saving and quitting, reload /etc/profile and confirm that the change took effect
source /etc/profile
echo $PATH
4 Configure flume-env.sh
In $FLUME_HOME/conf, copy flume-env.sh.template to flume-env.sh, then edit conf/flume-env.sh
cd /export/servers/flume-1.7/conf
cp flume-env.sh.template flume-env.sh
vim flume-env.sh
Set the following in the file (adjust JAVA_HOME to your own JDK path):
export JAVA_HOME=/usr/java/jdk1.8.0_291
export JAVA_OPTS="-Xms100m -Xmx200m -Dcom.sun.management.jmxremote"
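5 Verify the installation
5.1 Create the flume-conf.properties configuration file
In $FLUME_HOME/conf, copy flume-conf.properties.template to flume-conf.properties and open the copy for editing:
cd /export/servers/flume-1.7/conf
cp flume-conf.properties.template flume-conf.properties
sudo vi flume-conf.properties
5.2 Edit flume-conf.properties so that it defines the agent a1 as follows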
# The configuration file needs to define the sources, the channels and the sinks.
# Sources, channels and sinks are defined per agent, in this case called 'a1'
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# For each one of the sources, the type is defined
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444
#The channel can be defined as follows.
a1.sources.r1.channels = c1
# Each sink's type must be defined
a1.sinks.k1.type = logger
#Specify the channel the sink should use
a1.sinks.k1.channel = c1
# Each channel's type is defined.
a1.channels.c1.type = memory
# Other config values specific to each type of channel(sink or source)
# can be defined as well
# In this case, it specifies the capacity of the memory channel
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
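The wiring in this file follows a fixed pattern: each source lists one or more channels under its channels key, and each sink names exactly one channel under its channel key. A typo in either place is easy to miss. The following is a minimal sketch of a check for that pattern, assuming the file is plain Java properties syntax; AgentWiringCheck and its methods are hypothetical names for illustration, not part of Flume.

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Properties;
import java.util.Set;

// Hypothetical helper, not part of Flume: checks that every source and sink
// of one agent is bound to a channel that the agent actually declares.
class AgentWiringCheck {

    // Splits a whitespace-separated property value such as "c1 c2" into names.
    static List<String> names(Properties p, String key) {
        String value = p.getProperty(key, "").trim();
        if (value.isEmpty()) {
            return new ArrayList<String>();
        }
        return Arrays.asList(value.split("\\s+"));
    }

    // Returns one message per wiring problem; an empty list means no problem found.
    static List<String> check(String config, String agent) throws IOException {
        Properties p = new Properties();
        p.load(new StringReader(config));
        Set<String> channels = new HashSet<String>(names(p, agent + ".channels"));
        List<String> problems = new ArrayList<String>();

        for (String source : names(p, agent + ".sources")) {
            List<String> bound = names(p, agent + ".sources." + source + ".channels");
            if (bound.isEmpty()) {
                problems.add("source " + source + " has no channels");
            }
            for (String c : bound) {
                if (!channels.contains(c)) {
                    problems.add("source " + source + " -> unknown channel " + c);
                }
            }
        }
        for (String sink : names(p, agent + ".sinks")) {
            List<String> bound = names(p, agent + ".sinks." + sink + ".channel");
            if (bound.size() != 1) {
                problems.add("sink " + sink + " needs exactly one channel");
            } else if (!channels.contains(bound.get(0))) {
                problems.add("sink " + sink + " -> unknown channel " + bound.get(0));
            }
        }
        return problems;
    }

    public static void main(String[] args) throws IOException {
        // The wiring lines of the a1 agent defined above.
        String config = String.join("\n",
            "a1.sources = r1",
            "a1.sinks = k1",
            "a1.channels = c1",
            "a1.sources.r1.channels = c1",
            "a1.sinks.k1.channel = c1");
        System.out.println(check(config, "a1"));
    }
}
```

For the a1 agent above the returned list is empty. The check covers only the channel bindings, not the type-specific settings such as port or capacity.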
5.3 Start the agent from the Flume installation directory /export/servers/flume-1.7
cd /export/servers/flume-1.7
./bin/flume-ng agent --conf ./conf/ --conf-file ./conf/flume-conf.properties --name a1 -Dflume.root.logger=INFO,console
5.4 Open another terminal and enter the following:
telnet localhost 44444
hello Flume
If the shell reports "command not found", install telnet with yum install telnet and run the command again
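The telnet step can also be scripted. Below is a minimal sketch of a client that sends one newline-terminated line to the netcat source and reads one line back. It assumes the source's default behaviour of acknowledging each event with OK. NetcatSourceClient and sendLine are hypothetical names, and the 3-second timeouts are arbitrary.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.io.Writer;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.nio.charset.StandardCharsets;

// Hypothetical test client, not part of Flume: does what the telnet session
// does, i.e. sends one newline-terminated line and reads one line of reply.
class NetcatSourceClient {

    static String sendLine(String host, int port, String line) throws IOException {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), 3000); // connect timeout in ms
            socket.setSoTimeout(3000);                               // read timeout in ms
            Writer out = new OutputStreamWriter(socket.getOutputStream(), StandardCharsets.UTF_8);
            BufferedReader in = new BufferedReader(
                new InputStreamReader(socket.getInputStream(), StandardCharsets.UTF_8));
            out.write(line + "\n"); // the netcat source treats each line as one event
            out.flush();
            return in.readLine();   // null if the server closed without replying
        }
    }

    public static void main(String[] args) throws IOException {
        // Host and port match a1.sources.r1.bind and a1.sources.r1.port.
        System.out.println(sendLine("localhost", 44444, "hello Flume"));
    }
}
```

Run it while the agent from step 5.3 is up. The event itself should appear in the agent's console, because the sink type is logger.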
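1. For a second test that ships a log file to HDFS, copy flume-conf.properties.template in $FLUME_HOME/conf to flume-conf2.properties and edit the copy so that it contains the configuration below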
cd /export/servers/flume-1.7/conf
cp flume-conf.properties.template flume-conf2.properties
vim flume-conf2.properties
a1.sources = r1
a1.sinks = k1
a1.channels = c1
a1.sources.r1.type = exec
a1.sources.r1.channels = c1
a1.sources.r1.command = tail -F /usr/local/hadoop/logs/hadoop-root-datanode-node1.log
a1.sinks.k1.type = hdfs
a1.sinks.k1.channel = c1
a1.sinks.k1.hdfs.path = hdfs://192.168.0.175:8888/test
a1.sinks.k1.hdfs.filePrefix = events-
a1.sinks.k1.hdfs.round = true
a1.sinks.k1.hdfs.roundValue = 10
a1.sinks.k1.hdfs.roundUnit = minute
a1.sinks.k1.hdfs.rollSize = 4000000
a1.sinks.k1.hdfs.rollCount = 0
a1.sinks.k1.hdfs.writeFormat = Text
a1.sinks.k1.hdfs.fileType = DataStream
a1.sinks.k1.hdfs.batchSize = 10
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
2. Start the agent from the Flume installation directory /export/servers/flume-1.7
cd /export/servers/flume-1.7
./bin/flume-ng agent --conf ./conf/ --conf-file ./conf/flume-conf2.properties --name a1 -Dflume.root.logger=INFO,console
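3. The agent now continuously collects new lines from hadoop-root-datanode-node1.log and writes them to HDFS
4. Inspect the files under /test in HDFS (the numeric suffix of the file name will differ on your system)
hadoop fs -ls /test
hadoop fs -cat /test/events-.1623059478955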
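A note on the three rounding settings in flume-conf2.properties. The hdfs.round, hdfs.roundValue and hdfs.roundUnit settings only affect time escape sequences in hdfs.path, and the path used here (/test) contains none, so as written they change nothing. They would matter with a path such as /test/%Y-%m-%d/%H%M, which is a hypothetical example and also needs a timestamp on each event, for instance via hdfs.useLocalTimeStamp = true. The sketch below illustrates the rounding idea only; it is not Flume's own code, and RoundDownSketch is a made-up name.

```java
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.time.temporal.ChronoUnit;

// Illustration only, not Flume's implementation: rounds a timestamp down to a
// multiple of roundValue minutes within the hour, which is the effect of
// hdfs.round = true with hdfs.roundUnit = minute.
class RoundDownSketch {

    static LocalDateTime roundDownMinutes(LocalDateTime t, int roundValue) {
        int minute = (t.getMinute() / roundValue) * roundValue;
        // Drop minutes, seconds and nanoseconds, then add the rounded minute back.
        return t.truncatedTo(ChronoUnit.HOURS).plusMinutes(minute);
    }

    public static void main(String[] args) {
        LocalDateTime eventTime = LocalDateTime.of(2021, 6, 7, 17, 51, 18);
        LocalDateTime bucket = roundDownMinutes(eventTime, 10);
        // Directory suffix for the hypothetical path /test/%Y-%m-%d/%H%M
        System.out.println(bucket.format(DateTimeFormatter.ofPattern("yyyy-MM-dd/HHmm")));
        // prints 2021-06-07/1750
    }
}
```

With roundValue = 10, every event from 17:50:00 through 17:59:59 lands in the same 1750 directory, giving one directory per ten minutes rather than one per minute.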
6 Writing data to MySQL with a custom Flume sink
Without further ado, here is the code.
Write MySink
package com.flume.flume;

import org.apache.flume.*;
import org.apache.flume.conf.Configurable;
import org.apache.flume.sink.AbstractSink;

import java.nio.charset.StandardCharsets;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;

public class MySink extends AbstractSink implements Configurable {

    private String msgPrefix;

    /**
     * Saves data; Flume calls this method repeatedly.
     * @return READY if an event was stored, BACKOFF otherwise
     * @throws EventDeliveryException
     */
    @Override
    public Status process() throws EventDeliveryException {
        // Get the channel bound to this sink
        Channel channel = getChannel();
        Connection connection = null;
        PreparedStatement statement = null;
        // Get a transaction object
        Transaction transaction = channel.getTransaction();
        try {
            // Begin the transaction
            transaction.begin();
            // Take one event from the channel; null means the channel is empty
            Event event = channel.take();
            if (event == null) {
                transaction.commit();
                return Status.BACKOFF;
            }
            // Split the data, expected as "id,name,age"; trim() drops a trailing \r from telnet
            String data = new String(event.getBody(), StandardCharsets.UTF_8).trim();
            String[] arr = data.split(",");
            int id = Integer.parseInt(arr[0]);
            String name = arr[1];
            int age = Integer.parseInt(arr[2]);
            // Save to MySQL
            // 1. Get a connection
            connection = DriverManager.getConnection("jdbc:mysql://192.168.0.175:3306/dataxweb?useSSL=false", "root", "jnh139271");
            statement = connection.prepareStatement("insert into flume_test values(?,?,?)");
            saveToMysql(id, name, age, connection, statement);
            // Simulated save, kept for debugging
            //System.out.println(msgPrefix + ":" + new String(event.getBody()));
            // Commit the transaction
            transaction.commit();
            return Status.READY;
        } catch (Exception e) {
            // Report the failure, then roll back so the event returns to the channel
            e.printStackTrace();
            transaction.rollback();
        } finally {
            // Close the transaction
            transaction.close();
            // 5. Close the JDBC resources
            if (statement != null) {
                try {
                    statement.close();
                } catch (SQLException e) {
                    e.printStackTrace();
                }
            }
            if (connection != null) {
                try {
                    connection.close();
                } catch (SQLException e) {
                    e.printStackTrace();
                }
            }
        }
        return Status.BACKOFF;
    }

    public void saveToMysql(int id, String name, int age, Connection connection, PreparedStatement statement) throws SQLException {
        // 2. The statement is a PreparedStatement rather than connection.createStatement(),
        //    which avoids SQL injection such as: select * from table where name='zhangsan' or 1=1
        // 3. Bind the values
        statement.setInt(1, id);
        statement.setString(2, name);
        statement.setInt(3, age);
        System.out.println(id + "," + name + "," + age);
        // 4. Save
        statement.executeUpdate();
    }

    /**
     * Reads this sink's configuration properties.
     * @param context the sink's configuration context
     */
    @Override
    public void configure(Context context) {
        msgPrefix = context.getString("msg.prefix");
    }
}
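MySink assumes every event body has the form id,name,age. A line that does not fit, for example plain text or a non-numeric id, throws inside process(), the transaction is rolled back, and the same event is likely to be taken and fail again. One way to handle this is to validate the body first and skip malformed records. The sketch below shows such a parser in isolation; PersonRecord and parse are hypothetical names, not part of the original sink.

```java
import java.nio.charset.StandardCharsets;
import java.util.Optional;

// Hypothetical helper, not in the original MySink: parses an "id,name,age"
// event body and reports malformed input as an empty Optional instead of throwing.
class PersonRecord {
    final int id;
    final String name;
    final int age;

    PersonRecord(int id, String name, int age) {
        this.id = id;
        this.name = name;
        this.age = age;
    }

    static Optional<PersonRecord> parse(byte[] body) {
        String line = new String(body, StandardCharsets.UTF_8).trim();
        // limit -1 keeps trailing empty fields, so "1,test," still yields three parts
        String[] parts = line.split(",", -1);
        if (parts.length != 3) {
            return Optional.empty();
        }
        try {
            int id = Integer.parseInt(parts[0].trim());
            int age = Integer.parseInt(parts[2].trim());
            return Optional.of(new PersonRecord(id, parts[1].trim(), age));
        } catch (NumberFormatException e) {
            return Optional.empty();
        }
    }

    public static void main(String[] args) {
        byte[] good = "1,test,666".getBytes(StandardCharsets.UTF_8);
        byte[] bad = "hello Flume".getBytes(StandardCharsets.UTF_8);
        System.out.println(parse(good).isPresent()); // true
        System.out.println(parse(bad).isPresent());  // false
    }
}
```

Inside process(), an empty result could be logged and the transaction committed anyway, so that the bad event is dropped rather than retried. Whether dropping is acceptable depends on the data.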
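Package the class as a jar and upload it to the lib directory under the Flume installation path. The MySQL JDBC driver jar must also be on Flume's classpath (for example in the same lib directory), otherwise DriverManager cannot find a driver.
Write the Flume agent configuration (the test below loads it from job/mysik.config under the Flume installation directory)
# Define the agent
a1.sources = r1
a1.channels = c1
a1.sinks = k1
# Define the source; the port must match the one used with nc in the test
a1.sources.r1.type = netcat
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 12344
# Define the channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 1000
# Define the sink; the type is the fully qualified class name and must match the package declared in MySink
a1.sinks.k1.type = com.flume.flume.MySink
a1.sinks.k1.msg.prefix = message
# Bind the source and the sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1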
Test
cd /export/servers/flume-1.7
bin/flume-ng agent -c conf/ -n a1 -f job/mysik.config -Dflume.root.logger=INFO,console
Open a new terminal
Last login: Tue Jun 8 13:37:58 2021 from 192.168.0.15
[root@node1 ~]# nc 192.168.0.175 12344
-bash: nc: command not found
[root@node1 ~]# yum install nc
Once the installation finishes
[root@node1 ~]# nc 192.168.0.175 12344
1,test,666
OK
The window where the agent was started now prints the record
1,test,666
Check the database; the flume_test table contains the row
1,test,666
Test complete
