LEADER_AND_ISR和UPDATE_METADATA请求分析

节点常规启动时每个节点收到的请求的列表梳理

node0

启动0:

apiKey=UPDATE_METADATA

apiKey=LEADER_AND_ISR

apiKey=UPDATE_METADATA

启动1:

apiKey=UPDATE_METADATA

apiKey=UPDATE_METADATA

apiKey=UPDATE_METADATA

启动2:

apiKey=UPDATE_METADATA clientId=0

apiKey=UPDATE_METADATA clientId=0

apiKey=UPDATE_METADATA clientId=0

apiKey=OFFSET_FOR_LEADER_EPOCH clientId=broker-2-fetcher-0

apiKey=FETCH clientId=broker-2-fetcher-0

…// 5次fetch

apiKey=UPDATE_METADATA clientId=0

…// 一直fetch

node1 :

启动1:

apiKey=UPDATE_METADATA

apiKey=LEADER_AND_ISR

apiKey=UPDATE_METADATA

apiKey=LEADER_AND_ISR

apiKey=UPDATE_METADATA

启动2:

apiKey=UPDATE_METADATA clientId=0

apiKey=UPDATE_METADATA clientId=0

apiKey=UPDATE_METADATA clientId=0

apiKey=UPDATE_METADATA clientId=0

node2 :

启动2:

apiKey=UPDATE_METADATA

apiKey=LEADER_AND_ISR

apiKey=UPDATE_METADATA

apiKey=LEADER_AND_ISR

apiKey=UPDATE_METADATA

apiKey=UPDATE_METADATA clientId=0 // 这个fetch请求之后 又一次update_meta。

// 观察node1 2,也可以发现,一个非controller节点起来加入之后要处理5个请求(U,L,U,L,U),controller的节点起来加入之后要处理3个请求(U,L,U)。

// 这些个多次UPDATE_METADATA,更新的topic不一样。

node0端在常规启动node0时的调用栈:

Thread [controller-event-thread] (Suspended (breakpoint at line 395 in ControllerBrokerRequestBatch))	
	ControllerBrokerRequestBatch.sendRequestsToBrokers(int) line: 395	
	KafkaController.sendUpdateMetadataRequest(Seq<Object>, Set<TopicPartition>) line: 925	
	KafkaController.onControllerFailover() line: 246	
	KafkaController.kafka$controller$KafkaController$$elect() line: 1206	
...

Thread [controller-event-thread] (Suspended (breakpoint at line 395 in ControllerBrokerRequestBatch))	
	ControllerBrokerRequestBatch.sendRequestsToBrokers(int) line: 395	
	ReplicaStateMachine.handleStateChanges(Seq<PartitionAndReplica>, ReplicaState, Callbacks) line: 106	
	ReplicaStateMachine.startup() line: 66	
	KafkaController.onControllerFailover() line: 248	
	KafkaController.kafka$controller$KafkaController$$elect() line: 1206	
...

Thread [controller-event-thread] (Suspended (breakpoint at line 395 in ControllerBrokerRequestBatch))	
	ControllerBrokerRequestBatch.sendRequestsToBrokers(int) line: 395	
	PartitionStateMachine.handleStateChanges(Seq<TopicPartition>, PartitionState, Option<PartitionLeaderElectionStrategy>) line: 117	
	PartitionStateMachine.triggerOnlinePartitionStateChange() line: 106	
	PartitionStateMachine.startup() line: 62	
	KafkaController.onControllerFailover() line: 249	
	KafkaController.kafka$controller$KafkaController$$elect() line: 1206	
...

三次调用 ControllerBrokerRequestBatch.sendRequestsToBrokers,但请求只有3个,按理说每个里面应该有两个请求,看来是选择性发送的

第一次UPDATE_METADATA, 第二次UPDATE_METADATA和LEADER_AND_ISR(此时是副本状态机启动),第三次没有(此时是分区状态机启动)。

启动时LEADER_AND_ISR和UPDATE_METADATA请求梳理

是否发送对应的请求,取决于相关变量中是否有数据

LEADER_AND_ISR对应的变量是leaderAndIsrRequestMap,UPDATE_METADATA对应的变量是updateMetadataRequestBrokerSet

  • ControllerBrokerRequestBatch.leaderAndIsrRequestMap数据的变化封装在addLeaderAndIsrRequestForBrokers方法中。

  • ControllerBrokerRequestBatch.updateMetadataRequestBrokerSet数据的变化封装在addUpdateMetadataRequestForBrokers方法中。

  • 同时addLeaderAndIsrRequestForBrokers中也会调用addUpdateMetadataRequestForBrokers,也就是说有LEADER_AND_ISR请求时通常也会有UPDATE_METADATA请求

    // ControllerChannelManager.scala   317行  
    def addLeaderAndIsrRequestForBrokers(brokerIds: Seq[Int], topicPartition: TopicPartition,
                                           leaderIsrAndControllerEpoch: LeaderIsrAndControllerEpoch,
                                           replicas: Seq[Int], isNew: Boolean) {
    
        brokerIds.filter(_ >= 0).foreach { brokerId =>
          val result = leaderAndIsrRequestMap.getOrElseUpdate(brokerId, mutable.Map.empty)
          val alreadyNew = result.get(topicPartition).exists(_.isNew)
          result.put(topicPartition, new LeaderAndIsrRequest.PartitionState(leaderIsrAndControllerEpoch.controllerEpoch,
            leaderIsrAndControllerEpoch.leaderAndIsr.leader,
            leaderIsrAndControllerEpoch.leaderAndIsr.leaderEpoch,
            leaderIsrAndControllerEpoch.leaderAndIsr.isr.map(Integer.valueOf).asJava,
            leaderIsrAndControllerEpoch.leaderAndIsr.zkVersion,
            replicas.map(Integer.valueOf).asJava,
            isNew || alreadyNew))
        }
    // **同时发送UPDATE_METADATA请求**
        addUpdateMetadataRequestForBrokers(controllerContext.liveOrShuttingDownBrokerIds.toSeq, Set(topicPartition))
      }

现在梳理三个环节中对应的变量的数据的变化逻辑:

  • onControllerFailover中的初次sendUpdateMetadataRequest调用

    KafkaController.sendUpdateMetadataRequest(brokers: Seq[Int], partitions: Set[TopicPartition])封装了单独(仅仅UPDATE_METADATA请求,不含有LEADER_AND_ISR请求)发送UPDATE_METADATA请求,在KafkaController中有多出调用sendUpdateMetadataRequest,onControllerFailover是其中之一。送进来的参数就是liveOrShuttingDownBrokerIds。可以粗略的理解为,在必要的时候(一些节点状态发生改变时等等,包括onBrokerStartup,onControllerFailover,onPartitionReassignment,onBrokerUpdate,onReplicasBecomeOffline,IsrChangeNotification)就给所有活着的和正在关闭的broker发送更新元数据的请求。

    此次没有调用addLeaderAndIsrRequestForBrokers,所以只有UPDATE_METADATA请求。

  • 副本状态机启动时handleStateChanges中的controllerBrokerRequestBatch.sendRequestsToBrokers

    对于NewReplica,OnlineReplica,OfflineReplica三种状态都有可能触发LEADER_AND_ISR请求。从代码不难看出来,副本状态机启动时,对应目标状态是OnlineReplica。 上面也讲述了,触发LEADER_AND_ISR请求就会级联触发UPDATE_METADATA请求

  • 分区状态机启动时的handleStateChanges中的controllerBrokerRequestBatch.sendRequestsToBrokers

broker端处理LEADER_AND_ISR请求

入口:

KafkaApis.handleLeaderAndIsrRequest // KafkaApis.scala 156行

ReplicaManager.becomeLeaderOrFollower // ReplicaManager.scala 1018行

becomeLeaderOrFollower

逻辑:

  • 通过controller的epoch的值 来确定是否有新controller出现过(controller发生转移),已确定这个请求是否合法(是否来自于老的controller上) // ReplicaManager.scala 1027行
  • 检查分区leader的当前epoch是否正确(当前的小于请求中epoch就不正确,请求中的来自于从zk读取的),不正确的话用leaderAndIsrRequest.partitionStates替换掉当前的stateInfo // ReplicaManager.scala 1054行
  • 根据当前broker id与请求中分区的leader的broker id比较是否相同,来划分出是become leader(broker id相同)还是become follower(broker id不同)。
    • become leader的执行 makeLeaders // ReplicaManager.scala 1138行
    • become follower的执行makeFollowers // ReplicaManager.scala 1218行
  • 如果未启动HighWaterMarksCheckPointThread,则启动startHighWaterMarksCheckPointThread
  • 其他一些后置和事件处理

makeLeaders

逻辑:

  • 从Fetcher管理器中将LeaderAndIsrRequest中要成为leader的分区移除掉。
  • Partition.makeLeader逻辑
    • 用LeaderAndIsrRequest.PartitionState.basePartitionState.isr更新分区的inSyncReplicas字段
    • 用LeaderAndIsrRequest.PartitionState.basePartitionState.leaderEpoch更新分区的leaderEpoch字段
    • 用LeaderAndIsrRequest.PartitionState.basePartitionState.zkVersion更新分区的zkVersion字段
    • 通过分区的leaderReplicaIdOpt字段判断是否是新leader,如果是新leader则更新leader副本实例的HW,即Replica.highWatermarkMetadata字段。更新leader副本实例的HW如下: // Partition.scala 289
      1. 尝试从Replica.highWatermarkMetadata处开始读取一个消息。并将这个highWatermarkMetadata和LSO LEO比较以判断是否合法
      2. 主要依赖Log.readUncommitted方法,这里面主要逻辑还是针对事务消息做的处理
      3. Replica.highWatermarkMetadata字段最初来自于哪里?来自于checkpoint文件与LEO的比较checkpoint文件读取逻辑在Partition.getOrCreateReplica,ReplicaManager.highWatermarkCheckpoints方法中完成。文件名是replication-offset-checkpoint,位置在数据目录下。该文件的数据由定时调度定时写入,在ReplicaManager.checkpointHighWatermarks()处,刷的周期由replica.high.watermark.checkpoint.interval.ms配置项决定,默认5s。LEO是靠Log load文件算出来的,靠Log.nextOffsetMetadata字段,nextOffsetMetadata又来自于log加载时和每次append之后的+1。参见locally部分**。
    • 触发是否要增加hw的逻辑,是否增加hw的逻辑有: // Partition.scala 462行
      1. 取所有合法副本(isr中或者lag时间还没超标的副本)中的LEO的集合
      2. 取LEO的集合中最小的LEO作为新的HW
      3. 新的hw相比旧的hw增加了或者两者相同但是oldHighWatermark比newHighWatermark在更旧的segment上,那么更新hw。旧的hw是指Replica.highWatermarkMetadata字段
      4. 正常启动时,并不会更新,因为旧的hw和新的hw一样大。

makeFollowers

逻辑:

  • 匹配到leader在alive brokers列表中时 做makeFollower切换,partition.makeFollower,切换成功后维护到partitionsToMakeFollower中

    • partition.makeFollower主要是更新分区的controllerEpoch、inSyncReplicas、leaderEpoch、zkVersion、leaderReplicaIdOpt字段,更新的数据来源是LEADER_AND_ISR请求中带过来的。
  • 将partitionsToMakeFollower中的分区从Fetcher管理器中移除

  • 对将partitionsToMakeFollower中的分区tryCompleteDelayedProduce、tryCompleteDelayedFetch

  • 梳理出partitionsToMakeFollowerWithLeaderAndOffset并准备做副本fetch

    • 创建并启动fetch线程kafka.server.AbstractFetcherManager.addFetcherForPartitions(partitionAndOffsets: Map[TopicPartition, BrokerAndInitialOffset]).addAndStartFetcherThread

      创建fetch线程代码,ReplicaFetcherManager.scala 29行:

      override def createFetcherThread(fetcherId: Int, sourceBroker: BrokerEndPoint): AbstractFetcherThread = {
        val prefix = threadNamePrefix.map(tp => s"${tp}:").getOrElse("")
        val threadName = s"${prefix}ReplicaFetcherThread-$fetcherId-${sourceBroker.id}"
        new ReplicaFetcherThread(threadName, fetcherId, sourceBroker, brokerConfig, replicaManager, metrics, time, quotaManager)
      }

      启动fetch线程代码,AbstractFetcherManager.scala 120行:

      def addAndStartFetcherThread(brokerAndFetcherId: BrokerAndFetcherId, brokerIdAndFetcherId: BrokerIdAndFetcherId) {
        val fetcherThread = createFetcherThread(brokerAndFetcherId.fetcherId, brokerAndFetcherId.broker)
        fetcherThreadMap.put(brokerIdAndFetcherId, fetcherThread)
        fetcherThread.start
      }
    • 将分区加入fetcher管理器,并设置对应的offset,offset的值为分区副本的HW,后面trunct时的targetOffset就来自于这个地方

      BrokerAndInitialOffset(
                  metadataCache.getAliveBrokers.find(_.id == partition.leaderReplicaIdOpt.get).get.brokerEndPoint(config.interBrokerListenerName),
                  partition.getReplica().get.highWatermark.messageOffset)	  // ReplicaManager.scala 1301行
fetcherThreadMap(brokerIdAndFetcherId).addPartitions(initialFetchOffsets.map { case (tp, brokerAndInitOffset) =>
  tp -> brokerAndInitOffset.initOffset
})  // AbstractFetcherManager.scala  138行
  • 分区副本的HW的值来自于哪里?来自于checkpoint文件。跟leader的处理逻辑有一半相似,leader的还要选所有LEO中最小的,follower的不要,follower只要读checkpoint文件。读的细节部分可以参见makeLeaders章节的。