19 | CommitFailedException异常怎么处理?
该思维导图由 AI 生成,仅供参考
- 深入了解
- 翻译
- 解释
- 总结
Kafka Consumer端的CommitFailedException异常处理方法 Kafka Consumer端的CommitFailedException异常是指在提交位移时出现错误或异常,通常由于消费者实例连续两次调用poll方法的时间间隔超过了预设值而导致。异常的处理方法包括优化消息处理逻辑、调整参数值,或者使用多线程加速消费。具体建议包括缩短单条消息处理时间、增加允许下游系统消费一批消息的最大时长、减少一次性消费的消息总数以及使用多线程来加速消费。需要注意消费者组和独立消费者在使用前都要指定group.id。如果出现设置相同group.id值的消费者组程序和独立消费者程序,可能会导致Kafka抛出CommitFailedException异常。总的来说,本文提供了处理Kafka Consumer端的CommitFailedException异常的实用方法,同时也指出了可能出现的特殊情况需要特别注意。
《Kafka 核心技术与实战》,新⼈⾸单¥68
全部留言(52)
- 最新
- 精选
- ban老师,1、请问Standalone Consumer 的独立消费者一般什么情况会用到 2、Standalone Consumer 的独立消费者 使用跟普通消费者组有什么区别的。
作者回复: 1. 很多流处理框架的Kafka connector都没有使用consumer group,而是直接使用standalone consumer,因为group机制不好把控 2. standalone consumer没有rebalance,也没有group提供的负载均衡,你需要自己实现。其他方面(比如位移提交)和group没有太大的不同
2019-07-1642 - 胡小禾“当消息处理的总时间超过预设的 max.poll.interval.ms 参数值时,Kafka Consumer 端会抛出 CommitFailedException 异常”。 其实逻辑是这样:消息处理的总时间超过预设的 max.poll.interval.ms 参数值 导致了 Rebalance‘; rebalance导致了 partition assgined 的consumer member变了; 导致原来的consumer 想要commit都没法commit 。(因为元信息,比如连的broker都变了). 请老师指正下
作者回复: 嗯,差不多是这个道理:)
2020-05-11231 - 胡小禾为啥自动commit 不会抛 CommitFailedException?
作者回复: 自动commit失败由Kafka内部消化处理
2020-05-1122 - 德惠先生希望老师可以更加具体的说说,rebalance的细节,比如某个consumer发生full gc的场景,它的partition是怎么被分配走的,重连之后提交会发生什么
作者回复: 假设full gc导致所有线程STW,从而心跳中断,导致被踢出group,Coordinator向其他存活consumer发送心跳response,通知它们开启新一轮rebalance。
2019-07-16320 - ban老师,我想问下max.poll.interval.ms两者session.timeout.ms有什么联系,可以说0.10.1.0 之前的客户端 API,相当于session.timeout.ms代替了max.poll.interval.ms吗? 比如说session.timeout.ms是5秒,如果消息处理超过5秒,也算是超时吗?
作者回复: 嗯,我更愿意说是max.poll.interval.ms承担了session.timeout.ms的部分功能。在没有max.poll.interval.ms和单独的心跳线程之前,如果session.timeout.ms = 5s,消息处理超过了5s,那么consumer就算是超时
2019-07-16211 - windcallerTo use this mode, instead of subscribing to the topic using subscribe, you just call assign(Collection) with the full list of partitions that you want to consume. String topic = "foo"; TopicPartition partition0 = new TopicPartition(topic, 0); TopicPartition partition1 = new TopicPartition(topic, 1); consumer.assign(Arrays.asList(partition0, partition1)); Once assigned, you can call poll in a loop, just as in the preceding examples to consume records. The group that the consumer specifies is still used for committing offsets, but now the set of partitions will only change with another call to assign. Manual partition assignment does not use group coordination, so consumer failures will not cause assigned partitions to be rebalanced. Each consumer acts independently even if it shares a groupId with another consumer. To avoid offset commit conflicts, you should usually ensure that the groupId is unique for each consumer instance. 老师 standalone mode 是上面这段内容吗?
作者回复: 是的。使用assign的consumer就是standalone consumer
2019-07-318 - Li Shunduo假如broker集群整个挂掉了,过段时间集群恢复后,consumer group会自动恢复消费吗?还是需要手动重启consumer机器?
作者回复: consumer有重连机制
2019-07-168 - 有时也,命也,运也,如之奈何?老师kafka死信该怎么去实现的? 2.0之后增加了如下配置: errors.tolerance = all errors.deadletterqueue.topic.name = ""?
作者回复: 还不够,你需要使用Kafka Connect组件才能实现。见:https://www.confluent.io/blog/kafka-connect-deep-dive-error-handling-dead-letter-queues
2019-07-306 - 柯察金老师,没有设置 group.id 话,会怎么样,系统会自动生成唯一的一个值吗
作者回复: group.id是必须要设置的,否则会抛InvalidGroupIdException异常
2019-11-085 - windcaller我没在kafka官网、stackoverflow 、google 找到任何关于 standalone kafka consumer的 例子,还望老师给个链接学习学习
作者回复: Standalone consumer的提法并未出现在官方文档中,你可以在javadoc中看到一些:https://kafka.apache.org/23/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html#manualassignment
2019-07-305