LEO更新
leader节点
更新过程
写消息时直接更新LEO的代码
// kafka/log/Log.scala
/*641*/ private def append(records: MemoryRecords, isFromClient: Boolean, assignOffsets: Boolean, leaderEpoch: Int): LogAppendInfo = {
// ..........
// increment the log end offset
/*777*/ updateLogEndOffset(appendInfo.lastOffset + 1)
// ............
}
// 没有复杂的逻辑,落盘成功后加1即可.
单纯的更新LEO,没啥复杂的逻辑,消息落盘成功后加1即可。
那么,加1的源头是什么?即在什么基础上+1?LEO什么时候持久化?还是说不用持久化直接从消息数据推导出来? 什么时候初始化。
LEO初始化到更新的整个过程
LEO的计算依靠kafka.log.Log.nextOffsetMetadata
依据是:
- 先是取nextOffsetMetadata的messageOffset
// kafka.log.Log.append(records: MemoryRecords, isFromClient: Boolean, assignOffsets: Boolean, leaderEpoch: Int) 657
// 客户端默认写入时assignOffsets是true,用arthas即可观察watch -f kafka.log.Log append {params,returnObj,throwExp} 'params[1]' -x 3
if (assignOffsets) {
// assign offsets to the message set
val offset = new LongRef(nextOffsetMetadata.messageOffset)
....
- 再将其-1,赋值给appendInfo的lastOffset
appendInfo.lastOffset = offset.value - 1 // 默认 由broker分配offset 此处在推到LO
- 加1并更新LEO
updateLogEndOffset(appendInfo.lastOffset + 1)
//Log.scala 428行
private def updateLogEndOffset(messageOffset: Long) {
nextOffsetMetadata = new LogOffsetMetadata(messageOffset, activeSegment.baseOffset, activeSegment.size)
}
步骤3其实就是在更新nextOffsetMetadata
还剩一个问题nextOffsetMetadata什么时候初始化?
在Log segment read的时候
// Log.scala216--219行
locally {
val startMs = time.milliseconds
val nextOffset = loadSegments()
/* Calculate the offset of the next message */
nextOffsetMetadata = new LogOffsetMetadata(nextOffset, activeSegment.baseOffset, activeSegment.size)
nextOffsetMetadata依赖nextOffset,nextOffset又是靠segment load回来。
分析到此,LEO的维护过程基本清晰了,即segment加载时确定初始值,然后每次写完消息加1,如此往复即可,也没啥持久化的说法,从segment上天生能推导出来,不过在load的时候有个recovery的过程,待后面分析。
nextOffset – nextOffsetMetadata – offset – appendInfo.lastOffset – 再appendInfo.lastOffset加1后赋值给nextOffsetMetadata的messageOffset。
整个分析过程还被客户端的lastOffset给干扰了,其实默认模式下对与leader节点来说,客户端的lastOffset没啥用,可以通过arathas观察watch -f kafka.log.Log kafka$log$Log$$analyzeAndValidateRecords {params,returnObj,throwExp} ‘params[1]’ -x 3的返回结果便知道了。
replica节点
关于副本节点,先注意一个事情,在kafka.log.Log append方法调用时,assignOffsets入参值为false。
即:
副本同步时 在副本节点 assignOffsets是false,客户端请求时,在leader节点 assignOffsets是true,那么这样也就决定了副本节点更新LEO的逻辑不太一样
kafka.log.Log append 方法,LEO更新逻辑主要在这里,leader的、副本均都在这里
经分析,副本的LEO更新主要是靠客户端的lastOffset决定,副本的发消息的客户端其实就是leader节点,LEO的初始值由kafka.log.Log kafka$log$Log$$analyzeAndValidateRecords计算出来,他的计算过程就是依靠客户端请求体中 lastOffset。
贴一段arthas trace的结果
[arthas@11431]$ watch -f kafka.log.Log kafka$log$Log$$analyzeAndValidateRecords returnObj -x 3
Press Q or Ctrl+C to abort.
Affect(class count: 1 , method count: 1) cost in 63 ms, listenerId: 11
ts=2020-11-08 13:51:06; [cost=0.456113ms] result=@LogAppendInfo[
firstOffset=@Long[54],
lastOffset=@Long[54],
客户端lastOffset
主要涉及代码
// org.apache.kafka.common.record.MemoryRecords 48行
private final Iterable<MutableRecordBatch> batches = new Iterable<MutableRecordBatch>() {
@Override
public Iterator<MutableRecordBatch> iterator() {
return new RecordBatchIterator<>(new ByteBufferLogInputStream(buffer.duplicate(), Integer.MAX_VALUE));
}
};
// org.apache.kafka.common.record.ByteBufferLogInputStream 41行
public MutableRecordBatch nextBatch() throws IOException {
// ......
if (magic > RecordBatch.MAGIC_VALUE_V1)
return new DefaultRecordBatch(batchSlice);
else
return new AbstractLegacyRecordBatch.ByteBufferLegacyRecordBatch(batchSlice);
}
// DefaultRecordBatch 178行
@Override
public long lastOffset() {
return baseOffset() + lastOffsetDelta();
}
注意看其注释中描述的这个batch的格式:
* RecordBatch =>
* BaseOffset => Int64
* Length => Int32
* PartitionLeaderEpoch => Int32
* Magic => Int8
* CRC => Uint32
* Attributes => Int16
* LastOffsetDelta => Int32 // also serves as LastSequenceDelta
* FirstTimestamp => Int64
* MaxTimestamp => Int64
* ProducerId => Int64
* ProducerEpoch => Int16
* BaseSequence => Int32
* Records => [Record]
private int lastOffsetDelta() {
return buffer.getInt(LAST_OFFSET_DELTA_OFFSET);
}
@Override
public long baseOffset() {
return buffer.getLong(BASE_OFFSET_OFFSET);
}
本博客所有文章除特别声明外,均采用 CC BY-SA 3.0协议 。转载请注明出处!