Kafka 核心技术与实战
胡夕
Apache Kafka Committer,老虎证券技术总监
52815 人已学习
新⼈⾸单¥68
登录后,你可以任选4讲全文学习
课程目录
已完结/共 47 讲
开篇词 (1讲)
结束语 (1讲)
Kafka 核心技术与实战
15
15
1.0x
00:00/00:00
登录|注册

11 | 无消息丢失配置怎么实现?

多线程异步处理消费消息,手动提交位移
维持先消费消息,再更新位移的顺序
使用带有回调通知的发送API
异步发送消息
有前提条件:N个Broker中至少有1个存活
Kafka只对已提交的消息做持久化保证
消息在Kafka看来变为"已提交"
Broker成功接收消息并写入日志文件
增加主题分区的消息丢失场景
确保消息消费完成再提交
确保replication.factor > min.insync.replicas
设置min.insync.replicas > 1
设置replication.factor >= 3
设置unclean.leader.election.enable = false
设置retries为一个较大的值
设置acks = all
不要使用producer.send(msg),而要使用producer.send(msg, callback)
案例2:消费者程序丢失数据
案例1:生产者程序丢失数据
有限度的持久化保证
已提交的消息
开放讨论
最佳实践
"消息丢失"案例
Kafka持久化保证的责任边界
如何配置Kafka无消息丢失

该思维导图由 AI 生成,仅供参考

你好,我是胡夕。今天我要和你分享的主题是:如何配置 Kafka 无消息丢失。
一直以来,很多人对于 Kafka 丢失消息这件事情都有着自己的理解,因而也就有着自己的解决之道。在讨论具体的应对方法之前,我觉得我们首先要明确,在 Kafka 的世界里什么才算是消息丢失,或者说 Kafka 在什么情况下能保证消息不丢失。这点非常关键,因为很多时候我们容易混淆责任的边界,如果搞不清楚事情由谁负责,自然也就不知道由谁来出解决方案了。
那 Kafka 到底在什么情况下才能保证消息不丢失呢?
一句话概括,Kafka 只对“已提交”的消息(committed message)做有限度的持久化保证。
这句话里面有两个核心要素,我们一一来看。
第一个核心要素是“已提交的消息”。什么是已提交的消息?当 Kafka 的若干个 Broker 成功地接收到一条消息并写入到日志文件后,它们会告诉生产者程序这条消息已成功提交。此时,这条消息在 Kafka 看来就正式变为“已提交”消息了。
那为什么是若干个 Broker 呢?这取决于你对“已提交”的定义。你可以选择只要有一个 Broker 成功保存该消息就算是已提交,也可以是令所有 Broker 都成功保存该消息才算是已提交。不论哪种情况,Kafka 只对已提交的消息做持久化保证这件事情是不变的。
确认放弃笔记?
放弃后所记笔记将不保留。
新功能上线,你的历史笔记已初始化为私密笔记,是否一键批量公开?
批量公开的笔记不会为你同步至部落
公开
同步至部落
取消
完成
0/2000
荧光笔
直线
曲线
笔记
复制
AI
  • 深入了解
  • 翻译
    • 英语
    • 中文简体
    • 中文繁体
    • 法语
    • 德语
    • 日语
    • 韩语
    • 俄语
    • 西班牙语
    • 阿拉伯语
  • 解释
  • 总结

Kafka消息丢失一直备受关注,本文从Kafka对已提交消息的持久化保证出发,解释了Kafka在何种情况下能够保证消息不丢失。文章列举了两种常见的“消息丢失”案例:生产者程序丢失数据和消费者程序丢失数据,并提出了解决方法。此外,还提到了一种隐蔽的消息丢失场景,即增加主题分区后可能出现的问题。为了避免消息丢失,文章给出了一系列Kafka无消息丢失的配置建议,包括使用带有回调通知的发送API、设置acks为all、配置retries、设置unclean.leader.election.enable为false等。总结起来,本文帮助读者更好地理解Kafka消息丢失问题,并提供了解决方案。

仅可试看部分内容,如需阅读全部内容,请付费购买文章所属专栏
《Kafka 核心技术与实战》
新⼈⾸单¥68
立即购买
登录 后留言

全部留言(141)

  • 最新
  • 精选
  • 阳明
    总结里的的第二条ack=all和第六条的说明是不是有冲突

    作者回复: 其实不冲突。如果ISR中只有1个副本了,acks=all也就相当于acks=1了,引入min.insync.replicas的目的就是为了做一个下限的限制:不能只满足于ISR全部写入,还要保证ISR中的写入个数不少于min.insync.replicas。

    2019-06-27
    29
    101
  • 曹伟雄
    单个 Consumer 程序使用多线程来消费消息说起来容易,写成代码却异常困难,因为你很难正确地处理位移的更新,也就是说避免无消费消息丢失很简单,但极易出现消息被消费了多次的情况。 关于这个问题,老师能否提供个java代码的最佳实践? 谢谢!

    作者回复: 写过一两篇,https://www.cnblogs.com/huxi2b/p/7089854.html, 但总觉得不太完美。如果你想深入了解的话,推荐读一下Flink Kafka Connector的源码

    2019-06-30
    2
    30
  • 陈国林
    老师好,请教一个问题,ack=1的时候,min.insync.replicas还会生效吗?或者说还有必要吗,感谢 🤝

    作者回复: 不生效,min.insync.replicas只有在acks=-1时才生效

    2020-01-09
    4
    27
  • lmtoo
    最后一个问题,难道新增分区之后,producer先感知并发送数据,消费者后感知,消费者的offset会定位到新分区的最后一条消息?消费者没有提交offset怎么会从最后一条开始的呢?

    作者回复: 如果你配置了auto.offset.reset=latest就会这样的

    2019-06-27
    3
    23
  • ban
    老师, 如果我有10个副本,isr=10,然后我配置ack=all,min.insync.replicas=5, 这时候这两个参数以谁为准,生产一个消息,必须是全部副本都同步才算提交,还是只要5个副本才算提交?

    作者回复: min.insync.replicas是保证下限的。acks=all的含义是producer会等ISR中所有副本都写入成功才返回,但如果不设置min.insync.replicas = 5,默认是1,那么假设ISR中只有1个副本,只要写入这个副本成功producer也算其正常写入,因此min.insync.replicas保证的写入副本的下限。

    2019-08-03
    7
    21
  • redis
    你好胡老师,想问一下 kafka是在落地刷盘之后,同步副本成功后,才能会被消费吗?

    作者回复: 其实,有可能在落盘之前就被消费了。能否被消费不是看是否flush到磁盘,而是看leader副本的高水位是否越过了该条消息

    2019-11-21
    6
    16
  • 永光
    看了评论区回答还是不太理解,第二条ack=all与第六条min.insync.replicas 怎样协调工作的,总感觉是有冲突的。 问题是: 第二条的“已提交”和第六条的“已提交”是同一个意思吗?如果是同一个意思,那定义为什么不一样呀?

    作者回复: acks=all表示消息要写入所有ISR副本,但没要求ISR副本有多少个。min.insync.replicas做了这样的保证

    2019-06-27
    6
    14
  • 美美
    胡老师 还有一种消息重复的情况希望帮忙分析下。producer发送消息后,broker成功写入消息了,但是ack因为网络问题没有到达producer,生产者可能会重试发送这条消息。 这种问题如何避免重复消费呢

    作者回复: 使用幂等producer

    2019-11-24
    6
    12
  • yang
    老师,我针对您的提问思考并查阅了一下相关资料,说一下我的思考哈: 我们假设有且仅有一个producer只在这个consumer感知到之前,新的partition分区只写了那么几条记录,不会再有其他producer写数据到这个新的partition中。 新增partition的情况,rebalance时由于我们默认offset.auto.reset=lastest,因此在使用了这个默认配置之下,producer较consumer先感知到新的partition将数据发送到新的partition,而consumer之后才感知到这个consumer,此时由于这个新的partition的offset是第一次消费,没有已提交的offset,所以使用latest从最新的位移开始读取,也就是producer写入消息offset + 1的那个位置开始读取,因此也就读取不到数据。 latest:有提交位移就从提交位移开始处理,没有提交位移就从最新的位移开始处理。 earlist: 有提交位移就从提交位移开始处理,没有提交位移就从最早的位移开始处理。 current: 从当前的提交位移开始处理。 因此,碰到上述情况,我们可以使用seekToBegin从这个新分区的开始位置读即可。 我能想到的办法是,实现一个ConsumerRebalanceListener,重写onPartitionsAssigned方法,在这个方法里我们每次都从自己维护的数据库系统里取offset,能取到说明这个partition是之前就存在的,按已有的offset继续消费就可以了,没有取到的记录表示是新增的partition,那么就从0开始消费并且保存当前offset到数据库。不论是不是新的,都可以seek到指定的位置,只是我们有没有维护到这个partition的offset记录的区别。也就不用针对这个新分区指定offset.auto.rset=earlist了吧? 希望得到胡老师的交流哈~

    作者回复: 我觉得是很好的办法。值得一试:)

    2020-07-28
    3
    10
  • 浪迹人生
    请问消息的createTimestamp 是在生产者服务器上生成的,还是在进入不同partition 后生成的?我能不能根据这个时间戳来判断不同分区的消息原始全局顺序?谢谢🙏

    作者回复: 在生产者服务器上生成的。个人感觉不可以,毕竟每个producer服务器上的时钟不是实时同步的。事实上,用时钟来保证同步性是一件非常不靠谱的事情

    2019-11-01
    4
    10
收起评论
显示
设置
留言
99+
收藏
沉浸
阅读
分享
手机端
快捷键
回顶部