Kafka 核心源码解读
胡夕
Apache Kafka Committer,老虎证券技术总监
19216 人已学习
新⼈⾸单¥59
登录后,你可以任选4讲全文学习
课程目录
已完结/共 44 讲
结束语 (1讲)
Kafka 核心源码解读
15
15
1.0x
00:00/00:00
登录|注册

26 | MetadataCache:Broker是怎么异步更新元数据缓存的?

你好,我是胡夕。今天,我们学习 Broker 上的元数据缓存(MetadataCache)。
你肯定很好奇,前面我们不是学过 Controller 端的元数据缓存了吗?这里的元数据缓存又是啥呢?其实,这里的 MetadataCache 是指 Broker 上的元数据缓存,这些数据是 Controller 通过 UpdateMetadataRequest 请求发送给 Broker 的。换句话说,Controller 实现了一个异步更新机制,能够将最新的集群信息广播给所有 Broker。
那么,为什么每台 Broker 上都要保存这份相同的数据呢?这里有两个原因。
第一个,也是最重要的原因,就是保存了这部分数据,Broker 就能够及时响应客户端发送的元数据请求,也就是处理 Metadata 请求。Metadata 请求是为数不多的能够被集群任意 Broker 处理的请求类型之一,也就是说,客户端程序能够随意地向任何一个 Broker 发送 Metadata 请求,去获取集群的元数据信息,这完全得益于 MetadataCache 的存在。
第二个原因是,Kafka 的一些重要组件会用到这部分数据。比如副本管理器会使用它来获取 Broker 的节点信息,事务管理器会使用它来获取分区 Leader 副本的信息,等等。
确认放弃笔记?
放弃后所记笔记将不保留。
新功能上线,你的历史笔记已初始化为私密笔记,是否一键批量公开?
批量公开的笔记不会为你同步至部落
公开
同步至部落
取消
完成
0/2000
荧光笔
直线
曲线
笔记
复制
AI
  • 深入了解
  • 翻译
    • 英语
    • 中文简体
    • 中文繁体
    • 法语
    • 德语
    • 日语
    • 韩语
    • 俄语
    • 西班牙语
    • 阿拉伯语
  • 解释
  • 总结

Kafka的MetadataCache类是Broker上的元数据缓存,通过异步更新机制保证所有Broker上的元数据缓存实现最终一致性。该类保存了集群中关于主题和Broker的所有重要数据,包括主题分区的详细信息、Controller所在Broker的ID、当前存活的Broker对象列表等。元数据缓存的重要方法包括判断类、获取类和更新类方法,用于判断给定主题或主题分区是否包含在元数据缓存中、获取主题分区的详细数据信息以及更新元数据缓存。MetadataCache类的实例化发生在Kafka Broker启动时,被Kafka的4个组件使用,包括KafkaApis、AdminManager、ReplicaManager和TransactionCoordinator。通过了解MetadataCache类的实现和使用,读者可以深入了解Kafka集群元数据的存储和更新机制。 MetadataCache类的getXXX方法包括getAllTopics、getAllPartitions和getPartitionReplicaEndpoints,分别用于获取所有主题、所有分区以及指定监听器类型下特定主题分区的所有副本的Broker节点对象。这些方法通过遍历元数据缓存中的数据,构建相应的数据结构并返回结果。其中,getPartitionReplicaEndpoints方法还涉及处理过期元数据的问题,但Kafka能够自行处理过期元数据的问题,保证客户端最终可以取得最新的元数据信息。总体而言,MetadataCache类提供了丰富的方法,能够帮助用户快速获取和更新Kafka集群的元数据信息,是Kafka集群元数据存储和更新机制的重要组成部分。 MetadataCache类的updateMetadata方法是Broker端元数据缓存的重要更新方法,通过处理UpdateMetadataRequest请求中的分区数据,更新本地元数据缓存,确保最终一致性。该方法的实现逻辑较为复杂,包括准备数据、确保集群Broker配置的监听器一致性、提取请求数据并填充元数据缓存等步骤。通过学习该方法的实现,读者可以深入了解Kafka Broker端元数据缓存的更新机制。 总的来说,本文介绍了Kafka的MetadataCache类及其重要方法,以及Broker端元数据缓存的更新机制。通过学习本文内容,读者可以全面了解Kafka集群元数据的存储和更新机制,

仅可试看部分内容,如需阅读全部内容,请付费购买文章所属专栏
《Kafka 核心源码解读》
新⼈⾸单¥59
立即购买
登录 后留言

全部留言(4)

  • 最新
  • 精选
  • 胡夕
    置顶
    你好,我是胡夕。我来公布上节课的“课后讨论”题答案啦~ 上节课,我们重点学习了副本管理器是管理副本的。课后我请你去研究下maybePropagateIsrChanges源码中isrChangeSet字段在哪里被更新的。实际上,该字段被更新的时机有两个:ISR扩容和ISR收缩时。 ReplicaManager启动时会创建一个异步线程isr-expiration,定期去查看是否需要为集群中所有online分区执行ISR收缩。一旦成功执行了收缩,那么代码就会调用ReplicaManager.recordIsrChange方法将相应分区加入到isrChangeSet中。 Follower副本从Leader副本拉取消息后会尝试将自己加入到ISR中,此时可能执行ISR扩容操作。代码同样会调用ReplicaManager.recordIsrChange方法将相应分区加入到isrChangeSet中。 okay,你同意这个说法吗?或者说你有其他的看法吗?我们可以一起讨论下。
    2020-06-29
  • RonnieXie
    老师,我这边有个疑问,如果更新数据过程,Controller宕机,一部分broker更新成功,一部分broker更新失败,kafka是如何保证数据一致性?

    作者回复: 更新失败的broker会再次请求元数据的

    2020-08-23
    2
  • 伯安知心
    Controller要更新metadataCache缓存,肯定通过远程调用事件,也就是之前讲解controller中controllerChannelManager发送请求,严格来说就是ControllerBrokerRequestBatch定义了updateMetadataRequestBrokerSet和updateMetadataRequestPartitionInfoMap两个集合,在controller中通过分区状态机和副本状态机注册ControllerBrokerRequestBatch类,通过controllerChannelManager.sendRequest更新metadatacache请求。有些乱,请作者原谅~

    作者回复: 嗯,没问题的,哈哈哈:)

    2020-06-26
  • ahu0605
    您好,胡老师,在生产环境,应用跑了一段时间,我的producer经常抛出"topic xxx not present in metadata after 60000ms",我排查了broker日志并没有重选举,我怀疑是producer网络过于繁忙,导致networkclient poll超时,也可能是这篇文章描述的broker metadata异步请求所导致,这个问题如何排查呢?
    2022-05-04
收起评论
显示
设置
留言
4
收藏
沉浸
阅读
分享
手机端
快捷键
回顶部