数据写盘

broker端消息正常写盘

写盘

Log.append
LogSegment.append
FileRecords.append
MemoryRecords.java.writeFullyTo

通过FileRecords的channel(FileChannel)字段完成写入

打开

FileChannel打开过程

LogSegment.open
FileRecords.open

public static FileRecords open(File file,
                               boolean mutable,
                               boolean fileAlreadyExists,
                               int initFileSize,
                               boolean preallocate) throws IOException {
    FileChannel channel = openChannel(file, mutable, fileAlreadyExists, initFileSize, preallocate); //  重点  logsegment创建channel的地方
    int end = (!fileAlreadyExists && preallocate) ? 0 : Integer.MAX_VALUE;
    return new FileRecords(file, channel, 0, end, false);
}

刷盘

写时刷

Log.flush
LogSegment.flush
FileRecords.flush

public void flush() throws IOException {
    channel.force(true);  
}

触发时机1写消息时

if (unflushedMessages >= config.flushInterval)  // Log.scala 787行
  flush()

append消息的时候会去判断未刷消息数量是否达到阈值。阈值对应的配置项是flush.messages。不过,kafka默认的flush.messages是是Long最大值,则意味着在写消息时几乎是不刷的

定时刷

触发时机2定时调度

定时调度会根据上次刷消息到现在的时间差跟配置的flush间隔时间比较,到了再刷。不过,kafka默认的flush.ms是Long最大值,则意味着几乎是不定时刷的。不过你可以根据需要配置。

      scheduler.schedule("kafka-log-flusher",  // LogManager.scala 395行
                         flushDirtyLogs _,
                         delay = InitialTaskDelayMs,
                         period = flushCheckMs,
                         TimeUnit.MILLISECONDS)  

/**
   * Flush any log which has exceeded its flush interval and has unwritten messages.
   */
  private def flushDirtyLogs(): Unit = { // 定时刷脏  LogManager.scala 907行
    debug("Checking for dirty logs to flush...")

    for ((topicPartition, log) <- currentLogs.toList ++ futureLogs.toList) {
      try {
        val timeSinceLastFlush = time.milliseconds - log.lastFlushTime
        debug("Checking if flush is needed on " + topicPartition.topic + " flush interval  " + log.config.flushMs +
              " last flushed " + log.lastFlushTime + " time since last flush: " + timeSinceLastFlush)
        if(timeSinceLastFlush >= log.config.flushMs)
          log.flush
      } catch {
        case e: Throwable =>
          error("Error flushing topic " + topicPartition.topic, e)
      }
    }
  }
}

mysql也有类似的机制,也是可配置的。mysql若配置成事务提交时实时刷,利用了group commit的优化技术。 通常mysql的刷盘配置成双1,即binlog和redo log都是事务提交时实时刷,大促、双11、主从导数据等场景,可以不用双1配置,此处不做展开介绍。

消息数据recovery机制

recoveryPoint

刷盘成功后才会更新recoveryPoint,值为刷盘时的LEO。

recovery会先看.kafka_cleanshutdown文件是否存在,如果存在则是正常关系无需recovery。

recovery过程其实还是比较简单的,核心逻辑就是读取磁盘上真是存在的合法的消息的byte(validBytes),然后通过FileChannel truncateTo到这,具体逻辑如下:

  • 取出所有未刷盘的segment。 (Log.logSegments Log.scala 442行发起,靠ConcurrentSkipListMap的floorKey方法)
  • 用segment的baseOffset更新stateManager(ProducerStateManager),并对stateManager做快照。快照文件后缀是.snapshot。快照里写的是producerId和ProducerStateEntry信息。
  • 开始对未刷盘的segment做recovery。
    • 迭代segment对应的batch,后面的步骤都在这个迭代中完成。关于batch的解释:LogSegment 对应FileRecords,FileRecords里面又有多个FileChannelRecordBatch(就是这里提到的batch),FileChannelRecordBatch可以理解成record的一个虚拟的view。
    • 重建offset index。重建逻辑是:已经迭代到目前为止batch的总的合法bytes跟上次建offset index的差值是否 大于配置的建索引message byte阈值,大了就建offset index。叙述的比较啰嗦,看代码LogSegment.scala 286 行还是比较好懂的。建索引message byte阈值的配置项是index.interval.bytes,默认值4096,表示大概每4096 byte的消息在offsetindex插入一个索引。
    • 计算validBytes。其过程就是相当于迭代每一个batch,将batch的bytes累加,本意是把当前log在磁盘上存在的真是的bytes算出来,recovery时数据还在盘上,表示刷盘了嘛,只要是在盘上完整的消息就要,如果有最后那种写了半拉子的不要,要truncate掉。。
    • 如果存储的是MAGIC_VALUE_V2以上版本的消息的record,要随之更新leaderEpochCache,更新的epoch是batch.partitionLeaderEpoch,更新的offset是batch.baseOffset
    • 更新updateProducerState(应该是事务消息用的)
  • 根据上面迭代batch过程中计算出来的validBytes,进行对log truncate操作。
  • 调整offset index、time index到合法值。 返回被truncate掉的字节数,结束segment的recovery动作。
  • 再对stateManager(ProducerStateManager)做一次快照。

checkpoint 文件写盘

写盘是通过FD的sync强制写盘。

先写temp file,再通过java.nio.file.Files.move(Path, Path, CopyOption…),StandardCopyOption.ATOMIC_MOVE,原子性的完成文件重命名。

  • recovery-point-offset-checkpoint:表示已经刷写到磁盘的记录。recoveryPoint以下的数据都是已经刷到磁盘上的了。标志刷盘用的,在kafka.log.Log.flush(offset: Long)处更新。
  • replication-offset-checkpoint: 用来存储每个replica的HighWatermark的(high watermark (HW),表示已经被commited的message,HW以下的数据都是各个replicas间同步的,一致的。)

LeaderEpochFile

文件名:leader-epoch-checkpoint,在每个topic-partition分区目录下。

格式说明: epoch offset

override def toLine(entry: EpochEntry): String = s"${entry.epoch} ${entry.startOffset}"

epcoh 是 controller选leader时确定的,每选一次加1

offset 是每个 epoch的第一条消息的offset

写盘时机

LeaderEpochFile是通过LeaderEpochCache跟log等打交道;正常情况下log 每次append消息时,都会assign LeaderEpochCache,assign时会确认当前的epoch、offset与最后一次记录在内存中的是否一致,如果当前的两者都变大了则做flush强制写盘(LeaderEpochFileCache.scala 61行)。也就是虽每次append消息都会assign但是通常不写盘,因为没满足写盘条件。

除了log append消息之外,还有一些log truncate、recovery等动作均会触发flush动作,不再作细细分析。

OffsetCheckpointFile

OffsetCheckpointFile这是一个笼统的称呼,因为格式都一样,只要文件名或路径不同我们就可以存储不同的数据。

格式说明: topic 分区编号 offset

s"${entry._1.topic} ${entry._1.partition} ${entry._2}"
各种OffsetCheckpointFile
  • replication-offset-checkpoint文件名的文件是用来存放高水位位点的。 highwatermark-checkpoint线程定时刷盘。
//  ReplicaManager.scala 179行  
@volatile var highWatermarkCheckpoints = logManager.liveLogDirs.map(dir =>
    (dir.getAbsolutePath, new OffsetCheckpointFile(new File(dir, ReplicaManager.HighWatermarkFilename), logDirFailureChannel))).toMap
  • recovery-point-offset-checkpoint文件名的文件是用来存放recovery位点的,recovery point前面recovery机制章节有介绍。
  • log-start-offset-checkpoint文件名的文件是存放log 开始位点的。
  • cleaner-offset-checkpoint文件名的文件是存放log 清理位点的。

index文件写盘

AbstractIndex

AbstractIndex是基类,time index,offset index,transaction index都是基于其实现。

通过FileChannel map出MappedByteBuffer进行读写。

其flush方法会通过MappedByteBuffer的force方法强制写盘。

这些index文件的flush一般是在log segment flush时被级联flush。

kafka对零拷贝的使用

零拷贝在前面章节已经梳理过。 具体到kafka实际运用,有客户端向broker发送消息时:

//FileRecords.java   258行   
@Override
    public long writeTo(GatheringByteChannel destChannel, long offset, int length) throws IOException {
        long newSize = Math.min(channel.size(), end) - start;
        int oldSize = sizeInBytes();
        if (newSize < oldSize)
            throw new KafkaException(String.format(
                    "Size of FileRecords %s has been truncated during write: old size %d, new size %d",
                    file.getAbsolutePath(), oldSize, newSize));

        long position = start + offset;
        int count = Math.min(length, oldSize);
        final long bytesTransferred;
        if (destChannel instanceof TransportLayer) {
            TransportLayer tl = (TransportLayer) destChannel;
            bytesTransferred = tl.transferFrom(channel, position, count);
        } else {
            bytesTransferred = channel.transferTo(position, count, destChannel);
        }
        return bytesTransferred;
    }

// PlaintextTransportLayer.java   215行
    @Override
    public long transferFrom(FileChannel fileChannel, long position, long count) throws IOException {
        return fileChannel.transferTo(position, count, socketChannel);
    }