消费者组机制实现
消费分组如何完成
通过分组相关的rpc请求完成。以一个3分区的topic为例子,指定消费组消费后梳理请求日志如下:
先加入第一个消费者后节点0的请求处理日志:
apiKey=FIND_COORDINATOR // coordinator_key=ss-t01-group,coordinator_type=0
apiKey=JOIN_GROUP // member_id=,
apiKey=SYNC_GROUP // generation_id=1,member_id=consumer-1-6d037808-5823-4c95-a61a-0f881082d35e // 第一个消费者member_id
再加入第二个消费者后节点0的请求处理日志:
apiKey=JOIN_GROUP // member_id=,
apiKey=JOIN_GROUP // member_id=consumer-1-6d037808-5823-4c95-a61a-0f881082d35e // 第二个消费者member_id
apiKey=SYNC_GROUP // generation_id=2,member_id=consumer-1-6d037808-5823-4c95-a61a-0f881082d35e
apiKey=SYNC_GROUP // generation_id=2,member_id=consumer-1-97b9c3f6-c5b7-4eb4-976a-9c178e2fe10f
先加入第一个消费者后节点2的请求处理日志:
无分组相关日志
再加入第二个消费者后节点2的请求处理日志:
apiKey=FIND_COORDINATOR
相关处理代码均在GroupCoordinator.scala中。
如何探测消费者下线
消费者会向组协调者发apiKey=HEARTBEAT的心跳请求。
组协调者每次处理心跳请求时会根据消费者的session.timeout.ms
配置来计算和更新下一个心跳应该到来的时间点。 如果超过session.timeout.ms
还没收到下一次心跳则认为消费者下线。 session.timeout.ms
默认值10s。
消费者以每隔heartbeat.interval.ms
的时间向组协调者发送心跳请求,默认为3s。
组协调
组状态梳理
stateDiagram-v2
[*]-->Empty
Empty --> PreparingRebalance : E-P
PreparingRebalance --> CompletingRebalance : P-C
CompletingRebalance --> Stable : C-S
Stable --> Dead : S-D
PreparingRebalance --> Empty : P-S
PreparingRebalance --> Dead : P-D
CompletingRebalance --> Stable : C-S
CompletingRebalance --> PreparingRebalance : C-P
CompletingRebalance --> Dead : C-D
Stable --> PreparingRebalance : S-P
Stable --> Dead : S-D
Empty --> Dead : E-D
靠GroupMetadata.transitionTo方法完成状态转换。
正常来说:一个消费者加入消费组,先发查找组协调者的请求。然后给组协调者发加入组的请求,此步完成后消费组处于CompletingRebalance状态,当然这期间会经过一个PreparingRebalance状态。然后给组协调者发同步组请求,此步骤完成后消费组才能处于Stable状态。
- 准备组均衡时,比如组成员的加入、更新或者离开等,组状态转换为PreparingRebalance。主要是内存操作,创建DelayedJoin操作,delay时间为reblance的超时时间,这样正好跟后面的complete状态能对上。
- 初始化为下一年代组时(initNextGeneration)且组成员存在时,组状态转换为CompletingRebalance。在kafka.coordinator.group.GroupMetadataManager.storeGroup中完成组信息写副本log持久化。
- 成员加入完成时,触发initNextGeneration操作。DelayedJoin.onComplete()。
- 初始化为下一年代组时(initNextGeneration)且组成员没有时,组状态转换为Empty。
- 组同步操作完成时且当前状态是CompletingRebalance,组状态转换为Stable。
本博客所有文章除特别声明外,均采用 CC BY-SA 3.0协议 。转载请注明出处!