分区选主、分区状态机定义与变换逻辑

分区状态机启动

kafka.controller.PartitionStateMachine.startup()

  1. 初始化分区状态,按online offline new分类。
  2. 触发分区上线事件。事件的处理的逻辑主要是给partition选leader。

给分区选举leader

关于分区的leader是如何选取的,涉及的情况比较多,现在大概列举几个要点

ControllerContext的partitionReplicaAssignment的数据产生逻辑
  • 正常创建topic时,会给topic分配副本节点,在AdminUtils.assignReplicasToBrokers中完成。分配完了之后写到zk。数据大致形如:Map(simon.test05.p3r2-2 -> Vector(0, 2), simon.test05.p3r2-1 -> Vector(2, 1), simon.test05.p3r2-0 -> Vector(1, 0)) 。注意这个副本的顺序,有的是0是都第一个,有的是2是第一个,这个很重要,因为后面选leader的时候,取的是副本列表中第一个存活节点。现在这样的第一个元素基本不同,能保证后面的leader选出来的时候不会都落在同一个节点上,比如0。也就是说在topic创建的时候就为后面选出来的leader的负载均衡做了伏笔

  • 创建topic时,controller会处理TopicChange事件时KafkaController.TopicChange.process()(KafkaController.scala 1278行),再从zk读取到这些副本信息放到context的partitionReplicaAssignment中(本地cache),供后续决定是否要选leader和如何选leader时使用。

    chaneg

    val addedPartitionReplicaAssignment = zkClient.getReplicaAssignmentForTopics(newTopics) // 关键地方  新建的topic  addedPartitionReplicaAssignment里面分区对应的replica是不一样的,例如:Map(simon.test05.p3r2-2 -> Vector(0, 2), simon.test05.p3r2-1 -> Vector(2, 1), simon.test05.p3r2-0 -> Vector(1, 0))   
    controllerContext.partitionReplicaAssignment = controllerContext.partitionReplicaAssignment.filter(p =>
      !deletedTopics.contains(p._1.topic))
    controllerContext.partitionReplicaAssignment ++= addedPartitionReplicaAssignment
  • 如果不是创建topic,是正常的存量的topic,在controller启动的时候会读取。KafkaController.initializeControllerContext() KafkaController.scala 649行。

    controllerContext.partitionReplicaAssignment = mutable.Map.empty ++ zkClient.getReplicaAssignmentForTopics(controllerContext.allTopics.toSet)
启动时哪些分区需要选leader哪些不要

并不是所有的分区,在启动时需要选leader。在分区状态机启动initializePartitionState时,会确认所有的topic其leader是否在线,如果在线则置为online状态,后面(triggerOnlinePartitionStateChange里面)也不会对其选主了。offline 和 New的才会trigger选主, 如果你topic本来的leader就是当前节点,则就是online状态了。PartitionStateMachine.scala 86行,104行。

如何选leader

对于筛选出来需要选主的分区,走PartitionStateMachine.doElectLeaderForPartitions进行选leader。

选之前再做一次check,拿zk上记录的controller 的epoch与现在context(即本地cache)中epoch比 大的算invalidPartitionsForElection 小于等于的算validPartitionsForElection 正常的都是小于等于。PartitionStateMachine.scala 304行。

check完成之后对合法分区进行选主,前置状态是offline的走这里

​ PartitionStateMachine.leaderForOffline

​ PartitionLeaderElectionAlgorithms.offlinePartitionLeaderElection

关键代码:

assignment.find(id => liveReplicas.contains(id) && isr.contains(id))

find: 返回第一个满足p的元素或None。注意:如果找到满足条件的元素,迭代器会被置于该元素之后;如果没有找到,会被置于终点。

此处闭环了,从分配的副本中选择第一个存活的节点作为leader。分配副本列表前面ControllerContext的partitionReplicaAssignment的数据产生逻辑讲了,是在topic分区创建时完成的。

对应触发选leader且成功选出的分区执行addLeaderAndIsrRequestForBrokers。这样后面会发出LEADER_AND_ISR和UPDATE_METADATA请求。所以,如果你这步没有(成功)选leader是不会发这两个请求的。跟我们后面梳理的这两个请求的日志信息吻合。

分区状态机梳理

状态定义

PartitionStateMachine类的注释梳理下状态机图:

stateDiagram-v2
    [*]-->NonExistentPartition
    NonExistentPartition --> NewPartition 
    NewPartition --> OnlinePartition 
    OfflinePartition --> OnlinePartition 
    NewPartition --> OfflinePartition 
    OnlinePartition --> OfflinePartition

NewPartition: the partition should have replicas assigned to it, but no leader/isr yet.分配了副本,但是还没有leader。

OnlinePartition: Once a leader is elected for a partition, it is in the OnlinePartition state.选了leader之后。

OfflinePartition: the leader for partition dies, then the partition moves to the OfflinePartition state.选了之后leader之后,leader节点宕机。

状态处理逻辑

作者在注释中给出了详细的说明,摘录如下:

  • NonExistentPartition -> NewPartition:
    load assigned replicas from ZK to controller cache 从zk加载
  • NewPartition -> OnlinePartition
    assign first live replica as the leader and all live replicas as the isr; write leader and isr to ZK for this partition
    send LeaderAndIsr request to every live replica and UpdateMetadata request to every live broker 第一个活着的副本作为leader 并将这个信息写zk并给活着的节点发送LEADER_AND_ISR和UPDATE_METADATA请求
  • OnlinePartition,OfflinePartition -> OnlinePartition
    select new leader and isr for this partition and a set of replicas to receive the LeaderAndIsr request, and write leader and isr to ZK
    for this partition, send LeaderAndIsr request to every receiving replica and UpdateMetadata request to every live broker
  • NewPartition,OnlinePartition,OfflinePartition -> OfflinePartition
    nothing other than marking partition state as Offline
  • OfflinePartition -> NonExistentPartition
    nothing other than marking the partition state as NonExistentPartition