Kafka核心源码解读
胡夕
友信金服商业智能部总监,Apache Kafka Contributor
立即订阅
3183 人已学习
课程目录
已更新 12 讲 / 共 40 讲
0/4登录后,你可以任选4讲全文学习。
课前必学 (3讲)
开篇词 | 阅读源码,逐渐成了职业进阶道路上的“必选项”
免费
导读 | 构建Kafka工程和源码阅读环境、Scala语言热身
重磅加餐 | 带你快速入门Scala语言
日志模块 (5讲)
01 | 日志段:保存消息文件的对象是怎么实现的?
02 | 日志(上):日志究竟是如何加载日志段的?
03 | 日志(下):彻底搞懂Log对象的常见操作
04 | 索引(上):改进的二分查找算法在Kafka索引的应用
05 | 索引(下):位移索引和时间戳索引的区别是什么?
请求处理模块 (3讲)
06 | 请求通道:如何实现Kafka请求队列?
07 | SocketServer(上):Kafka到底是怎么应用NIO实现网络通信的?
08 | SocketServer(中):请求还要区分优先级?
特别放送 (1讲)
特别放送(一)| 经典的Kafka学习资料有哪些?
Kafka核心源码解读
15
15
1.0x
00:00/00:00
登录|注册

01 | 日志段:保存消息文件的对象是怎么实现的?

胡夕 2020-04-13
你好,我是胡夕。
今天,我们开始学习 Kafka 源代码分析的第一模块:日志(Log)、日志段(LogSegment)以及索引(Index)源码。
日志段及其相关代码是 Kafka 服务器源码中最为重要的组件代码之一。你可能会非常关心,在 Kafka 中,消息是如何被保存和组织在一起的。毕竟,不管是学习任何消息引擎,弄明白消息建模方式都是首要的问题。因此,你非常有必要学习日志段这个重要的子模块的源码实现。
除此之外,了解日志段也有很多实际意义,比如说,你一定对 Kafka 底层日志文件 00000000000000012345.log 的命名感到很好奇。学过日志段之后,我相信这个问题一定会迎刃而解的。
今天,我会带你详细看下日志段部分的源码。不过在此之前,你需要先了解一下 Kafka 的日志结构。

Kafka 日志结构概览

Kafka 日志在磁盘上的组织架构如下图所示:
日志是 Kafka 服务器端代码的重要组件之一,很多其他的核心组件都是以日志为基础的,比如后面要讲到的状态管理机和副本管理器等。
总的来说,Kafka 日志对象由多个日志段对象组成,而每个日志段对象会在磁盘上创建一组文件,包括消息日志文件(.log)、位移索引文件(.index)、时间戳索引文件(.timeindex)以及已中止(Aborted)事务的索引文件(.txnindex)。当然,如果你没有使用 Kafka 事务,已中止事务的索引文件是不会被创建出来的。图中的一串数字 0 是该日志段的起始位移值(Base Offset),也就是该日志段中所存的第一条消息的位移值。
取消
完成
0/1000字
划线
笔记
复制
© 版权归极客邦科技所有,未经许可不得传播售卖。 页面已增加防盗追踪,如有侵权极客邦将依法追究其法律责任。
该试读文章来自付费专栏《Kafka核心源码解读》,如需阅读全部文章,
请订阅文章所属专栏。
立即订阅
登录 后留言

精选留言(32)

  • 东风第一枝 置顶
    offsetIndex.truncateTo(offset) 和 timeIndex.truncateTo(offset)中有相同的一部分代码:

    /* There are 3 cases for choosing the new size
           * 1) if there is no entry in the index <= the offset, delete everything
           * 2) if there is an entry for this exact offset, delete it and everything larger than it
           * 3) if there is no entry for this offset, delete everything larger than the next smallest
           */
          val newEntries =
            if(slot < 0)
              0
            else if(relativeOffset(idx, slot) == offset - baseOffset)
              slot
            else
              slot + 1


    我觉得这应该解释了老师的提问:如果指定的位移值特别特别大,并且没有可以截断的消息时,会截断比下一个消息的位移大的所有的消息。代码里面用了solt+1来表示。不知道我理解的对不对。

    作者回复: 对的:)

    2020-04-16
    4
    2
  • 胡夕 置顶
    你好,我是胡夕。我来公布上节课的“课后讨论”题答案啦~

    上节课,咱们重点学习了如何构建Kafka工程和搭建源码阅读环境。课后我让你去尝试寻找kafka-console-producer.sh脚本对应的Java类是哪个文件。现在我给出答案:如果我们打开这个SHELL脚本,可以清楚地发现它调用了kafka.tools.ConsoleProducer类。这个类位于core包的src/main/scala/kafka/tools下,文件名是ConsoleProducer.scala。

    怎么样,你找到了吗?我们可以一起讨论下。
    2020-04-26
  • 一口气读完了这篇文章,感觉老师讲得特别清楚,现在我很有信心学好源码。谢谢老师,期待后续的更新。

    作者回复: 谢谢,后面我们一起讨论~

    2020-04-13
    8
  • 🙊顾小顾
    maxTimestampSoFar等值被@volatile修饰,为何要这么设置
    每一个topic分区对应一个Segment,这个会被多线程的访问吗?比如spark streaming的kafkaUtil.directStream,其会周期性的获取Kafka中每个topic的每个partition中的最新offsets,写入的时候应该也是,理解起来应该是单线程的操作

    作者回复: 简单来说,日志段对象可能被多个线程操作。比如Kafka的多个I/O线程就可能同时操作同一个日志段对象。

    2020-04-26
    1
  • Roger宇
    “首先调用 records.sizeInBytes 方法判断该日志段是否为空,如果是空的话, Kafka 需要记录要写入消息集合的最大时间戳,并将其作为后面新增日志段倒计时的依据。”
    -----------------------
    想请问下老师,这段的如果为空的处理逻辑是不是不在append中啦?if record大小不为空就结束append方法啦。

    作者回复: 如果为空,则没有可以append的了,直接结束。

    2020-04-24
    1
  • 断情笔
    先回答老师提出的问题:log.truncateTo(validBytes) validBytes为要清到的大小,在truncate方法中首先会获取当前LogSegment的文件大小,当validByte> 该文件大小的时候会throws 一个异常 异常为"打算清空 xxx.log 到validBytes 这么大 但是失败了,日志段大小为当前LogSegement大小",如果validByte小于当前log byte 调用fileChannel.truncate(validBytes)进行直接截断, 同时返回截断的大小有多少字节。
    请教:老师 我看到源码中val truncated = log.sizeInBytes - validBytes validBytes为从logFile中的bactch 批量读取message的字节数累加实现,而log.sizeInByte同样不是log文件读取出来的么? 为什么会出现需要truncate呢? 能否举例说明什么时候LogFile中写入了异常数据呢 这个不是很理解。

    作者回复: 可能出现这种情况:日志文件写入了消息的部分字节然后broker宕机。磁盘是块设备,它可不能保证消息的全部字节要么全部写入,要么全都不写入。因此Kafka必须有机制应对这种情况,即校验+truncate

    2020-04-22
    1
  • 诶诶
    请问,append 方法的 largestOffset 和 shallowOffsetOfMaxTimestamp 参数,有可能相等吗?什么时候相等,什么时候不相等?

    作者回复: 你基本上可以认为它们两个是一样的。前提是消息中时间戳是单调增加的,这也符合我们常见的使用场景。

    2020-04-21
    1
  • 奔跑小电驴
    kafka用的也不算少,但冷不丁看源码还是有难度,具体难在有些变量不是特别清楚是做什么的,导致理解整个流程就有点困难,这块老师有什么好的方法嘛~~

    作者回复: Kafka中的变量命名总体来说还是友好的,确实也存在一些变量不知其意。还是要结合它在源码中的作用来看,特别是使用它的方法。也许在这些方法中有对应的info或warn log说明了它的用法。
    我举个例子,比如LogCleaner组件中有个变量叫uncleanablePartitions,注释里写它保存不可被clean的分区,但光看注释我们也不知道不可被clean要满足什么条件。这个时候你就看哪个方法往这个Map中添加分区,一查源码发现是markPartitionUncleanable方法,可这个方法没有一行注释!
    没问题,我们继续看,tryCleanFilthiestLog方法会调用markPartitionUncleanable。tryCleanFilthiestLog里面写了到底哪些分区不能被clean,因为有一行warn日志

    2020-04-16
    1
  • 曾轼麟
    今天阅读源码发现收益良多,先回答老师的问题,如果超过了日志段本身保存的最大位移值或者targetSize小于0(怀疑是溢出),会抛出一个KafkaException来中断程序执行。

    其次我发现read方法提到的:maxSize,maxPosition,minOneMessage几个参数,刚好可用对应上了kafka-clients消费者的配置项:max.poll.records,max.partition.fetch.bytes。其中kafka官方的api中也提到KafkaConsumer.poll()方法在主动拉取的时候,如果单条消息超过最大值能保证至少返回一条消息,也就是老师提到的消费者饥饿的情况。

    同时我发现kafka里面存在着Records接口其对应多种记录类型比如FileRecords,MemoryRecords等等顺着这个应该可以发现消息的各种形态。而且在FileRecords中使用了JDK提供的FileChannel,FileChannel是可以实现0拷贝技术的

    作者回复: FileRecords和MemoryRecords是非常重要的两个类,我建议好好研读下:)

    2020-04-16
    1
    1
  • jeffery
    没有语言基础怎么弯道超车撸源码,能分享下经验吗?再此谢过了

    作者回复: Java语言基础是必需的,Scala无需太多基础。一个比较好的经验是多读源码注释+测试用例。

    测试用例代码简单,而且对于源码解释是直接的

    2020-04-15
    1
    1
  • Roger宇
    if (largestTimestamp > maxTimestampSoFar)
    —————-
    请想问一下,既然是要追加的日志,而一个日志段段消息应该是顺序追加(推测),那为什么需要这个if 判断呢?请问老师,在什么情况下会出现需要追加的消息集合中最大时间戳小于等于当前日志段已经见到的时间戳的最大值,即 largestTimestamp <= maxTimestampSoFar的情况呢?
    2020-05-04
  • 小崔
    1.时间戳,是broker接收到消息时设置的,还是生产者发送时设置的?
    2.read方法里的maxSize我理解可以由消费者设定,但是maxPosition由谁设定?又有什么用呢?防止物理位置越界么?

    作者回复: 1. 都可以的。Kafka支持两种时间戳策略,CREATE和APPEND。前者是producer端生成的,后者是指broker端使用本机时间覆盖producer端生成的时间戳
    2. 由消费者角色而定。假设不考虑事务型consumer。

    如果是普通consumer,那么你只能读到HW以下的消息,那么maxPosition就是HW的物理位置——这是假设读取起始位移与hw在同一个段上,否则maxPosition就是整个段的字节数;
    同理,如果是follower,那么最多可以读到LEO,maxPosition就是LEO的物理位置——同样是假设读取起始位移与hw在同一个段上

    2020-04-29
  • 北冥有鱼
    老师,kafka是哪个版本的?我这边的数据是放在FileMessageSet中的

    作者回复: 我们专栏阅读Trunk分支代码进行阅读,至少是2.4,2.5版本的代码了。FileMessageSet是很老的概念了

    2020-04-28
  • Tomcat
    胡老师看完read方法之后我有几个疑问您可以给我解释下吗?
    1、val startOffsetAndSize = translateOffset(startOffset)
    startOffsetAndSize这里Offset代表的是物理文件的位置,Size指的是物理文件的大小,我说的对吗?
    2、即使出现消息体字节数超过了 maxSize 的情形,read 方法依然能返回至少一条消息。
       (1)、消息体字节数超过了 maxSize(能读取的最大字节数)的情形指的是哪种?这两个概念分别指什么?
       (2)、如果我的第一个大问题是对的,那么从源码可以看出如果minOneMessage设为true的时候
       会读到一个日志段的最大物理位置,我这样理解正确吗?(但是我觉得会存在一条消息不完整的情况,它是如何规避这个问题的?我的答案是kafka在写日志段的时候,即使超过了日志段的大小,
       它也不会立即分割一个新的日志段,而是把一条完整的消息写到当前的这个日志段之后再分割一个新的日志段。这样解释正确吗?)。当minOneMessage为false的时候,消息体字节数超过了maxSize它会从另一个日志段中去读取消息。这样理解对吗?
    3、但是看完这行val fetchSize: Int = min((maxPosition - startPosition).toInt, adjustedMaxSize) 这行代码表示的是待读取的总字节数和日志段可读的字节数中取小的值。

    我觉得我对我第二个问题的解答是有很大问题的,胡老师能帮我解答一下吗?

    作者回复: 1. 所有offset字眼的变量都是位移值,不是物理位置。当然这里的startOffsetAndSize是一个pojo类,同时包含offset、物理文件位置和大小
    2.1 比如消息体超过了max.partition.fetch.bytes值。或者这么说,Broker中保存的消息是1MB,有个consumer,每次只能读取不超过500KB的数据
    2.2 不存在消息不完整的情况,至少对于consumer而言也是不可见的。因为consumer只能读到HW以下的数据,而HW以下的数据都是已提交的,不可能是不完整的
    3. 后面那一大堆话我没太明白什么意思。不过当minOneMessage=false,代码也不会从另一个日志段读取消息

    2020-04-27
    1
  • 🙊顾小顾
    当已写入字节数超过了 4KB 之后,append 方法会调用索引对象的 append 方法新增索引项,同时清空已写入字节数,以备下次重新累积计算. 字节数不会一次性超过4KB吗,我看没有一个循环的判断

    作者回复: 不需要循环判断啊。一旦超过了4KB,直接写索引,然后清空计数器。仅此而已~

    2020-04-26
  • 尘枫
    老师能讲讲KafkaConfigs吗, 在看AbstractConfig时,看不明白

    作者回复: 这个类里面保存了Broker端的参数定义和默认值等信息,同时定义了很多实用方法。您具体的问题是什么呢?

    2020-04-23
  • 断情笔
    老师您好。validBytes为从logFile中的bactch 批量读取message的字节数累加实现,当消息batch 写入一部分的时候如果出现broker宕机的话,会写入一部份消息,那么logFile大小会大于validBytes大小,但是validBytes不也是会扫描当前file的message吗? 那未完整写入的消息(脏消息) 不也会累加到一起吗? 还是说 logbatch会验证消息的完整性之后再读取呢? 感谢老师

    作者回复: "那未完整写入的消息(脏消息) 不也会累加到一起吗?" --- 不会的。能读到的都是已提交的完整的消息。

    2020-04-23
  • 断情笔
    老师,有两个问题想请教一下,
    1.kafka 的magic 我看在源码中有体现,这个具体指代kafka版本还是什么?
    2.LeaderEpoch这个epoch 是什么含义? 在recover方法中会cache.assign batch.parittionLeaderEpoch 这里不是很懂。希望老师赐教。

    作者回复: 1. magic是消息格式版本,总有3个版本。V2是最新版本
    2. leader epoch是controller分配给分区leader副本的版本号。每个消息批次都要有对应的leader epoch。Kafka会记录每个分区leader不同epoch对应的首条消息的位移。比如leader epoch=0时producer写入了100条消息,那么cache会记录<0, 0>,之后leader变更,epoch增加到1,之后producer又写入了200条消息,那么cache会记录<1, 100>。epoch主要用于做日志截断时保证一致性用的,单纯依赖HW值可能出现各种不一致的情况。这是社区对于HW值的一个修正机制

    2020-04-22
  • 陈炳杰
    def offsetOfMaxTimestampSoFar_=(offset: Long): Unit = _offsetOfMaxTimestampSoFar = Some(offset)
      def offsetOfMaxTimestampSoFar: Long = {
        if (_offsetOfMaxTimestampSoFar.isEmpty)
          _offsetOfMaxTimestampSoFar = Some(timeIndex.lastEntry.offset)
        _offsetOfMaxTimestampSoFar.get
      }
    老师,这个是Scala的什么特性呢 ?没有看懂.谢谢

    作者回复: 这是Scala中的setter和getter写法。特别是setter写法与Java很不一样。比如:
    private var _value = .....
    def value = _value // getter
    def value_= (newVal:Int) = _value = newVal // setter

    2020-04-21
  • thomas
    第一步:在源码中,首先调用 log.sizeInBytes 方法判断该日志段是否为空,如果是空的话, Kafka 需要记录要写入消息集合的最大时间戳,并将其作为后面新增日志段倒计时的依据。
    -------------------------------->
    老师,我在append源码中只看到 records.sizeInBytes>0, 没有log.sizeInBytes; 而且代码逻辑里只有为非空的逻辑块。请问是我哪里理解错了吗

    作者回复: 对对,这里是笔误。是records.sizeInBytes不是log.sizeInBytes。感谢指正,我让编辑小姐姐修正下~

    2020-04-21
    3
收起评论
32
返回
顶部