Replica State Machine: State Definitions and Transition Logic

Replica state machine startup

kafka.controller.ReplicaStateMachine.startup()

Startup consists of two main steps:

  1. Initialize the replica states: initializeReplicaState
  2. Handle replica state change events: handleStateChanges

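Initializing replica state

  • Starting from controllerContext.partitionReplicaAssignment (for each partition, the list of broker ids hosting its replicas), iterate over each partition and then over each of its replicas. isReplicaOnline decides whether a replica is recorded as OnlineReplica or as ReplicaDeletionIneligible, and the result is kept in ReplicaStateMachine.replicaState.
  • ReplicaStateMachine.replicaState is a key-value map. The key is the pair (partition, replicaId), where partition is a TopicPartition; the value is one of the marker objects OnlineReplica or ReplicaDeletionIneligible.
  • kafka.controller.ControllerContext.isReplicaOnline returns true when the replica's broker is online and the partition is not marked offline on that broker. The controller knows whether a broker is online because it maintains the ids of all live brokers.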
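
The initialization pass described above can be sketched in a few lines. This is a simplified illustration, not the Kafka source: TopicPartition and PartitionAndReplica are reduced to small case classes, and the parameter offlinePartitionsByBroker is an assumed stand-in for however the controller tracks partitions that are offline on a given broker.

```scala
import scala.collection.mutable

object InitReplicaStateSketch {
  // Simplified stand-ins for Kafka's types; the shapes are assumptions.
  final case class TopicPartition(topic: String, partition: Int)
  final case class PartitionAndReplica(topicPartition: TopicPartition, replica: Int)

  sealed trait ReplicaState
  case object OnlineReplica extends ReplicaState
  case object ReplicaDeletionIneligible extends ReplicaState

  // A replica counts as online when its broker is live and the partition
  // is not marked offline on that broker.
  def isReplicaOnline(
      brokerId: Int,
      partition: TopicPartition,
      liveBrokerIds: Set[Int],
      offlinePartitionsByBroker: Map[Int, Set[TopicPartition]]): Boolean =
    liveBrokerIds.contains(brokerId) &&
      !offlinePartitionsByBroker.getOrElse(brokerId, Set.empty[TopicPartition]).contains(partition)

  // Walk every partition, then every replica, and record its initial state.
  def initializeReplicaState(
      partitionReplicaAssignment: Map[TopicPartition, Seq[Int]],
      liveBrokerIds: Set[Int],
      offlinePartitionsByBroker: Map[Int, Set[TopicPartition]]
  ): mutable.Map[PartitionAndReplica, ReplicaState] = {
    val replicaState = mutable.Map.empty[PartitionAndReplica, ReplicaState]
    partitionReplicaAssignment.foreach { case (partition, replicas) =>
      replicas.foreach { replicaId =>
        val state: ReplicaState =
          if (isReplicaOnline(replicaId, partition, liveBrokerIds, offlinePartitionsByBroker)) OnlineReplica
          else ReplicaDeletionIneligible
        replicaState.put(PartitionAndReplica(partition, replicaId), state)
      }
    }
    replicaState
  }
}
```

With brokers 0 and 1 live and the partition marked offline on broker 1, only the replica on broker 0 ends up as OnlineReplica; replicas on broker 1 and on a dead broker 2 are recorded as ReplicaDeletionIneligible.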

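Handling replica state change events

  • The replicas here are all replicas that are currently online.

  • Prepare the leaderAndIsrRequestMap data. This is done through ControllerBrokerRequestBatch; a batch has a few preconditions, which you can see in the checks of its newBatch method.

    Call stack for addLeaderAndIsrRequestForBrokers: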

    Thread [controller-event-thread] (Suspended (breakpoint at line 321 in ControllerBrokerRequestBatch))	
    	ControllerBrokerRequestBatch.addLeaderAndIsrRequestForBrokers(Seq<Object>, TopicPartition, LeaderIsrAndControllerEpoch, Seq<Object>, boolean) line: 321	
    	ReplicaStateMachine$$anonfun$kafka$controller$ReplicaStateMachine$$doHandleStateChanges$4.apply(PartitionAndReplica) line: 189	
    	ReplicaStateMachine$$anonfun$kafka$controller$ReplicaStateMachine$$doHandleStateChanges$4.apply(Object) line: 178	
    	ReplicaStateMachine.kafka$controller$ReplicaStateMachine$$doHandleStateChanges(int, Seq<TopicPartition>, ReplicaState, Callbacks) line: 178	
    	ReplicaStateMachine$$anonfun$handleStateChanges$2.apply(Tuple2<Object,Seq<PartitionAndReplica>>) line: 104	
    	ReplicaStateMachine$$anonfun$handleStateChanges$2.apply(Object) line: 102	
    	ReplicaStateMachine.handleStateChanges(Seq<PartitionAndReplica>, ReplicaState, Callbacks) line: 102	
    	ReplicaStateMachine.startup() line: 66	
    	KafkaController.onControllerFailover() line: 248	
    	KafkaController.kafka$controller$KafkaController$$elect() line: 1206	
    	KafkaController$Startup$.process() line: 1141	
    	ControllerEventManager$ControllerEventThread$$anonfun$doWork$1.apply$mcV$sp() line: 69	
    	ControllerEventManager$ControllerEventThread$$anonfun$doWork$1.apply() line: 69	
    	ControllerEventManager$ControllerEventThread$$anonfun$doWork$1.apply() line: 69	
    	KafkaTimer.time(Function0<A>) line: 31	
    	ControllerEventManager$ControllerEventThread.doWork() line: 68	
    	ControllerEventManager$ControllerEventThread(ShutdownableThread).run() line: 82

    Note that kafka.controller.ControllerBrokerRequestBatch.addLeaderAndIsrRequestForBrokers is where the leaderAndIsrRequestMap data is prepared.

    Structure of leaderAndIsrRequestMap: brokerId -> (TopicPartition -> PartitionState). A PartitionState holds values such as: controllerEpoch=97, leader=0, leaderEpoch=53, isr=0, zkVersion=53, replicas=0, isNew=false

    This data comes from controllerContext.partitionLeadershipInfo. Where does partitionLeadershipInfo come from? From the state recorded in ZooKeeper. It is refreshed when the ControllerContext is initialized and when an IsrChangeNotification arrives.

    kafka.controller.KafkaController.updateLeaderAndIsrCache(partitions: Seq[TopicPartition])
      private def updateLeaderAndIsrCache(partitions: Seq[TopicPartition] = controllerContext.partitionReplicaAssignment.keys.toSeq) {
        val leaderIsrAndControllerEpochs = zkClient.getTopicPartitionStates(partitions)
        leaderIsrAndControllerEpochs.foreach { case (partition, leaderIsrAndControllerEpoch) =>
          controllerContext.partitionLeadershipInfo.put(partition, leaderIsrAndControllerEpoch)
        }
      }
  • Send a LEADER_AND_ISR request for each replica. The sender is the controller, and it sends one request per broker by iterating over leaderAndIsrRequestMap, which was filled in the previous step.

    kafka.controller.ControllerBrokerRequestBatch.sendRequestsToBrokers(controllerEpoch: Int) // line 414
    controller.sendRequest(broker, ApiKeys.LEADER_AND_ISR, leaderAndIsrRequestBuilder,
              (r: AbstractResponse) => controller.eventManager.put(controller.LeaderAndIsrResponseReceived(r, broker)))

    Call stack:

    Thread [controller-event-thread] (Suspended (breakpoint at line 414 in ControllerBrokerRequestBatch$$anonfun$sendRequestsToBrokers$2))	
    	ControllerBrokerRequestBatch$$anonfun$sendRequestsToBrokers$2.apply(Tuple2<Object,Map<TopicPartition,PartitionState>>) line: 414	
    	ControllerBrokerRequestBatch$$anonfun$sendRequestsToBrokers$2.apply(Object) line: 401	
    	...
    	ControllerBrokerRequestBatch.sendRequestsToBrokers(int) line: 401	
    	ReplicaStateMachine.handleStateChanges(Seq<PartitionAndReplica>, ReplicaState, Callbacks) line: 106	
    	ReplicaStateMachine.startup() line: 66	
    	KafkaController.onControllerFailover() line: 248	
    	KafkaController.kafka$controller$KafkaController$$elect() line: 1206	
    	KafkaController$Startup$.process() line: 1141	
    	ControllerEventManager$ControllerEventThread$$anonfun$doWork$1.apply$mcV$sp() line: 69
  • The controller sends an UPDATE_METADATA request (updateMetadataRequest) to every broker that needs one.

    How is the set of brokers that need one determined? As follows:

    addLeaderAndIsrRequestForBrokers-->addUpdateMetadataRequestForBrokers
    
    addUpdateMetadataRequestForBrokers(controllerContext.liveOrShuttingDownBrokerIds.toSeq, Set(topicPartition)) // ControllerChannelManager.scala, line 333
    updateMetadataRequestBrokerSet ++= brokerIds.filter(_ >= 0)  // ControllerChannelManager.scala, line 388
    
    So the targets are the brokers that are live or shutting down and whose id is >= 0.

    Where the requests are sent:

    updateMetadataRequestBrokerSet.foreach { broker =>   // ControllerChannelManager.scala, line 454
      controller.sendRequest(broker, ApiKeys.UPDATE_METADATA, updateMetadataRequest, null)
    }
  • The controller sends STOP_REPLICA requests to the brokers that need them.
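
The batching flow above (fill leaderAndIsrRequestMap, derive the UPDATE_METADATA broker set, then flush) can be sketched as follows. This is a toy model under stated assumptions, not the real ControllerBrokerRequestBatch: the field types of PartitionState are guessed from the dump shown earlier, SentRequest and updateMetadataPartitions are hypothetical names, and sendRequestsToBrokers returns a list of what would be sent instead of doing any network I/O.

```scala
import scala.collection.mutable

object RequestBatchSketch {
  final case class TopicPartition(topic: String, partition: Int)
  // Field names follow the PartitionState dump in the notes; the types are assumed.
  final case class PartitionState(controllerEpoch: Int, leader: Int, leaderEpoch: Int,
      isr: Seq[Int], zkVersion: Int, replicas: Seq[Int], isNew: Boolean)
  // Hypothetical record of a request that would go out to a broker.
  final case class SentRequest(broker: Int, api: String, partitions: Set[TopicPartition])

  final class Batch(liveOrShuttingDownBrokerIds: Set[Int]) {
    // brokerId -> (TopicPartition -> PartitionState)
    val leaderAndIsrRequestMap = mutable.Map.empty[Int, mutable.Map[TopicPartition, PartitionState]]
    val updateMetadataRequestBrokerSet = mutable.Set.empty[Int]
    private val updateMetadataPartitions = mutable.Set.empty[TopicPartition]

    // Assumed precondition: a new batch may only start once the previous one was flushed.
    def newBatch(): Unit =
      if (leaderAndIsrRequestMap.nonEmpty || updateMetadataRequestBrokerSet.nonEmpty)
        throw new IllegalStateException("previous batch has not been sent yet")

    def addLeaderAndIsrRequestForBrokers(brokerIds: Seq[Int], tp: TopicPartition, state: PartitionState): Unit = {
      brokerIds.filter(_ >= 0).foreach { brokerId =>
        leaderAndIsrRequestMap
          .getOrElseUpdate(brokerId, mutable.Map.empty[TopicPartition, PartitionState])
          .put(tp, state)
      }
      // Every addition also schedules a metadata update for all live or shutting-down brokers.
      updateMetadataRequestBrokerSet ++= liveOrShuttingDownBrokerIds.filter(_ >= 0)
      updateMetadataPartitions += tp
    }

    // Flush: one LEADER_AND_ISR per broker in the map, one UPDATE_METADATA per broker in the set.
    def sendRequestsToBrokers(): Seq[SentRequest] = {
      val leaderAndIsr = leaderAndIsrRequestMap.toSeq.sortBy(_._1).map { case (broker, states) =>
        SentRequest(broker, "LEADER_AND_ISR", states.keySet.toSet)
      }
      val updateMetadata = updateMetadataRequestBrokerSet.toSeq.sorted.map { broker =>
        SentRequest(broker, "UPDATE_METADATA", updateMetadataPartitions.toSet)
      }
      leaderAndIsrRequestMap.clear()
      updateMetadataRequestBrokerSet.clear()
      updateMetadataPartitions.clear()
      leaderAndIsr ++ updateMetadata
    }
  }
}
```

Keying the map by broker id is what makes the flush cheap: all partition states destined for one broker travel in a single request.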

Replica state machine overview

State definitions

States and what triggers them

Target state               Triggered by
NewReplica                 1. startNewReplicasForReassignedPartition; 2. onNewPartitionCreation
OnlineReplica              1. onPartitionReassignment (step 5: replicas in RAR -> OnlineReplica); 2. onNewPartitionCreation; 3. onBrokerLogDirFailure; 4. onBrokerStartup; 5. ReplicaStateMachine.startup()
OfflineReplica             1. stopOldReplicasOfReassignedPartition; 2. doControlledShutdown
ReplicaDeletionStarted     1. stopOldReplicasOfReassignedPartition
ReplicaDeletionIneligible
ReplicaDeletionSuccessful  1. stopOldReplicasOfReassignedPartition; 2. onReplicasBecomeOffline
NonExistentReplica         1. stopOldReplicasOfReassignedPartition

Apart from trigger 5 under OnlineReplica, ReplicaStateMachine.startup(), which fires inside the replica state machine itself, all of these are triggered from KafkaController.

The state diagram below is derived from the comments on the ReplicaStateMachine class:

stateDiagram-v2
    [*]-->NonExistentReplica
    NonExistentReplica --> NewReplica 
    NewReplica-->OnlineReplica
    OnlineReplica-->OnlineReplica
    OfflineReplica-->OnlineReplica
    NewReplica-->OfflineReplica
    OnlineReplica-->OfflineReplica
    OfflineReplica-->OfflineReplica
    ReplicaDeletionIneligible-->OfflineReplica
    OfflineReplica-->ReplicaDeletionStarted
    ReplicaDeletionStarted-->ReplicaDeletionSuccessful
    ReplicaDeletionStarted-->ReplicaDeletionIneligible
    ReplicaDeletionSuccessful-->NonExistentReplica

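The states, together with the valid previous states of each, are defined in the subclasses of kafka.controller.ReplicaState (ReplicaStateMachine.scala, line 396).

State handling logic

The author documents the transitions in detail in the class comment, excerpted below:

  • NonExistentReplica -> NewReplica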
    send LeaderAndIsr request with current leader and isr to the new replica and UpdateMetadata request for the partition to every live broker
  • NewReplica -> OnlineReplica
    add the new replica to the assigned replica list if needed
  • OnlineReplica,OfflineReplica -> OnlineReplica
    send LeaderAndIsr request with current leader and isr to the new replica and UpdateMetadata request for the partition to every live broker
  • NewReplica,OnlineReplica,OfflineReplica,ReplicaDeletionIneligible -> OfflineReplica
    send StopReplicaRequest to the replica (w/o deletion)
    remove this replica from the isr and send LeaderAndIsr request (with new isr) to the leader replica and
    UpdateMetadata request for the partition to every live broker.
  • OfflineReplica -> ReplicaDeletionStarted
    send StopReplicaRequest to the replica (with deletion)
  • ReplicaDeletionStarted -> ReplicaDeletionSuccessful
    mark the state of the replica in the state machine
  • ReplicaDeletionStarted -> ReplicaDeletionIneligible
    mark the state of the replica in the state machine
  • ReplicaDeletionSuccessful -> NonExistentReplica
    remove the replica from the in memory partition replica assignment cache
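
As a compact way to read the transitions quoted above, the following sketch encodes them as a "valid previous states" lookup and checks a transition against it. The sets are transcribed from the quoted comment only; the validPreviousStates values in the actual ReplicaState subclasses may differ, and the function names here are illustrative.

```scala
object ReplicaTransitionSketch {
  sealed trait ReplicaState
  case object NewReplica extends ReplicaState
  case object OnlineReplica extends ReplicaState
  case object OfflineReplica extends ReplicaState
  case object ReplicaDeletionStarted extends ReplicaState
  case object ReplicaDeletionSuccessful extends ReplicaState
  case object ReplicaDeletionIneligible extends ReplicaState
  case object NonExistentReplica extends ReplicaState

  // For each target state, the states a replica may come from
  // (transcribed from the quoted class comment, not from the source code).
  def validPreviousStates(target: ReplicaState): Set[ReplicaState] = target match {
    case NewReplica                => Set(NonExistentReplica)
    case OnlineReplica             => Set(NewReplica, OnlineReplica, OfflineReplica)
    case OfflineReplica            => Set(NewReplica, OnlineReplica, OfflineReplica, ReplicaDeletionIneligible)
    case ReplicaDeletionStarted    => Set(OfflineReplica)
    case ReplicaDeletionSuccessful => Set(ReplicaDeletionStarted)
    case ReplicaDeletionIneligible => Set(ReplicaDeletionStarted)
    case NonExistentReplica        => Set(ReplicaDeletionSuccessful)
  }

  // A transition is allowed only if the current state is a valid predecessor of the target.
  def isValidTransition(current: ReplicaState, target: ReplicaState): Boolean =
    validPreviousStates(target).contains(current)
}
```

Looking up predecessors by target state matches how the notes describe the definitions: each state declares which states may lead into it, so a requested transition can be rejected before any request is queued.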

The handling I care most about is the transition to OnlineReplica:

  • NewReplica -> OnlineReplica: update partitionReplicaAssignment, which maps each partition to the ids of its replicas.
  • OnlineReplica or OfflineReplica -> OnlineReplica: prepare a LEADER_AND_ISR request for replicaId. (At replica state machine startup the transition is usually OnlineReplica -> OnlineReplica.)
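
The two OnlineReplica branches above can be sketched like this. It is a simplified model, not the Kafka implementation: Context, queuedLeaderAndIsr, and moveToOnline are hypothetical names, and the check that leadership info exists before queuing a request is an assumption of the sketch.

```scala
import scala.collection.mutable

object OnlineReplicaTransitionSketch {
  final case class TopicPartition(topic: String, partition: Int)
  final case class PartitionAndReplica(topicPartition: TopicPartition, replica: Int)
  final case class LeaderAndIsr(leader: Int, isr: List[Int])

  sealed trait ReplicaState
  case object NewReplica extends ReplicaState
  case object OnlineReplica extends ReplicaState
  case object OfflineReplica extends ReplicaState

  // Minimal stand-in for the controller context plus the request batch.
  final class Context {
    val partitionReplicaAssignment = mutable.Map.empty[TopicPartition, Seq[Int]]
    val partitionLeadershipInfo = mutable.Map.empty[TopicPartition, LeaderAndIsr]
    val replicaState = mutable.Map.empty[PartitionAndReplica, ReplicaState]
    // (target broker, partition) pairs queued for a LEADER_AND_ISR request.
    val queuedLeaderAndIsr = mutable.ListBuffer.empty[(Int, TopicPartition)]
  }

  def moveToOnline(ctx: Context, replica: PartitionAndReplica): Unit = {
    val partition = replica.topicPartition
    ctx.replicaState(replica) match {
      case NewReplica =>
        // Add the replica to the assignment if it is not there yet.
        val assignment = ctx.partitionReplicaAssignment.getOrElse(partition, Seq.empty[Int])
        if (!assignment.contains(replica.replica))
          ctx.partitionReplicaAssignment.put(partition, assignment :+ replica.replica)
      case _ =>
        // Online/Offline -> Online: queue a LEADER_AND_ISR request for this replica,
        // provided leadership info for the partition is known (assumed check).
        if (ctx.partitionLeadershipInfo.contains(partition))
          ctx.queuedLeaderAndIsr += ((replica.replica, partition))
    }
    ctx.replicaState.put(replica, OnlineReplica)
  }
}
```

In this model a replica that starts as NewReplica only changes the assignment cache, while a replica that is already Online (the common case at startup) causes a request to be queued for its broker.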