Kafka 核心源码解读
胡夕
Apache Kafka Committer,老虎证券技术总监
19216 人已学习
新⼈⾸单¥59
登录后,你可以任选4讲全文学习
课程目录
已完结/共 44 讲
结束语 (1讲)
Kafka 核心源码解读
15
15
1.0x
00:00/00:00
登录|注册

03 | 日志(下):彻底搞懂Log对象的常见操作

你好,我是胡夕。上节课,我们一起了解了日志加载日志段的过程。今天,我会继续带你学习 Log 源码,给你介绍 Log 对象的常见操作。
我一般习惯把 Log 的常见操作分为 4 大部分。
高水位管理操作:高水位的概念在 Kafka 中举足轻重,对它的管理,是 Log 最重要的功能之一。
日志段管理:Log 是日志段的容器。高效组织与管理其下辖的所有日志段对象,是源码要解决的核心问题。
关键位移值管理:日志定义了很多重要的位移值,比如 Log Start Offset 和 LEO 等。确保这些位移值的正确性,是构建消息引擎一致性的基础。
读写操作:所谓的操作日志,大体上就是指读写日志。读写操作的作用之大,不言而喻。
接下来,我会按照这个顺序和你介绍 Log 对象的常见操作,并希望你特别关注下高水位管理部分。
事实上,社区关于日志代码的很多改进都是基于高水位机制的,有的甚至是为了替代高水位机制而做的更新。比如,Kafka 的 KIP-101 提案正式引入的 Leader Epoch 机制,就是用来替代日志截断操作中的高水位的。显然,要深入学习 Leader Epoch,你至少要先了解高水位并清楚它的弊病在哪儿才行。
既然高水位管理这么重要,那我们就从它开始说起吧。
确认放弃笔记?
放弃后所记笔记将不保留。
新功能上线,你的历史笔记已初始化为私密笔记,是否一键批量公开?
批量公开的笔记不会为你同步至部落
公开
同步至部落
取消
完成
0/2000
荧光笔
直线
曲线
笔记
复制
AI
  • 深入了解
  • 翻译
    • 英语
    • 中文简体
    • 中文繁体
    • 法语
    • 德语
    • 日语
    • 韩语
    • 俄语
    • 西班牙语
    • 阿拉伯语
  • 解释
  • 总结

本文深入剖析了Kafka中Log对象的操作机制及实现细节,包括高水位管理、日志段管理、关键位移值管理和读写操作等方面。通过源码解析,详细介绍了Log对象的高水位管理操作,以及使用ConcurrentSkipListMap类保存所有日志段对象的优势。特别强调了Kafka的日志留存策略,以及如何根据一定的规则决定哪些日志段可以删除。文章还重点讲解了针对Log对象的读写操作,详细解释了写操作的执行流程,以及消息校验的实现原理。通过对LogAppendInfo类的分析,展示了Kafka如何校验消息,并介绍了消息格式的变迁。总的来说,本文通过深入的源码解析,为读者提供了对Kafka中Log对象的全面了解,使读者能够快速掌握Log对象的操作机制和实现细节。 本文详细介绍了Kafka中Log对象的操作机制及实现细节,包括高水位管理、日志段管理、关键位移值管理和读写操作等方面。通过源码解析,展示了Log对象的高水位管理操作和使用ConcurrentSkipListMap类保存所有日志段对象的优势。特别强调了Kafka的日志留存策略,以及如何根据规则决定哪些日志段可以删除。文章还重点讲解了Log对象的读写操作,详细解释了写操作的执行流程和消息校验的实现原理。通过对LogAppendInfo类的分析,展示了Kafka如何校验消息,并介绍了消息格式的变迁。通过深入的源码解析,为读者提供了对Kafka中Log对象的全面了解,使读者能够快速掌握Log对象的操作机制和实现细节。

仅可试看部分内容,如需阅读全部内容,请付费购买文章所属专栏
《Kafka 核心源码解读》
新⼈⾸单¥59
立即购买
登录 后留言

全部留言(21)

  • 最新
  • 精选
  • 胡夕
    置顶
    你好,我是胡夕。我来公布上节课的“课后讨论”题答案啦~ 上节课,咱们重点了解了日志对象,课后我留了道小作业,想请你思考下Log源码中的maybeIncrementHighWatermark方法实现。我的看法是这样的:这个方法是更新高水位值或高水位对象用的。方法首先要判断新的高水位值不能越过LEO,这在Kafka中是不允许的。接着,就是用咱们今天重点讲到的fetchHighWatermarkMetadata方法重新获取当前的高水位对象,然后判断当前高水位值要小于新的高水位值。这里的小于可能有两种情况: 1. 新旧高水位值在同一个日志段对象上且旧值小于新值; 2. 旧高水位对象在老的日志段对象上。 做这个判断的原因是Kafka必须要确保高水位值的单调增加性。一旦确保了这个条件之后,调用updateHighWatermarkMetadata方法更新即可,最后返回旧的高水位对象。 okay,你是怎么考虑的呢?我们可以一起讨论下。
    2020-04-23
    1
    3
  • 小崔
    为什么日志截断时可能更新Log Start Offset呢? 我理解删除日志段时,可能把开头的日字段删除了,所以Log Start Offset会增大。可是日志截断应该总是从尾部开始截断的,就算全部截没了,Log Start Offset也应该不变呀

    作者回复: 极端情况下,要截断到的目标位移确实可能小于现有的log start offset。这里的targetOffset是面向整个log而言的,leader副本的targetOffset到了follower那里可能就是会小于log start offset

    2020-05-01
    3
  • 小虞仔
    // 统计HW和LEO之间的消息数量 def getMessageCountBetweenHWAndLEO(): Int = { val diff = nextOffsetMetadata.messageOffset - highWatermarkMetadata.messageOffset if (diff < 0L) { warn(s"LEO is lower than HW") 0 } else { activeSegment.read(highWatermarkMetadata.messageOffset, Integer.MAX_VALUE, nextOffsetMetadata.messageOffset).records.records.asScala.size } } 这是我的实现,还请老师指点一下:)

    作者回复: 求消息数量就可以,不用计算消息字节数量:)

    2020-04-29
    2
    2
  • 真锅
    老师,我看源码中删除操作是对log进行的。删除策略限制的也只是partition的大小。是否代表如果broker上存储的分区越来越多,broker的磁盘最终会被占满,有没有限制broker能存储多少partition的参数,或者控制broker使用磁盘空间大小的方法呀。我们现在对kafka的要求是哪怕消息丢失,也不能让kafka集群停止工作,我原本想改一下源码,即使是超过hw,也删除日志段,但后来发现如果不限制partition的数量的话,还是会被占满的吧。

    作者回复: 开启安全认证权限,然后由你们来创建主题,不允许任意一个客户端能够在集群上创建任意分区的主题。之后配合日志留存策略就可以整体上控制总磁盘占用空间。

    2020-04-29
    2
    1
  • 欠债太多
    感觉更新有点快,要跟不上了

    作者回复: 加油加油~ 刚开始的Log确实内容有点多,后面慢慢适应了就好了:)

    2020-04-25
    1
  • 曾轼麟
    老师这个是我的实现 private def countDifferBetweenHighWaterAndNextOffset(): Long = { //获取高水位 val highWater = highWatermarkMetadata //nextOffsetMetadata 相当于LEO val leo = nextOffsetMetadata if (highWater.messageOffsetOnly) { //如果高水位不全,先补全 val fullHighWater = convertToOffsetMetadataOrThrow(highWater.messageOffset) //计算差值 leo.messageOffset - fullHighWater.messageOffset } else { //计算差值 leo.messageOffset - highWater.messageOffset } }

    作者回复: 写的周全。另外高水位补全仅仅出现在它只有offset,无base offset和物理文件位置的情形,这两个字段在计算差值时其实用途不大。

    2020-04-24
    2
    1
  • 曾轼麟
    1、如果在更新过程中发现新 LEO 值小于高水位值,那么 Kafka 还要更新高水位值(可以理解为HW可能会回退吗?是指Leader选举后的日志截取吗?) 2、activeSegment这个日志段,可以理解为是当前正在写入的日志段对象吗?

    作者回复: 1. 是的。总之HW不能越过LEO,我是指终态。中间状态中可能出现偶发的“越过”情形 2. 可以这么理解

    2020-04-24
    1
  • 卢松
    “segments.higherEntry:获取第一个起始位移值≥给定 Key 值的日志段对象;” 这里应该只有大于,没有等于把?

    作者回复: 嗯嗯,确实是没有等于,感谢指正~

    2021-04-07
  • 是男人就开巴巴托斯
    请问老师, read方法返回的对象包含的是log文件的实际数据么? 还是查找到的需要读取的start offset & length?

    作者回复: 返回实际的消息数据以及一些必要的元数据

    2020-12-29
  • flyCoder
    onSameSegment方法的segmentBaseOffset 值相同为什么就能保证是在同一个日志段呢?segmentBaseOffset不是所在日志段的起始位移吗?消息在每一个日志段的的位移不是一个相对值吗?

    作者回复: segmentBaseOffset保存的是绝对位移值

    2020-11-02
    2
收起评论
显示
设置
留言
21
收藏
沉浸
阅读
分享
手机端
快捷键
回顶部