数据写盘
broker端消息正常写盘
写盘
Log.append
LogSegment.append
FileRecords.append
MemoryRecords.java.writeFullyTo
通过FileRecords的channel(FileChannel)字段完成写入
打开
FileChannel打开过程
LogSegment.open
FileRecords.open
public static FileRecords open(File file,
boolean mutable,
boolean fileAlreadyExists,
int initFileSize,
boolean preallocate) throws IOException {
FileChannel channel = openChannel(file, mutable, fileAlreadyExists, initFileSize, preallocate); // 重点 logsegment创建channel的地方
int end = (!fileAlreadyExists && preallocate) ? 0 : Integer.MAX_VALUE;
return new FileRecords(file, channel, 0, end, false);
}
刷盘
写时刷
Log.flush
LogSegment.flush
FileRecords.flush
public void flush() throws IOException {
channel.force(true);
}
触发时机1写消息时
if (unflushedMessages >= config.flushInterval) // Log.scala 787行
flush()
append消息的时候会去判断未刷消息数量是否达到阈值。阈值对应的配置项是flush.messages。不过,kafka默认的flush.messages是是Long最大值,则意味着在写消息时几乎是不刷的。
定时刷
触发时机2定时调度
定时调度会根据上次刷消息到现在的时间差跟配置的flush间隔时间比较,到了再刷。不过,kafka默认的flush.ms是Long最大值,则意味着几乎是不定时刷的。不过你可以根据需要配置。
scheduler.schedule("kafka-log-flusher", // LogManager.scala 395行
flushDirtyLogs _,
delay = InitialTaskDelayMs,
period = flushCheckMs,
TimeUnit.MILLISECONDS)
/**
* Flush any log which has exceeded its flush interval and has unwritten messages.
*/
private def flushDirtyLogs(): Unit = { // 定时刷脏 LogManager.scala 907行
debug("Checking for dirty logs to flush...")
for ((topicPartition, log) <- currentLogs.toList ++ futureLogs.toList) {
try {
val timeSinceLastFlush = time.milliseconds - log.lastFlushTime
debug("Checking if flush is needed on " + topicPartition.topic + " flush interval " + log.config.flushMs +
" last flushed " + log.lastFlushTime + " time since last flush: " + timeSinceLastFlush)
if(timeSinceLastFlush >= log.config.flushMs)
log.flush
} catch {
case e: Throwable =>
error("Error flushing topic " + topicPartition.topic, e)
}
}
}
}
mysql也有类似的机制,也是可配置的。mysql若配置成事务提交时实时刷,利用了group commit的优化技术。 通常mysql的刷盘配置成双1,即binlog和redo log都是事务提交时实时刷,大促、双11、主从导数据等场景,可以不用双1配置,此处不做展开介绍。
消息数据recovery机制
recoveryPoint
刷盘成功后才会更新recoveryPoint
,值为刷盘时的LEO。
recovery会先看.kafka_cleanshutdown
文件是否存在,如果存在则是正常关系无需recovery。
recovery过程其实还是比较简单的,核心逻辑就是读取磁盘上真是存在的合法的消息的byte(validBytes),然后通过FileChannel truncateTo到这,具体逻辑如下:
- 取出所有未刷盘的segment。 (Log.logSegments Log.scala 442行发起,靠ConcurrentSkipListMap的floorKey方法)
- 用segment的baseOffset更新stateManager(ProducerStateManager),并对stateManager做快照。快照文件后缀是.snapshot。快照里写的是producerId和ProducerStateEntry信息。
- 开始对未刷盘的segment做recovery。
- 迭代segment对应的batch,后面的步骤都在这个迭代中完成。关于batch的解释:LogSegment 对应FileRecords,FileRecords里面又有多个FileChannelRecordBatch(就是这里提到的batch),FileChannelRecordBatch可以理解成record的一个虚拟的view。
- 重建offset index。重建逻辑是:已经迭代到目前为止batch的总的合法bytes跟上次建offset index的差值是否 大于配置的建索引message byte阈值,大了就建offset index。叙述的比较啰嗦,看代码LogSegment.scala 286 行还是比较好懂的。建索引message byte阈值的配置项是
index.interval.bytes
,默认值4096,表示大概每4096 byte的消息在offsetindex插入一个索引。 - 计算validBytes。其过程就是相当于迭代每一个batch,将batch的bytes累加,本意是把当前log在磁盘上存在的真是的bytes算出来,recovery时数据还在盘上,表示刷盘了嘛,只要是在盘上完整的消息就要,如果有最后那种写了半拉子的不要,要truncate掉。。
- 如果存储的是MAGIC_VALUE_V2以上版本的消息的record,要随之更新leaderEpochCache,更新的epoch是batch.partitionLeaderEpoch,更新的offset是batch.baseOffset
- 更新updateProducerState(应该是事务消息用的)
- 根据上面迭代batch过程中计算出来的validBytes,进行对log truncate操作。
- 调整offset index、time index到合法值。 返回被truncate掉的字节数,结束segment的recovery动作。
- 再对stateManager(ProducerStateManager)做一次快照。
checkpoint 文件写盘
写盘是通过FD的sync强制写盘。
先写temp file,再通过java.nio.file.Files.move(Path, Path, CopyOption…),StandardCopyOption.ATOMIC_MOVE,原子性的完成文件重命名。
- recovery-point-offset-checkpoint:表示已经刷写到磁盘的记录。recoveryPoint以下的数据都是已经刷到磁盘上的了。标志刷盘用的,在kafka.log.Log.flush(offset: Long)处更新。
- replication-offset-checkpoint: 用来存储每个replica的HighWatermark的(high watermark (HW),表示已经被commited的message,HW以下的数据都是各个replicas间同步的,一致的。)
LeaderEpochFile
文件名:leader-epoch-checkpoint
,在每个topic-partition分区目录下。
格式说明: epoch offset
override def toLine(entry: EpochEntry): String = s"${entry.epoch} ${entry.startOffset}"
epcoh 是 controller选leader时确定的,每选一次加1
offset 是每个 epoch的第一条消息的offset
写盘时机
LeaderEpochFile是通过LeaderEpochCache跟log等打交道;正常情况下log 每次append消息时,都会assign LeaderEpochCache,assign时会确认当前的epoch、offset与最后一次记录在内存中的是否一致,如果当前的两者都变大了则做flush强制写盘(LeaderEpochFileCache.scala 61行)。也就是虽每次append消息都会assign但是通常不写盘,因为没满足写盘条件。
除了log append消息之外,还有一些log truncate、recovery等动作均会触发flush动作,不再作细细分析。
OffsetCheckpointFile
OffsetCheckpointFile这是一个笼统的称呼,因为格式都一样,只要文件名或路径不同我们就可以存储不同的数据。
格式说明: topic 分区编号 offset
s"${entry._1.topic} ${entry._1.partition} ${entry._2}"
各种OffsetCheckpointFile
replication-offset-checkpoint
文件名的文件是用来存放高水位位点的。 highwatermark-checkpoint线程定时刷盘。
// ReplicaManager.scala 179行
@volatile var highWatermarkCheckpoints = logManager.liveLogDirs.map(dir =>
(dir.getAbsolutePath, new OffsetCheckpointFile(new File(dir, ReplicaManager.HighWatermarkFilename), logDirFailureChannel))).toMap
recovery-point-offset-checkpoint
文件名的文件是用来存放recovery位点的,recovery point前面recovery机制章节有介绍。log-start-offset-checkpoint
文件名的文件是存放log 开始位点的。cleaner-offset-checkpoint
文件名的文件是存放log 清理位点的。
index文件写盘
AbstractIndex
AbstractIndex
是基类,time index,offset index,transaction index都是基于其实现。
通过FileChannel map出MappedByteBuffer进行读写。
其flush方法会通过MappedByteBuffer的force方法强制写盘。
这些index文件的flush一般是在log segment flush时被级联flush。
kafka对零拷贝的使用
零拷贝在前面章节已经梳理过。 具体到kafka实际运用,有客户端向broker发送消息时:
//FileRecords.java 258行
@Override
public long writeTo(GatheringByteChannel destChannel, long offset, int length) throws IOException {
long newSize = Math.min(channel.size(), end) - start;
int oldSize = sizeInBytes();
if (newSize < oldSize)
throw new KafkaException(String.format(
"Size of FileRecords %s has been truncated during write: old size %d, new size %d",
file.getAbsolutePath(), oldSize, newSize));
long position = start + offset;
int count = Math.min(length, oldSize);
final long bytesTransferred;
if (destChannel instanceof TransportLayer) {
TransportLayer tl = (TransportLayer) destChannel;
bytesTransferred = tl.transferFrom(channel, position, count);
} else {
bytesTransferred = channel.transferTo(position, count, destChannel);
}
return bytesTransferred;
}
// PlaintextTransportLayer.java 215行
@Override
public long transferFrom(FileChannel fileChannel, long position, long count) throws IOException {
return fileChannel.transferTo(position, count, socketChannel);
}
本博客所有文章除特别声明外,均采用 CC BY-SA 3.0协议 。转载请注明出处!