Kafka 核心源码解读
胡夕
Apache Kafka Committer,老虎证券技术总监
19216 人已学习
新⼈⾸单¥59
登录后,你可以任选4讲全文学习
课程目录
已完结/共 44 讲
结束语 (1讲)
Kafka 核心源码解读
15
15
1.0x
00:00/00:00
登录|注册

02 | 日志(上):日志究竟是如何加载日志段的?

你好,我是胡夕。今天我来讲讲 Kafka 源码的日志(Log)对象。
上节课,我们学习了日志段部分的源码,你可以认为,日志是日志段的容器,里面定义了很多管理日志段的操作。坦率地说,如果看 Kafka 源码却不看 Log,就跟你买了这门课却不知道作者是谁一样。在我看来,Log 对象是 Kafka 源码(特别是 Broker 端)最核心的部分,没有之一。
它到底有多重要呢?我和你分享一个例子,你先感受下。我最近正在修复一个 Kafka 的 Bug(KAFKA-9157):在某些情况下,Kafka 的 Compaction 操作会产生很多空的日志段文件。如果要避免这些空日志段文件被创建出来,就必须搞懂创建日志段文件的原理,而这些代码恰恰就在 Log 源码中。
既然 Log 源码要管理日志段对象,那么它就必须先把所有日志段对象加载到内存里面。这个过程是怎么实现的呢?今天,我就带你学习下日志加载日志段的过程。
首先,我们来看下 Log 对象的源码结构。

Log 源码结构

Log 源码位于 Kafka core 工程的 log 源码包下,文件名是 Log.scala。总体上,该文件定义了 10 个类和对象,如下图所示:
那么,这 10 个类和对象都是做什么的呢?我先给你简单介绍一下,你可以对它们有个大致的了解。
确认放弃笔记?
放弃后所记笔记将不保留。
新功能上线,你的历史笔记已初始化为私密笔记,是否一键批量公开?
批量公开的笔记不会为你同步至部落
公开
同步至部落
取消
完成
0/2000
荧光笔
直线
曲线
笔记
复制
AI
  • 深入了解
  • 翻译
    • 英语
    • 中文简体
    • 中文繁体
    • 法语
    • 德语
    • 日语
    • 韩语
    • 俄语
    • 西班牙语
    • 阿拉伯语
  • 解释
  • 总结

Kafka的Log源码解析 Kafka的Log对象是Broker端最核心的组成部分之一,负责管理日志段的加载和操作。Log对象的重要性体现在修复Kafka Bug时的必要性,例如避免产生空的日志段文件。Log对象的加载过程涉及LogAppendInfo、Log和RollParams等伴生对象,以及LogMetricNames、LogOffsetSnapshot、LogReadInfo和CompletedTxn等类。Log类定义了许多常量和辅助方法,其中Log Object定义了各种文件类型的常量,以及工具类方法如filenamePrefixFromOffset。Log类的核心属性包括dir和logStartOffset,分别表示日志所在的文件夹路径和日志的当前最早位移。其他重要属性包括nextOffsetMetadata、highWatermarkMetadata、segments和leaderEpochCache,分别用于管理下一条待插入消息的位移值、分区日志高水位值、日志段信息和Leader Epoch缓存。Log类的初始化逻辑涉及加载日志段、恢复日志段和完成未完成的swap操作。这些操作对于理解Kafka的核心功能至关重要。 在文章中,作者详细解释了Log对象的核心方法和逻辑,包括removeTempFilesAndCollectSwapFiles、loadSegmentFiles、completeSwapOperations和recoverLog。这些方法涉及了日志文件的清理、加载、恢复和修复操作,保证了Kafka Broker的稳定性和可靠性。通过对这些方法的分析,读者可以深入了解Kafka日志管理的内部实现,为理解Kafka的核心功能和Bug修复提供了重要参考。 总之,本文通过对Kafka源码中Log对象的核心方法和逻辑进行详细解析,为读者提供了深入了解Kafka日志管理内部实现的机会,对于Kafka开发和维护人员具有重要的参考价值。

仅可试看部分内容,如需阅读全部内容,请付费购买文章所属专栏
《Kafka 核心源码解读》
新⼈⾸单¥59
立即购买
登录 后留言

全部留言(28)

  • 最新
  • 精选
  • 胡夕
    置顶
    你好,我是胡夕。我来公布上节课的“课后讨论”题答案啦~ 上节课,咱们重点了解了日志段对象,课后我让你思考下如果给定位移值过大truncateTo方法的实现。关于这个问题,我的看法很简单。如果truncateTo的输入offset过大以至于超过了该日志段当前最大的消息位移值,那么这个方法不会执行任何截断操作,因为不会有机会调用log.truncateTo(mapping.position) okay,你是怎么考虑的呢?我们可以一起讨论下。
    2020-04-23
    1
  • 曾轼麟
    置顶
    先回答老师的问题maybeIncrementHighWatermark的实现: 【首先需要注意以下几个内容】: 1、这个方法是通过leaderLog这个实例去调用的,当HW更新的时候follower就会更新自身的HW。 2、leaderLog 是在Partition.scala中的,是分区维度的概念。 3、maybeIncrementHighWatermark的入参是newHighWatermark,是新的HW标记,但是可能是更新也可能不是。 【下面来说实现】 在maybeIncrementHighWatermark中会先去判断新的newHighWatermark.messageOffset是否大于当前的LEO,如果大于肯定不合理,因为新的HW不可能跑在LEO前面 然后获取当前高水位的偏移量和元数据。如果偏移元数据不是,已知,将在索引中执行查找并缓存结果,并赋值给oldHighWatermark。 最后进行判断,如果(oldHighWatermark.messageOffset小于newHighWatermark.messageOffset) 则跟新HW的元数据 或者如果(oldHighWatermark.messageOffset 等于 newHighWatermark.messageOffset)并且当前segmentBaseOffset小于newHighWatermark.segmentBaseOffset 也会更新HW的元数据 最后这个过程是在synchronized的包围下进行的。

    作者回复: 👍👍👍

    2020-04-18
    4
    10
  • 花开成海
    有两个疑问请老师帮忙看下: 1、为什么broker重启要重建所有的索引文件? 2、LogSegment的log: FileRecords 表示的是消息内容,加载日志段时候,什么机制使有限内存加载所有segment?

    作者回复: 1. 这些写的确实有点问题。应该是删除孤立的索引文件 2. 应该说是JVM GC机制:) FileRocords表示日志段对象的底层物理文件,已加载完成的日志段对象对应的FileRecords是可以被GC的。

    2020-04-30
    2
    3
  • Geek_47baf9
    在trunk分支提交记录(commitId=db1f581da7f3440cfd5be93800b4a9a2d7327a35)上 Log.scala已经在2021-08-13 7:10被重命名为UnifiedLog.scala,希望大家看专栏的时候注意点

    作者回复: 是啊,感慨Kafka代码依然在保持活力地更新着

    2021-09-20
    2
    2
  • 你瞅啥?没见过这么帅的哈士奇吗
    清除上一次failure留下的文件,这个failure情况是指什么情况下发生的failure情况,是节点挂了重启时的failure还是其他情况,能举个🌰吗

    作者回复: broker碰到的任何失败

    2021-05-08
    2
  • 北纬8℃
    太难了😂

    作者回复: 哈哈,坚持坚持!

    2020-04-16
    2
    2
  • 云超
    老师,问一个语法上的问题,在上面的代码片段中的locally{xxx} ,这个locally的作用是什么啊?

    作者回复: 类似于java中的静态代码块,多用于类或Object初始化执行一段代码之用

    2020-10-19
    2
    1
  • 小虞仔
    先回答老师的问题: 如果HW大于LEO,那么直接抛出IllegalArgumentException异常; 否则做如下操作: 1. 拿到HW的LogOffsetMetadata 2. 当存在以下情况的时候,更新HW,并且返回更新之前的HW a. old HW的位移小于new HW b. old HW的位移等于new HW, 并且old HW的基准位移小于new HW的基准位移,这种情况说明,new HW对应的segment是一个新的segment 3. 如果old HW的位移大于等于new HW,直接返回None 再请教老师一个问题,LeaderEpochFileCache中有这样一行代码: private var epochs: ArrayBuffer[EpochEntry] = inWriteLock(lock) ...... 其实就是初始化epochs,我理解这边是读leader-epoch-checkpoint文件,为什么要用inWriteLock写锁,而不是inReadLock。

    作者回复: 首先我要声明,源码写得也不一定就是对的!我们完全可以针对源码中可能的问题开放讨论哈。 在LeaderEpochFileCache中,针对epochs的更新和赋值全部都在write lock下,读取epoch用read lock。

    2020-04-26
    1
  • 曾轼麟
    老师下面我想问一下我的一些问题: 1、为什么要遍历两次文件路径呢?我看了一下,如果在删除的时候顺便去加载segment会有什么问题吗?这样是否可以提高加载效率呢? 2、我看了一下在removeTempFilesAndCollectSwapFiles方法中minCleanedFileOffset是从文件名filename上面读取的,如果我修改了文件名的offset大小会出现什么意想不到的情况呢? 3、我发现segments是使用ConcurrentNavigableMap,而这里的ConcurrentNavigableMap是使用JDK的ConcurrentSkipListMap,使用跳表的目的是为了方便使用offset范围查询segments中的对象吗?

    作者回复: 1. 就我个人而言,我觉得也没有什么问题。我觉得作者更多是为了把不同逻辑进行了分组导致遍历多次 2. 可能造成Broker的崩溃,无法启动。因为我们公司有小伙伴这么干过:( 3. 对的,这样可以快速根据给定offset找到对应的一上一下日志段对象

    2020-04-18
    2
    1
  • 我是小队长
    // 这些都做完之后,如果日志段集合不为空 // 验证分区日志的LEO值不能小于Log Start Offset值,否则删除这些日志段对象 想问下什么时候才会出现这种情况呢?

    作者回复: 当底层日志文件被删除或损坏的话就可能出现这种情况,因为无法读取文件去获取LEO了。你可以用2.0版本做个试验: 1. 发消息到分区日志 2. 使用Admin的DeleteRecords命令驱动Log start offset前进 3. 关闭Broker 4. 删除日志路径 5. 重启Broker

    2020-04-17
    1
收起评论
显示
设置
留言
28
收藏
沉浸
阅读
分享
手机端
快捷键
回顶部