Kafka 核心源码解读
胡夕
Apache Kafka Committer,老虎证券技术总监
19216 人已学习
新⼈⾸单¥59
登录后,你可以任选4讲全文学习
课程目录
已完结/共 44 讲
结束语 (1讲)
Kafka 核心源码解读
15
15
1.0x
00:00/00:00
登录|注册

21 | AbstractFetcherThread:拉取消息分几步?

你好,我是胡夕。从今天开始,我们正式进入到第 5 大模块“副本管理模块”源码的学习。
在 Kafka 中,副本是最重要的概念之一。为什么这么说呢?在前面的课程中,我曾反复提到过副本机制是 Kafka 实现数据高可靠性的基础。具体的实现方式就是,同一个分区下的多个副本分散在不同的 Broker 机器上,它们保存相同的消息数据以实现高可靠性。对于分布式系统而言,一个必须要解决的问题,就是如何确保所有副本上的数据是一致的。
针对这个问题,最常见的方案当属 Leader/Follower 备份机制(Leader/Follower Replication)。在 Kafka 中, 分区的某个副本会被指定为 Leader,负责响应客户端的读写请求。其他副本自动成为 Follower,被动地同步 Leader 副本中的数据。
这里所说的被动同步,是指 Follower 副本不断地向 Leader 副本发送读取请求,以获取 Leader 处写入的最新消息数据。
那么在接下来的两讲,我们就一起学习下 Follower 副本是如何通过拉取线程做到这一点的。另外,Follower 副本在副本同步的过程中,还可能发生名为截断(Truncation)的操作。我们一并来看下它的实现原理。
确认放弃笔记?
放弃后所记笔记将不保留。
新功能上线,你的历史笔记已初始化为私密笔记,是否一键批量公开?
批量公开的笔记不会为你同步至部落
公开
同步至部落
取消
完成
0/2000
荧光笔
直线
曲线
笔记
复制
AI
  • 深入了解
  • 翻译
    • 英语
    • 中文简体
    • 中文繁体
    • 法语
    • 德语
    • 日语
    • 韩语
    • 俄语
    • 西班牙语
    • 阿拉伯语
  • 解释
  • 总结

Kafka中的副本管理模块是确保数据高可靠性的关键组件,而副本同步则是其中的重要问题。本文深入探讨了Follower副本如何通过拉取线程实现数据同步,并详细介绍了AbstractFetcherThread抽象基类的定义和重要字段。通过源码分析,读者可以了解到Kafka中副本获取线程的实现原理,以及如何定位和解决生产环境中的问题。文章还强调了对源码的理解能够帮助解决实际问题,并指出了源码中的改进对问题解决的重要性。通过对AbstractFetcherThread类的介绍,读者可以了解到其在副本管理模块中的重要作用,以及为子类实现提供的基础。此外,还介绍了分区读取状态类的实现原理和重要方法,以及AbstractFetcherThread类提供的关键方法。整体而言,本文为读者提供了深入了解Kafka副本管理模块的基础知识,并强调了对源码理解的重要性。

仅可试看部分内容,如需阅读全部内容,请付费购买文章所属专栏
《Kafka 核心源码解读》
新⼈⾸单¥59
立即购买
登录 后留言

全部留言(6)

  • 最新
  • 精选
  • 胡夕
    置顶
    你好,我是胡夕。我来公布上节课的“课后讨论”题答案啦~ 上节课,咱们结合源码重点了解了Broker是如何处理延时请求的。课后我挑了advanceClock方法中的一个语句让你去分析它的目的。这行语句计算你的事是watcherList中总的已完成的请求数。另外在遍历watcherList的同时,代码还会将这些已完成的任务从链表中移除。 okay,你同意这个说法吗?或者说你有其他的看法吗?我们可以一起讨论下。
    2020-06-15
    1
  • 伯安知心
    handlePartitionsWithErrors方法简单来说就是,如果可能出现leader切换,延迟处理带错误的partitions,并且延迟处理过程加了中断优先处理的锁,

    作者回复: 👍

    2020-06-14
    1
  • Geek_08f329
    打扰下大佬,最近集群出现isr抖动的问题,定位是leaderisrupdatelock的锁竞争导致的。除了正常的业务请求,我发现每次出现问题的罪魁祸首是follower节点的fetchfollower请求。 在测试环境发现fetchfollower的量受到集群内处理请求的节点数以及produce与consume流量的强影响。 我有以下两个疑惑: - 为什么consume流量上涨会导致fetchfollower的上涨? - 我尝试扩容。集群内的topic一半AR设置为1 2 3,一半设置为4,5,6。我理解相当于均摊了partition数量但是没有为当前节点引入更多相关broker,按理说相当于拆分成两个集群,但是fetchfollower的数量仍然产生了近两倍的fetchfollower量。请问下具体原因是什么? 与fetchfollower量相关的broker参数我找到了config限流,replica.fetch.wait.max.ms, fetch.min.bytes。但是这些参数都将加大丢消息以及被踢出isr的风险。如果您有别的方法来减少fetchfollower的量,将感激不尽。谢谢
    2023-02-03归属地:广东
  • 杨逸林
    老师,sourceBroker 这个配置是从 Zookeeper /broker/ids/[0-n] 这个节点下拿的配置吧,如果配置了 advertised.listener 那就会变成这个配置的地址,是这样吗?我现在遇到的问题就是内外网非双网卡的情况,多副本的 Topic 发送消息就会导致 Broker 报错。
    2022-07-25
  • 梁聪明
    handlePartitionsWithErrors 方法的实现原理 1、当partitions不为空时,调用delayPartitions方法 2、获取每个分区的partitionStates,如果当前分区状态不是isDelayed则将该分区移动至末尾,进行延迟处理
    2020-08-31
  • 张子涵
    handlePartitionsWithErrors方法源码如下: // 处理可能由于领导层变更而导致的错误分区 private def handlePartitionsWithErrors(partitions: Iterable[TopicPartition], methodName: String): Unit = { if (partitions.nonEmpty) { debug(s"Handling errors in $methodName for partitions $partitions") delayPartitions(partitions, fetchBackOffMs) } } 底层调用delayPartitions方法 def delayPartitions(partitions: Iterable[TopicPartition], delay: Long): Unit = { partitionMapLock.lockInterruptibly() try { for (partition <- partitions) { Option(partitionStates.stateValue(partition)).foreach { currentFetchState => if (!currentFetchState.isDelayed) { partitionStates.updateAndMoveToEnd(partition, PartitionFetchState(currentFetchState.fetchOffset, currentFetchState.lag, currentFetchState.currentLeaderEpoch, Some(new DelayedItem(delay)), currentFetchState.state)) } } } partitionMapCond.signalAll() } finally partitionMapLock.unlock() }
    2020-08-28
收起评论
显示
设置
留言
6
收藏
沉浸
阅读
分享
手机端
快捷键
回顶部