22 | ReplicaFetcherThread:Follower如何拉取Leader消息?
AbstractFetcherThread 类:doWork 方法
- 深入了解
- 翻译
- 解释
- 总结
本文深入剖析了ReplicaFetcherThread的实现原理,重点介绍了Follower副本应用拉取线程的流程。通过对doWork方法和相关方法的逐步介绍,读者能够完整地理解ReplicaFetcherThread的执行过程。文章首先解释了AbstractFetcherThread类的doWork方法,详细讲解了副本截断操作和消息获取操作的重要性。接着,对maybeTruncate方法和maybeFetch方法的实现逻辑进行了详细解读,包括截断操作的实现原理和消息获取的核心逻辑。此外,文章还提供了代码片段和流程图,帮助读者更好地理解ReplicaFetcherThread的执行过程。 总之,本文以技术解读的方式,为想要深入了解Follower如何拉取Leader消息的读者提供了具有指导意义的文章。ReplicaFetcherThread类的定义和字段,以及重要方法processPartitionData、buildFetch和truncate也得到了详细解读。文章通过流程图和代码片段的方式,使读者更容易理解这些方法的实现逻辑。文章内容还涉及了truncate方法的实现,以及对AbstractFetcherThread线程的doWork方法的总结,强调了拉取线程的工作入口方法和其子类ReplicaFetcherThread类的重要性。整体而言,本文为读者提供了深入了解ReplicaFetcherThread的机会,使其能够全面理解Follower副本如何实时从Leader副本拉取消息并写入到本地日志,实现与Leader副本之间的同步。
《Kafka 核心源码解读》,新⼈⾸单¥59
全部留言(10)
- 最新
- 精选
- 胡夕置顶你好,我是胡夕。我来公布上节课的“课后讨论”题答案啦~ 上节课,我们重点学习了DelayedOperation类源码以及Broker是如何处理延时请求的。课后我请你简单描述下handlePartitionsWithErrors方法的实现原理。我的看法是这样的:这个方法接收一组分区,为它们调用delayPartitions方法执行延时处理。delayPartitions方法会遍历每个给定的分区,并把这些分区依次加入到待读取分区列表的末端,从而实现延时重试操作。简单来说,Kafka处理出错分区的思路就是将这些分区放入到轮询名单的末尾期待稍后重试。 okay,你同意这个说法吗?或者说你有其他的看法吗?我们可以一起讨论下。2020-06-233
- 在路上图片里的房子我看了十分钟,这得学多少 kafka 才能拥有啊?
作者回复: 阁下脑回路真是新奇,“害的”我也看了半天图片:)
2020-11-063 - 伯安知心updateFetchOffsetAndMaybeMarkTruncationComplete方法只有在truncateToEpochEndOffsets和truncateToHighWatermark中结尾执行,简单来说就是truncate之后,更新标记截断完成,更新partitionStates值,如果它们的offsetTruncationState指示截断已完成,则可能将它们标记为截断已完成
作者回复: 👍
2020-06-161 - 哲您好 这块有个小问题想咨询一下 就是有没有参数去控制 consumer的fetch频率 replica的fetch频率
作者回复: Replica这端有两个参数可以使用:leader.replication.throttled.rate和follower.replication.throttled.rate Consumer端可以自己控制啊,比如人为地增加sleep等
2020-08-14 - Geek_5258f8老师,复制leader的消息,pagecahe中的会复制吗?如果是,是不是存在一个消息某一时刻所有副本都在pagecache中?2023-09-21归属地:江苏1
- 张子涵// 循环遍历所有分区,更新它们的取值偏移量,如果它们的offsetTruncationState表示截断完成,可以将它们标记为截断完成 private def updateFetchOffsetAndMaybeMarkTruncationComplete(fetchOffsets: Map[TopicPartition, OffsetTruncationState]): Unit = { val newStates: Map[TopicPartition, PartitionFetchState] = partitionStates.partitionStateMap.asScala .map { case (topicPartition, currentFetchState) => val maybeTruncationComplete = fetchOffsets.get(topicPartition) match { case Some(offsetTruncationState) => val state = if (offsetTruncationState.truncationCompleted) Fetching else Truncating PartitionFetchState(offsetTruncationState.offset, currentFetchState.lag, currentFetchState.currentLeaderEpoch, currentFetchState.delay, state) case None => currentFetchState } (topicPartition, maybeTruncationComplete) } partitionStates.set(newStates.asJava) } ps(一边听着老师的语音一边看文档,更能提高注意力,后面的同学们可以尝试一下!)2020-08-281
- linying很多地方讲的都是不清楚,稀里糊涂的2023-07-21归属地:广东
- Geek_3f38ee当分区存在 Leader Epoch 值时,源码会将副本的本地日志截断到 Leader Epoch 对应的最新位移值处,即方法 truncateToEpochEndOffsets 的逻辑实现; 老师请教下,不是每个分区都会有leader epoch值, 当leader变化epoch值会发生变化吗, 那为什么会有分区不存在 Leader Epoch呢2023-03-09归属地:北京
- surprise想问下如果是ack=all的情况下,还需要isr中的副本去leader拉取数据么,这里是不是也不需要写入本地日志了,因为发送的时候制定ack=all已经确认写入了isr中的副本2023-01-07归属地:北京
- 梁聪明updateFetchOffsetAndMaybeMarkTruncationComplete方法:更新已经完成截断的分区状态为Fetching2020-08-31