HW Update
Leader node
Update process
kafka.cluster.Partition.maybeIncrementLeaderHW(leaderReplica: Replica, curTime: Long) // Partition.scala line 462
The author's comment on this method explains the logic and background of HW advancement quite clearly:
/**
* Check and maybe increment the high watermark of the partition;
* this function can be triggered when
*
* 1. Partition ISR changed
* 2. Any replica's LEO changed
*
* The HW is determined by the smallest log end offset among all replicas that are in sync or are considered caught-up.
* This way, if a replica is considered caught-up, but its log end offset is smaller than HW, we will wait for this
* replica to catch up to the HW before advancing the HW. This helps the situation when the ISR only includes the
* leader replica and a follower tries to catch up. If we don't wait for the follower when advancing the HW, the
* follower's log end offset may keep falling behind the HW (determined by the leader's log end offset) and therefore
* will never be added to ISR.
*
* Returns true if the HW was incremented, and false otherwise.
* Note There is no need to acquire the leaderIsrUpdate lock here
* since all callers of this private API acquire that lock
*/
From the comment it is clear that there are two occasions that trigger a leader HW increment:
- when the partition's ISR changes (expand or shrink)
- when any replica's LEO changes
Both points can also be confirmed by searching for the callers of maybeIncrementLeaderHW. Besides these two, it is also invoked during leader election.
The logic for advancing the leader's HW is (see the sketch after this list):
- Filter out the currently qualifying replicas. A replica qualifies if it is in the ISR, or if the current time minus its last caught-up time is within the configured maximum lag time (replicaLagTimeMaxMs); Partition.scala line 464.
- Take the minimum LEO among all qualifying replicas as the computed new HW.
- Assign this computed new HW to the leader's HW, i.e. leaderReplica.highWatermark, when a condition is met. What is the condition? There are two, and satisfying either one suffices:
  - The old HW is smaller than the newly computed HW. As the code comment says, this keeps the HW monotonically increasing: "Ensure that the high watermark increases monotonically".
  - The old HW equals the newly computed HW, but a segment roll has happened in between: && oldHighWatermark.onOlderSegment(newHighWatermark). The segment roll is detected by comparing the segments' baseOffset.
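A self-contained sketch of this decision logic, under simplifying assumptions (Rep is a stand-in class, not Kafka's Replica; the segment-roll branch is omitted):
// Sketch of the HW-increment decision described above; Rep is a stand-in,
// not Kafka's Replica, and the segment-roll case is left out.
case class Rep(leo: Long, inIsr: Boolean, lastCaughtUpTimeMs: Long)

def maybeIncrementLeaderHw(replicas: Seq[Rep], oldHw: Long,
                           curTime: Long, replicaLagTimeMaxMs: Long): Option[Long] = {
  // 1. Keep replicas that are in the ISR or caught up within replica.lag.time.max.ms
  //    (the leader itself is always in the ISR, so this set is never empty)
  val caughtUp = replicas.filter(r =>
    r.inIsr || curTime - r.lastCaughtUpTimeMs <= replicaLagTimeMaxMs)
  // 2. The candidate HW is the smallest LEO among those replicas
  val newHw = caughtUp.map(_.leo).min
  // 3. Advance only if strictly larger, keeping the HW monotonically increasing
  if (newHw > oldHw) Some(newHw) else None
}

// Leader (leo=10, in ISR) plus a follower (leo=8) that caught up 1s ago:
// min(10, 8) = 8 and 8 > 7, so the HW advances from 7 to 8.
println(maybeIncrementLeaderHw(
  Seq(Rep(10, inIsr = true, lastCaughtUpTimeMs = 10000L),
      Rep(8, inIsr = false, lastCaughtUpTimeMs = 9000L)),
  oldHw = 7, curTime = 10000L, replicaLagTimeMaxMs = 10000L)) // Some(8)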
Note: the HW is the offset one past the last consumable message, not the offset of the last consumable message itself. For example, if offsets 0..4 are consumable, the HW is 5.
Initialization process
kafka.cluster.Partition.getOrCreateReplica(replicaId: Int, isNew: Boolean) creates the Replica instance.
A first read suggests it compares the value recorded in the replication-offset-checkpoint file (explained in detail below) with the log's LEO, and takes the smaller of the two as the initial HW.
// Partition.scala line 172
def getOrCreateReplica(replicaId: Int = localBrokerId, isNew: Boolean = false): Replica = {
allReplicasMap.getAndMaybePut(replicaId, {
if (isReplicaLocal(replicaId)) {
val adminZkClient = new AdminZkClient(zkClient)
val props = adminZkClient.fetchEntityConfig(ConfigType.Topic, topic)
val config = LogConfig.fromProps(logManager.currentDefaultConfig.originals, props)
val log = logManager.getOrCreateLog(topicPartition, config, isNew, replicaId == Request.FutureLocalReplicaId)
val checkpoint = replicaManager.highWatermarkCheckpoints(log.dir.getParent) // see the highWatermarkCheckpoints declaration below: it shows where the HW is recorded
val offsetMap = checkpoint.read()
if (!offsetMap.contains(topicPartition))
info(s"No checkpointed highwatermark is found for partition $topicPartition")
val offset = math.min(offsetMap.getOrElse(topicPartition, 0L), log.logEndOffset)
new Replica(replicaId, topicPartition, time, offset, Some(log))
} else new Replica(replicaId, topicPartition, time)
})
}
@volatile var highWatermarkCheckpoints = logManager.liveLogDirs.map(dir =>
(dir.getAbsolutePath, new OffsetCheckpointFile(new File(dir, ReplicaManager.HighWatermarkFilename), logDirFailureChannel))).toMap
// ReplicaManager.HighWatermarkFilename = "replication-offset-checkpoint"
Clearly, then, the HW is persisted in the replication-offset-checkpoint file.
Its content looks roughly like this:
0 // version
17 // size: one line per topic+partition, 17 lines in total
__consumer_offsets 8 7143
__consumer_offsets 35 5594
__consumer_offsets 41 0
__consumer_offsets 23 0
__consumer_offsets 47 0
__consumer_offsets 38 0
__consumer_offsets 17 1444
__consumer_offsets 11 0
__consumer_offsets 2 0
__consumer_offsets 14 0
simon.test01.p1r2 0 55
// ... some lines omitted
Parsing and assembling of this format are handled in:
// OffsetCheckpointFile object Formatter extends CheckpointFileFormatter
override def fromLine(line: String): Option[(TopicPartition, Long)] = {
WhiteSpacesPattern.split(line) match {
case Array(topic, partition, offset) => // i.e. three whitespace-separated fields
Some(new TopicPartition(topic, partition.toInt), offset.toLong)
case _ => None
}
}
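The writing direction is the toLine counterpart in the same Formatter. Judging from the file content above, it simply joins the three fields with single spaces; a reconstruction (my sketch, not necessarily verbatim):
import org.apache.kafka.common.TopicPartition

// Reconstructed writing direction: one "topic partition offset" line per
// entry, the mirror image of what fromLine parses back.
def toLine(entry: (TopicPartition, Long)): String =
  s"${entry._1.topic} ${entry._1.partition} ${entry._2}"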
Persistence process
Let's analyze when this write, OffsetCheckpointFile.write(Map<TopicPartition,Object>) (line 59), gets called, the logic around it, and so on.
kafka.server.ReplicaManager.checkpointHighWatermarks()
Its comment reads: // Flushes the highwatermark value for all partitions to the highwatermark file
which states plainly that this is where the high watermark is flushed to the file on disk. The logic is simple: iterate over every replica, then write. The groupBy/map chain here is a nice showcase of Scala's expressiveness (see the sketch after the code).
// ReplicaManager.scala line 1373
// Flushes the highwatermark value for all partitions to the highwatermark file
def checkpointHighWatermarks() {
// ......
val replicasByDir = replicas.groupBy(_.log.get.dir.getParent)
for ((dir, reps) <- replicasByDir) {
val hwms = reps.map(r => r.topicPartition -> r.highWatermark.messageOffset).toMap
try {
highWatermarkCheckpoints.get(dir).foreach(_.write(hwms))
} catch {
case e: KafkaStorageException =>
error(s"Error while writing to highwatermark file in directory $dir", e)
}
  }
}
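For readers less used to Scala collections, here is a self-contained sketch of the same groupBy/map shape (HwEntry is a stand-in, not the real Replica):
// Group stand-in replicas by log directory, then build the per-directory
// partition -> HW map that would be handed to the checkpoint writer.
case class HwEntry(dir: String, partition: Int, hw: Long)

val entries = Seq(HwEntry("/data1", 0, 64), HwEntry("/data1", 1, 10), HwEntry("/data2", 0, 7))
val byDir: Map[String, Seq[HwEntry]] = entries.groupBy(_.dir)
for ((dir, reps) <- byDir) {
  val hwms: Map[Int, Long] = reps.map(r => r.partition -> r.hw).toMap
  println(s"$dir -> $hwms") // e.g. /data1 -> Map(0 -> 64, 1 -> 10)
}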
Where the periodic HW-flush task is started:
// ReplicaManager.scala line 240
def startHighWaterMarksCheckPointThread() = {
if(highWatermarkCheckPointThreadStarted.compareAndSet(false, true))
scheduler.schedule("highwatermark-checkpoint", checkpointHighWatermarks _, period = config.replicaHighWatermarkCheckpointIntervalMs, unit = TimeUnit.MILLISECONDS)
} // the syntax "checkpointHighWatermarks _" looks a bit odd too; explained right below
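The trailing underscore in checkpointHighWatermarks _ is Scala's eta-expansion: it lifts a method into a function value that the scheduler can store and call later. A minimal standalone illustration:
// Eta-expansion: the underscore turns a method into a function value.
def greet(): Unit = println("hello")

val f: () => Unit = greet _ // method lifted to () => Unit
f()                         // prints "hello"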
The config entry is as follows; by default the flush runs every 5 seconds:
Name | Description | Type | Default
---|---|---|---
replica.high.watermark.checkpoint.interval.ms | The frequency with which the high watermark is saved out to disk | long | 5000
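To change the interval, one would presumably override it in the broker's server.properties, e.g.:
# flush high watermarks every 10 seconds instead of the 5s default
replica.high.watermark.checkpoint.interval.ms=10000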
Question: if the HW is updated in memory but the machine restarts before the value is flushed to disk, how is that handled after the restart? (From the initialization path above, the replica presumably falls back to min(checkpointed HW, log LEO), so the un-flushed advance is lost and has to be re-derived through subsequent fetches.)
Replica node
TODO: continue analyzing the follower replica's HW update steps 1127
kafka.server.ReplicaFetcherThread.processPartitionData(topicPartition: TopicPartition, fetchOffset: Long, partitionData: ReplicaFetcherThread.PartitionData)
Observation with arthas shows that the method above is where a follower updates its HW; group coordination also updates it from time to time, but we ignore that for now.
stack kafka.cluster.Replica highWatermark_$eq
Scala method names look a bit strange after compilation. The function above is the one that updates the replica's HW; it lives here (the oddity is explained after the code):
// Replica.scala line 139
def highWatermark_=(newHighWatermark: LogOffsetMetadata) {
//......
}
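The strange name comes from JVM identifier rules: '=' is illegal in a JVM method name, so the compiler encodes the custom setter highWatermark_= as highWatermark_$eq. The same pattern in miniature:
// A Scala custom setter def value_=(v) compiles to value_$eq on the JVM,
// which is the name arthas shows for Replica's highWatermark_= setter.
class Box {
  private var _value: Int = 0
  def value: Int = _value
  def value_=(v: Int): Unit = { _value = v } // bytecode name: value_$eq
}

val b = new Box
b.value = 42 // sugar for b.value_=(42)
println(b.value)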
Running the stack command above while sending messages yields the following call stack:
ts=2020-11-28 11:15:48;thread_name=ReplicaFetcherThread-0-0;id=3d;is_daemon=false;priority=5;TCCL=sun.misc.Launcher$AppClassLoader@18b4aac2
@kafka.cluster.Replica.highWatermark_$eq()
at kafka.server.ReplicaFetcherThread.processPartitionData(ReplicaFetcherThread.scala:122)
at kafka.server.ReplicaFetcherThread.processPartitionData(ReplicaFetcherThread.scala:43)
at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1$$anonfun$apply$2.apply(AbstractFetcherThread.scala:183)
at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1$$anonfun$apply$2.apply(AbstractFetcherThread.scala:169)
at scala.Option.foreach(Option.scala:257)
at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1.apply(AbstractFetcherThread.scala:169)
at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1.apply(AbstractFetcherThread.scala:166)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2.apply$mcV$sp(AbstractFetcherThread.scala:166)
at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2.apply(AbstractFetcherThread.scala:166)
at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2.apply(AbstractFetcherThread.scala:166)
at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:250)
at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:164)
at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:111)
at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:82)
Key code:
The follower compares its own current LEO with the HW returned by the leader, and takes the smaller of the two as its own new HW.
// ReplicaFetcherThread.scala line 96
def processPartitionData(topicPartition: TopicPartition, fetchOffset: Long, partitionData: PartitionData) {
// ......
// Append the leader's messages to the log
partition.appendRecordsToFollowerOrFutureReplica(records, isFuture = false) // append the records fetched from the leader to the local log
// ......
val followerHighWatermark = replica.logEndOffset.messageOffset.min(partitionData.highWatermark)
// ......
replica.highWatermark = new LogOffsetMetadata(followerHighWatermark)
// ......
}
The output below makes the follower's HW update obvious. There are two fetches: the first appends the record at offset 64 while the leader's HW in the response is still 64, so min(LEO=65, leaderHW=64) keeps the follower HW at 64; the second carries leader HW 65 and no new records, so the follower HW advances (64 -> 65).
[arthas@19538]$ watch -f kafka.server.ReplicaFetcherThread processPartitionData params -x 3
Press Q or Ctrl+C to abort.
Affect(class count: 1 , method count: 2) cost in 51 ms, listenerId: 12
ts=2020-11-28 12:36:46; [cost=0.768568ms] result=@Object[][
@TopicPartition[
hash=@Integer[-1747502779],
partition=@Integer[0],
topic=@String[simon.test01.p1r2],
],
@Long[64],
@PartitionData[
underlying=@PartitionData[
error=@Errors[NONE],
highWatermark=@Long[64],
lastStableOffset=@Long[-1],
logStartOffset=@Long[55],
abortedTransactions=null,
records=@MemoryRecords[[(record=DefaultRecord(offset=64, timestamp=1606538205275, key=0 bytes, value=14 bytes))]],
],
],
]
ts=2020-11-28 12:36:46; [cost=5.366692ms] result=@Object[][
@TopicPartition[
hash=@Integer[-1747502779],
partition=@Integer[0],
topic=@String[simon.test01.p1r2],
],
@Long[64],
@PartitionData[
underlying=@PartitionData[
error=@Errors[NONE],
highWatermark=@Long[64],
lastStableOffset=@Long[-1],
logStartOffset=@Long[55],
abortedTransactions=null,
records=@MemoryRecords[[(record=DefaultRecord(offset=64, timestamp=1606538205275, key=0 bytes, value=14 bytes))]],
],
],
]
ts=2020-11-28 12:36:46; [cost=0.144628ms] result=@Object[][
@TopicPartition[
hash=@Integer[-1747502779],
partition=@Integer[0],
topic=@String[simon.test01.p1r2],
],
@Long[65],
@PartitionData[
underlying=@PartitionData[
error=@Errors[NONE],
highWatermark=@Long[65],
lastStableOffset=@Long[-1],
logStartOffset=@Long[55],
abortedTransactions=null,
records=@MemoryRecords[[]],
],
],
]
ts=2020-11-28 12:36:46; [cost=1.274486ms] result=@Object[][
@TopicPartition[
hash=@Integer[-1747502779],
partition=@Integer[0],
topic=@String[simon.test01.p1r2],
],
@Long[65],
@PartitionData[
underlying=@PartitionData[
error=@Errors[NONE],
highWatermark=@Long[65],
lastStableOffset=@Long[-1],
logStartOffset=@Long[55],
abortedTransactions=null,
records=@MemoryRecords[[]],
],
],
]