第一章 kafka入门笔记
一、Kafka简介
1、什么是Kafka
kafka最初是由linkedin内部开发的一个基础设施组件。它最初的开发动机源于linkedin虽然已经具备了用于存储和管理大量信息的技术手段,但却缺乏专门用于处理持续流动的数据的能力。因此,在设计理念上,开发者不希望仅局限于开发一种能够存储静态或孤立的数据的方法,而是希望通过构建一种新的体系结构来实现对持续流动的信息进行有效管理和处理
从外观上看,Kafka确实具有类似消息系统的特征,它支持发布订阅机制,但相较于传统消息系统存在显著区别.首先作为现代分布式集群系统,Kafka采用集群模式运行,具备良好的扩展能力.其次其数据存储特点更为灵活,用户可以根据需求自由设置存储时间.再者流式处理能力的提升使得Kafka在数据处理层级上实现了质的飞跃,传统消息系统仅负责传递信息而无法对流进行深度解析.因此不仅是一个高效的消息传递平台,Kafka更提供了一个智能的数据流管理与分析框架.该框架支持动态生成并管理多种派生流类型,并能方便地整合外部数据源进行统一管理.这种设计理念体现了对实时性与灵活性的高度追求
在大数据领域中,“Kafka也可被视为Hadoop的实时版本”,但两者存在明显的不同之处。“Hadoop不仅用于存储并定期处理海量数据文件(数量通常以TB为单位),而Kafka则专注于持续处理大规模的数据流。”此外,“Hadoop主要用于数据分析领域”,而“由于其低延迟的特点”,Kafka更适用于核心业务应用领域。“因此,在国内的大企业中(例如京东),人们通常会结合使用这两种技术来构建实时数据计算架构(参考《张开涛-海量数据下的应用系统架构实践》)。”常见的大数据处理框架包括Storm、Spark、Flink以及Blink(阿里巴巴开发)等。
Kafka的名字源自德国哲学家雅斯贝尔斯对其思想体系的独特诠释。他与法国作家马塞尔·普鲁斯特、爱尔兰作家詹姆斯·乔伊斯等一并被誉为西方现代主义文学领域的先驱与大师。《变形记》作为卡夫卡的重要短篇小说代表作,在其艺术成就中占据了一席之地,并被视为现代文学史上最重要的作品之一(建议达到管理层高度时深入阅读相关人文书籍以提升管理知识及人格修养)。基于kafka_2.11-2.3.0版本进行开发较为合理(其他版本则无需特别考虑)。值得注意的是Kafka采用Scala编写代码而非其他小众语言(除非具备高级技术能力或有特殊需求)。
******2、**Kafka中的基本概念
******2.1、**消息和批次
消息—****Kafka中的数据元素即为一般消息中间件中的消息概念(可类比数据库中的一条记录)。消息体由字节数组构成而携带键信息(可选的元数据均为字节形式),主要负责将消息分配至特定分区。作为一套高效的分布式系统为了提升性能 消息通常会采用分批量的方式进行写入至Kafka。
一批——即一组消息,这些都属于同一主题及分区。若仅发送一个 msg,将会导致显著的网络开销,而分批发送则能有效降低这一开销。然而这要求在传输延迟与数据吞吐速率之间找到平衡,因为随着每批包含的消息数量增加,单位时间内处理的数据量虽有提升但却会延长每个 msg 的传输时间(尽管吞吐率随之提高)。此外,采用压缩技术能够提高数据传输及存储效率,但这需要投入额外的计算资源来进行编码处理。
对于Kafka而言,在实际应用中处理消息时会遇到难以理解的二进制数据结构。通常我们会采用序列化与反序列化的技术手段来处理这些数据。常见的包括JSON、XML以及Avro(Hadoop开发的一款序列化框架),具体的实现细节则由业务的具体需求决定。
******2.2、**主题和分区
Kafka系统中的消息按照主题进行划分(类似于数据库中的表)。每个主题下都可以划分为多个子分区(采用分表技术)。每个子分区相当于一个记录新数据的文件。每当收到新消息时,该信息会按追加的方式存储于相应的子分区中,并记录下来作为新增内容。随后会采用先进先出的方式对这些数据行行存取。

然而由于主题包含多个区域所以就无法确保消息传递的顺序而单一区域则能够确保。Kafka通过划分不同区域实现数据冗余与弹性扩展这是因为各个区域可部署在不同的服务器上也就是说一个主题能够横跨多台服务器(这也是Kafka高性能的重要原因之一多台服务器相比一台在磁盘读写性能上具有显著优势)
在前面讨论中提到过Kafka作为一个流处理平台的重要性。通常情况下,在实际应用中我们往往将同一主题下的数据视为连续流动的部分,并且不论数据被划分为多少个分布区域这一逻辑依然适用。
******2.3、**生产者和消费者、偏移量、消费者群组
主要涉及的一般消息中间件中生产者与消费者的角色。其他的一些高级客户端API(如数据管道API、流式处理的Kafka Stream)则以基本的生产者与消费者组件为基础,并附加了额外的功能。
生产者默认会均匀地将消息分发至主题的所有分区中。若需指定特定分区,则需利用消息中的键值对与分区机制。消费者可订阅主题的一个或多个副本,并按接收到的消息产生顺序进行处理。为判断已接收的消息数量而设置的偏移量字段是一个全局唯一的整数标识符,在 Kafka 中创建新条目时会自动记录当前的最大偏移值。每个分区最后读取的消息偏移量会保存到Zookeeper或Kafka中。
由多个消费者组成的群组即是一个由共同订阅同一主题的消费者所组成的集合体;通过这种方式就可以确保每个分区仅被单个消费者拥有。

这种映射关系被称为消费者对分区的所有权关联,在这种情况下每个分区内只分配给一名用户;但每个用户都可以管理多个分区内。每个分区内只能由一名在线的用户提供服务(避免重复使用),一旦某位用户离线将会有其他在线的用户提供服务。
******2.4、**Broker和集群
我们通常将单独运行的Kafka实例称为Broker。其主要职责包括接收生产者发送的消息并记录消息的位置偏移值;将这些消息持久化存储于磁盘;同时向消费者提供实时数据服务并处理来自消费者的数据请求;返回已持久化的消息副本。在合适的硬件配置下(如具备足够的内存、存储资源以及高效的网络连接),单个Broker能够处理数千个主题以及每秒处理数百万条消息的能力(这要求我们对操作系统和JVM进行性能优化)。
在该Cluster中, 多个Broker通过选举机制选出主要的Controller, 该Controller负责划分区域给各个Broker并监督其运行状态. 在该Cluster中, 一个Partition主要归于一个Broker, 被称为主要Broker; 然而, Partition也可以分配给多个Broker以实现数据冗余, 这时就会触发Partition复制功能. 集群内部通常采用管道技术来进行高效的Partition复制操作.

通过分区复制实现的好处在于,它增加了消息冗余性。当首领broker失效时,剩余的broker能够接手领导职责,并将相应的消息路由转移至新的首领位置。同时相关的消费者和生产者也需要重新连接至新的首领位置以完成任务。
******2.5、**保留消息
消息在指定时间段内的持久保存是Kafka系统的关键特征之一。其默认持久化策略通常是:要么保持持续时间(如7天),要么维持特定大小(如1GB)。当达到限制时,过期的消息会被删除。每个主题可以根据业务需求配置独特的持久化策略;开发过程中需特别注意:与MySQL等传统数据库不同的是
******3、**为什么选择Kafka
******3.1、**优点
- 多个生产者与多个消费者;
- 基于磁盘存储机制的设计使得Kafka的数据具有天然的持久性。
- 高度可扩展性。通过横向扩展生产者、消费者以及 brokers( brokers 能够自动加入或退出集群以适应负载需求),这一特性确保了系统的稳定性和响应能力。
- 该架构使得Kafka能够高效地处理 LinkedIn 公司每天高达万亿级的数据量,并且保证亚秒级的消息延迟。
******3.2、**常见场景
****1)****活动跟踪
收集网站用户的互动数据,并包括页面浏览量及用户点击行为等指标。通过发布到不同的主题或频道(例如今日头条、淘宝zone等),从而利用这些数据提升机器学习的效果,并在分析之前就已经进行了活动追踪。
****2)****传递消息
标准消息中间件的功能。
****3)****收集指标和日志
获取应用程序和系统的性能监控指标, 或者获取应用日志信息; 将数据经由 Kafka 传输至 专业的日志存储平台, 并采用 Elasticsearch (国内常用) 作为主要的日志存储方案.
****4)****提交日志
记录其他系统的变动日志,并以数据库为例。将数据库的最新更新与发布到Kafka同步,并由应用通过监控事件流接收实时数据。当其他系统出现故障时,在线重放相关的事件日志以恢复正常运行状态(异地灾备处理)。
****5)****流处理
操作实时数据流时会执行统计、转换以及复杂的计算操作。随着大数据技术的不断发展与成熟程度不断提升,无论是传统企业还是互联网公司都已经不再满足于离线批处理模式,实时流处理的重要性与需求日益凸显。近年来,业界一直在进行实时流计算引擎及API的研究与应用开发,例如这几年风靡全球的Spark Streaming、Kafka Streaming、Beam以及Flink等技术,其中阿里双11期间展示的实际销售金额应用即基于Flink平台并进行了相应的定制优化为Blink技术
操作实时数据流时会执行统计、转换以及复杂的计算操作
******二、**Kafka的安装、管理和配置
******1、**安装
******1.1、**预备环境
属于 Java 生态系统的项目之一,并基于 Scala 开发,并在 Java 虚拟机上运行。安装后与普通的 Java 程序没有明显区别。遵循 Kafka 的官方指南进行安装时,默认会配置 ZooKeeper 用于存储集群元数据及消费者信息。推荐使用 Java 8 环境,并且为了提升稳定性和可靠性建议独立部署 ZooKeeper 集群而不是默认配置的 ZooKeeper 服务端口
1.2、下载和安装Kafka
访问Kafka官方网站的下载页面http://kafka.apache.org/downloads,并选择合适的版本进行下载。项目中采用的版本为kafka_2.11-2.3.0。完成下载后将文件解压缩并放置在本地指定目录中。
******1.3、**运行
启动ZooKeeper服务,在Windows系统下的Kafka目录下的Bin文件夹中的Windows子目录中创建一个命名为start_zookeeper.bat的批处理脚本,并执行以下命令:zookeeper-server-start.bat ../../config/zookeeper.properties
在Cm命令框中打开并导航至kafka目录下的\bin\windows文件夹后,在其中运行start_zookeeper.bat脚本。当zookeeper启动完成后会自动出现一个提示框窗口,请确保该窗口不要被关闭。
启动并管理Kafka集群服务;访问指定目录下的bin\windows文件夹;创建一个名为start_kafka.bat的批处理文件;配置输入参数为kafka-server-start.bat,并指定配置文件路径为../../config/server.properties。
通过CMD命令行工具导航至kafka目录下的bin windows子目录,在其中运行start_kafka.bat.exe程序。若启动成功将临时弹出提示对话框,请确保该提示框不被关闭。当系统显示指定界面时则表示操作完成。

Linux下与此类似,进入bin后,执行对应的sh文件即可。
******1.4、**基本的操作和管理
##列出所有主题
kafka-topics.bat --zookeeper localhost:2181 --list
##列出所有主题的详细信息
kafka-topics.bat --zookeeper localhost:2181 --describe
##创建主题 主题名 my-topic,1 副本,8 分区
运行Kafka主题并配置其参数:启动Kafka集群用于创建主题my-topic名称;设置复制因子为1份;将主题划分为8份
##增加分区,注意:分区无法被删除
kafka-topics.bat
kafka-topics.bat --zookeeper localhost:2181 --delete --topic my-topic
##创建生产者(控制台)
kafka-console-producer.bat --broker-list localhost:9092 --topic my-topic
##创建消费者(控制台)
kafka-console-consumer.bat --bootstrap-server localhost上运行 --topic my-topic --from-beginning
##列出消费者群组(仅 Linux)
kafka-topics.sh --new-consumer --bootstrap-server localhost:9092 --list
##列出消费者群组详细信息(仅 Linux)
kafka-topics.sh - 新增消费者 - 启动代理服务器 localhost:9092 - 描述信息 - 分组 群组名
******1.5、**Kafka的Linux下的运行
步骤与窗口相同:首先启动Zookeeper, 然后启动Kafka. 配置路径位于kafka目录下的bin文件夹内, 并通过shell脚本执行.
注意:Kafka运行占用的端口:Zookeeper默认端口2181,Kafka默认端口9092
******2、**Zookeeper数据查看工具ZooInspector
Zookeeper通常被视为一个常见的集群协调组件,并在各个领域得到广泛应用;尤其是在大数据生态系统中表现得尤为突出。
Zookeeper集群存储各个节点信息,包括:Hadoop、Hbase、Storm、Kafka等等;
2.1**、查询ZK数据的方式******
如何获取Zookeeper中的数据?可以通过运行ZkCli.sh命令行客户端程序来实现这一功能。然而该操作不够直观因为其内部的数据采用树形结构组织存储因此建议使用名为ZooInspector的界面操作工具以获得更加便捷的操作体验
2.2**、ZooInspector的使用******
1)下载:https://issues.apache.org/jira/secure/attachment/12436620/ZooInspector.zip
2)解压,进入目录ZooInspector\build,运行:zookeeper-dev-ZooInspector.jar
运行结果:
运行成功后,该JAR文件将直接出现java UI客户端界面

3**、Broker配置******
配置文件主要为server.properties文件, 位于Kafka目录中的config目录内.
3**.1、**常规配置
broker.id在单机环境下无需更改,在部署于集群环境时通常需要进行相应的配置设置。它是每个brocker在集群中所具有的唯一标识符,并且必须为正整数值。如果该服务器的IP地址发生变化,则不会影响到consumers的消息接收情况
Listeners
0.0.0.0:9092。
指定集群地址:提供一个或多个zk集群的IP地址列表,默认使用根路径。用逗号分隔的形式列出多个IP:端口对。
log.dirs****:由log.dirs指定了消息存储的位置。该配置支持多路径模式,并以逗号分隔的形式进行配置。当采用多路径配置时,“最少使用”原则被遵循:即所有相同主题下的日志片段都会被存储在同一指定目录中。Kafka会将新创建的分区优先分配到存储数据量较少的目录中以确保负载均衡。
num_recovery_threads_per_data_dir****:** 每个数据目录负责日志恢复启动与服务器启停相关的线程数量。由于这些线程仅在服务器正常重启及崩溃后重启、以及服务关闭时发挥作用,因此可以适当增加一定数量的线程以实现多线程操作。特别提示:此参数指定每个数据目录下的线程数量。
该参数指定是否支持自动生成主题。若设为true,则 produce 生产者会将消息发送到主题中;而 consume 消费者则会读取这些消息或获取相关元数据信息(特别是当客户端向未存在的主题发送元数据请求时)。此时系统会自动生成该主题,并且缺省设置为 true。
******3.2、**主题配置
新建主题的默认参数
NPartitions:这是一个指定每个新建主题中分区数量的参数(该参数只能递增而不能递减)。该参数通常需要进行评估(例如,在每秒处理1,000MB数据的情况下)。如果每个消费者每秒能够处理50MB的数据,则建议设置为20个分区(这样可以让20个消费者同时访问这些分区),从而实现预期目标(一般经验是将分区大小控制在不超过25GB较为理想)。
log.retention.hours:日志保存时长,默认设置为7天(即168小时),当保存时长超出该设定将自动清除相关数据。 无论哪一个参数先达到预定值(bytes或minutes),都会触发相应的逻辑操作。如果同时设置了 log.retention.minutes 和 log.retention.ms **等参数,则优先采用最小的那个值进行比较判断。(提示:time retention for logs is determined by checking the last modified time of the log fragment files, which is essentially when the fragment was closed, indicating the timestamp of the last message.)
log.retention.bytes:topic中每个分区的最大文件容量,在一个 topic 中的最大存储空间由分区数量乘以 log.retention.bytes 确定。若 log.retention.bytes 或 log.retention.minutes 中任一参数达到预设值将触发数据清除。通常建议避免采用最大文件删除策略而采用文件过期删除策略。
log.segment.bytes:位于特定目录下的多个文件中按段分割的日志片段块的最大存储容量;当存储达到该数值上限时自动重命名并释放旧文件以腾出空间。每个日志片段块一旦被创建完成即刻删除已关闭的旧版本以节省存储资源,默认过期时间为1G。
如果一个主题每日仅能接收100MB的消息,则基于默认参数计算,在经过10天后该主题的存储空间将被填满。此外,在关闭日志片段之前的消息不会自动失效;因此若log.retention.hours维持其默认设置,则该特定的日志片段将在17天后才会彻底失效。当删除该日志片段时,在这期间系统会等待其自然失效(需10天),而后还需额外等待7天才能完成清除操作。
该标记符的作用与log.segment.bytes相似,但其判别标准基于时间,同样涉及的两个参数中,较早到达的一方将被优先处理,该标记符默认处于关闭状态
message.max.bytes: **表示服务器处理消息的最大容量;需要确保producer和consumer设置一致,并且不应超过fetch.message.max.bytes属性所限定的消费者最大消息容量;该字段默认设置为1,000,000个字节,并对应约925.92KB至1MB的空间占用;如果启用压缩,则应根据压缩后的情况进行调整;需要注意的是,默认情况下字段大小会影响网络传输时间和I/O操作时间等性能指标;Kafka的设计初衷在于高效处理较短的消息片段,在此场景下通常建议将消息大小控制在约1万字节以内以达到最佳吞吐量(如LinkedIn的相关性能测试)
4、硬件配置对Kafka性能的影响****
为Kafka系统选配合适的硬件配置如同一项艺术,正如它的名字所暗示的那样。我们需要系统地从磁盘性能、内存管理能力、网络带宽以及处理器核心数量等方面进行评估,并识别出这些关键因素,在预算允许的情况下合理搭配硬件配置以实现最佳性能表现。
******4.1、********磁盘吞吐量/**磁盘容量
生产者的运行效率会受到磁盘吞吐量(每秒的IOPS数值)的影响。具体而言, 生产者生成的消息需经由服务器进行持久化存储. 由于客户端通常会持续等待直至至少一个服务器确认消息已成功写入后才释放锁. 因此, 在提升系统性能方面具有决定性作用的是磁盘的写入速度: SSD固态硬盘虽然价格较高且单个设备速度较快, 在某些场景下可能无法通过增加数量来显著提升整体吞吐量; 相比之下, 在HDD领域则可以通过配置多个设备并合理管理数据分布来提高处理效率. 关于存储容量的选择, 则应根据系统需求来定: 若每天接收1TB的数据并计划长期存储7天, 则所需的总存储容量为7TB.
******4.2、**内存
虽然Kafka系统本身所需的内存规模并不算大,但内存资源的管理对消费者端的性能有着重要影响.通常在实际应用中,消费者处理的数据主要来源于内存中的缓存机制(包括页面缓存和系统内存分配),相较于从磁盘上读取数据而言,这种操作速度明显更快.运行一个Kafka集群所需的Java虚拟机(JVM)内存通常不会非常庞大,剩余未被Kafka占用的系统内存空间既可以作为页面缓存存储额外数据块,也可以用于临时存储当前处理的日志片段.为了避免资源竞争和性能瓶颈问题,在实际部署中我们通常建议将Kafka集群与具有独立存储能力的应用服务部署在同一台服务器上,以确保各个服务能够访问到一致的日志数据.
******4.3、**网络
网络吞吐量直接影响了Kafka能够处理的最大数据流量。它与磁盘一起成为限制Kafka扩展规模的主要因素。生产者与消费者在写入和读取数据时都需要共同使用大量的网络资源。在进行集群复制时也会占用大量网络资源。
******4.4、**CPU
相对Kafka而言,其对CPU的要求并不算高。主要功能集中在消息的解压与压缩操作上,因此,在评估系统性能时,CPU性能并非优先考虑的因素
******4.5、**总结
在为 Kafka 硬件选择时,请优先评估存储系统及其容量,并关注生产者的处理能力(即磁盘吞吐量)。确定好存储后,请再选择合适的 CPU 和内存配置将使后续工作更加便捷。对于网络部分的选择,请结合业务需求进行评估,并确保其配置合理以提高整体系统效率。
******三、**Kafka的集群
******1、**为何需要Kafka集群
本地开发对生产环境而言至关重要,在本地环境中一台Kafka节点足以满足需求。在实际生产中,在多台服务器上部署集群以实现负载均衡是必要的;此外,在单机故障发生时复制功能能够有效防止数据丢失;同时集群设计通常会考虑高可用性。
******2、**如何估算Kafka集群中Broker的数量
要估量以下几个因素:
所需数量的磁盘空间用于保留数据以及每个broker上可使用的空间是多少?例如,在某些情况下:
- 假设一个集群存储着10TB的数据(需长期保留),并且其中每个broker能够存储2TB的数据,则该集群最少应配备5个broker;
- 如果启用了数据复制功能(这会占用额外的空间),则所需的总数量将翻倍;
- 因此在这种情况下该集群将需要配备10个broker。
该系统具有并行处理请求的能力,并且由于磁盘吞吐量及内存不足导致系统性能出现问题时,系统可以通过扩展broker组件来缓解该问题。
该系统具有并行处理请求的能力,并且由于磁盘吞吐量及内存不足导致系统性能出现问题时,系统可以通过扩展broker组件来缓解该问题。
******3、**Broker如何加入Kafka集群
非常简单,只需要两个参数:
第一:配置zookeeper.connect
第二:为新增的Broker配置每个该集群内部分配一个唯一的ID,并说明Kafka中的集群能够实现动态扩展以支持资源增加。
