Advertisement

flume-1.7.0 简单使用

阅读量:

在上一篇中,我们安装了 flume-ng,这一篇我们就来简单使用一下。

这里写图片描述

官网是这样描述的:为了方便操作和管理,在配置系统中您需要依次完成以下几个步骤——首先确定配置文件路径并将其设置好;其次在配置文件中明确指定 agent 名称字段;最后可以通过运行 flume-ng 命令来启动服务

1 编写配置文件

我们先拿官网上的例子来跑一下看看,就使用 example.conf 文件:

复制代码
    [root@master conf]# pwd
    /usr/hadoop/flume-1.7.0-bin/conf
    [root@master conf]# vi example.conf 
    
    # example.conf: A single-node Flume configuration
    
    # Name the components on this agent
    # 定义一个 agent 的元素
    
    a1.sources = r1
    a1.sinks = k1
    a1.channels = c1
    
    # Describe/configure the source
    # 配置 source
    
    a1.sources.r1.type = netcat
    a1.sources.r1.bind = localhost
    a1.sources.r1.port = 44444
    
    # Describe the sink
    # 配置 sink
    
    a1.sinks.k1.type = logger
    
    # Use a channel which buffers events in memory
    # 定义 channel
    
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 1000
    a1.channels.c1.transactionCapacity = 100
    
    # Bind the source and sink to the channel
    # 用 channel 连接起来 source 和 sink
    
    a1.sources.r1.channels = c1
    a1.sinks.k1.channel = c1

这里我随手做了几点注释,并非直接照搬他人译文,请大家审阅时不要误以为这些是我个人见解。具体相关知识,请参考官方资料吧。

这里写图片描述

这是官方发布的示意图。
我们可以这样理解:
WebServer想要向HDFS传输一些数据,并因此寻求帮助。为此,WebServer调用了一个名为flune的代理程序。该代理程序接收并处理来自WebServer的数据包(source)。flune代理接收后利用自身的渠道(如物流服务),最终送达至离HDFS最近的快递分拣中心(sink)。
该图仅作为示意图表存在,并非用于对example.conf文件的具体解析说明,请切勿将其误解为对example.conf文件的具体解析说明。

1.1 source

不听我的胡言乱语,请相信我的观点是为了让内容更加清晰易懂;其实我的想法有些复杂。
那么,
对于这个 example.conf 的配置文件,
我们定义了一个叫做a1的agent,
然后 source 源配置为netcat类型,
对于 netcat source 需要配置的内容官网上写的很清楚:

这里写图片描述

黑色加粗的几项是必须配置的,对于其他几项是可选项。

1.2 channel

接下来是'渠道'(即我们要选择哪一种'物流'),这里我们采用的是'memory'(内存),需要配置的是:

这里写图片描述
1.3 sink

我们的”物流分拣点“,sink 我们配置的是 logger,需要配置几项是:

这里写图片描述

或许你已被误导。关于它们的实际解释,请访问官网获取相关信息。如果你想要深入理解这些内容,请将这些资料转化为易于理解的形式。

2 启动 agent

2.1 启动

上面我们已经说过了,使用 flume-ng 命令启动。

这里写图片描述

具体参数看上图。

复制代码
    [root@master conf]# pwd
    /usr/hadoop/flume-1.7.0-bin/conf
    
    [root@master conf]# flume-ng agent --conf conf --conf-file example.conf --name a1 -Dflume.root.logger=INFO,console
    
    Info: Including Hadoop libraries found via (/usr/hadoop/hadoop-2.6.4/bin/hadoop) for HDFS access
    Info: Including HBASE libraries found via (/usr/hadoop/hbase-1.2.3/bin/hbase) for HBASE access
    ...
    
    16/11/18 19:34:27 INFO node.Application: Starting Channel c1
    16/11/18 19:34:27 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: CHANNEL, name: c1: Successfully registered new MBean.
    16/11/18 19:34:27 INFO instrumentation.MonitoredCounterGroup: Component type: CHANNEL, name: c1 started
    16/11/18 19:34:27 INFO node.Application: Starting Sink k1
    16/11/18 19:34:27 INFO node.Application: Starting Source r1
    16/11/18 19:34:27 INFO source.NetcatSource: Source starting
    16/11/18 19:34:27 INFO source.NetcatSource: Created serverSocket:sun.nio.ch.ServerSocketChannelImpl[/127.0.0.1:44444]

也值得大家引起注意的是,在我的目录设置中已经将它设为了 conf,并明确指定了该目录下所要使用的应用程序以及相应的运行环境设置。此外,在查看上述情况后,则表明我们的名为 a1 的代理程序已成功启动。并为此配置了一系列参数来实现对日志信息的输出。

2.2 发送消息

接下来我们就可以再开一个终端,在这个终端上执行下面一系列命令:

这里写图片描述

可能有些朋友在他们的Linux系统上会遇到命令找不到的问题。安装telnet工具就能解决问题。当我们查看错误信息时可以看到"Connection refused"这个错误提示。这是因为 /etc/hosts 文件中将localhost映射为'::1…'这种IP地址时未能成功解析。因为这个IP地址无法被正确识别出来,telnet程序才会尝试使用127.0.0.1地址进行连接,并最终建立通路。

这个时候,我们可以在这儿输入一些东西:

这里写图片描述

然后我们回到之前运行着 agent 的那个终端:

这里写图片描述

会看到更多这样的一行内容,在某些情况下,这一行显示的内容数量可能少于我们的输入量,并不是没有接收到来的信息而是由于超出了它的显示容量而被省略掉了

这个时候应该就有所体会了,flume 是”很多个形容词“的

这里写图片描述

日志采集系统。

我们再来写几个实例,来体会一下。

3. avro ⇒ hdfs

我们的源设置为 Avro 资源(基于此),目标存储为 HDFS 位置(暂且接受这种非规范表述)。接下来,请检查 Avro 源所需配置的内容是什么。

3.1 配置 avro source
这里写图片描述

我们需要配置有四项,并且下面也给出了示例。

复制代码
    [root@master conf]# vi AvroHDFS.conf
    a1.sources = r1
    a1.channels = c1
    
    a1.sources.r1.type = avro
    a1.sources.r1.channels = c1
    a1.sources.r1.bind = master
    a1.sources.r1.port = 4141
3.2 配置 channel

这次我们还是使用 memory:

这里写图片描述

我们在 AvroHDFS.conf 文件中追加:

复制代码
    [root@master conf]# vi AvroHDFS.conf
    a1.sources = r1
    a1.channels = c1
    
    a1.sources.r1.type = avro
    a1.sources.r1.channels = c1
    a1.sources.r1.bind = master
    a1.sources.r1.port = 4141
    
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 10000
    a1.channels.c1.transactionCapacity = 10000
    a1.channels.c1.byteCapacityBufferPercentage = 20
    a1.channels.c1.byteCapacity = 800000
3.3 配置 sink

我们是向 HDFS 上导数据,所以我们使用 hdfs sink:

这里写图片描述

这张图片显得过长,在某些设备上可能导致加载时间过长或显示不清晰。因此我选择仅提取了示例片段供参考,并建议大家直接访问官方文档学习具体的配置方法。

我们接着在 AvroHDFS.conf 文件中追加:

复制代码
    [root@master conf]# vi AvroHDFS.conf
    a1.sinks = k1
    a1.sources = r1
    a1.channels = c1
    
    a1.sources.r1.type = avro
    a1.sources.r1.channels = c1
    a1.sources.r1.bind = master
    a1.sources.r1.port = 4141
    
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 10000
    a1.channels.c1.transactionCapacity = 10000
    a1.channels.c1.byteCapacityBufferPercentage = 20
    a1.channels.c1.byteCapacity = 800000
    
    a1.sinks.k1.type = hdfs
    a1.sinks.k1.channel = c1
    a1.sinks.k1.hdfs.path = /fromflume/events/%y-%m-%d/%H%M/%S
    a1.sinks.k1.hdfs.filePrefix = events.
    a1.sinks.k1.hdfs.round = true
    a1.sinks.k1.hdfs.roundValue = 10
    a1.sinks.k1.hdfs.roundUnit = minute
    a1.sinks.k1.hdfs.useLocalTimeStamp = true

通过这种方式,我们配置完成了一个从 avro 到 hdfs 的配置文件;即可启动 agent 了。

3.4 启动 agent

这次我们需要先运行起来 hadoop 集群,不然是会失败的。

复制代码
    [root@master conf]# flume-ng agent --conf conf --conf-file AvroHDFS.conf --name a1 -Dflume.root.logger=INFO,console
    
    Info: Including Hadoop libraries found via (/usr/hadoop/hadoop-2.6.4/bin/hadoop) for HDFS access
    Info: Including HBASE libraries found via (/usr/hadoop/hbase-1.2.3/bin/hbase) for HBASE access
    Info: Including Hive libraries found via (/usr/hadoop/apache-hive-2.1.0-bin) for Hive access
    ...
    
    16/11/18 20:46:13 INFO instrumentation.MonitoredCounterGroup: Component type: SOURCE, name: r1 started
    16/11/18 20:46:13 INFO source.AvroSource: Avro source r1 started.

表明我们的 agent 成功启动了。然而,在此之后我们需要调用相应的API来继续操作。

表明我们的 agent 成功启动了。然而,在此之后我们需要调用相应的API来继续操作。

复制代码
    import java.nio.charset.Charset;
    
    import org.apache.flume.Event;
    import org.apache.flume.EventDeliveryException;
    import org.apache.flume.api.RpcClient;
    import org.apache.flume.api.RpcClientFactory;
    import org.apache.flume.event.EventBuilder;
    
    public class FlumeDemo {
    private String hostname;
    private int port;
    private RpcClient client;
    
    public FlumeDemo(String hostname,int port) {
        this.hostname = hostname;
        this.port = port;
        this.client = RpcClientFactory.getDefaultInstance(hostname, port);
    }
    
    public void sendMessage(String data){
        Event event = EventBuilder.withBody(data, Charset.forName("UTF-8"));
        try {
            client.append(event);
        } catch (EventDeliveryException e) {
            e.printStackTrace();
        }
    }
    
    public void cleanUp(){
        client.close();
    }
    
    public static void main(String[] args) {
        FlumeDemo rpcClient = new FlumeDemo("master", 4141);
    
        String data = "testing ";
    
        for(int i=0;i<10;i++){
            rpcClient.sendMessage(data + i);
        }
    
        rpcClient.cleanUp();
    }
    }

然后,运行我们的 java 程序,这个时候,观察我们的 agent 是什么状况:

复制代码
    ...
    16/11/18 20:46:13 INFO instrumentation.MonitoredCounterGroup: Component type: SOURCE, name: r1 started
    16/11/18 20:46:13 INFO source.AvroSource: Avro source r1 started.
    
    16/11/18 20:46:23 INFO ipc.NettyServer: [id: 0x33dfe52f, /192.168.38.1:64375 => /192.168.38.129:4141] OPEN
    16/11/18 20:46:23 INFO ipc.NettyServer: [id: 0x33dfe52f, /192.168.38.1:64375 => /192.168.38.129:4141] BOUND: /192.168.38.129:4141
    16/11/18 20:46:23 INFO ipc.NettyServer: [id: 0x33dfe52f, /192.168.38.1:64375 => /192.168.38.129:4141] CONNECTED: /192.168.38.1:64375
    16/11/18 20:46:23 INFO hdfs.HDFSSequenceFile: writeFormat = Writable, UseRawLocalFileSystem = false
    16/11/18 20:46:23 INFO ipc.NettyServer: [id: 0x33dfe52f, /192.168.38.1:64375 :> /192.168.38.129:4141] DISCONNECTED
    16/11/18 20:46:23 INFO ipc.NettyServer: [id: 0x33dfe52f, /192.168.38.1:64375 :> /192.168.38.129:4141] UNBOUND
    16/11/18 20:46:23 INFO ipc.NettyServer: [id: 0x33dfe52f, /192.168.38.1:64375 :> /192.168.38.129:4141] CLOSED
    16/11/18 20:46:23 INFO ipc.NettyServer: Connection to /192.168.38.1:64375 disconnected.
    
    16/11/18 20:46:24 INFO hdfs.BucketWriter: Creating /fromflume/events/16-11-18/2040/00/events..1479473183696.tmp
    ...

此时段中,在HDFS上已经存储了数据。通过WebUI(访问该URL:http://master:50070)可以进入HDFS查看是否新增了从Flume传输的文件夹。

这里写图片描述

果然,我们的文件夹已经创建成功了,我们可以一级一级的进去,会看到:

这里写图片描述

文档已顺利完成保存,在查看过程中发现显示异常现象存在,并经核实确实已完成保存操作

4 avro ⇒ kafka

参考我昨天的笔记后,请我们一起来编写一个关于Flume和Kafka结合使用的实例吧。两者在功能上有一定的相似之处,并均是数据传输领域的中间人角色。一个典型的例子是Flume和Kafka结合使用以实现高效的分布式流处理系统。

有了前面两个练习,那这个我就不写的那么详细了。

这里写图片描述

但是,对于这个 kafka sink 还真是有好多需要说的:

4.1 sinks.type

这一项必须被配置为org.apache.flume.sink.kafka.KafkaSink,并且有人表示这种设置让人觉得奇怪。需要注意的是,在设置时还需要确保这一参数具有预设的默认值。

4.2 kafka.topic

关于这项内容,在别处也提到了一些相关内容的大致意思是:当 topic 出现在 event 的 header字段中时,在 Kafka 的 broker 之前就会被覆盖掉原有的相同名称的主题信息。不过我现在还不太清楚具体细节是什么样的……暂且先把这个问题搁置一边吧。稍后再补充这部分内容的时候会更加清楚一些。这块暂时不用管了哦——先记下来就可以了

好了,我们来写一下 sink :

复制代码
    [root@master conf]# vi AvroKafka.conf
      1 a1.sources = r1
      2 a1.sinks = k1
      3 a1.channels = c1
      4 
      5 a1.sources.r1.type = avro
      6 a1.sources.r1.bind = master
      7 a1.sources.r1.port = 44444
      8 
      9 
     10 a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
     11 a1.sinks.k1.kafka.bootstrap.servers = localhost:9092,localhost:9093
     12 a1.sinks.k1.kafka.topic = fromFlume 
     13 
     14 a1.channels.c1.type = memory
     15 a1.channels.c1.capacity = 1000
     16 a1.channels.c1.transactionCapacity = 100
     17 
     18 a1.sources.r1.channels = c1
     19 a1.sinks.k1.channel = c1

为了将 flume 和 kafka 结合起来,并通过 kafka 消费 flume 输出的数据流。因此,在本例中,我们需要开启 kafka 服务,并配置一个消费者。作为演示目的,在 master 节点上启动两个 kafka 服务实例。

复制代码
    [root@master config]# pwd
    /usr/hadoop/kafka_2.11-0.10.1.0/config
    [root@master config]# kafka-server-start.sh server.properties &
    [1] 10693
    ...
    [root@master config]# kafka-server-start.sh server1.properties &
    [2] 10962
    ...
    [root@master config]# jps
    11233 Jps
    2593 ResourceManager
    10962 Kafka
    2692 NodeManager
    10693 Kafka
    3034 QuorumPeerMain
    2171 NameNode
    2269 DataNode
    2446 SecondaryNameNode
    [root@master config]#
复制代码
    [root@master config]# kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic fromFlume --from-beginning

这个时候,我们就可以启动我们的 flume agent 了:

复制代码
    [root@master conf]# flume-ng agent --conf conf --conf-file AvroKafka.conf --name a1 -Dflume.root.logger=INFO,console
    
    ...

不知道大家有没有晕掉,反正我当初学的时候是晕了好一阵儿呢。8

接下来,在继续开发过程中我们也调用了之前开发的Java程序:值得注意的是,在当前阶段我们需要将端口号进行调整。特别提醒的是,在设置时请确保该端口与我们的*.conf文件中指定的一致。

复制代码
    import java.nio.charset.Charset;
    
    import org.apache.flume.Event;
    import org.apache.flume.EventDeliveryException;
    import org.apache.flume.api.RpcClient;
    import org.apache.flume.api.RpcClientFactory;
    import org.apache.flume.event.EventBuilder;
    
    public class FlumeDemo {
    private String hostname;
    private int port;
    private RpcClient client;
    
    public FlumeDemo(String hostname,int port) {
        this.hostname = hostname;
        this.port = port;
        this.client = RpcClientFactory.getDefaultInstance(hostname, port);
    }
    
    public void sendMessage(String data){
        Event event = EventBuilder.withBody(data, Charset.forName("UTF-8"));
        try {
            client.append(event);
        } catch (EventDeliveryException e) {
            e.printStackTrace();
        }
    }
    
    public void cleanUp(){
        client.close();
    }
    
    public static void main(String[] args) {
        FlumeDemo rpcClient = new FlumeDemo("master", 44444);
    
        String data = "Hello World! ";
    
        for(int i=0;i<10;i++){
            rpcClient.sendMessage(data + i);
        }
    
        rpcClient.cleanUp();
    }
    }

运行程序,然后,我们去到启动 kafka 消费者的那个终端上,我们会看到:

复制代码
    [2016-11-18 21:46:54,915] INFO [Group Metadata Manager on Broker 0]: Removed 0 expired offsets in 0 milliseconds. (kafka.coordinator.GroupMetadataManager)
    [2016-11-18 21:47:08,073] INFO [Group Metadata Manager on Broker 1]: Removed 0 expired offsets in 0 milliseconds. (kafka.coordinator.GroupMetadataManager)
    Hello World! 0
    Hello World! 1
    Hello World! 2
    Hello World! 3
    Hello World! 4
    Hello World! 5
    Hello World! 6
    Hello World! 7
    Hello World! 8
    Hello World! 9

不清楚大家有没有想到什么?反正我是这样认为的——输出重定向!我也说了别说我胡里胡涂的!

这些确实只是些基本的例子呢。建议大家仔细阅读文档哦!看看 flume 都是有什么 sources、channels、sinks?试着动手实践一下吧!通过不断尝试就能掌握啦!

全部评论 (0)

还没有任何评论哟~