Kafka 核心源码解读
胡夕
Apache Kafka Committer,老虎证券技术总监
19216 人已学习
新⼈⾸单¥59
登录后,你可以任选4讲全文学习
课程目录
已完结/共 44 讲
结束语 (1讲)
Kafka 核心源码解读
15
15
1.0x
00:00/00:00
登录|注册

16 | TopicDeletionManager: Topic是怎么被删除的?

你好,我是胡夕。今天,我们正式进入到第四大模块“状态机”的学习。
Kafka 源码中有很多状态机和管理器,比如之前我们学过的 Controller 通道管理器 ControllerChannelManager、处理 Controller 事件的 ControllerEventManager,等等。这些管理器和状态机,大多与各自的“宿主”组件关系密切,可以说是大小不同、功能各异。就比如 Controller 的这两个管理器,必须要与 Controller 组件紧耦合在一起才能实现各自的功能。
不过,Kafka 中还是有一些状态机和管理器具有相对独立的功能框架,不严重依赖使用方,也就是我在这个模块为你精选的 TopicDeletionManager(主题删除管理器)、ReplicaStateMachine(副本状态机)和 PartitionStateMachine(分区状态机)。
TopicDeletionManager:负责对指定 Kafka 主题执行删除操作,清除待删除主题在集群上的各类“痕迹”。
ReplicaStateMachine:负责定义 Kafka 副本状态、合法的状态转换,以及管理状态之间的转换。
PartitionStateMachine:负责定义 Kafka 分区状态、合法的状态转换,以及管理状态之间的转换。
确认放弃笔记?
放弃后所记笔记将不保留。
新功能上线,你的历史笔记已初始化为私密笔记,是否一键批量公开?
批量公开的笔记不会为你同步至部落
公开
同步至部落
取消
完成
0/2000
荧光笔
直线
曲线
笔记
复制
AI
  • 深入了解
  • 翻译
    • 英语
    • 中文简体
    • 中文繁体
    • 法语
    • 德语
    • 日语
    • 韩语
    • 俄语
    • 西班牙语
    • 阿拉伯语
  • 解释
  • 总结

Kafka源码解析:深入理解TopicDeletionManager 本文深入解析了Kafka源码中的TopicDeletionManager,该管理器负责删除指定主题并清除其在集群上的“痕迹”。文章详细介绍了TopicDeletionManager的实现原理和功能,包括对错误的删除主题认知、不推荐手动删除主题的原因,以及DeletionClient接口及其实现类ControllerDeletionClient的代码。重点介绍了resumeDeletions方法的执行流程和关键操作,以及completeDeleteTopic和onTopicDeletion两个方法的实现原理。通过深入解析,读者能更好地理解Kafka主题删除的原理和操作,展示了Kafka源码的高质量和易读性。

仅可试看部分内容,如需阅读全部内容,请付费购买文章所属专栏
《Kafka 核心源码解读》
新⼈⾸单¥59
立即购买
登录 后留言

全部留言(8)

  • 最新
  • 精选
  • 胡夕
    置顶
    你好,我是胡夕。我来公布上节课的“课后讨论”题答案啦~ 上节课,咱们结合源码重点了解了Controller在集群中的作用。课后我请你思考这样一个问题:如果我们想要使用脚本命令增加一个主题的分区,你知道应该用KafkaController类中的哪个方法吗?其实,KafkaController的onNewPartitionCreation方法是处理新分区增加逻辑的。它包括要将新分区状态调整到Online状态以及将对应副本的状态设置成Online。 okay,你同意这个说法吗?或者说你有其他的看法吗?我们可以一起讨论下。
    2020-05-29
  • flyCoder
    胡老师,遇到了这个问题,能帮忙分析下吗?https://www.orchome.com/1085

    作者回复: 打开文件数太多这件事社区曾经有几个bug跟过,在新一点的版本(比如2.0之后)之后已经不太常见了,不妨升级下看看。另外ulimit -n 不妨设置大一点。

    2020-11-16
    2
  • 小刀
    stream DSL中除了通过缩短时间窗口可以减小state.dir(/tmp/kafka-streams)的大小之外还有别的方式么?

    作者回复: 没有太好的方法

    2020-07-04
    2
  • 曾轼麟
    controllerContext 中的 topicsToBeDeleted (待删除主题列表)和 zk中的 /delete_topics列表 进行 &得到暂时不可删除的topic。 暂时不可删除的原因: 1、topic存在的部分副本down了 2、当前topic的部分分区正在reassignment 我理解为是controllerContext缓存中的数据,可能会和当前的状态不一致的情况

    作者回复: “controllerContext缓存中的数据,可能会和当前的状态不一致的情况” 比如?能否举个例子?

    2020-06-27
  • 伯安知心
    不符合删除的条件 在这里实现,副本关闭停止删除,正在重新分区(扩容到新的broker)的不可删除。主要做逻辑与“&”操作得到不符合删除的topic。不符合删除的条件 在这里实现,副本关闭停止删除,正在重新分区(扩容到新的broker)的不可删除。主要做逻辑与“&”操作得到不符合删除的topic。

    作者回复: 👍

    2020-05-30
  • 是男人就开巴巴托斯
    mutePartitionModifications是为了不要让watcher响应zk topic各个分区子节点逐个被删除的变化。 删除是从分区节点到topic节点递归执行的,因为zk不能在有子节点的情况下直接删除父节点。 新加分区与删除topic节点的互斥还是在handleCreatePartitionsRequest函数里做的check。
    2021-01-11
    1
    2
  • 张子涵
    def enqueueTopicsForDeletion(topics: Set[String]): Unit = { if (isDeleteTopicEnabled) { controllerContext.queueTopicDeletion(topics) resumeDeletions() } }刚去研究了一下enqueueTopicsForDeletion方法源码 val isDeleteTopicEnabled: Boolean = config.deleteTopicEnable def queueTopicDeletion(topics: Set[String]): Unit = { topicsToBeDeleted ++= topics } 可以删除的topic会添加进删除队列,不可删除的会重新调用resumeDeletions方法
    2020-08-25
  • 张子涵
    if (config.deleteTopicEnable) { if (topicsToBeDeleted.nonEmpty) { info(s"Starting topic deletion for topics ${topicsToBeDeleted.mkString(",")}") // mark topic ineligible for deletion if other state changes are in progress topicsToBeDeleted.foreach { topic => val partitionReassignmentInProgress = controllerContext.partitionsBeingReassigned.map(_.topic).contains(topic) if (partitionReassignmentInProgress) topicDeletionManager.markTopicIneligibleForDeletion(Set(topic), reason = "topic reassignment in progress") } // add topic to deletion list topicDeletionManager.enqueueTopicsForDeletion(topicsToBeDeleted) } ①根据配置信息是否进行主题删除 是则进行下一步 ②判断要删除的主题是否为空,不为空则进行下一步 ③进行迭代判断,主题是否在partitionsBeingReassigned集合中,如果在,则将主题添加进markTopicIneligibleForDeletion 标志为主题正在重新分配 不可删除 有一点疑问是为什么没有从topicsToBeDeleted集合中把不可删除主题移除的步骤,然后就直接将topicsToBeDeleted添加进删除队列了
    2020-08-25
收起评论
显示
设置
留言
8
收藏
沉浸
阅读
分享
手机端
快捷键
回顶部