28 | 消费者组元数据(下):Kafka如何管理这些元数据?
胡夕
你好,我是胡夕。今天我们继续学习消费者组元数据。
学完上节课之后,我们知道,Kafka 定义了非常多的元数据,那么,这就必然涉及到对元数据的管理问题了。
这些元数据的类型不同,管理策略也就不一样。这节课,我将从消费者组状态、成员、位移和分区分配策略四个维度,对这些元数据进行拆解,带你一起了解下 Kafka 管理这些元数据的方法。
这些方法定义在 MemberMetadata 和 GroupMetadata 这两个类中,其中,GroupMetadata 类中的方法最为重要,是我们要重点学习的对象。在后面的课程中,你会看到,这些方法会被上层组件 GroupCoordinator 频繁调用,因此,它们是我们学习 Coordinator 组件代码的前提条件,你一定要多花些精力搞懂它们。
消费者组状态管理方法
消费者组状态是很重要的一类元数据。管理状态的方法,要做的事情也就是设置和查询。这些方法大多比较简单,所以我把它们汇总在一起,直接介绍给你。
1.transitionTo 方法
transitionTo 方法的作用是将消费者组状态变更成给定状态。在变更前,代码需要确保这次变更必须是合法的状态转换。这是依靠每个 GroupState 实现类定义的 validPreviousStates 集合来完成的。只有在这个集合中的状态,才是合法的前置状态。简单来说,只有集合中的这些状态,才能转换到当前状态。
公开
同步至部落
取消
完成
0/2000
荧光笔
直线
曲线
笔记
复制
AI
- 深入了解
- 翻译
- 解释
- 总结
Kafka消费者组元数据管理涉及消费者组状态、成员、位移和分区分配策略。GroupMetadata类中定义了重要的方法,如transitionTo用于状态变更,canRebalance用于判断Rebalance操作条件,add用于添加成员,remove用于移除成员,has、get和size用于查询成员。除了组状态和成员管理之外,GroupMetadata还有一大类管理功能,就是管理消费者组的提交位移(Committed Offsets),主要包括添加和移除位移值。在GroupMetadata中,有3个向offsets中添加订阅分区的已消费位移值的方法,分别是initializeOffsets、onOffsetCommitAppend和completePendingTxnOffsetCommit。而移除位移值的方法removeExpiredOffsets则根据Kafka主题中消息的留存时间设置,将过期的位移值从offsets字段中移除。消费者组分区分配策略的管理涉及字段supportedProtocols的管理,其中的方法candidateProtocols用于确认消费者组支持的分区分配策略集,而selectProtocol方法则用于选出消费者组的分区消费分配策略。通过这些方法,Kafka实现了对消费者组元数据的全面管理,包括状态、成员、位移和分区分配策略。这些操作元数据的方法被上层调用方GroupCoordinator大量使用,对于深入理解Kafka的底层实现原理至关重要。
仅可试看部分内容,如需阅读全部内容,请付费购买文章所属专栏
《Kafka 核心源码解读》,新⼈⾸单¥59
《Kafka 核心源码解读》,新⼈⾸单¥59
立即购买
© 版权归极客邦科技所有,未经许可不得传播售卖。 页面已增加防盗追踪,如有侵权极客邦将依法追究其法律责任。
登录 后留言
全部留言(4)
- 最新
- 精选
- 胡夕置顶你好,我是胡夕。我来公布上节课的“课后讨论”题答案啦~ 上节课,我们重点学习了消费者组元数据都有哪些。课后我请你思考这样一个问题:kafka-consumer-groups脚本输出中的ASSIGNMENT-STRATEGY项对应于哪一项元数据。实际上,它对应于GroupMetadata类的protocolName字段,即消费者组分区消费分配策略名称。 okay,你同意这个说法吗?或者说你有其他的看法吗?我们可以一起讨论下。2020-07-071
- 伯安知心每个消费者都有一个自己的rebalance时间,这是为了每个消费者按照自己的需要设置,但是在服务端消费者组需要管理这些组内所有消费者,消费者组也需要一个rebalance时间来平衡消费者,在服务端prepareRebalance方法有个参数delayedRebalance 要么初始化InitialDelayedJoin得到rebalance时间,要么非初始化方法DelayedJoin,而DelayedJoin中参数rebalanceTimeoutMs 从方法timeout.max(member.rebalanceTimeoutMs)计算得到,简单来说就是group中所有consumer的最大的member.rebalanceTimeoutMs。
作者回复: 👍
2020-07-023 - 夜里吃西瓜获取过期的offset的时候,有个判断逻辑: 条件1:分区所属主题不在订阅主题列表之内 这样岂不是意味着,如果topic一直存在消费者,那么提交的位移就一直不会被删除掉,最终会导致log无限膨胀? 请问一下,是不是还有其他的逻辑来规避上述的问题?2023-07-26归属地:上海1
- 梁聪明def rebalanceTimeoutMs = members.values.foldLeft(0) { (timeout, member) => timeout.max(member.rebalanceTimeoutMs) }2020-09-06
收起评论