11 | 无消息丢失配置怎么实现?
该思维导图由 AI 生成,仅供参考
- 深入了解
- 翻译
- 解释
- 总结
Kafka消息丢失一直备受关注,本文从Kafka对已提交消息的持久化保证出发,解释了Kafka在何种情况下能够保证消息不丢失。文章列举了两种常见的“消息丢失”案例:生产者程序丢失数据和消费者程序丢失数据,并提出了解决方法。此外,还提到了一种隐蔽的消息丢失场景,即增加主题分区后可能出现的问题。为了避免消息丢失,文章给出了一系列Kafka无消息丢失的配置建议,包括使用带有回调通知的发送API、设置acks为all、配置retries、设置unclean.leader.election.enable为false等。总结起来,本文帮助读者更好地理解Kafka消息丢失问题,并提供了解决方案。
《Kafka 核心技术与实战》,新⼈⾸单¥68
全部留言(141)
- 最新
- 精选
- 阳明总结里的的第二条ack=all和第六条的说明是不是有冲突
作者回复: 其实不冲突。如果ISR中只有1个副本了,acks=all也就相当于acks=1了,引入min.insync.replicas的目的就是为了做一个下限的限制:不能只满足于ISR全部写入,还要保证ISR中的写入个数不少于min.insync.replicas。
2019-06-2729101 - 曹伟雄单个 Consumer 程序使用多线程来消费消息说起来容易,写成代码却异常困难,因为你很难正确地处理位移的更新,也就是说避免无消费消息丢失很简单,但极易出现消息被消费了多次的情况。 关于这个问题,老师能否提供个java代码的最佳实践? 谢谢!
作者回复: 写过一两篇,https://www.cnblogs.com/huxi2b/p/7089854.html, 但总觉得不太完美。如果你想深入了解的话,推荐读一下Flink Kafka Connector的源码
2019-06-30230 - 陈国林老师好,请教一个问题,ack=1的时候,min.insync.replicas还会生效吗?或者说还有必要吗,感谢 🤝
作者回复: 不生效,min.insync.replicas只有在acks=-1时才生效
2020-01-09427 - lmtoo最后一个问题,难道新增分区之后,producer先感知并发送数据,消费者后感知,消费者的offset会定位到新分区的最后一条消息?消费者没有提交offset怎么会从最后一条开始的呢?
作者回复: 如果你配置了auto.offset.reset=latest就会这样的
2019-06-27323 - ban老师, 如果我有10个副本,isr=10,然后我配置ack=all,min.insync.replicas=5, 这时候这两个参数以谁为准,生产一个消息,必须是全部副本都同步才算提交,还是只要5个副本才算提交?
作者回复: min.insync.replicas是保证下限的。acks=all的含义是producer会等ISR中所有副本都写入成功才返回,但如果不设置min.insync.replicas = 5,默认是1,那么假设ISR中只有1个副本,只要写入这个副本成功producer也算其正常写入,因此min.insync.replicas保证的写入副本的下限。
2019-08-03721 - redis你好胡老师,想问一下 kafka是在落地刷盘之后,同步副本成功后,才能会被消费吗?
作者回复: 其实,有可能在落盘之前就被消费了。能否被消费不是看是否flush到磁盘,而是看leader副本的高水位是否越过了该条消息
2019-11-21616 - 永光看了评论区回答还是不太理解,第二条ack=all与第六条min.insync.replicas 怎样协调工作的,总感觉是有冲突的。 问题是: 第二条的“已提交”和第六条的“已提交”是同一个意思吗?如果是同一个意思,那定义为什么不一样呀?
作者回复: acks=all表示消息要写入所有ISR副本,但没要求ISR副本有多少个。min.insync.replicas做了这样的保证
2019-06-27614 - 美美胡老师 还有一种消息重复的情况希望帮忙分析下。producer发送消息后,broker成功写入消息了,但是ack因为网络问题没有到达producer,生产者可能会重试发送这条消息。 这种问题如何避免重复消费呢
作者回复: 使用幂等producer
2019-11-24612 - yang老师,我针对您的提问思考并查阅了一下相关资料,说一下我的思考哈: 我们假设有且仅有一个producer只在这个consumer感知到之前,新的partition分区只写了那么几条记录,不会再有其他producer写数据到这个新的partition中。 新增partition的情况,rebalance时由于我们默认offset.auto.reset=lastest,因此在使用了这个默认配置之下,producer较consumer先感知到新的partition将数据发送到新的partition,而consumer之后才感知到这个consumer,此时由于这个新的partition的offset是第一次消费,没有已提交的offset,所以使用latest从最新的位移开始读取,也就是producer写入消息offset + 1的那个位置开始读取,因此也就读取不到数据。 latest:有提交位移就从提交位移开始处理,没有提交位移就从最新的位移开始处理。 earlist: 有提交位移就从提交位移开始处理,没有提交位移就从最早的位移开始处理。 current: 从当前的提交位移开始处理。 因此,碰到上述情况,我们可以使用seekToBegin从这个新分区的开始位置读即可。 我能想到的办法是,实现一个ConsumerRebalanceListener,重写onPartitionsAssigned方法,在这个方法里我们每次都从自己维护的数据库系统里取offset,能取到说明这个partition是之前就存在的,按已有的offset继续消费就可以了,没有取到的记录表示是新增的partition,那么就从0开始消费并且保存当前offset到数据库。不论是不是新的,都可以seek到指定的位置,只是我们有没有维护到这个partition的offset记录的区别。也就不用针对这个新分区指定offset.auto.rset=earlist了吧? 希望得到胡老师的交流哈~
作者回复: 我觉得是很好的办法。值得一试:)
2020-07-28310 - 浪迹人生请问消息的createTimestamp 是在生产者服务器上生成的,还是在进入不同partition 后生成的?我能不能根据这个时间戳来判断不同分区的消息原始全局顺序?谢谢🙏
作者回复: 在生产者服务器上生成的。个人感觉不可以,毕竟每个producer服务器上的时钟不是实时同步的。事实上,用时钟来保证同步性是一件非常不靠谱的事情
2019-11-01410