快照机制分析
我有几个疑问:
SnapshotPath
根据什么确定?
这个目录下都有什么文件?
这些文件是什么时候怎们样的方式写入进来的?
快照存储的逻辑是怎么样的?
给follower安装快照的时机是怎么样的?
安装快照请求带了哪些参数过去?
follower挂掉一段时间再起来后如何优化那么多没被apply的log再在状态机中apply一遍的消耗?
这个目录下的文件是:
– /tmp/server1/snapshot/snapshot_{lastSnapshotIndex}/
– __raft_snapshot_meta
– data
/tmp/server1/snapshot 是根据 NodeOptions 的getSnapshotUri()确定的,此处是用的默认值。
lastSnapshotIndex维护在LocalSnapshotStorage中,除了init时对此字段赋值外,在snapshot存储完成时,也会更新改字段的值。
// com.alipay.sofa.jraft.storage.snapshot.local.LocalSnapshotStorage.close(LocalSnapshotWriter, boolean)
final long newIndex = writer.getSnapshotIndex(); // 217行
this.lastSnapshotIndex = newIndex; // 241行
快照存储
整个链路
快照存储的整个链路是:
flowchart TD
A(snapshotTimer:imerRepeatedTimer)-.->|计时器异步|B(NodeImpl.handleSnapshotTimeout)
B-->C(SnapshotExecutorImpl.doSnapshot)
C-->F(FSMCallerImpl.onSnapshotSave:enqueueTask:TaskType.SNAPSHOT_SAVE)
F-.->|disruptor事件异步|G(FSMCallerImpl.doSnapshotSave)
G-->H(业务自定义快照保存逻辑CounterStateMachine.onSnapshotSave)
H-.->|闭包回调异步|I(业务自定义将快照数据写入 $SnapshotWriter.path/data)
I-->D(SaveSnapshotDone.continueRun)
D-->E(SnapshotExecutorImpl.onSnapshotSaveDone 保存元数据)
快照保存逻辑,触发快照保存(真正save的逻辑由业务侧实现),异步触发注册的onSnapshotSave回调:
- stopped、downloadingSnapshot、savingSnapshot几种状态判断,不处理
- lastSnapshotIndex与LastAppliedIndex之间的差值未达到配置的
SnapshotLogIndexMargin
,不处理。 - 触发快照保存。触发fsmCaller.onSnapshotSave,其中再触发StateMachine.onSnapshotSave。并将SaveSnapshotDone通过disruptor队列异步执行。注意:具体的快照保存逻辑是由业务自己在状态机中实现,如,上面的计数器的例子。
- 准备SnapshotMeta。元数据主要有 当前的以及之前的peers与learners,LastIncludedIndex,LastIncludedTerm。LastIncludedIndex就是lastAppliedIndex。也就是状态机apply到哪里了,快照就保存到哪里。 // FSMCallerImpl.java 554行~577行
- 调用业务侧实现的状态机的onSnapshotSave。此处是真正写快照数据的地方,计数器例子中快照数据就是当前计数器计数到哪里了,即计数器当前值是多少,这样在安装快照过后就避免了副本节点重新将那么多没有被apply的日志在状态机中再apply一遍了。对应文件是/tmp/server1/snapshot/snapshot_24/data
- 业务实现的状态机的onSnapshotSave中会异步触发回调逻辑保存元数据,writer.saveMeta(meta) //SnapshotExecutorImpl.java 386行
这个逻辑在leader与follower节点都会执行。
快照的元数据是在 LocalSnapshotStorage.close 中完成,每当快照存储完成后,便会调用close这个方法,这个方法里面调用了元数据保存。LocalSnapshotStorage.close具体逻辑是:
- 判断writer的code,不成功的直接返回
- 调用LocalSnapshotWriter.sync()触发元数据保存。
- 这个数据写在临时目录下的/tmp/server2/snapshot/temp/__raft_snapshot_meta。具体代码可以参见下面段落中的细节代码。
- 通过前面准备的数据构建LocalSnapshotPbMeta。进一部分析,有两部分数据:
- 一是SnapshotMeta。在上面段落中触发快照保存 准备SnapshotMeta中已经阐述了。
- 另一是fileMap。这个是filename和file元数据LocalFileMeta的映射。这个是在业务侧自行实现的状态机的onSnapshotSave中维护的:writer.addFile(“data”)。也就是你快照写了212哪个数据文件就维护进来,LocalFileMeta可以为null。
- 通过com.alipay.sofa.jraft.storage.io.ProtoBufFile.save(Message, boolean)将LocalSnapshotPbMeta实例以protobuf序列化后写磁盘落地。注意此处写的文件的内容,会先在protobuf序列化后的数据前面写上这个报文的name以及length等。具体可以参见前面《protobuf存储数据》部分。
- 将临时快照目录重名成当前SnapshotIndex对应的目录,即类似/tmp/server2/snapshot/snapshot_30。示例中的30是当前SnapshotIndex
- 对oldIndex解除引用计数,对newIndex增加引用计数。
相关细节代码与调用堆栈
写元数据是先写在临时目录下,确定这个临时目录的代码是:
// LocalSnapshotStorage.java 275行
public SnapshotWriter create(final boolean fromEmpty) {
// ...
final String snapshotPath = this.path + File.separator + TEMP_PATH;
// ...
writer = new LocalSnapshotWriter(snapshotPath, this, this.raftOptions)
snapshot存储完成后回调含触发元数据保存的调用栈如下:
ts=2022-01-01 17:25:16;thread_name=JRaft-Closure-Executor-1;id=4e;is_daemon=true;priority=5;TCCL=sun.misc.Launcher$AppClassLoader@18b4aac2
@com.alipay.sofa.jraft.storage.snapshot.local.LocalSnapshotStorage.close() // 这里会通过调用LocalSnapshotWriter.sync()触发元数据保存
at com.alipay.sofa.jraft.storage.snapshot.local.LocalSnapshotWriter.close(LocalSnapshotWriter.java:98)
at com.alipay.sofa.jraft.storage.snapshot.local.LocalSnapshotWriter.close(LocalSnapshotWriter.java:93)
at com.alipay.sofa.jraft.storage.snapshot.SnapshotExecutorImpl.onSnapshotSaveDone(SnapshotExecutorImpl.java:396)
at com.alipay.sofa.jraft.storage.snapshot.SnapshotExecutorImpl$SaveSnapshotDone.continueRun(SnapshotExecutorImpl.java:145)
at com.alipay.sofa.jraft.storage.snapshot.SnapshotExecutorImpl$SaveSnapshotDone.lambda$run$0(SnapshotExecutorImpl.java:141)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
SaveSnapshotDone什么时候进入disruptor队列?
// com.alipay.sofa.jraft.storage.snapshot.SnapshotExecutorImpl.doSnapshot(Closure)
final SaveSnapshotDone saveSnapshotDone = new SaveSnapshotDone(writer, done, null); // 350行
if (!this.fsmCaller.onSnapshotSave(saveSnapshotDone)) { // SaveSnapshotDone进入disruptor队列
Utils.runClosureInThread(done, new Status(RaftError.EHOSTDOWN, "The raft node is down."));
return;
}
调用栈如下:
Affect(class count: 1 , method count: 1) cost in 77 ms, listenerId: 2
ts=2022-01-01 17:28:46;thread_name=JRaft-Closure-Executor-3;id=50;is_daemon=true;priority=5;TCCL=sun.misc.Launcher$AppClassLoader@18b4aac2
@com.alipay.sofa.jraft.storage.snapshot.SnapshotExecutorImpl.doSnapshot()
at com.alipay.sofa.jraft.core.NodeImpl.doSnapshot(NodeImpl.java:3098)
at com.alipay.sofa.jraft.core.NodeImpl.lambda$handleSnapshotTimeout$0(NodeImpl.java:607)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
handleSnapshotTimeout是snapshot定时器的回调
this.snapshotTimer = new RepeatedTimer(name, this.options.getSnapshotIntervalSecs() * 1000,
TIMER_FACTORY.getSnapshotTimer(this.options.isSharedSnapshotTimer(), name)) {
private volatile boolean firstSchedule = true;
@Override
protected void onTrigger() {
handleSnapshotTimeout(); // NodeImpl.java 953行
}
// ...
执行快照保存堆栈(由业务侧定义快照保存逻辑,即快照中存什么?)
// stack com.alipay.sofa.jraft.StateMachine onSnapshotSave
ts=2022-01-02 08:29:48;thread_name=JRaft-FSMCaller-Disruptor-0;id=14;is_daemon=true;priority=5;TCCL=sun.misc.Launcher$AppClassLoader@18b4aac2
@com.alipay.sofa.jraft.example.counter.CounterStateMachine.onSnapshotSave()
at com.alipay.sofa.jraft.core.FSMCallerImpl.doSnapshotSave(FSMCallerImpl.java:583)
at com.alipay.sofa.jraft.core.FSMCallerImpl.runApplyTask(FSMCallerImpl.java:392)
raft中状态机caller中doSnapshotSave以及业务侧定义的快照保存逻辑的代码示例(计数器):
// com.alipay.sofa.jraft.core.FSMCallerImpl.doSnapshotSave(SaveSnapshotClosure)
private void doSnapshotSave(final SaveSnapshotClosure done) {
Requires.requireNonNull(done, "SaveSnapshotClosure is null");
final long lastAppliedIndex = this.lastAppliedIndex.get();
final RaftOutter.SnapshotMeta.Builder metaBuilder = RaftOutter.SnapshotMeta.newBuilder() //
.setLastIncludedIndex(lastAppliedIndex) // **准备元数据**
.setLastIncludedTerm(this.lastAppliedTerm);
// ...
final SnapshotWriter writer = done.start(metaBuilder.build());
// ...
this.fsm.onSnapshotSave(writer, done); // **调用业务实现的状态机写快照数据**
}
// com/alipay/sofa/jraft/example/counter/CounterStateMachine.java
@Override
public void onSnapshotSave(final SnapshotWriter writer, final Closure done) {
final long currVal = this.value.get();
Utils.runInThread(() -> {
final CounterSnapshotFile snapshot = new CounterSnapshotFile(writer.getPath() + File.separator + "data");
if (snapshot.save(currVal)) { // **真正保存快照,值就是当前计数器计数到哪里了,即计数器当前值是多少**
if (writer.addFile("data")) {
done.run(Status.OK());
// ...
});
}
发起快照安装请求
快照安装由leader节点发起installSnapshot的rpc请求,由follower节点处理这个请求完成快照安装。
installSnapshot的rpc的请求体定义如下:
message InstallSnapshotRequest {
required string group_id = 1;
required string server_id = 2;
required string peer_id = 3;
required int64 term = 4;
required SnapshotMeta meta = 5; // 重点是这个
required string uri = 6;
};
message SnapshotMeta {
required int64 last_included_index = 1; // 1 2 这两个字段在快照存储的时候落地的
required int64 last_included_term = 2;
repeated string peers = 3;
repeated string old_peers = 4;
repeated string learners = 5;
repeated string old_learners = 6;
}
通过请求体不难看出,主要是将leader当前的存储的快照信息发过去,这个信息中有状态机apply到哪了。
请求体中的SnapshotMeta meta通过SnapshotReader.load 读取的。
安装快照的具体代码在com.alipay.sofa.jraft.core.Replicator.installSnapshot()。
什么时候发起快照安装?
在向follower发送appendEntry请求时(含空的,即sendEntries和sendEmptyEntries)时会对应构建AppendEntriesRequest做fillCommonFields,如果返回false,则对follower做快照安装。那么fillCommonFields什么时候会返回false呢?
在不是心跳请求且prevLogIndex不为0且prevLogIndex对应的term为0时。我理解是根据prevLogIndex取不到对应term了(对应数据没在follower落地)则要安装快照。
安装完了会回调Replicator.onInstallSnapshotReturned。主要是打印日志,是否超时判断,Replicator实例状态的切换等等。
处理快照安装请求
处理快照安装的请求是在NodeImpl.handleInstallSnapshot。
大体逻辑是:
- 从快照安装的请求中获取远端快照信息。
- 从follower向leader发起下载快照的rpc请求,复制文件到follower。
整个copy的调用栈:
DefaultRaftClientService.getFile(Endpoint, GetFileRequest, int, RpcResponseClosure<GetFileResponse>)
remote.CopySession.sendNextRpc()
remote.RemoteFileCopier.startCopyToFile(String, String, CopyOptions)
local.LocalSnapshotCopier.copyFile(String)
local.LocalSnapshotCopier.internalCopy()
local.LocalSnapshotCopier.startCopy()
local.LocalSnapshotCopier.start() // 异步调用startCopy
local.LocalSnapshotStorage.startToCopyFrom(String, SnapshotCopierOptions)// this.curCopier = this.snapshotStorage.startToCopyFrom(ds.request.getUri(), newCopierOpts());
SnapshotExecutorImpl.registerDownloadingSnapshot(DownloadingSnapshot)
SnapshotExecutorImpl.installSnapshot(InstallSnapshotRequest, Builder, RpcRequestClosure) // this.snapshotExecutor.installSnapshot(request, InstallSnapshotResponse.newBuilder(), done);
NodeImpl.handleInstallSnapshot(InstallSnapshotRequest, RpcRequestClosure)
计数器例子中快照数据就是当前计数器计数到哪里了,即计数器当前值是多少,这样在安装快照过后就避免了副本节点重新将那么多没有被apply的日志在状态机中再apply一遍了。对应文件是/tmp/server1/snapshot/snapshot_24/data。这也可以理解成快照机制的优化目的。
快照加载
快照加载也是由业务自行决定业务逻辑。比如计数器例子中的CounterStateMachine.onSnapshotLoad(SnapshotReader)
。
sofa-jraft会将SnapshotReader实例在onSnapshotLoad时传递到业务侧,这样业务侧就能拿到data文件的路径,就能从中加载出数据,设置为当前状态机的状态(计数器例子中就是计数器的当前值)。
本博客所有文章除特别声明外,均采用 CC BY-SA 3.0协议 。转载请注明出处!