flume-1.7.0 简单使用
在上一篇中,我们安装了 flume-ng,这一篇我们就来简单使用一下。

官网是这样描述的:为了方便操作和管理,在配置系统中您需要依次完成以下几个步骤——首先确定配置文件路径并将其设置好;其次在配置文件中明确指定 agent 名称字段;最后可以通过运行 flume-ng 命令来启动服务
1 编写配置文件
我们先拿官网上的例子来跑一下看看,就使用 example.conf 文件:
[root@master conf]# pwd
/usr/hadoop/flume-1.7.0-bin/conf
[root@master conf]# vi example.conf
# example.conf: A single-node Flume configuration
# Name the components on this agent
# 定义一个 agent 的元素
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
# 配置 source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444
# Describe the sink
# 配置 sink
a1.sinks.k1.type = logger
# Use a channel which buffers events in memory
# 定义 channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
# 用 channel 连接起来 source 和 sink
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
这里我随手做了几点注释,并非直接照搬他人译文,请大家审阅时不要误以为这些是我个人见解。具体相关知识,请参考官方资料吧。

这是官方发布的示意图。
我们可以这样理解:
WebServer想要向HDFS传输一些数据,并因此寻求帮助。为此,WebServer调用了一个名为flune的代理程序。该代理程序接收并处理来自WebServer的数据包(source)。flune代理接收后利用自身的渠道(如物流服务),最终送达至离HDFS最近的快递分拣中心(sink)。
该图仅作为示意图表存在,并非用于对example.conf文件的具体解析说明,请切勿将其误解为对example.conf文件的具体解析说明。
1.1 source
不听我的胡言乱语,请相信我的观点是为了让内容更加清晰易懂;其实我的想法有些复杂。
那么,
对于这个 example.conf 的配置文件,
我们定义了一个叫做a1的agent,
然后 source 源配置为netcat类型,
对于 netcat source 需要配置的内容官网上写的很清楚:

黑色加粗的几项是必须配置的,对于其他几项是可选项。
1.2 channel
接下来是'渠道'(即我们要选择哪一种'物流'),这里我们采用的是'memory'(内存),需要配置的是:

1.3 sink
我们的”物流分拣点“,sink 我们配置的是 logger,需要配置几项是:

或许你已被误导。关于它们的实际解释,请访问官网获取相关信息。如果你想要深入理解这些内容,请将这些资料转化为易于理解的形式。
2 启动 agent
2.1 启动
上面我们已经说过了,使用 flume-ng 命令启动。

具体参数看上图。
[root@master conf]# pwd
/usr/hadoop/flume-1.7.0-bin/conf
[root@master conf]# flume-ng agent --conf conf --conf-file example.conf --name a1 -Dflume.root.logger=INFO,console
Info: Including Hadoop libraries found via (/usr/hadoop/hadoop-2.6.4/bin/hadoop) for HDFS access
Info: Including HBASE libraries found via (/usr/hadoop/hbase-1.2.3/bin/hbase) for HBASE access
...
16/11/18 19:34:27 INFO node.Application: Starting Channel c1
16/11/18 19:34:27 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: CHANNEL, name: c1: Successfully registered new MBean.
16/11/18 19:34:27 INFO instrumentation.MonitoredCounterGroup: Component type: CHANNEL, name: c1 started
16/11/18 19:34:27 INFO node.Application: Starting Sink k1
16/11/18 19:34:27 INFO node.Application: Starting Source r1
16/11/18 19:34:27 INFO source.NetcatSource: Source starting
16/11/18 19:34:27 INFO source.NetcatSource: Created serverSocket:sun.nio.ch.ServerSocketChannelImpl[/127.0.0.1:44444]
也值得大家引起注意的是,在我的目录设置中已经将它设为了 conf,并明确指定了该目录下所要使用的应用程序以及相应的运行环境设置。此外,在查看上述情况后,则表明我们的名为 a1 的代理程序已成功启动。并为此配置了一系列参数来实现对日志信息的输出。
2.2 发送消息
接下来我们就可以再开一个终端,在这个终端上执行下面一系列命令:

可能有些朋友在他们的Linux系统上会遇到命令找不到的问题。安装telnet工具就能解决问题。当我们查看错误信息时可以看到"Connection refused"这个错误提示。这是因为 /etc/hosts 文件中将localhost映射为'::1…'这种IP地址时未能成功解析。因为这个IP地址无法被正确识别出来,telnet程序才会尝试使用127.0.0.1地址进行连接,并最终建立通路。
这个时候,我们可以在这儿输入一些东西:

然后我们回到之前运行着 agent 的那个终端:

会看到更多这样的一行内容,在某些情况下,这一行显示的内容数量可能少于我们的输入量,并不是没有接收到来的信息而是由于超出了它的显示容量而被省略掉了
这个时候应该就有所体会了,flume 是”很多个形容词“的

日志采集系统。
我们再来写几个实例,来体会一下。
3. avro ⇒ hdfs
我们的源设置为 Avro 资源(基于此),目标存储为 HDFS 位置(暂且接受这种非规范表述)。接下来,请检查 Avro 源所需配置的内容是什么。
3.1 配置 avro source

我们需要配置有四项,并且下面也给出了示例。
[root@master conf]# vi AvroHDFS.conf
a1.sources = r1
a1.channels = c1
a1.sources.r1.type = avro
a1.sources.r1.channels = c1
a1.sources.r1.bind = master
a1.sources.r1.port = 4141
3.2 配置 channel
这次我们还是使用 memory:

我们在 AvroHDFS.conf 文件中追加:
[root@master conf]# vi AvroHDFS.conf
a1.sources = r1
a1.channels = c1
a1.sources.r1.type = avro
a1.sources.r1.channels = c1
a1.sources.r1.bind = master
a1.sources.r1.port = 4141
a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 10000
a1.channels.c1.byteCapacityBufferPercentage = 20
a1.channels.c1.byteCapacity = 800000
3.3 配置 sink
我们是向 HDFS 上导数据,所以我们使用 hdfs sink:

这张图片显得过长,在某些设备上可能导致加载时间过长或显示不清晰。因此我选择仅提取了示例片段供参考,并建议大家直接访问官方文档学习具体的配置方法。
我们接着在 AvroHDFS.conf 文件中追加:
[root@master conf]# vi AvroHDFS.conf
a1.sinks = k1
a1.sources = r1
a1.channels = c1
a1.sources.r1.type = avro
a1.sources.r1.channels = c1
a1.sources.r1.bind = master
a1.sources.r1.port = 4141
a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 10000
a1.channels.c1.byteCapacityBufferPercentage = 20
a1.channels.c1.byteCapacity = 800000
a1.sinks.k1.type = hdfs
a1.sinks.k1.channel = c1
a1.sinks.k1.hdfs.path = /fromflume/events/%y-%m-%d/%H%M/%S
a1.sinks.k1.hdfs.filePrefix = events.
a1.sinks.k1.hdfs.round = true
a1.sinks.k1.hdfs.roundValue = 10
a1.sinks.k1.hdfs.roundUnit = minute
a1.sinks.k1.hdfs.useLocalTimeStamp = true
通过这种方式,我们配置完成了一个从 avro 到 hdfs 的配置文件;即可启动 agent 了。
3.4 启动 agent
这次我们需要先运行起来 hadoop 集群,不然是会失败的。
[root@master conf]# flume-ng agent --conf conf --conf-file AvroHDFS.conf --name a1 -Dflume.root.logger=INFO,console
Info: Including Hadoop libraries found via (/usr/hadoop/hadoop-2.6.4/bin/hadoop) for HDFS access
Info: Including HBASE libraries found via (/usr/hadoop/hbase-1.2.3/bin/hbase) for HBASE access
Info: Including Hive libraries found via (/usr/hadoop/apache-hive-2.1.0-bin) for Hive access
...
16/11/18 20:46:13 INFO instrumentation.MonitoredCounterGroup: Component type: SOURCE, name: r1 started
16/11/18 20:46:13 INFO source.AvroSource: Avro source r1 started.
表明我们的 agent 成功启动了。然而,在此之后我们需要调用相应的API来继续操作。
表明我们的 agent 成功启动了。然而,在此之后我们需要调用相应的API来继续操作。
import java.nio.charset.Charset;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.api.RpcClient;
import org.apache.flume.api.RpcClientFactory;
import org.apache.flume.event.EventBuilder;
public class FlumeDemo {
private String hostname;
private int port;
private RpcClient client;
public FlumeDemo(String hostname,int port) {
this.hostname = hostname;
this.port = port;
this.client = RpcClientFactory.getDefaultInstance(hostname, port);
}
public void sendMessage(String data){
Event event = EventBuilder.withBody(data, Charset.forName("UTF-8"));
try {
client.append(event);
} catch (EventDeliveryException e) {
e.printStackTrace();
}
}
public void cleanUp(){
client.close();
}
public static void main(String[] args) {
FlumeDemo rpcClient = new FlumeDemo("master", 4141);
String data = "testing ";
for(int i=0;i<10;i++){
rpcClient.sendMessage(data + i);
}
rpcClient.cleanUp();
}
}
然后,运行我们的 java 程序,这个时候,观察我们的 agent 是什么状况:
...
16/11/18 20:46:13 INFO instrumentation.MonitoredCounterGroup: Component type: SOURCE, name: r1 started
16/11/18 20:46:13 INFO source.AvroSource: Avro source r1 started.
16/11/18 20:46:23 INFO ipc.NettyServer: [id: 0x33dfe52f, /192.168.38.1:64375 => /192.168.38.129:4141] OPEN
16/11/18 20:46:23 INFO ipc.NettyServer: [id: 0x33dfe52f, /192.168.38.1:64375 => /192.168.38.129:4141] BOUND: /192.168.38.129:4141
16/11/18 20:46:23 INFO ipc.NettyServer: [id: 0x33dfe52f, /192.168.38.1:64375 => /192.168.38.129:4141] CONNECTED: /192.168.38.1:64375
16/11/18 20:46:23 INFO hdfs.HDFSSequenceFile: writeFormat = Writable, UseRawLocalFileSystem = false
16/11/18 20:46:23 INFO ipc.NettyServer: [id: 0x33dfe52f, /192.168.38.1:64375 :> /192.168.38.129:4141] DISCONNECTED
16/11/18 20:46:23 INFO ipc.NettyServer: [id: 0x33dfe52f, /192.168.38.1:64375 :> /192.168.38.129:4141] UNBOUND
16/11/18 20:46:23 INFO ipc.NettyServer: [id: 0x33dfe52f, /192.168.38.1:64375 :> /192.168.38.129:4141] CLOSED
16/11/18 20:46:23 INFO ipc.NettyServer: Connection to /192.168.38.1:64375 disconnected.
16/11/18 20:46:24 INFO hdfs.BucketWriter: Creating /fromflume/events/16-11-18/2040/00/events..1479473183696.tmp
...
此时段中,在HDFS上已经存储了数据。通过WebUI(访问该URL:http://master:50070)可以进入HDFS查看是否新增了从Flume传输的文件夹。

果然,我们的文件夹已经创建成功了,我们可以一级一级的进去,会看到:

文档已顺利完成保存,在查看过程中发现显示异常现象存在,并经核实确实已完成保存操作
4 avro ⇒ kafka
参考我昨天的笔记后,请我们一起来编写一个关于Flume和Kafka结合使用的实例吧。两者在功能上有一定的相似之处,并均是数据传输领域的中间人角色。一个典型的例子是Flume和Kafka结合使用以实现高效的分布式流处理系统。
有了前面两个练习,那这个我就不写的那么详细了。

但是,对于这个 kafka sink 还真是有好多需要说的:
4.1 sinks.type
这一项必须被配置为org.apache.flume.sink.kafka.KafkaSink,并且有人表示这种设置让人觉得奇怪。需要注意的是,在设置时还需要确保这一参数具有预设的默认值。
4.2 kafka.topic
关于这项内容,在别处也提到了一些相关内容的大致意思是:当 topic 出现在 event 的 header字段中时,在 Kafka 的 broker 之前就会被覆盖掉原有的相同名称的主题信息。不过我现在还不太清楚具体细节是什么样的……暂且先把这个问题搁置一边吧。稍后再补充这部分内容的时候会更加清楚一些。这块暂时不用管了哦——先记下来就可以了
好了,我们来写一下 sink :
[root@master conf]# vi AvroKafka.conf
1 a1.sources = r1
2 a1.sinks = k1
3 a1.channels = c1
4
5 a1.sources.r1.type = avro
6 a1.sources.r1.bind = master
7 a1.sources.r1.port = 44444
8
9
10 a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
11 a1.sinks.k1.kafka.bootstrap.servers = localhost:9092,localhost:9093
12 a1.sinks.k1.kafka.topic = fromFlume
13
14 a1.channels.c1.type = memory
15 a1.channels.c1.capacity = 1000
16 a1.channels.c1.transactionCapacity = 100
17
18 a1.sources.r1.channels = c1
19 a1.sinks.k1.channel = c1
为了将 flume 和 kafka 结合起来,并通过 kafka 消费 flume 输出的数据流。因此,在本例中,我们需要开启 kafka 服务,并配置一个消费者。作为演示目的,在 master 节点上启动两个 kafka 服务实例。
[root@master config]# pwd
/usr/hadoop/kafka_2.11-0.10.1.0/config
[root@master config]# kafka-server-start.sh server.properties &
[1] 10693
...
[root@master config]# kafka-server-start.sh server1.properties &
[2] 10962
...
[root@master config]# jps
11233 Jps
2593 ResourceManager
10962 Kafka
2692 NodeManager
10693 Kafka
3034 QuorumPeerMain
2171 NameNode
2269 DataNode
2446 SecondaryNameNode
[root@master config]#
[root@master config]# kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic fromFlume --from-beginning
这个时候,我们就可以启动我们的 flume agent 了:
[root@master conf]# flume-ng agent --conf conf --conf-file AvroKafka.conf --name a1 -Dflume.root.logger=INFO,console
...
不知道大家有没有晕掉,反正我当初学的时候是晕了好一阵儿呢。8
接下来,在继续开发过程中我们也调用了之前开发的Java程序:值得注意的是,在当前阶段我们需要将端口号进行调整。特别提醒的是,在设置时请确保该端口与我们的*.conf文件中指定的一致。
import java.nio.charset.Charset;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.api.RpcClient;
import org.apache.flume.api.RpcClientFactory;
import org.apache.flume.event.EventBuilder;
public class FlumeDemo {
private String hostname;
private int port;
private RpcClient client;
public FlumeDemo(String hostname,int port) {
this.hostname = hostname;
this.port = port;
this.client = RpcClientFactory.getDefaultInstance(hostname, port);
}
public void sendMessage(String data){
Event event = EventBuilder.withBody(data, Charset.forName("UTF-8"));
try {
client.append(event);
} catch (EventDeliveryException e) {
e.printStackTrace();
}
}
public void cleanUp(){
client.close();
}
public static void main(String[] args) {
FlumeDemo rpcClient = new FlumeDemo("master", 44444);
String data = "Hello World! ";
for(int i=0;i<10;i++){
rpcClient.sendMessage(data + i);
}
rpcClient.cleanUp();
}
}
运行程序,然后,我们去到启动 kafka 消费者的那个终端上,我们会看到:
[2016-11-18 21:46:54,915] INFO [Group Metadata Manager on Broker 0]: Removed 0 expired offsets in 0 milliseconds. (kafka.coordinator.GroupMetadataManager)
[2016-11-18 21:47:08,073] INFO [Group Metadata Manager on Broker 1]: Removed 0 expired offsets in 0 milliseconds. (kafka.coordinator.GroupMetadataManager)
Hello World! 0
Hello World! 1
Hello World! 2
Hello World! 3
Hello World! 4
Hello World! 5
Hello World! 6
Hello World! 7
Hello World! 8
Hello World! 9
不清楚大家有没有想到什么?反正我是这样认为的——输出重定向!我也说了别说我胡里胡涂的!
这些确实只是些基本的例子呢。建议大家仔细阅读文档哦!看看 flume 都是有什么 sources、channels、sinks?试着动手实践一下吧!通过不断尝试就能掌握啦!
