Kafka 核心源码解读
胡夕
Apache Kafka Committer,老虎证券技术总监
19216 人已学习
新⼈⾸单¥59
登录后,你可以任选4讲全文学习
课程目录
已完结/共 44 讲
结束语 (1讲)
Kafka 核心源码解读
15
15
1.0x
00:00/00:00
登录|注册

29 | GroupMetadataManager:组元数据管理器是个什么东西?

你好,我是胡夕。今天,我们学习 GroupMetadataManager 类的源码。从名字上来看,它是组元数据管理器,但是,从它提供的功能来看,我更愿意将它称作消费者组管理器,因为它定义的方法,提供的都是添加消费者组、移除组、查询组这样组级别的基础功能。
不过,这个类的知名度不像 KafkaController、GroupCoordinator 那么高,你之前可能都没有听说过它。但是,它其实是非常重要的消费者组管理类。
GroupMetadataManager 类是在消费者组 Coordinator 组件被创建时被实例化的。这就是说,每个 Broker 在启动过程中,都会创建并维持一个 GroupMetadataManager 实例,以实现对该 Broker 负责的消费者组进行管理。更重要的是,生产环境输出日志中的与消费者组相关的大多数信息,都和它息息相关。
我举一个简单的例子。你应该见过这样的日志输出:
Removed ××× expired offsets in ××× milliseconds.
这条日志每 10 分钟打印一次。你有没有想过,它为什么要这么操作呢?其实,这是由 GroupMetadataManager 类创建的定时任务引发的。如果你不清楚 GroupMetadataManager 的原理,虽然暂时不会影响你使用,但是,一旦你在实际环境中看到了有关消费者组的错误日志,仅凭日志输出,你是无法定位错误原因的。要解决这个问题,就只有一个办法:通过阅读源码,彻底搞懂底层实现原理,做到以不变应万变
确认放弃笔记?
放弃后所记笔记将不保留。
新功能上线,你的历史笔记已初始化为私密笔记,是否一键批量公开?
批量公开的笔记不会为你同步至部落
公开
同步至部落
取消
完成
0/2000
荧光笔
直线
曲线
笔记
复制
AI
  • 深入了解
  • 翻译
    • 英语
    • 中文简体
    • 中文繁体
    • 法语
    • 德语
    • 日语
    • 韩语
    • 俄语
    • 西班牙语
    • 阿拉伯语
  • 解释
  • 总结

Kafka中的GroupMetadataManager是一个关键的消费者组管理类,负责管理消费者组的元数据和位移信息。该类在消费者组Coordinator组件被创建时被实例化,每个Broker在启动过程中都会创建并维护一个GroupMetadataManager实例。它的功能包括添加消费者组、移除组、查询组等基础功能。GroupMetadataManager类的构造函数需要7个参数,其中包括brokerId、interBrokerProtocolVersion、config、replicaManager和zkClient等字段。除了构造函数所需的字段外,该类还定义了一些重要字段,如compressionType、groupMetadataCache、loadingPartitions、ownedPartitions和groupMetadataTopicPartitionCount等。在消费者组元数据管理方面,GroupMetadataManager类提供了查询获取组信息、添加组、移除组和加载组信息等方法。除了消费者组的管理,GroupMetadataManager类还提供消费者组位移的管理,包括位移数据的保存和查询。总的来说,GroupMetadataManager类是Kafka中非常重要的消费者组管理类,对于理解Kafka消费者组的工作原理和实现机制具有重要意义。另外,文章还介绍了storeOffsets方法的实现逻辑,该方法负责保存消费者组位移,通过过滤、判断Broker是否为Coordinator、构造位移主题消息、写入消息到位移主题以及更新消费者元数据等步骤完成位移信息的保存。文章还提到了getOffsets方法的实现逻辑,用于查询消费者组位移,通过读取groupMetadataCache中的组元数据,判断组状态并获取对应的位移数据。总的来说,GroupMetadataManager类在Kafka中扮演着至关重要的角色,对于理解消费者组管理和位移管理具有重要意义。

仅可试看部分内容,如需阅读全部内容,请付费购买文章所属专栏
《Kafka 核心源码解读》
新⼈⾸单¥59
立即购买
登录 后留言

全部留言(5)

  • 最新
  • 精选
  • 胡夕
    置顶
    你好,我是胡夕。我来公布上节课的“课后讨论”题答案啦~ 上节课,我们重点学习了Kafka如何管理消费者组元数据。课后我请你思考Kafka是怎么确认消费者组使用哪个成员的超时时间作为整个组的超时时间。实际上,Kafka取消费者组下所有成员的最大超时时间作为整个组的超时时间。具体代码可以看下GroupMetadata的rebalanceTimeoutMs方法。 okay,你同意这个说法吗?或者说你有其他的看法吗?我们可以一起讨论下。
    2020-07-07
  • 二进制傻瓜
    胡大您好!单个kafka集群broker数,topic数以及partition数是否有上限?broker数或者topic 数以及partition太多会不会影响kafka的性能?这方面是否有最佳实践?

    作者回复: 最佳实践是单台broker上分区数最好不超过2000

    2020-07-05
    2
  • 胡小禾
    看到2.7.1 版本的代码中有这样的,其实在至少这个版本里, 老师最后说的 case 是不会出现了吧? if (config.loadBufferSize < bytesNeeded) warn(s"Loaded offsets and group metadata from $topicPartition with buffer larger ($bytesNeeded bytes) than " + s"configured offsets.load.buffer.size (${config.loadBufferSize} bytes)") buffer = ByteBuffer.allocate(bytesNeeded)
    2021-12-27
    1
  • z.l
    有个问题,broker刚启动时是怎么从__consumer_offsets中加载offset的?怎么确定某个分区的offset在__consumer_offsets的哪个分区,哪个位置?
    2022-09-22归属地:上海
    2
  • 伯安知心
    从代码看,删除消费组的位移方法是removeGroupsForPartition,只有在处理身份或者位移变动调用handleGroupEmigration,请求这个方法的有2个调用。第一个方法handleLeaderAndIsrRequest判断就是新选择的broker重启太快导致leaderandisr请求变化,收到之前的请求也需要清理组的缓存记录,并且变动的topic是GROUP_METADATA_TOPIC_NAME的就需要删除消费组位移记录,第二个 方法handleStopReplicaRequest判断就是新选择的broker重启太快收到之前的请求也需要清理组的缓存记录。总之就是GROUP_METADATA_TOPIC_NAME变化在handleStopReplicaRequest请求和handleLeaderAndIsrRequest请求中就会有清除消费组位移记录的变化。
    2020-07-06
收起评论
显示
设置
留言
5
收藏
沉浸
阅读
分享
手机端
快捷键
回顶部