Advertisement

顶尖干货-Kafka-Controller模块

阅读量:

前言

最近在啃Kafka源码,越发发现看源码而言,看的懂只是第一步,自己能表达出来属于第二部,再其次能对源码修改写出定制化功能属于第三部。因此希望用言语对自己所学的内容有一个输出,锻炼自己的总结能力。话不多说正式开始Kafka的Controller模块的源码分析。

一、Controller定义

在Kafka中Controller模块的功能主要有两个

1.给主题下的分区竞选Leader副本

2.Controller承载着集群的元信息,因此它会管理元信息包括同步元信息到到各个Boker上

**** 在开始正式的介绍之前我们先来看看Controller模块需要掌握的重点是什么:

** 1.我们需要知道集群元信息保存在哪里,同时也要知道集群中有哪些元信息——ControllerContext**

2.如何更新元数据,比如删除主题

3.主题分区的领导者选举是怎么样的

4.controller是如何向broker发送消息的——ControllerChannelManager

5.设计到元信息的改变一般采取事件通知的方式,那么Kafka是如何定义这些事件的呢——ControllerEventManager

二、集群元信息

熟悉kafka的同学都知道在启动Kafka之前,往往都要启动一个zookeeper(社区已经宣布将来的Kafka不在依赖zookeeper)。zookeeper的临时节点,顺序节点,watch机制等分布式特性使得用zookeeper保存Kafka的元信息在遇到元信息状态变化时可以及时察觉并做出下一步处理。然而Broker并不会与Zookeeper去通信获取元信息,而是选择和Controller进行通信,从Controller中获取和更新最新的元信息。而正真定义元信息的在ControllerContext中,从名字我们也能看出ControllerContext是Controller的上下文(可以参考下Spring的ApplicationContext),换句话说及容器。不得不说kafka的代码写的非常好,紧紧从字段的定义或者方法名的定义就可以看出大致的信息

复制代码
   //Controller统计信息

    
   val stats = new ControllerStats
    
   //离线分区数
    
   var offlinePartitionCount = 0
    
   //首选分片失衡数
    
   var preferredReplicaImbalanceCount = 0
    
   //正在关闭的BrokerId列表
    
   val shuttingDownBrokerIds = mutable.Set.empty[Int]
    
   //正在运行的Broker列表对象
    
   private val liveBrokers = mutable.Set.empty[Broker]
    
   //正在运行的Broker Epoch 列表
    
   private val liveBrokerEpochs = mutable.Map.empty[Int, Long]
    
   //当前Controller的Enpoch值
    
   var epoch: Int = KafkaController.InitialControllerEpoch
    
   //Controller对应ZooKeeper节点的Epoch值  
    
   var epochZkVersion: Int = KafkaController.InitialControllerEpochZkVersion
    
   // 集群主题列表
    
   val allTopics = mutable.Set.empty[String]
    
   // 主题分区的副本列表
    
   val partitionAssignments = mutable.Map.empty[String, mutable.Map[Int, 
    
   ReplicaAssignment]]
    
   // 主题分区的Leader/ISR副本信息
    
   private val partitionLeadershipInfo = mutable.Map.empty[TopicPartition, 
    
   LeaderIsrAndControllerEpoch]
    
   // 正处于副本重分配过程的主题分区列表
    
   val partitionsBeingReassigned = mutable.Set.empty[TopicPartition]
    
   // 主题分区状态列表
    
   val partitionStates = mutable.Map.empty[TopicPartition, PartitionState]
    
   // 主题分区的副本状态列表
    
   val replicaStates = mutable.Map.empty[PartitionAndReplica, ReplicaState]
    
   // 不可用磁盘路径上的副本列表
    
   val replicasOnOfflineDirs = mutable.Map.empty[Int, Set[TopicPartition]]
    
   // 待删除主题列表
    
   val topicsToBeDeleted = mutable.Set.empty[String]
    
   // 已开启删除的主题列表
    
   val topicsWithDeletionStarted = mutable.Set.empty[String]
    
   // 暂时无法执行删除的主题列表
    
   val topicsIneligibleForDeletion = mutable.Set.empty[String]
    
    
    
    

全部评论 (0)

还没有任何评论哟~