kafka技术白皮书
Kafka技术白皮书
浙江大华 软件研发中心-系统架构部 伍超 编制
一、 Kafka基本介绍
Kafka是一个基于分布式的消息发布-订阅系统,它被设计成快速、可扩展的、持久的。与其他消息发布-订阅系统类似,Kafka在Topic当中保存消息的信息。生产者向Topic写入数据,消费者从主题读取数据。由于Kafka的特性是支持分布式,同时也是基于分布式的,所以Topic也是可以在多个节点上被分区和覆盖的。
基本介绍环节对Kafka的主要组成部分及一些名词做一些解释。
1. 基本架构

2. 基本概念
n Broker:中间者,Kafka集群包含一个或多个服务器,这种服务器被称为Broker。
n Producer:消息生产者,负责发布消息到KafkaBroker,也称消息发布者(Publisher)。Producer使用Push模式将消息发布到Broker。
n Consumer:消息消费者,向KafkaBroker读取消息的客户端,也称消息订阅者(Subscriber)。Consumer使用Pull模式从Broker订阅消息。
n Consumer Group:Kafka是显式分布式的,多个Producer、Consumer和Broker可以运行在一个大的集群上,作为一个逻辑整体对外提供服务。对于Consumer,多个Consumer可以组成一个Group,这个message只能传输给某个Group中的某一个Consumer。如下图所示:

n Zookeeper:Kafka通过Zookeeper管理集群配置,在Consumer Group发生变化时进行Rebalance。
n Message:消息,是通信的基本单位。
n Topic:主题,每条发布到Kafka集群的消息都有一个类别,这个类别被称为Topic。物理上不同Topic的消息分开存储,逻辑上一个Topic的消息虽然保存于一个或多个Broker上,但用户只需指定消息的Topic即可生产或消费数据而不必关心数据存于何处。
n Partition:Partition是物理上的概念,每个Topic包含一个或多个Partition。为了使得Kafka的吞吐率可以线性提高,物理上把Topic分成一个或多个Partition,每个Partition在物理上对应一个文件夹,该文件夹下存储这个Partition的所有消息和索引文件
PS:以上组件在分布式环境下均可以是多个的,支持故障转移。同时ZooKeeper仅和Broker和Consumer相关。值得注意的是Broker的设计是无状态的,消费的状态信息依靠消费者自己维护,通过offset偏移量来确定自己的消费进度。
一、 产品性能参数概要
1. 测试性能
acks参数控制producer发送的消息的持久性,通常设置说明如下:
1. acks=0:Producer不会等待Broker的任何确认就认为发送成功,并接着发送下一条消息。
2. acks=1:Producer会等待Leader确认以后再发送下一条消息,但不会等待ISR列表中其他的follwer副本确认。
3. acks=-1:Producer会等待ISR列表里的所有副本都确认消息以后才认为发送成功。
1台单副本
| 硬件 | 7024 * 1台,8核CPU,16G内存,Centos6.7,内核版本为2.6。单盘读写速度 100MB | |||||
|---|---|---|---|---|---|---|
| Kafka配置 | 1个broker,创建单副本,每块盘10个partition | |||||
| 网络 | 硬盘(每个broker) | 消息大小 | acks=0 | acks=1 | acks=-1 | 备注 |
| 1Gb | 1T*1 | 1KB | 65945 | 63078 | 37549 | |
| 1T*2 | 1KB | 75624 | 70966 | 45718 | ||
| 2Gb | 1T*1 | 1KB | 67287 | 64655 | 38255 | |
| 1T*2 | 1KB | 119684 | 104680 | 58693 | ||
| 1T*4 | 1KB | 138106 | 127747 | 78620 | ||
| 2Gb | 1T*1 | 1KB | 66587 | 67594 | 38255 | |
| 1T*2 | 1KB | 133684 | 120390 | 69562 | ||
| 1T*4 | 1KB | 256141 | 265269.2 | 140850 | ||
| 1T*6 | 1KB | 295825 | 307417 | 177214 |
3台单副本
| 硬件 | 硬件设备3台,24核CPU,32G内存,Centos7 单盘读写速度 90MB | |||||
|---|---|---|---|---|---|---|
| Kafka配置 | 3个broker,创建单副本,每块盘10个partition | |||||
| 网络 | 硬盘(每个broker) | 消息大小 | acks=0 | acks=1 | acks=-1 | 备注 |
| 1Gb | 1T*1 | 1KB | 146976 | 136821 | 709564 | |
| 1T*2 | 1KB | 181557 | 146962 | 105150 | ||
| 2Gb | 1T*1 | 1KB | 152890 | 103925 | 83736 | |
| 1T*2 | 1KB | 269717 | 269948 | 144215 | ||
| 1T*4 | 1KB | 415247 | 413784 | 248716 |
3台双副本
| 硬件 | 硬件设备3台,24核CPU,32G内存,Centos7 单盘读写速度 90MB | |||||
|---|---|---|---|---|---|---|
| Kafka配置 | 3个broker,创建单双本,每块盘10个partition | |||||
| 网络 | 硬盘(每个broker) | 消息大小 | acks=0 | acks=1 | acks=-1 | 备注 |
| 1Gb | 1T*1 | 1KB | 76289 | 67824 | 42545 | |
| 1T*2 | 1KB | 97697 | 99796 | 52988 | ||
| 2Gb | 1T*1 | 1KB | 76289 | 69442 | 62145 | |
| 1T*2 | 1KB | 154741 | 139694 | 84008 | ||
| 1T*4 | 1KB | 196106 | 177482 | 115738 |
2. 硬件推荐配置
Zookeeper可部署在kafka节点上,若部署在同一台机器上,内存及CPU最好也要叠加计算。
| 机器数 | 内存 | CPU | 磁盘 | |
|---|---|---|---|---|
| Zookeeper集群 | 至少3台 | 8 | 2核以上 | |
| Kafka集群 | 至少3台 | 16 | 至少12核 | 至少1块专属磁盘 |
3 . 性能推荐指标
| 副本数 | 消息条数(1KB/条) | 服务器数量(8核CPU,16G内存) | 网络 | 硬盘数据(读写90MB/s) (每台服务器上的数量) |
|---|---|---|---|---|
| 单副本 | 5w | 1 | 1Gb | 1 |
| 10w | 1 | 2Gb | 2 | |
| 30w | 1 | 4Gb | 6 | |
| 50w | 3 | 4Gb | 6 | |
| 100w | 5 | 4Gb | 6 | |
| 双副本 | 5w | 3 | 1Gb | 1 |
| 10w | 3 | 2Gb | 2 | |
| 30w | 3 | 4Gb | 6 | |
| 50w | 5 | 4Gb | 6 | |
| 100w | 9 | 4Gb | 6 |
一、 快速部署指南
1. 部署脚本说明
Kafka_deploy.sh脚本主要用于kafka集群的部署,命令说明如下
shkafka_deploy.sh autossh 设置集群节点之间免密码ssh
shkafka_deploy.sh initenv 设置kafka集群运行所需的系统环境参数
shkafka_deploy.sh install 一键自动部署kafka集群
shkafka_deploy.sh uninstall 一键清理kafka集群
shkafka_deploy.sh start 启动集群中所有zookeeper及kafka服务
shkafka_deploy.sh stop 停止集群中所有zookeeper及kafka服务

1.1.上传程序压缩包
把kafka安装包上传到其中一台kafka服务器上,然后将压缩包拷贝到/opt目录下,解压:

解压后生成kafka-deploy目录,目录的内容如下:

进入到kafka-deploy目录,添加脚本执行权限,命令如下:

1.2.编辑配置
编辑kafka_deploy.conf文件的配置,用命令 vim kafka_deploy.conf 打开文件,内容说明如下:

home_dir必须配置成占系统磁盘空间最大的目录,通过df –h命令来查看磁盘空间占用情况。如下所示,/opt目录空间最大:

编辑好kafka_deploy.conf配置文件之后,保存并退出编辑。
对linux命令不熟悉的话,在windows下编辑好再上传到服务器也行,windows下编辑最好使用notepad++或者UltraEdit,记事本这类编辑器可能会出现字符编码问题。
1.3.部署
1.3.1.配置免密ssh
在kafka-deploy目录下执行命令:sh kafka_deploy.shautossh
按照提示按回车键,或输入”yes”,提示输入密码时,请输入服务器对应的登陆密码,如下所示:

1.3.2.设置系统参数
在kafka-deploy目录下执行命令:sh kafka_deploy.sh initenv
当提示是否重启系统时,最好选择”y”,来重启系统,如下所示:

1.3.3.kafka安装
系统重启之后,如果是多台,需要等所有机器都重启完成,在kafka-deploy目录执行命令进行安装:sh kafka_deploy.shinstall
脚本会自行将kafka集群内所有的机器都部署完成,如下所示,kafka部署完成的输出内容:

1.3.4.测试
Kafka安装完成之后,最好使用kafka自带的工具测试一下功能是否正常。如kafka_deploy.conf里home_dir配置所示,kafka安装在/opt目录下,那么执行命令cd /opt/kafka/bin/进入/opt/kafka/bin目录下,内容如下:

在/opt/kafka/bin目录下一次执行如下命令,进行测试:
Ø ./kafka-topics.sh--create --topic test --partitions 1 --replication-factor 1 --zookeeper 192.168.3.10,192.168.3.11,192.168.3.12 //ip改为kafka_deploy.conf里zk_nodes对应的ip
Ø ./kafka-producer-perf-test.sh--topic test --num-records 50 --record-size 50 --throughput 100--producer-props bootstrap.servers=192.168.3.10:9092,192.168.3.11:9092,192.168.3.12:9092 //ip改为kafka_deploy.conf里kafka_nodes对应的ip
Ø ./kafka-consumer-perf-test.sh--broker-list 192.168.3.10:9092,192.168.3.11:9092,192.168.3.12:9092 --grouptest --messages 50 --new-consumer --topic test //ip改为kafka_deploy.conf里kafka_nodes对应的ip
Ø ./kafka-topics.sh --delete--topic test --zookeeper 192.168.3.10,192.168.3.11,192.168.3.12 //删除测试数据,ip改为kafka_deploy.conf里kafka_nodes对应的ip
示例如下:

如果运行过程中未出错,则表示kafka功能正常,正常输出参考上面截图。
2.kafka-manager使用说明
Kafka-manager默认安装在运行部署脚本所在的节点,默认未开启,假如该节点的IP为172.7.1.62,需要在kafka-deploy目录下执行命令shkafka_deploy.sh start manager,然后在浏览器打开http://172.7.1.62:9000这个URL即可访问界面。
新增集群,点击如下红色按钮

红色部分需要配置


这里是消费者组的web界面,logSize表示kafka该topic的总数据大小,consumer offset表示偏移量,Lag表示该消费者组没有消费的数据的数量
3.kafka-monitor使用说明
Kafka-monitor默认安装在运行部署脚本所在的节点,默认未启动,端口为kafka_deploy.conf中monitor_port配置项指定,启动需在kafka-deploy目录下执行命令sh kafka_deploy.sh start monitor,假如该节点的IP为172.7.1.62,端口配置为7777,则可通过http://172.7.1.62:7777来访问其界面,界面如下:

说明:当有消费者正在消费时,界面才会显示消费者的消费的状态信息,logSize表示该topic的总数据大小, offset表示偏移量,lag表示该消费者组没有消费的数据的数量,最上面显示的是消费者的消费速率,界面如下:

一、 双网卡部署
1. 应用场景
当应用需要通过双网卡来访问kafka服务,如下图场景所示:

Kafka服务器双网卡:eth0/10.30.100.99,eth1/22.102.37.88
Kafka侦听的端口为9092
Zookeeper侦听的端口为2181
客户端A通过eth0访问kafka服务器
客户端B通过eth1访问kafka服务器
2. Kafka配置
/etc/hosts文件需添加配置:
10.30.100.99 kafka1 //只要在kafka服务器上能ping通kafka1这个域名即可
conf目录下的配置文件server.properties需修改如下配置:
listeners=PLAINTEXT://0.0.0.0:9092
advertised.listeners=PLAINTEXT://kafka1:9092
3. 客户端配置
Kafka服务返回给kafka客户端的Host和port是advertised.listeners配置项对应的host和port:kafka1和9092。客户端必须能够解析kafka1这个hostname为对应的网卡IP,才能正常访问kafka服务,因此,客户端需要配置如下信息:
客户端A的/etc/hosts文件需添加配置:
10.30.100.99 kafka1
客户端B的/etc/hosts文件需添加配置:
22.102.37.88 kafka1
五、 跨网段部署
1.应用场景
当应用和kafka集群被网闸分别隔离在外网和内网,那么外网的应用访问内网的kafka集群,需要通过做端口映射来实现,下图为某应用场景:

专网里三台kafka服务器:192.168.100.10/192.168.100.20/192.168.100.30
Kafka服务的侦听端口分别为:9092/9093/9094
Zookeeper服务的侦听端口均为:2181
公安网和视频专网由网闸隔离开来,需要通过端口映射,公安网的服务才能访问专网里的kafka集群。
2.端口映射
将每台kafka服务器的端口映射出来:
192.168.100.10:9092----映射后---à172.7.100.100:9092
192.168.100.10:2181----映射后---à172.7.100.100:2181
192.168.100.20:9093----映射后---à172.7.100.100:9093
192.168.100.20:2181----映射后---à172.7.100.100:2182
192.168.100.30:9094----映射后---à172.7.100.100:9094
192.168.100.30:2181----映射后---à172.7.100.100:2183
3.Kafka配置
Kafka3台服务器需要修改的配置分别如下:
192.168.100.10这台kafka配置:
conf目录下的配置文件server.properties需修改如下配置:
listeners=PLAINTEXT://0.0.0.0:9092
advertised.listeners=PLAINTEXT://kafka1:9092
/etc/hosts需添加配置:
192.168.100.10kafka1
192.168.100.20kafka2
192.168.100.30kafka3
192.168.100.20这台kafka配置:
server.properties需修改如下配置:
listeners=PLAINTEXT://0.0.0.0:9093
advertised.listeners=PLAINTEXT://kafka2:9093
/etc/hosts需添加配置:
192.168.100.10kafka1
192.168.100.20kafka2
192.168.100.30kafka3
192.168.100.30这台kafka配置:
server.properties需修改如下配置:
listeners=PLAINTEXT://0.0.0.0:9094
advertised.listeners=PLAINTEXT://kafka3:9094
/etc/hosts需添加配置:
192.168.100.10kafka1
192.168.100.20kafka2
192.168.100.30kafka3
修改配置后启动kafka相关服务。
4.公安网配置
公安网的服务所在机器需要在/etc/hosts文件中添加如下配置:
172.7.100.100 kafka1 kafka2 kafka3
服务连接kafka的host和port为:kafka1:9092,kafka2:9093,kafka3:9094
服务连接zookeeper的host和port为:kafka1:2181,kafka2:2182,kafka3:2183
5.专网配置
专网里的服务所在机器需要在/etc/hosts文件中添加如下配置:
192.168.100.10kafka1
192.168.100.20kafka2
192.168.100.30kafka3
服务连接kafka的host和port为:kafka1:9092,kafka2:9093,kafka3:9094
服务连接zookeeper的host和port为:kafka1:2181,kafka2:2181,kafka3:2181
一、 kafka配置和命令
1.Kafka配置
Kafka部分配置在一键部署脚本里已经实现,下面是对kafka配置的一些说明,kafka日志在路径/opt/dcom/kafka_2.10-0.9.0.1/logs这个路径。
broker配置文件路径/opt/dcom/kafka_2.10-0.9.0.1/config/server.properties,配置说明如下:
| #此Broker的ID,集群中每个Broker的ID不可相同 broker.id=1 #kafka broker要监听的URI和协议列表,指定hostname为0.0.0.0则bind所有网络接口,不指定则bind默认网络接口 listeners=PLAINTEXT://:9092 #Broker监听/接受连接的端口 port=9092 #Broker绑定的Hostname,如果不设置,则绑定所有网络接口 host.name=nodex # 客户端连接broker的Hostname,没有设置则使用host.name配置的值,否则,通过java.net.InetAddress.getCanonicalHostName()获取。 advertised.host.name=nodex #客户端连接broker的端口,会保存在zookeeper上,若没有设置则使用broker绑定的端口 advertised.port=9092 #进行IO的线程数,应大于主机磁盘数 num.io.threads=8 #socket发送/接收缓冲区大小 socket.send.buffer.bytes=1048576 socket.receive.buffer.bytes=1048576 #socket一次接收的最大请求大小 socket.request.max.bytes=104857600 # kafka会对数据做持久化,消息文件存储的路径,需要注意此路径的磁盘分区要大 log.dirs=/disk0/kafka-logs,/disk1/kafka-logs #分区数量,推荐每块磁盘3~6个分区 num.partitions=1 #每个partition目录对应的线程数,用于在启动数据恢复和关闭时刷盘,使用raid时推荐加大线程数 num.recovery.threads.per.data.dir=1 #触发刷盘的消息阈值,消息个数和时间维度 log.flush.interval.messages=10000 log.flush.interval.ms=1000 #消息文件清理周期,即清理x小时前的消息记录,持久化时间,如果磁盘不够,可更改 log.retention.hours=120 #partition目录下每个数据段文件最大大小 log.segment.bytes=1073741824 #检查是否有待删除文件的时间间隔 log.retention.check.interval.ms=300000 #Zookeeper连接串,此处填写上一节中安装的三个zk节点的ip和端口即可 zookeeper.connect=nodex:2181,nodey:2181,nodez:2181 #连接ZK的超时时间 zookeeper.connection.timeout.ms=6000 #默认副本个数 default.replication.factor=2 #删除topic的开关,如果通过管理工具删除则不受该配置影响 delete.topic.enable=true |
|---|
2.Kafka脚本工具
Kafka本身提供很多方便使用的脚本工具。

1. kafka-server-start.sh
手动开启一个kafka服务,可以指定后台运行,例如:
./kafka-server-start.sh -daemon ../config/server.properties
2. kafka-server-stop.sh
停止当前节点的kafka服务。
3. kafka-console-consumer.sh
该工具用于从kafka集群中读取数据并显示到标准输出上。例如,从kafka集群名为demo的topic读取最多20条数据,20.2.37.208:9092为kafka其中一个节点的IP和端口,命令如下:
./kafka-console-consumer.sh--bootstrap-server 20.2.37.208:9092 --from-beginning --max-messages 20 --topicdemo --zookeeper 20.2.37.208:2181
4. kafka-console-producer.sh
该工具模拟producer从标准输入读取数据发送消息到kafka集群指定的topic中,例如向名为test的这个topic生产数据,172.7.102.214为kafka其中一个节点IP,9092为kafka端口,命令如下:
./kafka-console-producer.sh --broker-list172.7.102.214:9092 --topic test
5. kafka-consumer-offset-checker.sh
查看消费者的offset信息,例如,查看xxxx消费者组消费demo这个topic的offset信息,并列出broker信息,20.2.37.208:2181为zookeeper的IP和端口:
./kafka-consumer-offset-checker.sh --topicdemo --zookeeper 20.2.37.208:2181 --group xxxx --broker-info
6. kafka-topics.sh
对topic进行增删查改操作,例如列出集群所有topic:
./kafka-topics.sh --list --zookeeper20.2.37.208:2181
创建一个分区为10,副本数为2,名为test的topic,其中副本数不能大于broker的数量:
./kafka-topics.sh --create --topic test--partitions 10 --replication-factor 2 --zookeeper 20.2.37.208:2181
7. kafka-consumer-perf-test.sh
用于模拟kafka consumer的性能测试。
8. kafka-producer-perf-test.sh
用于模拟kafka producer的性能测试。
9. kafka-mirror-maker.sh
用于同步俩个kafka集群之间的数据,从其中一个集群消费所有数据,并发布到目标集群当中,从而实现两个集群之间的数据同步。可运行多个kafka-mirror-maker.sh实例,注意每个实例的consumer配置的groupID要一致。如果其中一个实例挂了,其余的实例可以分担掉挂掉的实例的负载,从而提高系统的可用性。
从172.7.102.215这个kafka集群同步数据到172.7.102.217这个集群。关闭自动提交offset,适当减小手动提交offset时间间隔,增大socket缓冲区大小等参数,可以提高消息的实时性;topic的配置支持正则表达式。参考示例如下:
./kafka-mirror-maker.sh--consumer.config consumer.properties --offset.commit.interval.ms 3000--num.streams 3 --new.consumer --producer.config producer.properties --abort.on.send.failurefalse --whitelist mytopic
bootstrap.servers=172.7.102.215:9092
group.id=MM_group1
client.id=MM_consumer
auto.offset.reset=earliest
enable.auto.commit=false
receive.buffer.bytes=1048576
send.buffer.bytes=1048576
max.partition.fetch.bytes=1049000
bootstrap.servers=172.7.102.217:9092
acks=1
retries=60
max.in.flight.requests.per.connection=1
block.on.buffer.full=true
client.id=MM_producer
receive.buffer.bytes=1048576
send.buffer.bytes=1048576
10. kafka-replica-verification.sh
用于检测topic的每个partition副本数据是否一致。
11. kafka-preferred-replica-election.sh
用于使每个partition的preferred副本成为leader副本,保证leader的均衡分布。Kafka中某个partition的副本列表叫做AR(assignedreplica),AR中的第一个副本为preferred replica。创建新的topic或给已有的topic新增partition时,kafka保证每个preferred 副本均匀分布到每个broker上,一般创建时preferred 副本会被选为leader。随着集群的运行,可能会出现broker宕机的情况,此时宕掉的leader重新启动后会成为follwer,就会出现leader分布不均衡的情况,如果leader分布不均衡严重,会导致leader分布多的broker负载加重,该脚本可以均衡leader的分布,使preferred副本成为leader。
12. kafka-reassign-partitions.sh
用于调整partition的副本分布位置,即调整AR的分布,常用于kafka集群的扩容和缩容。Kafka新增节点,对于已有的topic,其partition不会重新分布到新的节点,可以通过该脚本调整parition副本的均匀分布;当需要缩容时,可以将需要移除的节点上的parition副本重新均匀分配到其他节点,然后再移除该节点,该脚本可以达到这个目的。
13. kafka-replay-log-producer.sh
该脚本将topic1的数据回放到topic2中,即将topic1作为生产者,topic2作为消费者,例如:
./kafka-replay-log-producer.sh--broker-list 20.2.37.208:9092 --zookeeper 20.2.37.208:2181 --inputtopic topic1--outputtopic topic2
14. kafka-simple-consumer-shell.sh
模拟kafka的low-level API从某个topic的指定副本获取数据。
15. kafka-consumer-groups.sh
列出所有消费者组,打印/删除消费者组相关信息。
./kafka-consumer-groups.sh--describe --group demo2 --zookeeper node215
16. kafka.tools.StateChangeLogMerger
合并输出kafka的状态变迁日志相关内容,可以指定topic及partitions,例如:
./kafka-run-class.sh kafka.tools.StateChangeLogMerger--logs /opt/dcom/kafka_2.10-0.9.0.1/logs/state-change.log --topic demo-partitions 0,1,2
17. kafka.tools.JmxTool
打印kafka所有的JMX metrics信息,内容很多,可以指定内容,通过kafka-run-class.sh运行kafka.tools.JmxTool类来实现,在kafka启动时需指定JMX端口,kafka-server-start.sh脚本里设置的是9093,例如:
./kafka-run-class.sh kafka.tools.JmxTool--jmx-url service:jmx:rmi:///jndi/rmi://:9093/jmxrmi --reporting-interval 10
18. kafka.tools.DumpLogSegments
打印kafka的log,显示数据索引、负载长度、负载内容等信息,通过kafka-run-class.sh运行kafka.tools.DumpLogSegments类来实现,例如:
./kafka-run-class.shkafka.tools.DumpLogSegments --files/var/lib/kafka-logs/demo-0/00000000000000000000.log --print-data-log
一、 常见问题/FAQ
1. 新建的消费者组不是从头消费topic里的消息?
答:如果想新建的消费者组从头消费topic里的消息,在编写客户端程序时设置参数auto.offset.reset为earliest;默认为latest,指新建的消费者组从最新的offset开始消费。
2. 出现重复消费
答:一般是由于消费者未提交offset到kafka,通过kafka-consumer-offset-checker.sh可查看提交的offset情况。消费者未提交offset可能是由于以下原因:
消费者一次poll调用拉取的消费太多,在session.timeout.ms超时时间内未消费完。可减少poll函数的超时时间参数,或适当调大session.timeout.ms超时时间,适当调小max.partition.fetch.bytes每次从partition拉取的数据大小,最最关键还是要优化自己程序消费数据的能力。0.10版本kafka的消费者可以通过参数max.poll.records指定一次poll拉取的消息数量。
