32 | GroupCoordinator:在Rebalance中,Coordinator如何处理成员入组?
- 深入了解
- 翻译
- 解释
- 总结
本文深入探讨了Kafka中消费者组的Rebalance流程,重点关注了加入组的源码实现。通过详细解释handleJoinGroup方法的参数和主体代码,以及其与其他方法的交互关系,帮助读者理解了加入组的完整流程。作者通过深入分析Rebalance流程的源码实现,为读者提供了深入了解消费者组Rebalance流程的机会。 在文章中,作者详细介绍了doUnknownJoinGroup方法和doJoinGroup方法的源码实现,以及它们都会用到的addMemberAndRebalance方法。通过源码分析和图示展示了方法的流程,帮助读者理解了加入组的具体实现细节。文章通过深入分析源码实现,为读者提供了深入了解Kafka消费者组Rebalance流程的机会,尤其是加入组的具体实现细节。 总的来说,本文通过深入分析源码实现,为读者提供了深入了解Kafka消费者组Rebalance流程的机会,尤其是加入组的具体实现细节。读者可以通过本文了解到Kafka中消费者组Rebalance流程的重要性、源码实现细节以及相关方法的调用顺序,从而加深对该流程的理解。
《Kafka 核心源码解读》,新⼈⾸单¥59
全部留言(11)
- 最新
- 精选
- 胡夕置顶你好,我是胡夕。我来公布上节课的“课后讨论”题答案啦~ 上节课,我们重点学习了GroupMetadataManager类读写位移主题的源码。课后我请你分析cleanGroupMetadata方法的流程。实际上,这个方法调用带参数的同名方法cleanupGroupMetadata来执行组位移的清除。后者会遍历给定的所有消费者组,之后调用removeExpiredOffsets方法执行过期位移的清除。清除的主要依据是看当前时间与位移提交的时间的差值是否越过了offsets.retention.minutes参数值。如果越过了则视该位移为过期,需要从offsets中移除。同时,cleanupGroupMetadata方法还会构造tombstone消息并写入到内部位移主题执行主题中的过期位移消息的输出。 okay,你同意这个说法吗?或者说你有其他的看法吗?我们可以一起讨论下。2020-07-21
- 肖恒请教下,重平衡开启前是如何处理 offset 的?
作者回复: 会在rebalance前尝试提交位移,如果成功则okay,不成功就只能等待rebalance结束了
2021-01-201 - 对与错胡哥,协调者是依据什么来的?比如一个协调者所对应的broker挂了,那么其他消费者成员会发生什么?
作者回复: broker挂了,其他副本会成为leader,也就是新的Coordinator
2021-01-0721 - 三颗豆子生产发现JoinGroup的监控指标时延很高,短的几秒,长的1分钟,这个正常吗?其他的全部请求时延都在2秒以下。仔细分析是在remote阶段的时延很高,remote它在等什么呢?
作者回复: 能发下截图或具体的JMX bean吗?我去看下对应的逻辑
2020-12-1841 - J.Smile老师您好,想问下如何能明确观察到reblance的过程,因为有时候知道发生了reblance,但是不能确认是什么原因引起的reblance,所以想通过一定手段定位下,比如tcpdump抓包,但是用wireshark打开并转为kafka协议好像也看不出reblance的过程,也可能是我用的不熟练,想请老师可以补充下这方面的,毕竟理论结合实践才是解决问题的途径。 补充说一下,好像server.log也只能看到说Reblance开始了,Member被移除。。之类等等。
作者回复: 打开DEBUG日志,会有非常详细的日志输出,重点看看Coordinator部分的DEBUG日志
2020-07-221 - 双椒叔叔胡老师,您好 分区迁移遇到的问题怎么解决呢 1038,1037,1029-----reassign----->1038,1037,1048 其实就是1029机子先宕掉了,然后我想要把死掉的1029机子上的副本迁移到1048上 但是迁移计划卡死了,一直in progress replica变成1038,1037,1048,1029了,ISR变成了1038,1048 现在我想要在replica中remove1029,我看了源码发现是状态机维护的每一个replica状态 源码中 1038,1037,1029-----reassign----->1038,1037,1048 迁移计划卡住的那步是这样的 要把1029副本的状态从replica中移除流程是 controller先把1029offline, 然后 controller发送状态改变请求(deleted)给1029 1、first move the replica to offline state (the controller removes it from the ISR) 2、send stop replica command to the old replicas 3、Eventually partition reassignment could use a callback that does retries if deletion failed 如果1029这个节点不在线的话就会 返回一个回调状态值NonExistentReplica(因为1029现在是死了的状态)。 reassignment那里的源码大概我看了下,不知道上面的理解对不对 就想问下,想这种原本3副本,现在执行迁移计划后,replica中多了一个不在线的1029,然后执行计划一直in progress 那么这种情况如何解决呢? 是直接在zk中修改该主题的问题分区(执行迁移计划卡主的那个分区)吗?生产中1k个分区,zk中不知道敢不敢修改
作者回复: 删除zk中/admin/reassign_partitions下对应的节点然后重启broker试试呢
2020-07-131 - 云端漫漫步GroupState是Stable, CompletingRebalance, Empty这三种的情况下才可以
作者回复: ������
2020-07-252 - 懂码哥(GerryWen)胡大大,您的社区名字有点意思。https://issues.apache.org/jira/secure/ViewProfile.jspa?name=huxi_2b 😀😁😝
作者回复: 额。。。不是你想的那样:)
2020-07-13 - 伯安知心从代码上看,进入maybePrepareRebalance的时候,首先把group加入锁中,因为这里要访问消费组的元数据(线程不安全),然后只有一个判断if (group.canRebalance),这个判断主要是判断消费组元数据中的validPreviousStates的map集合中是否存在PreparingRebalance状态的数据,事实上这个状态就是一个过渡的中间状态比如某些成员加入超时的状态,所有成员离开了组,通过分区迁移删除组。
作者回复: 👍
2020-07-112 - z.l请教下,为什么消费者组非要选出一个Leader成员,直接由Coordinator计算好分区分配方案再发给每个组成员不是更简单么?Leader是否设计复杂化了?2022-09-23归属地:上海