顶尖干货-Kafka-Controller模块
前言
最近在啃Kafka源码,越发发现看源码而言,看的懂只是第一步,自己能表达出来属于第二部,再其次能对源码修改写出定制化功能属于第三部。因此希望用言语对自己所学的内容有一个输出,锻炼自己的总结能力。话不多说正式开始Kafka的Controller模块的源码分析。
一、Controller定义
在Kafka中Controller模块的功能主要有两个
1.给主题下的分区竞选Leader副本
2.Controller承载着集群的元信息,因此它会管理元信息包括同步元信息到到各个Boker上
**** 在开始正式的介绍之前我们先来看看Controller模块需要掌握的重点是什么:
** 1.我们需要知道集群元信息保存在哪里,同时也要知道集群中有哪些元信息——ControllerContext**
2.如何更新元数据,比如删除主题
3.主题分区的领导者选举是怎么样的
4.controller是如何向broker发送消息的——ControllerChannelManager
5.设计到元信息的改变一般采取事件通知的方式,那么Kafka是如何定义这些事件的呢——ControllerEventManager
二、集群元信息
熟悉kafka的同学都知道在启动Kafka之前,往往都要启动一个zookeeper(社区已经宣布将来的Kafka不在依赖zookeeper)。zookeeper的临时节点,顺序节点,watch机制等分布式特性使得用zookeeper保存Kafka的元信息在遇到元信息状态变化时可以及时察觉并做出下一步处理。然而Broker并不会与Zookeeper去通信获取元信息,而是选择和Controller进行通信,从Controller中获取和更新最新的元信息。而正真定义元信息的在ControllerContext中,从名字我们也能看出ControllerContext是Controller的上下文(可以参考下Spring的ApplicationContext),换句话说及容器。不得不说kafka的代码写的非常好,紧紧从字段的定义或者方法名的定义就可以看出大致的信息
//Controller统计信息
val stats = new ControllerStats
//离线分区数
var offlinePartitionCount = 0
//首选分片失衡数
var preferredReplicaImbalanceCount = 0
//正在关闭的BrokerId列表
val shuttingDownBrokerIds = mutable.Set.empty[Int]
//正在运行的Broker列表对象
private val liveBrokers = mutable.Set.empty[Broker]
//正在运行的Broker Epoch 列表
private val liveBrokerEpochs = mutable.Map.empty[Int, Long]
//当前Controller的Enpoch值
var epoch: Int = KafkaController.InitialControllerEpoch
//Controller对应ZooKeeper节点的Epoch值
var epochZkVersion: Int = KafkaController.InitialControllerEpochZkVersion
// 集群主题列表
val allTopics = mutable.Set.empty[String]
// 主题分区的副本列表
val partitionAssignments = mutable.Map.empty[String, mutable.Map[Int,
ReplicaAssignment]]
// 主题分区的Leader/ISR副本信息
private val partitionLeadershipInfo = mutable.Map.empty[TopicPartition,
LeaderIsrAndControllerEpoch]
// 正处于副本重分配过程的主题分区列表
val partitionsBeingReassigned = mutable.Set.empty[TopicPartition]
// 主题分区状态列表
val partitionStates = mutable.Map.empty[TopicPartition, PartitionState]
// 主题分区的副本状态列表
val replicaStates = mutable.Map.empty[PartitionAndReplica, ReplicaState]
// 不可用磁盘路径上的副本列表
val replicasOnOfflineDirs = mutable.Map.empty[Int, Set[TopicPartition]]
// 待删除主题列表
val topicsToBeDeleted = mutable.Set.empty[String]
// 已开启删除的主题列表
val topicsWithDeletionStarted = mutable.Set.empty[String]
// 暂时无法执行删除的主题列表
val topicsIneligibleForDeletion = mutable.Set.empty[String]
