Flume安装部署及采集案例
一、Telnet 客户端安装
为什么使用Telnet? Telnet是一种用于检查指定端口是否可访问的工具。在软件开发过程中,我们通常会依赖于8080这个默认端口来进行各种操作。你可以通过运行服务器并使用Telnet来快速确定其可达性。
作为TCP/IP协议家族中的一个成员, Telnet被视为互联网远程登录服务的标准方法与主要手段,它赋予了用户通过本地设备访问远程主机的能力。在终端用户安装有telnet软件的情况下,只要在其电脑上运行该程序,就可以方便地与之交互以便与服务器建立连接。通过telnet程序界面可以键入各种命令,这些指令会被发送到目标服务器执行,就像直接在服务器的控制台上输入一样,无需离开本地环境就可以完成操作。创建一个telnet会话需要用户提供用户名和密码以进行身份验证,这样即使没有网络连接也能实现对目标机器的操作,从而实现了无需网络依赖的本地管理功能。 telnet无疑是一种广泛应用于Web服务 remotely control的方法
所以我们安装telnet客户端用来测试。
linux窗口安装Telnet客户端
yum search all telnet
yum install telnet
安装完成后,我们进入,测试是否完成安装:根目录下输入以下命令
telnet
quit :使用 quit 命令可以退出 Telnet 客户端。
获取更多telnet相关资料,请通过查阅 telnet命令如何使用!
二、Flume安装部署
部署安装包至数据源节点上或通过网络协议从数据源节点接收数据的另一个节点上进行解压操作,并执行以下解压命令:tar -zxvf apache-flume-1.6.0-bin.tar.gz
配置环境变量设置文件中指定JAVA_HOME路径
在配置文件中详细说明采集方案的参数设置:包括采集方案的插件类型选择、具体采集点的文件存储位置(建议采用格式为"source_name-sink_name.conf"的形式)
在相应的系统节点上完成Flume采集方案的配置后执行启动命令
三、Flume采集案例
1.在conf目录下自定义采集方案配置文件netcat_logger.conf
将数据发送至某网络端口后,flume用于监听该端口并捕获发送的数据。随后用于将收集到的数据传递至终端节点,并输出为日志记录。
编辑配置方案文件命令:vi netcat_logger.conf
# example.conf: A single-node Flume configuration
# Name the components on this agent
#给那三个组件取个名字(#定义三大组件的名称)
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source(配置source组件)
#类型, 从网络端口接收数据,在本机启动, 所以localhost
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444
# Describe the sink(配置sink组件)
a1.sinks.k1.type = logger
# Use a channel which buffers events in memory
#下沉的时候是一批一批的, 下沉的时候是一个个eventChannel参数解释:
#capacity:默认该通道中最大的可以存储的event数量
#trasactionCapacity:每次最大可以从source中拿到或者送到sink中的event数量
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
测试阶段,启动flume agent的命令:
在根目录下启动flume agent:
bin/flume-ng agent --conf conf/ --conf-file conf/netcat-logger.conf --name a1 -Dflume.root.logger=INFO,console
然后使用telnet命令向这个端口发送数据:
telnet localhost 44444
source会采集到这个数据,经过channel,下沉到logger,然后打印出来。
Event: { headers:{} body: XXXXXXXXXXXXXX xxx. }
采集的数据封装成event,里面有头信息,信息体
2.在conf目录下自定义采集方案配置文件spooldir-hdfs.conf
需求二:服务器持续不断地在日志目录中生成新的日志文件;每当新文件生成时,则会被采集至HDFS。
需求二:服务器持续不断地在日志目录中生成新的日志文件;每当新文件生成时,则会被采集至HDFS。
需求二:服务器持续不断地在日志目录中生成新的日志文件;每当新文件生成时,则会被采集至HDFS。
需求二:服务器持续不断地在日志目录中生成新的日志文件;每当新文件生成时,则会被采集至HDFS。
需求二:服务器持续不断地在日志目录中生成新的日志文件;每当新文件生成时,则会被采集至HDFS。
需求二:服务器持续不断地在日志目录中生成新的日志文件;每当新文件生成时,则会被采集至HDFS。
需求二:服务器持续不断地在日志目录中生成新的日志文件;每当新文件生成时,则会被采集至HDFS。
需求二:服务器持续不断地在日志目录中生成新的日志文件;每当新文件生成时,则会被采集至HDFS。
需求二:服务器持续不断地在日志目录中生成新的日志 files;每当 new files 被创建时,则会立即被传输到 HDFS 中存储起来。
数据采集源:配置为监控目录下的source–spooldir
归集目的地: 配置为HDFS下的hdfs sink
传输通道:选择file channel或memory channel
编辑配置方案文件:vi spooldir-hdfs.conf
方案明细如下:
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
#监听目录,spoolDir指定目录, fileHeader要不要给文件夹前坠名
a1.sources.r1.type = spooldir
a1.sources.r1.spoolDir = /root/logs/
a1.sources.r1.fileHeader = true
# Describe the sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.channel = c1
a1.sinks.k1.hdfs.path = /flume/events/%y-%m-%d/%H%M/
a1.sinks.k1.hdfs.filePrefix = events-
a1.sinks.k1.hdfs.round = true
a1.sinks.k1.hdfs.roundValue = 10
a1.sinks.k1.hdfs.roundUnit = minute
a1.sinks.k1.hdfs.rollInterval = 3
a1.sinks.k1.hdfs.rollSize = 20
a1.sinks.k1.hdfs.rollCount = 5
a1.sinks.k1.hdfs.batchSize = 1
a1.sinks.k1.hdfs.useLocalTimeStamp = true
#生成的文件类型,默认是Sequencefile,可用DataStream,则为普通文本
a1.sinks.k1.hdfs.fileType = DataStream
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
#默认channel中可以存储的event的最大值
a1.channels.c1.capacity = 1000
#每次最大从source拿到或者送到sink中的event数量
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
启动flume agent:
bin/flume-ng agent -c conf/ -f conf/spooldir-hdfs.conf -n a1 -Dflume.root.logger=INFO,console
向/root/logs文件夹下拷贝或者移动文件:
命令:cp file logs/
flume会监控这个目录,一旦发现有新文件进来就会采集到hdfs.
Channel参数解释:
- capacity:默认情况下该通道的最大事件存储量
- transactionCapacity:每次最多从源端获取或运往目的节点的最大事件数
业务系统利用log4j生成不断增加的日志内容,并将新增至日志文件的数据同步至HDFS
首先定义三大组件:
- 源用于运行相应的执行脚本以监控文件内容的变化情况,并将结果发送至hdfs.
- 内存用于作为传输数据的通道.
- 通过使用vi工具对尾部Hadoop配置进行编辑.
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /root/logs/log.log
a1.sources.r1.channels = c1
# Describe the sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.channel = c1
a1.sinks.k1.hdfs.path = /flume/tailout/%y-%m-%d/%H%M/
a1.sinks.k1.hdfs.filePrefix = events-
a1.sinks.k1.hdfs.round = true
a1.sinks.k1.hdfs.roundValue = 10
a1.sinks.k1.hdfs.roundUnit = minute
a1.sinks.k1.hdfs.rollInterval = 3
a1.sinks.k1.hdfs.rollSize = 20
a1.sinks.k1.hdfs.rollCount = 5
a1.sinks.k1.hdfs.batchSize = 1
#一定要设置,因为按日期分离文件,要设置时间标准
a1.sinks.k1.hdfs.useLocalTimeStamp = true
#生成的文件类型,默认是Sequencefile,可用DataStream,则为普通文本
a1.sinks.k1.hdfs.fileType = DataStream
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
启动flume agent:
bin/flume-ng agent -c conf/ -f conf/tail-hdfs.conf -n a1 -Dflume.root.logger=INFO,console
持续不断地将数据发送至日志文件:
然后agent会把数据上传到hdfs
参数解释:
1、rollInterval
sink hdfs 间隔多长时间将临时文件滚动成目标文件,默认是30,单位是秒。如果设置为0,则表示不根据时间来滚动文件。滚动的意思是:sink hdfs将临时文件重命名为最终目标文件,并新打开一个临时文件来写数据。
2、rollsize
当临时文件达到某个值大小的时候就滚动成目标文件,默认是1024,单位是bytes。如果设置为0,则表示不根据临时文件大小来滚动文件。
3、rollCount
当event的数量达到这个值的时候临时文件就滚动成目标文件。如果此值设置成0,则表示不根据event的数量来滚动文件。
以上三个值都设置的时候,只要满足一个值就滚动文件
4、round
是否启用时间上的’舍弃’,类似与四舍五入,
5、roundValue
时间上舍去的值,默认是1
6、roundUnit
默认值是seconds,舍弃的单位。还有:second,minute,hour
例如:
a1.sinks.k1.hdfs.path = /flume/events/%y-%m-%d/%H%M/
a1.sinks.k1.hdfs.round = true
a1.sinks.k1.hdfs.roundValue = 10
a1.sinks.k1.hdfs.roundUnit = minute
当时间为2018-08-15 10:35:49的时候,hdfs的path依然会被解析为/flume/events/ 2018-08-15 /10:30/00因为设置的舍弃时间是10分钟,因此会舍弃10分钟以内的时间,该目录每10分钟生成一个,到了10:40会重新生成一个目录.
