controller如何选主及其启动过程分析

broker启动

  1. kafka.controller.KafkaController.startup() // controller启动 注册了启动Startup事件用于选择controller,即controller选主(KafkaController.scala 1141行) 以及RegisterBrokerAndReelec
  2. kafka.server.KafkaServer.startup() // server启动
  3. kafka.server.KafkaServerStartable.startup() // server启动器启动
  4. kafka.Kafka.main(args: Array[String]) // main方法入口

上面逻辑是按照 4调3 3调2 2调1的顺序进行的。

集群id的确定

获取(没有就生成)clusterid 数据放在zk /cluster/id下 形式如: {“version”:”1”,”id”:”L268Ylm_RZGN7Hasp4_xYA”} id生成规则就是 uuid转换成long再base64

broker id的确定

  1. 读取数据目录下(logDirs)目录下 meta.properties文件中记录的broker的id 672行有跟当前broker配置的id进行比较 不等则失败
  2. 如果配置没有指定broker id 则生成,生成办法是:靠/brokers/seqid的dataVersion加上设置的基数(config.maxReservedBrokerId)确定出自动生成broker id

controller选主与KafkaController.startup的主要逻辑

kafka集群需要controller角色来做协调,包括各topic-partition的leader的确定等等。

controller的选主的原理说起来也很简单,broker启动的时候,到zk创建临时节点,谁创建成功谁就是controller。

private def elect(): Unit = {
    try {
    zkClient.checkedEphemeralCreate(ControllerZNode.path, ControllerZNode.encode(config.brokerId, timestamp))// KafkaController.scala 1203行
    activeControllerId = config.brokerId
    onControllerFailover() // controller 挂掉的时候的回调,broker刚起来的时候也走这里。  会从zk获取分区状态数据初始化Controller的的context
  } catch {
    case _: NodeExistsException =>
      // If someone else has written the path, then
      activeControllerId = zkClient.getControllerId.getOrElse(-1)
}

controller选主与onControllerFailover分析

onControllerFailover中做了很多事情,先挨个列举下重要环节:

  • readControllerEpochFromZooKeeper // 从zk读取controller的epoch

  • incrementControllerEpoch // 增加controller的epoch,并通过cas的方式写入zk,确保写如正确

  • 向zk注册child listener brokerChangeHandler, topicChangeHandler, topicDeletionHandler, logDirEventNotificationHandler,

    isrChangeNotificationHandler   
  • 向zk注册node listener preferredReplicaElectionHandler, partitionReassignmentHandler

  • 调用zk删除deleteLogDirEventNotifications deleteIsrChangeNotifications

  • initializeControllerContext,初始化ControllerContext,下简称context。

    • 向zk读取所有broker的id。 KafkaZkClient.getAllBrokersInCluster // TODO: 都是活的broker??

    • 向zk读取所有的topic信息。 KafkaZkClient.getAllTopicsInCluster。

    • 向zk注册PartitionModificationsHandler。

    • 初始化context中的partitionReplicaAssignment。通过zk获取,getReplicaAssignmentForTopics。数据结构是 TopicPartition –> Seq[Int]。这个topic对应的副本节点id信息对后面的分区选leader非常重要,zk上的数据来源是在topic创建时产生的,在后面分区选leader有详细讲

    • 初始化context中的partitionLeadershipInfo。此时是空的。数据结构是:TopicPartition –> LeaderIsrAndControllerEpoch。partitionLeadershipInfo又来自于哪里呢来自于zk记录的数据包括:controllerEpoch=97, leader=0, leaderEpoch=53, isr=0, zkVersion=53, replicas=0, isNew=false

      zk上的信息示例如下

      [zk: 127.0.0.1:2182(CONNECTED) 8] get /brokers/topics/simon.test02.p1r2/partitions/0/state
      {"controller_epoch":49,"leader":0,"version":1,"leader_epoch":2,"isr":[0]}
    - 迭代liveBrokers,为其向zk注册BrokerModificationsHandler。  
    
    - 从zk读取leaderIsrAndControllerEpochs,并更新context中的partitionLeadershipInfo,KafkaController.updateLeaderAndIsrCache。  
    
    - 启动channel manager,startChannelManager,做的事情就是为每一个broker启动一个kafka.controller.RequestSendThread线程,用于发送请求,这些个请求都在其队列中,发送策略是带有退避backoff的不断重试。线程名是`s"$name:Controller-${config.brokerId}-to-broker-${broker.id}-send-thread"`,且controllerChannelManager挂在context中。  起这个线程的作用是为后面的ControllerBrokerRequestBatch.sendRequestsToBrokers(414,455,472行等等)做准备,也就是说controller.sendRequest的都是通过其完成的。  
    
      ```scala
        private[controller] def sendRequest(brokerId: Int, apiKey: ApiKeys, request: AbstractRequest.Builder[_ <: AbstractRequest],
                                            callback: AbstractResponse => Unit = null) = {
          controllerContext.controllerChannelManager.sendRequest(brokerId, apiKey, request, callback)
        }
        def sendRequest(brokerId: Int, apiKey: ApiKeys, request: AbstractRequest.Builder[_ <: AbstractRequest],
                        callback: AbstractResponse => Unit = null) {
          brokerLock synchronized {
            val stateInfoOpt = brokerStateInfo.get(brokerId)
            stateInfoOpt match {
              case Some(stateInfo) =>
                stateInfo.messageQueue.put(QueueItem(apiKey, request, callback)) // 请求发到这个队列里,之前起的线程就能take到了,就可以发了。  
              case None =>
                warn(s"Not sending request $request to broker $brokerId, since it is offline.")
            }
          }
        }
    • 处理即将要被删除的topic。fetchTopicDeletionsInProgress,topicDeletionManager.init(topicsToBeDeleted, topicsIneligibleForDeletion)。
  • 给存活的broker发送update metadta请求。 sendUpdateMetadataRequest(controllerContext.liveOrShuttingDownBrokerIds.toSeq)。

  • 副本状态机启动, replicaStateMachine.startup()

  • 分区状态机启动, partitionStateMachine.startup()

  • 一些其他操作:maybeTriggerPartitionReassignment,tryTopicDeletion,onPreferredReplicaElection,kafkaScheduler.startup(),scheduleAutoLeaderRebalanceTask,tokenCleanScheduler.startup()。