Replica state machine: state definitions and transition logic
Replica state machine startup
kafka.controller.ReplicaStateMachine.startup()
Two main pieces of logic:
- Initialize replica states: initializeReplicaState
- Handle replica state change events: handleStateChanges
Initializing replica states
- Iterate controllerContext.partitionReplicaAssignment (the mapping from each partition to the broker ids of its replicas): for every topic, and then for every replica, call isReplicaOnline to classify the replica as OnlineReplica or ReplicaDeletionIneligible. The result is kept in ReplicaStateMachine.replicaState.
- ReplicaStateMachine.replicaState is a key-value map. The key is the pair (partition (i.e. TopicPartition), replicaId); the value is the marker object OnlineReplica or ReplicaDeletionIneligible.
- kafka.controller.ControllerContext.isReplicaOnline checks that the replica's broker is live and that the partition is not scheduled to go offline. The controller knows which replicas are live because it maintains the set of all live broker ids.
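The classification pass above can be sketched roughly as follows. This is a simplified illustration, not the actual Kafka source: the type names mirror the real ones, but the context (assignment map, live broker set, offline partition set) is stubbed out as plain constructor parameters.

```scala
import scala.collection.mutable

case class TopicPartition(topic: String, partition: Int)
case class PartitionAndReplica(topicPartition: TopicPartition, replicaId: Int)

sealed trait ReplicaState
case object OnlineReplica extends ReplicaState
case object ReplicaDeletionIneligible extends ReplicaState

class ReplicaStateInitSketch(
    assignment: Map[TopicPartition, Seq[Int]],   // partition -> assigned replica broker ids
    liveBrokerIds: Set[Int],                     // brokers the controller knows are live
    partitionsToBeOffline: Set[TopicPartition]) {

  // what ReplicaStateMachine.replicaState holds: (partition, replicaId) -> state
  val replicaState = mutable.Map.empty[PartitionAndReplica, ReplicaState]

  // isReplicaOnline: the hosting broker is live AND the partition is not going offline
  def isReplicaOnline(replicaId: Int, tp: TopicPartition): Boolean =
    liveBrokerIds.contains(replicaId) && !partitionsToBeOffline.contains(tp)

  def initializeReplicaState(): Unit =
    for ((tp, replicas) <- assignment; replicaId <- replicas) {
      val state = if (isReplicaOnline(replicaId, tp)) OnlineReplica else ReplicaDeletionIneligible
      replicaState.put(PartitionAndReplica(tp, replicaId), state)
    }
}
```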
Handling replica state change events
The replicas here are all the online replicas.
First, the leaderAndIsrRequestMap data is prepared. This goes through ControllerBrokerRequestBatch; batching has a few preconditions, which can be seen in the validation checks performed when a new batch is started (newBatch).
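As a rough illustration of those preconditions (a hedged sketch, not the real class: it only follows the general shape of the check, where a new batch refuses to start while a previous batch's request maps are still non-empty):

```scala
import scala.collection.mutable

// Simplified sketch: a new batch may only be started once the request maps
// filled by the previous batch have been drained by sendRequestsToBrokers.
class RequestBatchSketch {
  val leaderAndIsrRequestMap = mutable.Map.empty[Int, mutable.Map[String, String]]
  val stopReplicaRequestMap = mutable.Map.empty[Int, Seq[String]]
  val updateMetadataRequestBrokerSet = mutable.Set.empty[Int]

  def newBatch(): Unit = {
    if (leaderAndIsrRequestMap.nonEmpty || stopReplicaRequestMap.nonEmpty ||
        updateMetadataRequestBrokerSet.nonEmpty)
      throw new IllegalStateException("Controller to broker state change requests batch is not empty")
  }
}
```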
Call stack of addLeaderAndIsrRequestForBrokers:
    Thread [controller-event-thread] (Suspended (breakpoint at line 321 in ControllerBrokerRequestBatch))
        ControllerBrokerRequestBatch.addLeaderAndIsrRequestForBrokers(Seq<Object>, TopicPartition, LeaderIsrAndControllerEpoch, Seq<Object>, boolean) line: 321
        ReplicaStateMachine$$anonfun$kafka$controller$ReplicaStateMachine$$doHandleStateChanges$4.apply(PartitionAndReplica) line: 189
        ReplicaStateMachine$$anonfun$kafka$controller$ReplicaStateMachine$$doHandleStateChanges$4.apply(Object) line: 178
        ReplicaStateMachine.kafka$controller$ReplicaStateMachine$$doHandleStateChanges(int, Seq<TopicPartition>, ReplicaState, Callbacks) line: 178
        ReplicaStateMachine$$anonfun$handleStateChanges$2.apply(Tuple2<Object,Seq<PartitionAndReplica>>) line: 104
        ReplicaStateMachine$$anonfun$handleStateChanges$2.apply(Object) line: 102
        ReplicaStateMachine.handleStateChanges(Seq<PartitionAndReplica>, ReplicaState, Callbacks) line: 102
        ReplicaStateMachine.startup() line: 66
        KafkaController.onControllerFailover() line: 248
        KafkaController.kafka$controller$KafkaController$$elect() line: 1206
        KafkaController$Startup$.process() line: 1141
        ControllerEventManager$ControllerEventThread$$anonfun$doWork$1.apply$mcV$sp() line: 69
        ControllerEventManager$ControllerEventThread$$anonfun$doWork$1.apply() line: 69
        ControllerEventManager$ControllerEventThread$$anonfun$doWork$1.apply() line: 69
        KafkaTimer.time(Function0<A>) line: 31
        ControllerEventManager$ControllerEventThread.doWork() line: 68
        ControllerEventManager$ControllerEventThread(ShutdownableThread).run() line: 82
Note that kafka.controller.ControllerBrokerRequestBatch.addLeaderAndIsrRequestForBrokers is where the leaderAndIsrRequestMap data is populated.
Structure of leaderAndIsrRequestMap: brokerId -> (TopicPartition -> PartitionState). A PartitionState carries data such as: controllerEpoch=97, leader=0, leaderEpoch=53, isr=0, zkVersion=53, replicas=0, isNew=false.
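The nested shape can be illustrated like this (a sketch: PartitionState here is a stand-in case class holding the fields listed above, not the real Kafka class):

```scala
import scala.collection.mutable

case class TopicPartition(topic: String, partition: Int)

// Stand-in for the PartitionState payload described above
case class PartitionState(controllerEpoch: Int, leader: Int, leaderEpoch: Int,
                          isr: Seq[Int], zkVersion: Int, replicas: Seq[Int], isNew: Boolean)

// brokerId -> (TopicPartition -> PartitionState)
val leaderAndIsrRequestMap = mutable.Map.empty[Int, mutable.Map[TopicPartition, PartitionState]]

// queue the example entry from the text, addressed to broker 0
leaderAndIsrRequestMap.getOrElseUpdate(0, mutable.Map.empty)
  .put(TopicPartition("test", 0), PartitionState(97, 0, 53, Seq(0), 53, Seq(0), isNew = false))
```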
This data comes from controllerContext.partitionLeadershipInfo. And where does partitionLeadershipInfo come from? From records in ZooKeeper; it is populated when ControllerContext is initialized and on IsrChangeNotification events.
kafka.controller.KafkaController.updateLeaderAndIsrCache(partitions: Seq[TopicPartition]):

    private def updateLeaderAndIsrCache(partitions: Seq[TopicPartition] = controllerContext.partitionReplicaAssignment.keys.toSeq) {
      val leaderIsrAndControllerEpochs = zkClient.getTopicPartitionStates(partitions)
      leaderIsrAndControllerEpochs.foreach { case (partition, leaderIsrAndControllerEpoch) =>
        controllerContext.partitionLeadershipInfo.put(partition, leaderIsrAndControllerEpoch)
      }
    }
Next, a LEADER_AND_ISR request is sent for each replica; sending is per replica. Who sends it? The controller. The data sent comes from iterating leaderAndIsrRequestMap, which was prepared in the step above. See kafka.controller.ControllerBrokerRequestBatch.sendRequestsToBrokers(controllerEpoch: Int):

    // ControllerBrokerRequestBatch, line 414
    controller.sendRequest(broker, ApiKeys.LEADER_AND_ISR, leaderAndIsrRequestBuilder,
      (r: AbstractResponse) => controller.eventManager.put(controller.LeaderAndIsrResponseReceived(r, broker)))
Call stack:
    Thread [controller-event-thread] (Suspended (breakpoint at line 414 in ControllerBrokerRequestBatch$$anonfun$sendRequestsToBrokers$2))
        ControllerBrokerRequestBatch$$anonfun$sendRequestsToBrokers$2.apply(Tuple2<Object,Map<TopicPartition,PartitionState>>) line: 414
        ControllerBrokerRequestBatch$$anonfun$sendRequestsToBrokers$2.apply(Object) line: 401
        ...
        ControllerBrokerRequestBatch.sendRequestsToBrokers(int) line: 401
        ReplicaStateMachine.handleStateChanges(Seq<PartitionAndReplica>, ReplicaState, Callbacks) line: 106
        ReplicaStateMachine.startup() line: 66
        KafkaController.onControllerFailover() line: 248
        KafkaController.kafka$controller$KafkaController$$elect() line: 1206
        KafkaController$Startup$.process() line: 1141
        ControllerEventManager$ControllerEventThread$$anonfun$doWork$1.apply$mcV$sp() line: 69
The controller also sends an UPDATE_METADATA request (updateMetadataRequest) to every broker that needs it. How is the set of brokers determined? The call chain shows it:

addLeaderAndIsrRequestForBrokers --> addUpdateMetadataRequestForBrokers

    addUpdateMetadataRequestForBrokers(controllerContext.liveOrShuttingDownBrokerIds.toSeq, Set(topicPartition)) // ControllerChannelManager.scala line 333
    updateMetadataRequestBrokerSet ++= brokerIds.filter(_ >= 0) // ControllerChannelManager.scala line 388

So the targets are the live or shutting-down brokers whose id is >= 0.
Where it is sent:

    updateMetadataRequestBrokerSet.foreach { broker => // ControllerChannelManager.scala line 454
      controller.sendRequest(broker, ApiKeys.UPDATE_METADATA, updateMetadataRequest, null)
    }
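A toy version of the target-set computation (the ids are assumed example values; in the real code brokerIds comes from controllerContext.liveOrShuttingDownBrokerIds):

```scala
import scala.collection.mutable

val brokerIds = Seq(0, 1, 2, -1)  // assumed example ids; negative ids are filtered out
val updateMetadataRequestBrokerSet = mutable.Set.empty[Int]
updateMetadataRequestBrokerSet ++= brokerIds.filter(_ >= 0)
```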
Finally, the controller sends STOP_REPLICA requests to the brokers that need them.
Replica state machine walkthrough
State definitions
States and when each is triggered
| Target state | Trigger points |
|---|---|
| NewReplica | 1. startNewReplicasForReassignedPartition; 2. onNewPartitionCreation |
| OnlineReplica | 1. onPartitionReassignment (step 5: replicas in RAR -> OnlineReplica); 2. onNewPartitionCreation; 3. onBrokerLogDirFailure; 4. onBrokerStartup; 5. ReplicaStateMachine.startup() |
| OfflineReplica | 1. stopOldReplicasOfReassignedPartition; 2. doControlledShutdown |
| ReplicaDeletionStarted | 1. stopOldReplicasOfReassignedPartition |
| ReplicaDeletionIneligible | |
| ReplicaDeletionSuccessful | 1. stopOldReplicasOfReassignedPartition; 2. onReplicasBecomeOffline |
| NonExistentReplica | 1. stopOldReplicasOfReassignedPartition |
Except for trigger 5 of OnlineReplica (ReplicaStateMachine.startup()), which fires inside the replica state machine itself, all of these transitions are triggered from KafkaController.
The state diagram, following the class comment on ReplicaStateMachine:
stateDiagram-v2
[*]-->NonExistentReplica
NonExistentReplica --> NewReplica
NewReplica-->OnlineReplica
OnlineReplica-->OnlineReplica
OfflineReplica-->OnlineReplica
NewReplica-->OfflineReplica
OnlineReplica-->OfflineReplica
ReplicaDeletionIneligible-->OfflineReplica
OfflineReplica-->ReplicaDeletionStarted
ReplicaDeletionStarted-->ReplicaDeletionSuccessful
ReplicaDeletionStarted-->ReplicaDeletionIneligible
ReplicaDeletionSuccessful-->NonExistentReplica
The full set of state definitions, including each state's legal previous states, lives in the subclasses of kafka.controller.ReplicaState (ReplicaStateMachine.scala, line 396).
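The shape of those definitions can be sketched as below. The validPreviousStates sets are reconstructed from the transition list and diagram above; this is a sketch of the pattern, not the verbatim Kafka source.

```scala
// Each state declares which states may legally precede it.
sealed trait ReplicaState { def validPreviousStates: Set[ReplicaState] }

case object NonExistentReplica extends ReplicaState {
  val validPreviousStates: Set[ReplicaState] = Set(ReplicaDeletionSuccessful)
}
case object NewReplica extends ReplicaState {
  val validPreviousStates: Set[ReplicaState] = Set(NonExistentReplica)
}
case object OnlineReplica extends ReplicaState {
  val validPreviousStates: Set[ReplicaState] = Set(NewReplica, OnlineReplica, OfflineReplica)
}
case object OfflineReplica extends ReplicaState {
  val validPreviousStates: Set[ReplicaState] =
    Set(NewReplica, OnlineReplica, OfflineReplica, ReplicaDeletionIneligible)
}
case object ReplicaDeletionStarted extends ReplicaState {
  val validPreviousStates: Set[ReplicaState] = Set(OfflineReplica)
}
case object ReplicaDeletionSuccessful extends ReplicaState {
  val validPreviousStates: Set[ReplicaState] = Set(ReplicaDeletionStarted)
}
case object ReplicaDeletionIneligible extends ReplicaState {
  val validPreviousStates: Set[ReplicaState] = Set(ReplicaDeletionStarted)
}

// A transition is legal iff the current state is among the target's valid previous states
def isValidTransition(from: ReplicaState, to: ReplicaState): Boolean =
  to.validPreviousStates.contains(from)
```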
State handling logic
The author gives a detailed explanation in the class comment, excerpted here:
- NonExistentReplica -> NewReplica: send LeaderAndIsr request with current leader and isr to the new replica and UpdateMetadata request for the partition to every live broker
- NewReplica -> OnlineReplica: add the new replica to the assigned replica list if needed
- OnlineReplica, OfflineReplica -> OnlineReplica: send LeaderAndIsr request with current leader and isr to the new replica and UpdateMetadata request for the partition to every live broker
- NewReplica, OnlineReplica, OfflineReplica, ReplicaDeletionIneligible -> OfflineReplica: send StopReplicaRequest to the replica (w/o deletion); remove this replica from the isr and send LeaderAndIsr request (with new isr) to the leader replica and UpdateMetadata request for the partition to every live broker
- OfflineReplica -> ReplicaDeletionStarted: send StopReplicaRequest to the replica (with deletion)
- ReplicaDeletionStarted -> ReplicaDeletionSuccessful: mark the state of the replica in the state machine
- ReplicaDeletionStarted -> ReplicaDeletionIneligible: mark the state of the replica in the state machine
- ReplicaDeletionSuccessful -> NonExistentReplica: remove the replica from the in memory partition replica assignment cache
The transitions into OnlineReplica are the ones I care about most:
- NewReplica -> OnlineReplica: update partitionReplicaAssignment, the map from each partition to its replica broker ids.
- OnlineReplica or OfflineReplica -> OnlineReplica: prepare a LEADER_AND_ISR request to send to replicaId (at state machine startup this is normally the OnlineReplica -> OnlineReplica case).
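That branching can be sketched as follows. This is a simplified illustration under assumed types: the real logic lives in doHandleStateChanges and queues requests through ControllerBrokerRequestBatch, not a local buffer.

```scala
import scala.collection.mutable

case class TopicPartition(topic: String, partition: Int)

sealed trait ReplicaState
case object NewReplica extends ReplicaState
case object OnlineReplica extends ReplicaState
case object OfflineReplica extends ReplicaState

class OnlineTransitionSketch {
  // partition -> assigned replica broker ids (mirrors partitionReplicaAssignment)
  val partitionReplicaAssignment = mutable.Map.empty[TopicPartition, Seq[Int]]
  // stand-in for "a LEADER_AND_ISR request was queued for (replicaId, partition)"
  val queuedLeaderAndIsr = mutable.Buffer.empty[(Int, TopicPartition)]

  def transitionToOnline(tp: TopicPartition, replicaId: Int, currentState: ReplicaState): Unit =
    currentState match {
      case NewReplica =>
        // NewReplica -> OnlineReplica: just make sure the replica is in the assignment
        val assigned = partitionReplicaAssignment.getOrElse(tp, Seq.empty)
        if (!assigned.contains(replicaId))
          partitionReplicaAssignment.put(tp, assigned :+ replicaId)
      case OnlineReplica | OfflineReplica =>
        // OnlineReplica/OfflineReplica -> OnlineReplica: prepare LEADER_AND_ISR for the replica
        queuedLeaderAndIsr += ((replicaId, tp))
    }
}
```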
Unless otherwise stated, all posts on this blog are licensed under CC BY-SA 3.0. Please credit the source when reposting!