单节点存储分析
存储相关类是怎么串起来的
LogManager StableClosureEventHandler AppendBatcher LogStorage(RocksDBLogStorage)是怎么串起来的?
整体关系如下:
org.rocksdb.AbstractWriteBatch.put(ColumnFamilyHandle, byte[], byte[])
RocksDBLogStorage.addDataBatch(LogEntry, WriteBatch, WriteContext)
RocksDBLogStorage.appendEntries(List
LogStorage.appendEntries(List
– LogManagerImpl.appendToStorage(List
– LogManagerImpl.AppendBatcher.flush()
– LogManagerImpl.StableClosureEventHandler.onEvent
StableClosureEventHandler自己几个部件的关系如下:
com.alipay.sofa.jraft.storage.impl.LogManagerImpl.StableClosureEventHandler
com.alipay.sofa.jraft.storage.impl.LogManagerImpl.disruptor
com.alipay.sofa.jraft.storage.impl.LogManagerImpl.diskQueue
事件的队列是又如何被offer的?关系如下:
LogManagerImpl.tryOfferEvent(StableClosure, EventTranslator
– LogManagerImpl.appendEntries(List
– NodeImpl.executeApplyingTasks(List
– NodeImpl.handleAppendEntriesRequest(AppendEntriesRequest, RpcRequestClosure) FollowerStableClosure
executeApplyingTasks LeaderStableClosure 这个是leader节点写日志,对应的闭包命名也能看出来。闭包回调中会通过ballotBox.commitAt去触发FSM状态机的apply。
handleAppendEntriesRequest FollowerStableClosure 这个是follower节点写日志,同样对应的闭包命名也是显而易见。
AppendBatcher的append与flush机制分析
AppendBatcher的内部属性有StableClosure的列表storage,cap,size,bufferSize,LogEntry的列表toAppend,lastId等。
append
append的主要逻辑有:
- 达到阈值,触发flush。达到阈值的判断逻辑是:size到达了cap的值,或者buf到达了maxAppendBufferSize的配置值。
- 将回调闭包StableClosure实例done加入storage列表。
- size 加1。
- toAppend将回调闭包中携带的Entries列表加进来。
- bufferSize将回调闭包中携带的Entries的data的大小加进来。
flush
flush的主要逻辑有:
- 将LogEntry的列表toAppend交由logStorage去做appendEntries(下面有更细的分析)。
- 度量数据处理。
- 异常与错误上报。
- toAppend清空。
- 返回lastId。lastId是toAppend列表中最后一个LogEntry的id。对于最后一个LogEntry的id是谁设置的,在哪里设置的,此系列文章中有其他地方对此做详细阐述。
LogStorage的appendEntries分析
在sofa-jrat中,LogStorage的默认实现是RocksDBLogStorage。也就是说存储默认是基于RocksDB完成。
append时对LogEntry分两类处理,一类是ENTRY_TYPE_CONFIGURATION,配置类的,另一类是其他,也就是数据类的。处理方式略有差异,所以要分两类处理。
db写完之后,writeCtx joinAll。 其实现RocksDBSegmentLogStorage对此做的逻辑是等待事件全部完成和hooks调用完成。
然后doSync。其实现RocksDBSegmentLogStorage对此做的逻辑是获取到最后一次SegmentFile,让其sync。sync的最下层的实现在com.alipay.sofa.jraft.storage.log.SegmentFile.fsync,通过调用MappedByteBuffer的force方法完成。
配置类的处理
key是LogEntry的id中的index。
value是用logEntryEncoder
对LogEntry进行编码后的值。
ColumnFamilyHandle有两个,一个是defaultHandle,一个是confHandle。
ColumnFamilyHandle是RocksDB的概念。相当于我们通常理解的表。
数据类的处理
key与value的约定与配置类相同。
但是ColumnFamilyHandle只有一个,是defaultHandle。
同时,还有一个能力是执行onDataAppend逻辑,onDataAppend逻辑就是让WriteContext finishJob
LastLogId读取逻辑
LastLogId读取的入口在LogManager
接口中,其签名如下:
com.alipay.sofa.jraft.storage.LogManager.getLastLogId(boolean)
读取时分为两种场景,一种是需要在flush之后读取的,一种是不需要的。
- 不需要在flush之后获取的逻辑比较简单,就是在
lastLogIndex
、firstLogIndex
、lastSnapshotId
中进行处理。如果lastLogIndex >= firstLogIndex,则用lastLogIndex返回,否则用lastSnapshotId返回。 - 需要在flush之后获取的逻辑就复杂一些了。下面详细分解。
需要在flush之后获取LastLogId的逻辑入口处也是要处理lastLogIndex与lastSnapshotId的比较,若相同,则直接返回lastSnapshotId,否则要走LastLogIdClosure和LAST_LOG_ID
这个事件机制。
基于上面的事件机制分析,我们直接阅读com.alipay.sofa.jraft.storage.impl.LogManagerImpl.StableClosureEventHandler.onEvent中对于LAST_LOG_ID
事件的处理即可。
在整个事件处理逻辑中,分为两类分支:
- 回调中带了Entries列表不为空的,交由
AppendBatcher
去做append。 - 回调中带了Entries列表为空的,交由
AppendBatcher
去做flush,flush后返回的id就是lastId,就是我们要获取的。整个flush的机制是怎么样的,上面已经分析了。
存储相关的初始化
NodeImpl中关于LogStorage的初始化
node中关于存储的初始化逻辑在com.alipay.sofa.jraft.core.NodeImpl.initLogStorage()中。
整个存储机制大概分层是:
LogManagerImpl –
准备存储机制所需要的选项LogManagerOptions:
- LogEntryCodecFactory
- LogStorage // 这个是真正完成存储职责的角色。本系统中是RocksDBLogStorage和RocksDBSegmentLogStorage。
- ConfigurationManager
- FsmCaller
- NodeMetrics
- DisruptorBufferSize
- RaftOptions
用上述步骤准备好的选项进行logManager.init(opts)
LogManagerImpl中init分析
int的主要逻辑是:
准备logStorage.init时需要的LogStorageOptions,其需要setConfigurationManager和LogEntryCodecFactory
调用logStorage.init。
从logStorage中获取firstLogIndex,lastLogIndex。并基于此构建diskId。
diskId的index是lastLogIndex,diskId的term是根据lastLogIndex获取的,获取逻辑是:
- 根据这个到logStorage中获取对应entry。如果取不到entry,那么term就是0。
- 如果能取到,就返回entry的term。此外,会根据配置项,决定是否对这个entry做checksum校验。
build disruptor实例。
- EventFactory是StableClosureEventFactory;
- RingBufferSize来自于配置项,默认1024;
- ThreadFactory是NamedThreadFactory,前缀是JRaft-LogManager-Disruptor-;
- ProducerType是MULTI;
- WaitStrategy是disruptor自带的TimeoutBlockingWaitStrategy。
disruptor handleEventsWith是StableClosureEventHandler。相应细节在相应用到的章节有阐述。
disruptor DefaultExceptionHandler是LogExceptionHandler
启动disruptor,并得到RingBuffer实例diskQueue
注册DisruptorMetricSet
logStorage中init分析
首先先看下创建实例的地方,在com.alipay.sofa.jraft.core.DefaultJRaftServiceFactory.createLogStorage(String, RaftOptions)中。
代码如下:
this.logStorage = this.serviceFactory.createLogStorage(this.options.getLogUri(), this.raftOptions);// NodeImpl中的代码
// DefaultJRaftServiceFactory中的代码
public LogStorage createLogStorage(final String uri, final RaftOptions raftOptions) {
Requires.requireTrue(StringUtils.isNotBlank(uri), "Blank log storage uri.");
return new RocksDBLogStorage(uri, raftOptions);
}
可以看到创建的是RocksDBLogStorage。
init逻辑分析:
处理DBOptions,默认是StorageOptionsFactory.getDefaultRocksDBOptions,主要设置的选项有:
CreateIfMissing true
CreateMissingColumnFamilies true
MaxOpenFiles -1 // -1表示不限制打开的文件数
MaxBackgroundCompactions Math.min(Utils.cpus(), 4)
MaxBackgroundFlushes 1 // 并发flush操作的最大数 一般都设置成1
按需开启统计,DebugStatistic。
开始initAndLoad。
columnFamilyDescriptors中加入两个ColumnFamilyDescriptor,一个是Configuration,一个是 DEFAULT_COLUMN_FAMILY,即default。
打开db。 openDB()。
主要代码:
com.alipay.sofa.jraft.storage.impl.RocksDBLogStorage.openDB
this.db = RocksDB.open(this.dbOptions, this.path, columnFamilyDescriptors, columnFamilyHandles);
加载db。 load()。
com.alipay.sofa.jraft.storage.impl.RocksDBLogStorage.load
用confHandle迭代存储的所有配置数据
每迭代到一条时,如果key的长度是8则build一个ConfigurationEntry实例,并纳入ConfigurationManager管理;
如果不是,则比较是否是meta/firstLogIndex,如果是,则用其值来设置FirstLogIndex,并truncatePrefixInBackground。
本博客所有文章除特别声明外,均采用 CC BY-SA 3.0协议 。转载请注明出处!