Kafka 核心源码解读
胡夕
Apache Kafka Committer,老虎证券技术总监
19216 人已学习
新⼈⾸单¥59
登录后,你可以任选4讲全文学习
课程目录
已完结/共 44 讲
结束语 (1讲)
Kafka 核心源码解读
15
15
1.0x
00:00/00:00
登录|注册

06 | 请求通道:如何实现Kafka请求队列?

你好,我是胡夕。日志模块我们已经讲完了,你掌握得怎样了呢?如果你在探索源码的过程中碰到了问题,记得在留言区里写下你的困惑,我保证做到知无不言。
现在,让我们开启全新的“请求处理模块”的源码学习之旅。坦率地讲,这是我自己给 Kafka 源码划分的模块,在源码中可没有所谓的“请求处理模块”的提法。但我始终觉得,这样划分能够帮助你清晰地界定不同部分的源码的作用,可以让你的阅读更有针对性,学习效果也会事半功倍。
在这个模块,我会带你了解请求处理相关的重点内容,包括请求处理通道、请求处理全流程分析和请求入口类分析等。今天,我们先来学习下 Kafka 是如何实现请求队列的。源码中位于 core/src/main/scala/kafka/network 下的 RequestChannel.scala 文件,是主要的实现类。
当我们说到 Kafka 服务器端,也就是 Broker 的时候,往往会说它承担着消息持久化的功能,但本质上,它其实就是一个不断接收外部请求、处理请求,然后发送处理结果的 Java 进程
你可能会觉得奇怪,Broker 不是用 Scala 语言编写的吗,怎么这里又是 Java 进程了呢?这是因为,Scala 代码被编译之后生成.class 文件,它和 Java 代码被编译后的效果是一样的,因此,Broker 启动后也仍然是一个普通的 Java 进程。
确认放弃笔记?
放弃后所记笔记将不保留。
新功能上线,你的历史笔记已初始化为私密笔记,是否一键批量公开?
批量公开的笔记不会为你同步至部落
公开
同步至部落
取消
完成
0/2000
荧光笔
直线
曲线
笔记
复制
AI
  • 深入了解
  • 翻译
    • 英语
    • 中文简体
    • 中文繁体
    • 法语
    • 德语
    • 日语
    • 韩语
    • 俄语
    • 西班牙语
    • 阿拉伯语
  • 解释
  • 总结

本文深入解析了Kafka请求通道的实现原理,重点关注了请求处理通道、请求处理全流程分析和请求入口类分析。文章详细解释了Kafka请求的定义和实现原理,包括请求的属性和监控指标。通过对RequestChannel.scala文件的分析,阐述了请求的处理过程和关键属性,如processor、context、startTimeNanos、memoryPool、buffer和metrics。此外,还介绍了Request对象的解析过程和相关的监控指标管理。文章以技术性强、深入源码的方式,帮助读者深入了解Kafka请求队列的实现原理。同时,还介绍了Response类的继承关系和各个子类的作用,以及RequestChannel类的定义和重要字段属性。文章还详细解释了RequestChannel类的主要功能,包括处理Request和Response的方法,以及Processor线程池的管理。整体而言,本文通过深入源码的方式,帮助读者全面了解Kafka请求通道的实现原理,适合对Kafka感兴趣的技术人员阅读。文章还介绍了RequestChannel类的监控指标实现,包括RequestsPerSec、RequestQueueTimeMs、LocalTimeMs、RemoteTimeMs和TotalTimeMs等重要指标,帮助读者了解如何评估Broker的繁忙状态以及定位系统问题。文章总结了Request、Response、RequestChannel和监控指标等关键内容,鼓励读者思考Request和Response在请求通道中的流转过程,以加深对Kafka请求处理机制的理解。

仅可试看部分内容,如需阅读全部内容,请付费购买文章所属专栏
《Kafka 核心源码解读》
新⼈⾸单¥59
立即购买
登录 后留言

全部留言(17)

  • 最新
  • 精选
  • 胡夕
    置顶
    你好,我是胡夕。我来公布上节课的“课后讨论”题答案啦~ 上节课,咱们重点了解了Kafka中位移索引和时间戳索引的不同。课后我让你尝试自行编写一个函数,该函数实现类似于CEILING 函数的位移查找逻辑,即返回不小于给定位移值 targetOffset 的最小位移值和对应的物理文件位置。以下是我给出的代码: def ceilingLookup(targetOffset: Long): OffsetPosition = { maybeLock(lock) { val idx = mmap.duplicate val slot = smallestUpperBoundSlotFor(idx, targetOffset, IndexSearchType.KEY) if (slot == -1) OffsetPosition(baseOffset, 0) else parseEntry(idx, slot) } } okay,你是怎么考虑的呢?我们可以一起讨论下。
    2020-05-06
    6
  • Ball
    思考题:监控 Request 队列当前的使用情况,应该是 newGauge(requestQueueSizeMetricName, () => requestQueue.size) 这个方法里面,requestQueueSizeMetricName 这个指标。

    作者回复: 这个指标名称实际上是RequestQueueSize:)

    2020-10-04
    1
  • delete is create
    感觉kafka中大量使用协调器(findCoordinator) 老师协调器到底是做什么的 我今天看一个kafkaAdminClient中获取消费者组信息里面也有这个东西

    作者回复: 协调器是做协调用的:) hmmm... sorry,哈哈哈 目前Kafka中有两类协调器,GroupCoordinator和TransactionCoordinator。前者用于管理消费者组,后者是用于事务管理的。就以消费者组来说,Coordinator需要生成消费者组的元数据信息,负责维护组的位移提交和获取以及全套的rebalance流程支持。

    2020-04-28
    2
    1
  • lucas
    老师好,请教一个问题,我flink1.12.1写入kafka2.3.0的时候报错,而且是运行几个小时之后才会报 Transiting to fatal error state due to org.apache.kafka.common.errors.InvalidPidMappingException: The producer attempted to use a producer id which is not currently assigned to its transactional id. Transiting to fatal error state due to org.apache.kafka.common.errors.UnsupportedVersionException: Attempted to write a non-default producerId at version 1

    作者回复: 还是版本不匹配的问题

    2021-03-28
  • Geek_ae1f59
    老师你好,我们kafka集群responseSendTimeMs监控指标非常大,而且部分节点还抛出异常:java.io.IOException: Connection to xx was disconnected before the response was read,关于这个有什么排查思路吗?

    作者回复: 排查网络的问题吧

    2020-06-11
  • sljoai
    “StartThrottlingResponse:用于通知 Broker 的 Socket Server 组件某个 TCP 连接通信通道开始被限流(throttling). ”这里的限流如何理解呢?是物理网络上的限制,还是可以在Kafka中设置的?!

    作者回复: Kafka实现的。基本上可以认为是人为引入延时

    2020-06-10
  • 空知
    用kafka-configs 命令把Processor 线程池中的线程数量减少 会导致响应丢失的吧,这个不用处理或者警告限制嘛?

    作者回复: 的确可能丢失,到时候客户端自行处理。不过我同意应该加个告警。如果你有兴趣可以提交一个patch给社区,修改RequestChannel.scala的sendResponse方法代码即可:)

    2020-05-05
  • 小崔
    对于一个特定的请求,TotalTimeMs = RequestQueueTimeMs + LocalTimeMs + RemoteTimeMs,对吗?还是说LocalTimeMs包含了(RemoteTimeMs + 本地处理时间)?

    作者回复: 前一个是对的。local time是指本地broker写入磁盘的时间

    2020-05-05
    2
  • 镜子中间
    这节课的信息量很大,干货满满,就是听完下来云里雾里,打算对照源码和老师的讲解梳理一遍

    作者回复: 加油加油,如果有问题就提出来, 我第一时间回复

    2020-05-01
  • 🐾
    胡总下午好,专栏后面会介绍如何在IDEA进行调试吗?想看下客户端发送消息到服务端的整个流程,再结合文章介绍的功能点,功力提升效果会比较好些

    作者回复: hmmm,实话说本专栏聚焦于Broker端代码。客户端染指的很少。不过你可以将整个流程拆成broker端调试+clients端调试。特别是broker端调试,直接在KafkaApis.scala中打断点就可以

    2020-04-29
    2
收起评论
显示
设置
留言
17
收藏
沉浸
阅读
分享
手机端
快捷键
回顶部