副本复制异常场景及样例case分析
crash时机列表(下面简称C)
前提都是leader节点写盘成功
- L(leader)还没给R(replica)返回第一个请求 // 参见 关键环节与注入点整理要点2
- R没有收到第一个请求的响应体(或者是收到了但是还没来得及处理) 1 2 是不是可以合并掉?(可以合并)有个问题要考虑消息落盘后 到 给R回复第一次请求的响应这个时间段内leader做了哪些逻辑,且 处理生产者produce请求和副本fetch请求不在一个线程上,他们之间是如何交互的? // 此时副本fetch响应中的highWaterMark 等于该分区当前最大offset 是第一次请求的响应 参见 关键环节与注入点整理 要点4
- R还没来得及发出第二个请求 // 此时副本fetch请求中的fetchOffset与该分区当前最大offset+1相同 参见 关键环节与注入点整理 要点3
- L还没来得及处理第二个请求 // 参见 关键环节与注入点整理 要点1
- L处理了第二个请求,R没收到或者收到了没来得及处理完 // 此时副本fetch响应中的highWaterMark 等于该分区当前最大offset+1 是第一次请求的响应 参见 关键环节与注入点整理 要点4
该分区当前最大offset(当前最大offset是指新发的消息还没加上去的offset 下同)
节点状态列表(下面简称S)
N0表示node0 N2表示node2 ==> 表示状态演变 NG表示该节点被kill-9了,OK表示该节点恢复正常了
- N0(OK L) N2(OK R) ==> N0(KILL -9) ==> N0(start) ==> N0(OK R) N2(OK L)
- NG R NG L
关键环节与注入点整理
1. leader接受请求准备处理
leader节点,观察其处理fetch请求 fetchOffset值是该分区当前offset加1,这种方式只能抓到第二次请求开始和结束时 :
watch -f kafka.server.KafkaApis handleFetchRequest 'params[0].bodyAndSize.request.fetchData.get(new org.apache.kafka.common.TopicPartition("simon.test02.p1r2", 0)).fetchOffset' 'params[0].bodyAndSize.request.fetchData.get(new org.apache.kafka.common.TopicPartition("simon.test02.p1r2", 0))!=null' -n 10000
第一次的请求没有必要抓,因为一定实在produce了消息之后才会需要追踪,一般是需要注入produce完成后,和准备改副本节点发响应前。
2. leader处理完了请求准备返回响应
leader节点 观察其处理fetch请求 后发送响应的部分,其实这是每个请求都走着,highWatermark值是第一次时等于该分区当前最大offset,第二次时是该分区当前最大offset加1
watch -f kafka.server.KafkaApis sendResponseExemptThrottle 'params[1].responseData.get(new org.apache.kafka.common.TopicPartition("simon.test02.p1r2", 0)).highWatermark' 'params[1].responseData!=null&¶ms[1].responseData.get(new org.apache.kafka.common.TopicPartition("simon.test02.p1r2", 0))!=null'
3. 副本replica准备发送请求
拦截副本fetch请求 观察fetchOffset 这是副本发出请求
watch -b kafka.server.ReplicaFetcherThread fetch 'params[0].sessionParts.get(new org.apache.kafka.common.TopicPartition("simon.test02.p1r2", 0)).fetchOffset' -n 100000
4. 副本replica接到响应
拦截副本fetch响应
watch -f kafka.server.ReplicaFetcherThread fetch 'returnObj.array[0]._2.highWaterMark' 'returnObj.size>0' -n 100000
_2是返回的水位相关信息 _1 是TopicPartition 当highWaterMark 等于该分区当前最大offset 是第一次请求的响应
当highWaterMark 等于该分区当前最大offset+1(即 发过一条消息后加1的offset) 是第二次请求的响应
样例case1:构造首次响应在leader端不返回-C1
按整个测试步骤来叙述这个过程以及过程中发生的事情
启动zk
启动kafka 3个broker,并确认topic的leader在node0,replica在node2(仅约定,为了后续行文方便)
bin/kafka-topics.sh --describe --topic simon.test02.p1r2 --zookeeper 127.0.0.1:2182
simon.test02.p1r2
是我们测试topic,下同, 对应参数是–partitions 1 –replication-factor 2 。确认node0 node2 pid
jps -v|grep 8050 jps -v|grep 8052
8050 8052 是我加在kafka进程上的远程调试端口,所以我用此办法过滤出对应进程。
本次实验,45748是我们node0 pid,48122是我们node2 pid。
启动console消费者
bin/kafka-console-consumer.sh --topic simon.test02.p1r2 --bootstrap-server 127.0.0.1:9092,127.0.0.1:9094
9092 9094分别是node0和node2的端口。
arthas挂到node0
java -jar arthas-boot.jar 45748 --telnet-port 4000 --http-port 9000 --target-ip 0.0.0.0
45748是node0 pid,其余选项按需选择即可。
观察(含注入挂起)两个地方
classloader -c 18b4aac2 --load ArthasSide # ArthasSide 是我自制的一个注入挂起的工具类,需要提前手动load ## 注入1 watch -b kafka.server.KafkaApis sendResponseExemptThrottle 'params[1].responseData.get(new org.apache.kafka.common.TopicPartition("simon.test02.p1r2", 0)).highWatermark' 'params[1].responseData!=null&¶ms[1].responseData.get(new org.apache.kafka.common.TopicPartition("simon.test02.p1r2", 0))!=null&&@ArthasSide@hang("c1s1", 45000)' # 此处一定得是-b才行哈 否则 如果是-f的话 响应其实已经回给replica了哈 这个问题可以结合副本节点 watch -f kafka.cluster.Partition appendRecordsToFollowerOrFutureReplica {params,returnObj,throwExp} -x 2 看就很清楚了 # 用浏览器再启动一个arthas客户端 ## 注入2 watch -b kafka.server.KafkaApis handleProduceRequest {params,returnObj,throwExp} -x 3 # 此处也用-b 因为对于leader端不一定有返回,因为挂起leader给副本回消息了
注意此处一定得是-b才行哈 否则 如果是-f的话 响应其实已经回给replica了哈…这个问题“坑了自己”好一会..
arthas挂到node2
java -jar arthas-boot.jar 48122 --telnet-port 4002 --http-port 9002 --target-ip 0.0.0.0
48122是node2 pid,其余选项按需选择即可。
观察(含注入挂起)两个地方
## 注入3 watch -f kafka.cluster.Partition appendRecordsToFollowerOrFutureReplica {params,returnObj,throwExp} -x 2 # 这个是观察副本节点有没有从leader节点同步到消息并append到他本地的,如果此处没有执行,表示消息没能同步过来 # 用浏览器再启动一个arthas客户端 ## 注入4 watch -f kafka.server.KafkaApis handleProduceRequest {params,returnObj,throwExp} -x 3 # 此处也用-f 观察副本的这个地方可以发现,在leader切换时,producer会向切换后的节点(即之前的副本节点)发消息
发送消息,注意相关选项
./01.c1s1.sh hello730 -1 1 # bin/kafka-console-producer.sh --topic simon.test02.p1r2 --broker-list 127.0.0.1:9092,127.0.0.1:9094 --request-timeout-ms 6000000 --request-required-acks -1 --message-send-max-retries 1
我用的脚本是自己包了下的,具体选项如上面注释中的,主要是超时、acks、重试次数等设置。
kill node0
kill -9 45748
此处要kill -9,否则leader节点受到kill信息号后,arthas率先退出,leader节点就有可能将响应返回给副本节点了。就达不到测试的效果了。(不给的话 默认是15 SIGTERM 终止进程, 9是强制终止 SIGKILL)
观察相应数据
node2(一开始的副本节点)节点确实不能同步到消息。(观察注入3)
node0(一开始的leader节点)收到 了produce请求,写本地了,但是没给副本同步,首次请求的响应被挂起了。(观察注入1和2)
- node2(一开始的副本节点)在node0被kill之后切换成了新的leader节点,在切换之后收到了produce请求(发送消息那个步骤在kill节点等过程中一直是hang住的)。(观察注入4)
在步骤3之后,消费者继续能消费到消息。(说明ack为-1时,只有一个leader时也能正常发收消息,因为kill掉的节点已经踢出isr了)。
启动node0,并观察数据截断
/** * The offset of the next message that will be appended to the log */ def logEndOffset: Long = nextOffsetMetadata.messageOffset // 此时 524 // truncateTo逻辑 kafka.log.Log.truncateTo(targetOffset: Long) 1500 targetOffset // 518 // 截断调用栈 // kafka.server.AbstractFetcherThread.partitionStates 更新是怎样的逻辑 ? // 1. 一开始 Daemon Thread [kafka-request-handler-3] (Suspended (breakpoint at line 269 in AbstractFetcherThread)) owns: Object (id=151) owns: Object (id=152) ReplicaFetcherThread(AbstractFetcherThread).addPartitions(Map<TopicPartition,Object>) line: 269 // newPartitionToState Map(simon.test01.p1r2-0 -> offset:84-isReadyForFetch:false-isTruncatingLog:true, simon.test02.p1r2-0 -> offset:542-isReadyForFetch:false-isTruncatingLog:true) AbstractFetcherManager$$anonfun$addFetcherForPartitions$2.apply(Tuple2<BrokerAndFetcherId,Map<TopicPartition,BrokerAndInitialOffset>>) line: 138 AbstractFetcherManager$$anonfun$addFetcherForPartitions$2.apply(Object) line: 126 TraversableLike$WithFilter$$anonfun$foreach$1.apply(A) line: 733 Map$Map1<A,B>.foreach(Function1<Tuple2<A,B>,U>) line: 116 TraversableLike$WithFilter.foreach(Function1<A,U>) line: 732 ReplicaFetcherManager(AbstractFetcherManager).addFetcherForPartitions(Map<TopicPartition,BrokerAndInitialOffset>) line: 126 // partitionsPerFetcher: Map(BrokerAndFetcherId(BrokerEndPoint(2,xxx-ThinkCentre-M920t-N000,9094),0) -> Map(simon.test01.p1r2-0 -> BrokerAndInitialOffset(BrokerEndPoint(2,xxx-ThinkCentre-M920t-N000,9094),84), simon.test02.p1r2-0 -> BrokerAndInitialOffset(BrokerEndPoint(2,xxx-ThinkCentre-M920t-N000,9094),542))) ReplicaManager.makeFollowers(int, int, Map<Partition,PartitionState>, int, Map<TopicPartition,Errors>) line: 1304 /** ** // we do not need to check if the leader exists again since this has been done at the beginning of this process val partitionsToMakeFollowerWithLeaderAndOffset = partitionsToMakeFollower.map(partition => partition.topicPartition -> BrokerAndInitialOffset( metadataCache.getAliveBrokers.find(_.id == partition.leaderReplicaIdOpt.get).get.brokerEndPoint(config.interBrokerListenerName), partition.getReplica().get.highWatermark.messageOffset)).toMap replicaFetcherManager.addFetcherForPartitions(partitionsToMakeFollowerWithLeaderAndOffset) */ // partitionsToMakeFollowerWithLeaderAndOffset: Map(simon.test01.p1r2-0 -> BrokerAndInitialOffset(BrokerEndPoint(2,xxx-ThinkCentre-M920t-N000,9094),84), simon.test02.p1r2-0 -> BrokerAndInitialOffset(BrokerEndPoint(2,xxx-ThinkCentre-M920t-N000,9094),542)) 这个542 就是后面trunct的targetOffset就来自于这个地方 partition.getReplica().get.highWatermark.messageOffset 分区在本地副本的高水位 ReplicaManager.becomeLeaderOrFollower(int, LeaderAndIsrRequest, Function2<Iterable<Partition>,Iterable<Partition>,BoxedUnit>) line: 1082 KafkaApis.handleLeaderAndIsrRequest(RequestChannel$Request) line: 183 KafkaApis.handle(RequestChannel$Request) line: 108 KafkaRequestHandler.run() line: 69 KafkaThread(Thread).run() line: 748 // kafka.server.ReplicaManager.becomeLeaderOrFollower(correlationId: Int, leaderAndIsrRequest: LeaderAndIsrRequest, onLeadershipChange: (Iterable[Partition], Iterable[Partition]) => Unit) 需要调试细读 // 从leader fetch epoch 的栈 Thread [ReplicaFetcherThread-0-2] (Suspended (breakpoint at line 133 in AbstractFetcherThread$$anonfun$maybeTruncate$1)) /** ** def maybeTruncate(): Unit = { val ResultWithPartitions(epochRequests, partitionsWithError) = inLock(partitionMapLock) { buildLeaderEpochRequest(states) } handlePartitionsWithErrors(partitionsWithError) if (epochRequests.nonEmpty) { val fetchedEpochs = fetchEpochsFromLeader(epochRequests) // 从leader查,fetchedEpochs: Map(simon.test01.p1r2-0 -> EpochEndOffset{error=NONE, endOffset=-1}, simon.test02.p1r2-0 -> EpochEndOffset{error=NONE, endOffset=542}) //Ensure we hold a lock during truncation. inLock(partitionMapLock) { //Check no leadership changes happened whilst we were unlocked, fetching epochs val leaderEpochs = fetchedEpochs.filter { case (tp, _) => partitionStates.contains(tp) } val ResultWithPartitions(fetchOffsets, partitionsWithError) = maybeTruncate(leaderEpochs) handlePartitionsWithErrors(partitionsWithError) markTruncationCompleteAndUpdateFetchOffset(fetchOffsets) } } } */ AbstractFetcherThread$$anonfun$maybeTruncate$1.apply$mcV$sp() line: 133 AbstractFetcherThread$$anonfun$maybeTruncate$1.apply() line: 130 AbstractFetcherThread$$anonfun$maybeTruncate$1.apply() line: 130 CoreUtils$.inLock(Lock, Function0<T>) line: 250 ReplicaFetcherThread(AbstractFetcherThread).maybeTruncate() line: 130 ReplicaFetcherThread(AbstractFetcherThread).doWork() line: 100 ReplicaFetcherThread(ShutdownableThread).run() line: 82 // 开始truncate的线程栈 Thread [ReplicaFetcherThread-0-2] (Suspended) owns: Object (id=400) Log$$anonfun$truncateTo$1.apply$mcZ$sp() line: 1510 // targetOffset : 542 ; logEndOffset:548 Log$$anonfun$truncateTo$1.apply() line: 1497 Log$$anonfun$truncateTo$1.apply() line: 1497 Log.maybeHandleIOException(Function0<String>, Function0<T>) line: 1696 Log.truncateTo(long) line: 1497 LogManager$$anonfun$truncateTo$2.apply(Tuple2<TopicPartition,Object>) line: 508 LogManager$$anonfun$truncateTo$2.apply(Object) line: 494 TraversableLike$WithFilter$$anonfun$foreach$1.apply(A) line: 733 Map$Map1<A,B>.foreach(Function1<Tuple2<A,B>,U>) line: 116 TraversableLike$WithFilter.foreach(Function1<A,U>) line: 732 LogManager.truncateTo(Map<TopicPartition,Object>, boolean) line: 494 Partition$$anonfun$truncateTo$1.apply$mcV$sp() line: 665 Partition$$anonfun$truncateTo$1.apply() line: 665 Partition$$anonfun$truncateTo$1.apply() line: 665 CoreUtils$.inLock(Lock, Function0<T>) line: 250 CoreUtils$.inReadLock(ReadWriteLock, Function0<T>) line: 256 Partition.truncateTo(long, boolean) line: 664 ReplicaFetcherThread$$anonfun$maybeTruncate$1.apply(Tuple2<TopicPartition,EpochEndOffset>) line: 320 ReplicaFetcherThread$$anonfun$maybeTruncate$1.apply(Object) line: 301 Iterator$class.foreach(Iterator, Function1) line: 891 Wrappers$JMapWrapperLike$$anon$2(AbstractIterator<A>).foreach(Function1<A,U>) line: 1334 IterableLike$class.foreach(IterableLike, Function1) line: 72 Wrappers$JMapWrapper<A,B>(AbstractIterable<A>).foreach(Function1<A,U>) line: 54 ReplicaFetcherThread.maybeTruncate(Map<TopicPartition,EpochEndOffset>) line: 301 AbstractFetcherThread$$anonfun$maybeTruncate$1.apply$mcV$sp() line: 133 AbstractFetcherThread$$anonfun$maybeTruncate$1.apply() line: 130 AbstractFetcherThread$$anonfun$maybeTruncate$1.apply() line: 130 CoreUtils$.inLock(Lock, Function0<T>) line: 250 ReplicaFetcherThread(AbstractFetcherThread).maybeTruncate() line: 130 ReplicaFetcherThread(AbstractFetcherThread).doWork() line: 100 ReplicaFetcherThread(ShutdownableThread).run() line: 82 // truncate 之后 更新state Thread [ReplicaFetcherThread-0-2] (Suspended (breakpoint at line 288 in AbstractFetcherThread)) ReplicaFetcherThread(AbstractFetcherThread).kafka$server$AbstractFetcherThread$$markTruncationCompleteAndUpdateFetchOffset(Map<TopicPartition,Object>) line: 288 AbstractFetcherThread$$anonfun$maybeTruncate$1.apply$mcV$sp() line: 135 AbstractFetcherThread$$anonfun$maybeTruncate$1.apply() line: 130 AbstractFetcherThread$$anonfun$maybeTruncate$1.apply() line: 130 CoreUtils$.inLock(Lock, Function0<T>) line: 250 ReplicaFetcherThread(AbstractFetcherThread).maybeTruncate() line: 130 ReplicaFetcherThread(AbstractFetcherThread).doWork() line: 100 ReplicaFetcherThread(ShutdownableThread).run() line: 82
节点状态列表(下面简称S)
上述实验的节点状态变化,总结如下:
N0表示node0 N2表示node2 ==> 表示状态演变 NG表示该节点被kill-9了,OK表示该节点恢复正常了
N0(OK L) N2(OK R) ==> N0(KILL -9) ==> N0(start) ==> N0(OK R) N2(OK L)
注意:
如果要再次执行整个case, 此时 node起来后是副本,那么要先kill node2,此时node0切换成leader,再启动node2.然后再继续按步骤实验。
如果不是按这个来或有问题,比如直接kill node2,然后一次启动node0(此时leader是-1,即没有leader),再启动node2(此时leader在node2上),这样并不能达到我们的预期,leader在node0上。
leader -1的这个状态也说明了,某些情况下,有些个broker节点挂了,如果不手动干预的话,可能会出现选不了leader的情况。
本博客所有文章除特别声明外,均采用 CC BY-SA 3.0协议 。转载请注明出处!