LEO更新

leader节点

更新过程

写消息时直接更新LEO的代码
// kafka/log/Log.scala
/*641*/  private def append(records: MemoryRecords, isFromClient: Boolean, assignOffsets: Boolean, leaderEpoch: Int): LogAppendInfo = {
 // ..........
// increment the log end offset
/*777*/ updateLogEndOffset(appendInfo.lastOffset + 1)
// ............
}

// 没有复杂的逻辑,落盘成功后加1即可.

单纯的更新LEO,没啥复杂的逻辑,消息落盘成功后加1即可。

那么,加1的源头是什么?即在什么基础上+1?LEO什么时候持久化?还是说不用持久化直接从消息数据推导出来? 什么时候初始化。

LEO初始化到更新的整个过程

LEO的计算依靠kafka.log.Log.nextOffsetMetadata

依据是:

  1. 先是取nextOffsetMetadata的messageOffset
//  kafka.log.Log.append(records: MemoryRecords, isFromClient: Boolean, assignOffsets: Boolean, leaderEpoch: Int)  657
// 客户端默认写入时assignOffsets是true,用arthas即可观察watch -f kafka.log.Log append {params,returnObj,throwExp} 'params[1]' -x 3
 if (assignOffsets) {
          // assign offsets to the message set
          val offset = new LongRef(nextOffsetMetadata.messageOffset)
     ....
  1. 再将其-1,赋值给appendInfo的lastOffset
appendInfo.lastOffset = offset.value - 1 // 默认 由broker分配offset  此处在推到LO
  1. 加1并更新LEO
   updateLogEndOffset(appendInfo.lastOffset + 1)

//Log.scala 428行
  private def updateLogEndOffset(messageOffset: Long) {
    nextOffsetMetadata = new LogOffsetMetadata(messageOffset, activeSegment.baseOffset, activeSegment.size)
  }

步骤3其实就是在更新nextOffsetMetadata

还剩一个问题nextOffsetMetadata什么时候初始化?

在Log segment read的时候

 
// Log.scala216--219行
locally {
    val startMs = time.milliseconds

    val nextOffset = loadSegments()

    /* Calculate the offset of the next message */
    nextOffsetMetadata = new LogOffsetMetadata(nextOffset, activeSegment.baseOffset, activeSegment.size)

nextOffsetMetadata依赖nextOffset,nextOffset又是靠segment load回来。

分析到此,LEO的维护过程基本清晰了,即segment加载时确定初始值,然后每次写完消息加1,如此往复即可,也没啥持久化的说法,从segment上天生能推导出来,不过在load的时候有个recovery的过程,待后面分析。

nextOffset – nextOffsetMetadata – offset – appendInfo.lastOffset – 再appendInfo.lastOffset加1后赋值给nextOffsetMetadata的messageOffset。

整个分析过程还被客户端的lastOffset给干扰了,其实默认模式下对与leader节点来说,客户端的lastOffset没啥用,可以通过arathas观察watch -f kafka.log.Log kafka$log$Log$$analyzeAndValidateRecords {params,returnObj,throwExp} ‘params[1]’ -x 3的返回结果便知道了。

replica节点

关于副本节点,先注意一个事情,在kafka.log.Log append方法调用时,assignOffsets入参值为false。

即:

副本同步时 在副本节点 assignOffsets是false,客户端请求时,在leader节点 assignOffsets是true,那么这样也就决定了副本节点更新LEO的逻辑不太一样

kafka.log.Log append 方法,LEO更新逻辑主要在这里,leader的、副本均都在这里

经分析,副本的LEO更新主要是靠客户端的lastOffset决定,副本的发消息的客户端其实就是leader节点,LEO的初始值由kafka.log.Log kafka$log$Log$$analyzeAndValidateRecords计算出来,他的计算过程就是依靠客户端请求体中 lastOffset。

贴一段arthas trace的结果

[arthas@11431]$ watch -f kafka.log.Log kafka$log$Log$$analyzeAndValidateRecords returnObj  -x 3
Press Q or Ctrl+C to abort.
Affect(class count: 1 , method count: 1) cost in 63 ms, listenerId: 11
ts=2020-11-08 13:51:06; [cost=0.456113ms] result=@LogAppendInfo[
    firstOffset=@Long[54],
    lastOffset=@Long[54],

客户端lastOffset

主要涉及代码

// org.apache.kafka.common.record.MemoryRecords 48行
    private final Iterable<MutableRecordBatch> batches = new Iterable<MutableRecordBatch>() {
        @Override
        public Iterator<MutableRecordBatch> iterator() {
            return new RecordBatchIterator<>(new ByteBufferLogInputStream(buffer.duplicate(), Integer.MAX_VALUE));
        }
    };
// org.apache.kafka.common.record.ByteBufferLogInputStream 41行
    public MutableRecordBatch nextBatch() throws IOException {
// ......
        if (magic > RecordBatch.MAGIC_VALUE_V1)
            return new DefaultRecordBatch(batchSlice);
        else
            return new AbstractLegacyRecordBatch.ByteBufferLegacyRecordBatch(batchSlice);
    }
// DefaultRecordBatch   178行    
@Override
    public long lastOffset() {
        return baseOffset() + lastOffsetDelta();
    }

注意看其注释中描述的这个batch的格式:

* RecordBatch =>
*  BaseOffset => Int64
*  Length => Int32
*  PartitionLeaderEpoch => Int32
*  Magic => Int8
*  CRC => Uint32
*  Attributes => Int16
*  LastOffsetDelta => Int32 // also serves as LastSequenceDelta
*  FirstTimestamp => Int64
*  MaxTimestamp => Int64
*  ProducerId => Int64
*  ProducerEpoch => Int16
*  BaseSequence => Int32
*  Records => [Record]

private int lastOffsetDelta() {
    return buffer.getInt(LAST_OFFSET_DELTA_OFFSET);
}

@Override
public long baseOffset() {
    return buffer.getLong(BASE_OFFSET_OFFSET);
}