单节点存储分析

存储相关类是怎么串起来的

LogManager StableClosureEventHandler AppendBatcher LogStorage(RocksDBLogStorage)是怎么串起来的?

整体关系如下:

org.rocksdb.AbstractWriteBatch.put(ColumnFamilyHandle, byte[], byte[])

RocksDBLogStorage.addDataBatch(LogEntry, WriteBatch, WriteContext)

RocksDBLogStorage.appendEntries(List)

LogStorage.appendEntries(List)

​ – LogManagerImpl.appendToStorage(List)

​ – LogManagerImpl.AppendBatcher.flush()

​ – LogManagerImpl.StableClosureEventHandler.onEvent

StableClosureEventHandler自己几个部件的关系如下:

com.alipay.sofa.jraft.storage.impl.LogManagerImpl.StableClosureEventHandler

com.alipay.sofa.jraft.storage.impl.LogManagerImpl.disruptor

com.alipay.sofa.jraft.storage.impl.LogManagerImpl.diskQueue

事件的队列是又如何被offer的?关系如下:

LogManagerImpl.tryOfferEvent(StableClosure, EventTranslator)

– LogManagerImpl.appendEntries(List, StableClosure)

​ – NodeImpl.executeApplyingTasks(List) LeaderStableClosure

​ – NodeImpl.handleAppendEntriesRequest(AppendEntriesRequest, RpcRequestClosure) FollowerStableClosure

executeApplyingTasks LeaderStableClosure 这个是leader节点写日志,对应的闭包命名也能看出来。闭包回调中会通过ballotBox.commitAt去触发FSM状态机的apply。

handleAppendEntriesRequest FollowerStableClosure 这个是follower节点写日志,同样对应的闭包命名也是显而易见。

AppendBatcher的append与flush机制分析

AppendBatcher的内部属性有StableClosure的列表storage,cap,size,bufferSize,LogEntry的列表toAppend,lastId等。

append

append的主要逻辑有:

  • 达到阈值,触发flush。达到阈值的判断逻辑是:size到达了cap的值,或者buf到达了maxAppendBufferSize的配置值。
  • 将回调闭包StableClosure实例done加入storage列表。
  • size 加1。
  • toAppend将回调闭包中携带的Entries列表加进来。
  • bufferSize将回调闭包中携带的Entries的data的大小加进来。

flush

flush的主要逻辑有:

  • 将LogEntry的列表toAppend交由logStorage去做appendEntries(下面有更细的分析)。
  • 度量数据处理。
  • 异常与错误上报。
  • toAppend清空。
  • 返回lastId。lastId是toAppend列表中最后一个LogEntry的id。对于最后一个LogEntry的id是谁设置的,在哪里设置的,此系列文章中有其他地方对此做详细阐述。

LogStorage的appendEntries分析

在sofa-jrat中,LogStorage的默认实现是RocksDBLogStorage。也就是说存储默认是基于RocksDB完成

append时对LogEntry分两类处理,一类是ENTRY_TYPE_CONFIGURATION,配置类的,另一类是其他,也就是数据类的。处理方式略有差异,所以要分两类处理。

db写完之后,writeCtx joinAll。 其实现RocksDBSegmentLogStorage对此做的逻辑是等待事件全部完成和hooks调用完成。

然后doSync。其实现RocksDBSegmentLogStorage对此做的逻辑是获取到最后一次SegmentFile,让其sync。sync的最下层的实现在com.alipay.sofa.jraft.storage.log.SegmentFile.fsync,通过调用MappedByteBuffer的force方法完成。

配置类的处理

key是LogEntry的id中的index。

value是用logEntryEncoder对LogEntry进行编码后的值。

ColumnFamilyHandle有两个,一个是defaultHandle,一个是confHandle。

ColumnFamilyHandle是RocksDB的概念。相当于我们通常理解的表。

数据类的处理

key与value的约定与配置类相同。

但是ColumnFamilyHandle只有一个,是defaultHandle。

同时,还有一个能力是执行onDataAppend逻辑,onDataAppend逻辑就是让WriteContext finishJob

LastLogId读取逻辑

LastLogId读取的入口在LogManager接口中,其签名如下:

com.alipay.sofa.jraft.storage.LogManager.getLastLogId(boolean)

读取时分为两种场景,一种是需要在flush之后读取的,一种是不需要的。

  • 不需要在flush之后获取的逻辑比较简单,就是在lastLogIndexfirstLogIndexlastSnapshotId中进行处理。如果lastLogIndex >= firstLogIndex,则用lastLogIndex返回,否则用lastSnapshotId返回
  • 需要在flush之后获取的逻辑就复杂一些了。下面详细分解。

需要在flush之后获取LastLogId的逻辑入口处也是要处理lastLogIndex与lastSnapshotId的比较,若相同,则直接返回lastSnapshotId,否则要走LastLogIdClosure和LAST_LOG_ID这个事件机制。

基于上面的事件机制分析,我们直接阅读com.alipay.sofa.jraft.storage.impl.LogManagerImpl.StableClosureEventHandler.onEvent中对于LAST_LOG_ID事件的处理即可。

在整个事件处理逻辑中,分为两类分支:

  • 回调中带了Entries列表不为空的,交由AppendBatcher去做append
  • 回调中带了Entries列表为空的,交由AppendBatcher去做flush,flush后返回的id就是lastId,就是我们要获取的。整个flush的机制是怎么样的,上面已经分析了。

存储相关的初始化

NodeImpl中关于LogStorage的初始化

node中关于存储的初始化逻辑在com.alipay.sofa.jraft.core.NodeImpl.initLogStorage()中。

整个存储机制大概分层是:

LogManagerImpl –

准备存储机制所需要的选项LogManagerOptions:

  • LogEntryCodecFactory
  • LogStorage // 这个是真正完成存储职责的角色。本系统中是RocksDBLogStorage和RocksDBSegmentLogStorage。
  • ConfigurationManager
  • FsmCaller
  • NodeMetrics
  • DisruptorBufferSize
  • RaftOptions

用上述步骤准备好的选项进行logManager.init(opts)

LogManagerImpl中init分析

int的主要逻辑是:

  • 准备logStorage.init时需要的LogStorageOptions,其需要setConfigurationManager和LogEntryCodecFactory

  • 调用logStorage.init。

  • 从logStorage中获取firstLogIndex,lastLogIndex。并基于此构建diskId。

    diskId的index是lastLogIndex,diskId的term是根据lastLogIndex获取的,获取逻辑是:

    • 根据这个到logStorage中获取对应entry。如果取不到entry,那么term就是0。
    • 如果能取到,就返回entry的term。此外,会根据配置项,决定是否对这个entry做checksum校验。
  • build disruptor实例。

    • EventFactory是StableClosureEventFactory;
    • RingBufferSize来自于配置项,默认1024;
    • ThreadFactory是NamedThreadFactory,前缀是JRaft-LogManager-Disruptor-;
    • ProducerType是MULTI;
    • WaitStrategy是disruptor自带的TimeoutBlockingWaitStrategy。
  • disruptor handleEventsWith是StableClosureEventHandler。相应细节在相应用到的章节有阐述。

  • disruptor DefaultExceptionHandler是LogExceptionHandler

  • 启动disruptor,并得到RingBuffer实例diskQueue

  • 注册DisruptorMetricSet

logStorage中init分析

首先先看下创建实例的地方,在com.alipay.sofa.jraft.core.DefaultJRaftServiceFactory.createLogStorage(String, RaftOptions)中。

代码如下:

this.logStorage = this.serviceFactory.createLogStorage(this.options.getLogUri(), this.raftOptions);// NodeImpl中的代码    

// DefaultJRaftServiceFactory中的代码
public LogStorage createLogStorage(final String uri, final RaftOptions raftOptions) {
        Requires.requireTrue(StringUtils.isNotBlank(uri), "Blank log storage uri.");
        return new RocksDBLogStorage(uri, raftOptions);
    }

可以看到创建的是RocksDBLogStorage。

init逻辑分析:

处理DBOptions,默认是StorageOptionsFactory.getDefaultRocksDBOptions,主要设置的选项有:

CreateIfMissing true

CreateMissingColumnFamilies true

MaxOpenFiles -1 // -1表示不限制打开的文件数

MaxBackgroundCompactions Math.min(Utils.cpus(), 4)

MaxBackgroundFlushes 1 // 并发flush操作的最大数 一般都设置成1

按需开启统计,DebugStatistic。

开始initAndLoad。

columnFamilyDescriptors中加入两个ColumnFamilyDescriptor,一个是Configuration,一个是 DEFAULT_COLUMN_FAMILY,即default。

打开db。 openDB()。

主要代码:

com.alipay.sofa.jraft.storage.impl.RocksDBLogStorage.openDB

this.db = RocksDB.open(this.dbOptions, this.path, columnFamilyDescriptors, columnFamilyHandles);

加载db。 load()。

com.alipay.sofa.jraft.storage.impl.RocksDBLogStorage.load

用confHandle迭代存储的所有配置数据

每迭代到一条时,如果key的长度是8则build一个ConfigurationEntry实例,并纳入ConfigurationManager管理;

如果不是,则比较是否是meta/firstLogIndex,如果是,则用其值来设置FirstLogIndex,并truncatePrefixInBackground。