17 | ReplicaStateMachine:揭秘副本状态机实现原理
胡夕
你好,我是胡夕。今天我们讲副本状态机。
前几节课,在讲 Controller、TopicDeletionManager 时,我反复提到副本状态机和分区状态机这两个组件。现在,你应该知道了,它们分别管理着 Kafka 集群中所有副本和分区的状态转换,但是,你知道副本和分区到底都有哪些状态吗?
带着这个问题,我们用两节课的时间,重点学习下这两个组件的源码。我们先从副本状态机(ReplicaStateMachine)开始。
课前导读
坦率地说,ReplicaStateMachine 不如前面的组件有名气,Kafka 官网文档中甚至没有任何关于它的描述,可见,它是一个内部组件,一般用户感觉不到它的存在。因此,很多人都会有这样的错觉:既然它是外部不可见的组件,那就没有必要学习它的实现代码了。
其实不然。弄明白副本状态机的原理,对于我们从根本上定位很多数据不一致问题是有帮助的。下面,我跟你分享一个我的真实经历。
曾经,我们部署过一个 3-Broker 的 Kafka 集群,版本是 2.0.0。假设这 3 个 Broker 是 A、B 和 C,我们在这 3 个 Broker 上创建了一个单分区、双副本的主题。
当时,我们发现了一个奇怪的现象:如果两个副本分别位于 A 和 B,而 Controller 在 C 上,那么,当关闭 A、B 之后,ZooKeeper 中会显示该主题的 Leader 是 -1,ISR 为空;但是,如果两个副本依然位于 A 和 B 上,而 Controller 在 B 上,当我们依次关闭 A 和 B 后,该主题在 ZooKeeper 中的 Leader 和 ISR 就变成了 B。这显然和刚刚的情况不符。
公开
同步至部落
取消
完成
0/2000
荧光笔
直线
曲线
笔记
复制
AI
- 深入了解
- 翻译
- 解释
- 总结
Kafka中的副本状态机是Kafka Broker端控制副本状态流转的重要组件。副本状态机的实现类ZkReplicaStateMachine通过与ZooKeeper交互和向集群Broker发送控制请求来管理副本状态之间的转换。文章详细解释了副本状态机的实现原理和状态转换操作,包括7种副本状态的定义和合法状态转换规则。通过具体的代码片段和状态转换图示,读者可以深入了解每种状态的含义和流转过程。副本状态机的深入理解对于解决数据一致性问题具有重要意义。下一步,读者将学习Kafka中另一类著名的状态机:分区状态机,从而全面掌握Kafka Broker端管理分区和副本对象的流程和手段。
仅可试看部分内容,如需阅读全部内容,请付费购买文章所属专栏
《Kafka 核心源码解读》,新⼈⾸单¥59
《Kafka 核心源码解读》,新⼈⾸单¥59
立即购买
© 版权归极客邦科技所有,未经许可不得传播售卖。 页面已增加防盗追踪,如有侵权极客邦将依法追究其法律责任。
登录 后留言
全部留言(5)
- 最新
- 精选
- 胡夕置顶你好,我是胡夕。我来公布上节课的“课后讨论”题答案啦~ 上节课,咱们结合源码重点了解了TopicDeletionManager类以及Topic的被删除流程。课后我请你思考markTopicIneligibleForDeletion方法是做什么用的以及实现原理是怎么样的。我来给出我的思考:markTopicIneligibleForDeletion方法的主要作用是标记给定主题为”暂时不可删除"。它的实现原理非常简单,就是将给定的待删除主题加入到ControllerContext的topicsIneligibleForDeletion集合中。后面TopicDeletionManager在恢复主题删除过程中会再次尝试对topicsIneligibleForDeletion集合中的主题进行删除。 okay,你同意这个说法吗?或者说你有其他的看法吗?我们可以一起讨论下。2020-06-05
- 伯安知心NonExistentReplica 分支,会更新controllerContext中当前currentAssignedReplicas的副本AR集合,纪录成功状态转换,之后删除这个副本。这里说的状态转换成功只是内存缓存更新成功,zookeeper还未成功的,有延时。NonExistentReplica 分支,会更新controllerContext中当前currentAssignedReplicas的副本AR集合,纪录成功状态转换,之后删除这个副本。这里说的状态转换成功只是内存缓存更新成功,zookeeper还未成功的,有延时。
作者回复: 很详细的分析~
2020-05-302 - 张子涵case NonExistentReplica => //遍历所有可执行转换的副本对象 validReplicas.foreach { replica => //获取副本对象的当前状态 val currentState = controllerContext.replicaState(replica) //获取分区列表 执行removeReplica操作 剔除相等的replica val newAssignedReplicas = controllerContext .partitionFullReplicaAssignment(replica.topicPartition) .removeReplica(replica.replica) //底层调用getOrElseUpdate方法 更新partition 及newAssignedReplicas信息 controllerContext.updatePartitionFullReplicaAssignment(replica.topicPartition, newAssignedReplicas) //记录成功转换状态的操作 logSuccessfulTransition(replicaId, replica.topicPartition, currentState, NonExistentReplica) //最后调用removeReplicaState方法 controllerContext.removeReplicaState(replica) } def updatePartitionFullReplicaAssignment(topicPartition: TopicPartition, newAssignment: ReplicaAssignment): Unit = { val assignments = partitionAssignments.getOrElseUpdate(topicPartition.topic, mutable.Map.empty) assignments.put(topicPartition.partition, newAssignment) } def removeReplicaState(replica: PartitionAndReplica): Unit = { replicaStates.remove(replica) } def removeReplica(replica: Int): ReplicaAssignment = { ReplicaAssignment( replicas.filterNot(_ == replica), --将不符合的结果返回 addingReplicas.filterNot(_ == replica), removingReplicas.filterNot(_ == replica) ) } 建议同学们学习的时候,可以多看一些底层方法,这样理解的更快些
作者回复: ������
2020-08-261 - 张子涵直到这节课,愈发觉得源码真的不是读读方法那么简单的,串联起来的理解更为重要
作者回复: 嗯嗯,有机会分享你的源码学习心得:)
2020-08-26 - RonnieXie老师,请问如果doHandleStateChanges完成状态转换操作,但是doHandleStateChanges发送请求给Broker失败了,是否要回滚?似乎源码中没有这块处理逻辑,是什么逻辑保障了这异常场景?
作者回复: 嗯,确实没有状态回滚机制,只是记录错误日志。我想这是目前部分状态不一致的原因之一吧
2020-06-02
收起评论