Kafka 核心源码解读
胡夕
Apache Kafka Committer,老虎证券技术总监
19216 人已学习
新⼈⾸单¥59
登录后,你可以任选4讲全文学习
课程目录
已完结/共 44 讲
结束语 (1讲)
Kafka 核心源码解读
15
15
1.0x
00:00/00:00
登录|注册

20 | DelayedOperation:Broker是怎么延时处理请求的?

你好,我是胡夕。
上节课,我们学习了分层时间轮在 Kafka 中的实现。既然是分层时间轮,那就说明,源码中构造的时间轮是有多个层次的。每一层所表示的总时长,等于该层 Bucket 数乘以每个 Bucket 涵盖的时间范围。另外,该总时长自动成为下一层单个 Bucket 所覆盖的时间范围。
举个例子,目前,Kafka 第 1 层的时间轮固定时长是 20 毫秒(interval),即有 20 个 Bucket(wheelSize),每个 Bucket 涵盖 1 毫秒(tickMs)的时间范围。第 2 层的总时长是 400 毫秒,同样有 20 个 Bucket,每个 Bucket 20 毫秒。依次类推,那么第 3 层的时间轮时长就是 8 秒,因为这一层单个 Bucket 的时长是 400 毫秒,共有 20 个 Bucket。
基于这种设计,每个延迟请求需要根据自己的超时时间,来决定它要被保存于哪一层时间轮上。我们假设在 t=0 时创建了第 1 层的时间轮,那么,该层第 1 个 Bucket 保存的延迟请求就是介于[0,1)之间,第 2 个 Bucket 保存的是介于[1,2) 之间的请求。现在,如果有两个延迟请求,超时时刻分别在 18.5 毫秒和 123 毫秒,那么,第 1 个请求就应该被保存在第 1 层的第 19 个 Bucket(序号从 1 开始)中,而第 2 个请求,则应该被保存在第 2 层时间轮的第 6 个 Bucket 中。
确认放弃笔记?
放弃后所记笔记将不保留。
新功能上线,你的历史笔记已初始化为私密笔记,是否一键批量公开?
批量公开的笔记不会为你同步至部落
公开
同步至部落
取消
完成
0/2000
荧光笔
直线
曲线
笔记
复制
AI
  • 深入了解
  • 翻译
    • 英语
    • 中文简体
    • 中文繁体
    • 法语
    • 德语
    • 日语
    • 韩语
    • 俄语
    • 西班牙语
    • 阿拉伯语
  • 解释
  • 总结

Kafka中的DelayedOperation机制通过分层时间轮实现请求的延时处理。分层时间轮由多个层次组成,每层表示的总时长是该层Bucket数乘以每个Bucket的时间范围。SystemTimer类封装了分层时间轮对象,为Purgatory提供延迟请求管理功能。DelayedOperation类是所有Kafka延迟请求类的抽象父类,它的构造函数中只需要传入一个超时时间即可。DelayedOperationPurgatory类是实现Purgatory的地方,它是一个泛型类,通常情况下,每一类延迟请求都对应于一个DelayedOperationPurgatory实例。DelayedOperationPurgatory类的字段包括purgatoryName、timeoutTimer、brokerId、purgeInterval、reaperEnabled和timerEnabled,其中purgeInterval用于控制删除线程移除Bucket中的过期延迟请求的频率。文章通过深入分析分层时间轮的实现原理和SystemTimer类的关键代码,帮助读者理解Kafka中DelayedOperation机制的技术特点和实现细节。 DelayedOperationPurgatory还定义了两个内置类,分别是Watchers和WatcherList。Watchers是基于Key的一个延迟请求的监控链表,而WatcherList是一组按照Key分组的Watchers对象。DelayedOperationPurgatory类的两个重要方法tryCompleteElseWatch和checkAndComplete分别用于尝试完成延迟请求或检查并完成给定Key所在的WatcherList中的延迟请求。这些方法共同实现了Broker端对于延迟请求的处理,将无法立即完成的请求放入Purgatory缓冲区,后续由DelayedOperationPurgatory类的方法自动处理这些延迟请求。 总的来说,了解Kafka中DelayedOperation机制的实现原理和代码实现对于理解分布式系统的异步循环操作和管理定时任务具有重要意义。文章通过深入解析分层时间轮的实现原理和SystemTimer类的关键代码,帮助读者掌握了Kafka中DelayedOperation机制的技术特点和实现细节。

仅可试看部分内容,如需阅读全部内容,请付费购买文章所属专栏
《Kafka 核心源码解读》
新⼈⾸单¥59
立即购买
登录 后留言

全部留言(9)

  • 最新
  • 精选
  • 胡夕
    置顶
    你好,我是胡夕。我来公布上节课的“课后讨论”题答案啦~ 上节课,咱们结合源码重点了解了Kafka定时器背后的时间轮算法实现。课后我请你思考这样一个问题:TimingWheel类中的overflowWheel变量为什么是volatile型的。overflowWheel表示上一层的时间轮对象。我们在课程里面说过,上一层时间轮是按需创建的,它可能被两个并发的线程同时访问。一个线程更新该字段,另一个线程读取它,因此代码声明该字段是volatile型以保证它的内存可见性。 okay,你同意这个说法吗?或者说你有其他的看法吗?我们可以一起讨论下。
    2020-06-15
    1
  • J.Smile
    读第一遍,一脸懵逼,哈哈。看来来得多多读几遍!

    作者回复: 嗯,DelayedOperation是我个人觉得最难的一部分源码,当初我也是看了好几遍。

    2020-07-07
    1
  • 伯安知心
    大概意思就是说 到了purgeInterval这个清除间隔的数量,就会清理watcherLists中已经完成但是一直被监听的触发器个数,而且这个purged 就是已经完成但是还未清理的任务数量。purgeCompleted 进行remove。大概意思就是说 到了purgeInterval这个清除间隔的数量,就会清理watcherLists中已经完成但是一直被监听的触发器个数,而且这个purged 就是已经完成但是还未清理的任务数量。purgeCompleted 进行remove。

    作者回复: 嗯,是这回事

    2020-06-09
    1
  • Geek_bd613f
    延迟任务在文件中是怎么组织的呢

    作者回复: 您指的组织是什么含义?

    2020-12-20
  • Kvicii.Y
    addTimerTaskEntry这个方法的是不是未过期并且未被取消什么也不做吧;

    作者回复: 嗯,是的

    2020-07-12
  • 可达猫
    老师好,我们最近遇到了一个就是扩容 kafka broker 的问题,之前有 3 台 broker,这两天新增了一台 broker,然后执行 Reassign Partitions(数据量比较大),但是经过了 36 个小时了,Reassign Partitions 状态一直是 pending,没有结束。不知道老师遇到过这种问题没有

    作者回复: 先查看下日志吧,别有什么异常导致卡住了。另外如果数据量大的话,其实建议一个分区一个分区的来

    2020-06-10
    2
  • z.l
    核心就是一边往时间轮里面加任务,一边在外部死循环调用delayQueue.poll(timeout)方法来触发时间轮的tick,推进时间轮往前走。感觉还是Netty实现的时间轮简单点,bucket可以复用,不过netty是自己实现的时间轮推进。
    2022-09-08归属地:上海
  • 成电帅才
    老师好,DelayedOperationPurgatory会管理延迟请求,这里的延迟请求和timewheeling中的延迟请求是不是同一个对象?还有就是为什么要分两个地方保存这份数据?
    2021-11-22
  • 柠檬C
    老师,purgatory中既由watcherList和watcher组成的监听链表,又有Timer这个时间轮相关组件 我感觉它们都起到监听延迟请求,并重试的作用,请问它们的区别是啥呢
    2021-04-14
收起评论
显示
设置
留言
9
收藏
沉浸
阅读
分享
手机端
快捷键
回顶部