HW Update

Leader node

Update process

kafka.cluster.Partition.maybeIncrementLeaderHW(leaderReplica: Replica, curTime: Long) // Partition.scala line 462

The comment on this method clearly explains the logic and background of advancing the HW:

/**
 * Check and maybe increment the high watermark of the partition;
 * this function can be triggered when
 *
 * 1. Partition ISR changed
 * 2. Any replica's LEO changed
 *
 * The HW is determined by the smallest log end offset among all replicas that are in sync or are considered caught-up.
 * This way, if a replica is considered caught-up, but its log end offset is smaller than HW, we will wait for this
 * replica to catch up to the HW before advancing the HW. This helps the situation when the ISR only includes the
 * leader replica and a follower tries to catch up. If we don't wait for the follower when advancing the HW, the
 * follower's log end offset may keep falling behind the HW (determined by the leader's log end offset) and therefore
 * will never be added to ISR.
 *
 * Returns true if the HW was incremented, and false otherwise.
 * Note There is no need to acquire the leaderIsrUpdate lock here
 * since all callers of this private API acquire that lock
 */

From this comment it is clear that there are two occasions that trigger advancing the leader HW:

  1. When the partition ISR changes (expand or shrink)
  2. When any replica's LEO changes

These two triggers can also be confirmed by searching for the callers of maybeIncrementLeaderHW. Besides those two, leader election is another trigger.

The logic for advancing the leader's HW is as follows (a minimal sketch follows the list):

  1. Filter out the replicas that currently qualify. A replica qualifies if it is in the ISR, or if the current time minus its last catch-up time is less than the configured maximum lag time (replicaLagTimeMaxMs). Partition.scala line 464.
  2. Take the minimum LEO across all qualifying replicas as the computed new HW.
  3. If the conditions are met, assign this computed new HW to the leader's HW, i.e. leaderReplica.highWatermark. What counts as meeting the conditions? There are two cases, and either one is enough:
    1. The old HW is smaller than the computed new HW. As the code comment puts it, "Ensure that the high watermark increases monotonically".
    2. The old HW equals the computed new HW, but a segment roll has happened: && oldHighWatermark.onOlderSegment(newHighWatermark). The segment change is detected by comparing the segments' baseOffset.

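To make the three steps concrete, here is a minimal, self-contained sketch of the rule. ReplicaState and all of its fields are illustrative stand-ins of mine, not Kafka's Replica type, and the "same offset, newer segment" case from point 3.2 is omitted because it needs segment metadata.

object LeaderHwSketch {
  final case class ReplicaState(leo: Long, inSync: Boolean, lastCaughtUpTimeMs: Long)

  // Returns Some(newHw) if the HW should be advanced, None otherwise.
  def maybeNewHw(replicas: Seq[ReplicaState],
                 oldHw: Long,
                 curTimeMs: Long,
                 replicaLagTimeMaxMs: Long): Option[Long] = {
    // step 1: keep replicas that are in the ISR or have caught up recently enough
    val eligible = replicas.filter { r =>
      r.inSync || (curTimeMs - r.lastCaughtUpTimeMs) < replicaLagTimeMaxMs
    }
    // step 2: the candidate HW is the smallest LEO among the eligible replicas
    // (the leader itself is always in the ISR, so eligible is never empty)
    val candidate = eligible.map(_.leo).min
    // step 3 (case 1 only): advance the HW only if it actually moves forward
    if (candidate > oldHw) Some(candidate) else None
  }

  def main(args: Array[String]): Unit = {
    val now = System.currentTimeMillis()
    val replicas = Seq(
      ReplicaState(leo = 65, inSync = true,  lastCaughtUpTimeMs = now),       // leader
      ReplicaState(leo = 64, inSync = false, lastCaughtUpTimeMs = now - 1000) // follower catching up
    )
    // the HW can only move up to min(65, 64) = 64 until the follower reaches 65
    println(maybeNewHw(replicas, oldHw = 60, curTimeMs = now, replicaLagTimeMaxMs = 10000)) // Some(64)
  }
}
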
Note: the HW is the offset one past the last message that can currently be consumed, not the offset of that message itself. For example, with HW = 65, consumers can read messages up to and including offset 64.

Initialization process

kafka.cluster.Partition.getOrCreateReplica(replicaId: Int, isNew: Boolean) creates the Replica instance

At first glance, it takes the offset recorded in the replication-offset-checkpoint file (explained in detail below) and compares it with the log's LEO; the smaller of the two becomes the initial HW.

// Partition.scala line 172
def getOrCreateReplica(replicaId: Int = localBrokerId, isNew: Boolean = false): Replica = {
    allReplicasMap.getAndMaybePut(replicaId, {
      if (isReplicaLocal(replicaId)) {
        val adminZkClient = new AdminZkClient(zkClient)
        val props = adminZkClient.fetchEntityConfig(ConfigType.Topic, topic)
        val config = LogConfig.fromProps(logManager.currentDefaultConfig.originals, props)
        val log = logManager.getOrCreateLog(topicPartition, config, isNew, replicaId == Request.FutureLocalReplicaId)
        val checkpoint = replicaManager.highWatermarkCheckpoints(log.dir.getParent) // see the highWatermarkCheckpoints declaration below for where the HW is persisted
        val offsetMap = checkpoint.read()
        if (!offsetMap.contains(topicPartition))
          info(s"No checkpointed highwatermark is found for partition $topicPartition")
        val offset = math.min(offsetMap.getOrElse(topicPartition, 0L), log.logEndOffset)
        new Replica(replicaId, topicPartition, time, offset, Some(log))
      } else new Replica(replicaId, topicPartition, time)
    })
  } 

@volatile var highWatermarkCheckpoints = logManager.liveLogDirs.map(dir =>
    (dir.getAbsolutePath, new OffsetCheckpointFile(new File(dir, ReplicaManager.HighWatermarkFilename), logDirFailureChannel))).toMap
// ReplicaManager.HighWatermarkFilename = "replication-offset-checkpoint"

Clearly, the HW is persisted in the replication-offset-checkpoint file.

The file contents look roughly like this:

0  // version
17 // size: one line per topic+partition, 17 entries in total
__consumer_offsets 8 7143
__consumer_offsets 35 5594
__consumer_offsets 41 0
__consumer_offsets 23 0
__consumer_offsets 47 0
__consumer_offsets 38 0
__consumer_offsets 17 1444
__consumer_offsets 11 0
__consumer_offsets 2 0
__consumer_offsets 14 0
simon.test01.p1r2 0 55
    // ... remaining entries omitted

The parsing and assembly of this format are implemented in:

// OffsetCheckpointFile, object Formatter extends CheckpointFileFormatter
override def fromLine(line: String): Option[(TopicPartition, Long)] = {
  WhiteSpacesPattern.split(line) match {
    case Array(topic, partition, offset) => // i.e. three whitespace-separated fields
      Some(new TopicPartition(topic, partition.toInt), offset.toLong)
    case _ => None
  }
}
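
To make the format concrete, here is a standalone round-trip sketch that also handles the version/size header shown above. It assumes the assembly side simply joins the three fields with single spaces; toLine and parseCheckpoint are illustrative helpers of mine, not Kafka's classes.

import java.util.regex.Pattern
import org.apache.kafka.common.TopicPartition

object CheckpointFormatSketch {
  private val WhiteSpacesPattern = Pattern.compile("\\s+")

  // assembly: one "topic partition offset" line per entry (assumed symmetric to fromLine)
  def toLine(entry: (TopicPartition, Long)): String =
    s"${entry._1.topic} ${entry._1.partition} ${entry._2}"

  // parsing: mirrors Formatter.fromLine above
  def fromLine(line: String): Option[(TopicPartition, Long)] =
    WhiteSpacesPattern.split(line) match {
      case Array(topic, partition, offset) =>
        Some(new TopicPartition(topic, partition.toInt) -> offset.toLong)
      case _ => None
    }

  // whole file: first line is the version, second line the entry count, then the entries
  def parseCheckpoint(content: String): Map[TopicPartition, Long] = {
    val lines = content.split("\n").map(_.trim).filter(_.nonEmpty)
    lines.drop(2).flatMap(fromLine).toMap
  }

  def main(args: Array[String]): Unit = {
    val sample =
      """0
        |2
        |__consumer_offsets 8 7143
        |simon.test01.p1r2 0 55""".stripMargin
    println(parseCheckpoint(sample)) // Map(__consumer_offsets-8 -> 7143, simon.test01.p1r2-0 -> 55)
    println(toLine(new TopicPartition("simon.test01.p1r2", 0) -> 55L)) // simon.test01.p1r2 0 55
  }
}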

Persistence process

Next, analyze when OffsetCheckpointFile.write(offsets: Map[TopicPartition, Long]) (line 59) is called and by what.

kafka.server.ReplicaManager.checkpointHighWatermarks()

Its comment reads: // Flushes the highwatermark value for all partitions to the highwatermark file

which states plainly that this is where the high watermarks are flushed to the file on disk. The logic is simple: iterate over every replica and write its HW out; the Scala collection syntax really shows its conciseness here.

// ReplicaManager.scala line 1373
// Flushes the highwatermark value for all partitions to the highwatermark file
def checkpointHighWatermarks() {
  // ......
  val replicasByDir = replicas.groupBy(_.log.get.dir.getParent)
  for ((dir, reps) <- replicasByDir) {
    val hwms = reps.map(r => r.topicPartition -> r.highWatermark.messageOffset).toMap
    try {
      highWatermarkCheckpoints.get(dir).foreach(_.write(hwms))
    } catch {
      case e: KafkaStorageException =>
        error(s"Error while writing to highwatermark file in directory $dir", e)
    }
  }
}
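
A simplified model of that grouping, for readers less used to Scala collections: replicas are grouped by their log directory, and each group becomes one (TopicPartition -> HW) map for that directory's checkpoint file. ReplicaLike and the directory path are made up for illustration; the HW values are taken from the checkpoint file sample above.

import org.apache.kafka.common.TopicPartition

object CheckpointGroupingSketch {
  final case class ReplicaLike(topicPartition: TopicPartition, logDir: String, highWatermark: Long)

  // group by log directory, then turn each group into a (TopicPartition -> HW) map
  def hwmsByDir(replicas: Seq[ReplicaLike]): Map[String, Map[TopicPartition, Long]] =
    replicas.groupBy(_.logDir).map { case (dir, reps) =>
      dir -> reps.map(r => r.topicPartition -> r.highWatermark).toMap
    }

  def main(args: Array[String]): Unit = {
    val replicas = Seq(
      ReplicaLike(new TopicPartition("__consumer_offsets", 8), "/data/kafka-logs", 7143L),
      ReplicaLike(new TopicPartition("simon.test01.p1r2", 0),  "/data/kafka-logs", 55L)
    )
    // Map(/data/kafka-logs -> Map(__consumer_offsets-8 -> 7143, simon.test01.p1r2-0 -> 55))
    println(hwmsByDir(replicas))
  }
}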

The scheduled task that flushes the high watermarks to disk is started here:

// ReplicaManager.scala line 240
def startHighWaterMarksCheckPointThread() = {
    if (highWatermarkCheckPointThreadStarted.compareAndSet(false, true))
      scheduler.schedule("highwatermark-checkpoint", checkpointHighWatermarks _, period = config.replicaHighWatermarkCheckpointIntervalMs, unit = TimeUnit.MILLISECONDS)
  } // checkpointHighWatermarks _ : this syntax looks a bit odd at first; see the sketch below
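
The checkpointHighWatermarks _ syntax is Scala eta-expansion: it converts the method into a () => Unit function value that can be passed to the scheduler as the task to run. A minimal sketch (the schedule method below is a stand-in of mine, not the real kafka.utils.Scheduler API):

object EtaExpansionSketch {
  def checkpointHighWatermarks(): Unit = println("flushing high watermarks ...")

  // stand-in for a scheduler that takes a () => Unit task; here it simply runs the task once
  def schedule(name: String, fun: () => Unit): Unit = fun()

  def main(args: Array[String]): Unit = {
    schedule("highwatermark-checkpoint", checkpointHighWatermarks _)       // eta-expansion: method -> function value
    schedule("highwatermark-checkpoint", () => checkpointHighWatermarks()) // equivalent explicit lambda
  }
}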

The relevant config is shown below; by default the flush runs every 5 seconds:

replica.high.watermark.checkpoint.interval.ms (type: long, default: 5000): The frequency with which the high watermark is saved out to disk

Question: if the HW has been updated in memory but the machine restarts before it is flushed to disk, how is this handled after the restart?

Replica (follower) node

TODO: continue analyzing the follower-side HW update steps 1127

kafka.server.ReplicaFetcherThread.processPartitionData(topicPartition: TopicPartition, fetchOffset: Long, partitionData: ReplicaFetcherThread.PartitionData)

Observing with Arthas shows that the follower updates its HW in the method above; the group coordinator also updates HWs from time to time, but we ignore that for now.

stack  kafka.cluster.Replica  highWatermark_$eq

Scala method names look a bit odd after compilation: highWatermark_= becomes highWatermark_$eq. That method is the setter that updates the replica's HW, and it lives here:

// Replica.scala line 139
def highWatermark_=(newHighWatermark: LogOffsetMetadata) {
    // ......
}

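In Scala, def foo_=(v) defines a setter: obj.foo = v compiles to obj.foo_=(v), and "_=" is encoded as "_$eq" in the bytecode, which is why Arthas shows highWatermark_$eq. A minimal sketch with the HW simplified to a Long (the real setter stores a LogOffsetMetadata and does more bookkeeping):

class ReplicaSketch {
  @volatile private var hw: Long = 0L

  def highWatermark: Long = hw

  // setter: lets callers write "replica.highWatermark = x"
  def highWatermark_=(newHighWatermark: Long): Unit = {
    hw = newHighWatermark
  }
}

object ReplicaSketch {
  def main(args: Array[String]): Unit = {
    val r = new ReplicaSketch
    r.highWatermark = 65L    // compiles to r.highWatermark_=(65), i.e. highWatermark_$eq in bytecode
    println(r.highWatermark) // 65
  }
}
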
Tracing with the stack command above while sending messages, the call stack looks like this:

ts=2020-11-28 11:15:48;thread_name=ReplicaFetcherThread-0-0;id=3d;is_daemon=false;priority=5;TCCL=sun.misc.Launcher$AppClassLoader@18b4aac2
    @kafka.cluster.Replica.highWatermark_$eq()
        at kafka.server.ReplicaFetcherThread.processPartitionData(ReplicaFetcherThread.scala:122)
        at kafka.server.ReplicaFetcherThread.processPartitionData(ReplicaFetcherThread.scala:43)
        at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1$$anonfun$apply$2.apply(AbstractFetcherThread.scala:183)
        at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1$$anonfun$apply$2.apply(AbstractFetcherThread.scala:169)
        at scala.Option.foreach(Option.scala:257)
        at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1.apply(AbstractFetcherThread.scala:169)
        at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1.apply(AbstractFetcherThread.scala:166)
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
        at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2.apply$mcV$sp(AbstractFetcherThread.scala:166)
        at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2.apply(AbstractFetcherThread.scala:166)
        at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2.apply(AbstractFetcherThread.scala:166)
        at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:250)
        at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:164)
        at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:111)
        at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:82)

Key code:

The follower compares its own current LEO with the HW returned by the leader and takes the smaller of the two as its new HW:

// ReplicaFetcherThread.scala line 96
def processPartitionData(topicPartition: TopicPartition, fetchOffset: Long, partitionData: PartitionData) {
    // ......
    // Append the leader's messages to the log
    partition.appendRecordsToFollowerOrFutureReplica(records, isFuture = false) // append the records fetched from the leader to the local log
    // ......
    val followerHighWatermark = replica.logEndOffset.messageOffset.min(partitionData.highWatermark)
    // ......
    replica.highWatermark = new LogOffsetMetadata(followerHighWatermark)
    // ......
}

The watch output below makes the follower's HW update easy to see: there are two fetches; the first appends the data, and the second advances the HW (64 -> 65).

[arthas@19538]$ watch -f  kafka.server.ReplicaFetcherThread processPartitionData params -x 3
Press Q or Ctrl+C to abort.
Affect(class count: 1 , method count: 2) cost in 51 ms, listenerId: 12
ts=2020-11-28 12:36:46; [cost=0.768568ms] result=@Object[][
    @TopicPartition[
        hash=@Integer[-1747502779],
        partition=@Integer[0],
        topic=@String[simon.test01.p1r2],
    ],
    @Long[64],
    @PartitionData[
        underlying=@PartitionData[
            error=@Errors[NONE],
            highWatermark=@Long[64],
            lastStableOffset=@Long[-1],
            logStartOffset=@Long[55],
            abortedTransactions=null,
            records=@MemoryRecords[[(record=DefaultRecord(offset=64, timestamp=1606538205275, key=0 bytes, value=14 bytes))]],
        ],
    ],
]
ts=2020-11-28 12:36:46; [cost=5.366692ms] result=@Object[][
    @TopicPartition[
        hash=@Integer[-1747502779],
        partition=@Integer[0],
        topic=@String[simon.test01.p1r2],
    ],
    @Long[64],
    @PartitionData[
        underlying=@PartitionData[
            error=@Errors[NONE],
            highWatermark=@Long[64],
            lastStableOffset=@Long[-1],
            logStartOffset=@Long[55],
            abortedTransactions=null,
            records=@MemoryRecords[[(record=DefaultRecord(offset=64, timestamp=1606538205275, key=0 bytes, value=14 bytes))]],
        ],
    ],
]
ts=2020-11-28 12:36:46; [cost=0.144628ms] result=@Object[][
    @TopicPartition[
        hash=@Integer[-1747502779],
        partition=@Integer[0],
        topic=@String[simon.test01.p1r2],
    ],
    @Long[65],
    @PartitionData[
        underlying=@PartitionData[
            error=@Errors[NONE],
            highWatermark=@Long[65],
            lastStableOffset=@Long[-1],
            logStartOffset=@Long[55],
            abortedTransactions=null,
            records=@MemoryRecords[[]],
        ],
    ],
]
ts=2020-11-28 12:36:46; [cost=1.274486ms] result=@Object[][
    @TopicPartition[
        hash=@Integer[-1747502779],
        partition=@Integer[0],
        topic=@String[simon.test01.p1r2],
    ],
    @Long[65],
    @PartitionData[
        underlying=@PartitionData[
            error=@Errors[NONE],
            highWatermark=@Long[65],
            lastStableOffset=@Long[-1],
            logStartOffset=@Long[55],
            abortedTransactions=null,
            records=@MemoryRecords[[]],
        ],
    ],
]
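
Replaying the two fetches from the watch output above with the min rule from processPartitionData makes the 64 -> 65 transition explicit. This is a toy calculation: only the offsets come from the trace, everything else is illustrative.

object FollowerHwReplaySketch {
  // follower HW = min(follower LEO after the append, leader HW from the fetch response)
  def followerHw(followerLeoAfterAppend: Long, leaderHw: Long): Long =
    math.min(followerLeoAfterAppend, leaderHw)

  def main(args: Array[String]): Unit = {
    // 1st fetch (fetchOffset = 64): the record at offset 64 is appended, so the follower's LEO
    // becomes 65, but the leader's HW in the response is still 64 -> follower HW stays at 64
    println(followerHw(followerLeoAfterAppend = 65L, leaderHw = 64L)) // 64
    // 2nd fetch (fetchOffset = 65): no records, and the leader's HW has advanced to 65
    // -> follower HW becomes min(65, 65) = 65
    println(followerHw(followerLeoAfterAppend = 65L, leaderHw = 65L)) // 65
  }
}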