Kafka 核心源码解读
胡夕
Apache Kafka Committer,老虎证券技术总监
19216 人已学习
新⼈⾸单¥59
登录后,你可以任选4讲全文学习
课程目录
已完结/共 44 讲
结束语 (1讲)
Kafka 核心源码解读
15
15
1.0x
00:00/00:00
登录|注册

01 | 日志段:保存消息文件的对象是怎么实现的?

你好,我是胡夕。
今天,我们开始学习 Kafka 源代码分析的第一模块:日志(Log)、日志段(LogSegment)以及索引(Index)源码。
日志段及其相关代码是 Kafka 服务器源码中最为重要的组件代码之一。你可能会非常关心,在 Kafka 中,消息是如何被保存和组织在一起的。毕竟,不管是学习任何消息引擎,弄明白消息建模方式都是首要的问题。因此,你非常有必要学习日志段这个重要的子模块的源码实现。
除此之外,了解日志段也有很多实际意义,比如说,你一定对 Kafka 底层日志文件 00000000000000012345.log 的命名感到很好奇。学过日志段之后,我相信这个问题一定会迎刃而解的。
今天,我会带你详细看下日志段部分的源码。不过在此之前,你需要先了解一下 Kafka 的日志结构。

Kafka 日志结构概览

Kafka 日志在磁盘上的组织架构如下图所示:
日志是 Kafka 服务器端代码的重要组件之一,很多其他的核心组件都是以日志为基础的,比如后面要讲到的状态管理机和副本管理器等。
总的来说,Kafka 日志对象由多个日志段对象组成,而每个日志段对象会在磁盘上创建一组文件,包括消息日志文件(.log)、位移索引文件(.index)、时间戳索引文件(.timeindex)以及已中止(Aborted)事务的索引文件(.txnindex)。当然,如果你没有使用 Kafka 事务,已中止事务的索引文件是不会被创建出来的。图中的一串数字 0 是该日志段的起始位移值(Base Offset),也就是该日志段中所存的第一条消息的位移值。
确认放弃笔记?
放弃后所记笔记将不保留。
新功能上线,你的历史笔记已初始化为私密笔记,是否一键批量公开?
批量公开的笔记不会为你同步至部落
公开
同步至部落
取消
完成
0/2000
荧光笔
直线
曲线
笔记
复制
AI
  • 深入了解
  • 翻译
    • 英语
    • 中文简体
    • 中文繁体
    • 法语
    • 德语
    • 日语
    • 韩语
    • 俄语
    • 西班牙语
    • 阿拉伯语
  • 解释
  • 总结

本文深入解析了Kafka日志段的重要性和实现细节,包括日志段的组织架构和重要组件,以及LogSegment类的定义和关键方法。重点分析了append、read和recover方法,强调了它们在日志段对象中的重要性。此外,还提到了truncateTo方法,并引发了对于指定位移值超出日志段保存最大位移值的思考。通过源码解析,读者可以深入了解Kafka消息的保存和组织方式,以及解决实际问题的方法。总之,本文内容详实,对Kafka日志段的重要性和实现细节进行了深入解析,对于想深入了解Kafka源码的读者具有很高的参考价值。

仅可试看部分内容,如需阅读全部内容,请付费购买文章所属专栏
《Kafka 核心源码解读》
新⼈⾸单¥59
立即购买
登录 后留言

全部留言(74)

  • 最新
  • 精选
  • 胡夕
    置顶
    你好,我是胡夕。我来公布上节课的“课后讨论”题答案啦~ 上节课,咱们重点学习了如何构建Kafka工程和搭建源码阅读环境。课后我让你去尝试寻找kafka-console-producer.sh脚本对应的Java类是哪个文件。现在我给出答案:如果我们打开这个SHELL脚本,可以清楚地发现它调用了kafka.tools.ConsoleProducer类。这个类位于core包的src/main/scala/kafka/tools下,文件名是ConsoleProducer.scala。 怎么样,你找到了吗?我们可以一起讨论下。
    2020-04-26
    7
  • 小虞仔
    置顶
    offsetIndex.truncateTo(offset) 和 timeIndex.truncateTo(offset)中有相同的一部分代码: /* There are 3 cases for choosing the new size * 1) if there is no entry in the index <= the offset, delete everything * 2) if there is an entry for this exact offset, delete it and everything larger than it * 3) if there is no entry for this offset, delete everything larger than the next smallest */ val newEntries = if(slot < 0) 0 else if(relativeOffset(idx, slot) == offset - baseOffset) slot else slot + 1 我觉得这应该解释了老师的提问:如果指定的位移值特别特别大,并且没有可以截断的消息时,会截断比下一个消息的位移大的所有的消息。代码里面用了solt+1来表示。不知道我理解的对不对。

    作者回复: 对的:)

    2020-04-16
    5
    11
  • Eternal
    一个分区partion对应一个Log对象 一个Log对象对应一个日志目录 一个日志目录有多个LogSegment,一个Logment概念下有4个文件,多一个LogSegment下的文件都是在一个目录下的,也就是LogSegment只是一个概念不是真的文件目录 一个LogSegment就有4个物理文件,那么集群有文件=4 * LogSegment*partion*副本因子*topic。当topic和分区数很多的时候,系统的文件句柄就不够用了,好像系统默认是65535 所有我们对topic的数量和分区的数量需要有一个合理的规划。 我有一个疑问:我们公司之前遇到过类似的问题,业务方创建了非常多的topic和分区,然后一次broker重启的时候,重启失败,原因是Map failed 内存映射失败,这个和老师文中讲的“ Broker 在启动时会从磁盘上加载所有日志段信息到内存中,并创建相应的 LogSegment 对象实例” 好像是一样的,不止我理解的对不对:

    作者回复: 是的。出现map failed + 超多分区的情况除了调整ulimit -n之外,最好调整下vm.max_map_count~

    2020-04-20
    3
    12
  • 一口气读完了这篇文章,感觉老师讲得特别清楚,现在我很有信心学好源码。谢谢老师,期待后续的更新。

    作者回复: 谢谢,后面我们一起讨论~

    2020-04-13
    12
  • 小崔
    1.时间戳,是broker接收到消息时设置的,还是生产者发送时设置的? 2.read方法里的maxSize我理解可以由消费者设定,但是maxPosition由谁设定?又有什么用呢?防止物理位置越界么?

    作者回复: 1. 都可以的。Kafka支持两种时间戳策略,CREATE和APPEND。前者是producer端生成的,后者是指broker端使用本机时间覆盖producer端生成的时间戳 2. 由消费者角色而定。假设不考虑事务型consumer。 如果是普通consumer,那么你只能读到HW以下的消息,那么maxPosition就是HW的物理位置——这是假设读取起始位移与hw在同一个段上,否则maxPosition就是整个段的字节数; 同理,如果是follower,那么最多可以读到LEO,maxPosition就是LEO的物理位置——同样是假设读取起始位移与hw在同一个段上

    2020-04-29
    7
  • 灰机
    老师您好,我想问一下 “日志段至少新写入 4KB 的消息数据才会新增一条索引项” 这个是写在OffsetIndex 中么? 对应物理位置是***.index中么? 这个新增一条索引项不是很理解,希望老师看到后给予解答。 感谢

    作者回复: 大致意思是说producer写入4KB的消息数据后,会为当前日志段对象的索引文件中新写入一条OffsetIndex索引项。索引文件就是***.index。4KB是由Broker端参数log.index.interval.bytes决定

    2020-04-20
    6
  • 李奕慧
    不明白为啥 recover 需要清空索引文件(这里的索引文件指的是 xxx.index 文件吗?)

    作者回复: 可能发生截断、日志段被删除等情况,因此最好重建索引文件

    2020-09-02
    5
  • 灰机
    先回答老师提出的问题:log.truncateTo(validBytes) validBytes为要清到的大小,在truncate方法中首先会获取当前LogSegment的文件大小,当validByte> 该文件大小的时候会throws 一个异常 异常为"打算清空 xxx.log 到validBytes 这么大 但是失败了,日志段大小为当前LogSegement大小",如果validByte小于当前log byte 调用fileChannel.truncate(validBytes)进行直接截断, 同时返回截断的大小有多少字节。 请教:老师 我看到源码中val truncated = log.sizeInBytes - validBytes validBytes为从logFile中的bactch 批量读取message的字节数累加实现,而log.sizeInByte同样不是log文件读取出来的么? 为什么会出现需要truncate呢? 能否举例说明什么时候LogFile中写入了异常数据呢 这个不是很理解。

    作者回复: 可能出现这种情况:日志文件写入了消息的部分字节然后broker宕机。磁盘是块设备,它可不能保证消息的全部字节要么全部写入,要么全都不写入。因此Kafka必须有机制应对这种情况,即校验+truncate

    2020-04-22
    5
  • Roger宇
    if (largestTimestamp > maxTimestampSoFar) —————- 请想问一下,既然是要追加的日志,而一个日志段段消息应该是顺序追加(推测),那为什么需要这个if 判断呢?请问老师,在什么情况下会出现需要追加的消息集合中最大时间戳小于等于当前日志段已经见到的时间戳的最大值,即 largestTimestamp <= maxTimestampSoFar的情况呢?

    作者回复: 目前producer端是可以自行指定任意时间戳的

    2020-05-04
    4
  • drapeau🏖
    希望老师以后能讲讲页缓存这个东西

    作者回复: 很多操作系统的书对页缓存都有介绍。简单来说,page cache是最重要的一类操作系统(特别是Linux系统)disk cache。相关介绍可以看看《Understanding the Linux Kernel》

    2020-04-16
    3
收起评论
显示
设置
留言
74
收藏
沉浸
阅读
分享
手机端
快捷键
回顶部