Kafka 核心源码解读
胡夕
Apache Kafka Committer,老虎证券技术总监
19216 人已学习
新⼈⾸单¥59
登录后,你可以任选4讲全文学习
课程目录
已完结/共 44 讲
结束语 (1讲)
Kafka 核心源码解读
15
15
1.0x
00:00/00:00
登录|注册

25 | ReplicaManager(下):副本管理器是如何管理副本的?

你好,我是胡夕。
上节课我们学习了 ReplicaManager 类源码中副本管理器是如何执行副本读写操作的。现在我们知道了,这个副本读写操作主要是通过 appendRecords 和 fetchMessages 这两个方法实现的,而这两个方法其实在底层分别调用了 Log 的 append 和 read 方法,也就是我们在第 3 节课中学到的日志消息写入和日志消息读取方法。
今天,我们继续学习 ReplicaManager 类源码,看看副本管理器是如何管理副本的。这里的副本,涵盖了广义副本对象的方方面面,包括副本和分区对象、副本位移值和 ISR 管理等。因此,本节课我们结合着源码,具体学习下这几个方面。

分区及副本管理

除了对副本进行读写之外,副本管理器还有一个重要的功能,就是管理副本和对应的分区。ReplicaManager 管理它们的方式,是通过字段 allPartitions 来实现的。
所以,我想先带你复习下第 23 节课中的 allPartitions 的代码。不过,这次为了强调它作为容器的属性,我们要把注意力放在它是对象池这个特点上,即 allPartitions 把所有分区对象汇集在一起,统一放入到一个对象池进行管理。
private val allPartitions = new Pool[TopicPartition, HostedPartition](
valueFactory = Some(tp => HostedPartition.Online(Partition(tp, time, this)))
)
确认放弃笔记?
放弃后所记笔记将不保留。
新功能上线,你的历史笔记已初始化为私密笔记,是否一键批量公开?
批量公开的笔记不会为你同步至部落
公开
同步至部落
取消
完成
0/2000
荧光笔
直线
曲线
笔记
复制
AI
  • 深入了解
  • 翻译
    • 英语
    • 中文简体
    • 中文繁体
    • 法语
    • 德语
    • 日语
    • 韩语
    • 俄语
    • 西班牙语
    • 阿拉伯语
  • 解释
  • 总结

Kafka中的副本管理器是系统中的关键组件,通过ReplicaManager类实现副本的读写操作。它统一管理所有分区对象,并负责确定Leader和Follower副本。副本管理器通过Controller发送请求来实现副本角色的变化,并在收到请求后调用becomeLeaderOrFollower方法处理逻辑。makeLeaders方法的作用是让当前Broker成为给定一组分区的Leader,主要包括停止获取线程、更新分区元数据信息和将指定分区添加到Leader分区集合。makeFollowers方法则实现了将当前Broker成为给定一组分区的Follower的功能。这两个方法在副本管理中扮演着重要角色,确保了Kafka系统中副本角色的正确性和一致性。除了读写副本、管理分区和副本的功能之外,副本管理器还有一个重要的功能,那就是管理ISR。ISR的收缩操作由maybeShrinkIsr方法实现,而maybePropagateIsrChanges方法则定期向集群Broker传播ISR的变更。这些功能的实现围绕着如何处理LeaderAndIsrRequest请求数据展开,对于理解Kafka的副本机制具有重要意义。 在ReplicaManager类中,maybeShrinkIsr方法负责收缩ISR副本集合,移除与Leader差距过大的副本。方法内部通过定时调度和判断副本滞后程度来实现收缩操作。另外,maybePropagateIsrChanges方法则负责定期传播ISR变更通知给集群的其他Broker,以同步ISR操作的结果。这两个方法的实现细节展示了Kafka副本管理器的核心功能和ISR管理机制。 总的来说,本文详细介绍了Kafka中副本管理器的功能和实现细节,包括副本角色的变化、ISR的管理和传播等关键内容。通过对ReplicaManager类的核心功能和方法的梳理,读者可以深入了解Kafka副本管理器的工作原理和重要功能,为进一步学习和应用Kafka系统提供了重要参考。

仅可试看部分内容,如需阅读全部内容,请付费购买文章所属专栏
《Kafka 核心源码解读》
新⼈⾸单¥59
立即购买
登录 后留言

全部留言(5)

  • 最新
  • 精选
  • 胡夕
    置顶
    你好,我是胡夕。我来公布上节课的“课后讨论”题答案啦~ 上节课,我们重点学习了副本管理器是如何读写副本的。课后我请你去研究下appendRecords方法中的origin字段的作用以及最终使用origin字段的具体源码位置。我的思考是:origin表示日志写入方的来源。当前有3种来源:Follower副本、Coordinator和普通客户端(Client)。origin在Log.scala和LogValidator.scala中被用到,主要做一些消息格式方面的判断。Kafka要求来自普通客户端的消息必须符合某些规定。origin就是用于判断消息是否来自于Client端。 okay,你同意这个说法吗?或者说你有其他的看法吗?我们可以一起讨论下。
    2020-06-23
    1
  • 伯安知心
    1,在执行shrinkIsr收缩和expandIsr扩充时候,更新标识符true时候,执行原子操作recordIsrChange, 2,执行maybePropagateIsrChanges定期检查是否要传播ISR时候,条件是:最近五秒钟没有ISR更改,或者自上次ISR传播以来已超过60秒。

    作者回复: 嗯嗯,很好的总结。那再找找处理这个ISR事件的源码在哪里吧:)

    2020-06-23
    2
  • 胡夕老师,我有个疑问,有没有可能出现broker1是旧的leader(还没来得及处理leaderandisr请求),broker2上是新的leader。producer此时要是发到broker1,并且append成功的情况。如果有,由于broker2的副本已经是leader,也不会向broker1同步数据了,当broker1变为follower后,这条消息是丢失了还是两个副本数据不一致了呢。

    作者回复: 会做截断,把消息删除掉

    2020-09-01
  • 伯安知心
    对于shrink,在启动副本管理器的时候,需要运行定时调度线程schedule,replicaLagTimeMaxMs / 2为周期,执行maybeShrinkIsr,首先遍历那些ISR符合删除,然后更新zk和缓存中的数据。 对于expandIsr,在执行updateFollowerFetchState更新副本拉取状态的时候,判断副本id是否在IRS列表中,如果不在就执行maybeExpandIsr。

    作者回复: 很棒的总结👍

    2020-06-24
  • 梁聪明
    maybeShrinkIsr的shrinkIsr(newInSyncReplicaIds)方法
    2020-09-05
收起评论
显示
设置
留言
5
收藏
沉浸
阅读
分享
手机端
快捷键
回顶部