15 | 如何理解Controller在Kafka集群中的作用?
胡夕
你好,我是胡夕。
上节课,我们学习了 Controller 选举的源码,了解了 Controller 组件的选举触发场景,以及它是如何被选举出来的。Controller 就绪之后,就会行使它作为控制器的重要权利了,包括管理集群成员、维护主题、操作元数据,等等。
之前在学习 Kafka 的时候,我一直很好奇,新启动的 Broker 是如何加入到集群中的。官方文档里的解释是:“Adding servers to a Kafka cluster is easy, just assign them a unique broker id and start up Kafka on your new servers.”显然,你只要启动 Broker 进程,就可以实现集群的扩展,甚至包括集群元数据信息的同步。
不过,你是否思考过,这一切是怎么做到的呢?其实,这就是 Controller 组件源码提供的一个重要功能:管理新集群成员。
当然,作为核心组件,Controller 提供的功能非常多。除了集群成员管理,主题管理也是一个极其重要的功能。今天,我就带你深入了解下它们的实现代码。可以说,这是 Controller 最核心的两个功能,它们几乎涉及到了集群元数据中的所有重要数据。掌握了这些,之后你在探索 Controller 的其他代码时,会更加游刃有余。
公开
同步至部落
取消
完成
0/2000
荧光笔
直线
曲线
笔记
复制
AI
- 深入了解
- 翻译
- 解释
- 总结
Kafka集群中Controller组件的关键功能和实现细节是本文的重点。文章深入探讨了Controller在管理新集群成员和主题管理方面的重要角色,特别关注了其处理逻辑和代码实现。通过详细介绍新增成员和移除现有成员、以及单个成员的管理等功能,读者可以更好地理解和应用Kafka集群中的Controller组件。此外,文章还介绍了Controller事件队列的处理逻辑,以及处理Broker终止和启动的方法,以及主题的创建、变更与删除的实现机制。通过代码示例和详细解释,为读者提供了深入学习Controller组件的良好指导。总之,本文内容涵盖了Controller的关键功能和实现细节,对于想要深入了解Kafka集群中Controller组件的读者来说,是一份非常有价值的资料。
仅可试看部分内容,如需阅读全部内容,请付费购买文章所属专栏
《Kafka 核心源码解读》,新⼈⾸单¥59
《Kafka 核心源码解读》,新⼈⾸单¥59
立即购买
© 版权归极客邦科技所有,未经许可不得传播售卖。 页面已增加防盗追踪,如有侵权极客邦将依法追究其法律责任。
登录 后留言
全部留言(10)
- 最新
- 精选
- 胡夕置顶你好,我是胡夕。我来公布上节课的“课后讨论”题答案啦~ 上节课,咱们重点了解了Controller选举部分的代码。课后我请你思考这样一个问题:源码中当Controller选举之后哪里更新的元数据请求? 其实这是在onControllerFailover方法中完成的。该方法中调用: sendUpdateMetadataRequest(controllerContext.liveOrShuttingDownBrokerIds.toSeq, Set.empty)令集群所有Broker更新元数据 okay,你同意这个说法吗?或者说你有其他的看法吗?我们可以一起讨论下。2020-05-291
- RonnieXie老师,学习了关于Controller源码课,我再深入看相关源码,梳理了流程图,您看这流程图是否正确 http://note.youdao.com/noteshare?id=1f999786b77ada75fbb58b2831770e47&sub=554F637E4F984C7B8036365C51E0758B
作者回复: 很棒的总结啊!我自己也学到了一些:)
2020-05-2723 - 雕老师,在看 Controller 处理各种事件的时候,发现相关的处理方法中都没有重新注册watcher。比如新启动节点触发了 BrokerChange 事件,处理流程中并没有重新注册对/brokers/ids 子节点的监听。这个是在哪里进行重新注册的
作者回复: 在KafkaController的onControllerFailover方法中会注册
2020-12-22 - RonnieXie胡夕老师,我们项目最近在预研使用图数据库,刚看碰巧在一篇文章(https://zhuanlan.zhihu.com/p/99381529) 看到你们公司使用JanusGraph,想问下这数据库稳定性如何,同时也看到另外一款百度开源的HugeGraph, 你们选型的JanusGraph看中的优势是什么?
作者回复: 比较稳定,API也算好用。没怎么用过百度的图数据库。另外我们公司用图数据库很长时间了,那时HugeGraph还没有开源。
2020-05-27 - 空知调用脚本kafka-topics.sh exec $(dirname $0)/kafka-run-class.sh kafka.admin.TopicCommand "$@" 对应的类object TopicCommand 的topicService.createTopic(opts)根据是否定义ZK信息走实现类ZookeeperTopicService或者AdminClientTopicService然后 ZookeeperTopicService里面 adminZkClient.createTopic 然后就可以触发topicChange事件...
作者回复: 是的
2020-05-24 - 张三丰为什么新增的broker马上就分配好了副本呢?什么时候分配的?2023-12-13归属地:四川
- ahu0605这节课应该往前提,提到这个主题的前面,讲的太好了2022-05-18
- 贝胡夕老师,最近遇到个问题,向请教一下您: 我下线了controller节点1后,2被选为新的controller。但是发现有个topic A的partition1元数据信息不对,原先它的三个副本在2,3,4(2为leader)。但是看打印的日志中发现新controller 2 告诉6节点说topic A的partition1的副本是3,2,6,但并没有向2,3,4同步元数据的变更,导致6节点无故新增了一副本并向leader拉取一直失败,以下是日志:您帮我看看这是为什么呢?感觉像是controller的下线导致元数据错乱了 6机器上的日志: [2021-03-16 14:12:39.979] TRACE [Broker id=6] Cached leader info UpdateMetadataPartitionState(topicName='A', p artitionIndex=1, controllerEpoch=29, leader=2, leaderEpoch=13, isr=[2, 3, 4], zkVersion=25, replicas=[2, 3, 4], offlineReplicas=[ ]) for partition A-1 in response to UpdateMetadata request sent by controller 1 epoch 29 with correlation id 0 (state.change.logger) [2021-03-16 15:00:46.600] TRACE [Broker id=6] Cached leader info UpdateMetadataPartitionState(topicName='A', p artitionIndex=1, controllerEpoch=29, leader=2, leaderEpoch=13, isr=[2, 3, 4], zkVersion=25, replicas=[3, 2, 6], offlineReplicas=[ ]) for partition A-1 in response to UpdateMetadata request sent by controller 2 epoch 30 with correlation id 0 (state.change.logger) 2机器上的日志: [2021-03-16 15:07:26,164] WARN [ReplicaManager broker=2] Leader 2 failed to record follower 6's position 11679162 since the replica is not recognized to be one of the assigned replicas 3,4,2 for partition A-1. Empty records will be returned for this partition. (kafka.server.ReplicaManager)2021-03-18
- 鲁·本增加一个主题的分区调用的是 KafkaController 的方法: processTopicChange -> onNewPartitionCreation2020-09-01
- 张子涵调用的是onNewPartitionCreation方法,源码如下 private def onNewPartitionCreation(newPartitions: Set[TopicPartition]): Unit = { info(s"New partition creation callback for ${newPartitions.mkString(",")}") partitionStateMachine.handleStateChanges(newPartitions.toSeq, NewPartition) replicaStateMachine.handleStateChanges(controllerContext.replicasForPartition(newPartitions).toSeq, NewReplica) partitionStateMachine.handleStateChanges( newPartitions.toSeq, OnlinePartition, Some(OfflinePartitionLeaderElectionStrategy(false)) ) replicaStateMachine.handleStateChanges(controllerContext.replicasForPartition(newPartitions).toSeq, OnlineReplica) }2020-08-25
收起评论