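Overall analysis of log replication
I had a few questions:
- Does the leader push entries to followers, or do followers keep pulling from the leader?
- How does the leader schedule replication to multiple followers: in parallel or serially?
- Are entries replicated one at a time or in batches?
- What do the overall flow and the detailed flow of replication look like?
Process analysis
Overall flow
The overall flow is:
The application calls Node.apply, which produces new log entries.
Once the leader receives the new-log callback (NewLogCallback), it keeps sending appendEntries RPC requests to the followers to replicate the log. This is a push model.
When a follower responds successfully, execution of the application state machine is triggered. BallotBox.commitAt is called from Replicator.onAppendEntriesReturned, so commitAt, and in turn the state machine, is only triggered after replication to that follower has completed. The comment in the source reads "Only commit index when the response is from follower." This was covered in the previous chapter.
If a follower's response status is not OK, the replicator blocks until the heartbeat timeout expires and then tries to send again. If that still fails, this follower's replicator instance is presumably removed.
The leader sends replication requests to a follower as appendEntries RPCs. The entry point is com.alipay.sofa.jraft.core.Replicator.sendEntries.
There is one Replicator instance per follower: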
ts=2021-12-19 20:18:39; [cost=0.5668ms] result=@String[Replicator [state=Replicate, statInfo=<running=APPENDING_ENTRIES, firstLogIndex=3065, lastLogIncluded=0, lastLogIndex=3067, lastTermIncluded=0>, peerId=127.0.0.1:8082, type=Follower]]
ts=2021-12-19 20:18:39; [cost=0.777639ms] result=@String[Replicator [state=Replicate, statInfo=<running=APPENDING_ENTRIES, firstLogIndex=3065, lastLogIncluded=0, lastLogIndex=3067, lastTermIncluded=0>, peerId=127.0.0.1:8083, type=Follower]]
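This also shows indirectly that, with multiple followers, requests are sent in parallel.
The flowchart below shows how the leader triggers replication requests to the followers: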
flowchart TD
L(Node.apply) -.-> |async via disruptor| F
F(LogEntryAndClosureHandler.onEvent) --> G
G(NodeImpl.executeApplyingTasks) --> H
H(LogManagerImpl.appendEntries) --> I
I(LogManagerImpl.wakeupAllWaiter) -.-> |async call| E
E(LogManagerImpl.lambda$wakeupAllWaiter$6) --> D
D(Replicator.lambda$waitMoreEntries$9) --> A
A(Replicator.continueSending) --> B(Replicator.getNextSendIndex)
B --> C(Replicator.sendEntries)
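C --> |spin until nextSendingIndex<=prevSendIndex| B
C -.-> |RPC response returns asynchronously| M
M(Replicator.onRpcReturned) --> |OK response and more to send| B
M --> |response status not OK| N(block for a while, i.e. until the heartbeat timeout)
N -.-> |retry sending after the timeout| B
C --> |if there are 0 entries to send| O(waitMoreEntries registers a new-log callback with LogManager and waits)
O -.- |registered new-log callback| D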
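When Replicator.sendEntries is invoked, there are two threads, one per follower, i.e. one per Replicator instance.
The flowchart above is not detailed enough, so I drew a more detailed one:
onNewLog: a new log entry triggers sendEntries
The call stacks are: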
ts=2021-12-19 20:12:52;thread_name=JRaft-Closure-Executor-2;id=4f;is_daemon=true;priority=5;TCCL=sun.misc.Launcher$AppClassLoader@18b4aac
@com.alipay.sofa.jraft.core.Replicator.sendEntries()
at com.alipay.sofa.jraft.core.Replicator.sendEntries(Replicator.java:1564)
at com.alipay.sofa.jraft.core.Replicator.continueSending(Replicator.java:989)
at com.alipay.sofa.jraft.core.Replicator.lambda$waitMoreEntries$9(Replicator.java:1547)
at com.alipay.sofa.jraft.storage.impl.LogManagerImpl.runOnNewLog(LogManagerImpl.java:1133)
at com.alipay.sofa.jraft.storage.impl.LogManagerImpl.lambda$wakeupAllWaiter$6(LogManagerImpl.java:405)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
ts=2021-12-19 20:12:52;thread_name=JRaft-Closure-Executor-3;id=50;is_daemon=true;priority=5;TCCL=sun.misc.Launcher$AppClassLoader@18b4aac2
@com.alipay.sofa.jraft.core.Replicator.sendEntries()
at com.alipay.sofa.jraft.core.Replicator.sendEntries(Replicator.java:1564)
at com.alipay.sofa.jraft.core.Replicator.continueSending(Replicator.java:989)
at com.alipay.sofa.jraft.core.Replicator.lambda$waitMoreEntries$9(Replicator.java:1547)
at com.alipay.sofa.jraft.storage.impl.LogManagerImpl.runOnNewLog(LogManagerImpl.java:1133)
at com.alipay.sofa.jraft.storage.impl.LogManagerImpl.lambda$wakeupAllWaiter$6(LogManagerImpl.java:405)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
When it is idle and there is nothing to send, the Replicator instance waits in waitMoreEntries for new entries to arrive (the onNewLog callback). See Replicator.java line 1546. The code is:
private void waitMoreEntries(final long nextWaitIndex) {
try {
LOG.debug("Node {} waits more entries", this.options.getNode().getNodeId());
if (this.waitId >= 0) {
return;
}
/** line 1546 **/ this.waitId = this.options.getLogManager().wait(nextWaitIndex - 1,
(arg, errorCode) -> continueSending((ThreadId) arg, errorCode), this.id);
this.statInfo.runningState = RunningState.IDLE;
} finally {
this.id.unlock();
}
}
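The signature of LogManager.wait is com.alipay.sofa.jraft.storage.LogManager.wait(long, NewLogCallback, Object). So the lambda passed as the second argument on line 1546 is an implementation of NewLogCallback: it defines what runs once new log entries appear, i.e. onNewLog. When new entries arrive, sending simply resumes through continueSending, which is the natural thing to do.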
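The register-then-wake pattern described above can be sketched in a few lines. This is a minimal model under my own assumptions, not JRaft code: ToyLogManager, NewLogListener and waitForNewLog are hypothetical names, and the real LogManager.wait additionally takes the expected last log index. The point is only that callbacks are one-shot and are dispatched through an executor rather than run by the appending thread.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executor;

/** Hypothetical stand-in for NewLogCallback: invoked once when new entries arrive. */
interface NewLogListener {
    void onNewLog(Object arg, int errorCode);
}

/** Toy log manager (not JRaft's): waiters register a one-shot callback, appends wake them all. */
class ToyLogManager {
    private static final class Waiter {
        final NewLogListener listener;
        final Object arg;

        Waiter(NewLogListener listener, Object arg) {
            this.listener = listener;
            this.arg = arg;
        }
    }

    private final Map<Long, Waiter> waitMap = new HashMap<>();
    private final Executor executor;
    private long nextWaitId = 1;
    private long lastLogIndex = 0;

    ToyLogManager(Executor executor) {
        this.executor = executor;
    }

    /** Registers a callback to be fired on the next append; returns the wait id. */
    synchronized long waitForNewLog(NewLogListener listener, Object arg) {
        long waitId = nextWaitId++;
        waitMap.put(waitId, new Waiter(listener, arg));
        return waitId;
    }

    /** Appends entries, then wakes every registered waiter outside the lock. */
    void append(int entryCount) {
        List<Waiter> toWake;
        synchronized (this) {
            lastLogIndex += entryCount;
            toWake = new ArrayList<>(waitMap.values());
            waitMap.clear(); // one-shot: a waiter must register again to be notified again
        }
        for (Waiter w : toWake) {
            // Dispatch through the executor, so the appending thread does not run the callback itself
            // when a real thread pool is supplied.
            executor.execute(() -> w.listener.onNewLog(w.arg, 0));
        }
    }

    synchronized long lastLogIndex() {
        return lastLogIndex;
    }
}
```

With a thread pool as the executor, each replicator's callback runs on a pool thread, which matches the JRaft-Closure-Executor threads seen in the stacks above. Passing Runnable::run instead makes the sketch run callbacks inline, which is convenient for testing.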
Now look at the code at com.alipay.sofa.jraft.storage.impl.LogManagerImpl.lambda$wakeupAllWaiter$6(LogManagerImpl.java:405):
private boolean wakeupAllWaiter(final Lock lock) {
if (this.waitMap.isEmpty()) {
lock.unlock();
return false;
}
final List<WaitMeta> wms = new ArrayList<>(this.waitMap.values());
final int errCode = this.stopped ? RaftError.ESTOP.getNumber() : RaftError.SUCCESS.getNumber();
this.waitMap.clear();
lock.unlock();
final int waiterCount = wms.size();
for (int i = 0; i < waiterCount; i++) {
final WaitMeta wm = wms.get(i);
wm.errorCode = errCode;
/** line 405 **/ Utils.runInThread(() -> runOnNewLog(wm));
}
return true;
}
Line 405 wraps the call in a Runnable lambda, so Replicator.sendEntries is triggered asynchronously. The next question is who triggers wakeupAllWaiter. Its call stack is:
ts=2021-12-19 20:42:38;thread_name=JRaft-NodeImpl-Disruptor-0;id=12;is_daemon=true;priority=5;TCCL=sun.misc.Launcher$AppClassLoader@18b4aac2
@com.alipay.sofa.jraft.storage.impl.LogManagerImpl.wakeupAllWaiter()
at com.alipay.sofa.jraft.storage.impl.LogManagerImpl.appendEntries(LogManagerImpl.java:345)
at com.alipay.sofa.jraft.core.NodeImpl.executeApplyingTasks(NodeImpl.java:1391)
at com.alipay.sofa.jraft.core.NodeImpl.access$300(NodeImpl.java:138)
at com.alipay.sofa.jraft.core.NodeImpl$LogEntryAndClosureHandler.onEvent(NodeImpl.java:311)
at com.alipay.sofa.jraft.core.NodeImpl$LogEntryAndClosureHandler.onEvent(NodeImpl.java:291)
at com.lmax.disruptor.BatchEventProcessor.run(BatchEventProcessor.java:137)
at java.lang.Thread.run(Thread.java:745)
In short: when LogEntryAndClosureHandler handles an event it calls executeApplyingTasks, which in turn calls LogManagerImpl.appendEntries. The key code in LogManagerImpl.appendEntries is:
// LogManagerImpl.appendEntries
while (true) {
if (tryOfferEvent(done, translator)) { // submit the appendEntries event to the storage module
break;
} else {
retryTimes++;
if (retryTimes > APPEND_LOG_RETRY_TIMES) {
reportError(RaftError.EBUSY.getNumber(), "LogManager is busy, disk queue overload.");
return;
}
ThreadHelper.onSpinWait();
}
}
doUnlock = false;
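/** line 345 **/ if (!wakeupAllWaiter(this.writeLock)) { // fire the new-log event. TODO: construct a failure scenario: what if the data has not been persisted yet at this point? Or is reading from memory enough?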
notifyLastLogIndexListeners();
}
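The retry loop above is a bounded, non-blocking offer: try to enqueue, spin briefly on failure, and give up with an error once the retry budget is spent. A minimal sketch of that shape follows, assuming a plain BlockingQueue in place of the disruptor ring buffer. BoundedRetryOffer and offerWithRetry are hypothetical names, and Thread.yield() stands in for JRaft's ThreadHelper.onSpinWait().

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

/** Sketch of a bounded-retry enqueue; a BlockingQueue stands in for the disruptor ring buffer. */
class BoundedRetryOffer {
    /**
     * Tries to enqueue without blocking. Returns false once more than maxRetries
     * attempts have failed, which is where the original code reports EBUSY.
     */
    static <T> boolean offerWithRetry(BlockingQueue<T> queue, T event, int maxRetries) {
        int retryTimes = 0;
        while (true) {
            if (queue.offer(event)) {
                return true;
            }
            retryTimes++;
            if (retryTimes > maxRetries) {
                return false; // queue still full: the caller should surface a "busy" error
            }
            Thread.yield(); // brief pause before retrying
        }
    }

    public static void main(String[] args) {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(1);
        System.out.println(offerWithRetry(queue, "first", 3));  // fits into the empty queue
        System.out.println(offerWithRetry(queue, "second", 3)); // queue is full, gives up
    }
}
```

Failing fast here keeps the caller from stalling indefinitely when the disk queue is overloaded.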
Replicator.onRpcReturned with an OK status triggers further sendEntries
When an appendEntries request returns, Replicator.onRpcReturned also triggers further sendEntries. The call stacks are:
ts=2021-12-19 20:12:52;thread_name=Append-Entries-Thread-Send0;id=2b;is_daemon=true;priority=5;TCCL=sun.misc.Launcher$AppClassLoader@18b4aac2
@com.alipay.sofa.jraft.core.Replicator.sendEntries()
at com.alipay.sofa.jraft.core.Replicator.sendEntries(Replicator.java:1564)
at com.alipay.sofa.jraft.core.Replicator.onRpcReturned(Replicator.java:1352)
at com.alipay.sofa.jraft.core.Replicator$4.run(Replicator.java:1655)
at com.alipay.sofa.jraft.rpc.impl.AbstractClientService$1.complete(AbstractClientService.java:241)
at com.alipay.sofa.jraft.rpc.impl.BoltRpcClient$BoltCallback.onResponse(BoltRpcClient.java:175)
at com.alipay.remoting.rpc.RpcInvokeCallbackListener$CallbackTask.run(RpcInvokeCallbackListener.java:182)
at com.alipay.sofa.jraft.util.concurrent.MpscSingleThreadExecutor$Worker.runTask(MpscSingleThreadExecutor.java:352)
at com.alipay.sofa.jraft.util.concurrent.MpscSingleThreadExecutor$Worker.run(MpscSingleThreadExecutor.java:336)
at com.alipay.sofa.jraft.util.concurrent.MpscSingleThreadExecutor.lambda$doStartWorker$3(MpscSingleThreadExecutor.java:263)
at java.lang.Thread.run(Thread.java:745)
ts=2021-12-19 20:12:52;thread_name=Append-Entries-Thread-Send1;id=4e;is_daemon=true;priority=5;TCCL=sun.misc.Launcher$AppClassLoader@18b4aac2
@com.alipay.sofa.jraft.core.Replicator.sendEntries()
at com.alipay.sofa.jraft.core.Replicator.onRpcReturned(Replicator.java:1352)
at com.alipay.sofa.jraft.core.Replicator$4.run(Replicator.java:1655)
at com.alipay.sofa.jraft.rpc.impl.AbstractClientService$1.complete(AbstractClientService.java:241)
at com.alipay.sofa.jraft.rpc.impl.BoltRpcClient$BoltCallback.onResponse(BoltRpcClient.java:175)
at com.alipay.remoting.rpc.RpcInvokeCallbackListener$CallbackTask.run(RpcInvokeCallbackListener.java:182)
at com.alipay.sofa.jraft.util.concurrent.MpscSingleThreadExecutor$Worker.runTask(MpscSingleThreadExecutor.java:352)
at com.alipay.sofa.jraft.util.concurrent.MpscSingleThreadExecutor$Worker.run(MpscSingleThreadExecutor.java:336)
at com.alipay.sofa.jraft.util.concurrent.MpscSingleThreadExecutor.lambda$doStartWorker$3(MpscSingleThreadExecutor.java:263)
at java.lang.Thread.run(Thread.java:745)
Replicator.onRpcReturned with a non-OK status: block for a while, then continue sendEntries
The relevant code is:
// com.alipay.sofa.jraft.core.Replicator.block(long, int), line 1001
final long dueTime = startTimeMs + this.options.getDynamicHeartBeatTimeoutMs();
try {
LOG.debug("Blocking {} for {} ms", this.options.getPeerId(), this.options.getDynamicHeartBeatTimeoutMs());
this.blockTimer = this.timerManager.schedule(() -> onBlockTimeout(this.id), dueTime - Utils.nowMs(),
TimeUnit.MILLISECONDS);
this.statInfo.runningState = RunningState.BLOCKING;
// com.alipay.sofa.jraft.core.Replicator.onBlockTimeoutInNewThread(ThreadId), line 941
static void onBlockTimeoutInNewThread(final ThreadId id) {
if (id != null) {
continueSending(id, RaftError.ETIMEDOUT.getNumber());
}
}
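If a follower's response status is not OK, the replicator blocks until the heartbeat timeout expires and then tries to send again. If that still fails, this follower's replicator instance is presumably removed.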
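The block-then-retry behaviour can be modelled with a scheduled timer. The sketch below is an illustration under my own assumptions, not the JRaft implementation: ToyBlockingReplicator and resumesAfterBlock are hypothetical names, a ScheduledExecutorService stands in for timerManager, and heartbeatTimeoutMs stands in for getDynamicHeartBeatTimeoutMs().

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/** Toy replicator (not JRaft's) that pauses after a failed RPC and resumes when a timer fires. */
class ToyBlockingReplicator {
    enum RunningState { APPENDING_ENTRIES, BLOCKING }

    private final ScheduledExecutorService timer;
    private final long heartbeatTimeoutMs;  // assumed stand-in for the dynamic heartbeat timeout
    private final Runnable continueSending; // what to run once the block is over
    private volatile RunningState state = RunningState.APPENDING_ENTRIES;

    ToyBlockingReplicator(ScheduledExecutorService timer, long heartbeatTimeoutMs, Runnable continueSending) {
        this.timer = timer;
        this.heartbeatTimeoutMs = heartbeatTimeoutMs;
        this.continueSending = continueSending;
    }

    /** Called when a response comes back with a non-OK status. */
    void block(long startTimeMs) {
        long dueTime = startTimeMs + heartbeatTimeoutMs;
        long delayMs = Math.max(0L, dueTime - System.currentTimeMillis());
        state = RunningState.BLOCKING; // set before scheduling so the timer cannot race ahead of it
        timer.schedule(this::onBlockTimeout, delayMs, TimeUnit.MILLISECONDS);
    }

    private void onBlockTimeout() {
        state = RunningState.APPENDING_ENTRIES;
        continueSending.run();
    }

    RunningState state() {
        return state;
    }

    /** Demo helper: blocks once and reports whether sending resumed within two seconds. */
    static boolean resumesAfterBlock(long heartbeatTimeoutMs) {
        ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();
        CountDownLatch resumed = new CountDownLatch(1);
        try {
            ToyBlockingReplicator replicator =
                new ToyBlockingReplicator(timer, heartbeatTimeoutMs, resumed::countDown);
            replicator.block(System.currentTimeMillis());
            return resumed.await(2, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            timer.shutdownNow();
        }
    }
}
```

Scheduling relative to the time the failed request started (dueTime minus now) rather than a fixed delay means a slow failure does not extend the pause beyond one heartbeat timeout.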
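Replication optimizations and details
The leader sends in batches and the follower preserves order
Batching means sending multiple entries per request rather than one entry at a time.
The number of entries per request is bounded by this.raftOptions.getMaxEntriesSize(). The code is:
// com.alipay.sofa.jraft.core.Replicator.sendEntries(long), line 1598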
final int maxEntriesSize = this.raftOptions.getMaxEntriesSize();
final RecyclableByteBufferList byteBufList = RecyclableByteBufferList.newInstance();
try {
for (int i = 0; i < maxEntriesSize; i++) {
final RaftOutter.EntryMeta.Builder emb = RaftOutter.EntryMeta.newBuilder();
if (!prepareEntry(nextSendingIndex, i, emb, byteBufList)) {
break;
}
rb.addEntries(emb.build());
}
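To make the batching loop concrete, here is a small self-contained sketch of the same shape: collect up to maxEntriesSize entries starting at nextSendingIndex and stop early when the log runs out. BatchSender and nextBatch are hypothetical names, the log is an in-memory list with 1-based indexes, and the real prepareEntry may apply further limits that this sketch ignores.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Sketch of batch preparation over an in-memory log; not the JRaft implementation. */
class BatchSender {
    /**
     * Collects at most maxEntriesSize entries starting at nextSendingIndex.
     * Log indexes are 1-based, so index i lives at log.get(i - 1).
     */
    static List<String> nextBatch(List<String> log, long nextSendingIndex, int maxEntriesSize) {
        List<String> batch = new ArrayList<>();
        for (int i = 0; i < maxEntriesSize; i++) {
            long logIndex = nextSendingIndex + i;
            if (logIndex > log.size()) {
                break; // nothing left to send, like prepareEntry returning false
            }
            batch.add(log.get((int) (logIndex - 1)));
        }
        return batch;
    }

    public static void main(String[] args) {
        List<String> log = Arrays.asList("e1", "e2", "e3", "e4", "e5");
        System.out.println(nextBatch(log, 2, 3)); // a full batch of three entries
        System.out.println(nextBatch(log, 5, 3)); // a short batch at the tail of the log
        System.out.println(nextBatch(log, 6, 3)); // empty: the replicator would wait for more entries
    }
}
```

An empty batch corresponds to the "0 entries to send" branch in the flowchart, where the replicator calls waitMoreEntries.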
When it handles appendEntries, the follower must make sure the entries are in the right order. How is that done?
com.alipay.sofa.jraft.storage.impl.LogManagerImpl.checkAndResolveConflict
Call stack on the follower:
ts=2021-12-19 23:11:23;thread_name=election_test/PeerPair[127.0.0.1:8082 -> 127.0.0.1:8081]-AppendEntriesThread0;id=22;is_daemon=true;priority=5;TCCL=sun.misc.Launcher$AppClassLoader@18b4aac2
@com.alipay.sofa.jraft.storage.impl.LogManagerImpl.checkAndResolveConflict()
at com.alipay.sofa.jraft.storage.impl.LogManagerImpl.appendEntries(LogManagerImpl.java:299)
at com.alipay.sofa.jraft.core.NodeImpl.handleAppendEntriesRequest(NodeImpl.java:2003)
at com.alipay.sofa.jraft.rpc.impl.core.AppendEntriesRequestProcessor.processRequest0(AppendEntriesRequestProcessor.java:460)
at com.alipay.sofa.jraft.rpc.impl.core.AppendEntriesRequestProcessor.processRequest0(AppendEntriesRequestProcessor.java:53)
at com.alipay.sofa.jraft.rpc.impl.core.NodeRequestProcessor.processRequest(NodeRequestProcessor.java:60)
at com.alipay.sofa.jraft.rpc.RpcRequestProcessor.handleRequest(RpcRequestProcessor.java:53)
at com.alipay.sofa.jraft.rpc.RpcRequestProcessor.handleRequest(RpcRequestProcessor.java:35)
at com.alipay.sofa.jraft.rpc.impl.BoltRpcServer$2.handleRequest(BoltRpcServer.java:123)
at com.alipay.remoting.rpc.protocol.RpcRequestProcessor.dispatchToUserProcessor(RpcRequestProcessor.java:234)
at com.alipay.remoting.rpc.protocol.RpcRequestProcessor.doProcess(RpcRequestProcessor.java:145)
at com.alipay.remoting.rpc.protocol.RpcRequestProcessor$ProcessTask.run(RpcRequestProcessor.java:384)
at com.alipay.sofa.jraft.util.concurrent.MpscSingleThreadExecutor$Worker.runTask(MpscSingleThreadExecutor.java:352)
at com.alipay.sofa.jraft.util.concurrent.MpscSingleThreadExecutor$Worker.run(MpscSingleThreadExecutor.java:336)
at com.alipay.sofa.jraft.util.concurrent.MpscSingleThreadExecutor.lambda$doStartWorker$3(MpscSingleThreadExecutor.java:263)
at java.lang.Thread.run(Thread.java:745)
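The check returns true, i.e. passes, in two cases. If it fails, the caller appendEntries returns immediately and the append goes no further.
- firstLogEntry.getId().getIndex() == this.lastLogIndex + 1: this is the fast path. The index of the first entry to append is exactly the previous last index plus 1.
- When case 1 does not hold, the conflicting (duplicated) entries [0, conflictingIndex) must be dropped. conflictingIndex is determined by finding where the terms differ.
TODO: some details of checkAndResolveConflict still need analysis:
firstLogEntry.getId().getIndex() == 0
the case firstLogEntry.getId().getIndex() > this.lastLogIndex + 1
the case lastLogEntry.getId().getIndex() <= appliedIndex
when firstLogEntry.getId().getIndex() == this.lastLogIndex + 1 does not hold, i.e. the details of case 2 above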
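The cases listed above can be explored with a simplified in-memory model. This is a sketch based on my reading of the cases, not the JRaft source: ToyEntry, ToyFollowerLog and their methods are hypothetical, the log is a plain list of terms, and the closure and lock handling of the real method are left out. In particular, treating the "already applied" case as a rejected append is an assumption of this model.

```java
import java.util.ArrayList;
import java.util.List;

/** A log entry reduced to its (index, term) id. */
class ToyEntry {
    long index;
    final long term;

    ToyEntry(long index, long term) {
        this.index = index;
        this.term = term;
    }
}

/** Simplified follower log (not JRaft's): terms.get(i) is the term of the entry at index i + 1. */
class ToyFollowerLog {
    private final List<Long> terms = new ArrayList<>();
    long appliedIndex = 0;

    long lastLogIndex() {
        return terms.size();
    }

    long termAt(long index) {
        return (index >= 1 && index <= terms.size()) ? terms.get((int) (index - 1)) : 0L;
    }

    /** Returns true if the (possibly trimmed) entries should be appended; may truncate the local log. */
    boolean checkAndResolveConflict(List<ToyEntry> entries) {
        ToyEntry first = entries.get(0);
        ToyEntry last = entries.get(entries.size() - 1);
        if (first.index == 0) {
            long next = lastLogIndex(); // index 0: entries come from the local user, assign indexes
            for (ToyEntry e : entries) {
                e.index = ++next;
            }
            return true;
        }
        if (first.index > lastLogIndex() + 1) {
            return false; // gap between the local log and the incoming entries
        }
        if (last.index <= appliedIndex) {
            return false; // everything is already applied, nothing to change (assumed handling)
        }
        if (first.index == lastLogIndex() + 1) {
            return true; // fast path: entries continue the local log exactly
        }
        int conflicting = 0; // overlap: find the first entry whose term differs from the local one
        while (conflicting < entries.size()
                && termAt(entries.get(conflicting).index) == entries.get(conflicting).term) {
            conflicting++;
        }
        if (conflicting != entries.size()) {
            long conflictIndex = entries.get(conflicting).index;
            if (conflictIndex <= lastLogIndex()) {
                terms.subList((int) (conflictIndex - 1), terms.size()).clear(); // drop conflicting suffix
            }
        }
        entries.subList(0, conflicting).clear(); // drop the duplicated prefix [0, conflicting)
        return true;
    }

    void append(List<ToyEntry> entries) {
        for (ToyEntry e : entries) {
            terms.add(e.term);
        }
    }

    /** Builds a mutable list of consecutive entries starting at firstIndex with the given terms. */
    static List<ToyEntry> entries(long firstIndex, long... entryTerms) {
        List<ToyEntry> list = new ArrayList<>();
        for (int i = 0; i < entryTerms.length; i++) {
            list.add(new ToyEntry(firstIndex + i, entryTerms[i]));
        }
        return list;
    }
}
```

For example, with local entries 1..3 all in term 1, incoming entries 2..4 with terms 1, 2, 2 keep local index 2, truncate local index 3, and leave entries 3 and 4 to be appended.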
How the follower returns the replication response
LogManager.StableClosure is able to send the RPC response asynchronously. That is why com.alipay.sofa.jraft.core.NodeImpl.handleAppendEntriesRequest(AppendEntriesRequest, RpcRequestClosure) returns null at the end when the request carries entries. The leader still receives a response: it is sent from FollowerStableClosure.
FollowerStableClosure is an implementation of LogManager.StableClosure:
// com.alipay.sofa.jraft.core.NodeImpl.FollowerStableClosure.run(Status)
// Don't touch node any more.
this.responseBuilder.setSuccess(true).setTerm(this.term);
// Ballot box is thread safe and tolerates disorder.
this.node.ballotBox.setLastCommittedIndex(this.committedIndex);
this.done.sendResponse(this.responseBuilder.build());
Key code and call stacks
appendEntries stack on the follower (replica) node:
stack com.alipay.sofa.jraft.storage.impl.LogManagerImpl appendEntries
Press Q or Ctrl+C to abort.
Affect(class count: 1 , method count: 1) cost in 236 ms, listenerId: 1
ts=2022-01-23 11:09:42;thread_name=election_test/PeerPair[127.0.0.1:8082 -> 127.0.0.1:8081]-AppendEntriesThread0;id=20;is_daemon=true;priority=5;TCCL=sun.misc.Launcher$AppClassLoader@18b4aac2
@com.alipay.sofa.jraft.storage.impl.LogManagerImpl.appendEntries()
at com.alipay.sofa.jraft.core.NodeImpl.handleAppendEntriesRequest(NodeImpl.java:2003)
at com.alipay.sofa.jraft.rpc.impl.core.AppendEntriesRequestProcessor.processRequest0(AppendEntriesRequestProcessor.java:460)
at com.alipay.sofa.jraft.rpc.impl.core.AppendEntriesRequestProcessor.processRequest0(AppendEntriesRequestProcessor.java:53)
at com.alipay.sofa.jraft.rpc.impl.core.NodeRequestProcessor.processRequest(NodeRequestProcessor.java:60)
at com.alipay.sofa.jraft.rpc.RpcRequestProcessor.handleRequest(RpcRequestProcessor.java:53)
at com.alipay.sofa.jraft.rpc.RpcRequestProcessor.handleRequest(RpcRequestProcessor.java:35)
at com.alipay.sofa.jraft.rpc.impl.BoltRpcServer$2.handleRequest(BoltRpcServer.java:123)
at com.alipay.remoting.rpc.protocol.RpcRequestProcessor.dispatchToUserProcessor(RpcRequestProcessor.java:234)
at com.alipay.remoting.rpc.protocol.RpcRequestProcessor.doProcess(RpcRequestProcessor.java:145)
at com.alipay.remoting.rpc.protocol.RpcRequestProcessor$ProcessTask.run(RpcRequestProcessor.java:384)
at com.alipay.sofa.jraft.util.concurrent.MpscSingleThreadExecutor$Worker.runTask(MpscSingleThreadExecutor.java:352)
at com.alipay.sofa.jraft.util.concurrent.MpscSingleThreadExecutor$Worker.run(MpscSingleThreadExecutor.java:336)
at com.alipay.sofa.jraft.util.concurrent.MpscSingleThreadExecutor.lambda$doStartWorker$3(MpscSingleThreadExecutor.java:263)
at java.lang.Thread.run(Thread.java:745)
Closure callback stack after the follower appends entries:
stack com.alipay.sofa.jraft.core.NodeImpl$FollowerStableClosure run
Press Q or Ctrl+C to abort.
Affect(class count: 1 , method count: 1) cost in 50 ms, listenerId: 2
ts=2022-01-23 11:11:09;thread_name=JRaft-LogManager-Disruptor-0;id=13;is_daemon=true;priority=5;TCCL=sun.misc.Launcher$AppClassLoader@18b4aac2
@com.alipay.sofa.jraft.core.NodeImpl$FollowerStableClosure.run()
at com.alipay.sofa.jraft.storage.impl.LogManagerImpl$AppendBatcher.flush(LogManagerImpl.java:469)
at com.alipay.sofa.jraft.storage.impl.LogManagerImpl$StableClosureEventHandler.onEvent(LogManagerImpl.java:565)
at com.alipay.sofa.jraft.storage.impl.LogManagerImpl$StableClosureEventHandler.onEvent(LogManagerImpl.java:496)
at com.lmax.disruptor.BatchEventProcessor.run(BatchEventProcessor.java:137)
at java.lang.Thread.run(Thread.java:745)
Stack for sending (returning) the RPC response after the closure callback fires:
[arthas@40915]$ stack com.alipay.sofa.jraft.rpc.impl.core.AppendEntriesRequestProcessor sendSequenceResponse // unlike other RPCs, these responses are sent in order, implemented with a PriorityQueue<SequenceMessage>
Press Q or Ctrl+C to abort.
Affect(class count: 1 , method count: 1) cost in 56 ms, listenerId: 3
ts=2022-01-23 11:13:46;thread_name=JRaft-LogManager-Disruptor-0;id=13;is_daemon=true;priority=5;TCCL=sun.misc.Launcher$AppClassLoader@18b4aac2
@com.alipay.sofa.jraft.rpc.impl.core.AppendEntriesRequestProcessor.sendSequenceResponse()
at com.alipay.sofa.jraft.rpc.impl.core.AppendEntriesRequestProcessor$SequenceRpcRequestClosure.sendResponse(AppendEntriesRequestProcessor.java:124)
at com.alipay.sofa.jraft.core.NodeImpl$FollowerStableClosure.run(NodeImpl.java:1872)
at com.alipay.sofa.jraft.storage.impl.LogManagerImpl$AppendBatcher.flush(LogManagerImpl.java:469)
at com.alipay.sofa.jraft.storage.impl.LogManagerImpl$StableClosureEventHandler.onEvent(LogManagerImpl.java:565)
at com.alipay.sofa.jraft.storage.impl.LogManagerImpl$StableClosureEventHandler.onEvent(LogManagerImpl.java:496)
at com.lmax.disruptor.BatchEventProcessor.run(BatchEventProcessor.java:137)
at java.lang.Thread.run(Thread.java:745)
The RpcContext sending out the response after replication (this differs from sending the heartbeat response; compare the stacks below):
stack io.netty.channel.AbstractChannelHandlerContext write
2022-01-23 11:54:49 [JRaft-LogManager-Disruptor-0] INFO result -ts=2022-01-23 11:54:49;thread_name=JRaft-LogManager-Disruptor-0;id=13;is_daemon=true;priority=5;TCCL=sun.misc.Launcher$AppClassLoader@18b4aac2
@io.netty.channel.AbstractChannelHandlerContext.write()
at io.netty.channel.AbstractChannelHandlerContext.writeAndFlush(AbstractChannelHandlerContext.java:757)
at io.netty.channel.AbstractChannelHandlerContext.writeAndFlush(AbstractChannelHandlerContext.java:812)
at com.alipay.remoting.RemotingContext.writeAndFlush(RemotingContext.java:112)
at com.alipay.remoting.rpc.protocol.RpcRequestProcessor.sendResponseIfNecessary(RpcRequestProcessor.java:183)
at com.alipay.remoting.rpc.protocol.RpcAsyncContext.sendResponse(RpcAsyncContext.java:62)
at com.alipay.sofa.jraft.rpc.impl.BoltRpcServer$2$1.sendResponse(BoltRpcServer.java:105)
at com.alipay.sofa.jraft.rpc.impl.core.AppendEntriesRequestProcessor$SequenceMessage.sendResponse(AppendEntriesRequestProcessor.java:150)
at com.alipay.sofa.jraft.rpc.impl.core.AppendEntriesRequestProcessor.sendSequenceResponse(AppendEntriesRequestProcessor.java:334) // **sent directly on JRaft-LogManager-Disruptor-0, not asynchronously**
at com.alipay.sofa.jraft.rpc.impl.core.AppendEntriesRequestProcessor$SequenceRpcRequestClosure.sendResponse(AppendEntriesRequestProcessor.java:124)
at com.alipay.sofa.jraft.core.NodeImpl$FollowerStableClosure.run(NodeImpl.java:1872)
at com.alipay.sofa.jraft.storage.impl.LogManagerImpl$AppendBatcher.flush(LogManagerImpl.java:469)
at com.alipay.sofa.jraft.storage.impl.LogManagerImpl$StableClosureEventHandler.onEvent(LogManagerImpl.java:565)
at com.alipay.sofa.jraft.storage.impl.LogManagerImpl$StableClosureEventHandler.onEvent(LogManagerImpl.java:496)
at com.lmax.disruptor.BatchEventProcessor.run(BatchEventProcessor.java:137)
at java.lang.Thread.run(Thread.java:745)
Stack for sending the heartbeat response (unlike the replication response above, both the thread and the initiator are different):
2022-01-23 11:54:49 [election_test/PeerPair[127.0.0.1:8082 -> 127.0.0.1:8081]-AppendEntriesThread0] INFO result -ts=2022-01-23 11:54:49;thread_name=election_test/PeerPair[127.0.0.1:8082 -> 127.0.0.1:8081]-AppendEntriesThread0;id=20;is_daemon=true;priority=5;TCCL=sun.misc.Launcher$AppClassLoader@18b4aac2
@io.netty.channel.AbstractChannelHandlerContext.write()
at io.netty.channel.AbstractChannelHandlerContext.writeAndFlush(AbstractChannelHandlerContext.java:757)
at io.netty.channel.AbstractChannelHandlerContext.writeAndFlush(AbstractChannelHandlerContext.java:812)
at com.alipay.remoting.RemotingContext.writeAndFlush(RemotingContext.java:112)
at com.alipay.remoting.rpc.protocol.RpcRequestProcessor.sendResponseIfNecessary(RpcRequestProcessor.java:183)
at com.alipay.remoting.rpc.protocol.RpcAsyncContext.sendResponse(RpcAsyncContext.java:62)
at com.alipay.sofa.jraft.rpc.impl.BoltRpcServer$2$1.sendResponse(BoltRpcServer.java:105)
at com.alipay.sofa.jraft.rpc.impl.core.AppendEntriesRequestProcessor.processRequest0(AppendEntriesRequestProcessor.java:464)
at com.alipay.sofa.jraft.rpc.impl.core.AppendEntriesRequestProcessor.processRequest0(AppendEntriesRequestProcessor.java:53)
at com.alipay.sofa.jraft.rpc.impl.core.NodeRequestProcessor.processRequest(NodeRequestProcessor.java:60)
at com.alipay.sofa.jraft.rpc.RpcRequestProcessor.handleRequest(RpcRequestProcessor.java:53)
at com.alipay.sofa.jraft.rpc.RpcRequestProcessor.handleRequest(RpcRequestProcessor.java:35)
at com.alipay.sofa.jraft.rpc.impl.BoltRpcServer$2.handleRequest(BoltRpcServer.java:123)
at com.alipay.remoting.rpc.protocol.RpcRequestProcessor.dispatchToUserProcessor(RpcRequestProcessor.java:234)
at com.alipay.remoting.rpc.protocol.RpcRequestProcessor.doProcess(RpcRequestProcessor.java:145)
at com.alipay.remoting.rpc.protocol.RpcRequestProcessor$ProcessTask.run(RpcRequestProcessor.java:384) // executed asynchronously through ProcessTask (a Runnable)
at com.alipay.sofa.jraft.util.concurrent.MpscSingleThreadExecutor$Worker.runTask(MpscSingleThreadExecutor.java:352)
at com.alipay.sofa.jraft.util.concurrent.MpscSingleThreadExecutor$Worker.run(MpscSingleThreadExecutor.java:336)
at com.alipay.sofa.jraft.util.concurrent.MpscSingleThreadExecutor.lambda$doStartWorker$3(MpscSingleThreadExecutor.java:263)
at java.lang.Thread.run(Thread.java:745)
public Message processRequest0(final RaftServerService service, final AppendEntriesRequest request,
final RpcRequestClosure done) {
final Node node = (Node) service;
if (node.getRaftOptions().isReplicatorPipeline()) {
final String groupId = request.getGroupId();
final PeerPair pair = pairOf(request.getPeerId(), request.getServerId());
boolean isHeartbeat = isHeartbeatRequest(request);
int reqSequence = -1;
if (!isHeartbeat) {
reqSequence = getAndIncrementSequence(groupId, pair, done.getRpcCtx().getConnection());
}
final Message response = service.handleAppendEntriesRequest(request, new SequenceRpcRequestClosure(done,
defaultResp(), groupId, pair, reqSequence, isHeartbeat));
if (response != null) {
if (isHeartbeat) {
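done.getRpcCtx().sendResponse(response); // The heartbeat response is sent back here, because service.handleAppendEntriesRequest returns a response in this case. The current thread is election_test/PeerPair[127.0.0.1:8082 -> 127.0.0.1:8081]-AppendEntriesThread0 and the response is sent from this same thread, which is why the heartbeat response stack is on this thread.
// The replication response, however, is not sent from here. When replicating entries, service.handleAppendEntriesRequest returns null and the response is sent by the FollowerStableClosure callback created inside it, so its thread is JRaft-LogManager-Disruptor-0.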
} else {
sendSequenceResponse(groupId, pair, reqSequence, done.getRpcCtx(), response);
}
}
return null;
} else {
return service.handleAppendEntriesRequest(request, done);
}
}
Code that sends responses in order:
// com.alipay.sofa.jraft.rpc.impl.core.AppendEntriesRequestProcessor.sendSequenceResponse(String, PeerPair, int, RpcContext, Message)
// Unlike other RPCs, these responses are sent in order, implemented with a PriorityQueue<SequenceMessage>
void sendSequenceResponse(final String groupId, final PeerPair pair, final int seq, final RpcContext rpcCtx,
final Message msg) {
final PeerRequestContext ctx = getPeerRequestContext(groupId, pair);
if (ctx == null) {
// the context was destroyed, so the response can be ignored.
return;
}
final PriorityQueue<SequenceMessage> respQueue = ctx.responseQueue;
assert (respQueue != null);
synchronized (Utils.withLockObject(respQueue)) {
respQueue.add(new SequenceMessage(rpcCtx, msg, seq)); // keep it in the priority queue so responses stay ordered
if (!ctx.hasTooManyPendingResponses()) {
while (!respQueue.isEmpty()) {
final SequenceMessage queuedPipelinedResponse = respQueue.peek();
if (queuedPipelinedResponse.sequence != ctx.getNextRequiredSequence()) { // wait until the response with the expected seq arrives before sending, which preserves the order seen by the leader
// sequence mismatch, waiting for next response.
break;
}
respQueue.remove();
try {
queuedPipelinedResponse.sendResponse();
} finally {
ctx.getAndIncrementNextRequiredSequence();
}
}
} else {
final Connection connection = rpcCtx.getConnection();
LOG.warn("Closed connection to peer {}/{}, because of too many pending responses, queued={}, max={}",
ctx.groupId, pair, respQueue.size(), ctx.maxPendingResponses);
connection.close();
// Close the connection if there are too many pending responses in queue.
removePeerRequestContext(groupId, pair);
}
}
}
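The core of the ordering logic above is small enough to isolate: responses that complete out of order are parked in a priority queue keyed by sequence number and are released only when the next expected sequence is at the head. The sketch below is a stripped-down model under my own assumptions. OrderedResponder and its methods are hypothetical names, responses are plain strings collected in a list instead of being written to a connection, and the too-many-pending-responses check is omitted.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

/** Sketch of in-order response release with a priority queue; not the JRaft implementation. */
class OrderedResponder {
    private static final class Pending {
        final int sequence;
        final String response;

        Pending(int sequence, String response) {
            this.sequence = sequence;
            this.response = response;
        }
    }

    private final PriorityQueue<Pending> queue =
        new PriorityQueue<Pending>(Comparator.comparingInt((Pending p) -> p.sequence));
    private final List<String> sent = new ArrayList<>();
    private int nextRequiredSequence = 0;

    /** Called when the response for the given request sequence becomes ready, in any order. */
    synchronized void complete(int sequence, String response) {
        queue.add(new Pending(sequence, response));
        // Release every response that is next in line; stop at the first gap.
        while (!queue.isEmpty() && queue.peek().sequence == nextRequiredSequence) {
            sent.add(queue.poll().response);
            nextRequiredSequence++;
        }
    }

    /** Returns a copy of the responses released so far, in release order. */
    synchronized List<String> sent() {
        return new ArrayList<>(sent);
    }
}
```

If responses for sequences 1 and 2 complete first, nothing is released until sequence 0 completes, at which point all three go out in order 0, 1, 2.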
Each follower has only one sending channel
A single follower has only one network channel to the leader
io.netty.channel.epoll.AbstractEpollChannel.doConnect0(SocketAddress): netty creates the connection
com.alipay.remoting.DefaultConnectionManager.doCreate(Url, ConnectionPool, String, int): bolt creates the connection
com.alipay.remoting.connection.AbstractConnectionFactory.doCreateConnection(String, int, int): bolt creates the connection
The number of connections created per distinct URL defaults to 1.
com.alipay.remoting.Url.connNum com.alipay.remoting.config.Configs.DEFAULT_CONN_NUM_PER_URL
The IO thread and the business thread are the same thread
com.alipay.remoting.rpc.protocol.RpcCommandHandler.handle(RemotingContext, Object)
com.alipay.remoting.rpc.RpcConfigManager.dispatch_msg_list_in_default_executor()
com.alipay.sofa.jraft.rpc.impl.BoltRaftRpcFactory.ensurePipeline()