Kafka 分区 面试官夸我这你都懂?不错不错
一、副本机制

Kafka在⼀定数量的服务器上对主题分区进⾏复制。
当集群中的某个broker发生故障时,系统能够自动地将故障转移至其他可用副本,并确保数据的安全性不受影响。
创建主题:
| kafka-topics.sh --zookeeper localhost:2181/myKafka --create --topic tp_demo_02 --partitions 2 --replication-factor 3 |
|---|
在主题配置中使用--replication-factor 3这一参数具体说明了复制因子设置为3时的配置情况:该参数指示系统将包含三个副本实例,并由一个Leader节点与两个Follower节点组成
- 将复制因素为1的未复制品被称作复制主题。
- 主题的最小单元即为其 partitions.
- 在非故障情况下,在Kafka中每个 partition 包含一个 Leader 副本以及零到多个 Follower 副本.
- 其(指包括 Leader 副本)总共有多少构成了该系统的 copy factor.
- 所有的读取与写入操作完全由其(即 Leader 副本)负责.
- 通常情况而言,在数量上来说, Kafka 中 Partition 的数量多于 Broker 的数量, 并且这些 Leader Partition 会均匀地分布在各个 Broker 之间.
Follower集群与普通Kafka消费者相同,在其 Leader 区域捕获的数据将从自身日志中捕获
允许Follower对⽇志条⽬拉取进⾏批处理 。
同步节点定义:
- 每个节点都能够持续支持与ZooKeeper的通信连接(借助该系统中基于心跳计数器的心跳校验机制)。
- 在Follower副本分区中执行的操作将基于Leader分区上的读取操作完成,并且这些操作将尽量减少执行时间。
Kafka的系统设计确保了这一功能:只要有一个副本节点处于在线运行状态,则所有提交的消息数据包都不会丢失。
宕机如何恢复 :
少数副本出现故障时
当 leader 出现故障后
系统会自动切换到 follower 中的一个作为新的 leader
一旦 leader 的工作状态恢复正常后
系统会清除之前提交的所有数据
并重新拉取最新数据以确保一致性
- 全部副本宕机
当全部副本宕机了有两种恢复⽅式
- 在 ISR 中找到一个恢复后的副本后,并将其指定为主选举人。(耗时过长会影响可用性)
- 选取首个恢复的副本作为新的主选举人(不论其是否位于 ISR 中)。未包含前次 leader 提交的数据而导致信息丢失。
二、Leader 选举
先看一张图片,在这张图片中:
- 区域P1的Leader值为0;其ISR值包含0、1两个数值。
- 区域P2的Leader设置为2;其ISR配置包括1和2两个选项。
- 区域P3的Leader设置为1;其ISR配置包括三个数值:0、1、2。

⽣产者与消费者的所有请求均由Leader副本负责处理。从属副本仅用于读取并同步与Leader副本的数据。
对于P1,如果0宕机会发⽣什么?
Leader副本与Follower副本之间的关系并非固定不变,在Leader所在的broker发生故障时,则需要进行分区切换,并且需要选举新的Leader复本
如何选举 ?
当某个分区所属的服务器出现故障无法响应请求时,在Kafka集群中,默认会选择该分区的其他副本中的一个节点作为新的 leader 位置,并将所有后续的数据读取和写入操作都会转投到该 new leader 的位置进行处理。当前面临的问题是如何合理地选择下一个 leader 节点来接管这些数据传输任务。
只有那些跟Leader保持同步的Follower才应该被选作新的Leader。
Kafka会在Zookeeper上为每个主题负责管理一个称为ISR(in-sync replica为同步副本;已同步的副本)的集群,并包含这些分区副本。
只有在这些副本实例与Leader中的所有副本达到一致状态后才完成时态转换。
如果这个集合有增减,kafka会更新zookeeper上的记录。
如果某个分区的Leader失效,则Kafka将从ISR集合中选择一个副本作为新的Leader。
显然通过ISR,kafka需要的冗余度较低 ,可以容忍的失败数⽐较⾼。
假设某个topic有N+1个副本,kafka可以容忍N个服务器不可⽤。
为什么不⽤少数服从多数的⽅法
少数服从多数是⼀种⽐较常⻅的⼀致性算发和Leader选举法。
它的含义是只有超过半数的副本同步了,系统才会认为数据已同步;
选择Leader时也是从超过半数的同步的副本中选择。
这种算法需要较⾼的冗余度,跟Kafka⽐起来,浪费资源。
例如, 仅容许一台机器出现故障, 则必须储备三个副本; 若容许最多两台机器同时出现故障, 则需储备五个副本.
⽽kafka的ISR集合⽅法,分别只需要两个和三个副本。
如果所有的ISR副本都失败了怎么办 ?
此时有两种⽅法可选:
- 该系统正在处于ISR集合中的复制件恢复状态。
- 系统可以选择任意一个当前可用的复制件。
需要注意的是,在第二个子句中,“⽴即可⽤”应理解为“当前可用”,即指系统已进入稳定运行状态并能够访问的状态。
- 需要设置
unclean.leader.election.enable=true
这两种⽅法各有利弊,实际⽣产中按需选择。
若期待ISR副本恢复,则需承受较长的时间压力。然而此方法可能无法在短时间内实现这一目标,并且这将导致较长时间内无法恢复复制机的状态。相反地若选择即时可用的副本则可能导致不一致的结果
总结 :
Kafka中的Leader分区选举过程是通过管理一个动态变化的IS(Inclusive Set)列表来实现的。一旦Leader分区丢失后,则会从IS列表中随机选择一个副本节点作为新的Leader加入到IS中。
如果ISR中的副本都丢失了,则:
- 在ISR中进行任意一个副本的恢复后,并在此时向外部提供服务,则需等待一段时间。
- 从OSR中选择一个副本作为Leader副本,则会导致数据丢失。
三、分区重新分配
为已部署好的Kafka集群添加机器之前,请完成以下步骤:首先复制已部署节点上的配置文件;其次更改其中的broker ID为全局唯一标识;最后启动该节点即可使其融入现有Kafka集群
新增的Kafka节点不会自动分配数据给系统中的各个节点,在整个集群中也无法承担负载压力,除非我们创建一个新的主题。
必须手动将某些区域迁移到新加入的Kafka节点上。Kafka提供了一定的技术手段用于重新分配指定主题的所有分区。
在对topic进行重新划分之前,接下来我们可以关注各区域的布局情况
创建主题 :
[root@node1 ~]# kafka-topics.sh --zookeeper node1:2181/myKafka --create --topic tp_re_01 --partitions 5 --replication-factor 1
查看主题信息 :
[root@node1 ~]# kafka-topics.sh --zookeeper node1:2181/myKafka --describe --topic tp_re_01Topic:tp_re_01 PartitionCount:5 ReplicationFactor:1 Configs:Topic: tp_re_01 Partition: 0 Leader: 0 Replicas: 0 Isr: 0Topic: tp_re_01 Partition: 1 Leader: 0 Replicas: 0 Isr: 0Topic: tp_re_01 Partition: 2 Leader: 0 Replicas: 0 Isr: 0Topic: tp_re_01 Partition: 3 Leader: 0 Replicas: 0 Isr: 0Topic: tp_re_01 Partition: 4 Leader: 0 Replicas: 0 Isr: 0
在node11搭建Kafka :
安装 JDK、Kafka,这里不需要安装Zookeeper
修改 Kafka 的配置config/server.properties
| broker.id=1 | |
|---|---|
| zookeeper.connect=node1:2181/myKafka |
启动 kafka
| [root@node11 ~]# kafka-server-start.sh /usr/src/kafka_2.12-1.0.2/config/server.properties |
|---|
请留意查看node11节点启动时的ClusterId值,并与该系统中zookeeper节点上的ClusterId进行比较。如果两者ClusterId一致,则说明node11和node1处于同一个集群环境中。
node11启动的Cluster ID:

zookeeper节点上的Cluster ID:

在node1上查看zookeeper的节点信息(node11的节点已经加⼊集群了):

我们基于现有集群新增了一个Kafka节点,并通过内置脚本kafka-reassign-partitions.sh来进行分区重新分配。该脚本具备三种执行模式:
- Generate模式中,在针对需要重新分配的Topic时会自动生成新的reassign计划(但未被执行)。
- Execute模式中将根据指定的reassign计划来执行新的Partition划分。
- Verify模式中会用于验证已经划分过的Partition是否完成。
我们决定将分区3、4迁移到broker1上,并通过kafka-reassign-partitions.sh 工具生成相应的重排计划。在此前,我们需要根据需求创建一个文档来详细说明哪些主题(topic)需要进行重新分区。该文档的内容如下:
[root@node1 ~]# cat topics-to-move.json { "topics": [
{ "topic":"tp_re_01" } ], "version":1}
然后使⽤kafka-reassign-partitions.sh⼯具⽣成reassign plan

[root@node1 ~]# kafka-reassign-partitions.sh --zookeeper node1:2181/myKafka
--topicsto-move-json-file topics-to-move.json --broker-list "0,1"
--generate Current partition replica assignment{"version":1,"partitions":[
{"topic":"tp_re_01","partition":4,"replicas":[0],"log_dirs":["any"]},{"topic":"
tp_re_01","partition":1,"replicas":[0],"log_dirs":["any"]},{"topic":"tp_re_01","
partition":2,"replicas":[0],"log_dirs":["any"]},{"topic":"tp_re_01","partition":3,
"replicas":[0],"log_dirs":["any"]},{"topic":"tp_re_01","partition":0,"replicas":[0],
"log_dirs":["any"]}]} Proposed partition reassignment configuration{"version":1,
"partitions":[{"topic":"tp_re_01","partition":4,"replicas":[0],"log_dirs":["any"]}
,{"topic":"tp_re_01","partition":1,"replicas":[1],"log_dirs":["any"]},
{"topic":"tp_re_01","partition":2,"replicas":[0],"log_dirs":["any"]},
{"topic":"tp_re_01","partition":3,"replicas":[1],"log_dirs":["any"]},
{"topic":"tp_re_01","partition":0,"replicas":[0],"log_dirs":["any"]}]}
通过Kafka运行reassign plan, 我们可以验证reassign plan是否完成:

查看主题的细节:

经过操作后, 各个分区的分布情况与之前相比发生了显著变化, 特别是Broker 1上的分区配置已经实现了, 分区数量也有所增加。为了提高资源利用率, 我们可以通过使用Kafka脚本工具生成的资源重新分配计划作为一种参考方案, 这种方式能够帮助大家更好地优化系统性能。然而, 我们完全有能力根据实际需求自定义一个资源重新分配方案, 并在此基础上执行相应的操作, 如下所示:

将上⾯的json数据⽂件保存到my-topics-to-execute.json⽂件中,然后也是执⾏它:

四、自动再平衡
我们在新建主题时自行指定主题各个Leader分区以及Follower分区的分布情况,并明确各个副本分布在哪些broker节点上。
在系统的运行过程中,在 broker 发生故障重启时会导致 Leader 分区与 Follower 分区的角色转换。这可能导致大部分 Leader 资源集中在少数几台 broker 上。因为 Leader 负责客户端的数据读取和 writes 操作。此时,在网络 IO、CPU 以及内存这三个方面都会面临较大的压力。
由于Leader和Follower的角色转换可能导致Leader副本在集群中的分布出现不均衡问题,在这种情况下,必须有一种手段来使Leader在集群中的分布达到均衡状态。
执⾏脚本:
[root@node11 ~]# kafka-topics.sh --zookeeper node1:2181/myKafka --create --topic tp_demo_03 --replica-assignment "0:1,1:0,0:1"
运行结果表明, 该脚本执行后成功生成主题tp_demo_03, 并包含三个分区. 每个分区均配置了两个副本, 其中Leader副本将被配置至列表中的第一个指定的brokerId位置, 而Follower副本则将被配置至后续指定的brokerId位置.


是否有⼀种⽅式,可以让Kafka⾃动帮我们进⾏修改?改为初始的副本分配?
目前采用了 Kafka 所 之 自 动 调 节 资 源 分 配脚本:kafka-preferred-replica-election.sh
先看介绍:

该工具将每个分区的Leader副本合理放置于适当位置,并使Leader分区与Follower分区在服务器间实现均衡分布。
当此脚本仅配置了ZK地址时,则会遍历并自动平衡所有主题,并实现动态负载均衡。
具体操作:
- 创建preferred-replica.json,内容如下:


恢复到最初的分配情况。
之所以是这样的分配,是因为我们在创建主题的时候:
| --replica-assignment "0:1,1:0,0:1" |
|---|
在按逗号分隔的每一个数值对中,默认情况下位于前的部分属于Leader分区,在其对应的Leader副本应当放置于特定的brokerId位置;即所谓的preferred replica指的是将该默认位置上的数字作为确定Leader副本存储位置的关键依据
五、修改分区副本
实际项⽬中,我们可能由于主题的副本因⼦设置的问题,需要重新设置副本因⼦
或者由于集群的扩展,需要重新设置副本因⼦。
topic⼀旦使⽤⼜不能轻易删除重建,因此动态增加副本因⼦就成为最终的选择。
在Kafka 1.0版本的配置文件中,默认未定义default.replication.factor变量。因此,在创建topics时若未指定–replication-factor参数,默认副本因子为1。建议在本地机器的server.properties文件中手动设置一个常用值(如3),从而避免手动调节此参数。例如设置default.replication.factor=3,详细内容可参考官⽅⽂档https://kafka.apache.org/documentation/#replication
原因分析 :
假设我们有2个kafka broker分别broker0,broker1。
-
当我们创建的topic有2个分区partition时并且replication-factor为1,基本上⼀个broker上⼀个分区。当⼀个broker宕机了,该topic就⽆法使⽤了,因为两个个分区只有⼀个能⽤。
-
当我们创建的topic有3个分区partition时并且replication-factor为2时,可能分区数据分布情况是
broker0, partiton0,partiton1,partiton2,
broker1, partiton1,partiton0,partiton2,
每个分区有⼀个副本,当其中⼀个broker宕机了,kafka集群还能完整凑出该topic的两个分区,例如当broker0宕机了,可以通过broker1组合出topic的两个分区。 -
创建主题
[root@node1 ~]# kafka-topics.sh --zookeeper node1:2181/myKafka --create --topic tp_re_02 --partitions 3 --replication-factor 1
查看主题细节
| [root@node1 ~]# kafka-topics.sh --zookeeper node1:2181/myKafka --describe --topic tp_re_02 | |
|---|---|
| Topic:tp_re_02 PartitionCount:3 ReplicationFactor:1 Configs: | |
| Topic: tp_re_02 Partition: 0 Leader: 1 Replicas: 1 Isr: 1 | |
| Topic: tp_re_02 Partition: 1 Leader: 0 Replicas: 0 Isr: 0 | |
| Topic: tp_re_02 Partition: 2 Leader: 1 Replicas: 1 Isr: 1 | |
| [root@node1 ~]# |
修改副本因⼦:错误

采用Kafka脚本kafka-reassign-partitions.sh对副本因子进行调整
| { | |
|---|---|
| "version":1, | |
| "partitions":[ | |
| {"topic":"tp_re_02","partition":0,"replicas":[0,1]}, | |
| {"topic":"tp_re_02","partition":1,"replicas":[0,1]}, | |
| {"topic":"tp_re_02","partition":2,"replicas":[1,0]} | |
| ] | |
| } |
执⾏分配
| [root@node1 ~]# kafka-reassign-partitions.sh --zookeeper node1:2181/myKafka --reassignment-json-file increase-replication-factor.json --execute | |
|---|---|
| Current partition replica assignment | |
| {"version":1,"partitions":[{"topic":"tp_re_02","partition":2,"replicas":[1,0],"log_dirs":["any","any"]},{"topic":"tp_re_02","partition":1,"replicas":[0,1],"log_dirs":["any","any"]},{"topic":"tp_re_02","partition":0,"replicas":[0,1],"log_dirs":["any","any"]}]} | |
| Save this to use as the --reassignment-json-file option during rollbackSuccessfully started reassignment of partitions. |
查看主题细节
| [root@node1 ~]# kafka-topics.sh --zookeeper node1:2181/myKafka --describe --topic tp_re_02 | |
|---|---|
| Topic:tp_re_02 PartitionCount:3 ReplicationFactor:2 Configs: | |
| Topic: tp_re_02 Partition: 0 Leader: 1 Replicas: 0,1 Isr: 1,0 | |
| Topic: tp_re_02 Partition: 1 Leader: 0 Replicas: 0,1 Isr: 0,1 | |
| Topic: tp_re_02 Partition: 2 Leader: 1 Replicas: 1,0 Isr: 1,0 | |
| [root@node1 ~]# |
六、分区分配策略

在Kafka系统中,每个Topic都包含若干个分区.按照默认配置,单一的一个 partitions can only be consumed by a single consumer within a partitioning group. This inherent constraint leads to the issue of partition allocation. Kafka offers several algorithms for implementing multi-paxos partition assignment: RangeAssignor, Round Robin Assignor, and Sticky Assignor.
6.1 RangeAssignor
PartitionAssignor接口用于基于用户自定义的方式实现分区分配算法,并负责将消费者分组分配到不同的计算节点上。
该消费群体会根据个人兴趣选择关注特定主题,并将这一订阅关系向下级协调机制传递。这一过程由负责管理订阅组的核心机构完成,并将任务划分方案指定至团队中的核心用户以执行具体操作。随后, 所得结果会被同步至团队所有成员以完成整个流程, 而基于区间划分策略的方法是Kafka系统中默认采用的技术方案
该系统通过独立分区为每个主题分配资源。对于每个主题而言,在开始处理之前需要先按分区ID进行数值排序;随后对消费组中的消费者进行字典序排列;最后尽可能均衡地将这些因子分解器实例分发给各个消费节点。在这里只能尽力做到均匀分配,并且由于当因子分解器实例数量无法被消费节点的数量整除时,在某些节点上可能会获得额外的实例数目。

⼤致算法如下:
| assign(topic, consumers) { | |
|---|---|
| // 对分区和Consumer进⾏排序 | |
| List |
|
| sort(partitions); | |
| sort(consumers); | |
| // 计算每个Consumer分配的分区数 | |
| int numPartitionsPerConsumer = partition.size() / consumers.size(); | |
| // 额外有⼀些Consumer会多分配到分区 | |
| int consumersWithExtraPartition = partition.size() % consumers.size(); | |
| // 计算分配结果 | |
| for (int i = 0, n = consumers.size(); i < n; i++) { | |
| // 第i个Consumer分配到的分区的index | |
| int start = numPartitionsPerConsumer * i + Math.min(i, consumersWithExtraPartition); | |
| // 第i个Consumer分配到的分区数 | |
| int length = numPartitionsPerConsumer + (i + 1 > consumersWithExtraPartition ? 0 : 1); | |
| // 分装分配结果 | |
| assignment.get(consumersForTopic.get(i)).addAll(partitions.subList(start, start + length)); | |
| } | |
| } |
RangeAssignor策略的工作原理是通过计算消费者总数与分区总数的商值来确定一个区间范围,在此基础上实现对区域资源的均等划分。该策略采用基于此计算结果对区域资源进行均等划分的方式,并对每个Region进行独立处理以保证公平性。对于每一个Topic资源而言,在其对应的消费组中按注册用户的姓氏拼音顺序排列后依次为每位用户分配固定的区域区间范围。如果这种均等划分无法完全满足公平性要求,则排在姓氏拼音顺序第一位的用户将获得额外的一个区域区间资源以确保系统的公平性和稳定性。
该种资源分配策略的一个明显问题是,在消费者订阅的Topic数量不断增加的情况下,资源分配不均衡的问题将日益加剧。例如,在某示例中(如)一个场景中(假设)有四个数据分区供三个消费者使用的情况下, 系统会在这种情况下将额外的资源单元被赋予给第一个组别(C0)。当再次订阅一个包含四个数据分区的新Topic时, 系统将使额外的一个资源单元被赋予给 C0, 从而使得在两次资源扩展后,C0总共比其他两个组别多分发了两个资源单元。
字典序靠前的消费组中的消费者⽐较“贪婪 ”。

6.2 RoundRobinAssignor
采用轮询策略对资源进行公平分配,在RoundRobinAssignor中会将消费组内所有订阅过的Topic及其对应的消费者按照一定顺序进行公平分配。其中,在RangeAssignor中则是单独针对每个Topic进行分区划分和资源调配。当同一组中的消费者均订阅同一集合时(即每个消费者都关注完全相同的Topic),这种公平性表现得最为突出(即每个消费者分到资源的数量差异不会超过一个单位)。然而,在实际应用中若各消费者的关注范围存在差异,则无法保证这种最优分配方案的存在

相较于RangeAssignor而言,在订阅多个主题的情况下,Round Robin Assignator能够尽量平均地将数据分布给各个消费者(其分区数量差异可能随着订阅主题数量增加而增大)。
对于消费群体内部消费者 topic subscriptions are inconsistent across consumers: 设有两个消费者分别为 C0 和 C1, topic T1 和 T2 分别拥有 3 个和 2 个 zones. 并且 C0 subscribed to both topics T1 and T2, while C1 subscribed only to topic T2. 根据 Round Robin Assignor 的分配规则, 其具体的分配方案如下所示:

初步观察下目前的资源分配较为平衡。然而,值得注意的是,C0部门已负责管理4个区域能源消耗指标,而同时,C1部门负责订阅使用T2区域的一个资源使用计划。是否可以让C1不仅负责订阅T2这一区域的资源,还能平衡整体资源消耗呢?
6.3 StickyAssignor
动机
虽然RoundRobinAssignor在RangeAssignor上进行了某些优化以实现更为均衡的分区分配,在某些情况下仍然存在严重的分配偏差,例如当消费组中的订阅Topic列表不一致时。
核心问题在于无论是RangeAssignor还是RoundRobinAssignor现有资源划分策略未考虑前一次的结果 。显然,在进行新的资源划分操作之前考虑到前一次的结果尽量减少变更能够显著降低资源划分开销。
目标
从字⾯意义上看,Sticky是“粘性的”,可以理解为分配结果是带“粘性的”:
- 分区的分配尽量的均衡
- 每⼀次重分配的结果尽量与上⼀次分配结果保持⼀致
在两个目标发生冲突的情况下,在这种情况下优先考虑第一个目标。第一个目标是每个分配算法力图实现的目标,并且第二个目标才真正体现出StickyAssignor特性的
我们先考察预设的安排结构,并随后深入解析StickyAssignor的具体算法实现。
例如:
- 共有三个端点分别为C0、C1、C2
- 共有四个主题分别为T0至T3 每个主题包含两个分区
- 所有端点均关注上述四个分区
StickyAssignor的分配情况如下图所示(增添RoundRobinAssignor分配与之相比):

如果消费者1出现故障,则按照Round Robin算法依次轮询分配资源。重新开始安排资源分配流程:

采用Sticky策略后,负责调整消费者1对应的分区,并重点标注的内容将得到优化以实现均衡目标。

再举⼀个例⼦:
共有三个消费者:分别是编号为C₀的第一个人体智能设备(BP)、编号为C₁的第二个体态分析仪(BP)以及编号为 C₂ 的第三个环境监测装置(BP)。这里共有三个主题类型:分别是 T₀ 号(仅包含一个分区), T₁ 号(包含两个分区)以及 T₂ 号(包含三个分区)。具体来说:
- 第一个人体智能设备(BP)仅仅安装了一个环境监测装置;
- 第二个体态分析仪(BP)同时配备了两个环境监测装置;
- 第三环境监测装置(BP)配备了全部三个环境监测装置。
分配结果如下图所示:

消费者0下线,则按照轮询的⽅式分配:

按照Sticky⽅式分配分区,仅仅需要动的就是红线部分,其他部分不动。

StickyAssignor分配方式的实现稍显复杂, 但通过学习基础内容即可掌握其核心原理。对于对领域感兴趣的朋友们, 可以进一步深入学习相关知识。
6.4 自定义分区策略
自定义的分配策略必须遵循org.apache.kafka.clients consumer internals PartitionAssignor接口。该接口的定义如下。
| Subscription subscription(Set |
|
|---|---|
| String name(); | |
| Map<String, Assignment> assign(Cluster metadata, Map<String, Subscription> subscriptions); | |
| void onAssignment(Assignment assignment); | |
| class Subscription { | |
| private final List |
|
| private final ByteBuffer userData; | |
| } | |
| class Assignment { | |
| private final List |
|
| private final ByteBuffer userData; | |
| } |
在PartitionAssignor接口中定义了一个叫做Subscription的内部类,并且还定义了一个叫做Assignment的内部类。
Subscription 类用于表示消费者订阅的相关信息。该类包含两个属性:topics 和 userData,在该方法中仅传递了一个参数 topics,并与 Subscription 类中的 topics 实现了一一呼应;但并未体现出与 userData 相关的参数。为了增强用户对资源分配结果的控制,在 subscription() 方法内部可以添加一些影响分配的用户自定义信息到 userData 中;这些信息可以包括权重系数、IP 地址、主机名或机架号等具体数据以供参考。
下面介绍Assignment类及其相关属性与功能实现方式。该类主要负责存储与资源分配相关的数据,并包含两个核心属性:partitions和userData。其中partitions用于记录被划分到各个分区集合的信息;userData则用于存储用户自定义的数据信息。PartitionAssignor接口中的onAssignment()方法作为回调函数,在每个消费者接收到消费组leader提供的区域划分结果时会被触发;例如,在StickyAssignor策略中,则是通过该方法将当前的区域划分方案进行持久化存储。以备在下次区域划分优化(rebalance)操作中能够提供参考依据
该接口中的name()方法用于获取分配策略的名称。在Kafka提供的三种分配策略中,例如RangeAssignor对应(protocol_name=“range”),RoundRobinAssignor对应(protocol_name=“roundrobin”),StickyAssignor对应(protocol_name=“sticky”).因此,在自定义分配策略时需要注意命名不要与已有的分配策略产生混淆.此命名不仅标识了分配策略的名称,在后续操作中如加入消费组或选举消费组leader等场景也会有所应用。
真正的分区分配方案体现在该分发函数内部操作中,在该函数内部接收并处理两个关键参数:一个是用于存储集群相关元数据的信息;另一个是用于记录消费组内各消费者订阅的具体信息;通过这些处理后,在函数执行完毕时会生成并返回各个消费者的资源分配结果。
Kafka还提供了一个通用类org.apache.kafka.clients consumer internals AbstractPartitionAssignor该类旨在简化PartitionAssignor接口的实现过程并实现了其中的assign方法其中会将Subscription中的userData信息移除以执行分配操作 Kafka所提供的3种分区划分策略均继承自该基础类 如果开发者需要在自定义分区划分策略中使用userData信息来影响最终分区结果那么他们就不能直接继承AbstractPartitionAssignor这个基础类 而必须直接实现PartitionAssignor接口
自定义分区策略
| package org.apache.kafka.clients.consumer; | |
|---|---|
| import org.apache.kafka.clients.consumer.internals.AbstractPartitionAssignor; | |
| import org.apache.kafka.common.TopicPartition; | |
| import java.util.*; | |
| public class MyAssignor extends AbstractPartitionAssignor { | |
| } |
在使⽤时,消费者客户端需要添加相应的Properties参数,示例如下:
| properties.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, MyAssignor.class.getName()); |
|---|
999
999
999

