一、前言
大家好,我是老周,有快二十多天没有更新文章了,很多小伙伴一直在催更。先说明下最近的情况,最近项目上线很忙,没有时间写,并且组里有个同事使用 Kafka 不当,导致线上消息丢失,在修复一些线上的数据,人都麻了。事情是这样,有个 Kafka 消费者实例,部署到线上去,消费到了线上的数据,而新版本做了新的逻辑,新版本的业务逻辑与老版本的业务逻辑不兼容,直接导致消费失败,没有进行重试操作,关键还提交了 offset。直接这部分数据没有被业务处理,导致消息丢失,然后紧急修复线上数据。
刚好这些天忙完了有空,所以记录一下,同时看是否对大家能起到避免踩坑的作用,能有一些作用,那我写的也就值了。
我们下面会从以下三个方面来说一下 Kafka 消息丢失的场景以及最佳实践。
生产者丢失消息
Kafka Broker 服务端丢失消息
消费者丢失消息
二、Kafka 的三种消息语义
先说 Kafka 消息丢失的场景之前,我们先来说下 Kafka 的三种消息语义,不会还有人不知道吧?这个不应该了,消息系统基本上抽象成这以下三种消息语义了:
类型消息是否会重复消息是否会丢失优势劣势适用场景最多一次否是生产端发送消息后不用等待和处理服务端响应,消息发送速度会很快。网络或服务端有问题会造成消息的丢失消息系统吞吐量大且对消息的丢失不敏感。例如:日志收集、用户行为等场景。最少一次是否生产端发送消息后需要等待和处理服务端响应,如果失败会重试。吞吐量较低,有重复发送的消息。消息系统吞吐量一般,但是绝不能丢消息,对于重复消息不敏感。有且仅有一次否否消息不重复,消息不丢失,消息可靠性很好。吞吐量较低对消息的可靠性要求很高,同时可以容忍较小的吞吐量。"}}">
三、Kafka 消息丢失的场景
3.1 生产者丢失消息
目前 Kafka Producer 是异步发送消息的,如果你的 Producer 客户端使用了 producer.send(msg) 方法来发送消息,方法会立即返回,但此时并不能代表消息已经发送成功了。
如果消息再发送的过程中发生了网络抖动,那么消息可能没有传递到 Broker,那么消息可能会丢失。
如果发送的消息本身不符合,如大小超过了 Broker 的承受能力等。
3.2 Kafka Broker 服务端丢失消息
Leader Broker 宕机了,触发选举过程,集群选举了一个落后 Leader 太多的 Broker 作为 Leader,那么落后的那些消息就会丢失了。
Kafka 为了提升性能,使用页缓存机制,将消息写入页缓存而非直接持久化至磁盘,采用了异步批量刷盘机制,也就是说,按照一定的消息量和时间间隔去刷盘,刷盘的动作由操作系统来调度的,如果刷盘之前,Broker 宕机了,重启后在页缓存的这部分消息则会丢失。
3.3 消费者丢失消息
消费者拉取了消息,并处理了消息,但处理消息异常了导致失败,并且提交了偏移量,消费者重启后,会从之前已提交的位移的下一个位置重新开始消费,消费失败的那些消息不会再次处理,即相当于消费者丢失了消息。
消费者拉取了消息,并提交了消费位移,但是在消息处理结束之前突然发生了宕机等故障,消费者重启后,会从之前已提交的位移的下一个位置重新开始消费,之前未处理完成的消息不会再次处理,即相当于消费者丢失了消息。
四、最佳实践
4.1 生产端
不要使用 producer.send(msg),而要使用 producer.send(msg, callback)。带有回调通知的 send 方法可以针对发送失败的消息进行重试处理。
设置 acks = all。代表了你对“已提交”消息的定义。如果设置成 all,则表明所有副本 Broker 都要接收到消息,该消息才算是“已提交”。这是最高等级的“已提交”定义。
设置 retries = 3,当出现网络的瞬时抖动时,消息发送可能会失败,此时配置了 retries > 0 的 Producer 能够自动重试消息发送,避免消息丢失。
如果重试达到设定的次数,那么生产者就会放弃重试并返回异常。不过并不是所有的异常都是可以通过重试来解决的,比如消息太大,超过max.request.size参数配置的值时,这种方式就不可行了。
设置 retry.backoff.ms = 300,合理估算重试的时间间隔,可以避免无效的频繁重试。
它用来设定两次重试之间的时间间隔,避免无效的频繁重试。在配置 retries 和 retry.backoff.ms之前,最好先估算一下可能的异常恢复时间,这样可以设定总的重试时间大于这个异常恢复时间,以此来避免生产者过早地放弃重试。
4.2 Broker 端
设置 unclean.leader.election.enable = false。它控制的是哪些 Broker 有资格竞选分区的 Leader。如果一个 Broker 落后原先的 Leader 太多,那么它一旦成为新的 Leader,必然会造成消息的丢失。故一般都要将该参数设置成 false,即不允许这种情况的发生。
设置 replication.factor >= 3。其实这里想表述的是,最好将消息多保存几份,毕竟目前防止消息丢失的主要机制就是冗余。
设置 min.insync.replicas > 1。这控制的是消息至少要被写入到多少个副本才算是“已提交”。设置成大于 1 可以提升消息持久性。在实际环境中千万不要使用默认值 1。
确保 replication.factor > min.insync.replicas。如果两者相等,那么只要有一个副本挂机,整个分区就无法正常工作了。我们不仅要改善消息的持久性,防止数据丢失,还要在不降低可用性的基础上完成。推荐设置成 replication.factor = min.insync.replicas + 1。
4.3 消费端
确保消息消费完成再提交。最好把它设置成 enable.auto.commit = false,并采用手动提交位移的方式。这对于单 Consumer 多线程处理的场景而言是至关重要的。
虽然采用手动提交位移的方式可以解决消费端消息丢失的场景,但同时会存在重复消费问题,关于重复消费问题我们下一篇再讲。
像我们上面说的那个线上问题,即使你设置了手动提交,消费异常了同时也提交了位移,还是会存在消息丢失。
Kafka 没有重试机制不支持消息重试,也没有死信队列,因此使用 Kafka 做消息队列时,需要自己实现消息重试的功能。这里我先说下大致的思路,后续有时间再分享代码出来:
创建一个 Topic 作为重试 Topic,用于接收等待重试的消息。
普通 Topic 消费者设置待重试消息的下一个重试 Topic。
从重试 Topic 获取待重试消息存储到 Redis 的 ZSet 中,并以下一次消费时间排序。
定时任务从 Redis 获取到达消费时间的消息,并把消息发送到对应的 Topic。
同一个消息重试次数过多则不再重试