LEADER_AND_ISR和UPDATE_METADATA请求分析
节点常规启动时每个节点收到的请求的列表梳理
node0
启动0:
apiKey=UPDATE_METADATA
apiKey=LEADER_AND_ISR
apiKey=UPDATE_METADATA
启动1:
apiKey=UPDATE_METADATA
apiKey=UPDATE_METADATA
apiKey=UPDATE_METADATA
启动2:
apiKey=UPDATE_METADATA clientId=0
apiKey=UPDATE_METADATA clientId=0
apiKey=UPDATE_METADATA clientId=0
apiKey=OFFSET_FOR_LEADER_EPOCH clientId=broker-2-fetcher-0
apiKey=FETCH clientId=broker-2-fetcher-0
…// 5次fetch
apiKey=UPDATE_METADATA clientId=0
…// 一直fetch
node1 :
启动1:
apiKey=UPDATE_METADATA
apiKey=LEADER_AND_ISR
apiKey=UPDATE_METADATA
apiKey=LEADER_AND_ISR
apiKey=UPDATE_METADATA
启动2:
apiKey=UPDATE_METADATA clientId=0
apiKey=UPDATE_METADATA clientId=0
apiKey=UPDATE_METADATA clientId=0
apiKey=UPDATE_METADATA clientId=0
node2 :
启动2:
apiKey=UPDATE_METADATA
apiKey=LEADER_AND_ISR
apiKey=UPDATE_METADATA
apiKey=LEADER_AND_ISR
apiKey=UPDATE_METADATA
apiKey=UPDATE_METADATA clientId=0 // 这个fetch请求之后 又一次update_meta。
// 观察node1 2,也可以发现,一个非controller节点起来加入之后要处理5个请求(U,L,U,L,U),controller的节点起来加入之后要处理3个请求(U,L,U)。
// 这些个多次UPDATE_METADATA,更新的topic不一样。
node0端在常规启动node0时的调用栈:
Thread [controller-event-thread] (Suspended (breakpoint at line 395 in ControllerBrokerRequestBatch))
ControllerBrokerRequestBatch.sendRequestsToBrokers(int) line: 395
KafkaController.sendUpdateMetadataRequest(Seq<Object>, Set<TopicPartition>) line: 925
KafkaController.onControllerFailover() line: 246
KafkaController.kafka$controller$KafkaController$$elect() line: 1206
...
Thread [controller-event-thread] (Suspended (breakpoint at line 395 in ControllerBrokerRequestBatch))
ControllerBrokerRequestBatch.sendRequestsToBrokers(int) line: 395
ReplicaStateMachine.handleStateChanges(Seq<PartitionAndReplica>, ReplicaState, Callbacks) line: 106
ReplicaStateMachine.startup() line: 66
KafkaController.onControllerFailover() line: 248
KafkaController.kafka$controller$KafkaController$$elect() line: 1206
...
Thread [controller-event-thread] (Suspended (breakpoint at line 395 in ControllerBrokerRequestBatch))
ControllerBrokerRequestBatch.sendRequestsToBrokers(int) line: 395
PartitionStateMachine.handleStateChanges(Seq<TopicPartition>, PartitionState, Option<PartitionLeaderElectionStrategy>) line: 117
PartitionStateMachine.triggerOnlinePartitionStateChange() line: 106
PartitionStateMachine.startup() line: 62
KafkaController.onControllerFailover() line: 249
KafkaController.kafka$controller$KafkaController$$elect() line: 1206
...
三次调用 ControllerBrokerRequestBatch.sendRequestsToBrokers,但请求只有3个,按理说每个里面应该有两个请求,看来是选择性发送的
第一次UPDATE_METADATA, 第二次UPDATE_METADATA和LEADER_AND_ISR(此时是副本状态机启动),第三次没有(此时是分区状态机启动)。
启动时LEADER_AND_ISR和UPDATE_METADATA请求梳理
是否发送对应的请求,取决于相关变量中是否有数据。
LEADER_AND_ISR对应的变量是leaderAndIsrRequestMap
,UPDATE_METADATA对应的变量是updateMetadataRequestBrokerSet
。
ControllerBrokerRequestBatch.leaderAndIsrRequestMap数据的变化封装在addLeaderAndIsrRequestForBrokers方法中。
ControllerBrokerRequestBatch.updateMetadataRequestBrokerSet数据的变化封装在addUpdateMetadataRequestForBrokers方法中。
同时addLeaderAndIsrRequestForBrokers中也会调用addUpdateMetadataRequestForBrokers,也就是说有LEADER_AND_ISR请求时通常也会有UPDATE_METADATA请求。
// ControllerChannelManager.scala 317行 def addLeaderAndIsrRequestForBrokers(brokerIds: Seq[Int], topicPartition: TopicPartition, leaderIsrAndControllerEpoch: LeaderIsrAndControllerEpoch, replicas: Seq[Int], isNew: Boolean) { brokerIds.filter(_ >= 0).foreach { brokerId => val result = leaderAndIsrRequestMap.getOrElseUpdate(brokerId, mutable.Map.empty) val alreadyNew = result.get(topicPartition).exists(_.isNew) result.put(topicPartition, new LeaderAndIsrRequest.PartitionState(leaderIsrAndControllerEpoch.controllerEpoch, leaderIsrAndControllerEpoch.leaderAndIsr.leader, leaderIsrAndControllerEpoch.leaderAndIsr.leaderEpoch, leaderIsrAndControllerEpoch.leaderAndIsr.isr.map(Integer.valueOf).asJava, leaderIsrAndControllerEpoch.leaderAndIsr.zkVersion, replicas.map(Integer.valueOf).asJava, isNew || alreadyNew)) } // **同时发送UPDATE_METADATA请求** addUpdateMetadataRequestForBrokers(controllerContext.liveOrShuttingDownBrokerIds.toSeq, Set(topicPartition)) }
现在梳理三个环节中对应的变量的数据的变化逻辑:
onControllerFailover中的初次sendUpdateMetadataRequest调用
KafkaController.sendUpdateMetadataRequest(brokers: Seq[Int], partitions: Set[TopicPartition])封装了单独(仅仅UPDATE_METADATA请求,不含有LEADER_AND_ISR请求)发送UPDATE_METADATA请求,在KafkaController中有多出调用sendUpdateMetadataRequest,onControllerFailover是其中之一。送进来的参数就是liveOrShuttingDownBrokerIds。可以粗略的理解为,在必要的时候(一些节点状态发生改变时等等,包括onBrokerStartup,onControllerFailover,onPartitionReassignment,onBrokerUpdate,onReplicasBecomeOffline,IsrChangeNotification)就给所有活着的和正在关闭的broker发送更新元数据的请求。
此次没有调用addLeaderAndIsrRequestForBrokers,所以只有UPDATE_METADATA请求。
副本状态机启动时handleStateChanges中的controllerBrokerRequestBatch.sendRequestsToBrokers
对于
NewReplica
,OnlineReplica
,OfflineReplica
三种状态都有可能触发LEADER_AND_ISR请求。从代码不难看出来,副本状态机启动时,对应目标状态是OnlineReplica
。 上面也讲述了,触发LEADER_AND_ISR请求就会级联触发UPDATE_METADATA请求。分区状态机启动时的handleStateChanges中的controllerBrokerRequestBatch.sendRequestsToBrokers
broker端处理LEADER_AND_ISR请求
入口:
KafkaApis.handleLeaderAndIsrRequest // KafkaApis.scala 156行
ReplicaManager.becomeLeaderOrFollower // ReplicaManager.scala 1018行
becomeLeaderOrFollower
逻辑:
- 通过controller的epoch的值 来确定是否有新controller出现过(controller发生转移),已确定这个请求是否合法(是否来自于老的controller上) // ReplicaManager.scala 1027行
- 检查分区leader的当前epoch是否正确(当前的小于请求中epoch就不正确,请求中的来自于从zk读取的),不正确的话用leaderAndIsrRequest.partitionStates替换掉当前的stateInfo // ReplicaManager.scala 1054行
- 根据当前broker id与请求中分区的leader的broker id比较是否相同,来划分出是become leader(broker id相同)还是become follower(broker id不同)。
- become leader的执行 makeLeaders // ReplicaManager.scala 1138行
- become follower的执行makeFollowers // ReplicaManager.scala 1218行
- 如果未启动HighWaterMarksCheckPointThread,则启动startHighWaterMarksCheckPointThread
- 其他一些后置和事件处理
makeLeaders
逻辑:
- 从Fetcher管理器中将LeaderAndIsrRequest中要成为leader的分区移除掉。
- Partition.makeLeader逻辑
- 用LeaderAndIsrRequest.PartitionState.basePartitionState.isr更新分区的inSyncReplicas字段
- 用LeaderAndIsrRequest.PartitionState.basePartitionState.leaderEpoch更新分区的leaderEpoch字段
- 用LeaderAndIsrRequest.PartitionState.basePartitionState.zkVersion更新分区的zkVersion字段
- 通过分区的leaderReplicaIdOpt字段判断是否是新leader,如果是新leader则更新leader副本实例的HW,即Replica.highWatermarkMetadata字段。更新leader副本实例的HW如下: // Partition.scala 289
- 尝试从Replica.highWatermarkMetadata处开始读取一个消息。并将这个highWatermarkMetadata和LSO LEO比较以判断是否合法
- 主要依赖Log.readUncommitted方法,这里面主要逻辑还是针对事务消息做的处理
- Replica.highWatermarkMetadata字段最初来自于哪里?来自于checkpoint文件与LEO的比较。checkpoint文件读取逻辑在Partition.getOrCreateReplica,ReplicaManager.highWatermarkCheckpoints方法中完成。文件名是
replication-offset-checkpoint
,位置在数据目录下。该文件的数据由定时调度定时写入,在ReplicaManager.checkpointHighWatermarks()处,刷的周期由replica.high.watermark.checkpoint.interval.ms
配置项决定,默认5s。LEO是靠Log load文件算出来的,靠Log.nextOffsetMetadata字段,nextOffsetMetadata又来自于log加载时和每次append之后的+1。参见locally部分**。
- 触发是否要增加hw的逻辑,是否增加hw的逻辑有: // Partition.scala 462行
- 取所有合法副本(isr中或者lag时间还没超标的副本)中的LEO的集合
- 取LEO的集合中最小的LEO作为新的HW
- 新的hw相比旧的hw增加了或者两者相同但是oldHighWatermark比newHighWatermark在更旧的segment上,那么更新hw。旧的hw是指Replica.highWatermarkMetadata字段
- 正常启动时,并不会更新,因为旧的hw和新的hw一样大。
makeFollowers
逻辑:
匹配到leader在alive brokers列表中时 做makeFollower切换,partition.makeFollower,切换成功后维护到partitionsToMakeFollower中
- partition.makeFollower主要是更新分区的controllerEpoch、inSyncReplicas、leaderEpoch、zkVersion、leaderReplicaIdOpt字段,更新的数据来源是LEADER_AND_ISR请求中带过来的。
将partitionsToMakeFollower中的分区从Fetcher管理器中移除
对将partitionsToMakeFollower中的分区tryCompleteDelayedProduce、tryCompleteDelayedFetch
梳理出partitionsToMakeFollowerWithLeaderAndOffset并准备做副本fetch
创建并启动fetch线程kafka.server.AbstractFetcherManager.addFetcherForPartitions(partitionAndOffsets: Map[TopicPartition, BrokerAndInitialOffset]).addAndStartFetcherThread
创建fetch线程代码,ReplicaFetcherManager.scala 29行:
override def createFetcherThread(fetcherId: Int, sourceBroker: BrokerEndPoint): AbstractFetcherThread = { val prefix = threadNamePrefix.map(tp => s"${tp}:").getOrElse("") val threadName = s"${prefix}ReplicaFetcherThread-$fetcherId-${sourceBroker.id}" new ReplicaFetcherThread(threadName, fetcherId, sourceBroker, brokerConfig, replicaManager, metrics, time, quotaManager) }
启动fetch线程代码,AbstractFetcherManager.scala 120行:
def addAndStartFetcherThread(brokerAndFetcherId: BrokerAndFetcherId, brokerIdAndFetcherId: BrokerIdAndFetcherId) { val fetcherThread = createFetcherThread(brokerAndFetcherId.fetcherId, brokerAndFetcherId.broker) fetcherThreadMap.put(brokerIdAndFetcherId, fetcherThread) fetcherThread.start }
将分区加入fetcher管理器,并设置对应的offset,offset的值为分区副本的HW,后面trunct时的targetOffset就来自于这个地方。
BrokerAndInitialOffset( metadataCache.getAliveBrokers.find(_.id == partition.leaderReplicaIdOpt.get).get.brokerEndPoint(config.interBrokerListenerName), partition.getReplica().get.highWatermark.messageOffset) // ReplicaManager.scala 1301行
fetcherThreadMap(brokerIdAndFetcherId).addPartitions(initialFetchOffsets.map { case (tp, brokerAndInitOffset) =>
tp -> brokerAndInitOffset.initOffset
}) // AbstractFetcherManager.scala 138行
- 分区副本的HW的值来自于哪里?来自于checkpoint文件。跟leader的处理逻辑有一半相似,leader的还要选所有LEO中最小的,follower的不要,follower只要读checkpoint文件。读的细节部分可以参见makeLeaders章节的。
本博客所有文章除特别声明外,均采用 CC BY-SA 3.0协议 。转载请注明出处!