消费者组机制实现

消费分组如何完成

通过分组相关的rpc请求完成。以一个3分区的topic为例子,指定消费组消费后梳理请求日志如下:

先加入第一个消费者后节点0的请求处理日志:

apiKey=FIND_COORDINATOR // coordinator_key=ss-t01-group,coordinator_type=0

apiKey=JOIN_GROUP // member_id=,

apiKey=SYNC_GROUP // generation_id=1,member_id=consumer-1-6d037808-5823-4c95-a61a-0f881082d35e // 第一个消费者member_id

再加入第二个消费者后节点0的请求处理日志:

apiKey=JOIN_GROUP // member_id=,

apiKey=JOIN_GROUP // member_id=consumer-1-6d037808-5823-4c95-a61a-0f881082d35e // 第二个消费者member_id

apiKey=SYNC_GROUP // generation_id=2,member_id=consumer-1-6d037808-5823-4c95-a61a-0f881082d35e

apiKey=SYNC_GROUP // generation_id=2,member_id=consumer-1-97b9c3f6-c5b7-4eb4-976a-9c178e2fe10f

先加入第一个消费者后节点2的请求处理日志:

无分组相关日志

再加入第二个消费者后节点2的请求处理日志:

apiKey=FIND_COORDINATOR

相关处理代码均在GroupCoordinator.scala中。

如何探测消费者下线

消费者会向组协调者发apiKey=HEARTBEAT的心跳请求。

组协调者每次处理心跳请求时会根据消费者的session.timeout.ms配置来计算和更新下一个心跳应该到来的时间点。 如果超过session.timeout.ms还没收到下一次心跳则认为消费者下线。 session.timeout.ms默认值10s。

消费者以每隔heartbeat.interval.ms的时间向组协调者发送心跳请求,默认为3s。

组协调

组状态梳理

stateDiagram-v2
    [*]-->Empty
    Empty --> PreparingRebalance  : E-P
    PreparingRebalance --> CompletingRebalance : P-C
    CompletingRebalance --> Stable : C-S
    Stable --> Dead : S-D
    
    PreparingRebalance --> Empty : P-S
    PreparingRebalance --> Dead : P-D
    
    CompletingRebalance --> Stable : C-S
    CompletingRebalance --> PreparingRebalance : C-P
    CompletingRebalance --> Dead : C-D
    
    Stable --> PreparingRebalance : S-P
    Stable --> Dead : S-D
    
     Empty --> Dead  : E-D

靠GroupMetadata.transitionTo方法完成状态转换。

正常来说:一个消费者加入消费组,先发查找组协调者的请求。然后给组协调者发加入组的请求,此步完成后消费组处于CompletingRebalance状态,当然这期间会经过一个PreparingRebalance状态。然后给组协调者发同步组请求,此步骤完成后消费组才能处于Stable状态。

  • 准备组均衡时,比如组成员的加入、更新或者离开等,组状态转换为PreparingRebalance。主要是内存操作,创建DelayedJoin操作,delay时间为reblance的超时时间,这样正好跟后面的complete状态能对上。
  • 初始化为下一年代组时(initNextGeneration)且组成员存在时,组状态转换为CompletingRebalance。在kafka.coordinator.group.GroupMetadataManager.storeGroup中完成组信息写副本log持久化。
    • 成员加入完成时,触发initNextGeneration操作。DelayedJoin.onComplete()。
  • 初始化为下一年代组时(initNextGeneration)且组成员没有时,组状态转换为Empty。
  • 组同步操作完成时且当前状态是CompletingRebalance,组状态转换为Stable。