Kafka 核心源码解读
胡夕
Apache Kafka Committer,老虎证券技术总监
19216 人已学习
新⼈⾸单¥59
登录后,你可以任选4讲全文学习
课程目录
已完结/共 44 讲
结束语 (1讲)
Kafka 核心源码解读
15
15
1.0x
00:00/00:00
登录|注册

28 | 消费者组元数据(下):Kafka如何管理这些元数据?

你好,我是胡夕。今天我们继续学习消费者组元数据。
学完上节课之后,我们知道,Kafka 定义了非常多的元数据,那么,这就必然涉及到对元数据的管理问题了。
这些元数据的类型不同,管理策略也就不一样。这节课,我将从消费者组状态、成员、位移和分区分配策略四个维度,对这些元数据进行拆解,带你一起了解下 Kafka 管理这些元数据的方法。
这些方法定义在 MemberMetadata 和 GroupMetadata 这两个类中,其中,GroupMetadata 类中的方法最为重要,是我们要重点学习的对象。在后面的课程中,你会看到,这些方法会被上层组件 GroupCoordinator 频繁调用,因此,它们是我们学习 Coordinator 组件代码的前提条件,你一定要多花些精力搞懂它们。

消费者组状态管理方法

消费者组状态是很重要的一类元数据。管理状态的方法,要做的事情也就是设置和查询。这些方法大多比较简单,所以我把它们汇总在一起,直接介绍给你。
// GroupMetadata.scala
// 设置/更新状态
def transitionTo(groupState: GroupState): Unit = {
assertValidTransition(groupState) // 确保是合法的状态转换
state = groupState // 设置状态到给定状态
currentStateTimestamp = Some(time.milliseconds() // 更新状态变更时间戳
// 查询状态
def currentState = state
// 判断消费者组状态是指定状态
def is(groupState: GroupState) = state == groupState
// 判断消费者组状态不是指定状态
def not(groupState: GroupState) = state != groupState
// 消费者组能否Rebalance的条件是当前状态是PreparingRebalance状态的合法前置状态
def canRebalance = PreparingRebalance.validPreviousStates.contains(state)
1.transitionTo 方法
transitionTo 方法的作用是将消费者组状态变更成给定状态。在变更前,代码需要确保这次变更必须是合法的状态转换。这是依靠每个 GroupState 实现类定义的 validPreviousStates 集合来完成的。只有在这个集合中的状态,才是合法的前置状态。简单来说,只有集合中的这些状态,才能转换到当前状态。
确认放弃笔记?
放弃后所记笔记将不保留。
新功能上线,你的历史笔记已初始化为私密笔记,是否一键批量公开?
批量公开的笔记不会为你同步至部落
公开
同步至部落
取消
完成
0/2000
荧光笔
直线
曲线
笔记
复制
AI
  • 深入了解
  • 翻译
    • 英语
    • 中文简体
    • 中文繁体
    • 法语
    • 德语
    • 日语
    • 韩语
    • 俄语
    • 西班牙语
    • 阿拉伯语
  • 解释
  • 总结

Kafka消费者组元数据管理涉及消费者组状态、成员、位移和分区分配策略。GroupMetadata类中定义了重要的方法,如transitionTo用于状态变更,canRebalance用于判断Rebalance操作条件,add用于添加成员,remove用于移除成员,has、get和size用于查询成员。除了组状态和成员管理之外,GroupMetadata还有一大类管理功能,就是管理消费者组的提交位移(Committed Offsets),主要包括添加和移除位移值。在GroupMetadata中,有3个向offsets中添加订阅分区的已消费位移值的方法,分别是initializeOffsets、onOffsetCommitAppend和completePendingTxnOffsetCommit。而移除位移值的方法removeExpiredOffsets则根据Kafka主题中消息的留存时间设置,将过期的位移值从offsets字段中移除。消费者组分区分配策略的管理涉及字段supportedProtocols的管理,其中的方法candidateProtocols用于确认消费者组支持的分区分配策略集,而selectProtocol方法则用于选出消费者组的分区消费分配策略。通过这些方法,Kafka实现了对消费者组元数据的全面管理,包括状态、成员、位移和分区分配策略。这些操作元数据的方法被上层调用方GroupCoordinator大量使用,对于深入理解Kafka的底层实现原理至关重要。

仅可试看部分内容,如需阅读全部内容,请付费购买文章所属专栏
《Kafka 核心源码解读》
新⼈⾸单¥59
立即购买
登录 后留言

全部留言(4)

  • 最新
  • 精选
  • 胡夕
    置顶
    你好,我是胡夕。我来公布上节课的“课后讨论”题答案啦~ 上节课,我们重点学习了消费者组元数据都有哪些。课后我请你思考这样一个问题:kafka-consumer-groups脚本输出中的ASSIGNMENT-STRATEGY项对应于哪一项元数据。实际上,它对应于GroupMetadata类的protocolName字段,即消费者组分区消费分配策略名称。 okay,你同意这个说法吗?或者说你有其他的看法吗?我们可以一起讨论下。
    2020-07-07
    1
  • 伯安知心
    每个消费者都有一个自己的rebalance时间,这是为了每个消费者按照自己的需要设置,但是在服务端消费者组需要管理这些组内所有消费者,消费者组也需要一个rebalance时间来平衡消费者,在服务端prepareRebalance方法有个参数delayedRebalance 要么初始化InitialDelayedJoin得到rebalance时间,要么非初始化方法DelayedJoin,而DelayedJoin中参数rebalanceTimeoutMs 从方法timeout.max(member.rebalanceTimeoutMs)计算得到,简单来说就是group中所有consumer的最大的member.rebalanceTimeoutMs。

    作者回复: 👍

    2020-07-02
    3
  • 夜里吃西瓜
    获取过期的offset的时候,有个判断逻辑: 条件1:分区所属主题不在订阅主题列表之内 这样岂不是意味着,如果topic一直存在消费者,那么提交的位移就一直不会被删除掉,最终会导致log无限膨胀? 请问一下,是不是还有其他的逻辑来规避上述的问题?
    2023-07-26归属地:上海
    1
  • 梁聪明
    def rebalanceTimeoutMs = members.values.foldLeft(0) { (timeout, member) => timeout.max(member.rebalanceTimeoutMs) }
    2020-09-06
收起评论
显示
设置
留言
4
收藏
沉浸
阅读
分享
手机端
快捷键
回顶部