副本复制异常场景及样例case分析

crash时机列表(下面简称C)

前提都是leader节点写盘成功

  1. L(leader)还没给R(replica)返回第一个请求 // 参见 关键环节与注入点整理要点2
  2. R没有收到第一个请求的响应体(或者是收到了但是还没来得及处理) 1 2 是不是可以合并掉?(可以合并)有个问题要考虑消息落盘后 到 给R回复第一次请求的响应这个时间段内leader做了哪些逻辑,且 处理生产者produce请求和副本fetch请求不在一个线程上,他们之间是如何交互的? // 此时副本fetch响应中的highWaterMark 等于该分区当前最大offset 是第一次请求的响应 参见 关键环节与注入点整理 要点4
  3. R还没来得及发出第二个请求 // 此时副本fetch请求中的fetchOffset与该分区当前最大offset+1相同 参见 关键环节与注入点整理 要点3
  4. L还没来得及处理第二个请求 // 参见 关键环节与注入点整理 要点1
  5. L处理了第二个请求,R没收到或者收到了没来得及处理完 // 此时副本fetch响应中的highWaterMark 等于该分区当前最大offset+1 是第一次请求的响应 参见 关键环节与注入点整理 要点4

该分区当前最大offset(当前最大offset是指新发的消息还没加上去的offset 下同)

节点状态列表(下面简称S)

N0表示node0 N2表示node2 ==> 表示状态演变 NG表示该节点被kill-9了,OK表示该节点恢复正常了

  1. N0(OK L) N2(OK R) ==> N0(KILL -9) ==> N0(start) ==> N0(OK R) N2(OK L)
  2. NG R NG L

关键环节与注入点整理

1. leader接受请求准备处理

leader节点,观察其处理fetch请求 fetchOffset值是该分区当前offset加1,这种方式只能抓到第二次请求开始和结束时 :

watch -f kafka.server.KafkaApis handleFetchRequest 'params[0].bodyAndSize.request.fetchData.get(new org.apache.kafka.common.TopicPartition("simon.test02.p1r2", 0)).fetchOffset' 'params[0].bodyAndSize.request.fetchData.get(new org.apache.kafka.common.TopicPartition("simon.test02.p1r2", 0))!=null' -n 10000

第一次的请求没有必要抓,因为一定实在produce了消息之后才会需要追踪,一般是需要注入produce完成后,和准备改副本节点发响应前。

2. leader处理完了请求准备返回响应

leader节点 观察其处理fetch请求 后发送响应的部分,其实这是每个请求都走着,highWatermark值是第一次时等于该分区当前最大offset,第二次时是该分区当前最大offset加1

watch -f kafka.server.KafkaApis sendResponseExemptThrottle 'params[1].responseData.get(new org.apache.kafka.common.TopicPartition("simon.test02.p1r2", 0)).highWatermark' 'params[1].responseData!=null&&params[1].responseData.get(new org.apache.kafka.common.TopicPartition("simon.test02.p1r2", 0))!=null'

3. 副本replica准备发送请求

拦截副本fetch请求 观察fetchOffset 这是副本发出请求

watch -b kafka.server.ReplicaFetcherThread fetch 'params[0].sessionParts.get(new org.apache.kafka.common.TopicPartition("simon.test02.p1r2", 0)).fetchOffset' -n 100000

4. 副本replica接到响应

拦截副本fetch响应

watch -f  kafka.server.ReplicaFetcherThread fetch 'returnObj.array[0]._2.highWaterMark' 'returnObj.size>0' -n 100000

_2是返回的水位相关信息 _1 是TopicPartition 当highWaterMark 等于该分区当前最大offset 是第一次请求的响应

当highWaterMark 等于该分区当前最大offset+1(即 发过一条消息后加1的offset) 是第二次请求的响应

样例case1:构造首次响应在leader端不返回-C1

按整个测试步骤来叙述这个过程以及过程中发生的事情

  • 启动zk

  • 启动kafka 3个broker,并确认topic的leader在node0,replica在node2(仅约定,为了后续行文方便)

    bin/kafka-topics.sh --describe --topic simon.test02.p1r2 --zookeeper 127.0.0.1:2182

    simon.test02.p1r2是我们测试topic,下同, 对应参数是–partitions 1 –replication-factor 2 。

  • 确认node0 node2 pid

    jps -v|grep 8050
    jps -v|grep 8052

    8050 8052 是我加在kafka进程上的远程调试端口,所以我用此办法过滤出对应进程。

    本次实验,45748是我们node0 pid,48122是我们node2 pid。

  • 启动console消费者

    bin/kafka-console-consumer.sh --topic simon.test02.p1r2 --bootstrap-server 127.0.0.1:9092,127.0.0.1:9094

    9092 9094分别是node0和node2的端口。

  • arthas挂到node0

    java -jar arthas-boot.jar 45748 --telnet-port 4000 --http-port 9000 --target-ip 0.0.0.0

    45748是node0 pid,其余选项按需选择即可。

    观察(含注入挂起)两个地方

    classloader -c 18b4aac2 --load ArthasSide
    # ArthasSide 是我自制的一个注入挂起的工具类,需要提前手动load
    
    ## 注入1
    watch -b kafka.server.KafkaApis sendResponseExemptThrottle 'params[1].responseData.get(new org.apache.kafka.common.TopicPartition("simon.test02.p1r2", 0)).highWatermark' 'params[1].responseData!=null&&params[1].responseData.get(new org.apache.kafka.common.TopicPartition("simon.test02.p1r2", 0))!=null&&@ArthasSide@hang("c1s1", 45000)'
     # 此处一定得是-b才行哈   否则 如果是-f的话  响应其实已经回给replica了哈  这个问题可以结合副本节点 watch -f kafka.cluster.Partition appendRecordsToFollowerOrFutureReplica {params,returnObj,throwExp} -x 2 看就很清楚了  
    
    # 用浏览器再启动一个arthas客户端
    ## 注入2
    watch -b kafka.server.KafkaApis handleProduceRequest {params,returnObj,throwExp} -x 3
    # 此处也用-b  因为对于leader端不一定有返回,因为挂起leader给副本回消息了

    注意此处一定得是-b才行哈 否则 如果是-f的话 响应其实已经回给replica了哈…这个问题“坑了自己”好一会..

  • arthas挂到node2

    java -jar arthas-boot.jar 48122 --telnet-port 4002 --http-port 9002 --target-ip 0.0.0.0

    48122是node2 pid,其余选项按需选择即可。

    观察(含注入挂起)两个地方

     ## 注入3
     watch -f kafka.cluster.Partition appendRecordsToFollowerOrFutureReplica {params,returnObj,throwExp} -x 2
     # 这个是观察副本节点有没有从leader节点同步到消息并append到他本地的,如果此处没有执行,表示消息没能同步过来
     
     # 用浏览器再启动一个arthas客户端
     ## 注入4
    watch -f kafka.server.KafkaApis handleProduceRequest {params,returnObj,throwExp} -x 3
    # 此处也用-f 观察副本的这个地方可以发现,在leader切换时,producer会向切换后的节点(即之前的副本节点)发消息
  • 发送消息,注意相关选项

    ./01.c1s1.sh hello730 -1 1  
    # bin/kafka-console-producer.sh --topic simon.test02.p1r2 --broker-list 127.0.0.1:9092,127.0.0.1:9094 --request-timeout-ms 6000000 --request-required-acks -1 --message-send-max-retries 1

    我用的脚本是自己包了下的,具体选项如上面注释中的,主要是超时、acks、重试次数等设置。

  • kill node0

    kill -9 45748

    此处要kill -9,否则leader节点受到kill信息号后,arthas率先退出,leader节点就有可能将响应返回给副本节点了。就达不到测试的效果了。(不给的话 默认是15 SIGTERM 终止进程, 9是强制终止 SIGKILL)

  • 观察相应数据

  1. node2(一开始的副本节点)节点确实不能同步到消息。(观察注入3)

  2. node0(一开始的leader节点)收到 了produce请求,写本地了,但是没给副本同步,首次请求的响应被挂起了。(观察注入1和2)

    1. node2(一开始的副本节点)在node0被kill之后切换成了新的leader节点,在切换之后收到了produce请求(发送消息那个步骤在kill节点等过程中一直是hang住的)。(观察注入4)
  3. 在步骤3之后,消费者继续能消费到消息。(说明ack为-1时,只有一个leader时也能正常发收消息,因为kill掉的节点已经踢出isr了)。

  • 启动node0,并观察数据截断

      /**
       * The offset of the next message that will be appended to the log
       */
      def logEndOffset: Long = nextOffsetMetadata.messageOffset  // 此时 524
    
    // truncateTo逻辑
    kafka.log.Log.truncateTo(targetOffset: Long)
    
    1500  targetOffset // 518
    
    
    // 截断调用栈
    
    // kafka.server.AbstractFetcherThread.partitionStates 更新是怎样的逻辑 ?
    // 1. 一开始
    Daemon Thread [kafka-request-handler-3] (Suspended (breakpoint at line 269 in AbstractFetcherThread))	
    	owns: Object  (id=151)	
    	owns: Object  (id=152)	
    	ReplicaFetcherThread(AbstractFetcherThread).addPartitions(Map<TopicPartition,Object>) line: 269	 
    // newPartitionToState   Map(simon.test01.p1r2-0 -> offset:84-isReadyForFetch:false-isTruncatingLog:true, simon.test02.p1r2-0 -> offset:542-isReadyForFetch:false-isTruncatingLog:true)
    	AbstractFetcherManager$$anonfun$addFetcherForPartitions$2.apply(Tuple2<BrokerAndFetcherId,Map<TopicPartition,BrokerAndInitialOffset>>) line: 138	
    	AbstractFetcherManager$$anonfun$addFetcherForPartitions$2.apply(Object) line: 126
    	TraversableLike$WithFilter$$anonfun$foreach$1.apply(A) line: 733	
    	Map$Map1<A,B>.foreach(Function1<Tuple2<A,B>,U>) line: 116	
    	TraversableLike$WithFilter.foreach(Function1<A,U>) line: 732	
    	ReplicaFetcherManager(AbstractFetcherManager).addFetcherForPartitions(Map<TopicPartition,BrokerAndInitialOffset>) line: 126	
    // partitionsPerFetcher:   Map(BrokerAndFetcherId(BrokerEndPoint(2,xxx-ThinkCentre-M920t-N000,9094),0) -> Map(simon.test01.p1r2-0 -> BrokerAndInitialOffset(BrokerEndPoint(2,xxx-ThinkCentre-M920t-N000,9094),84), simon.test02.p1r2-0 -> BrokerAndInitialOffset(BrokerEndPoint(2,xxx-ThinkCentre-M920t-N000,9094),542)))
    	ReplicaManager.makeFollowers(int, int, Map<Partition,PartitionState>, int, Map<TopicPartition,Errors>) line: 1304	
    /**
    **        // we do not need to check if the leader exists again since this has been done at the beginning of this process
            val partitionsToMakeFollowerWithLeaderAndOffset = partitionsToMakeFollower.map(partition =>
              partition.topicPartition -> BrokerAndInitialOffset(
                metadataCache.getAliveBrokers.find(_.id == partition.leaderReplicaIdOpt.get).get.brokerEndPoint(config.interBrokerListenerName),
                partition.getReplica().get.highWatermark.messageOffset)).toMap
            replicaFetcherManager.addFetcherForPartitions(partitionsToMakeFollowerWithLeaderAndOffset)
    */
    //  partitionsToMakeFollowerWithLeaderAndOffset:   Map(simon.test01.p1r2-0 -> BrokerAndInitialOffset(BrokerEndPoint(2,xxx-ThinkCentre-M920t-N000,9094),84), simon.test02.p1r2-0 -> BrokerAndInitialOffset(BrokerEndPoint(2,xxx-ThinkCentre-M920t-N000,9094),542))  这个542 就是后面trunct的targetOffset就来自于这个地方      partition.getReplica().get.highWatermark.messageOffset  分区在本地副本的高水位  
    
    	ReplicaManager.becomeLeaderOrFollower(int, LeaderAndIsrRequest, Function2<Iterable<Partition>,Iterable<Partition>,BoxedUnit>) line: 1082	
    	KafkaApis.handleLeaderAndIsrRequest(RequestChannel$Request) line: 183	
    	KafkaApis.handle(RequestChannel$Request) line: 108	
    	KafkaRequestHandler.run() line: 69	
    	KafkaThread(Thread).run() line: 748	
    
    
    
    //   kafka.server.ReplicaManager.becomeLeaderOrFollower(correlationId: Int, leaderAndIsrRequest: LeaderAndIsrRequest, onLeadershipChange: (Iterable[Partition], Iterable[Partition]) => Unit) 需要调试细读  
    
    
    
    //  从leader fetch  epoch 的栈
    Thread [ReplicaFetcherThread-0-2] (Suspended (breakpoint at line 133 in AbstractFetcherThread$$anonfun$maybeTruncate$1))
    /**
    **
      def maybeTruncate(): Unit = {
        val ResultWithPartitions(epochRequests, partitionsWithError) = inLock(partitionMapLock) { buildLeaderEpochRequest(states) }
        handlePartitionsWithErrors(partitionsWithError)
    
        if (epochRequests.nonEmpty) {
          val fetchedEpochs = fetchEpochsFromLeader(epochRequests)  // 从leader查,fetchedEpochs:  Map(simon.test01.p1r2-0 -> EpochEndOffset{error=NONE, endOffset=-1}, simon.test02.p1r2-0 -> EpochEndOffset{error=NONE, endOffset=542})
          //Ensure we hold a lock during truncation.
          inLock(partitionMapLock) {
            //Check no leadership changes happened whilst we were unlocked, fetching epochs
            val leaderEpochs = fetchedEpochs.filter { case (tp, _) => partitionStates.contains(tp) }
            val ResultWithPartitions(fetchOffsets, partitionsWithError) = maybeTruncate(leaderEpochs)
            handlePartitionsWithErrors(partitionsWithError)
            markTruncationCompleteAndUpdateFetchOffset(fetchOffsets)
          }
        }
      }
    
    */
    	AbstractFetcherThread$$anonfun$maybeTruncate$1.apply$mcV$sp() line: 133	
    	AbstractFetcherThread$$anonfun$maybeTruncate$1.apply() line: 130	
    	AbstractFetcherThread$$anonfun$maybeTruncate$1.apply() line: 130	
    	CoreUtils$.inLock(Lock, Function0<T>) line: 250	
    	ReplicaFetcherThread(AbstractFetcherThread).maybeTruncate() line: 130	
    	ReplicaFetcherThread(AbstractFetcherThread).doWork() line: 100	
    	ReplicaFetcherThread(ShutdownableThread).run() line: 82	
    
    
    // 开始truncate的线程栈
    Thread [ReplicaFetcherThread-0-2] (Suspended)	
    	owns: Object  (id=400)	
    	Log$$anonfun$truncateTo$1.apply$mcZ$sp() line: 1510	  // targetOffset :  542 ;   logEndOffset:548
    	Log$$anonfun$truncateTo$1.apply() line: 1497	
    	Log$$anonfun$truncateTo$1.apply() line: 1497	
    	Log.maybeHandleIOException(Function0<String>, Function0<T>) line: 1696	
    	Log.truncateTo(long) line: 1497	
    	LogManager$$anonfun$truncateTo$2.apply(Tuple2<TopicPartition,Object>) line: 508	
    	LogManager$$anonfun$truncateTo$2.apply(Object) line: 494	
    	TraversableLike$WithFilter$$anonfun$foreach$1.apply(A) line: 733	
    	Map$Map1<A,B>.foreach(Function1<Tuple2<A,B>,U>) line: 116	
    	TraversableLike$WithFilter.foreach(Function1<A,U>) line: 732	
    	LogManager.truncateTo(Map<TopicPartition,Object>, boolean) line: 494	
    	Partition$$anonfun$truncateTo$1.apply$mcV$sp() line: 665	
    	Partition$$anonfun$truncateTo$1.apply() line: 665	
    	Partition$$anonfun$truncateTo$1.apply() line: 665	
    	CoreUtils$.inLock(Lock, Function0<T>) line: 250	
    	CoreUtils$.inReadLock(ReadWriteLock, Function0<T>) line: 256	
    	Partition.truncateTo(long, boolean) line: 664	
    	ReplicaFetcherThread$$anonfun$maybeTruncate$1.apply(Tuple2<TopicPartition,EpochEndOffset>) line: 320	
    	ReplicaFetcherThread$$anonfun$maybeTruncate$1.apply(Object) line: 301	
    	Iterator$class.foreach(Iterator, Function1) line: 891	
    	Wrappers$JMapWrapperLike$$anon$2(AbstractIterator<A>).foreach(Function1<A,U>) line: 1334	
    	IterableLike$class.foreach(IterableLike, Function1) line: 72	
    	Wrappers$JMapWrapper<A,B>(AbstractIterable<A>).foreach(Function1<A,U>) line: 54	
    	ReplicaFetcherThread.maybeTruncate(Map<TopicPartition,EpochEndOffset>) line: 301	
    	AbstractFetcherThread$$anonfun$maybeTruncate$1.apply$mcV$sp() line: 133	
    	AbstractFetcherThread$$anonfun$maybeTruncate$1.apply() line: 130	
    	AbstractFetcherThread$$anonfun$maybeTruncate$1.apply() line: 130	
    	CoreUtils$.inLock(Lock, Function0<T>) line: 250	
    	ReplicaFetcherThread(AbstractFetcherThread).maybeTruncate() line: 130	
    	ReplicaFetcherThread(AbstractFetcherThread).doWork() line: 100	
    	ReplicaFetcherThread(ShutdownableThread).run() line: 82	
    
    
    // truncate 之后 更新state
    Thread [ReplicaFetcherThread-0-2] (Suspended (breakpoint at line 288 in AbstractFetcherThread))	
    	ReplicaFetcherThread(AbstractFetcherThread).kafka$server$AbstractFetcherThread$$markTruncationCompleteAndUpdateFetchOffset(Map<TopicPartition,Object>) line: 288	
    	AbstractFetcherThread$$anonfun$maybeTruncate$1.apply$mcV$sp() line: 135	
    	AbstractFetcherThread$$anonfun$maybeTruncate$1.apply() line: 130	
    	AbstractFetcherThread$$anonfun$maybeTruncate$1.apply() line: 130	
    	CoreUtils$.inLock(Lock, Function0<T>) line: 250	
    	ReplicaFetcherThread(AbstractFetcherThread).maybeTruncate() line: 130	
    	ReplicaFetcherThread(AbstractFetcherThread).doWork() line: 100	
    	ReplicaFetcherThread(ShutdownableThread).run() line: 82

节点状态列表(下面简称S)

上述实验的节点状态变化,总结如下:

N0表示node0 N2表示node2 ==> 表示状态演变 NG表示该节点被kill-9了,OK表示该节点恢复正常了

N0(OK L) N2(OK R) ==> N0(KILL -9) ==> N0(start) ==> N0(OK R) N2(OK L)

注意:

如果要再次执行整个case, 此时 node起来后是副本,那么要先kill node2,此时node0切换成leader,再启动node2.然后再继续按步骤实验。

如果不是按这个来或有问题,比如直接kill node2,然后一次启动node0(此时leader是-1,即没有leader),再启动node2(此时leader在node2上),这样并不能达到我们的预期,leader在node0上。

leader -1的这个状态也说明了,某些情况下,有些个broker节点挂了,如果不手动干预的话,可能会出现选不了leader的情况。