controller如何选主及其启动过程分析
broker启动
- kafka.controller.KafkaController.startup() // controller启动 注册了启动Startup事件用于选择controller,即controller选主(KafkaController.scala 1141行) 以及RegisterBrokerAndReelec
- kafka.server.KafkaServer.startup() // server启动
- kafka.server.KafkaServerStartable.startup() // server启动器启动
- kafka.Kafka.main(args: Array[String]) // main方法入口
上面逻辑是按照 4调3 3调2 2调1的顺序进行的。
集群id的确定
获取(没有就生成)clusterid 数据放在zk /cluster/id下 形式如: {“version”:”1”,”id”:”L268Ylm_RZGN7Hasp4_xYA”} id生成规则就是 uuid转换成long再base64
broker id的确定
- 读取数据目录下(logDirs)目录下 meta.properties文件中记录的broker的id 672行有跟当前broker配置的id进行比较 不等则失败
- 如果配置没有指定broker id 则生成,生成办法是:靠/brokers/seqid的dataVersion加上设置的基数(config.maxReservedBrokerId)确定出自动生成broker id
controller选主与KafkaController.startup的主要逻辑
kafka集群需要controller角色来做协调,包括各topic-partition的leader的确定等等。
controller的选主的原理说起来也很简单,broker启动的时候,到zk创建临时节点,谁创建成功谁就是controller。
private def elect(): Unit = {
try {
zkClient.checkedEphemeralCreate(ControllerZNode.path, ControllerZNode.encode(config.brokerId, timestamp))// KafkaController.scala 1203行
activeControllerId = config.brokerId
onControllerFailover() // controller 挂掉的时候的回调,broker刚起来的时候也走这里。 会从zk获取分区状态数据初始化Controller的的context
} catch {
case _: NodeExistsException =>
// If someone else has written the path, then
activeControllerId = zkClient.getControllerId.getOrElse(-1)
}
controller选主与onControllerFailover分析
onControllerFailover中做了很多事情,先挨个列举下重要环节:
readControllerEpochFromZooKeeper // 从zk读取controller的epoch
incrementControllerEpoch // 增加controller的epoch,并通过cas的方式写入zk,确保写如正确
向zk注册child listener brokerChangeHandler, topicChangeHandler, topicDeletionHandler, logDirEventNotificationHandler,
isrChangeNotificationHandler
向zk注册node listener preferredReplicaElectionHandler, partitionReassignmentHandler
调用zk删除deleteLogDirEventNotifications deleteIsrChangeNotifications
initializeControllerContext,初始化ControllerContext,下简称context。
向zk读取所有broker的id。 KafkaZkClient.getAllBrokersInCluster // TODO: 都是活的broker??
向zk读取所有的topic信息。 KafkaZkClient.getAllTopicsInCluster。
向zk注册PartitionModificationsHandler。
初始化context中的partitionReplicaAssignment。通过zk获取,getReplicaAssignmentForTopics。数据结构是 TopicPartition –> Seq[Int]。这个topic对应的副本节点id信息对后面的分区选leader非常重要,zk上的数据来源是在topic创建时产生的,在后面分区选leader有详细讲
初始化context中的partitionLeadershipInfo。此时是空的。数据结构是:TopicPartition –> LeaderIsrAndControllerEpoch。partitionLeadershipInfo又来自于哪里呢? 来自于zk记录的。数据包括:controllerEpoch=97, leader=0, leaderEpoch=53, isr=0, zkVersion=53, replicas=0, isNew=false。
zk上的信息示例如下:
[zk: 127.0.0.1:2182(CONNECTED) 8] get /brokers/topics/simon.test02.p1r2/partitions/0/state {"controller_epoch":49,"leader":0,"version":1,"leader_epoch":2,"isr":[0]}
- 迭代liveBrokers,为其向zk注册BrokerModificationsHandler。 - 从zk读取leaderIsrAndControllerEpochs,并更新context中的partitionLeadershipInfo,KafkaController.updateLeaderAndIsrCache。 - 启动channel manager,startChannelManager,做的事情就是为每一个broker启动一个kafka.controller.RequestSendThread线程,用于发送请求,这些个请求都在其队列中,发送策略是带有退避backoff的不断重试。线程名是`s"$name:Controller-${config.brokerId}-to-broker-${broker.id}-send-thread"`,且controllerChannelManager挂在context中。 起这个线程的作用是为后面的ControllerBrokerRequestBatch.sendRequestsToBrokers(414,455,472行等等)做准备,也就是说controller.sendRequest的都是通过其完成的。 ```scala private[controller] def sendRequest(brokerId: Int, apiKey: ApiKeys, request: AbstractRequest.Builder[_ <: AbstractRequest], callback: AbstractResponse => Unit = null) = { controllerContext.controllerChannelManager.sendRequest(brokerId, apiKey, request, callback) } def sendRequest(brokerId: Int, apiKey: ApiKeys, request: AbstractRequest.Builder[_ <: AbstractRequest], callback: AbstractResponse => Unit = null) { brokerLock synchronized { val stateInfoOpt = brokerStateInfo.get(brokerId) stateInfoOpt match { case Some(stateInfo) => stateInfo.messageQueue.put(QueueItem(apiKey, request, callback)) // 请求发到这个队列里,之前起的线程就能take到了,就可以发了。 case None => warn(s"Not sending request $request to broker $brokerId, since it is offline.") } } }
- 处理即将要被删除的topic。fetchTopicDeletionsInProgress,topicDeletionManager.init(topicsToBeDeleted, topicsIneligibleForDeletion)。
给存活的broker发送update metadta请求。 sendUpdateMetadataRequest(controllerContext.liveOrShuttingDownBrokerIds.toSeq)。
副本状态机启动, replicaStateMachine.startup()
分区状态机启动, partitionStateMachine.startup()
一些其他操作:maybeTriggerPartitionReassignment,tryTopicDeletion,onPreferredReplicaElection,kafkaScheduler.startup(),scheduleAutoLeaderRebalanceTask,tokenCleanScheduler.startup()。
本博客所有文章除特别声明外,均采用 CC BY-SA 3.0协议 。转载请注明出处!