Kafka 核心源码解读
胡夕
Apache Kafka Committer,老虎证券技术总监
19216 人已学习
新⼈⾸单¥59
登录后,你可以任选4讲全文学习
课程目录
已完结/共 44 讲
结束语 (1讲)
Kafka 核心源码解读
15
15
1.0x
00:00/00:00
登录|注册

33 | GroupCoordinator:在Rebalance中,如何进行组同步?

你好,我是胡夕。今天,我们继续学习消费者组 Rebalance 流程,这节课我们重点学习这个流程的第 2 大步,也就是组同步。
组同步,也就是成员向 Coordinator 发送 SyncGroupRequest 请求,等待 Coordinator 发送分配方案。在 GroupCoordinator 类中,负责处理这个请求的入口方法就是 handleSyncGroup。它进一步调用 doSyncGroup 方法完成组同步的逻辑。后者除了给成员下发分配方案之外,还需要在元数据缓存中注册组消息,以及把组状态变更为 Stable。一旦完成了组同步操作,Rebalance 宣告结束,消费者组开始正常工作。
接下来,我们就来具体学习下组同步流程的实现逻辑。我们先从顶层的入口方法 handleSyncGroup 方法开始学习,该方法被 KafkaApis 类的 handleSyncGroupRequest 方法调用,用于处理消费者组成员发送的 SyncGroupRequest 请求。顺着这个入口方法,我们会不断深入,下沉到具体实现组同步逻辑的私有化方法 doSyncGroup。

handleSyncGroup 方法

我们从 handleSyncGroup 的方法签名开始学习,代码如下:
确认放弃笔记?
放弃后所记笔记将不保留。
新功能上线,你的历史笔记已初始化为私密笔记,是否一键批量公开?
批量公开的笔记不会为你同步至部落
公开
同步至部落
取消
完成
0/2000
荧光笔
直线
曲线
笔记
复制
AI
  • 深入了解
  • 翻译
    • 英语
    • 中文简体
    • 中文繁体
    • 法语
    • 德语
    • 日语
    • 韩语
    • 俄语
    • 西班牙语
    • 阿拉伯语
  • 解释
  • 总结

本文深入介绍了消费者组Rebalance流程中的组同步步骤,重点关注了GroupCoordinator类中handleSyncGroup方法的实现。该方法通过对消费者组状态及合法性的校验,获取消费者组元数据信息,并执行组同步任务。文章详细解释了handleSyncGroup方法的主体逻辑,并通过流程图展示了其执行过程。此外,文章还介绍了doSyncGroup方法的源码实现,包括对消费者组状态的判断和不同状态下的执行逻辑。在CompletingRebalance状态下,文章详细解释了组同步操作的实现过程,包括为成员设置组同步回调函数、处理Leader成员的SyncGroupRequest请求、保存消费者组信息到消费者组元数据等。总结来说,本文通过深入解析handleSyncGroup和doSyncGroup方法的源码实现,帮助读者深入理解消费者组Rebalance流程中的组同步功能。文章内容丰富,逻辑清晰,对于想深入了解消费者组Rebalance流程的读者来说,是一篇极具价值的技术文章。

仅可试看部分内容,如需阅读全部内容,请付费购买文章所属专栏
《Kafka 核心源码解读》
新⼈⾸单¥59
立即购买
登录 后留言

全部留言(8)

  • 最新
  • 精选
  • 胡夕
    置顶
    你好,我是胡夕。我来公布上节课的“课后讨论”题答案啦~ 上节课,我们重点学习了GroupCoordinator执行Rebalance第一步加入组的代码。课后我请你思考一下maybePrepareRebalance方法满足什么条件才会开启Rebalance?如果你翻开maybePrepareRebalance的代码,可以看到它会调用canRebalance方法执行是否允许Rebalance的判断,而判断的依据很简单,就是看当前消费者组状态是否在PreparingRebalance状态的合法前置状态集合中。也就是说,当前消费者组状态必须是Stable、Empty或CompletingRebalance中的一个才可以。 okay,你同意这个说法吗?或者说你有其他的看法吗?我们可以一起讨论下。
    2020-07-21
    1
  • 现实中游走
    老师,您好,请教个问题,下面的报错是什么原因导致的,要怎么查找问题的原因并解决 java.lang.IllegalStateException: Correlation id for response (62213091) does not match request (62213090),

    作者回复: 请问是什么版本的Kafka呢?这个错误是之前很早版本中的一个bug,最近比较新的Kafka中已经很少见了,需要排除下是否是已知bug

    2020-07-27
    2
  • dawn
    我不太能理解,分配方案为什么是消费端出,只是一个简单的映射,讲道理不会有太多的消耗,协调者为啥不一起做了。 如果这块很重,把这个逻辑丢给消费者来做,能减少broker的压力,我还能理解,现在看下来,协调者做了大量的同步工作,也不差方案分配这一步啊
    2022-09-05归属地:江苏
    1
    1
  • Jasonji
    private def setAndPropagateAssignment(group: GroupMetadata, assignment: Map[String, Array[Byte]]): Unit = { assert(group.is(CompletingRebalance)) group.allMemberMetadata.foreach(member => member.assignment = assignment(member.memberId)) propagateAssignment(group, Errors.NONE) } 这个方法中的group.allMemberMetadata.foreach(member => member.assignment = assignment(member.memberId))这行
    2021-03-17
    1
  • Geek_aec36b
    最近线上遇到一个case,consumer group 执行Rebalance时,消费实例A 没有执行 revoke partition,然后消费实例 B又获取到了消费实例A的其中一个partition,导致 同一个consumer group 在subscribe()方法下,出现一个分区被两个消费实例消费到,请问这种case,胡老师觉得是什么原因导致的呢?
    2022-07-14
  • mushan
    老师你好, groupManager.getGroup(groupId) match { case None => responseCallback(SyncGroupResult(Errors.UNKNOWN_MEMBER_ID)) case Some(group) => doSyncGroup(group, generation, memberId, protocolType, protocolName, groupInstanceId, groupAssignment, responseCallback) } 什么情况下会出现获取不到消费者组元数据呢?请求同步组数据是必须在正常加入组的情况下,那时候消费者组元数据是应该能保证存在的吧?
    2021-10-15
  • 云端漫漫步
    GroupCoordinator中的propagateAssignment方法会遍历组成员元数据,然后根据每个member信息针对下发
    2020-07-30
  • 伯安知心
    handleSyncGroup方法验证组状态的时候validateGroupStatus通过模式匹配如果发现没有错误,执行查找自己broker保存的组groupManager.getGroup(groupId) 然后每个组依次执行自己的doSyncGroup方法。
    2020-07-14
收起评论
显示
设置
留言
8
收藏
沉浸
阅读
分享
手机端
快捷键
回顶部