Kafka核心技术与实战
胡夕
人人贷计算平台部总监,Apache Kafka Contributor
立即订阅
8408 人已学习
课程目录
已完结 46 讲
0/4登录后,你可以任选4讲全文学习。
开篇词 (1讲)
开篇词 | 为什么要学习Kafka?
免费
Kafka入门 (5讲)
01 | 消息引擎系统ABC
02 | 一篇文章带你快速搞定Kafka术语
03 | Kafka只是消息引擎系统吗?
04 | 我应该选择哪种Kafka?
05 | 聊聊Kafka的版本号
Kafka的基本使用 (3讲)
06 | Kafka线上集群部署方案怎么做?
07 | 最最最重要的集群参数配置(上)
08 | 最最最重要的集群参数配置(下)
客户端实践及原理剖析 (14讲)
09 | 生产者消息分区机制原理剖析
10 | 生产者压缩算法面面观
11 | 无消息丢失配置怎么实现?
12 | 客户端都有哪些不常见但是很高级的功能?
13 | Java生产者是如何管理TCP连接的?
14 | 幂等生产者和事务生产者是一回事吗?
15 | 消费者组到底是什么?
16 | 揭开神秘的“位移主题”面纱
17 | 消费者组重平衡能避免吗?
18 | Kafka中位移提交那些事儿
19 | CommitFailedException异常怎么处理?
20 | 多线程开发消费者实例
21 | Java 消费者是如何管理TCP连接的?
22 | 消费者组消费进度监控都怎么实现?
深入Kafka内核 (5讲)
23 | Kafka副本机制详解
24 | 请求是怎么被处理的?
25 | 消费者组重平衡全流程解析
26 | 你一定不能错过的Kafka控制器
27 | 关于高水位和Leader Epoch的讨论
管理与监控 (12讲)
28 | 主题管理知多少?
29 | Kafka动态配置了解下?
30 | 怎么重设消费者组位移?
31 | 常见工具脚本大汇总
32 | KafkaAdminClient:Kafka的运维利器
33 | Kafka认证机制用哪家?
34 | 云环境下的授权该怎么做?
35 | 跨集群备份解决方案MirrorMaker
36 | 你应该怎么监控Kafka?
37 | 主流的Kafka监控框架
38 | 调优Kafka,你做到了吗?
39 | 从0搭建基于Kafka的企业级实时日志流处理平台
高级Kafka应用之流处理 (3讲)
40 | Kafka Streams与其他流处理平台的差异在哪里?
41 | Kafka Streams DSL开发实例
42 | Kafka Streams在金融领域的应用
结束语 (1讲)
结束语 | 以梦为马,莫负韶华!
特别放送 (2讲)
加餐 | 搭建开发环境、阅读源码方法、经典学习资料大揭秘
用户故事 | 黄云:行百里者半九十
Kafka核心技术与实战
登录|注册

18 | Kafka中位移提交那些事儿

胡夕 2019-07-13
你好,我是胡夕。今天我们来聊聊 Kafka 中位移提交的那些事儿。
之前我们说过,Consumer 端有个位移的概念,它和消息在分区中的位移不是一回事儿,虽然它们的英文都是 Offset。今天我们要聊的位移是 Consumer 的消费位移,它记录了 Consumer 要消费的下一条消息的位移。这可能和你以前了解的有些出入,不过切记是下一条消息的位移,而不是目前最新消费消息的位移。
我来举个例子说明一下。假设一个分区中有 10 条消息,位移分别是 0 到 9。某个 Consumer 应用已消费了 5 条消息,这就说明该 Consumer 消费了位移为 0 到 4 的 5 条消息,此时 Consumer 的位移是 5,指向了下一条消息的位移。
Consumer 需要向 Kafka 汇报自己的位移数据,这个汇报过程被称为提交位移(Committing Offsets)。因为 Consumer 能够同时消费多个分区的数据,所以位移的提交实际上是在分区粒度上进行的,即 Consumer 需要为分配给它的每个分区提交各自的位移数据
提交位移主要是为了表征 Consumer 的消费进度,这样当 Consumer 发生故障重启之后,就能够从 Kafka 中读取之前提交的位移值,然后从相应的位移处继续消费,从而避免整个消费过程重来一遍。换句话说,位移提交是 Kafka 提供给你的一个工具或语义保障,你负责维持这个语义保障,即如果你提交了位移 X,那么 Kafka 会认为所有位移值小于 X 的消息你都已经成功消费了。
取消
完成
0/1000字
划线
笔记
复制
© 版权归极客邦科技所有,未经许可不得传播售卖。 页面已增加防盗追踪,如有侵权极客邦将依法追究其法律责任。
该试读文章来自付费专栏《Kafka核心技术与实战》,如需阅读全部文章,
请订阅文章所属专栏。
立即订阅
登录 后留言

精选留言(62)

  • nightmare
    老师手动提交的设计很优美,先用异步提交不影响程序的性能,再用consumer关闭时同步提交来确保位移一定提交成功。这里我有个疑问,比如我程序运行期间有多次异步提交没有成功,比如101的offset和201的offset没有提交成功,程序关闭的时候501的offset提交成功了,是不是就代表前面500条我还是消费成功了,只要最新的位移提交成功,就代表之前的消息都提交成功了?第二点 就是批量提交哪里,如果一个消费者晓得多个分区的消息,封装在一个Map对象里面消费者也能正确的对多个分区的位移都保证正确的提交吗?
    2019-07-13
    2
    18
  • 无菇朋友
    老师您好,有一个疑问,为什么poll之前的提交和按频率自动提交是一个时机,假如频率是5s提交一次,某两次poll之间的间隔是6s,这时候是怎么处理提交的?忘老师解答下,着实没想通这个地方

    作者回复: 嗯, 严格来说。提交频率指的是最小的提交间隔。比如设置5s,Kafka保证至少等待5s才会自动提交一次。

    2019-07-21
    5
  • Tony Du
    老师,您好~ 看了今天的教程我有两个问题想请教下,希望老师能赐教。
    1. 从文中的代码看上去,使用commitAsync提供offset,不需要等待异步执行结果再次poll就能拿到下一批消息,是那么这个offset的最新值是不是理解为其实是在consumer client的内存中管理的(因为实际commitAsync如果失败的话,offset不会写入broker中)?如果是这样的话,如果在执行到commitSync之前,consumer client进程重启了,就有可能会消费到因commitAsync失败产生的重复消息。
    2. 教程中手动提交100条消息的代码是一个同步处理的代码,我在实际工程中遇到的问题是,为了提高消息处理效率,coumser poll到一批消息后会提交到一个thread pool中处理,这种情况下,请教下怎样做到控制offset分批提交?
    谢谢
    2019-07-13
    4
  • AF
    先说一下,课后思考,解决的办法应该就是,将消息处理和位移提交放在一个事务里面,要么都成功,要么都失败。

    老师文章里面的举的一个例子没有很明白,能不能再解释一下。就是那个位移提交后Rebalance的例子。
    2019-07-13
    1
    3
  • lmtoo
    对于手动同步和异步提交结合的场景,如果poll出来的消息是500条,而业务处理200条的时候,业务抛异常了,后续消息根本就没有被遍历过,finally里手动同步提交的是201还是000,还是501?

    作者回复: 如果调用没有参数的commit,那么提交的是500

    2019-07-13
    3
    2
  • 水天一色
    消费者提了异步 commit 实际还没更新完offset,消费者再不断地poll,其实会有重复消费的情况吧?

    作者回复: 只要consumer没有重启,不会发生重复消费。因为在运行过程中consumer会记录已获取的消息位移

    2019-12-07
    1
  • 惜昔
    老师 手动提交位移的时候 如果一个某个消费者组的一个消费者实例从offset下标4开始消费的 但是消费者的消费逻辑比较重量级,处理的时间比较长,还没有提交。这时候另外一个消费者组的一个消费者实例来相同的分区来拿消息,会不会拿的是上一个消费者已经拿过的消费,从而重复消费?

    作者回复: 有可能的~

    2019-11-08
    1
    1
  • Algoric
    自动提交一定不会消息丢失吗,如果每次poll的数据过多,在提交时间内没有处理完,这时达到提交时间,那么Kafka还是重复提交上次poll的最大位移吗,还是讲本次poll的消息最大位移提交?

    作者回复: hmmm... 其实我一直觉得提交间隔这个参数的命名有些问题。它实际保证的是位移至少要隔一段时间才会提交,如果你是单线程处理消息,那么只有处理完消息后才会提交位移,可能远比你设置的间隔长。

    2019-09-25
    2
    1
  • 谢特
    不漏不重复很难做到,我现在都不知道怎么弄,读偏移量容易,提交太难
    2019-09-10
    1
  • WL
    我感觉有点不是很理解消费者端的位移概念和消息在分区中的位移为啥不是一回事,我理像是一回事,因为消费者端把自己消费的位移提交到Broker的位移主题里不就定义了下次消费的起点了吗,为啥说不是一回事呢,有啥区别呢,请老师具体指导一下。

    作者回复: 嗯嗯,我的意思是consumer提交的位移值虽然就是消息在分区中的位移值,但这个提交位移的概念和分区中的位移概念是不同的。

    2019-07-13
    1
  • James
    你好,请问下分区自动提交失败,请求超时。会导致什么后果呢
    重复消费吗。

    作者回复: 偶发的提交失败也不一定就意味着重复消费,看consumer程序的运行情况。

    2019-11-20
    1
  • xiaoniu
    老师 你好 问个问题,Kafka对未提交的消息一定要等到消费工程重启才重新消费吗?比如:我的消费工程 支持幂等性,可以重复消费,但是由于某时刻数据库挂了,导致消息未提交,但隔了一段时间数据库又好了,之前未提交的消息可以在不重启的情况下再次消费下吗?

    作者回复: 如果不重启的话,程序里面是否显式调用了seek重新调整了offset呢?如果没有就无法重新消费那些已经消费的消息

    2019-11-18
  • Ran
    消费端去重处理方式有很多。比如保证操作的幂等性,或者缓存业务id在redis中,或者数据库唯一键
    2019-11-14
  • 注定非凡
    1 概念区分
    A :Consumer端的位移概念和消息分区的位移概念不是一回事。
    B :Consumer的消费位移,记录的是Consumer要消费的下一条消息的位移。

    2 提交位移
    A :Consumer 要向Kafka汇报自己的位移数据,这个汇报过程被称为提交位移(Committing Offsets)。
    B :Consumer需要为分配给它的每个分区提交各自的位移数据。

    3提交位移的作用
    A :提交位移主要是为了表征Consumer的消费进度,这样当Consumer发生故障重启后,能够从kafka中读取之前提交的位移值,从相应的位置继续消费,避免从头在消费一遍。

    4 位移提交的特点
    A :位移提交的语义保障是由你来负责的,Kafka只会“无脑”地接受你提交的位移。位移提交错误,就会消息消费错误。

    5 位移提交方式
    A :从用户的角度讲,位移提交分为自动提交和手动提交;从Consumer端的角度而言,位移提交分为同步提交和异步提交。

    B :自动提交:由Kafka consumer在后台默默的执行提交位移,用户不用管。开启简单,使用方便,但可能会出现重复消费。

    C :手动提交:好处在更加灵活,完全能够把控位移提交的时机和频率。
    (1)同步提交:在调用commitSync()时,Consumer程序会处于阻塞状态,直到远端Broker返回提交结果,这个状态才会结束。对TPS影响显著
    (2)异步提交:在调用commitAsync()时,会立即给响应,但是出问题了它不会自动重试。
    (3)手动提交最好是同步和异步结合使用,正常用异步提交,如果异步提交失败,用同步提交方式补偿提交。

    D :批次提交:对于一次要处理很多消费的Consumer而言,将一个大事务分割成若干个小事务分别提交。这可以有效减少错误恢复的时间,避免大批量的消息重新消费。
    (1)使用commitSync(Map<TopicPartition,Offset>)和commitAsync(Map<TopicPartition,OffsetAndMetadata>)。
    2019-11-05
  • 不忘初心丶方得始终
    老师你好,问个问题,目前公司要用kafka同步老数据库数据,同步过程是按照老数据库的bin.log日志顺序进行同步,但是在同步过程中,有些表是有关联的,加入将数据放到多个分区,不同分区数据消费顺序不一样,就会导致数据同步出现关联问题,如果设置一个分区……这样又太慢,有什么好的建议吗?

    作者回复: 如果是批任务,能否等到数据抽取完了再进行消费。如果是streaming任务,这是经典的table-table join,最好使用特定的流处理框架来处理

    2019-10-25
  • jc9090kkk
    感觉老师分享,对于文章中的那个自动提交的例子有点疑惑,希望老师能解答一下:
    auto.commit.interval.ms设置为5s,也就是说consumer每5秒才提交一次位移信息,那consumer如果每消费一条数据,但是没有达到自动提交的时间,这个位移信息该如何管理?consumer自己做维护吗?但是也需要跟broker端进行位移信息同步的吧? 不然可能会造成数据的重复消费?还是每5s的提交和consumer自动提交的时候都会伴随位移信息的同步?是我的理解有问题吗?

    作者回复: 如果没有达到提交时间就不会提交,自动提交完全由consumer自行维护,确实可能造成数据的重复消费。你的理解完全没有问题:)

    目前单纯依赖consumer是无法避免消息的重复消费的,Kafka默认提供的消息处理语义就是至少一次处理。

    2019-09-23
    1
  • DFighting
    每个Consumer消费完数据后需要暂存下offset,考虑到一个分区的数据只会被一个当前组下的一个Consumer消费,那么有仨种情况要处理:
    1、继续消费时,那么可以判断后续poll到的offset和自己保存的值的大小,只消费不小于的消息
    2、处理最后一个消息时,这时候可以仿照TCP的最后一次挥手中的CLOSE_WAIT状态,设定一个超时时间——这需要结合日常的业务场景,至少要取最大传输时延的2倍,因为大多数情况下消息是不断到达的,所以这个时间设定稍微久远一点也是可以的。
    3、前两种都是成功消费的情况,如果消费失败导致位移更新失败,那么这个机制就没有任何生效的意义了,这时候重复消费就不可避免了。
    自己的一些见解,有什么不合适的情况望老师指点一二

    作者回复: 我觉得说的挺好的:)

    2019-09-05
  • 盘尼西林
    发生重平衡之前可以添加一个ConsumerRebalanceListener,防止offset丢失。 在一个消费者对一个分区失去所有权之前 会调用这个ConsumerRebalanceListener,ConsumerRebalanceListener在调用subscribe()方法传进去,这个时候我们可以在这个listener中添加commit offset
    2019-09-01
  • 嘉嘉☕
    老师, 请问一下,
    如果consumer拉过来一批消息, 还没处理完呢, 就调用了它的close方法, consumer会继续消费完成吗 ? 还是会做出一些其他的行为 ?
    谢谢

    作者回复: “还没处理完呢, 就调用了它的close方法” --- 不会啊。

    2019-08-17
  • godtrue
    0:位移是指什么?
    今天我们要聊的位移是 Consumer 的消费位移,它记录了 Consumer 要消费的下一条消息的位移。这可能和你以前了解的有些出入,不过切记是下一条消息的位移,而不是目前最新消费消息的位移。
    consumer侧的位移,表示consumer消费消息的进度和即将要消费的消息在分区中的具体位置。

    1:提交位移是指什么意思?
    Consumer 需要向 Kafka 汇报自己的位移数据,这个汇报过程被称为提交位移(Committing Offsets)。因为 Consumer 能够同时消费多个分区的数据,所以位移的提交实际上是在分区粒度上进行的,即Consumer 需要为分配给它的每个分区提交各自的位移数据

    2:提交位移的作用是啥?
    提交位移主要是为了表征 Consumer 的消费进度,这样当 Consumer 发生故障重启之后,就能够从 Kafka 中读取之前提交的位移值,然后从相应的位移处继续消费,从而避免整个消费过程重来一遍。换句话说,位移提交是 Kafka 提供给你的一个工具或语义保障,你负责维持这个语义保障,即如果你提交了位移 X,那么 Kafka 会认为所有位移值小于 X 的消息你都已经成功消费了。

    2:提交位移的方式有哪些?
    从用户的角度来说,位移提交分为自动提交和手动提交;从 Consumer 端的角度来说,位移提交分为同步提交和异步提交。
    自动提交位移,嫩个保证消息不丢失,但是可能存在消息的重复消费。
    手动提交位移,kafka提供了同步和异步的API,老师也提供了手动提交的代码范例。
    老师推荐手动提交,灵活,可控,同步异步结合使用。
    需要注意的是位移提交如果错误会出现消息丢失或重复消费的情况。手动提交时,位移提交的语义保障是由我们自己来负责的,Kafka 只会“无脑”地接受我们提交的位移。我们对位移提交的管理直接影响了我们的 Consumer 所能提供的消息语义保障。

    请问老师,手动提交位移时,kafka对边界值有校验嘛?比如:一个分区有0~9十个位置,我传过去一个-1或者11,kafka会将所有消息重复消费或全部丢掉嘛?
    业务侧来进行消息去重,可以利用数据库的唯一索引,这需要一个唯一的业务字段,比如:订单号
    如果不能利用数据库来做,我觉得可以缓存消息,然后用于消息去重,消息重复发送的时机应该时差比较短。
    再请教一个问题,假如现在只有一个producer、一个broker、一个分区、一个consumer,位移从producer的角度有一个,从consumer的角度有一个,不过全是针对这一个分区而言的对吗?如果是,我还想再问一次分区的数据结构是什么?看之前的图像是数组,是不是数组呢?如果是,一端负责写入数据,一端负责读取数据,都是通过位移来控制的,也能理解。只是这数组的长度和删除消息的操作怎么控制的呢?
    2019-08-16
收起评论
62
返回
顶部