Analysis of the Distributed State Machine and the Counter Example
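Counter Example Analysis
Looking at the counter example, a few questions come up:
Which components does the counter example involve?
How do they cooperate?
Where does the Raft component sit in this collaboration, and how does it connect to the other components?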
The counter example lives in jraft-example/src/main/java/com/alipay/sofa/jraft/example/counter/. In the earlier preparation chapter I set up start scripts for its server and client.
Workflow and Components
The overall workflow of the counter is roughly:
flowchart TD
A(CounterClient client) --> B(CounterServer RPC entry, Note 1)
B --> C(CounterServiceImpl / CounterOperation / Task / CounterClosure)
C --> E(Node.apply)
E --> F(Log.append / Replicator, Note 2)
F --> G(BallotBox.commitAt / FSMCallerImpl.onCommitted)
G --> H(CounterStateMachine.onApply performs the operation, Note 3)
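The flowchart can be made concrete with a single-process Java sketch of the same ordering. It is not JRaft code: PipelineSketch, its nested Task, apply and onCommitted are hypothetical names, there is no replication or disk I/O, and the operation is encoded as a decimal string rather than a Hessian2-serialized CounterOperation. What it does illustrate is that apply only appends to the log, while the state machine update and the reply through done happen only after the entry is marked committed.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.function.LongConsumer;

// Hypothetical single-process model of the counter pipeline (not the JRaft API):
// apply(Task) -> log append -> commit -> "onApply" -> done callback replies to the client.
final class PipelineSketch {

    /** Plays the role of a Task: the serialized operation plus a completion callback. */
    static final class Task {
        final ByteBuffer data;   // here: the delta as a decimal string (stand-in for Hessian2)
        final LongConsumer done; // stand-in for CounterClosure; receives the new counter value

        Task(ByteBuffer data, LongConsumer done) {
            this.data = data;
            this.done = done;
        }
    }

    private final List<Task> log = new ArrayList<>(); // stand-in for Log.append
    private long committedIndex = 0; // highest committed log index (1-based)
    private long appliedIndex = 0;   // highest index already applied to the state machine
    private long value = 0;          // the counter's state

    static Task incrementTask(long delta, LongConsumer done) {
        byte[] bytes = Long.toString(delta).getBytes(StandardCharsets.UTF_8);
        return new Task(ByteBuffer.wrap(bytes), done);
    }

    /** Stand-in for Node.apply: only appends; nothing is executed yet. */
    void apply(Task task) {
        log.add(task);
    }

    /** Stand-in for BallotBox -> FSMCaller: commit up to index, then apply in log order. */
    void onCommitted(long index) {
        committedIndex = Math.max(committedIndex, Math.min(index, log.size()));
        while (appliedIndex < committedIndex) {
            Task task = log.get((int) appliedIndex); // list position i holds log index i + 1
            appliedIndex++;
            String text = StandardCharsets.UTF_8.decode(task.data.duplicate()).toString();
            value += Long.parseLong(text); // the INCREMENT operation
            if (task.done != null) {
                task.done.accept(value); // reply to the client with the new value
            }
        }
    }

    long value() {
        return value;
    }
}
```

For example, applying two increment tasks (5 and 3) leaves the value at 0 until onCommitted(1) runs; the value then becomes 5, and a later onCommitted(2) brings it to 8.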
Note 1: the components CounterServer needs at this point are:
– RaftGroupService
– CounterStateMachine
– RpcServer
– GetValueRequestProcessor // analysis omitted; it works like IncrementAndGetRequestProcessor
– CounterService
– IncrementAndGetRequestProcessor
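– CounterService // does the actual incrementAndGet work
– CounterClosure // 1. sends the response back to the counter client; 2. carries the CounterOperation, so that when the state machine's onApply runs it can take the CounterOperation from the closure and perform the corresponding operation in the state machine (see Note 3 for details)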
– CounterOperation
Note 2: this step persists the log entry on the leader and replicates it to the followers.
onApply
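Note 3: how does CounterStateMachine.onApply perform the operation?
BallotBox.commitAt triggers FSMCallerImpl.onCommitted.
Through the Iterator it eventually hands to onApply, FSMCallerImpl.onCommitted delivers the done of the Task you submitted with node.apply; here done is the CounterClosure. It also delivers that Task's data, which in this example is the CounterOperation serialized with Hessian2. The example reads the done field first when it is available.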
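CounterClosure holds the CounterOperation object itself, so on the node that submitted the task (the leader) onApply reads the operation directly from the closure and skips deserialization. When done is null, for example on a follower, where no closure was registered for the entry, onApply deserializes data with Hessian2 to recover the CounterOperation instead. Either way, the state machine then carries out the operation. The relevant code: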
// com/alipay/sofa/jraft/example/counter/CounterStateMachine.java, line 93
switch (counterOperation.getOp()) {
    case INCREMENT:
        final long delta = counterOperation.getDelta();
        final long prev = this.value.get();
        current = this.value.addAndGet(delta);
        LOG.info("Added value={} by delta={} at logIndex={}", prev, delta, iter.getIndex());
        break;
    // ... other cases elided
}
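The branching described in Note 3 (use the operation held by done when it exists, otherwise decode data) can be sketched as follows. This is a hypothetical model, not the example's code: the nested Entry class stands in for JRaft's Iterator entries, Closure stands in for CounterClosure, and a fixed 9-byte layout (one op byte plus an 8-byte delta) replaces Hessian2.

```java
import java.nio.ByteBuffer;
import java.util.Iterator;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical model of the onApply branching (not JRaft code): prefer the operation held by
// the closure (done), otherwise decode it from the log entry's data.
final class OnApplySketch {

    enum Op { GET, INCREMENT }

    static final class Operation {
        final Op op;
        final long delta;

        Operation(Op op, long delta) {
            this.op = op;
            this.delta = delta;
        }

        ByteBuffer serialize() { // stand-in for Hessian2: 1 byte op + 8 bytes delta
            ByteBuffer buf = ByteBuffer.allocate(9);
            buf.put((byte) op.ordinal());
            buf.putLong(delta);
            buf.flip();
            return buf;
        }

        static Operation deserialize(ByteBuffer data) {
            ByteBuffer buf = data.duplicate(); // do not disturb the caller's position
            Op op = Op.values()[buf.get()];
            return new Operation(op, buf.getLong());
        }
    }

    /** Stand-in for CounterClosure: keeps the Operation object and receives the result. */
    static final class Closure {
        final Operation operation;
        long result;

        Closure(Operation operation) {
            this.operation = operation;
        }
    }

    /** One committed log entry as the state machine sees it. */
    static final class Entry {
        final ByteBuffer data;
        final Closure done; // null when this node did not propose the entry (e.g. a follower)

        Entry(ByteBuffer data, Closure done) {
            this.data = data;
            this.done = done;
        }
    }

    final AtomicLong value = new AtomicLong();

    void onApply(Iterator<Entry> iter) {
        while (iter.hasNext()) {
            Entry entry = iter.next();
            Operation operation = entry.done != null
                    ? entry.done.operation               // leader: reuse the object, no decoding
                    : Operation.deserialize(entry.data); // follower or replay: decode data
            long current;
            switch (operation.op) {
                case INCREMENT:
                    current = value.addAndGet(operation.delta);
                    break;
                case GET:
                default:
                    current = value.get();
                    break;
            }
            if (entry.done != null) {
                entry.done.result = current; // a real closure would reply to the client here
            }
        }
    }
}
```

The leader path never touches data, which is why reading done first saves work; a follower receives the same bytes but no closure, so it has to decode them.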
onSnapshotSave
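The counter example implements its own onSnapshotSave logic (paired with onSnapshotLoad), which uses Raft's snapshot mechanism to store the current counter value in a snapshot file. For a detailed analysis, see the article "Snapshot Mechanism Analysis" (《快照机制分析》).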
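As a rough illustration of what saving and loading the counter amounts to, the sketch below writes the value into a file inside a snapshot directory and reads it back. It is an assumption-laden model: the real example goes through JRaft's SnapshotWriter and SnapshotReader, while the class name CounterSnapshotSketch, the file name data and the plain-text format are illustrative choices.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical model of the counter's snapshot save/load (not the JRaft API).
// A directory Path stands in for the snapshot directory handed out by SnapshotWriter/Reader.
final class CounterSnapshotSketch {
    static final String FILE_NAME = "data"; // hypothetical file name inside the snapshot dir

    final AtomicLong value = new AtomicLong();

    /** Save: read the current value first, then write it into the snapshot directory. */
    void save(Path snapshotDir) {
        long current = value.get();
        try {
            Files.createDirectories(snapshotDir);
            Files.write(snapshotDir.resolve(FILE_NAME),
                    Long.toString(current).getBytes(StandardCharsets.UTF_8));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Load: restore the value; returns false if the snapshot file does not exist. */
    boolean load(Path snapshotDir) {
        Path file = snapshotDir.resolve(FILE_NAME);
        if (!Files.exists(file)) {
            return false;
        }
        try {
            String text = new String(Files.readAllBytes(file), StandardCharsets.UTF_8).trim();
            value.set(Long.parseLong(text));
            return true;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Reading the value before doing any I/O keeps the saved number tied to the moment the snapshot was requested, which is the property a state-machine snapshot needs.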
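The Distributed State Machine Inside the Raft Component
A few questions:
- Roughly, how is the state machine triggered?
- In detail, how is the state machine triggered?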
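Overview
At a high level: the application calls Node.apply and submits a Task. Once the Task's data has been committed to the Raft group (stored on a majority of the nodes), Raft calls back into your state machine, which the application implements as needed, to carry out the business logic.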
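A Task specifies a Closure in its done field (as the field name suggests): the callback to run once the data has been committed, or if a failure occurs. A Task also has a data field holding the data to be persisted; in the counter example it is the Hessian2-serialized CounterOperation. In your own Closure implementation you can keep the business Operation object, or you can deserialize data back into the Operation. The counter example analysis above already covers both.
An excerpt of Task: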
public class Task implements Serializable {
    private static final long serialVersionUID = 2971309899898274575L;
    /** Associated task data*/
    private ByteBuffer data = LogEntry.EMPTY_DATA;
    /** task closure, called when the data is successfully committed to the raft group or failures happen.*/
    private Closure done;
    /** Reject this task if expectedTerm doesn't match the current term of this Node if the value is not -1, default is -1.*/
    private long expectedTerm = -1;
    // ... constructors, getters and setters elided
}
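The expectedTerm rule in the excerpt boils down to a small guard. The sketch below models it with hypothetical names (TermCheckSketch, propose); in JRaft the check is performed by the node itself, and a rejected task's Closure receives a Status, for which a plain String message stands in here.

```java
import java.nio.ByteBuffer;
import java.util.function.Consumer;

// Hypothetical model of the expectedTerm guard (not JRaft code).
final class TermCheckSketch {
    static final long ANY_TERM = -1; // the default value of Task.expectedTerm

    /** Task-like holder mirroring the three fields in the excerpt. */
    static final class Task {
        ByteBuffer data = ByteBuffer.allocate(0); // empty by default, like LogEntry.EMPTY_DATA
        Consumer<String> done;                    // stand-in for Closure.run(Status)
        long expectedTerm = ANY_TERM;
    }

    /** Returns true if the task would be appended; otherwise fails it through its closure. */
    static boolean propose(Task task, long currentTerm) {
        if (task.expectedTerm != ANY_TERM && task.expectedTerm != currentTerm) {
            if (task.done != null) {
                task.done.accept("rejected: expectedTerm=" + task.expectedTerm
                        + ", currentTerm=" + currentTerm);
            }
            return false;
        }
        return true; // a real node would now append the entry to its log
    }
}
```

Leaving expectedTerm at -1 means "any term"; setting it lets a caller refuse to write if leadership changed since it last looked at the term.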
Detailed Process
The call stack when the application state machine is triggered:
// Leader node
Daemon Thread [JRaft-FSMCaller-Disruptor-0] (Suspended (breakpoint at line 73 in CounterStateMachine))
CounterStateMachine.onApply(Iterator) line: 73
FSMCallerImpl.doApplyTasks(IteratorImpl) line: 539
FSMCallerImpl.doCommitted(long) line: 508
FSMCallerImpl.runApplyTask(FSMCallerImpl$ApplyTask, long, boolean) line: 440
FSMCallerImpl.access$100(FSMCallerImpl, FSMCallerImpl$ApplyTask, long, boolean) line: 73
FSMCallerImpl$ApplyTaskHandler.onEvent(FSMCallerImpl$ApplyTask, long, boolean) line: 148
FSMCallerImpl$ApplyTaskHandler.onEvent(Object, long, boolean) line: 142
BatchEventProcessor<T>.run() line: 137
Thread.run() line: 745
// Follower node: identical to the leader
Daemon Thread [JRaft-FSMCaller-Disruptor-0] (Suspended (breakpoint at line 73 in CounterStateMachine))
CounterStateMachine.onApply(Iterator) line: 73
FSMCallerImpl.doApplyTasks(IteratorImpl) line: 539
FSMCallerImpl.doCommitted(long) line: 508
FSMCallerImpl.runApplyTask(FSMCallerImpl$ApplyTask, long, boolean) line: 440
FSMCallerImpl.access$100(FSMCallerImpl, FSMCallerImpl$ApplyTask, long, boolean) line: 73
FSMCallerImpl$ApplyTaskHandler.onEvent(FSMCallerImpl$ApplyTask, long, boolean) line: 148
FSMCallerImpl$ApplyTaskHandler.onEvent(Object, long, boolean) line: 142
BatchEventProcessor<T>.run() line: 137
Thread.run() line: 745
Clearly, this also goes through an asynchronous Disruptor queue. The components involved are:
com.alipay.sofa.jraft.core.FSMCallerImpl.disruptor
com.alipay.sofa.jraft.core.FSMCallerImpl.ApplyTaskHandler //this.disruptor.handleEventsWith(new ApplyTaskHandler());
com.alipay.sofa.jraft.core.FSMCallerImpl.taskQueue
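The producer/consumer split can be pictured with a plain JDK queue. In this hypothetical sketch (FsmCallerSketch is not a JRaft class), a LinkedBlockingQueue replaces the Disruptor ring buffer: any thread may call onCommitted, which only enqueues, and a single daemon thread drains the queue and drives the state machine. The sketch also folds several queued commit indexes into one apply of the highest index; as we read FSMCallerImpl, its handler similarly tracks the highest committed index within a Disruptor batch (the long parameter of runApplyTask in the stack above), but the details differ.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.LongConsumer;

// Hypothetical model of the FSMCaller pattern (not JRaft code): producers only enqueue,
// one daemon consumer applies, so the state machine is always driven from a single thread.
final class FsmCallerSketch {
    private final BlockingQueue<Long> taskQueue = new LinkedBlockingQueue<>();
    private final AtomicLong lastApplied = new AtomicLong();
    private final LongConsumer applyUpTo; // stand-in for doCommitted -> doApplyTasks -> onApply

    FsmCallerSketch(LongConsumer applyUpTo) {
        this.applyUpTo = applyUpTo;
        Thread consumer = new Thread(this::runLoop, "FSMCaller-sketch");
        consumer.setDaemon(true);
        consumer.start();
    }

    /** Mirrors onCommitted: callable from any thread, it only enqueues. */
    void onCommitted(long committedIndex) {
        taskQueue.add(committedIndex);
    }

    long lastApplied() {
        return lastApplied.get();
    }

    private void runLoop() {
        try {
            while (true) {
                long index = taskQueue.take(); // block until at least one commit arrives
                Long next;
                while ((next = taskQueue.poll()) != null) {
                    index = Math.max(index, next); // batch: only the highest index matters
                }
                if (index > lastApplied.get()) {
                    applyUpTo.accept(index);
                    lastApplied.set(index);
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // exit the loop when interrupted
        }
    }
}
```

Because only the consumer thread calls applyUpTo, the state machine never needs locks against concurrent onApply calls, which is the main point of routing everything through one queue.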
Next, capture the call stacks that publish events into this queue (at FSMCallerImpl.enqueueTask()).
On the leader:
ts=2021-12-18 23:58:38;thread_name=Append-Entries-Thread-Send0;id=42;is_daemon=true;priority=5;TCCL=sun.misc.Launcher$AppClassLoader@18b4aac2
@com.alipay.sofa.jraft.core.FSMCallerImpl.enqueueTask()
at com.alipay.sofa.jraft.core.FSMCallerImpl.onCommitted(FSMCallerImpl.java:245)
at com.alipay.sofa.jraft.core.BallotBox.commitAt(BallotBox.java:137)
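at com.alipay.sofa.jraft.core.Replicator.onAppendEntriesReturned(Replicator.java:1494) // BallotBox.commitAt is reached only when a follower's replication response comes back, and it is what eventually triggers the application state machine. Source comment: "Only commit index when the response is from follower."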
at com.alipay.sofa.jraft.core.Replicator.onRpcReturned(Replicator.java:1324)
at com.alipay.sofa.jraft.core.Replicator$4.run(Replicator.java:1655)
at com.alipay.sofa.jraft.rpc.impl.AbstractClientService$1.complete(AbstractClientService.java:241)
at com.alipay.sofa.jraft.rpc.impl.BoltRpcClient$BoltCallback.onResponse(BoltRpcClient.java:175)
at com.alipay.remoting.rpc.RpcInvokeCallbackListener$CallbackTask.run(RpcInvokeCallbackListener.java:182)
at com.alipay.sofa.jraft.util.concurrent.MpscSingleThreadExecutor$Worker.runTask(MpscSingleThreadExecutor.java:352)
at com.alipay.sofa.jraft.util.concurrent.MpscSingleThreadExecutor$Worker.run(MpscSingleThreadExecutor.java:336)
at com.alipay.sofa.jraft.util.concurrent.MpscSingleThreadExecutor.lambda$doStartWorker$3(MpscSingleThreadExecutor.java:263)
at java.lang.Thread.run(Thread.java:745)
ts=2021-12-18 23:59:04;thread_name=JRaft-Closure-Executor-2;id=20;is_daemon=true;priority=5;TCCL=sun.misc.Launcher$AppClassLoader@18b4aac2
@com.alipay.sofa.jraft.core.FSMCallerImpl.enqueueTask()
at com.alipay.sofa.jraft.core.FSMCallerImpl.onSnapshotSave(FSMCallerImpl.java:274)
at com.alipay.sofa.jraft.storage.snapshot.SnapshotExecutorImpl.doSnapshot(SnapshotExecutorImpl.java:351)
at com.alipay.sofa.jraft.core.NodeImpl.doSnapshot(NodeImpl.java:3098)
at com.alipay.sofa.jraft.core.NodeImpl.lambda$handleSnapshotTimeout$0(NodeImpl.java:607)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
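The first runs on the Append-Entries-Thread-Send0 thread and fires when a follower's AppendEntries response comes back.
Replicator.onAppendEntriesReturned calls BallotBox.commitAt, which records that follower's acknowledgement; once a majority of the group holds an entry, the commit index advances and the application state machine is triggered. The author's comment in the source reads: "Only commit index when the response is from follower."
The second fires when a snapshot is taken (here via NodeImpl.handleSnapshotTimeout, on a JRaft-Closure-Executor thread).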
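The leader-side rule can be modeled in a few lines. The sketch is a simplification, not the real BallotBox: BallotBoxSketch, appendPending and string peer ids are hypothetical, the leader votes for each entry it appends, and each commitAt call records one follower's vote for a range of indexes. Only when some index has votes from a majority of the group does the commit index advance and the waiter, standing in for FSMCaller.onCommitted, get called.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.function.LongConsumer;

// Hypothetical, simplified model of the leader-side commit rule (not the real BallotBox).
final class BallotBoxSketch {
    private final int groupSize;
    private final LongConsumer waiter;                           // stand-in for FSMCaller.onCommitted
    private final List<Set<String>> ballots = new ArrayList<>(); // voters for pendingIndex + i
    private long pendingIndex = 1;                               // first not-yet-committed index
    private long lastCommittedIndex = 0;

    BallotBoxSketch(int groupSize, LongConsumer waiter) {
        this.groupSize = groupSize;
        this.waiter = waiter;
    }

    /** The leader appends one entry locally and counts its own vote. */
    void appendPending(String leaderId) {
        Set<String> voters = new HashSet<>();
        voters.add(leaderId);
        ballots.add(voters);
    }

    /** Models commitAt(firstLogIndex, lastLogIndex, peer); returns false if nothing to vote on. */
    boolean commitAt(long firstLogIndex, long lastLogIndex, String peer) {
        long from = Math.max(firstLogIndex, pendingIndex);
        long to = Math.min(lastLogIndex, pendingIndex + ballots.size() - 1);
        if (from > to) {
            return false;
        }
        long newlyCommitted = 0;
        for (long i = from; i <= to; i++) {
            Set<String> voters = ballots.get((int) (i - pendingIndex));
            voters.add(peer);
            if (voters.size() > groupSize / 2) {
                newlyCommitted = i; // majority reached for index i
            }
        }
        if (newlyCommitted == 0) {
            return true; // vote recorded, no majority yet
        }
        ballots.subList(0, (int) (newlyCommitted - pendingIndex + 1)).clear();
        pendingIndex = newlyCommitted + 1;
        lastCommittedIndex = newlyCommitted;
        waiter.accept(lastCommittedIndex); // like this.waiter.onCommitted(lastCommittedIndex)
        return true;
    }

    long lastCommittedIndex() {
        return lastCommittedIndex;
    }
}
```

With three nodes, the leader's own vote is not enough; the first follower acknowledgement for indexes 1..2 commits both and notifies the waiter once, with index 2.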
On the follower:
ts=2021-12-18 23:58:38;thread_name=election_test/PeerPair[127.0.0.1:8082 -> 127.0.0.1:8083]-AppendEntriesThread0;id=56;is_daemon=true;priority=5;TCCL=sun.misc.Launcher$AppClassLoader@18b4aac2
@com.alipay.sofa.jraft.core.FSMCallerImpl.enqueueTask()
at com.alipay.sofa.jraft.core.FSMCallerImpl.onCommitted(FSMCallerImpl.java:245)
at com.alipay.sofa.jraft.core.BallotBox.setLastCommittedIndex(BallotBox.java:241)
at com.alipay.sofa.jraft.core.NodeImpl.handleAppendEntriesRequest(NodeImpl.java:1963)
at com.alipay.sofa.jraft.rpc.impl.core.AppendEntriesRequestProcessor.processRequest0(AppendEntriesRequestProcessor.java:460)
at com.alipay.sofa.jraft.rpc.impl.core.AppendEntriesRequestProcessor.processRequest0(AppendEntriesRequestProcessor.java:53)
at com.alipay.sofa.jraft.rpc.impl.core.NodeRequestProcessor.processRequest(NodeRequestProcessor.java:60)
at com.alipay.sofa.jraft.rpc.RpcRequestProcessor.handleRequest(RpcRequestProcessor.java:53)
at com.alipay.sofa.jraft.rpc.RpcRequestProcessor.handleRequest(RpcRequestProcessor.java:35)
at com.alipay.sofa.jraft.rpc.impl.BoltRpcServer$2.handleRequest(BoltRpcServer.java:123)
at com.alipay.remoting.rpc.protocol.RpcRequestProcessor.dispatchToUserProcessor(RpcRequestProcessor.java:234)
at com.alipay.remoting.rpc.protocol.RpcRequestProcessor.doProcess(RpcRequestProcessor.java:145)
at com.alipay.remoting.rpc.protocol.RpcRequestProcessor$ProcessTask.run(RpcRequestProcessor.java:384)
at com.alipay.sofa.jraft.util.concurrent.MpscSingleThreadExecutor$Worker.runTask(MpscSingleThreadExecutor.java:352)
at com.alipay.sofa.jraft.util.concurrent.MpscSingleThreadExecutor$Worker.run(MpscSingleThreadExecutor.java:336)
at com.alipay.sofa.jraft.util.concurrent.MpscSingleThreadExecutor.lambda$doStartWorker$3(MpscSingleThreadExecutor.java:263)
at java.lang.Thread.run(Thread.java:745)
ts=2021-12-18 23:58:56;thread_name=JRaft-Closure-Executor-3;id=24;is_daemon=true;priority=5;TCCL=sun.misc.Launcher$AppClassLoader@18b4aac2
@com.alipay.sofa.jraft.core.FSMCallerImpl.enqueueTask()
at com.alipay.sofa.jraft.core.FSMCallerImpl.onSnapshotSave(FSMCallerImpl.java:274)
at com.alipay.sofa.jraft.storage.snapshot.SnapshotExecutorImpl.doSnapshot(SnapshotExecutorImpl.java:351)
at com.alipay.sofa.jraft.core.NodeImpl.doSnapshot(NodeImpl.java:3098)
at com.alipay.sofa.jraft.core.NodeImpl.lambda$handleSnapshotTimeout$0(NodeImpl.java:607)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
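The first runs on the election_test/PeerPair[127.0.0.1:8082 -> 127.0.0.1:8083]-AppendEntriesThread0 thread and fires once the follower has handled an AppendEntries request pushed by the leader; BallotBox.setLastCommittedIndex then passes the new commit index to FSMCallerImpl.onCommitted.
The second fires when a snapshot is taken, just as on the leader.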
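On the follower side no votes are counted, which is why the stack goes through BallotBox.setLastCommittedIndex rather than commitAt. The hypothetical sketch below (FollowerCommitSketch is not JRaft code) applies the standard Raft rule commitIndex = min(leaderCommit, index of last new entry) and notifies the waiter only when the commit index moves forward; exactly how NodeImpl.handleAppendEntriesRequest computes the value it passes is not visible in the stack and is assumed here.

```java
import java.util.function.LongConsumer;

// Hypothetical model of the follower-side commit path (not JRaft code).
final class FollowerCommitSketch {
    private final LongConsumer waiter; // stand-in for FSMCaller.onCommitted
    private long lastCommittedIndex = 0;

    FollowerCommitSketch(LongConsumer waiter) {
        this.waiter = waiter;
    }

    /** Models one AppendEntries request: entries after prevLogIndex plus the leader's commit index. */
    void handleAppendEntries(long prevLogIndex, int entryCount, long leaderCommittedIndex) {
        long lastNewIndex = prevLogIndex + entryCount; // assume the entries were appended successfully
        setLastCommittedIndex(Math.min(leaderCommittedIndex, lastNewIndex));
    }

    /** Monotonic update: older or equal indexes are ignored. */
    boolean setLastCommittedIndex(long index) {
        if (index <= lastCommittedIndex) {
            return false;
        }
        lastCommittedIndex = index;
        waiter.accept(index); // like this.waiter.onCommitted(lastCommittedIndex)
        return true;
    }
}
```

A heartbeat carrying a large leader commit index therefore cannot push the follower past the entries it actually holds.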
The relevant BallotBox code, BallotBox.commitAt (BallotBox.java:137) and BallotBox.setLastCommittedIndex (BallotBox.java:241):
public class BallotBox implements Lifecycle<BallotBoxOptions>, Describer {
    // ...
    private FSMCaller waiter;
    // ...
    public boolean commitAt(final long firstLogIndex, final long lastLogIndex, final PeerId peer) {
        // ...
        this.waiter.onCommitted(lastCommittedIndex); // line 137; waiter is the FSMCaller
        // ...
    }
    // ...
    public boolean setLastCommittedIndex(final long lastCommittedIndex) {
        // ...
        this.waiter.onCommitted(lastCommittedIndex); // line 241
        // ...
    }
}
Unless otherwise stated, all articles on this blog are licensed under CC BY-SA 3.0. Please credit the source when reposting.