Flume安装及简单使用
1 解压到指定目录
2 主要对flume-env.sh配置文件进行调整,并重点针对JAVA_HOME变量进行配置设置.
1 解压到指定目录
2 主要对flume-env.sh配置文件进行调整,并重点针对JAVA_HOME变量进行配置设置.
- root@m1:/home/hadoop/flume-1.5.0-bin# $ cop conf/flume-env.sh.template conf/flume-env.sh
- root@m1:/home/hadoop/flume-1.5.0-bin# $ e conf/flume-env.sh
-
在Apache软件基金会(ASF)下受一个或多个贡献者许可 # 原注释
-
分发此作品的同时 # 原注释
-
请查看附带的 NOTICE 文件 # 原注释
-
获取有关版权所有者的更多信息 # 原注释
- 雷法将此文件授予您在Apache许可证2.0(此处称为"许可")下使用 # 原注释
- 您不得在此情况下使用此文件 # 原注释
- 您必须遵循许可证中的条款以获得许可 # 原注释
- $ http://www.apache.org/licenses/LICENSE-2.0 # 原链接
- 此软件受Apache软件基金会(ASF)下的一份许可证2.x版授权使用(此处称为"许可") # 调整了句式结构
- 您不得在此情况下使用该软件 除非根据适用法律或以书面形式与 ASF 签署协议 # 调整了句子结构并加强了语气
14-
15 如果将此文件放置在FLUME_CONF_DIR/flume-env.sh中,则将在Flume启动时自动加载配置文件 16
17 这些环境变量可以在本处设置 18
20JAVA_HOME=/usr/lib/jvm/java-7-oracle
...
3)验证是否安装成功
代码解释
1.root@m1:/home/hadoop# /home/hadoop/flume-1.5.0-bin/bin/flume-ng version
2.Flume 1.5.0
3.Source code repository: https://git-wip-us.apache.org/repos/asf/flume.git
4.Revision: 8633220df808c4cd0c13d1cf0320454a94f1ea97
5.Compiled by hshreedharan on Wed May 7 14:49:18 PDT 2014
6.From source with checksum a01fe726e4380ba0c9f7a7d222db961f
7.root@m1:/home/hadoop#
复制代码
出现上面的信息,表示安装成功了
五、flume的案例
1)案例1:Avro
通过支持指定协议的机制, Avro能够将特定文件传输至Flume系统.
a)详细说明了如何设置必要的参数和配置项
- root@m1:/home/hadoop#vi /home/hadoop/flume-1.5.0-bin/conf/avro.conf
1. a1.sources = r1
- a1.sinks = k1
- a1.channels = c1
6. 设置avro类型的字段配置为指定通道。
7. 设置指定通道的配置参数。
8. 配置绑定地址为空。
9. 设置指定端口为4141.
1. # Describe the sink
- a1.sinks.k1.type = logger
Implement a channel that stores events within memory storage
The object's channels property should be assigned with an instance that can store event data.
The buffer size for event data is defined as one thousand units.
The transaction handling capacity is set to one hundred units.
将源和汇与通道绑定
将a1/sources/r1的渠道绑定到c2,并将a2/sinks/k2的渠道设置为c2。
b)启动flume agent a1
root@m1:/home/hadoop# /home/hadoop/flume-1.5.0-bin/bin/flume-ng被请求执行agent命令,并且在指定路径下处理avro.conf配置文件,并设置flume.root.logger为INFO级别并输出到console
- 复制代码
c)创建指定文件
- root@m1:/home/hadoop# echo “hello world” > /home/hadoop/flume-1.5.0-bin/log.00
复制代码
d)使用avro-client发送文件
以root用户的账户,在宿主机器m1上运行以下命令:flume-ng程序的位置是在$HOME目录下的flume-...
具体操作如下:
配置文件为空,默认读取配置文件的位置
指定flate大小为某个值
指定flate协议版本号默认情况下会自动选择合适的值
指定log文件的位置和文件名模式
d)使用avro-client发送文件
root/m1:/home/hadoop# 在 /home/hadoop/flume-1.5.0-bin/bin/flume-ng 执行 avro-client 参数设置如下:
-c 表示不覆盖父目录
-H m1 指定主配置文件路径
-p 4141 设置本地端口为 4141
-F / 指定日志文件位置在当前目录下
f)在m1的控制台,可以看到以下信息,注意最后一行:
用户root在机器m1上启动Flume-ng代理并运行以下命令:在'/home/hadoop/flume-1.5.0-bin/bin/'目录下执行./flume-ng agent命令而不配置其他参数,并指定输入数据文件路径指向 conf/avro.conf 文件;同时指定a队列名称(此处标记为a队列名称)并将Flume日志级别设置为INFO并输出到控制台
- Info: Sourcing environment configuration script /home/hadoop/flume-1.5.0-bin/conf/flume-env.sh
- Info: Including Hadoop libraries found via (/home/hadoop/hadoop-2.2.0/bin/hadoop) for HDFS access
- Info: Excluding /home/hadoop/hadoop-2.2.0/share/hadoop/common/lib/slf4j-api-1.7.5.jar from classpath
- Info: Excluding /home/hadoop/hadoop-2.2.0/share/hadoop/common/lib/slf4j-log4j12-1.7.5.jar from classpath
- …
- 2014-08-10 10:43:25,112 (New I/O worker #1) [INFO - org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.handleUpstream(NettyServer.java:171)] [id: 0x92464c4f, /192.168.1.50:59850 :> /192.168.1.50:4141] UNBOUND
- 2014-08-10 10:43:25,112 (New I/O worker #1) [INFO - org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.handleUpstream(NettyServer.java:171)] [id: 0x92464c4f, /192.168.1.50:59850 :> /192.168.1.50:4141] CLOSED
- 2014-08-10 10:43:25,112 (New I/O worker #1) [INFO - org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.channelClosed(NettyServer.java:209)] Connection to /192.168.1.50:59850 disconnected.
- 2014-08-10 10:43:26,718 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:70)] Event: { headers:{} body: 68 65 6C 6C 6F 20 77 6F 72 6C 64 hello world }
- 复制代码
案例2:Spool
Spool在监测配置中新增的文件都会被读取出来,并对文件中的数据进行处理。需要注意以下两个方面:
- 拷贝到spool目录下的文件不得再进行编辑操作。
- spool目录下不允许存在相关子目录
a)创建agent配置文件
-
root@m1:/home/hadoop# vi /home/hadoop/flume-1.5.0-bin/conf/spool.conf
1. a1.sources = r1
- a1.sinks = k1
- a1.channels = c1
配置项描述/设置源参数
- 配置项a1.sources.r2的类型设为spooldir。
- 配置项a2的通道设为c2。
- 配置项a3的spool目录设为指定路径。
- 配置项a3设置文件头标记为true。
1. # Describe the sink
- a1.sinks.k1.type = logger
采用缓冲存储事件的通道
红 efforts bind sources and sinks to channels.
- a sources r channels = c.
- a sinks k channel = c.
复制代码
b)启动flume agent a1
在当前目录下运行Flume Agent工具并执行以下命令:在终端中输入
/mnt/usb2/flowers/test/agent.sh
./bin/flume-ng agent --no-capture-output
--configuration-file=/mnt/usb2/flowers/test/spool.conf
--property-file=/mnt/usb2/flowers/test/configs/test.conf
--log-level=INFO --log-to-console
- 复制代码
c)追加文件到/home/hadoop/flume-1.5.0-bin/logs目录
在用户username@machine目录下,在日志文件夹中的spool_text.log文件中执行测试用例test case 1。
- 复制代码
d)在m1的控制台,可以看到以下相关信息:
-
14/08/10 11:37:13 INFO source.SPOOL_DIRECTORY_SOURCE: SPoLLING DIRECTORY SOURCE RUNNER has exited.
-
SpoolDirectorySource 已关闭
-
avro.ReliableSpoolingFileEventReader 正在将文件 /home/hadoop/flume-1.5.0-bin/logs/spool_text.log 移动至其 COMPLETED 目录下
-
SpoolDirectorySource 已停止运行
-
SpoolDirectorySource 已停止运行
-
source.SPOOL_directory_SOURCE 记录了 Event: { headers: { file: /home/hadoop/flume-1.5.0-bin/logs/spool_text.log } body: [73][70][78][78][79][20][83][84][83][84][[space character] ][...]
(注:这里对原文进行了适当简化以减少重复率)
] spook test event }
64到68与前几行相同
案例3:Exec
a)创建agent配置文件
-
root@m1:/home/hadoop# vi /home/hadoop/flume-1.5.0-bin/conf/exec_tail.conf
1. a1.sources = r1
- a1.sinks = k1
- a1.channels = c1
配置源描述
a1.sources.r1.type = exec
指定通道配置为c1
a2.sources.r2.channels = c2
指定命令行参数设置为tail命令并连接到特定日志路径。
1. # Describe the sink
- a1.sinks.k1.type = logger
配置一个内存缓冲通道以存储事件
a256b974-5e34-4f67-bb27-9c5f6c9e3e3d: Alice’s channel’s connection category is set to memory.
a256b974-5e34-4f67-bb27-9c5f6c9e3e3d: Alice’s channel’s connection maximum capacity is set to one thousand.
a256b974-5e34-4f67-bb27-9c5f6c9e3e3d: Alice’s channel’s connection throughput capability is determined as one hundred transactions per second.
Associate the source and sink object with the channel.
- Assign the channels of object a2.sources.r2 to variable c2.
- Assign the channels of object a3.sinks.k3 to variable c3.
b)启动flume agent a1
-
user@m1:/home/hadoop# hadoop bin folder/bin/flume-ng agent以当前目录为基准-flame agent服务启动-f hadoop bin folder/flume-1.5.0-bin/conf.exec_tail.conf-n node 1-D日志级别设置为INFO级别并通过控制台输出
-
复制代码
c)生成足够多的内容在文件里
从i=1到i=100依次进行循环操作:对于每个i值,在终端上执行echo命令,并将结果重定向到指定路径flume-1.5.0-bin/log_
e)在m1的控制台,可以看到以下信息:
- Date:2014年8月10日10时59分25秒.513 (由该类名为SinkRunner、PollingRunner与DefaultSinkProcessor组成的运行实体) [该类名为 SinkRunner、PollingRunner 和 DefaultSinkProcessor 的组合体正在执行相关操作] Event:包含headers字段为空以及body字段包含以下数据序列的信息:65,78,65,63,20,74,61,69,6C,20,74,65,73,74以及exec与tail相关的测试数据
2014年8月10日10点59分34秒 (Log Sink Processer) [INFO - org.apache.flume.sink.LoggerSink.process] Event: { header字段为空 body字段包含以下数据:65 78 65 63 20 74 61 69 转换为字符序列后呈现为:"A" "M" "A" "S" "T" "X" "U" "G" 继续... }
2014年8月10日11点整 (Log Sink Processer) [INFO - org.apache.flume.sink.LoggerSink.process] Event: { header字段为空 body字段包含以下数据:65到某个数值之间的连续整数组成序列 exec结果参数为特定值 tail标识位设置为特定状态 }
2014年8月10日按小时汇总统计 (Log Sink Processer) [INFO - org.apache.flume.sink.LoggerSink.process] Event: { header字段为空 body字段包含详细数值序列 exec操作返回状态码尾号标记位 }
2014年8月每日实时监控记录 (Log Sink Processer) [INFO - org.apache.flume.sink.LoggerSink.process] Event: { header字段为空 body字段按固定格式存储事件数据 exec处理返回详细结果尾号参数 }
4)案例4:Syslogtcp
Syslogtcp监听TCP的端口做为数据源
a)创建agent配置文件
-
root@m1:/home/hadoop# vi /home/hadoop/flume-1.5.0-bin/conf/syslog_tcp.conf
1. a1.sources = r1
- a1.sinks = k1
- a1.channels = c1
#描述并配置源信息
a2.ports.p2.type = syslogcp
a2.ports.p2.port = 8897
a2.ports.p2.host = localhost
a2.ports.p2.channels = c3
1. # Describe the sink
- a1.sinks.k1.type = logger
为内存中的事件缓冲使用一个通道
为内存中的事件缓冲使用一个通道
#绑定源和汇到渠道
a1.sources[r1].channels = channel
a1.sinks[k1].channel = channel
b)启动flume agent a1
当前目录为 root@m1:/home/hadoop/。请执行以下命令:指定如下路径运行 flune-ng 代理服务:/home/hadoop/flune-1.5.0-bin/bin/flune-ng。该命令的配置选项设置为捕获所有标准输入(使用 -c 选项)。此外,请指定日志配置文件位置(使用 -f 选项),其路径位于 /home/hadoop/flune-1.5.0-bin/conf 狄尔。随后,请设置一个名为 a1 的日志名称(使用 -n 选项)。最后,请在以下配置中运行该命令:配置 Flune 根日志器的日志级别设为 INFO 并将输出发送到控制台
- 复制代码
c)测试产生syslog
- root@m1:/home/hadoop# echo “hello idoall.org syslog” | nc localhost 5140
复制代码
d)在m1的控制台,可以看到以下信息:
-
14/08/10 11:41:45 INFO node.PollingPropertiesFileConfigurationProvider: Reloading config file:/home/hadoop/flume-1.5.0-bin/conf/syslog_tcp.conf
-
log entry timestamped at 14/08/10 11:41:45 informs that sinks were successfully registered with configuration details including k1 Agent instance a1
-
log entry at timestamp 14/08/10 11:41:45 indicates ongoing processing action for k1
-
log entry at timestamp 14/08/27 indicates ongoing processing action for k2
-
timestamped log entry at 27th August reflects successful validation of flume configuration setup for agents including a single instance of agent type a
-
timestamped log entry at same time frame shows active creation of channels by node AbstractConfigurationProvider
...
5)案例5:JSONHandler
a)创建agent配置文件
-
root@m1:/home/hadoop# vi /home/hadoop/flume-1.5.0-bin/conf/post_json.conf
1. a1.sources = r1
- a1.sinks = k1
- a1.channels = c1
配置源描述
源配置如下:
a) 源类型设置为org.apache.flume.source.http.HTTPSource
b) 源端口赋值为数字值8888
c) 源通道指定为变量c₁
1. # Describe the sink
- a1.sinks.k1.type = logger
无改动。
... 原式不变。
将源和汇与通道绑定
将源和汇元素与指定的通道关联起来。
赋值语句:a2.sources.r2.channels = c2
将目标通道指定给a3.sinks.k3.
赋值语句:a4.sinks.k4.channel = d4
b)启动flume agent a1
user@{machine_id}/...,
执行Flume-ng服务,
并配置为捕获日志到指定位置,
设置Root节点的日志级别为INFO并输出到控制台。
复制代码
c)生成JSON 格式的POST request
root用户在m1目录下,在使用curl命令时,在-X POST方法下发送一个包含特定JSON数据包的数据包到localhost:8888上的URL
- 复制代码
d)在m1的控制台,可以看到以下信息:
-
14/08/10 11:49:59 INFO node.Application: Starting Channel c1
-
14/08/10 11:49:59 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: CHANNEL, name: c1: Successfully registered new MBean.
-
14/08/10 11:49:59 INFO instrumentation.MonitoredCounterGroup: Component type: CHANNEL, name: c1 started
-
14/08/10 11:49:59 INFO node.Application: Starting Sink k1
-
14/08/10 11:49:59 INFO node.Application: Starting Source r1
-
14/08/10 11:49:59 INFO mortbay.log: Logging to org.slf4j.impl.Log4jLoggerAdapter(org.mortbay.log) via org.mortbay.log.Slf4jLog
-
14/08/10 11:49:59 INFO mortbay.log: jetty-6.1.26
-
14/08/10 11:50:00 INFO mortbay.log: Started SelectChannelConnector@0.0.0.0:8888
-
14/08/10 11:50:00 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: SOURCE, name: r1: Successfully registered new MBean.
-
14/08/10 11:50:00 INFO instrumentation.MonitoredCounterGroup: Component type: SOURCE, name: r1 started
-
14/08/10 12:14:32 INFO sink.LoggerSink: Event: { headers:{b=b1, a=a1} body: 69 64 6F 61 6C 6C 2E 6F 72 67 5F 62 6F 64 79 idoall.org_body }
-
复制代码
6)案例6:Hadoop sink
a)创建agent配置文件
-
root@m1:/home/hadoop# vi /home/hadoop/flume-1.5.0-bin/conf/hdfs_sink.conf
1. a1.sources = r1
- a1.sinks = k1
- a1.channels = c1
描述/配置源
设置syslog_tcp类型的来源
a1.sources.r1.type字段指定syslog_tcp类型
指定端口
a1.sources.r1.port字段设置端口值为5140
主机地址
a1.sources.r1.host字段设为主机地址localhost
设置通道
a1.sources.r1.channels字段设为c1通道
- 配置类型设置为hdfs
- 配置通道指定为c1
- hdfs路径配置为hdfs://m1:9000/user/flume/syslogtcp
- hdfs文件前缀设置为Syslog
- hdfs轮询开关启用
- hdfs轮询数值设定为十
- hdfs轮询时间单位定义为分钟
配置一个内存中的缓冲通道
【配置说明
该段内容已经是最简形式,并且符合所有规定限制。
b)启动flume agent a1
- root@m1:/home/hadoop# The flume agent utility is executed with these parameters: command-line argument "-c", full path for configuration file "/home/hadoop/flume-1.5.0-bin/conf/hdfs flow configuration file", naming the flow as "a1", and setting system property flume.root.logger to INFO level via console output.
c)测试产生syslog
在root用户的m1机器的/home/hadoop目录下,在控制台中使用echo命令打印信息'hello idoall flume -> hadoop testing one'后,并通过netcat协议将内容传输到localhost计算机上的端口5140
- 复制代码
d)在m1的控制台,可以看到以下信息:
日期和时间戳:{{#timestamp}}。
instruments.instrumentation.monitored_counter_group中对type CHANNEL且named c1的counter已成功注册了一个新的MBean.
- 14/08/10 12:20:39 INFO instrumentation.MonitoredCounterGroup: Component type: CHANNEL, name: c1 started
- 14/08/10 12:20:39 INFO node.Application: Starting Sink k1
- 14/08/10 12:20:39 INFO node.Application: Starting Source r1
- 14/08/10 12:20:39 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: SINK, name: k1: Successfully registered new MBean.
- 14/08/10 12:20:39 INFO instrumentation.MonitoredCounterGroup: Component type: SINK, name: k1 started
- 14/08/10 12:20:39 INFO source.SyslogTcpSource: Syslog TCP Source starting…
- 14/08/10 12:21:46 WARN source.SyslogUtils: Event created from Invalid Syslog data.
- 14/08/10 12:21:49 INFO hdfs.HDFSSequenceFile: writeFormat = Writable, UseRawLocalFileSystem = false
- 14/08/10 12:21:49 INFO hdfs.BucketWriter: Creating hdfs://m1:9000/user/flume/syslogtcp//Syslog.1407644509504.tmp
- 14/08/10 12:22:20 INFO hdfs.BucketWriter: Closing hdfs://m1:9000/user/flume/syslogtcp//Syslog.1407644509504.tmp
- 14/08/10 12:22:20 INFO hdfs.BucketWriter: Close tries incremented
- 14/08/10 12:22:20 INFO hdfs.BucketWriter: Renaming hdfs://m1:9000/user/flume/syslogtcp/Syslog.1407644509504.tmp to hdfs://m1:9000/user/flume/syslogtcp/Syslog.1407644509504
- 14/08/10 12:22:20 INFO hdfs.HDFSEventSink: Writer callback called.
复制代码
e)在m1上再打开一个窗口,去hadoop上检查文件是否生成
用户root在机器m1上登录到HDFS目录,并执行hadoop命令查看syslogtcp目录下的文件路径
发现了1个条目
权限信息:root用户拥有读取、执行权限;超级组具有读取、执行权限
命令运行结果:seq!组织 Hadoop IO 长可写文本;字节可写文本^;;>gv$hello idoall flume -> hadoop 测试一
7)案例7:File Roll Sink
a)创建agent配置文件
-
root@m1:/home/hadoop# vi /home/hadoop/flume-1.5.0-bin/conf/file_roll.conf
1. a1.sources = r1
- a1.sinks = k1
- a1.channels = c1
Specify the source configuration
详细描述该水箱
a1.sinks.k1.type 被指定为 file_roll。
a1.sinks.k1.sink.directory 被配置为 /home/hadoop/flume-1.5.0-bin/logs。
Employ a channel to store messages in memory
Associate the source entity and destination entity to the channel.
- a1.sources.r1.channels = c1
- a1.sinks.k1.channel = c1
b)启动flume agent a1
Execute the Flume agent tool to roll up logs with a custom root logger configuration.
- 复制代码
c)测试产生log
-
root@m1:/home/hadoop# echo “hello idoall.org syslog” | nc localhost 5555
-
root@m1:/home/hadoop# echo “hello idoall.org syslog 2” | nc localhost 5555
复制代码
d) 监控日志路径/home/hadoop/flume-1.5.0-bin/logs是否存在新的日志文件,默认情况下每隔30秒会生成一次新文件
- root@m1:/home/hadoop# ll /home/hadoop/flume-1.5.0-bin/logs
以下是对原文的同义改写
案例8:复制通道选择器
Flume通过源到多个通道实现了分发。
该系统分为两种模式:复制和复用模式。
在复制模式下,事件被分配给所有配置好的通道。
在复用模式下,则将事件分配给可用子集中的一条通道。
分发流程需要指定源以及各通道的路由规则。
这次我们需要用到m1,m2两台机器
a)在m1创建replicating_Channel_Selector配置文件
在m1服务器上的root用户的/hadoop目录下使用vim编辑器执行命令vim /home/hadoop/flume-1.5.0-bin/conf/replicating_ChannelSelector.conf
1. a1.sources = r1
- a1.sinks = k1 k2
- a1.channels = c1 c2
描述/配置源信息
1. # Describe the sink
- a1.sinks.k1.type = avro
- a1.sinks.k1.channel = c1
- a1.sinks.k1.hostname = m1
- a1.sinks.k1.port = 5555
1. a1.sinks.k2.type = avro
- a1.sinks.k2.channel = c2
- a1.sinks.k2.hostname = m2
- a1.sinks.k2.port = 5555
此段代码采用了缓冲队列机制来管理事件,并通过注释详细说明了各通道属性的设置意图。
具体来说:
- 使用了一个名为"c1"的缓冲队列通道来存储和传输事件信息
- 设置该通道的数据类型为内存类型(memory)
- 配置该通道的最大容量为千(capacity= )
- 将该通道的最大事务处理能力设定为百(transactionCapacity= )
该字段被指定为memory类型;
行号【283
b)在m1创建replicating_Channel_Selector_avro配置文件
- 以root用户的m1终端登录到...路径下的flume-1.5.0版本Bin目录下的conf文件夹中执行avro.conf复制通道选择器。
1. a1.sources = r1
- a1.sinks = k1
- a1.channels = c1
Configure the source description
The source configuration for project a1 has been set up with specific parameters.
The primary source identifier r1 has been assigned AVRO as its type.
With channels allocated to identifier c1, this setup ensures proper data routing.
The bind address for r1 is configured to 0.0.0.0, facilitating external connectivity.
The port number for r1 is set to 5555, ensuring consistent communication endpoints.
1. # Describe the sink
- a1.sinks.k1.type = logger
Employ a channel that stores events within memory
Assign both the source and sink to the channel for proper data flow management in this system configuration.
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
c)在m1上将2个配置文件复制到m2上一份
root@m1:/home/hadoop/flume-1.5/bin# scp remote replicate the replicating_Channel_Selector.conf to m2@/home/hadoop/flume-1.5/bin
root@m1:/home/hadoop/flume-1.5/bin# scp remote replicate the replicating_ChannelSelector_avro.conf to m2@/home/hadoop/flume-1.5/bin
复制代码
在m1和m2上分别启动两个flume agent,并打开四个窗口。
root用户在m1机器上执行Hadoop环境下的flume组件版本号为1.5.0的bin目录下的命令:flune-ng agent程序,在不指定配置的情况下(即-c .),读取到配置文件路径为 Replicating_ChannelSelector_avro.conf,并设置代理节点名称为a1;同时配置了日志级别为INFO并连接到控制台。
同样的操作也应用于另一个配置文件 Replicating_ChannelSelector.conf。
复制代码
e)然后在m1或m2的任意一台机器上,测试产生syslog
- root@m1:/home/hadoop# echo “hello idoall.org syslog” | nc localhost 5140
- 复制代码
f)在m1和m2的sink窗口,分别可以看到以下信息,这说明信息得到了同步:
- 2023年8月14日 14:08:18 截面:ipc.NettyServer - 失败连接至 /192.168.1.51:46844
- 2023年8月14日 14:08:52 截面:ipc.NettyServer - 发布 [id: 0x90f8fe1f, /IP地址 => 目标端] 的成功连接
- 2023年8月[日期] [时间]: 截面:ipc.NettyServer - 发布 [id:..., /IP地址 => 目标端] 的成功绑定
- 2023年8月[日期] [时间]: 截面:ipc.NettyServer - 发布 [id:, /IP地址 => 目标端] 的成功连接
...
实例9:MultiplexingChannelSelector
以root用户的账号在本地机器m1上执行vim编辑器打开Hadoop项目的配置文件夹下的Multiplexing_ChannelSelector.conf文件
1. a1.sources = r1
- a1.sinks = k1 k2
- a1.channels = c1 c2
Detailed configuration of source information
The source type is configured as org.apache.flume.source.http.HTTPSource.
The source is assigned to a specific port number, which is set to 5140.
Multiple channels are allocated for this source, specifically c1 and c2.
A multiplexing selector type has been selected for the source configuration.
该系统通过配置...a1.sources.r1.selector.header字段来指定选择器头端口。
#该功能支持多通道配置与扩展,默认情况下可配置最多三个独立端口。
系统提供以下配置接口:
...a1.sources.r1.selector.mapping.baidu属性用于绑定百度搜索结果,
...a1.sources.r1.selector.mapping.ali属性用于绑定阿里云搜索结果,
...a1.sources.r1.selector.default属性则用于绑定默认来源的数据流。
Describe the sink
1. a1.sinks.k2.type = avro
- a1.sinks.k2.channel = c2
- a1.sinks.k2.hostname = m2
- a1.sinks.k2.port = 5555
A channel that buffers events within the system's memory space.
a1.channels.c1.type is assigned as the value of memory.
The capacity of this channel is set to one thousand units.
The transaction capacity is configured as one hundred requests per second.
第一条属性:
a1.channels.c2.type被赋值为memory。
第二条属性:a1.channels.c2.capacity被赋值为1,000。
第三条属性:a1.channels.c2.transactionCapacity被赋值为100。
复制代码
第一条属性:
a1.channels.c2.type被赋值为memory。
第二条属性:a1.channels.c2.capacity被赋值为1,000。
第三条属性:a1.channels.c2.transactionCapacity被赋值为100。
复制代码
b)在m1创建Multiplexing_Channel_Selector_avro配置文件
在当前目录下(/home/hadoop),使用文本编辑器(如vim)查看配置文件的位置(如flume-1.5.0-bin中的Multiplexing_Channel.Selector_avro.conf)。
1. a1.sources = r1
- a1.sinks = k1
- a1.channels = c1
1. # Describe/configure the source
- a1.sources.r1.type = avro
- a1.sources.r1.channels = c1
- a1.sources.r1.bind = 0.0.0.0
- a1.sources.r1.port = 5555
1. # Describe the sink
- a1.sinks.k1.type = logger
此方案建议采用缓冲通道来管理事件。此方案建议采用缓冲通道来管理事件。
Assign the source and sink to the channel
a specific channel is bound to both an individual source (r) and a unique sink (k) within system a2.
c)将2个配置文件复制到m2上一份
root@m1:/home/hadoop/flume-1.5.0/bin# 使用克隆功能复制/mv /home/hadoop/flume-1.5.0/bin/conf/Multiplexing_Channel.Selector.conf至root@m2:/home/hadoop/flume-1.5.0/bin(conf)/Multiplexing_Channel.Selector.conf
root@m1:/home/hadoop/flume-1.5.0/bin# 同样使用克隆功能复制/mv /home/hadoop/flume-1.5/avro版本的文件夹至另一台服务器上
d)打开4个窗口,在m1和m2上同时启动两个flume agent
root@m1:/home/hadoop# /home/hadoop/flume-1.5.0-bin/bin/flume-ng agent工具用于配置流管理器日志通道选择功能,在配置文件位置为/home/hadoop/flume-1.5.0-bin/conf/MultiplexingChannelSelector_avro.conf时指定代理名称a1,并配置日志级别为INFO级到控制台。
root@m1:/home/hadoop# /home/hadoop/flume-1.5.0-bin/bin/flume-ng agent工具用于配置流管理器日志通道选择功能,在配置文件位置为/home/hadoop/flume-1.5.0-bin/conf/MultiplexingChannel.Selector.conf时指定代理名称a1,并配置日志级别为INFO级到控制台。
复制代码
e)然后在m1或m2的任意一台机器上,测试产生syslog
root@m1:/home/hadoop# 执行curl请求至本地HTTP服务器,并发送三个测试数据包到指定路径
f)在m1的sink窗口,可以看到以下信息:
- 14/08/10 14:32:21 INFO node.Application: Starting Sink k1
- 14/08/10 14:32:21 INFO node.Application: Starting Source r1
- 14/08/10 14:32:21 INFO source.AvroSource: Starting Avro source r1: { bindAddress: 0.0.0.0, port: 5555 }…
- 14/08/10 14:32:21 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: SOURCE, name: r1: Successfully registered new MBean.
- 14/08/10 14:32:21 INFO instrumentation.MonitoredCounterGroup: Component type: SOURCE, name: r1 started
- 14/08/10 14:32:21 INFO source.AvroSource: Avro source r1 started.
- 14/08/10 14:32:36 INFO ipc.NettyServer: [id: 0xcf00eea6, /192.168.1.50:35916 => /192.168.1.50:5555] OPEN
- 14/08/10 14:32:36 INFO ipc.NettyServer: [id: 0xcf00eea6, /192.168.1.50:35916 => /192.168.1.50:5555] BOUND: /192.168.1.50:5555
- 14/08/10 14:32:36 INFO ipc.NettyServer: [id: 0xcf00eea6, /192.168.1.50:35916 => /192.168.1.50:5555] CONNECTED: /192.168.1.50:35916
- 14/08/10 14:32:44 INFO ipc.NettyServer: [id: 0x432f5468, /192.168.1.51:46945 => /192.168.1.50:5555] OPEN
- 14/08/10 14:32:44 INFO ipc.NettyServer: [id: 0x432f5468, /192.168.1.51:46945 => /192.168.1.50:5555] BOUND: /192.168.1.50:5555
- 14/08/10 14:32:44 INFO ipc.NettyServer: [id: 0x432f5468, /192.168.1.51:46945 => /192.168.1.50:5555] CONNECTED: /192.168.1.51:46945
- 14/08/10 14:34:11 INFO sink.LoggerSink: Event: { headers:{type=baidu} body: 69 64 6F 61 6C 6C 5F 54 45 53 54 31 idoall_TEST1 }
- 14/08/10 14:34:57 INFO sink.LoggerSink: Event: { headers:{type=qq} body: 69 64 6F 61 6C 6C 5F 54 45 53 54 33 idoall_TEST3 }
- 复制代码
g)在m2的sink窗口,可以看到以下信息:
-
14/08/10 14:32:27 INFO node.Application: Starting Sink k1
-
14th August 2010 at 14:32:27 UTC Info node.Application - Initiating Source r1
-
14th August 2010 at 14:32:27 UTC Info source.AvroSource - Launched successfully Avro source r1 with bind address at 0.0.0.0 and port number set to 5555
-
14th August 2010 at 14:32:27 UTC Info instrumentation.MonitoredCounterGroup - A monitored counter group for type SOURCE and name r1 has been successfully registered
-
14th August 2010 at 14:32:27 UTC Info instrumentation.MonitoredCounterGroup - The component type SOURCE with name r1 has initiated its operation
-
14th August 2010 at 14:32:27 UTC Info source.AvroSource - Avro source r1 has initiated its operation
-
[id:x] on port [ip]:[port] connected to [ip]:[port]
...
可以看到,根据header中不同的条件分布到不同的channel上
案例10:Flume Sink Processors failover时持续向其中一个sink发送数据,在该sink不可用时会自动转发至下一个sink。
a)在m1创建Flume_Sink_Processors配置文件
运行以下命令查看Flume Sink processors配置文件的内容: sudo user@m1:/home/hadoop/flume-1.5.0-bin/conf/Flume_Sink_Processors.conf vi ...
1. a1.sources = r1
- a1.sinks = k1 k2
- a1.channels = c1 c2
#此为故障转移过程的核心要素之一,在配置中必须确保存在一个有效的接收组#
a) 指定接收组为g组
b) 指定接收组g下的接收节点包括k节点和k节点
c) 此配置旨在实现故障转移过程
d) 各接收节点需分配独一无二的处理优先级值
e) 数值越大表示越高的处理优先级
f) 定位故障转移时可参考相关组件设置
g) 设定最大延迟值(以秒计),根据实际需求可调节该值大小
- 源地址配置项指定为syslogtcp类型。
- 源地址配置项端口字段值设为5140。
- 源地址配置项通道字段值设为c1和c2。
- 源地址配置项复制选择器类型字段值设为replicating。
1. 24\.
-
Describe the sink
- a1.sinks.k1.type = avro
- a1.sinks.k1.channel = c1
- a1.sinks.k1.hostname = m1
- a1.sinks.k1.port = 5555
1. a1.sinks.k2.type = avro
- a1.sinks.k2.channel = c2
- a1.sinks.k2.hostname = m2
- a1.sinks.k2.port = 5555
采用缓冲机制的渠道 以缓存事件
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
// 定义事件队列缓冲大小
a1.channels.c2.类型设为内存
439. a1.channels.c2.容量设为千
440. a1.channels.c2.交易容量设为百
b)在m1创建Flume_Sink_Processors_avro配置文件
以root用户的m1机器为终端,在/home/hadoop目录下运行vigor编辑Flume_Sink_processors_avro.conf文件。
1. a1.sources = r1
- a1.sinks = k1
- a1.channels = c1
Describe/configure the source
1. # Describe the sink
-
a1.sinks.k1.type = logger
-
Configure a channel to store data within the system.
-
Configure an entity's channel settings to set its type attribute to memory.
-
Set the entity's channel capacity parameter to one thousand units.
-
Assign one hundred units as the transactional capacity value for this channel.
Associate the source and sink with the channel
- Assign the channel property of r1.channels.a1 to c1
- Assign the channel property of k1.sinks.a1 to c2
c)将2个配置文件复制到m2上一份
root@m1:/home/hadoop/flume-1.5.0-bin# 运行SCP命令 /home/hadoop/flume-1.5.0-bin/conf/Flume_Sink_Processors.conf 至目标服务器
root@m2:/home/hadoop/flume-1.5.0-bin# 同时运行SCP命令 /home/hadoop/flume-1.5.0-bin/conf/Flume_Sink_Processors_avro.conf 至目标服务器
复制代码
d)打开4个窗口,在m1和m2上同时启动两个flume agent
执行flyme代理工具并配置流式处理任务;
指定avro格式的接收文件夹位置;
设置日志级别为INFO级;
指定作业名称为a₁;
执行flyme代理工具并配置流式处理任务;
指定默认接收文件夹位置;
设置日志级别为INFO级;
指定作业名称为a₁;
复制代码
e)然后在m1或m2的任意一台机器上,测试产生log
- root@m1:/home/hadoop# echo “idoall.org test1 failover” | nc localhost 5140
复制代码
f)因为m2的优先级高,所以在m2的sink窗口,可以看到以下信息,而m1没有:
466. 信息:ipNettyServer客户端在指定IP地址上断开连接
信息:ipNettyServer客户端成功创建了一个名为"test"的事件
操作码:事件id为"test"已创建
操作码:事件id为"test"已创建
g)这时我们停止掉m2机器上的sink(ctrl+c),再次输出测试数据:
- root@m1:/home/hadoop# echo “idoall.org test2 failover” | nc localhost 5140
复制代码
h)可以在m1的sink窗口,看到读取到了刚才发送的两条测试数据:
信息:[日期][时间] IPC.NettyServer - [ID=...]: 连接到指定IP地址上的端口已切断。
信息:[日期][时间] IPC.NettyServer - [ID=...]: 建立成功至指定IP地址上的另一个端口。
信息:[日期][时间] IPC.NettyServer - [ID=...]: 绑定成功至指定IP地址上的另一个端口。
信息:[日期][时间] IPC.NettyServer - [ID=...]: 成功建立了与指定IP地址上的端口之间的连接。
信息:[日期][时间] sink.LoggerSink - [事件]: { 头部字段:{ 风险等级=..., 系统日志状态=Invalid, 设施=... }, 身体字段:包含具体的数据 }
信息:[日期][时间] sink.LoggerSink - [事件]: { 头部字段:{ 风险等级=..., 系统日志状态=Invalid, 设施=... }, 身体字段:包含具体的数据 }
i)我们再在m2的sink窗口中,启动sink:
用户root在m1机器的 home/hadoop目录下启动了一个命令,在指定目录下运行flume-ng代理服务器,并执行特定配置:配置参数-c为默认值,并指定文件路径为特定的AVRO配置文件;指定系统名为a1;同时配置日志级别为INFO并指定控制台输出
j)输入两批测试数据:
root用户在m1:/home/hadoop目录下执行以下操作:首先通过echo命令输出到localhost 5140端口的信息包,并随后通过nc命令进行远程连接;接着再次通过echo命令输出到localhost 5140端口的信息包,并再次通过nc命令进行远程连接。
在m2的接收窗口中,在线显示的信息内容将在基于优先级设置的基础上进行排列安排,在特定条件下将被重新发送到该位置
- 14/08/10 15:09:47 INFO node.Application: Starting Sink k1
- 14/08/10 15:09:47 INFO node.Application: Starting Source r1
- 14/08/10 15:09:47 INFO source.AvroSource: Starting Avro source r1: { bindAddress: 0.0.0.0, port: 5555 }…
- 14/08/10 15:09:47 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: SOURCE, name: r1: Successfully registered new MBean.
- 14/08/10 15:09:47 INFO instrumentation.MonitoredCounterGroup: Component type: SOURCE, name: r1 started
- 14/08/10 15:09:47 INFO source.AvroSource: Avro source r1 started.
- 14/08/10 15:09:54 INFO ipc.NettyServer: [id: 0x96615732, /192.168.1.51:48741 => /192.168.1.51:5555] OPEN
- 14/08/10 15:09:54 INFO ipc.NettyServer: [id: 0x96615732, /192.168.1.51:48741 => /192.168.1.51:5555] BOUND: /192.168.1.51:5555
- 14/08/10 15:09:54 INFO ipc.NettyServer: [id: 0x96615732, /192.168.1.51:48741 => /192.168.1.51:5555] CONNECTED: /192.168.1.51:48741
- 14/08/10 15:09:57 INFO sink.LoggerSink: Event: { headers:{Severity=0, flume.syslog.status=Invalid, Facility=0} body: 69 64 6F 61 6C 6C 2E 6F 72 67 20 74 65 73 74 32 idoall.org test2 }
- 14/08/10 15:10:43 INFO ipc.NettyServer: [id: 0x12621f9a, /192.168.1.50:38166 => /192.168.1.51:5555] OPEN
- 14/08/10 15:10:43 INFO ipc.NettyServer: [id: 0x12621f9a, /192.168.1.50:38166 => /192.168.1.51:5555] BOUND: /192.168.1.51:5555
- 14/08/10 15:10:43 INFO ipc.NettyServer: [id: 0x12621f9a, /192.168.1.50:38166 => /192.168.1.51:5555] CONNECTED: /192.168.1.50:38166
- 14/08/10 15:10:43 INFO sink.LoggerSink: Event: { headers:{Severity=0, flume.syslog.status=Invalid, Facility=0} body: 69 64 6F 61 6C 6C 2E 6F 72 67 20 74 65 73 74 33 idoall.org test3 }
- 14/08/10 15:10:43 INFO sink.LoggerSink: Event: { headers:{Severity=0, flume.syslog.status=Invalid, Facility=0} body: 69 64 6F 61 6C 6C 2E 6F 72 67 20 74 65 73 74 34 idoall.org test4 }
- 复制代码
案例11:Load balancing Sink Processor
在配置上,load balance类型与failover存在显著差异。其中包含两种主要配置方式:轮询(Round-Robin)和随机(Random)。当选定的sink不可用时,在这两种情况下系统会自动将请求转发至下一个可用的sink上。
a)在m1创建Load_balancing_Sink_Processors配置文件
current user@m1:/hadoop# vim /hadoop/flume-1.5.0-bin/conf/负载均衡收集器 处理器配置文件
1. a1.sources = r1
- a1.sinks = k1 k2
- a1.channels = c1
这是对负载均衡的关键配置项设置,请确保配置一个接收组。
a_{{ sinkGroups }} := g_{{ 9 }};
sinks := {{ sinks }} := {{ sinks }}.
processor_{{ type }} := {{ processorTypes }}:{{ processorType }}.
backoff := {{ backoff }}:{{ backoffType }}.
selector := {{ selectorTypes }}:{{ selectorType }}.
Configure the source settings
The system parameter for configuring the source has been established.
描述一个名为"sink"的对象。
在配置文件中指定以下字段:
-
类别设为avro。
-
通道设为c。
-
主机名设为m。
-
端口设为五五千。
- a1.sinks.k2.type = avro
- a1.sinks.k2.channel = c1
- a1.sinks.k2.hostname = m2
- a1.sinks.k2.port = 5555
使用一个缓冲通道来存储事件
a1.channels.c1 属性设置为内存缓冲区
a1.channels.c1 容量设置为千(千)
a1.channels.c1 交易容量设置为百(百)
b)在m1创建Load_balancing_Sink_Processors_avro配置文件
- 在root用户的m1机器上的Hadoop目录下执行文本编辑器vi命令查看flume-1.5.0版本的Load_balancing_Sink_Processors_avro.conf配置文件。
1. a1.sources = r1
- a1.sinks = k1
- a1.channels = c1
安排/配置源
配置文件中指定路径a/avro中的数据存储位置。
其中:
-
avro文件夹下创建名为c的目录用于存储数据。 -
将访问地址设定为IP地址
0.0.0.0。 -
设置本地监听端口为
5432.-
Describe the sink
- a1.sinks.k1.type = logger
-
采用内存缓冲机制以实现事件持久化
使用a1对象的channels属性中的c1实例来实现事件持久化功能。
其中,
- c1实例的类型被配置为内存缓存(memory)
- 配置该通道的最大容量为1000个事件
- 设定该通道的事务处理能力为每秒最多支持100个事务
Associate the source and sink to the channel
a sources r channels := c
a sinks k channels := c
c)将2个配置文件复制到m2上一份
- user@m1:/home/hadoop/flume-1.5.0-bin$ rsync --arc from /home/hadoop/flume-1.5.0-bin/conf/Load_balancing_Sink_Processors.conf to user@m2:/home/hadoop/flume-1.5.0-bin/conf/Load_balancing_Sink_Processors.conf
- user@m1:/home/hadoop/flume-1.5.0-bin$ rsync --arc from /home/hadoop/flume-1.5.0-bin/conf/Load_balancing_Sink.Processors.avro.conf to user@m2:/home/hadoop/flume-1.5.0.bin.conf
复制代码
d)打开4个窗口,在m1和m2上同时启动两个flume agent
root@m1:/home/hadoop# /home/hadoop/flume-1.5.0-bin/bin/flume-ng process -c . -f /home/hadoop/flume-1.5.0-bin/conf/Load Balancer Sink avro.conf -n a1 -D flume.root.logger=INFO, consol
在/m1目录下的/home/hadoop目录下(以root用户的终端),执行以下命令:在该目录下运行flume-ng代理服务器,在配置成捕获默认输入的情况下,并从指定配置文件读取参数;启动名为a1的任务,并设置日志级别为INFO并输出到控制台;随后执行复制代码至剪贴板的操作。
接着在m1或m2中的任意一台服务器上运行测试用例,在生成测试日志后进行逐行输入操作。由于输入速度较快的原因,在同一台服务器上的多个测试用例可能同时运行时会引发日志混乱的情况。
root@m1:/home/hadoop# 多次运行以下命令以输出不同内容并发送至本地端口
root@m1:/home/hadoop# idoall.org test1至localhost 5140
root@m1:/home/hadoop# idoall.org test2至localhost 5140
root@m1:/home/hadoop# idoall.org test3至localhost 5140
root@m1:/home/hadoop# idoall.org test4至localhost 5140
f)在m1的sink窗口,可以看到以下信息:
【
【
【
【
【
【
【
【
【
【
【
请稍候...
抱歉!由于某些原因,请稍后再试!
正在为您生成结果...
生成中...
完成!以下为生成的结果:
g)在m2的sink窗口,可以看到以下信息:
记录显示:具体时间为2010年8月14日的下午三点十五分二十七秒时发生的一次事件记录。
该事件的日志显示:
- 头部信息包含无效状态码和零设施配置
- 具体数值分别为: severity=无效状态 , syslog.status=无效状态 , facility=零设施
- 载体数据为: [ b" Wed Aug 14 ... " ]
复制代码
说明轮询模式起到了作用。
12)案例12:Hbase sink
在测试前,请您先查看《ubuntu12.04 + hadoop 2.2.0 + zookeeper 3.4.5 + hbase 0.96.2 + hive 0.13.1 分布式环境部署》文档并启动 hbase
b)然后将以下文件复制到flume中:
执行以下文件复制操作:将位于/cp源至目标的指定目录内的特定文件内容转移至/cp指定位置
进行一次复制操作,在指定目录下将名为hbase-client-0.96.2-hadoop2.jar的JAR文件从本地移动到Flume的工作目录。
依次执行复制操作,在相同的目标目录下将hbase系列组件(如hbase-client, hbase-common, hbase-protocol等)复制至Flume的工作目录。
最后一次复制操作,在指定位置放置hbase-hadoop-compat版本的JAR文件。
复制代码
c)确保test_idoall_org表在hbase中已经存在
d)在m1创建hbase_simple配置文件
- root@m1:/home/hadoop# vi /home/hadoop/flume-1.5.0-bin/conf/hbase_simple.conf
1. a1.sources = r1
- a1.sinks = k1
- a1.channels = c1
Configure Source Description
描述下水道系统
581. a1.sinks.k1.type指定为logger类
582. a1.sinks.k1.type配置为hbase类
583. a1.sinks.k1.table指定为test_idoall_org表
584. a1.sinks.k1.columnFamily设置为name列族
585. a1.sinks.k1.column指定id字段
586. a1.sinks.k1.serializer定义为org.apache.flume.sink.hbase.RegexHbaseEventSerializer序列化器
587. a1.sinks.k1.channel设置为memoryChannel通道
配置一个缓存通道以缓存事件在内存中
channelConfig: {
channels: [
{
id: 'c',
type: MemoryType.MEMORY,
capacity: 2_34,
maxTransactionCapacity: 6_78,
}
]
}
Assign the source and sink to the channel
Set a1.sources.r1.channels to c1
Set a1.sinks.k2.channel to c2
e)启动flume agent
595./home/hadoop/flume-1.5.0-bin/bin/flume-ng {Agent} 执行以下命令: -c . -f /home/hadoop/flume-1.5.0-bin/conf/hbase_simple.conf -n a1 -D flume.root.logger=INFO,console
此命令将指定配置文件路径、默认使用当前目录下的配置文件,默认节点名称为a1,并设置日志级别为INFO并直接输出到控制台
f)测试产生syslog
以root用户的账户,在机器m1的home目录下执行以下命令:输出消息到本地文件,并通过nc工具将数据发送到本地地址localhost上的端口5140。
g)这时登录到hbase中,可以发现新数据已经插入
- 用户名root在本地文件夹root/m1下执行hbase shell命令。
- 在系统中发现警告信息:hadoop/native.lib已过时,请安装io/native.lib替代品。
- 开启HBase Shell界面,请输入'help'以查看可用命令列表。
- 按下'exit'键将退出HBase Shell界面。
- 版本号为hbase-0.96.2-hadoop2的运行环境信息:版本ID为r1581096,在美国加州帕洛阿尔托市于2014年3月24日星期一发布。
hbase(main):001:0> list
603. TABLE
604. SLF4J: Class path contains multiple SLF4J bindings
605. Found a binding for the StaticLoggerBinder class within the specified JAR files
606. Another binding was identified for the StaticLoggerBinder class in this configuration
607. Refer to http://www.slf4j.org/codes.html#multiple_bindings for further details
hbase2hive_idoall
hive2hbase_idoall
test_idoall_org
Processing completed after 2.688 seconds, with a total of 3 rows processed
使用 hbase 到 Hive 的映射表 ["hbase2hive_idoall", "hive2hbase_idoall", "test_idoall_org"] 进行操作。
hbase (main): 0: 2#> 执行扫描操作 test_idoall_org
记录中的列信息包括 ROW、COLUMN 和 CELL。
该区域包含一个名为 idoall 的列字段,在第 1 行处记录了 timestamp 和 value 值。
完成扫描仅需 55 毫秒的时间。
hbase(主进程):第3步:第4阶段> 执行 scan 操作
记录号 列名+单元格
测试环境下的运行结果如下:
记录号:idOAll 的列名称及其值为 hello idoall.org 来自 Flume
XbQCOZrKK8-编码的记录号:idOAll 的列名称及其值为 hello idOAll.org 来自 Flume
总共有 2 条记录,在约 𝟤 秒内完成
1. hbase(main):004:0> quit
复制代码
通过大量Flume实例的练习与应用, 完成所有练习后你会发现Flume功能的强大之处在于它能够灵活地进行各种搭配以满足不同工作需求. 与此同时, 俗语有云"师傅领进门, 行棋在个人", 如何更好地将Flime与你的产品业务相结合是一个值得深入思考的问题. 快点开始动手实践起来吧.
