16 | TopicDeletionManager: Topic是怎么被删除的?
胡夕
你好,我是胡夕。今天,我们正式进入到第四大模块“状态机”的学习。
Kafka 源码中有很多状态机和管理器,比如之前我们学过的 Controller 通道管理器 ControllerChannelManager、处理 Controller 事件的 ControllerEventManager,等等。这些管理器和状态机,大多与各自的“宿主”组件关系密切,可以说是大小不同、功能各异。就比如 Controller 的这两个管理器,必须要与 Controller 组件紧耦合在一起才能实现各自的功能。
不过,Kafka 中还是有一些状态机和管理器具有相对独立的功能框架,不严重依赖使用方,也就是我在这个模块为你精选的 TopicDeletionManager(主题删除管理器)、ReplicaStateMachine(副本状态机)和 PartitionStateMachine(分区状态机)。
TopicDeletionManager:负责对指定 Kafka 主题执行删除操作,清除待删除主题在集群上的各类“痕迹”。
ReplicaStateMachine:负责定义 Kafka 副本状态、合法的状态转换,以及管理状态之间的转换。
PartitionStateMachine:负责定义 Kafka 分区状态、合法的状态转换,以及管理状态之间的转换。
公开
同步至部落
取消
完成
0/2000
荧光笔
直线
曲线
笔记
复制
AI
- 深入了解
- 翻译
- 解释
- 总结
Kafka源码解析:深入理解TopicDeletionManager 本文深入解析了Kafka源码中的TopicDeletionManager,该管理器负责删除指定主题并清除其在集群上的“痕迹”。文章详细介绍了TopicDeletionManager的实现原理和功能,包括对错误的删除主题认知、不推荐手动删除主题的原因,以及DeletionClient接口及其实现类ControllerDeletionClient的代码。重点介绍了resumeDeletions方法的执行流程和关键操作,以及completeDeleteTopic和onTopicDeletion两个方法的实现原理。通过深入解析,读者能更好地理解Kafka主题删除的原理和操作,展示了Kafka源码的高质量和易读性。
仅可试看部分内容,如需阅读全部内容,请付费购买文章所属专栏
《Kafka 核心源码解读》,新⼈⾸单¥59
《Kafka 核心源码解读》,新⼈⾸单¥59
立即购买
© 版权归极客邦科技所有,未经许可不得传播售卖。 页面已增加防盗追踪,如有侵权极客邦将依法追究其法律责任。
登录 后留言
全部留言(8)
- 最新
- 精选
- 胡夕置顶你好,我是胡夕。我来公布上节课的“课后讨论”题答案啦~ 上节课,咱们结合源码重点了解了Controller在集群中的作用。课后我请你思考这样一个问题:如果我们想要使用脚本命令增加一个主题的分区,你知道应该用KafkaController类中的哪个方法吗?其实,KafkaController的onNewPartitionCreation方法是处理新分区增加逻辑的。它包括要将新分区状态调整到Online状态以及将对应副本的状态设置成Online。 okay,你同意这个说法吗?或者说你有其他的看法吗?我们可以一起讨论下。2020-05-29
- flyCoder胡老师,遇到了这个问题,能帮忙分析下吗?https://www.orchome.com/1085
作者回复: 打开文件数太多这件事社区曾经有几个bug跟过,在新一点的版本(比如2.0之后)之后已经不太常见了,不妨升级下看看。另外ulimit -n 不妨设置大一点。
2020-11-162 - 小刀stream DSL中除了通过缩短时间窗口可以减小state.dir(/tmp/kafka-streams)的大小之外还有别的方式么?
作者回复: 没有太好的方法
2020-07-042 - 曾轼麟controllerContext 中的 topicsToBeDeleted (待删除主题列表)和 zk中的 /delete_topics列表 进行 &得到暂时不可删除的topic。 暂时不可删除的原因: 1、topic存在的部分副本down了 2、当前topic的部分分区正在reassignment 我理解为是controllerContext缓存中的数据,可能会和当前的状态不一致的情况
作者回复: “controllerContext缓存中的数据,可能会和当前的状态不一致的情况” 比如?能否举个例子?
2020-06-27 - 伯安知心不符合删除的条件 在这里实现,副本关闭停止删除,正在重新分区(扩容到新的broker)的不可删除。主要做逻辑与“&”操作得到不符合删除的topic。不符合删除的条件 在这里实现,副本关闭停止删除,正在重新分区(扩容到新的broker)的不可删除。主要做逻辑与“&”操作得到不符合删除的topic。
作者回复: 👍
2020-05-30 - 是男人就开巴巴托斯mutePartitionModifications是为了不要让watcher响应zk topic各个分区子节点逐个被删除的变化。 删除是从分区节点到topic节点递归执行的,因为zk不能在有子节点的情况下直接删除父节点。 新加分区与删除topic节点的互斥还是在handleCreatePartitionsRequest函数里做的check。2021-01-1112
- 张子涵def enqueueTopicsForDeletion(topics: Set[String]): Unit = { if (isDeleteTopicEnabled) { controllerContext.queueTopicDeletion(topics) resumeDeletions() } }刚去研究了一下enqueueTopicsForDeletion方法源码 val isDeleteTopicEnabled: Boolean = config.deleteTopicEnable def queueTopicDeletion(topics: Set[String]): Unit = { topicsToBeDeleted ++= topics } 可以删除的topic会添加进删除队列,不可删除的会重新调用resumeDeletions方法2020-08-25
- 张子涵if (config.deleteTopicEnable) { if (topicsToBeDeleted.nonEmpty) { info(s"Starting topic deletion for topics ${topicsToBeDeleted.mkString(",")}") // mark topic ineligible for deletion if other state changes are in progress topicsToBeDeleted.foreach { topic => val partitionReassignmentInProgress = controllerContext.partitionsBeingReassigned.map(_.topic).contains(topic) if (partitionReassignmentInProgress) topicDeletionManager.markTopicIneligibleForDeletion(Set(topic), reason = "topic reassignment in progress") } // add topic to deletion list topicDeletionManager.enqueueTopicsForDeletion(topicsToBeDeleted) } ①根据配置信息是否进行主题删除 是则进行下一步 ②判断要删除的主题是否为空,不为空则进行下一步 ③进行迭代判断,主题是否在partitionsBeingReassigned集合中,如果在,则将主题添加进markTopicIneligibleForDeletion 标志为主题正在重新分配 不可删除 有一点疑问是为什么没有从topicsToBeDeleted集合中把不可删除主题移除的步骤,然后就直接将topicsToBeDeleted添加进删除队列了2020-08-25
收起评论