Kafka核心源码解读
胡夕
Apache Kafka Committer,老虎证券技术总监
新⼈⾸单¥19.9
3969 人已学习
课程目录
已完结 44 讲
0/4登录后,你可以任选4讲全文学习。
课前必学 (3讲)
开篇词 | 阅读源码,逐渐成了职业进阶道路上的“必选项”
免费
导读 | 构建Kafka工程和源码阅读环境、Scala语言热身
重磅加餐 | 带你快速入门Scala语言
日志模块 (5讲)
01 | 日志段:保存消息文件的对象是怎么实现的?
02 | 日志(上):日志究竟是如何加载日志段的?
03 | 日志(下):彻底搞懂Log对象的常见操作
04 | 索引(上):改进的二分查找算法在Kafka索引的应用
05 | 索引(下):位移索引和时间戳索引的区别是什么?
请求处理模块 (5讲)
06 | 请求通道:如何实现Kafka请求队列?
07 | SocketServer(上):Kafka到底是怎么应用NIO实现网络通信的?
08 | SocketServer(中):请求还要区分优先级?
09 | SocketServer(下):请求处理全流程源码分析
10 | KafkaApis:Kafka最重要的源码入口,没有之一
Controller模块 (5讲)
11 | Controller元数据:Controller都保存有哪些东西?有几种状态?
12 | ControllerChannelManager:Controller如何管理请求发送?
13 | ControllerEventManager:变身单线程后的Controller如何处理事件?
14 | Controller选举是怎么实现的?
15 | 如何理解Controller在Kafka集群中的作用?
状态机模块 (3讲)
16 | TopicDeletionManager: Topic是怎么被删除的?
17 | ReplicaStateMachine:揭秘副本状态机实现原理
18 | PartitionStateMachine:分区状态转换如何实现?
延迟操作模块 (2讲)
19 | TimingWheel:探究Kafka定时器背后的高效时间轮算法
20 | DelayedOperation:Broker是怎么延时处理请求的?
副本管理模块 (6讲)
21 | AbstractFetcherThread:拉取消息分几步?
22 | ReplicaFetcherThread:Follower拉取Leader消息是如何实现的?
23 | ReplicaManager(上):必须要掌握的副本管理类定义和核心字段
24 | ReplicaManager(中):副本管理器是如何读写副本的?
25 | ReplicaManager(下):副本管理器是如何管理副本的?
26 | MetadataCache:Broker是怎么异步更新元数据缓存的?
消费者组管理模块 (7讲)
27 | 消费者组元数据(上):消费者组都有哪些元数据?
28 | 消费者组元数据(下):Kafka如何管理这些元数据?
29 | GroupMetadataManager:组元数据管理器是个什么东西?
30 | GroupMetadataManager:位移主题保存的只是位移吗?
31 | GroupMetadataManager:查询位移时,不用读取位移主题?
32 | GroupCoordinator:在Rebalance中,Coordinator如何处理成员入组?
33 | GroupCoordinator:在Rebalance中,如何进行组同步?
特别放送 (5讲)
特别放送(一)| 经典的Kafka学习资料有哪些?
特别放送(二)| 一篇文章带你了解参与开源社区的全部流程
特别放送(三)| 我是怎么度过日常一天的?
特别放送(四)| 20道经典的Kafka面试题详解
特别放送(五) | Kafka 社区的重磅功能:移除 ZooKeeper 依赖
期中、期末测试 (2讲)
期中测试 | 这些源码知识,你都掌握了吗?
期末测试 | 一套习题,测试你的掌握程度
结束语 (1讲)
结束语 | 源码学习,我们才刚上路呢
Kafka核心源码解读
15
15
1.0x
00:00/00:00
登录|注册

07 | SocketServer(上):Kafka到底是怎么应用NIO实现网络通信的?

胡夕 2020-04-30
你好,我是胡夕。这节课我们来说说 Kafka 底层的 NIO 通信机制源码。
在谈到 Kafka 高性能、高吞吐量实现原理的时候,很多人都对它使用了 Java NIO 这件事津津乐道。实际上,搞懂“Kafka 究竟是怎么应用 NIO 来实现网络通信的”,不仅是我们掌握 Kafka 请求全流程处理的前提条件,对我们了解 Reactor 模式的实现大有裨益,而且还能帮助我们解决很多实际问题。
比如说,当 Broker 处理速度很慢、需要优化的时候,你只有明确知道 SocketServer 组件的工作原理,才能制定出恰当的解决方案,并有针对性地给出对应的调优参数。
那么,今天,我们就一起拿下这个至关重要的 NIO 通信机制吧。

网络通信层

在深入学习 Kafka 各个网络组件之前,我们先从整体上看一下完整的网络通信层架构,如下图所示:
可以看出,Kafka 网络通信组件主要由两大部分构成:SocketServerKafkaRequestHandlerPool
SocketServer 组件是核心,主要实现了 Reactor 模式,用于处理外部多个 Clients(这里的 Clients 指的是广义的 Clients,可能包含 Producer、Consumer 或其他 Broker)的并发请求,并负责将处理结果封装进 Response 中,返还给 Clients。
确认放弃笔记?
放弃后所记笔记将不保留。
新功能上线,你的历史笔记已初始化为私密笔记,是否一键批量公开?
批量公开的笔记不会为你同步至部落
公开
同步至部落
取消
完成
0/1000字
划线
笔记
复制
© 版权归极客邦科技所有,未经许可不得传播售卖。 页面已增加防盗追踪,如有侵权极客邦将依法追究其法律责任。
该试读文章来自付费专栏《Kafka核心源码解读》,如需阅读全部文章,
请订阅文章所属专栏新⼈⾸单¥19.9
立即订阅
登录 后留言

精选留言(16)

  • 胡夕 置顶
    你好,我是胡夕。我来公布上节课的“课后讨论”题答案啦~

    上节课,咱们重点了解了Kafka中的请求通道:RequestChannel类。课后我让你去源码中寻找监控Request队列当前使用情况的监控指标。下面我给出我的答案。这个监控指标就是RequestQueueSize。完整的MBean写法是kafka.network:type=RequestChannel,name=RequestQueueSize。你可以在RequestChannel.scala中找到这行源码:newGauge(requestQueueSizeMetricName, () => requestQueue.size)
    这里的requestQueueSizeMetricName就对应于RequestQueueSize


    okay,你找到了吗?我们可以一起讨论下。
    2020-05-06
    1
  • 每天晒白牙
    去年自己在工作中接触了 Kafka,然后写了篇网络层的源码分析(应该叫源码注释 + 画图)
    https://mp.weixin.qq.com/s/-VzDU0V8J2guNXwhiBEEyg

    作者回复: 赞👍

    2020-04-30
    6
  • Geek_d1cc4d
    Request队列线程共享,这样不同线程的workload才不会发生倾斜,不然可能会发生一边的线程空闲,一边的线程队列满

    作者回复: 很有道理:)

    2020-04-30
    2
  • __Unsafe
    老师,想请教一个问题,kafka是如何处理nio 的 epoll空轮训bug的?

    作者回复: Kafka没做处理。毕竟不属于它应做的事情

    2020-07-02
    1
  • hc
    老师你好,我这边生产环境kafka0.10版本生产者偶然会出现kafka.common.FailedToSendMessageException: Failed to send messages after 3 tries的问题,问一下您这边有什么好的定位思路吗?

    作者回复: 看着像是非常古老的异常了。通常都是listeners配置或是clients的/etc/hosts配置的问题。总之是连不上Broker导致的

    2020-05-20
    1
    1
  • 长脖子树
    总结了下 acceptor 和 processor 的处理逻辑
    Acceptor 部分:
    1. 建立一个 ServerSocketChannel , 并注册监听 OP_ACCEPT 事件
    2. 循环调用 select 方法, 获取事件通知, 这时会拿到一个 socketChannel , 然后使用轮询选择一个空闲的 processor ,
     将这个 socketChannel 放入 processor 的 newConnections 队列中

    Processor 部分:
    1. 给 processor 的 newConnections 队列中的 channel 注册 OP_READ
    2. 从 responseQueue 中拉取 response 对象, 发送 response , 并将 发送完的 response 放入 inflightResponses 队列中
    3. 获取对应socketChannel 上准备就绪的 IO 操作, 将获取到的字节, 封装成 NetworkReceive , 并放入 completedReceives map 中
    4. 从 completedReceives 中获取 NetWorkReceive , 将其封装成 Request , 放入所有 processor 共享的请求队列 requestQueue 中
    5. 取出 inflightResponses 队列中的response , 对其调用回调逻辑
    6. 处理连接断开后的事情
    7. 关闭超过配额限制部分的连接
    2020-07-23
  • 长脖子树
    文中说:
     每当 Processor 线程接收新的连接请求时,都会将对应的 SocketChannel 放入这个队列。后面在创建连接时(也就是调用 configureNewConnections 时),就从该队列中取出 SocketChannel,然后注册新的连接。
    按照我的理解, tcp 三次握手是在 Acceptor 调用 accept() 完成之前就已经完成了的. 而并不是 processor 的工作. processor 的 configureNewConnections 方法只是将 已经建立的连接取出来再次注册 OP_READ 事件而已

    老师怎么看? 大家有什么想法么?

    作者回复: 嗯嗯,你说的是对的。只是我们对于“创建连接”的定义可能不太一样。通常情况下,除了创建TCP连接,Kafka还需要创建所需的通道等对象,这在Kafka看来都是正常工作前的准备工作:)

    2020-07-23
  • 长脖子树
    Processor 中 run() 中会调用 processCompletedReceives() 这个方法, 将 request 放入 requestQueue 中, 还以为它没加锁, 多个 Processor 会导致问题. 最后发现 requestQueue 是 ArrayBlockingQueue , 里面用了可重入锁. 哎哎哎 并发容器又忘得差不多了

    作者回复: 😃

    2020-07-22
  • yes的练级攻略
    对于老师一直讲述的网络线程和IO线程我一直觉得有点别捏,单单是因为参数的名称所以这样叫嘛?还是官方就是这样命名的?
    在我的印象中,对于kafka的processor线程所做的事,我认为processor应该称为IO线程,读写socket,而KafkaRequestHandlerPool做的事,对于Broker来说不正是Broker所需要的做的"业务"么?所以称之为业务线程而不是IO线程。

    作者回复: 这个提法呢是Kafka这边的定义,不是我自己命名的,源码中并无业务线程的提法。不过我们理解意思就行,不用太纠结于称谓

    2020-06-29
  • huldar
    老师,您好。如何正确运行ApiKeys中的main方法呢?直接在idea中点击run按钮会报ClassNotFoundException。查资料后分析应该使用gradle的task运行,但是在clients中没有找到应该是哪个task。谢谢。

    作者回复: 是碰到这个错误了吗? java.lang.ClassNotFoundException: com.fasterxml.jackson.databind.JsonNode

    如果是,我倾向于认为这是一个bug

    2020-05-16
    1
  • 小崔
    接上条老师回复,“ 如果RTT很大,我们最好是让Socket buffer累积更多的数据之后一次性发送,节省网络I/O带宽。”
    但这里调大的缓冲区是broker端的,broker是接收端,根本措施不应该是调大clients一次发送的的数据量么?
    还是说这里的broker既是接收端,又是其他broker的clients?文中rtt指broker之间的通讯?

    作者回复: producer、consumer和broker端都需要调整啊,都有各自的参数

    2020-05-09
  • 小崔
    为什么Clients 与 Broker 的通信网络延迟很大,建议调大缓冲区呢?背后原理是什么?

    作者回复: 如果RTT很大,我们最好是让Socket buffer累积更多的数据之后一次性发送,节省网络I/O带宽。就比如送快递,如果目的地很远,快递员最好先积攒多一点待发送的物件之后再去发送

    2020-05-07
    1
  • 小崔
    response跟对应和socket链接是绑死的,如果放到共享队列,也只能用一个dispatcher来取,发给对应processor,很多余。

    作者回复: 很有道理👍

    2020-05-06
  • 伯安知心
    我认为2点,第一,保持处理独立性,一个processor处理一个链接;第二,按理说限制response队列代码限制越少,请求越快,我个人觉得和状态有关系,kafka请求处理是无状态模式,这么做是在有状态处理做的努力。说kafka无状态也有问题,有个session(虽然开启kerberos才使用),但是这个也能证明kafka并非绝对无状态。
    2020-05-05
  • 冯磊
    利用假期终于追上了,胡老师讲得非常详细,感谢。胡老师,日后有没有想过开一节操作实操课?日常开发中可能会遇到上面提到的某种问题,但不知从何下手,如果老师带着拿出实际中一两个小案例来剖析下,我们就更清晰了,遇到问题也知道如何思考了。

    作者回复: 视频课这种方式可能后续会认真考虑看看的,目前确实还没有思路。不过多谢鼓励!

    2020-05-03
  • 曾轼麟
    我是这样理解的,resquest共享这样才能实线程间的负载均衡,response专属的是因为对应的request已经分配给对应的线程处理已,为了避免线程上下文上下文切换理因也由这个线程处理响应,作为一个线程内部的变量更加合理

    作者回复: 嗯,很有道理:)

    2020-05-02
    2
收起评论
16
返回
顶部