快照机制分析

我有几个疑问:

SnapshotPath根据什么确定?

这个目录下都有什么文件?

这些文件是什么时候怎们样的方式写入进来的?

快照存储的逻辑是怎么样的?

给follower安装快照的时机是怎么样的?

安装快照请求带了哪些参数过去?

follower挂掉一段时间再起来后如何优化那么多没被apply的log再在状态机中apply一遍的消耗?

这个目录下的文件是:

– /tmp/server1/snapshot/snapshot_{lastSnapshotIndex}/

– __raft_snapshot_meta

– data

/tmp/server1/snapshot 是根据 NodeOptions 的getSnapshotUri()确定的,此处是用的默认值。

lastSnapshotIndex维护在LocalSnapshotStorage中,除了init时对此字段赋值外,在snapshot存储完成时,也会更新改字段的值。

 // com.alipay.sofa.jraft.storage.snapshot.local.LocalSnapshotStorage.close(LocalSnapshotWriter, boolean)

final long newIndex = writer.getSnapshotIndex();  // 217行

this.lastSnapshotIndex = newIndex;  // 241行

快照存储

整个链路

快照存储的整个链路是:

flowchart TD
A(snapshotTimer:imerRepeatedTimer)-.->|计时器异步|B(NodeImpl.handleSnapshotTimeout)
B-->C(SnapshotExecutorImpl.doSnapshot)
C-->F(FSMCallerImpl.onSnapshotSave:enqueueTask:TaskType.SNAPSHOT_SAVE)
F-.->|disruptor事件异步|G(FSMCallerImpl.doSnapshotSave)
G-->H(业务自定义快照保存逻辑CounterStateMachine.onSnapshotSave)
H-.->|闭包回调异步|I(业务自定义将快照数据写入 $SnapshotWriter.path/data)
I-->D(SaveSnapshotDone.continueRun)
D-->E(SnapshotExecutorImpl.onSnapshotSaveDone 保存元数据)

快照保存逻辑,触发快照保存(真正save的逻辑由业务侧实现),异步触发注册的onSnapshotSave回调:

  • stopped、downloadingSnapshot、savingSnapshot几种状态判断,不处理
  • lastSnapshotIndex与LastAppliedIndex之间的差值未达到配置的SnapshotLogIndexMargin ,不处理。
  • 触发快照保存。触发fsmCaller.onSnapshotSave,其中再触发StateMachine.onSnapshotSave。并将SaveSnapshotDone通过disruptor队列异步执行。注意:具体的快照保存逻辑是由业务自己在状态机中实现,如,上面的计数器的例子。
    • 准备SnapshotMeta。元数据主要有 当前的以及之前的peers与learners,LastIncludedIndex,LastIncludedTerm。LastIncludedIndex就是lastAppliedIndex。也就是状态机apply到哪里了,快照就保存到哪里。 // FSMCallerImpl.java 554行~577行
    • 调用业务侧实现的状态机的onSnapshotSave。此处是真正写快照数据的地方,计数器例子中快照数据就是当前计数器计数到哪里了,即计数器当前值是多少,这样在安装快照过后就避免了副本节点重新将那么多没有被apply的日志在状态机中再apply一遍了。对应文件是/tmp/server1/snapshot/snapshot_24/data
    • 业务实现的状态机的onSnapshotSave中会异步触发回调逻辑保存元数据,writer.saveMeta(meta) //SnapshotExecutorImpl.java 386行

这个逻辑在leader与follower节点都会执行。

快照的元数据是在 LocalSnapshotStorage.close 中完成,每当快照存储完成后,便会调用close这个方法,这个方法里面调用了元数据保存。LocalSnapshotStorage.close具体逻辑是:

  • 判断writer的code,不成功的直接返回
  • 调用LocalSnapshotWriter.sync()触发元数据保存。
    • 这个数据写在临时目录下的/tmp/server2/snapshot/temp/__raft_snapshot_meta。具体代码可以参见下面段落中的细节代码。
    • 通过前面准备的数据构建LocalSnapshotPbMeta。进一部分析,有两部分数据:
      • 一是SnapshotMeta。在上面段落中触发快照保存 准备SnapshotMeta中已经阐述了。
      • 另一是fileMap。这个是filename和file元数据LocalFileMeta的映射。这个是在业务侧自行实现的状态机的onSnapshotSave中维护的:writer.addFile(“data”)。也就是你快照写了212哪个数据文件就维护进来,LocalFileMeta可以为null。
    • 通过com.alipay.sofa.jraft.storage.io.ProtoBufFile.save(Message, boolean)将LocalSnapshotPbMeta实例以protobuf序列化后写磁盘落地。注意此处写的文件的内容,会先在protobuf序列化后的数据前面写上这个报文的name以及length等。具体可以参见前面《protobuf存储数据》部分。
  • 将临时快照目录重名成当前SnapshotIndex对应的目录,即类似/tmp/server2/snapshot/snapshot_30。示例中的30是当前SnapshotIndex
  • 对oldIndex解除引用计数,对newIndex增加引用计数。

相关细节代码与调用堆栈

写元数据是先写在临时目录下,确定这个临时目录的代码是:

// LocalSnapshotStorage.java 275行
public SnapshotWriter create(final boolean fromEmpty) {
// ...
final String snapshotPath = this.path + File.separator + TEMP_PATH;
// ...
writer = new LocalSnapshotWriter(snapshotPath, this, this.raftOptions)

snapshot存储完成后回调含触发元数据保存的调用栈如下:

ts=2022-01-01 17:25:16;thread_name=JRaft-Closure-Executor-1;id=4e;is_daemon=true;priority=5;TCCL=sun.misc.Launcher$AppClassLoader@18b4aac2
    @com.alipay.sofa.jraft.storage.snapshot.local.LocalSnapshotStorage.close() // 这里会通过调用LocalSnapshotWriter.sync()触发元数据保存
        at com.alipay.sofa.jraft.storage.snapshot.local.LocalSnapshotWriter.close(LocalSnapshotWriter.java:98)
        at com.alipay.sofa.jraft.storage.snapshot.local.LocalSnapshotWriter.close(LocalSnapshotWriter.java:93)
        at com.alipay.sofa.jraft.storage.snapshot.SnapshotExecutorImpl.onSnapshotSaveDone(SnapshotExecutorImpl.java:396)
        at com.alipay.sofa.jraft.storage.snapshot.SnapshotExecutorImpl$SaveSnapshotDone.continueRun(SnapshotExecutorImpl.java:145)
        at com.alipay.sofa.jraft.storage.snapshot.SnapshotExecutorImpl$SaveSnapshotDone.lambda$run$0(SnapshotExecutorImpl.java:141)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)

SaveSnapshotDone什么时候进入disruptor队列?

// com.alipay.sofa.jraft.storage.snapshot.SnapshotExecutorImpl.doSnapshot(Closure)
final SaveSnapshotDone saveSnapshotDone = new SaveSnapshotDone(writer, done, null); // 350行
if (!this.fsmCaller.onSnapshotSave(saveSnapshotDone)) { // SaveSnapshotDone进入disruptor队列
  Utils.runClosureInThread(done, new Status(RaftError.EHOSTDOWN, "The raft node is down."));
  return;
}

调用栈如下:

Affect(class count: 1 , method count: 1) cost in 77 ms, listenerId: 2
ts=2022-01-01 17:28:46;thread_name=JRaft-Closure-Executor-3;id=50;is_daemon=true;priority=5;TCCL=sun.misc.Launcher$AppClassLoader@18b4aac2
    @com.alipay.sofa.jraft.storage.snapshot.SnapshotExecutorImpl.doSnapshot()
        at com.alipay.sofa.jraft.core.NodeImpl.doSnapshot(NodeImpl.java:3098)
        at com.alipay.sofa.jraft.core.NodeImpl.lambda$handleSnapshotTimeout$0(NodeImpl.java:607)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)

handleSnapshotTimeout是snapshot定时器的回调

this.snapshotTimer = new RepeatedTimer(name, this.options.getSnapshotIntervalSecs() * 1000,
                                       TIMER_FACTORY.getSnapshotTimer(this.options.isSharedSnapshotTimer(), name)) {

  private volatile boolean firstSchedule = true;

  @Override
  protected void onTrigger() {
    handleSnapshotTimeout(); // NodeImpl.java 953行
  }
// ...

执行快照保存堆栈(由业务侧定义快照保存逻辑,即快照中存什么?)

// stack com.alipay.sofa.jraft.StateMachine onSnapshotSave
ts=2022-01-02 08:29:48;thread_name=JRaft-FSMCaller-Disruptor-0;id=14;is_daemon=true;priority=5;TCCL=sun.misc.Launcher$AppClassLoader@18b4aac2
    @com.alipay.sofa.jraft.example.counter.CounterStateMachine.onSnapshotSave()
        at com.alipay.sofa.jraft.core.FSMCallerImpl.doSnapshotSave(FSMCallerImpl.java:583)
        at com.alipay.sofa.jraft.core.FSMCallerImpl.runApplyTask(FSMCallerImpl.java:392)

raft中状态机caller中doSnapshotSave以及业务侧定义的快照保存逻辑的代码示例(计数器):

// com.alipay.sofa.jraft.core.FSMCallerImpl.doSnapshotSave(SaveSnapshotClosure)
    private void doSnapshotSave(final SaveSnapshotClosure done) {
        Requires.requireNonNull(done, "SaveSnapshotClosure is null");
        final long lastAppliedIndex = this.lastAppliedIndex.get();
        final RaftOutter.SnapshotMeta.Builder metaBuilder = RaftOutter.SnapshotMeta.newBuilder() //
            .setLastIncludedIndex(lastAppliedIndex) // **准备元数据**
            .setLastIncludedTerm(this.lastAppliedTerm);
// ...
        final SnapshotWriter writer = done.start(metaBuilder.build());
// ...
        this.fsm.onSnapshotSave(writer, done); // **调用业务实现的状态机写快照数据**
    }

// com/alipay/sofa/jraft/example/counter/CounterStateMachine.java 
@Override
  public void onSnapshotSave(final SnapshotWriter writer, final Closure done) {
    final long currVal = this.value.get();
    Utils.runInThread(() -> {
      final CounterSnapshotFile snapshot = new CounterSnapshotFile(writer.getPath() + File.separator + "data");
      if (snapshot.save(currVal)) { // **真正保存快照,值就是当前计数器计数到哪里了,即计数器当前值是多少**
        if (writer.addFile("data")) {
          done.run(Status.OK());
// ...
    });
  }

发起快照安装请求

快照安装由leader节点发起installSnapshot的rpc请求,由follower节点处理这个请求完成快照安装。

installSnapshot的rpc的请求体定义如下:

message InstallSnapshotRequest {
  required string group_id = 1;
  required string server_id = 2;
  required string peer_id = 3;
  required int64 term = 4;
  required SnapshotMeta meta = 5; // 重点是这个
  required string uri = 6;
};

message SnapshotMeta {
    required int64 last_included_index = 1; // 1 2 这两个字段在快照存储的时候落地的
    required int64 last_included_term = 2; 
    repeated string peers = 3;
    repeated string old_peers = 4;
    repeated string learners = 5;
    repeated string old_learners = 6;
}

通过请求体不难看出,主要是将leader当前的存储的快照信息发过去,这个信息中有状态机apply到哪了。

请求体中的SnapshotMeta meta通过SnapshotReader.load 读取的。

安装快照的具体代码在com.alipay.sofa.jraft.core.Replicator.installSnapshot()。

什么时候发起快照安装?

在向follower发送appendEntry请求时(含空的,即sendEntries和sendEmptyEntries)时会对应构建AppendEntriesRequest做fillCommonFields,如果返回false,则对follower做快照安装。那么fillCommonFields什么时候会返回false呢?

在不是心跳请求且prevLogIndex不为0且prevLogIndex对应的term为0时。我理解是根据prevLogIndex取不到对应term了(对应数据没在follower落地)则要安装快照

安装完了会回调Replicator.onInstallSnapshotReturned。主要是打印日志,是否超时判断,Replicator实例状态的切换等等。

处理快照安装请求

处理快照安装的请求是在NodeImpl.handleInstallSnapshot。

大体逻辑是:

  • 从快照安装的请求中获取远端快照信息。
  • 从follower向leader发起下载快照的rpc请求,复制文件到follower。

整个copy的调用栈:

DefaultRaftClientService.getFile(Endpoint, GetFileRequest, int, RpcResponseClosure<GetFileResponse>)
remote.CopySession.sendNextRpc()
remote.RemoteFileCopier.startCopyToFile(String, String, CopyOptions)
local.LocalSnapshotCopier.copyFile(String)
local.LocalSnapshotCopier.internalCopy()
local.LocalSnapshotCopier.startCopy()
local.LocalSnapshotCopier.start() // 异步调用startCopy
local.LocalSnapshotStorage.startToCopyFrom(String, SnapshotCopierOptions)// this.curCopier = this.snapshotStorage.startToCopyFrom(ds.request.getUri(), newCopierOpts());
SnapshotExecutorImpl.registerDownloadingSnapshot(DownloadingSnapshot)
SnapshotExecutorImpl.installSnapshot(InstallSnapshotRequest, Builder, RpcRequestClosure)  // this.snapshotExecutor.installSnapshot(request, InstallSnapshotResponse.newBuilder(), done);
NodeImpl.handleInstallSnapshot(InstallSnapshotRequest, RpcRequestClosure)

计数器例子中快照数据就是当前计数器计数到哪里了,即计数器当前值是多少,这样在安装快照过后就避免了副本节点重新将那么多没有被apply的日志在状态机中再apply一遍了。对应文件是/tmp/server1/snapshot/snapshot_24/data。这也可以理解成快照机制的优化目的。

快照加载

快照加载也是由业务自行决定业务逻辑。比如计数器例子中的CounterStateMachine.onSnapshotLoad(SnapshotReader)

sofa-jraft会将SnapshotReader实例在onSnapshotLoad时传递到业务侧,这样业务侧就能拿到data文件的路径,就能从中加载出数据,设置为当前状态机的状态(计数器例子中就是计数器的当前值)。