Advertisement

flume kafka sink kafka对接flume简单实现详解

阅读量:

1、flume的配置文件 kafka-sink.properties

复制代码
    # Name the components on this agent
    # source:起一个别名
    # properties文件它是java的配置文件,=左边就是键,=右边是值;键的开头都是以a1(就是flume的名字--agent的名字就是a1);a1随便起
    a1.sources = r1
    # sink:起一个别名
    a1.sinks = k1
    # channels;:起一个别名
    a1.channels = c1
    
    # Describe/configure the source
    # a1(agent的名字).sources(来源).r1(来源的名字);配置多个来源
    # type:不能随便写(文档上说明)
    a1.sources.r1.type = netcat
    # bind:netcat的一个属性(绑定),允许任何人访问
    a1.sources.r1.bind = 0.0.0.0
    # port:netcat的一个属性;(端口)
    a1.sources.r1.port = 44444
    
    # Describe the sink
    # 描述一个sink: lkafka sink
    a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
    # kafak集群
    a1.sinks.k1.kafka.bootstrap.servers = cenos7-1:9092,cenos7-2:9092,cenos7-3:9092,centos7-4:9092
    # kafka主题
    a1.sinks.k1.kafka.topic = bigdata1
    a1.sinks.k1.kafka.flumeBatchSize = 20
    a1.sinks.k1.kafka.producer.acks = 1
    a1.sinks.k1.kafka.producer.linger.ms = 1
    
    # Use a channel which buffers events in memory
    # 描述一下channel:内存
    a1.channels.c1.type = memory
    # capacity:容量
    a1.channels.c1.capacity = 1000
    # transactionCapacity:事务的容量
    a1.channels.c1.transactionCapacity = 100
    
    # Bind the source and sink to the channel
    # 绑定;source和channel绑定
    a1.sources.r1.channels = c1
    # sink和channel绑定
    a1.sinks.k1.channel = c1

2、启动flume,准备收集数据

复制代码
    [root@centos7-3 apache-flume]# bin/flume-ng agent --conf conf --conf-file 
    conf/kafka-sink.properties --name a1 -Dflume.root.logger=INFO,console

3、启动kafka消费者,准备接收flume收集到 kafka producer 的数据

复制代码
    [root@centos7-1 kafka]# bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic bigdata1 --from-beginning

4、模拟生产数据

复制代码
    [root@centos7-3 data]# telnet localhost 44444

5、效果演示

在这里插入图片描述

全部评论 (0)

还没有任何评论哟~