Kafka 核心源码解读
胡夕
Apache Kafka Committer,老虎证券技术总监
19216 人已学习
新⼈⾸单¥59
登录后,你可以任选4讲全文学习
课程目录
已完结/共 44 讲
结束语 (1讲)
Kafka 核心源码解读
15
15
1.0x
00:00/00:00
登录|注册

10 | KafkaApis:Kafka最重要的源码入口,没有之一

你好,我是胡夕。今天,我们来收尾 Kafka 请求处理模块的源码学习。讲到这里,关于整个模块,我们还有最后一个知识点尚未掌握,那就是 KafkaApis 类。
在上节课中,我提到过,请求的实际处理逻辑是封装在 KafkaApis 类中的。你一定很想知道,这个类到底是做什么的吧。
实际上,我一直认为,KafkaApis 是 Kafka 最重要的源码入口。因为,每次要查找 Kafka 某个功能的实现代码时,我们几乎总要从这个 KafkaApis.scala 文件开始找起,然后一层一层向下钻取,直到定位到实现功能的代码处为止。比如,如果你想知道创建 Topic 的流程,你只需要查看 KafkaApis 的 handleCreateTopicsRequest 方法;如果你想弄懂 Consumer 提交位移是怎么实现的,查询 handleOffsetCommitRequest 方法就行了。
除此之外,在这一遍遍的钻取过程中,我们还会慢慢地掌握 Kafka 实现各种功能的代码路径和源码分布,从而建立起对整个 Kafka 源码工程的完整认识
如果这些还不足以吸引你阅读这部分源码,那么,我再给你分享一个真实的案例。
之前,在使用 Kafka 时,我发现,Producer 程序一旦向一个不存在的主题发送消息,在创建主题之后,Producer 端会抛出一个警告:
确认放弃笔记?
放弃后所记笔记将不保留。
新功能上线,你的历史笔记已初始化为私密笔记,是否一键批量公开?
批量公开的笔记不会为你同步至部落
公开
同步至部落
取消
完成
0/2000
荧光笔
直线
曲线
笔记
复制
AI
  • 深入了解
  • 翻译
    • 英语
    • 中文简体
    • 中文繁体
    • 法语
    • 德语
    • 日语
    • 韩语
    • 俄语
    • 西班牙语
    • 阿拉伯语
  • 解释
  • 总结

KafkaApis类是Kafka源码中的重要入口之一,负责处理各种Kafka请求类型。该类关联了众多重要组件,如ReplicaManager、GroupCoordinator和KafkaController,因此在处理不同类型的RPC请求时,会用到不同的组件。KafkaApis类的handle方法利用Scala语言中的模式匹配语法,完整地列出了对所有请求类型的处理逻辑,使读者能够串联出Kafka处理任何请求的源码路径。通过学习KafkaApis类,读者可以逐步掌握Kafka实现各种功能的代码路径和源码分布,从而建立对整个Kafka源码工程的完整认识。文章还分享了一个真实案例,通过查看KafkaApis代码,作者发现了LEADER_NOT_AVAILABLE异常是如何抛出的,强调了学习源码的重要性。Kafka社区维护了一个官方文档,专门记录这些RPC协议,包括不同版本所需的Request格式和Response格式。KafkaApis类的handle方法封装了所有RPC请求的具体处理逻辑,而sendResponse系列方法负责发送Response给请求发送方。authorize方法是请求处理前权限校验层的主要逻辑实现。通过学习KafkaApis类,读者可以掌握从KafkaApis类开始去寻找单个功能具体代码位置的方法,对深入学习源码或解决实际问题非常有帮助。

仅可试看部分内容,如需阅读全部内容,请付费购买文章所属专栏
《Kafka 核心源码解读》
新⼈⾸单¥59
立即购买
登录 后留言

全部留言(8)

  • 最新
  • 精选
  • 胡夕
    置顶
    你好,我是胡夕。我来公布上节课的“课后讨论”题答案啦~ 上节课,咱们重点了解了请求处理的全流程。课后我请你思考这样一个问题,即请求处理流程的哪些部分应用了经典的“生产者 - 消费者”模式。其实在Kafka的网路通信层,请求队列和响应队列都应用了生产者-消费者模式。请求队列的生产者是Processor线程,消费者是KafkaRequestHandler线程;响应队列的生产者是KafkaRequestHandler线程,而Processor线程是消费者。 okay,你同意这个说法吗?或者说你有其他的看法吗?我们可以一起讨论下。
    2020-05-19
  • 张子涵
    个人觉得答案应该在handleOffsetCommitRequest 方法中,比较贴近的应该是这一段 // reject the request if not authorized to the group if (!authorize(request, READ, GROUP, offsetCommitRequest.data.groupId)) { val error = Errors.GROUP_AUTHORIZATION_FAILED val responseTopicList = OffsetCommitRequest.getErrorResponseTopics( offsetCommitRequest.data.topics, error) sendResponseMaybeThrottle(request, requestThrottleMs => new OffsetCommitResponse( new OffsetCommitResponseData() .setTopics(responseTopicList) .setThrottleTimeMs(requestThrottleMs) )) 如果没有授权给组,则拒绝请求

    作者回复: ������

    2020-08-21
    1
  • 懂码哥(GerryWen)
    胡总,我们目前生产版本是2.1.0,遇到一个比较严重也比较紧急问题,希望您能帮忙分析一下是不是版本问题。 很奇怪的现象就是重启某个有问题的broker就正常了,但是维持不到八个小时,又出问题了。 我把jira贴一下,你帮我看下,是否有人遇到同样问题问你。看jira回答,是在2.1.1版本修复了。 https://issues.apache.org/jira/browse/KAFKA-7697 https://issues.apache.org/jira/browse/KAFKA-7870 https://issues.apache.org/jira/browse/KAFKA-6582 谢谢谢谢谢谢!

    作者回复: 信息量太少了,光说是一个奇怪的问题,没有日志和相关描述很难判断啊:( 快速验证的话不妨升级到2.1.1试试呢

    2020-07-08
    2
    1
  • 曾轼麟
    提交位移在handleOffsetCommitRequest方法中,需要判断当前发起commitRequest的Consumer对这个group是否有读权限

    作者回复: 是的~

    2020-05-31
    1
  • 鲁·本
    if (!authorize(request.context, READ, GROUP, offsetCommitRequest.data.groupId)) { 需要消费者组有读权限

    作者回复: ������

    2020-08-27
  • 空知
    有 READ权限就可以了~~

    作者回复: 能确定所在代码吗:)

    2020-05-14
    2
  • 伯安知心
    我看的kafka版本0.9中提交consumer位移在handleoffsetcommitrequest方法中,执行authorize验证在simpleaclauthorizer类map中是否包含这个用户,如果有则验证通过。

    作者回复: 是的,是否能再进一步,看看它需要什么权限?

    2020-05-10
    2
  • 欧阳
    老师,你好。学习kafka很容易想到raft,理论上很多近似和相同的地方。我的问题是:在实际生产环境中,需要搭建跨地域的集群,保证一致性是不是根本就不太可行?集群随时都有可能脑裂?isr集合一直变来变去。 如果不能跨地域的话,异地多中心的数据如何保证一致性呢?

    作者回复: 一般采用多套Kafka集群+异步同步的方式

    2020-05-09
收起评论
显示
设置
留言
8
收藏
沉浸
阅读
分享
手机端
快捷键
回顶部