Kafka 核心源码解读
胡夕
Apache Kafka Committer,老虎证券技术总监
19216 人已学习
新⼈⾸单¥59
登录后,你可以任选4讲全文学习
课程目录
已完结/共 44 讲
结束语 (1讲)
Kafka 核心源码解读
15
15
1.0x
00:00/00:00
登录|注册

17 | ReplicaStateMachine:揭秘副本状态机实现原理

你好,我是胡夕。今天我们讲副本状态机。
前几节课,在讲 Controller、TopicDeletionManager 时,我反复提到副本状态机和分区状态机这两个组件。现在,你应该知道了,它们分别管理着 Kafka 集群中所有副本和分区的状态转换,但是,你知道副本和分区到底都有哪些状态吗?
带着这个问题,我们用两节课的时间,重点学习下这两个组件的源码。我们先从副本状态机(ReplicaStateMachine)开始。

课前导读

坦率地说,ReplicaStateMachine 不如前面的组件有名气,Kafka 官网文档中甚至没有任何关于它的描述,可见,它是一个内部组件,一般用户感觉不到它的存在。因此,很多人都会有这样的错觉:既然它是外部不可见的组件,那就没有必要学习它的实现代码了。
其实不然。弄明白副本状态机的原理,对于我们从根本上定位很多数据不一致问题是有帮助的。下面,我跟你分享一个我的真实经历。
曾经,我们部署过一个 3-Broker 的 Kafka 集群,版本是 2.0.0。假设这 3 个 Broker 是 A、B 和 C,我们在这 3 个 Broker 上创建了一个单分区、双副本的主题。
当时,我们发现了一个奇怪的现象:如果两个副本分别位于 A 和 B,而 Controller 在 C 上,那么,当关闭 A、B 之后,ZooKeeper 中会显示该主题的 Leader 是 -1,ISR 为空;但是,如果两个副本依然位于 A 和 B 上,而 Controller 在 B 上,当我们依次关闭 A 和 B 后,该主题在 ZooKeeper 中的 Leader 和 ISR 就变成了 B。这显然和刚刚的情况不符。
确认放弃笔记?
放弃后所记笔记将不保留。
新功能上线,你的历史笔记已初始化为私密笔记,是否一键批量公开?
批量公开的笔记不会为你同步至部落
公开
同步至部落
取消
完成
0/2000
荧光笔
直线
曲线
笔记
复制
AI
  • 深入了解
  • 翻译
    • 英语
    • 中文简体
    • 中文繁体
    • 法语
    • 德语
    • 日语
    • 韩语
    • 俄语
    • 西班牙语
    • 阿拉伯语
  • 解释
  • 总结

Kafka中的副本状态机是Kafka Broker端控制副本状态流转的重要组件。副本状态机的实现类ZkReplicaStateMachine通过与ZooKeeper交互和向集群Broker发送控制请求来管理副本状态之间的转换。文章详细解释了副本状态机的实现原理和状态转换操作,包括7种副本状态的定义和合法状态转换规则。通过具体的代码片段和状态转换图示,读者可以深入了解每种状态的含义和流转过程。副本状态机的深入理解对于解决数据一致性问题具有重要意义。下一步,读者将学习Kafka中另一类著名的状态机:分区状态机,从而全面掌握Kafka Broker端管理分区和副本对象的流程和手段。

仅可试看部分内容,如需阅读全部内容,请付费购买文章所属专栏
《Kafka 核心源码解读》
新⼈⾸单¥59
立即购买
登录 后留言

全部留言(5)

  • 最新
  • 精选
  • 胡夕
    置顶
    你好,我是胡夕。我来公布上节课的“课后讨论”题答案啦~ 上节课,咱们结合源码重点了解了TopicDeletionManager类以及Topic的被删除流程。课后我请你思考markTopicIneligibleForDeletion方法是做什么用的以及实现原理是怎么样的。我来给出我的思考:markTopicIneligibleForDeletion方法的主要作用是标记给定主题为”暂时不可删除"。它的实现原理非常简单,就是将给定的待删除主题加入到ControllerContext的topicsIneligibleForDeletion集合中。后面TopicDeletionManager在恢复主题删除过程中会再次尝试对topicsIneligibleForDeletion集合中的主题进行删除。 okay,你同意这个说法吗?或者说你有其他的看法吗?我们可以一起讨论下。
    2020-06-05
  • 伯安知心
    NonExistentReplica 分支,会更新controllerContext中当前currentAssignedReplicas的副本AR集合,纪录成功状态转换,之后删除这个副本。这里说的状态转换成功只是内存缓存更新成功,zookeeper还未成功的,有延时。NonExistentReplica 分支,会更新controllerContext中当前currentAssignedReplicas的副本AR集合,纪录成功状态转换,之后删除这个副本。这里说的状态转换成功只是内存缓存更新成功,zookeeper还未成功的,有延时。

    作者回复: 很详细的分析~

    2020-05-30
    2
  • 张子涵
    case NonExistentReplica => //遍历所有可执行转换的副本对象 validReplicas.foreach { replica => //获取副本对象的当前状态 val currentState = controllerContext.replicaState(replica) //获取分区列表 执行removeReplica操作 剔除相等的replica val newAssignedReplicas = controllerContext .partitionFullReplicaAssignment(replica.topicPartition) .removeReplica(replica.replica) //底层调用getOrElseUpdate方法 更新partition 及newAssignedReplicas信息 controllerContext.updatePartitionFullReplicaAssignment(replica.topicPartition, newAssignedReplicas) //记录成功转换状态的操作 logSuccessfulTransition(replicaId, replica.topicPartition, currentState, NonExistentReplica) //最后调用removeReplicaState方法 controllerContext.removeReplicaState(replica) } def updatePartitionFullReplicaAssignment(topicPartition: TopicPartition, newAssignment: ReplicaAssignment): Unit = { val assignments = partitionAssignments.getOrElseUpdate(topicPartition.topic, mutable.Map.empty) assignments.put(topicPartition.partition, newAssignment) } def removeReplicaState(replica: PartitionAndReplica): Unit = { replicaStates.remove(replica) } def removeReplica(replica: Int): ReplicaAssignment = { ReplicaAssignment( replicas.filterNot(_ == replica), --将不符合的结果返回 addingReplicas.filterNot(_ == replica), removingReplicas.filterNot(_ == replica) ) } 建议同学们学习的时候,可以多看一些底层方法,这样理解的更快些

    作者回复: ������

    2020-08-26
    1
  • 张子涵
    直到这节课,愈发觉得源码真的不是读读方法那么简单的,串联起来的理解更为重要

    作者回复: 嗯嗯,有机会分享你的源码学习心得:)

    2020-08-26
  • RonnieXie
    老师,请问如果doHandleStateChanges完成状态转换操作,但是doHandleStateChanges发送请求给Broker失败了,是否要回滚?似乎源码中没有这块处理逻辑,是什么逻辑保障了这异常场景?

    作者回复: 嗯,确实没有状态回滚机制,只是记录错误日志。我想这是目前部分状态不一致的原因之一吧

    2020-06-02
收起评论
显示
设置
留言
5
收藏
沉浸
阅读
分享
手机端
快捷键
回顶部