19 | TimingWheel:探究Kafka定时器背后的高效时间轮算法
胡夕
你好,我是胡夕。今天,我们开始学习 Kafka 延时请求的代码实现。
延时请求(Delayed Operation),也称延迟请求,是指因未满足条件而暂时无法被处理的 Kafka 请求。举个例子,配置了 acks=all 的生产者发送的请求可能一时无法完成,因为 Kafka 必须确保 ISR 中的所有副本都要成功响应这次写入。因此,通常情况下,这些请求没法被立即处理。只有满足了条件或发生了超时,Kafka 才会把该请求标记为完成状态。这就是所谓的延时请求。
今天,我们的重点是弄明白请求被延时处理的机制——分层时间轮算法。
时间轮的应用范围非常广。很多操作系统的定时任务调度(如 Crontab)以及通信框架(如 Netty 等)都利用了时间轮的思想。几乎所有的时间任务调度系统都是基于时间轮算法的。Kafka 应用基于时间轮算法管理延迟请求的代码简洁精炼,而且和业务逻辑代码完全解耦,你可以从 0 到 1 地照搬到你自己的项目工程中。
时间轮简介
在开始介绍时间轮之前,我想先请你思考这样一个问题:“如果是你,你会怎么实现 Kafka 中的延时请求呢?”
针对这个问题,我的第一反应是使用 Java 的 DelayQueue。毕竟,这个类是 Java 天然提供的延时队列,非常适合建模延时对象处理。实际上,Kafka 的第一版延时请求就是使用 DelayQueue 做的。
公开
同步至部落
取消
完成
0/2000
荧光笔
直线
曲线
笔记
复制
AI
- 深入了解
- 翻译
- 解释
- 总结
Kafka中的高效时间轮算法是管理延迟请求的重要工具。本文深入探讨了Kafka中采用的分层时间轮算法,通过对TimerTaskEntry和TimerTaskList类的详细解释,揭示了延时请求是如何被分层时间轮管理的。文章通过源码层级关系的解释,帮助读者深入理解了Kafka中高效时间轮算法的实现原理。TimingWheel类的代码定义了9个重要字段,包括滴答时长、时间轮上的Bucket数量、起始时间戳等。文章还介绍了TimingWheel类中的addOverflowWheel、add和advanceClock方法的实现原理。通过这些方法,Kafka实现了对延时请求的高效管理。总的来说,本文对于想要深入了解Kafka延时请求管理的技术人员来说,是一份极具价值的资料。
仅可试看部分内容,如需阅读全部内容,请付费购买文章所属专栏
《Kafka 核心源码解读》,新⼈⾸单¥59
《Kafka 核心源码解读》,新⼈⾸单¥59
立即购买
© 版权归极客邦科技所有,未经许可不得传播售卖。 页面已增加防盗追踪,如有侵权极客邦将依法追究其法律责任。
登录 后留言
全部留言(12)
- 最新
- 精选
- 胡夕置顶你好,我是胡夕。我来公布上节课的“课后讨论”题答案啦~ 上节课,咱们结合源码重点了解了PartitionStateMachine的原理。课后我让你去自行分析下triggerOnlineStateChangeForPartitions方法,并尝试找出它被调用的时机。其实,这个方法,顾名思义就是将一组给定的主题分区的状态变更到Online状态。在执行变更前,必须要判断这些分区所属的主题当前没有被执行删除操作。另外除了要变更状态之外,该方法还会为这些分区执行Leader选举。triggerOnlineStateChangeForPartitions方法被调用的时机主要有3个:1. 选举Controller成功之后;2. Broker启动或下线时;3. Unclean Leader选举时。 okay,你同意这个说法吗?或者说你有其他的看法吗?我们可以一起讨论下。2020-06-151
- 伯安知心首先声明TimingWheel不是线程安全的,addOverflowWheel这个方法设计本身要单例模式,但是多个线程执行addOverflowWheel方法,可能出现不一致实例化多个类,如果是volatile限制了指令重排序,就解决了这个问题。首先声明TimingWheel不是线程安全的,addOverflowWheel这个方法设计本身要单例模式,但是多个线程执行addOverflowWheel方法,可能出现不一致实例化多个类,如果是volatile限制了指令重排序,就解决了这个问题。
作者回复: 不妨写个改进的patch:)
2020-06-0622 - 李老师,你好。源码中TimerTaskList#add()方法中有两行这样的注释 Remove the timer task entry if it is already in any other list,We do this outside of the sync block below to avoid deadlocking. 如果timerTaskEntry.remove()该行放进timerTaskEntry.synchronized 代码块里,与其他方法加锁顺序都一致,这儿哪里会有死锁
作者回复: 嗯,有一定道理。我觉得更多的就是一种防御性编程
2020-11-15 - 对与错我还是没有get到这个延迟队列的用途,为啥更新过期时间之后,需要把任务放到延迟队列里面?
作者回复: 其实只是把任务放入更高一级的时间轮中管理
2020-10-222 - 吃饭饭有个问题不明白:TimerTaskEntry 的 remove() 方法的 while (currentList != null) ,这个链表不是能放很多 TimerTaskEntry 吗?只是移除当前这个 Entry 为什么要把整个链表置空?如果内部还有其他的 Entry 呢?
作者回复: 这是从TimerTaskEntry的角度去做的。currentList只是TimerTaskEntry角度下的链表,将它置空相当于把该entry与list割裂开来。链表本身还是存在的
2020-06-19 - RonnieXie老师,这里有一个疑问,使用哈希表map似乎也可以实现延迟队列,key为时间戳,定期执行任务和删除过期任务,时间复杂度O(1),请问使用分层时间轮和哈希表map的优缺点是什么?
作者回复: 如何解决排序问题呢?
2020-06-073 - 空知只有当前时间越过了 Bucket 的起始时间,这个 Bucket 才算是过期。而这里的起始时间,就是代码中 expiration 字段的值。 这里的起始时间是否应该是 终止时间? taskCounter:这一层时间轮上的总定时任务数。 这里是否是每个Bucket的任务数?
作者回复: 1. 源码中并没有终止时间的提法。一旦时钟越过了Bucket起始时间,该Bucket就被视为过期了 2. 不是单个Bucket的,而是整个时间轮上的
2020-06-07 - 张三kafka的时间轮主要是用在哪里的?仅仅是用于处理内部逻辑嘛?2023-09-26归属地:日本
- 飞翔老师 如果放在时间轮的消息太多超过内存 kafka会把消息先存在disk中嘛2022-09-20归属地:美国
- 凯文小猪overflowWheel用在两个地方,一个是外部定时器也就是tickout线程的定时发起时钟轮转,此时需要更新层级时间轮(包括子级与父级),另一个是添加线程,也就是kafka接收到produceRequest的handler线程,他们会创建延时任务,如果牵涉到父级时间轮,也需要照顾他们的可见性。 这里要补充一点,就是老师没有讲到的queue问题。实际上分层时间轮最常见的问题有两个: 1.空转。也就是currentTime指针指向了某个bucket 但是那个bucket实际上是空的。kafka的处理方式使用delayqueue来处理,delayQueue.poll() 对于未到期的数据 是不会返回的,这样我们就不需要更新空转的指针 2.精度问题。实际上精度取决于最低一级时间轮的时间跨度,相当于PC里的HZ。而外部定时器每一轮发起的频次也会影响。 综上 进度误差在:外部定时器每一轮的频次+最低一级时间轮的时间跨度——》就是最小误差精度2022-06-29
收起评论