Kafka核心源码解读
胡夕
Apache Kafka Committer,老虎证券技术总监
新⼈⾸单¥19.9
3993 人已学习
课程目录
已完结 44 讲
0/4登录后,你可以任选4讲全文学习。
课前必学 (3讲)
开篇词 | 阅读源码,逐渐成了职业进阶道路上的“必选项”
免费
导读 | 构建Kafka工程和源码阅读环境、Scala语言热身
重磅加餐 | 带你快速入门Scala语言
日志模块 (5讲)
01 | 日志段:保存消息文件的对象是怎么实现的?
02 | 日志(上):日志究竟是如何加载日志段的?
03 | 日志(下):彻底搞懂Log对象的常见操作
04 | 索引(上):改进的二分查找算法在Kafka索引的应用
05 | 索引(下):位移索引和时间戳索引的区别是什么?
请求处理模块 (5讲)
06 | 请求通道:如何实现Kafka请求队列?
07 | SocketServer(上):Kafka到底是怎么应用NIO实现网络通信的?
08 | SocketServer(中):请求还要区分优先级?
09 | SocketServer(下):请求处理全流程源码分析
10 | KafkaApis:Kafka最重要的源码入口,没有之一
Controller模块 (5讲)
11 | Controller元数据:Controller都保存有哪些东西?有几种状态?
12 | ControllerChannelManager:Controller如何管理请求发送?
13 | ControllerEventManager:变身单线程后的Controller如何处理事件?
14 | Controller选举是怎么实现的?
15 | 如何理解Controller在Kafka集群中的作用?
状态机模块 (3讲)
16 | TopicDeletionManager: Topic是怎么被删除的?
17 | ReplicaStateMachine:揭秘副本状态机实现原理
18 | PartitionStateMachine:分区状态转换如何实现?
延迟操作模块 (2讲)
19 | TimingWheel:探究Kafka定时器背后的高效时间轮算法
20 | DelayedOperation:Broker是怎么延时处理请求的?
副本管理模块 (6讲)
21 | AbstractFetcherThread:拉取消息分几步?
22 | ReplicaFetcherThread:Follower拉取Leader消息是如何实现的?
23 | ReplicaManager(上):必须要掌握的副本管理类定义和核心字段
24 | ReplicaManager(中):副本管理器是如何读写副本的?
25 | ReplicaManager(下):副本管理器是如何管理副本的?
26 | MetadataCache:Broker是怎么异步更新元数据缓存的?
消费者组管理模块 (7讲)
27 | 消费者组元数据(上):消费者组都有哪些元数据?
28 | 消费者组元数据(下):Kafka如何管理这些元数据?
29 | GroupMetadataManager:组元数据管理器是个什么东西?
30 | GroupMetadataManager:位移主题保存的只是位移吗?
31 | GroupMetadataManager:查询位移时,不用读取位移主题?
32 | GroupCoordinator:在Rebalance中,Coordinator如何处理成员入组?
33 | GroupCoordinator:在Rebalance中,如何进行组同步?
特别放送 (5讲)
特别放送(一)| 经典的Kafka学习资料有哪些?
特别放送(二)| 一篇文章带你了解参与开源社区的全部流程
特别放送(三)| 我是怎么度过日常一天的?
特别放送(四)| 20道经典的Kafka面试题详解
特别放送(五) | Kafka 社区的重磅功能:移除 ZooKeeper 依赖
期中、期末测试 (2讲)
期中测试 | 这些源码知识,你都掌握了吗?
期末测试 | 一套习题,测试你的掌握程度
结束语 (1讲)
结束语 | 源码学习,我们才刚上路呢
Kafka核心源码解读
15
15
1.0x
00:00/00:00
登录|注册

22 | ReplicaFetcherThread:Follower拉取Leader消息是如何实现的?

胡夕 2020-06-16
你好,我是胡夕。今天,我们继续学习 Follower 是如何拉取 Leader 消息的。
要弄明白这个问题,在学习源码的时候,我们需要从父类 AbstractFetcherThread 开始学起,因为这是理解子类 ReplicaFetcherThread 的基础。上节课,我们已经学习了 AbstractFetcherThread 的定义,以及 processPartitionData、truncate、buildFetch 这三个方法的作用。现在,你应该掌握了拉取线程源码的处理逻辑以及支撑这些逻辑实现的代码结构。
不过,在上节课的末尾,我卖了个关子——我把串联起这三个方法的 doWork 方法留到了今天这节课。等你今天学完 doWork 方法,以及这三个方法在子类 ReplicaFetcherThread 中的实现代码之后,你就能完整地理解 Follower 副本应用拉取线程(也就是 ReplicaFetcherThread 线程),从 Leader 副本获取消息并处理的流程了。
那么,现在我们就开启 doWork 以及子类 ReplicaFetcherThread 代码的阅读。

AbstractFetcherThread 类:doWork 方法

doWork 方法是 AbstractFetcherThread 类的核心方法,是线程的主逻辑运行方法,代码如下:
确认放弃笔记?
放弃后所记笔记将不保留。
新功能上线,你的历史笔记已初始化为私密笔记,是否一键批量公开?
批量公开的笔记不会为你同步至部落
公开
同步至部落
取消
完成
0/1000字
划线
笔记
复制
© 版权归极客邦科技所有,未经许可不得传播售卖。 页面已增加防盗追踪,如有侵权极客邦将依法追究其法律责任。
该试读文章来自付费专栏《Kafka核心源码解读》,如需阅读全部文章,
请订阅文章所属专栏新⼈⾸单¥19.9
立即订阅
登录 后留言

精选留言(4)

  • 胡夕 置顶
    你好,我是胡夕。我来公布上节课的“课后讨论”题答案啦~

    上节课,我们重点学习了DelayedOperation类源码以及Broker是如何处理延时请求的。课后我请你简单描述下handlePartitionsWithErrors方法的实现原理。我的看法是这样的:这个方法接收一组分区,为它们调用delayPartitions方法执行延时处理。delayPartitions方法会遍历每个给定的分区,并把这些分区依次加入到待读取分区列表的末端,从而实现延时重试操作。简单来说,Kafka处理出错分区的思路就是将这些分区放入到轮询名单的末尾期待稍后重试。

    okay,你同意这个说法吗?或者说你有其他的看法吗?我们可以一起讨论下。
    2020-06-23
    1
  • 伯安知心
    updateFetchOffsetAndMaybeMarkTruncationComplete方法只有在truncateToEpochEndOffsets和truncateToHighWatermark中结尾执行,简单来说就是truncate之后,更新标记截断完成,更新partitionStates值,如果它们的offsetTruncationState指示截断已完成,则可能将它们标记为截断已完成

    作者回复: 👍

    2020-06-16
    1
  • 张子涵
    // 循环遍历所有分区,更新它们的取值偏移量,如果它们的offsetTruncationState表示截断完成,可以将它们标记为截断完成
      private def updateFetchOffsetAndMaybeMarkTruncationComplete(fetchOffsets: Map[TopicPartition, OffsetTruncationState]): Unit = {
        val newStates: Map[TopicPartition, PartitionFetchState] = partitionStates.partitionStateMap.asScala
          .map { case (topicPartition, currentFetchState) =>
            val maybeTruncationComplete = fetchOffsets.get(topicPartition) match {
              case Some(offsetTruncationState) =>
                val state = if (offsetTruncationState.truncationCompleted) Fetching else Truncating
                PartitionFetchState(offsetTruncationState.offset, currentFetchState.lag,
                  currentFetchState.currentLeaderEpoch, currentFetchState.delay, state)
              case None => currentFetchState
            }
            (topicPartition, maybeTruncationComplete)
          }
        partitionStates.set(newStates.asJava)
      }
    ps(一边听着老师的语音一边看文档,更能提高注意力,后面的同学们可以尝试一下!)
    2020-08-28
  • 您好
    这块有个小问题想咨询一下
    就是有没有参数去控制
    consumer的fetch频率
    replica的fetch频率

    作者回复: Replica这端有两个参数可以使用:leader.replication.throttled.rate和follower.replication.throttled.rate

    Consumer端可以自己控制啊,比如人为地增加sleep等

    2020-08-14
收起评论
4
返回
顶部