Analysis of the Distributed State Machine and the Counter Example

Counter Example Analysis

Analyzing the counter example raises a few questions:

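  • Which components does the counter example involve?

  • How do they cooperate?

  • Where does the Raft component sit in that cooperation, and how does it connect to the other components?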

The counter example lives in the jraft-example/src/main/java/com/alipay/sofa/jraft/example/counter/ directory. In the earlier preparation chapter I prepared startup scripts for its server and client.

Workflow and Components

The overall workflow of the counter is roughly:

flowchart TD
A(client CounterClient) --> B(CounterServer RPC entry, note 1)
B --> C(CounterServiceImpl / CounterOperation / Task / CounterClosure)
C --> E(Node.apply)
E --> F(Log.append / Replicator, note 2)
F --> G(BallotBox.commitAt / FSMCallerImpl.onCommitted)
G --> H(CounterStateMachine.onApply completes the operation, note 3)

Note 1: the components CounterServer needs are as follows:

– RaftGroupService
  – CounterStateMachine
  – RpcServer
    – GetValueRequestProcessor // analysis omitted; IncrementAndGetRequestProcessor is representative
      – CounterService
    – IncrementAndGetRequestProcessor
      – CounterService // does the actual incrementAndGet work
        – CounterClosure // 1. sends the response back to the counter client; 2. carries the CounterOperation, so that when onApply runs on this node the operation can be read straight from the closure and performed in the state machine. Note 3 explains this in detail.
        – CounterOperation
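The collaboration in this component tree can be compressed into a single-process sketch. Every class below (Operation, ReplyClosure, StateMachine, Service, IncrementProcessor) is a hypothetical stand-in, not a jraft class; RPC, the Raft log and replication are deliberately left out, so the service calls onApply directly where the real example goes through Node.apply.

```java
import java.util.function.Consumer;

// Hypothetical stand-ins for the counter example's classes; no jraft API is used.
class CounterCollaborationSketch {

    // Stand-in for CounterOperation: what the client asked for.
    static final class Operation {
        final long delta;
        Operation(long delta) { this.delta = delta; }
    }

    // Stand-in for CounterClosure: carries the operation and replies to the client.
    static final class ReplyClosure implements Runnable {
        final Operation op;
        final Consumer<Long> responder; // stand-in for the RPC response channel
        long result;
        ReplyClosure(Operation op, Consumer<Long> responder) { this.op = op; this.responder = responder; }
        @Override public void run() { responder.accept(result); }
    }

    // Stand-in for CounterStateMachine: performs the operation, then fires the closure.
    static final class StateMachine {
        long value;
        void onApply(ReplyClosure done) {
            value += done.op.delta;
            done.result = value;
            done.run();
        }
    }

    // Stand-in for CounterService: builds operation + closure. In jraft this step
    // submits a Task via Node.apply; here it calls the state machine directly.
    static final class Service {
        final StateMachine fsm;
        Service(StateMachine fsm) { this.fsm = fsm; }
        void incrementAndGet(long delta, Consumer<Long> responder) {
            fsm.onApply(new ReplyClosure(new Operation(delta), responder));
        }
    }

    // Stand-in for IncrementAndGetRequestProcessor: unpacks the request, delegates.
    static final class IncrementProcessor {
        final Service service;
        IncrementProcessor(Service service) { this.service = service; }
        void handle(long requestDelta, Consumer<Long> responder) {
            service.incrementAndGet(requestDelta, responder);
        }
    }

    public static void main(String[] args) {
        IncrementProcessor p = new IncrementProcessor(new Service(new StateMachine()));
        p.handle(3, v -> System.out.println("response: " + v)); // response: 3
        p.handle(4, v -> System.out.println("response: " + v)); // response: 7
    }
}
```

The point of the shape is that the processor and service never touch the counter value; only the state machine does, which is what lets Raft replay the same operations on every node.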

Note 2: this step persists the log on the leader and replicates it to the follower replicas.

onApply

Note 3: how does CounterStateMachine.onApply perform the operation?

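  • BallotBox.commitAt triggers FSMCallerImpl.onCommitted.

  • The commit signalled through FSMCallerImpl.onCommitted eventually reaches onApply, whose iterator hands back the done of the Task you applied to the node (here, the CounterClosure) as well as the Task's data (in this example, the Hessian2-serialized CounterOperation). The example prefers the done field when it is present. done only exists on the node that called Node.apply (normally the leader); on followers it is null, so the operation has to be recovered from data.

  • CounterClosure carries the CounterOperation object itself, so on the node that holds the closure the operation is read from it directly, without deserialization; only when done is null is data deserialized with Hessian2. Either way the result is the operation to perform, which is then carried out in the state machine. The relevant code:

    // com/alipay/sofa/jraft/example/counter/CounterStateMachine.java, around line 93
    switch (counterOperation.getOp()) {
        // ... other cases omitted
        case INCREMENT:
            // Add the delta to the AtomicLong that holds the counter value
            final long delta = counterOperation.getDelta();
            final long prev = this.value.get();
            current = this.value.addAndGet(delta);
            LOG.info("Added value={} by delta={} at logIndex={}", prev, delta, iter.getIndex());
            break;
    }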
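The "closure first, otherwise decode data" rule can be modelled without jraft. In this sketch Entry, Closure and the 8-byte encoding are hypothetical stand-ins for jraft's Iterator, CounterClosure and Hessian2; the same committed log is applied once with closures (as on the leader) and once without (as on a follower), and both end up with the same value.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

// Simplified model of the counter state machine's onApply loop (not jraft code).
class OnApplySketch {

    static final class Operation {
        final long delta;
        Operation(long delta) { this.delta = delta; }
        // Hypothetical 8-byte encoding; the real example uses Hessian2.
        ByteBuffer encode() { return ByteBuffer.allocate(8).putLong(0, delta); }
        static Operation decode(ByteBuffer buf) { return new Operation(buf.getLong(buf.position())); }
    }

    // Stand-in for CounterClosure: holds the already-decoded operation.
    static final class Closure {
        final Operation op;
        long result = -1;
        Closure(Operation op) { this.op = op; }
        void run(long result) { this.result = result; } // stand-in for replying to the client
    }

    // One committed entry: data is always present, done only on the applying node.
    static final class Entry {
        final ByteBuffer data;
        final Closure done;
        Entry(ByteBuffer data, Closure done) { this.data = data; this.done = done; }
    }

    long value;

    void onApply(List<Entry> committed) {
        for (Entry e : committed) {
            // Prefer the closure (no decoding); fall back to decoding the log data.
            Operation op = (e.done != null) ? e.done.op : Operation.decode(e.data);
            value += op.delta;
            if (e.done != null) {
                e.done.run(value); // only the node that has the closure answers the client
            }
        }
    }

    public static void main(String[] args) {
        Operation a = new Operation(5);
        Operation b = new Operation(2);
        Closure cb = new Closure(b);

        List<Entry> leaderLog = new ArrayList<>();
        leaderLog.add(new Entry(a.encode(), new Closure(a)));
        leaderLog.add(new Entry(b.encode(), cb));
        OnApplySketch leader = new OnApplySketch();
        leader.onApply(leaderLog);

        List<Entry> followerLog = new ArrayList<>();
        followerLog.add(new Entry(a.encode(), null));
        followerLog.add(new Entry(b.encode(), null));
        OnApplySketch follower = new OnApplySketch();
        follower.onApply(followerLog);

        System.out.println(leader.value + " " + follower.value + " " + cb.result); // 7 7 7
    }
}
```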

onSnapshotSave

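The counter example implements its own onSnapshotSave logic (paired with onSnapshotLoad), using Raft's snapshot mechanism to store the current counter value in a snapshot file. For a detailed analysis, see the article "Snapshot Mechanism Analysis" (《快照机制分析》).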
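Stripped of the framework, saving and loading the counter amounts to writing one number to a file in the snapshot directory and parsing it back. The class and the file name "data" below are assumptions for illustration; the real onSnapshotSave additionally has to register the file with jraft's snapshot writer and report completion through a closure, which this sketch omits.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;

// Hypothetical helper: persist the counter value as text and read it back.
class CounterSnapshotSketch {
    static final String FILE_NAME = "data"; // assumed file name inside the snapshot dir

    // Called from a (hypothetical) onSnapshotSave with the current counter value.
    static void save(Path snapshotDir, long value) throws IOException {
        Files.createDirectories(snapshotDir);
        Files.write(snapshotDir.resolve(FILE_NAME),
                Long.toString(value).getBytes(StandardCharsets.UTF_8));
    }

    // Called from a (hypothetical) onSnapshotLoad to restore the counter value.
    static long load(Path snapshotDir) throws IOException {
        byte[] bytes = Files.readAllBytes(snapshotDir.resolve(FILE_NAME));
        return Long.parseLong(new String(bytes, StandardCharsets.UTF_8).trim());
    }

    public static void main(String[] args) throws IOException {
        Path dir = Files.createTempDirectory("counter-snapshot");
        save(dir, 42L);
        System.out.println(load(dir)); // 42
    }
}
```

Because the whole state is a single long, the snapshot can be this small; after loading it, the node only needs to replay log entries newer than the snapshot.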

The Distributed State Machine in the Raft Component

A few questions:

  • Roughly, how is the state machine triggered?
  • In detail, how is it triggered?

Overview

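In outline: the application calls Node's apply method, which requires submitting a Task. Once the Task's data has been persisted and committed to the Raft group, the framework calls back into your state machine (implemented by the application as needed) to run your business logic.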

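A Task takes a Closure, the callback to run once the data has been committed (or once a failure happens), stored in the field named done. A Task also has a data field holding the bytes to persist; in the counter example this is the Hessian2-serialized CounterOperation. You can either keep your business Operation inside your custom Closure implementation or deserialize data back into the Operation; the counter example analysis above already covers both.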

An excerpt of Task:

public class Task implements Serializable {

    private static final long serialVersionUID = 2971309899898274575L;

    /** Associated task data */
    private ByteBuffer        data             = LogEntry.EMPTY_DATA;
    /** task closure, called when the data is successfully committed to the raft group or failures happen. */
    private Closure           done;
    /** Reject this task if expectedTerm doesn't match the current term of this Node if the value is not -1, default is -1. */
    private long              expectedTerm     = -1;

    // ... constructors, getters and setters omitted
}
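The expectedTerm comment describes a guard: with -1 the term is not checked, otherwise a task whose expected term differs from the node's current term is rejected and its closure is told so. The sketch below models only that guard; Task, Node, the string status and the "appended" counter are hypothetical simplifications, not jraft's classes, and in real Raft the success callback fires only after commit and apply.

```java
import java.nio.ByteBuffer;
import java.util.function.Consumer;

// Hypothetical mini model of Task.expectedTerm handling (names are not jraft's).
class ExpectedTermSketch {

    static final class Task {
        final ByteBuffer data;
        final Consumer<String> done; // receives "OK" or a rejection message
        final long expectedTerm;     // -1 means "do not check the term"
        Task(ByteBuffer data, Consumer<String> done, long expectedTerm) {
            this.data = data;
            this.done = done;
            this.expectedTerm = expectedTerm;
        }
    }

    static final class Node {
        final long currentTerm;
        int appended; // stand-in for the number of entries appended to the log
        Node(long currentTerm) { this.currentTerm = currentTerm; }

        void apply(Task task) {
            if (task.expectedTerm != -1 && task.expectedTerm != currentTerm) {
                // Reject without touching the log; the closure reports the failure.
                task.done.accept("rejected: expectedTerm=" + task.expectedTerm
                        + ", currentTerm=" + currentTerm);
                return;
            }
            appended++;
            // Simplification: real Raft runs done only after commit and apply.
            task.done.accept("OK");
        }
    }

    public static void main(String[] args) {
        Node node = new Node(3);
        ByteBuffer payload = ByteBuffer.allocate(0);
        node.apply(new Task(payload, s -> System.out.println("task 1: " + s), -1));
        node.apply(new Task(payload, s -> System.out.println("task 2: " + s), 2));
        System.out.println("appended = " + node.appended); // appended = 1
    }
}
```

A typical use is a client that read state in term T and wants its write dropped if leadership changed in between.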

Detailed Process

The call stack when the business state machine is triggered:

// leader node
Daemon Thread [JRaft-FSMCaller-Disruptor-0] (Suspended (breakpoint at line 73 in CounterStateMachine))	
	CounterStateMachine.onApply(Iterator) line: 73	
	FSMCallerImpl.doApplyTasks(IteratorImpl) line: 539	
	FSMCallerImpl.doCommitted(long) line: 508	 
	FSMCallerImpl.runApplyTask(FSMCallerImpl$ApplyTask, long, boolean) line: 440	
	FSMCallerImpl.access$100(FSMCallerImpl, FSMCallerImpl$ApplyTask, long, boolean) line: 73	
	FSMCallerImpl$ApplyTaskHandler.onEvent(FSMCallerImpl$ApplyTask, long, boolean) line: 148	
	FSMCallerImpl$ApplyTaskHandler.onEvent(Object, long, boolean) line: 142	
	BatchEventProcessor<T>.run() line: 137	
	Thread.run() line: 745	

// follower node: same as on the leader
Daemon Thread [JRaft-FSMCaller-Disruptor-0] (Suspended (breakpoint at line 73 in CounterStateMachine))	
	CounterStateMachine.onApply(Iterator) line: 73	
	FSMCallerImpl.doApplyTasks(IteratorImpl) line: 539	
	FSMCallerImpl.doCommitted(long) line: 508	
	FSMCallerImpl.runApplyTask(FSMCallerImpl$ApplyTask, long, boolean) line: 440	
	FSMCallerImpl.access$100(FSMCallerImpl, FSMCallerImpl$ApplyTask, long, boolean) line: 73	
	FSMCallerImpl$ApplyTaskHandler.onEvent(FSMCallerImpl$ApplyTask, long, boolean) line: 148	
	FSMCallerImpl$ApplyTaskHandler.onEvent(Object, long, boolean) line: 142	
	BatchEventProcessor<T>.run() line: 137	
	Thread.run() line: 745

Clearly this also goes through the asynchronous Disruptor queue; the components involved are:

com.alipay.sofa.jraft.core.FSMCallerImpl.disruptor

com.alipay.sofa.jraft.core.FSMCallerImpl.ApplyTaskHandler //this.disruptor.handleEventsWith(new ApplyTaskHandler());

com.alipay.sofa.jraft.core.FSMCallerImpl.taskQueue
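The essential property of this design is that many threads only enqueue tasks while a single consumer thread runs every state machine callback in order. The sketch keeps that property but, as a swapped-in technique, uses a java.util.concurrent BlockingQueue instead of the LMAX Disruptor so that it has no dependencies; the task types and the "apply N" / "snapshot at N" event strings are hypothetical.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical stand-in for FSMCallerImpl's queue: producers enqueue, one thread consumes.
class FsmCallerQueueSketch {
    enum Type { COMMITTED, SNAPSHOT_SAVE, SHUTDOWN }

    static final class ApplyTask {
        final Type type;
        final long committedIndex;
        ApplyTask(Type type, long committedIndex) { this.type = type; this.committedIndex = committedIndex; }
    }

    private final BlockingQueue<ApplyTask> queue = new LinkedBlockingQueue<>();
    final List<String> events = Collections.synchronizedList(new ArrayList<>());
    private long lastAppliedIndex = 0; // touched only by the consumer thread
    private final Thread consumer = new Thread(this::runLoop, "fsm-caller");

    void start() { consumer.start(); }

    // Producer side (replicator callbacks, RPC threads, snapshot timer): enqueue and return.
    void onCommitted(long committedIndex) { queue.add(new ApplyTask(Type.COMMITTED, committedIndex)); }
    void onSnapshotSave() { queue.add(new ApplyTask(Type.SNAPSHOT_SAVE, -1)); }

    void shutdownAndJoin() throws InterruptedException {
        queue.add(new ApplyTask(Type.SHUTDOWN, -1));
        consumer.join();
    }

    // Consumer side: the single thread that "calls the state machine".
    private void runLoop() {
        try {
            while (true) {
                ApplyTask t = queue.take();
                switch (t.type) {
                    case COMMITTED:
                        // Apply every index after the last applied one up to the new commit point.
                        for (long i = lastAppliedIndex + 1; i <= t.committedIndex; i++) {
                            events.add("apply " + i);
                        }
                        lastAppliedIndex = Math.max(lastAppliedIndex, t.committedIndex);
                        break;
                    case SNAPSHOT_SAVE:
                        events.add("snapshot at " + lastAppliedIndex);
                        break;
                    case SHUTDOWN:
                        return;
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        FsmCallerQueueSketch caller = new FsmCallerQueueSketch();
        caller.start();
        caller.onCommitted(2);
        caller.onSnapshotSave();
        caller.onCommitted(3);
        caller.shutdownAndJoin();
        System.out.println(caller.events); // [apply 1, apply 2, snapshot at 2, apply 3]
    }
}
```

Serializing commits and snapshot saves on one thread is what guarantees that a snapshot always reflects a consistent applied index.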

Capturing the call stacks at which events are published to the queue (FSMCallerImpl.enqueueTask):

On the leader:

ts=2021-12-18 23:58:38;thread_name=Append-Entries-Thread-Send0;id=42;is_daemon=true;priority=5;TCCL=sun.misc.Launcher$AppClassLoader@18b4aac2
    @com.alipay.sofa.jraft.core.FSMCallerImpl.enqueueTask()
        at com.alipay.sofa.jraft.core.FSMCallerImpl.onCommitted(FSMCallerImpl.java:245)
        at com.alipay.sofa.jraft.core.BallotBox.commitAt(BallotBox.java:137)
        at com.alipay.sofa.jraft.core.Replicator.onAppendEntriesReturned(Replicator.java:1494) // commitAt is called when a follower's AppendEntries response comes back; upstream comment: "Only commit index when the response is from follower."
        at com.alipay.sofa.jraft.core.Replicator.onRpcReturned(Replicator.java:1324)
        at com.alipay.sofa.jraft.core.Replicator$4.run(Replicator.java:1655)
        at com.alipay.sofa.jraft.rpc.impl.AbstractClientService$1.complete(AbstractClientService.java:241)
        at com.alipay.sofa.jraft.rpc.impl.BoltRpcClient$BoltCallback.onResponse(BoltRpcClient.java:175)
        at com.alipay.remoting.rpc.RpcInvokeCallbackListener$CallbackTask.run(RpcInvokeCallbackListener.java:182)
        at com.alipay.sofa.jraft.util.concurrent.MpscSingleThreadExecutor$Worker.runTask(MpscSingleThreadExecutor.java:352)
        at com.alipay.sofa.jraft.util.concurrent.MpscSingleThreadExecutor$Worker.run(MpscSingleThreadExecutor.java:336)
        at com.alipay.sofa.jraft.util.concurrent.MpscSingleThreadExecutor.lambda$doStartWorker$3(MpscSingleThreadExecutor.java:263)
        at java.lang.Thread.run(Thread.java:745)

ts=2021-12-18 23:59:04;thread_name=JRaft-Closure-Executor-2;id=20;is_daemon=true;priority=5;TCCL=sun.misc.Launcher$AppClassLoader@18b4aac2
    @com.alipay.sofa.jraft.core.FSMCallerImpl.enqueueTask()
        at com.alipay.sofa.jraft.core.FSMCallerImpl.onSnapshotSave(FSMCallerImpl.java:274)
        at com.alipay.sofa.jraft.storage.snapshot.SnapshotExecutorImpl.doSnapshot(SnapshotExecutorImpl.java:351)
        at com.alipay.sofa.jraft.core.NodeImpl.doSnapshot(NodeImpl.java:3098)
        at com.alipay.sofa.jraft.core.NodeImpl.lambda$handleSnapshotTimeout$0(NodeImpl.java:607)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)

The first is on Append-Entries-Thread-Send0, triggered when the response to an AppendEntries request sent to a replica comes back.

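Replicator.onAppendEntriesReturned calls BallotBox.commitAt, so commitAt runs every time a follower acknowledges appended entries. BallotBox only advances the commit index, and thereby calls FSMCaller.onCommitted and eventually the business state machine, once a majority of the group has acknowledged an entry. The upstream comment at that spot reads "Only commit index when the response is from follower."; "follower" here is as opposed to a learner, whose acknowledgements do not count toward the quorum.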

The second is triggered by a snapshot (handleSnapshotTimeout → doSnapshot → FSMCallerImpl.onSnapshotSave).
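The leader-side commit decision can be pictured as a majority vote per log index. BallotBoxSketch below is hypothetical and much simpler than jraft's BallotBox: one acknowledgement set per pending index, an index commits once a majority of voters (size / 2 + 1) has acknowledged it, everything before that index commits with it, and the waiter is then notified the way BallotBox calls this.waiter.onCommitted. Joint consensus and locking are omitted, and in the sketch the leader votes for its own entries by calling commitAt with its own id.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.function.LongConsumer;

// Hypothetical, simplified majority-vote commit tracker (not jraft's BallotBox).
class BallotBoxSketch {
    private final Set<String> voters;
    private final int quorum;
    private final List<Set<String>> pending = new ArrayList<>(); // one ack set per pending index
    private long pendingIndex;        // log index of pending.get(0)
    private long lastCommittedIndex;  // highest committed index
    private final LongConsumer waiter; // stand-in for FSMCaller.onCommitted

    BallotBoxSketch(Set<String> voters, long firstPendingIndex, LongConsumer waiter) {
        this.voters = voters;
        this.quorum = voters.size() / 2 + 1;
        this.pendingIndex = firstPendingIndex;
        this.lastCommittedIndex = firstPendingIndex - 1;
        this.waiter = waiter;
    }

    // The leader opens a ballot for each entry it appends.
    void appendPendingTask() { pending.add(new HashSet<>()); }

    // `peer` has stored entries [firstLogIndex, lastLogIndex]; returns true if the commit index moved.
    boolean commitAt(long firstLogIndex, long lastLogIndex, String peer) {
        if (!voters.contains(peer)) return false; // non-voters (e.g. learners) never count
        long from = Math.max(firstLogIndex, pendingIndex);
        long to = Math.min(lastLogIndex, pendingIndex + pending.size() - 1);
        long newCommitted = lastCommittedIndex;
        for (long i = from; i <= to; i++) {
            Set<String> acks = pending.get((int) (i - pendingIndex));
            acks.add(peer);
            if (acks.size() >= quorum) newCommitted = i; // highest index with a majority
        }
        if (newCommitted <= lastCommittedIndex) return false;
        // Everything up to newCommitted is committed: drop those ballots.
        pending.subList(0, (int) (newCommitted - pendingIndex + 1)).clear();
        pendingIndex = newCommitted + 1;
        lastCommittedIndex = newCommitted;
        waiter.accept(newCommitted);
        return true;
    }

    public static void main(String[] args) {
        List<Long> committed = new ArrayList<>();
        BallotBoxSketch box = new BallotBoxSketch(
                new HashSet<>(Arrays.asList("A", "B", "C")), 1, idx -> committed.add(idx));
        for (int i = 0; i < 3; i++) box.appendPendingTask(); // entries 1..3
        box.commitAt(1, 3, "A"); // leader's own vote: 1 of 3, no commit yet
        box.commitAt(1, 2, "B"); // majority for 1..2
        box.commitAt(1, 3, "L"); // learner: ignored
        box.commitAt(3, 3, "C"); // majority for 3
        System.out.println(committed); // [2, 3]
    }
}
```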

On the follower:

ts=2021-12-18 23:58:38;thread_name=election_test/PeerPair[127.0.0.1:8082 -> 127.0.0.1:8083]-AppendEntriesThread0;id=56;is_daemon=true;priority=5;TCCL=sun.misc.Launcher$AppClassLoader@18b4aac2
    @com.alipay.sofa.jraft.core.FSMCallerImpl.enqueueTask()
        at com.alipay.sofa.jraft.core.FSMCallerImpl.onCommitted(FSMCallerImpl.java:245)
        at com.alipay.sofa.jraft.core.BallotBox.setLastCommittedIndex(BallotBox.java:241)
        at com.alipay.sofa.jraft.core.NodeImpl.handleAppendEntriesRequest(NodeImpl.java:1963)
        at com.alipay.sofa.jraft.rpc.impl.core.AppendEntriesRequestProcessor.processRequest0(AppendEntriesRequestProcessor.java:460)
        at com.alipay.sofa.jraft.rpc.impl.core.AppendEntriesRequestProcessor.processRequest0(AppendEntriesRequestProcessor.java:53)
        at com.alipay.sofa.jraft.rpc.impl.core.NodeRequestProcessor.processRequest(NodeRequestProcessor.java:60)
        at com.alipay.sofa.jraft.rpc.RpcRequestProcessor.handleRequest(RpcRequestProcessor.java:53)
        at com.alipay.sofa.jraft.rpc.RpcRequestProcessor.handleRequest(RpcRequestProcessor.java:35)
        at com.alipay.sofa.jraft.rpc.impl.BoltRpcServer$2.handleRequest(BoltRpcServer.java:123)
        at com.alipay.remoting.rpc.protocol.RpcRequestProcessor.dispatchToUserProcessor(RpcRequestProcessor.java:234)
        at com.alipay.remoting.rpc.protocol.RpcRequestProcessor.doProcess(RpcRequestProcessor.java:145)
        at com.alipay.remoting.rpc.protocol.RpcRequestProcessor$ProcessTask.run(RpcRequestProcessor.java:384)
        at com.alipay.sofa.jraft.util.concurrent.MpscSingleThreadExecutor$Worker.runTask(MpscSingleThreadExecutor.java:352)
        at com.alipay.sofa.jraft.util.concurrent.MpscSingleThreadExecutor$Worker.run(MpscSingleThreadExecutor.java:336)
        at com.alipay.sofa.jraft.util.concurrent.MpscSingleThreadExecutor.lambda$doStartWorker$3(MpscSingleThreadExecutor.java:263)
        at java.lang.Thread.run(Thread.java:745)

ts=2021-12-18 23:58:56;thread_name=JRaft-Closure-Executor-3;id=24;is_daemon=true;priority=5;TCCL=sun.misc.Launcher$AppClassLoader@18b4aac2
    @com.alipay.sofa.jraft.core.FSMCallerImpl.enqueueTask()
        at com.alipay.sofa.jraft.core.FSMCallerImpl.onSnapshotSave(FSMCallerImpl.java:274)
        at com.alipay.sofa.jraft.storage.snapshot.SnapshotExecutorImpl.doSnapshot(SnapshotExecutorImpl.java:351)
        at com.alipay.sofa.jraft.core.NodeImpl.doSnapshot(NodeImpl.java:3098)
        at com.alipay.sofa.jraft.core.NodeImpl.lambda$handleSnapshotTimeout$0(NodeImpl.java:607)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)

The first is on election_test/PeerPair[127.0.0.1:8082 -> 127.0.0.1:8083]-AppendEntriesThread0, triggered after an append request pushed by the leader has been handled.

The second is triggered by a snapshot.
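A follower does not count votes; it learns the committed index from the leader's AppendEntries requests. The sketch below models that with the standard Raft rule (commit index = min(leader's commit index, index of the last new local entry)) and a commit index that only moves forward. FollowerCommitSketch and its methods are hypothetical; setLastCommittedIndex only mirrors the name of the BallotBox method in spirit.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical follower-side commit tracking (not jraft code).
class FollowerCommitSketch {
    long lastCommittedIndex;
    final List<Long> notified = new ArrayList<>(); // stand-in for FSMCaller.onCommitted calls

    // Only moves forward; a stale or repeated value is ignored.
    boolean setLastCommittedIndex(long index) {
        if (index <= lastCommittedIndex) return false;
        lastCommittedIndex = index;
        notified.add(index);
        return true;
    }

    // Raft rule: never commit past what this follower actually holds.
    void onAppendEntries(long leaderCommit, long lastLocalIndex) {
        setLastCommittedIndex(Math.min(leaderCommit, lastLocalIndex));
    }

    public static void main(String[] args) {
        FollowerCommitSketch f = new FollowerCommitSketch();
        f.onAppendEntries(5, 3); // leader committed 5, but only 3 entries are held locally
        f.onAppendEntries(5, 5); // caught up
        f.onAppendEntries(4, 5); // stale commit index: ignored
        System.out.println(f.notified); // [3, 5]
    }
}
```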

Code of BallotBox.commitAt (BallotBox.java:137) and BallotBox.setLastCommittedIndex (BallotBox.java:241):

public class BallotBox implements Lifecycle<BallotBoxOptions>, Describer {
    // ...
    private FSMCaller waiter;
    // ...
    public boolean commitAt(final long firstLogIndex, final long lastLogIndex, final PeerId peer) {
        // ...
        this.waiter.onCommitted(lastCommittedIndex); // line 137; waiter is the FSMCaller
        // ...
    }
    // ...
    public boolean setLastCommittedIndex(final long lastCommittedIndex) {
        // ...
        this.waiter.onCommitted(lastCommittedIndex); // line 241
        // ...
    }
}