03 | 日志(下):彻底搞懂Log对象的常见操作
- 深入了解
- 翻译
- 解释
- 总结
本文深入剖析了Kafka中Log对象的操作机制及实现细节,包括高水位管理、日志段管理、关键位移值管理和读写操作等方面。通过源码解析,详细介绍了Log对象的高水位管理操作,以及使用ConcurrentSkipListMap类保存所有日志段对象的优势。特别强调了Kafka的日志留存策略,以及如何根据一定的规则决定哪些日志段可以删除。文章还重点讲解了针对Log对象的读写操作,详细解释了写操作的执行流程,以及消息校验的实现原理。通过对LogAppendInfo类的分析,展示了Kafka如何校验消息,并介绍了消息格式的变迁。总的来说,本文通过深入的源码解析,为读者提供了对Kafka中Log对象的全面了解,使读者能够快速掌握Log对象的操作机制和实现细节。 本文详细介绍了Kafka中Log对象的操作机制及实现细节,包括高水位管理、日志段管理、关键位移值管理和读写操作等方面。通过源码解析,展示了Log对象的高水位管理操作和使用ConcurrentSkipListMap类保存所有日志段对象的优势。特别强调了Kafka的日志留存策略,以及如何根据规则决定哪些日志段可以删除。文章还重点讲解了Log对象的读写操作,详细解释了写操作的执行流程和消息校验的实现原理。通过对LogAppendInfo类的分析,展示了Kafka如何校验消息,并介绍了消息格式的变迁。通过深入的源码解析,为读者提供了对Kafka中Log对象的全面了解,使读者能够快速掌握Log对象的操作机制和实现细节。
《Kafka 核心源码解读》,新⼈⾸单¥59
全部留言(21)
- 最新
- 精选
- 胡夕置顶你好,我是胡夕。我来公布上节课的“课后讨论”题答案啦~ 上节课,咱们重点了解了日志对象,课后我留了道小作业,想请你思考下Log源码中的maybeIncrementHighWatermark方法实现。我的看法是这样的:这个方法是更新高水位值或高水位对象用的。方法首先要判断新的高水位值不能越过LEO,这在Kafka中是不允许的。接着,就是用咱们今天重点讲到的fetchHighWatermarkMetadata方法重新获取当前的高水位对象,然后判断当前高水位值要小于新的高水位值。这里的小于可能有两种情况: 1. 新旧高水位值在同一个日志段对象上且旧值小于新值; 2. 旧高水位对象在老的日志段对象上。 做这个判断的原因是Kafka必须要确保高水位值的单调增加性。一旦确保了这个条件之后,调用updateHighWatermarkMetadata方法更新即可,最后返回旧的高水位对象。 okay,你是怎么考虑的呢?我们可以一起讨论下。2020-04-2313
- 小崔为什么日志截断时可能更新Log Start Offset呢? 我理解删除日志段时,可能把开头的日字段删除了,所以Log Start Offset会增大。可是日志截断应该总是从尾部开始截断的,就算全部截没了,Log Start Offset也应该不变呀
作者回复: 极端情况下,要截断到的目标位移确实可能小于现有的log start offset。这里的targetOffset是面向整个log而言的,leader副本的targetOffset到了follower那里可能就是会小于log start offset
2020-05-013 - 小虞仔// 统计HW和LEO之间的消息数量 def getMessageCountBetweenHWAndLEO(): Int = { val diff = nextOffsetMetadata.messageOffset - highWatermarkMetadata.messageOffset if (diff < 0L) { warn(s"LEO is lower than HW") 0 } else { activeSegment.read(highWatermarkMetadata.messageOffset, Integer.MAX_VALUE, nextOffsetMetadata.messageOffset).records.records.asScala.size } } 这是我的实现,还请老师指点一下:)
作者回复: 求消息数量就可以,不用计算消息字节数量:)
2020-04-2922 - 真锅老师,我看源码中删除操作是对log进行的。删除策略限制的也只是partition的大小。是否代表如果broker上存储的分区越来越多,broker的磁盘最终会被占满,有没有限制broker能存储多少partition的参数,或者控制broker使用磁盘空间大小的方法呀。我们现在对kafka的要求是哪怕消息丢失,也不能让kafka集群停止工作,我原本想改一下源码,即使是超过hw,也删除日志段,但后来发现如果不限制partition的数量的话,还是会被占满的吧。
作者回复: 开启安全认证权限,然后由你们来创建主题,不允许任意一个客户端能够在集群上创建任意分区的主题。之后配合日志留存策略就可以整体上控制总磁盘占用空间。
2020-04-2921 - 欠债太多感觉更新有点快,要跟不上了
作者回复: 加油加油~ 刚开始的Log确实内容有点多,后面慢慢适应了就好了:)
2020-04-251 - 曾轼麟老师这个是我的实现 private def countDifferBetweenHighWaterAndNextOffset(): Long = { //获取高水位 val highWater = highWatermarkMetadata //nextOffsetMetadata 相当于LEO val leo = nextOffsetMetadata if (highWater.messageOffsetOnly) { //如果高水位不全,先补全 val fullHighWater = convertToOffsetMetadataOrThrow(highWater.messageOffset) //计算差值 leo.messageOffset - fullHighWater.messageOffset } else { //计算差值 leo.messageOffset - highWater.messageOffset } }
作者回复: 写的周全。另外高水位补全仅仅出现在它只有offset,无base offset和物理文件位置的情形,这两个字段在计算差值时其实用途不大。
2020-04-2421 - 曾轼麟1、如果在更新过程中发现新 LEO 值小于高水位值,那么 Kafka 还要更新高水位值(可以理解为HW可能会回退吗?是指Leader选举后的日志截取吗?) 2、activeSegment这个日志段,可以理解为是当前正在写入的日志段对象吗?
作者回复: 1. 是的。总之HW不能越过LEO,我是指终态。中间状态中可能出现偶发的“越过”情形 2. 可以这么理解
2020-04-241 - 卢松“segments.higherEntry:获取第一个起始位移值≥给定 Key 值的日志段对象;” 这里应该只有大于,没有等于把?
作者回复: 嗯嗯,确实是没有等于,感谢指正~
2021-04-07 - 是男人就开巴巴托斯请问老师, read方法返回的对象包含的是log文件的实际数据么? 还是查找到的需要读取的start offset & length?
作者回复: 返回实际的消息数据以及一些必要的元数据
2020-12-29 - flyCoderonSameSegment方法的segmentBaseOffset 值相同为什么就能保证是在同一个日志段呢?segmentBaseOffset不是所在日志段的起始位移吗?消息在每一个日志段的的位移不是一个相对值吗?
作者回复: segmentBaseOffset保存的是绝对位移值
2020-11-022