Kafka核心技术与实战
胡夕
人人贷计算平台部总监,Apache Kafka Contributor
立即订阅
8408 人已学习
课程目录
已完结 46 讲
0/4登录后,你可以任选4讲全文学习。
开篇词 (1讲)
开篇词 | 为什么要学习Kafka?
免费
Kafka入门 (5讲)
01 | 消息引擎系统ABC
02 | 一篇文章带你快速搞定Kafka术语
03 | Kafka只是消息引擎系统吗?
04 | 我应该选择哪种Kafka?
05 | 聊聊Kafka的版本号
Kafka的基本使用 (3讲)
06 | Kafka线上集群部署方案怎么做?
07 | 最最最重要的集群参数配置(上)
08 | 最最最重要的集群参数配置(下)
客户端实践及原理剖析 (14讲)
09 | 生产者消息分区机制原理剖析
10 | 生产者压缩算法面面观
11 | 无消息丢失配置怎么实现?
12 | 客户端都有哪些不常见但是很高级的功能?
13 | Java生产者是如何管理TCP连接的?
14 | 幂等生产者和事务生产者是一回事吗?
15 | 消费者组到底是什么?
16 | 揭开神秘的“位移主题”面纱
17 | 消费者组重平衡能避免吗?
18 | Kafka中位移提交那些事儿
19 | CommitFailedException异常怎么处理?
20 | 多线程开发消费者实例
21 | Java 消费者是如何管理TCP连接的?
22 | 消费者组消费进度监控都怎么实现?
深入Kafka内核 (5讲)
23 | Kafka副本机制详解
24 | 请求是怎么被处理的?
25 | 消费者组重平衡全流程解析
26 | 你一定不能错过的Kafka控制器
27 | 关于高水位和Leader Epoch的讨论
管理与监控 (12讲)
28 | 主题管理知多少?
29 | Kafka动态配置了解下?
30 | 怎么重设消费者组位移?
31 | 常见工具脚本大汇总
32 | KafkaAdminClient:Kafka的运维利器
33 | Kafka认证机制用哪家?
34 | 云环境下的授权该怎么做?
35 | 跨集群备份解决方案MirrorMaker
36 | 你应该怎么监控Kafka?
37 | 主流的Kafka监控框架
38 | 调优Kafka,你做到了吗?
39 | 从0搭建基于Kafka的企业级实时日志流处理平台
高级Kafka应用之流处理 (3讲)
40 | Kafka Streams与其他流处理平台的差异在哪里?
41 | Kafka Streams DSL开发实例
42 | Kafka Streams在金融领域的应用
结束语 (1讲)
结束语 | 以梦为马,莫负韶华!
特别放送 (2讲)
加餐 | 搭建开发环境、阅读源码方法、经典学习资料大揭秘
用户故事 | 黄云:行百里者半九十
Kafka核心技术与实战
登录|注册

27 | 关于高水位和Leader Epoch的讨论

胡夕 2019-08-03
你好,我是胡夕。今天我要和你分享的主题是:Kafka 中的高水位和 Leader Epoch 机制。
你可能听说过高水位(High Watermark),但不一定耳闻过 Leader Epoch。前者是 Kafka 中非常重要的概念,而后者是社区在 0.11 版本中新推出的,主要是为了弥补高水位机制的一些缺陷。鉴于高水位机制在 Kafka 中举足轻重,而且深受各路面试官的喜爱,今天我们就来重点说说高水位。当然,我们也会花一部分时间来讨论 Leader Epoch 以及它的角色定位。

什么是高水位?

首先,我们要明确一下基本的定义:什么是高水位?或者说什么是水位?水位一词多用于流式处理领域,比如,Spark Streaming 或 Flink 框架中都有水位的概念。教科书中关于水位的经典定义通常是这样的:
在时刻 T,任意创建时间(Event Time)为 T’,且 T’≤T 的所有事件都已经到达或被观测到,那么 T 就被定义为水位。
“Streaming System”一书则是这样表述水位的:
水位是一个单调增加且表征最早未完成工作(oldest work not yet completed)的时间戳。
为了帮助你更好地理解水位,我借助这本书里的一张图来说明一下。
取消
完成
0/1000字
划线
笔记
复制
© 版权归极客邦科技所有,未经许可不得传播售卖。 页面已增加防盗追踪,如有侵权极客邦将依法追究其法律责任。
该试读文章来自付费专栏《Kafka核心技术与实战》,如需阅读全部文章,
请订阅文章所属专栏。
立即订阅
登录 后留言

精选留言(39)

  • 趙衍
    老师列举了数据丢失的场景,我补充一个数据丢失的场景吧:

    假设集群中有两台Broker,Leader为A,Follower为B。A中有两条消息m1和m2,他的HW为1,LEO为2;B中有一条消息m1,LEO和HW都为1.假设A和B同时挂掉,然后B先醒来,成为了Leader(假设此时的min.insync.replicas参数配置为1)。然后B中写入一条消息m3,并且将LEO和HW都更新为2.然后A醒过来了,向B发送FetchrRequest,B发现A的LEO和自己的一样,都是2,就让A也更新自己的HW为2。但是其实,虽然大家的消息都是2条,可是消息的内容是不一致的。一个是(m1,m2),一个是(m1,m3)。

    这个问题也是通过引入leader epoch机制来解决的。

    现在是引入了leader epoch之后的情况:B恢复过来,成为了Leader,之后B中写入消息m3,并且将自己的LEO和HW更新为2,注意这个时候LeaderEpoch已经从0增加到1了。
    紧接着A也恢复过来成为Follower并向B发送一个OffsetForLeaderEpochRequest请求,这个时候A的LeaderEpoch为0。B根据0这个LeaderEpoch查询到对应的offset为1并返回给A,那么A就要对日志进行截断,删除m2这条消息。然后用FetchRequest从B中同步m3这条消息。这样就解决了数据不一致的问题。
    2019-08-10
    1
    14
  • QQ怪
    这篇文章有点深度了,看了几遍才看懂
    2019-08-03
    8
  • 常超
    前面有几个同学提过了,请老师再看一下。

    >与 Leader 副本保持同步的两个判断条件。
    >1. 该远程 Follower 副本在 ISR 中。
    >2. ...

    >如果 Kafka 只判断第 1 个条件的话,就可能出现某些副本具备了“进入 ISR”的资格,但却尚未进入到 ISR 中的情况。此时,分区高水位值就可能超过 ISR 中副本 LEO,而高水位 > LEO 的情形是不被允许的。

    应该改成“如果 Kafka 只判断第 2 个条件的话,...” 吧?
    按照现在的说法,上面那句话可以扩展成,如果只判断远程Follower副本是否在ISR中的话,就可能出现某些副本具备了“进入 ISR”的资格,但却尚未进入到 ISR 中的情况。此时,分区高水位值就可能超过 ISR 中副本 LEO,而高水位 > LEO 的情形是不被允许的。
    这样是说不通的吧。
    换个问法,比如,条件1有副本a,b, 条件2有副本b,c(其中c满足进入1的条件,但还没进入1),老师是想说,“只判断1,a会被误判为同步状态”,还是“只判断2,c会被误判为同步状态”呢?
    2019-08-08
    1
    4
  • hgf
    关于leader副本在处理follower同步时的流程感觉有问题。原文如下:

    处理 Follower 副本拉取消息的逻辑如下: 读取磁盘(或页缓存)中的消息数据。 使用 Follower 副本发送请求中的位移值更新远程副本 LEO 值。 更新分区高水位值(具体步骤与处理生产者请求的步骤相同)。

    在处理生产者请求时,更新leader HW的步骤原文:

    i. 获取 Leader 副本所在 Broker 端保存的所有远程副本 LEO 值{LEO-1,LEO-2,……,LEO-n}。
    ii. 获取 Leader 副本高水位值: currentHW 。
    iii. 更新 currentHW = min(currentHW, LEO-1,LEO-2,……,LEO-n)。


    在“副本同步机制解析”中,有一段话:

    在新一轮的拉取请求中,由于位移值是 0 的消息已经拉取成功,因此 Follower 副本这次请求拉取的是位移值 =1 的消息。Leader 副本接收到此请求后,更新远程副本 LEO 为 1,然后更新 Leader 高水位为 1。做完这些之后,它会将当前已更新过的高水位值 1 发送给 Follower 副本。Follower 副本接收到以后,也将自己的高水位值更新成 1。至此,一次完整的消息同步周期就结束了。事实上,Kafka 就是利用这样的机制,实现了 Leader 和 Follower 副本之间的同步。


    如果使用原文更新leader HW的逻辑,leader的HW是不应该更新的,并且永远都是0。具体分析如下:

    在新一轮的请求中,Follower 副本这次请求拉取的是位移值 =1 的消息,Leader 副本接收到此请求后,更新远程副本 LEO 为 1,但此时leader的HW还是0(即currentHW=0),更新leader的currentHW = min(currentHW, LEO-1,LEO-2,……,LEO-n)=min(0,1),那么结论应该是0。

    更新leader HW步骤中,第二步是不是应该获取leader 副本的LEO,第三步应该是更新 currentHW = min(leader_leo, LEO-1,LEO-2,……,LEO-n)。
    2019-08-05
    4
    3
  • godtrue
    今天的课程很棒,知识密度比较大,小结一下
    1:啥是高水位?
    水位,我的理解就是水平面当前的位置,可以表示水的深度。在kafka中水位用于表示消息在分区中的位移或位置,高水位用于表示已提交的消息的分界线的位置,在高水位这个位置之前的消息都是已提交的,在高水位这个位置之后的消息都是未提交的。所以,高水位可以看作是已提交消息和未提交消息之间的分割线,如果把分区比喻为一个竖起来的水容器的话,这个表示就更明显了,在高水位之下的消息都是已提交的,在高水位之上的消息都是未提交的。
    高水位的英文是High Watermark ,所以其英文缩写为HW。
    值得注意的是,Kafka 中也有低水位(Low Watermark,英文缩写为LW),它是与 Kafka 删除消息相关联的概念。
    再加一个概念,LEO——Log End Offset 的缩写——意思是日志末端位移,它表示副本写入下一条消息的位移值——既分区中待写入消息的位置。这个位置和高水位之间的位置包括高水位的那个位置,就是所有未提交消息的全部位置所在啦——未提交的消息是不能被消费者消费的。所以,同一个副本对象,其高水位值不会大于 LEO 值。
    高水位和 LEO 是副本对象的两个重要属性。Kafka 所有副本都有对应的高水位和 LEO 值,而不仅仅是 Leader 副本。只不过 Leader 副本比较特殊,Kafka 使用 Leader 副本的高水位来定义所在分区的高水位。换句话说,分区的高水位就是其 Leader 副本的高水位。

    2:高水位有啥用?
    2-1:定义消息可见性,即用来标识分区下的哪些消息是可以被消费者消费的——已提交的消息是可以被消费者消费的。
    2-2:帮助 Kafka 完成副本同步——明确那些消息已提交那些未提交,才好进行消息的同步。

    3:高水位怎么管理?
    这个不好简单的描述,牢记高水位的含义,有助于理解更新高水的时机以及具体步骤。
    高水位——用于界定分区中已提交和未提交的消息。

    4:高水有舍缺陷?
    Leader 副本高水位更新和 Follower 副本高水位更新在时间上是存在错配的。这种错配是很多“数据丢失”或“数据不一致”问题的根源。

    5:啥是 leader epoch?
    可以大致认为就是leader的版本。
    它由两部分数据组成。
    5-1:Epoch。一个单调增加的版本号。每当副本领导权发生变更时,都会增加该版本号。小版本号的 Leader 被认为是过期 Leader,不能再行使 Leader 权力。
    5-2:起始位移(Start Offset)。Leader 副本在该 Epoch 值上写入的首条消息的位移。

    6:leader epoch 有啥用?
    通过 Leader Epoch 机制,Kafka 规避了因为Leader 副本高水位更新和 Follower 副本高水位更新在时间上是存在错配,而引起的很多“数据丢失”或“数据不一致”的问题。

    7:leader epoch 怎么管理?
    需要再看看,还不能简单描述出来。

    2019-08-18
    2
  • 信信
    原文中“如果 Kafka 只判断第 1 个条件的话”--这里应该是:第2个条件?评论区其他人也有提到
    对这块的个人理解:
    两个条件之间的关系是与不是或
    这里想表达的应该是--这个即将进入isr的副本的LEO值比分区高水位小,但满足条件2;
    文中对条件2的描述好像有点歧义,以下是网上找的一段:
    假设replica.lag.max.messages设置为4,表明只要follower落后leader不超过3,就不会从同步副本列表中移除。replica.lag.time.max设置为500 ms,表明只要follower向leader发送请求时间间隔不超过500 ms,就不会被标记为死亡,也不会从同步副本列中移除。

    作者回复: replica.lag.max.messages已经被移除了,不要看这篇了。你可以看看我之前写的这篇:Kafka副本管理—— 为何去掉replica.lag.max.messages参数(https://www.cnblogs.com/huxi2b/p/5903354.html)

    2019-08-04
    2
  • 我来也
    1.该远程 Follower 副本在 ISR 中。

    如果 Kafka 只判断第 1 个条件的话,就可能出现某些副本具备了“进入 ISR”的资格,但却尚未进入到 ISR 中的情况。

    ————————
    这里是不是把条件的编号写反了?

    作者回复: 没写反啊?就是想说只靠第一个条件不充分

    2019-08-03
    2
  • 奇奇
    首先leader接受生产者逻辑 肯定不会是通过当前leader 高水位来取min来得到高水位的值,你这样取min会始终取到当前leader高水位的值,因为它就是min,应该要通过leader的leo来取min,这样才能使得

    作者回复: 嗯嗯,这块写的是有问题。应该改成:
    更新 currentHW = max(currentHW, min(leo-1, leo-2, .. leo-n))

    感谢您的反馈:)

    2019-09-10
    2
    1
  • 知易
    文中老师举例说明数据丢失场景,其中有一处疑惑。
    原文。。“当执行完截断操作后,副本 B 开始从 A 拉取消息,执行正常的消息同步。如果就在这个节骨眼上,副本 A 所在的 Broker 宕机了,那么 Kafka 就别无选择,只能让副本 B 成为新的 Leader,此时,当 A 回来后,需要执行相同的日志截断操作,即将高水位调整为与 B 相同的值,也就是 1。这样操作之后,位移值为 1 的那条消息就从这两个副本中被永远地抹掉了。这就是这张图要展示的数据丢失场景。”
          其中,A宕机前其高水位为2,此时回来进行日志截断不应该还是2么,为啥要调整为与leaderB一样的水位值?前面B宕机回来的时候,进行日志截断也还是保持其宕机前的值1,并没有调整为与leaderA一样的水位值呢?
    这里不是没有理解到,请老师解惑。感谢。
    2019-08-26
    1
  • 常超
    请问老师,与 Leader 副本保持同步的两个判断条件,是OR还是AND的关系?

    作者回复: AND

    2019-08-07
    1
  • 注定非凡
    1,高水位概念
    A :水位:水位一次多用于流式处理领域,如Spark Streaming 或Flink框架中都有水位的概念。
    在教科书中关于水位定义:在即刻T,任意创建时间(Event Time)为T ’ ,且T’ <= T的所有事件都已经到达或被观测到,那么T就被定义为水位。

    在“Streaming System”:一书则是这样表述水位:水位是一个单调增加且表征最早未完成工作(oldest work not yet completed)的时间戳。

    B :kafka的水位概念:kafka的水位不是时间戳,与时间无关。他是和位置信息绑定的,它是用消息位移来表征的。
    Kafka源码使用的表述是高水位。在Kafka中也有低水位(Low Watermark),它是与Kafka删除消息相关的概念。

    2 高水位作用
    A :定义消息可见性,用来标识分区下的哪些消息是可以被消费者消费的。
    B :帮助Kafka完成副本同步。
     
    “已提交消息” 和 “未提交消息”
    (1)在分区高水位以下的消息被认为是已提交消息,反之就是未提交消息。
    (2)消费者只能消费已提交消息
    (3)这不是Kafka的事务,因为事务机制会影响消息者所能看到的消息的范围,他不只是简单依赖高水位来判断。他依靠一个名为LSO(Log Stable Offset)的位移值来判断事务型消费者的可见性。
    (4)位移值等于高水位的消息也属于为提交消息。即,高水位消息的消息是不能被消费者消费的。
    (5)日志末端位移的概念:Log End Offset,简写是LEO。他表示副本写入下一条消息的位移值。同一个副本对象,其高水位值不会大于LEO值。
    (6)高水位和LEO是副本对象的两个重要属性。Kafka所有副本都有对应的高水位和LEO值,而不仅仅是Leader副本。只是Leader副本比较特殊,Kafka使用Leader副本的高水位来定义所在分区的高水位。即,分区的高水位就是其Leader副本的高水位。
    3 高水位更新机制
     
    A :在Leader副本所在Broker上,还保存了其他Follower副本的LEO值。而其他Broker上仅仅保存该分区的某个Follower副本。Kafka将Leader副本所在Broker上保存的这些Follower副本称为远程副本。
    Kafka副本机制在运行过程中,会更新Broker1上Follower副本的高水位和LEO值,同时也会更新Broker0上Leader副本的高水位和LEO,以及所有远程副本的LEO。但它不会更新远程副本的高水位值。
    Broker0上保存这些远程副本的作用是帮助Leader副本确定其高水位,即分区高水位。

    B :与Leader副本保持同步

    总结:高水位和LEO的更新机制
    一,Leader副本
    处理生产者请求的逻辑:
    a. 写入消息到本地磁盘。
    b. 更新分区高水位值
    1,获取Leader副本所在Broker端保存的所有远程副本LEO值{LEO-1,LEO-2,……,LEO-n}。
    2,获取Leader副本高水位值:currentHW。
    3,更新currentHW = max(currentHW ,min(leo-1,leo-2,……leo-n)).

    处理follwer副本拉取消息的逻辑:
    a. 读取磁盘(或页缓存)中的消息数据
    b. 使用Follower副本发送请求中的位移值更新远程副本LEO值。
    c. 更新分区高水位值(具体步骤与处理生产者请求的步骤相同)

    二 Follower副本
    从Leader拉取消息的处理逻辑:
    a. 写入消息到本地磁盘
    b. 更新LEO值
    c. 更新高水位值
    1. 获取Leader发送的高水位值:currentHW。
    2. 获取步骤2中更新过的LEO值:currentLEO。
    3. 更新高水位为min(currentHW,currentLEO)。

    4 Leader Epoch
    Leader Epoch概念,用来规避因高水位更新错配导致的各种不一种问题。所谓Leader Epoch大致可以认为是Leader版本。
    A :组成:由两部分数据组成。
    1. Epoch。一个单调增加的版本号。每当领导权发生变更时,都会增加该版本号。小版本号的Leader被认为是过期的Leader,不能在行使Leader权利。
    2. 起始位移(Start Offset)。Leader副本在改Epoch值上写入的首条消息的位移。
    B :Kafka Broker会在内存中为每个分区都缓存Leader Epoch数据,同时他还会定期地将这些信息持久化到一个checkpoint文件中。
    2019-11-10
  • 亚洲舞王.尼古拉斯赵四
    我还奇怪为什么老师讲的和Apache kafka实战这部分内容差不多,还以为是抄袭,后来一看,原来老师就是我看的这本书的作者,😂

    作者回复: 在所有需要回复的人当中,你的名字是我最喜欢的,没有之一:)

    2019-11-06
  • 朱东旭
    胡老师您好,在您讲的leader epoch机制案例中,在我看来最关键的操作是broker重启后先向leader确认leo,而不是直接基于自己的高水位截断数据,来防止数据不一致。。可是有无leader epoch都可以做这个操作呀,我看不出leader epoch必要性在哪。。

    作者回复: epoch还有其他的作用,比如执行基本的fencing逻辑等

    2019-11-02
  • 😈😈😈😈😈
    这个是我理解的HW和LEO更新的机制步骤,有错误的话请大神指明下,非常感谢
    更新对象 更新时机
    Broker1上Follower副本 Follwer会从Leader副本不停的拉取数据,但是Leader副本现在的没有数据。所以Leader副本和Follower副本的高水位值和LEO值都是0
    Broker0上的Leader副本 生产者向Leader副本中写入一条数据,此时LEO值是1,HW值是0。也就是说位移为0的位置上已经有数据了
    Broker1上Follower副本 由于Leader副本有了数据,所以Follower可以获取到数据写入到自己的日志中,且标记LEO值为1,此时在Followe位移值为0的位置上也有了数据,所以此时Follower的HW=0,LEO=1
    Broker1上Follower副本 获取到数据之后,再次向Leader副本拉数据,这次请求拉取的数据是位移值1上的数据
    Broker0上的远程副本 Leader收到Follower的拉取请求后,发现Follower要拉取的数据是在位移值为1的位置上的数据,此时会更新远程副本的LEO值为1。所以所有的远程副本的LEO等于各自对应的Follower副本的LEO值
    Brober0上的Leader副本 Broker0上的远程副本的LEO已经更新为1了。所以开始更新Leader副本的HW值。HW=max{HW,min(LEO1,LEO2,LEO3......LEON)},更新HW值为1,之后会发送Follower副本请求的数据(如果有数据的话,没有数据的话只发送HW值)并一起发送HW值
    Broker1上Follower副本 Follwer副本收到Leader返回的数据和HW值(如果Leader返回了数据那么LEO就是2,没有数据的话LEO还是1),用HW值和自己的LEO值比较选择较小作为自己的HW值并更新HW值为1(如果俩个值相等的话HW=LEO)
    一次副本间的同步过程完成

    作者回复: 挺好的,没有什么意见:)

    2019-10-22
  • 谢特
    老师,有个问题请教一下,就是最近在写kafka sink connect ,提供了rest api可以暂停消费,请问这样时间长的话,会导致消息堆积,broker会宕机吗

    作者回复: 不会的,不论是你是否消费,消息都会保存在broker端一段时间的

    2019-10-12
  • 谢特
    远程副本指的是和leader副本在一个broker上的副本吗

    作者回复: 嗯嗯,是的。不过是在内存中的对象,并不实际存在于leader副本所在broker的磁盘上

    2019-10-11
  • benying
    没看懂leader epoch的作用
    2019-09-28
  • 被过去推开
    没有 epoch时,副本A所在Broker宕机,高水位退回到位移值为1,不是应该会发生重复消费嘛 ?

    作者回复: 不会重复消费的,hw是在所有副本都越过了某个值之后才会前进到该值,但的确有可能造成各副本间不一致

    2019-09-01
  • 绿箭侠
    老师,文中讲的leader副本的两个更新时机都是使用currentHW = min(currentHW, LEO-1,LEO-n)方法去更新吗?这里有些困惑,第一种理解因为LEO-1,LEO-2会增加,但是在更新currentHW前currentHW是不变,那这个方法的结果是currentHW永远得不到更新?或者说是第二种理解,leader副本会在得到follow副本拉取消息请求后就将currentHW更新为远程副本LEO的最小值,然后在第一种更新时机中,即更新完leader副本LEO后试用上面那个方法更新currentHW?
    针对这两种理解,请老师指正。

    作者回复: 不会啊,如果所有follower的LEO都超过了某个值,hw就会前进到那个值,不会永远不更新的

    2019-08-30
    1
  • 无菇朋友
    老师,文章里出现的请求怎么用抓包工具抓一下

    作者回复: tcpdump可以抓一下,只是都是二进制的字节序列

    2019-08-19
收起评论
39
返回
顶部