Kafka 核心技术与实战
胡夕
Apache Kafka Committer,老虎证券技术总监
52815 人已学习
新⼈⾸单¥68
登录后,你可以任选4讲全文学习
课程目录
已完结/共 47 讲
开篇词 (1讲)
结束语 (1讲)
Kafka 核心技术与实战
15
15
1.0x
00:00/00:00
登录|注册

19 | CommitFailedException异常怎么处理?

方法4: 下游系统使用多线程来加速消费
方法3: 减少下游系统一次性消费的消息总数
方法2: 增加Consumer端允许下游系统消费一批消息的最大时长
方法1: 缩短单条消息处理时间
独立消费者程序和消费者组程序设置相同group.id值
消息处理时间超过max.poll.interval.ms
场景二
场景一
比较预防该异常的4种方法
出现时机和场景
含义
开放讨论
CommitFailedException异常
怎么处理Kafka Consumer端的CommitFailedException异常?

该思维导图由 AI 生成,仅供参考

你好,我是胡夕。今天我来跟你聊聊 CommitFailedException 异常的处理。
说起这个异常,我相信用过 Kafka Java Consumer 客户端 API 的你一定不会感到陌生。所谓 CommitFailedException,顾名思义就是 Consumer 客户端在提交位移时出现了错误或异常,而且还是那种不可恢复的严重异常如果异常是可恢复的瞬时错误,提交位移的 API 自己就能规避它们了,因为很多提交位移的 API 方法是支持自动错误重试的,比如我们在上一期中提到的 commitSync 方法
每次和 CommitFailedException 一起出现的,还有一段非常著名的注释。为什么说它很“著名”呢?第一,我想不出在近 50 万行的 Kafka 源代码中,还有哪个异常类能有这种待遇,可以享有这么大段的注释,来阐述其异常的含义;第二,纵然有这么长的文字解释,却依然有很多人对该异常想表达的含义感到困惑。
现在,我们一起领略下这段文字的风采,看看社区对这个异常的最新解释:
Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing max.poll.interval.ms or by reducing the maximum size of batches returned in poll() with max.poll.records.
确认放弃笔记?
放弃后所记笔记将不保留。
新功能上线,你的历史笔记已初始化为私密笔记,是否一键批量公开?
批量公开的笔记不会为你同步至部落
公开
同步至部落
取消
完成
0/2000
荧光笔
直线
曲线
笔记
复制
AI
  • 深入了解
  • 翻译
    • 英语
    • 中文简体
    • 中文繁体
    • 法语
    • 德语
    • 日语
    • 韩语
    • 俄语
    • 西班牙语
    • 阿拉伯语
  • 解释
  • 总结

Kafka Consumer端的CommitFailedException异常处理方法 Kafka Consumer端的CommitFailedException异常是指在提交位移时出现错误或异常,通常由于消费者实例连续两次调用poll方法的时间间隔超过了预设值而导致。异常的处理方法包括优化消息处理逻辑、调整参数值,或者使用多线程加速消费。具体建议包括缩短单条消息处理时间、增加允许下游系统消费一批消息的最大时长、减少一次性消费的消息总数以及使用多线程来加速消费。需要注意消费者组和独立消费者在使用前都要指定group.id。如果出现设置相同group.id值的消费者组程序和独立消费者程序,可能会导致Kafka抛出CommitFailedException异常。总的来说,本文提供了处理Kafka Consumer端的CommitFailedException异常的实用方法,同时也指出了可能出现的特殊情况需要特别注意。

仅可试看部分内容,如需阅读全部内容,请付费购买文章所属专栏
《Kafka 核心技术与实战》
新⼈⾸单¥68
立即购买
登录 后留言

全部留言(52)

  • 最新
  • 精选
  • ban
    老师,1、请问Standalone Consumer 的独立消费者一般什么情况会用到 2、Standalone Consumer 的独立消费者 使用跟普通消费者组有什么区别的。

    作者回复: 1. 很多流处理框架的Kafka connector都没有使用consumer group,而是直接使用standalone consumer,因为group机制不好把控 2. standalone consumer没有rebalance,也没有group提供的负载均衡,你需要自己实现。其他方面(比如位移提交)和group没有太大的不同

    2019-07-16
    42
  • 胡小禾
    “当消息处理的总时间超过预设的 max.poll.interval.ms 参数值时,Kafka Consumer 端会抛出 CommitFailedException 异常”。 其实逻辑是这样:消息处理的总时间超过预设的 max.poll.interval.ms 参数值 导致了 Rebalance‘; rebalance导致了 partition assgined 的consumer member变了; 导致原来的consumer 想要commit都没法commit 。(因为元信息,比如连的broker都变了). 请老师指正下

    作者回复: 嗯,差不多是这个道理:)

    2020-05-11
    2
    31
  • 胡小禾
    为啥自动commit 不会抛 CommitFailedException?

    作者回复: 自动commit失败由Kafka内部消化处理

    2020-05-11
    22
  • 德惠先生
    希望老师可以更加具体的说说,rebalance的细节,比如某个consumer发生full gc的场景,它的partition是怎么被分配走的,重连之后提交会发生什么

    作者回复: 假设full gc导致所有线程STW,从而心跳中断,导致被踢出group,Coordinator向其他存活consumer发送心跳response,通知它们开启新一轮rebalance。

    2019-07-16
    3
    20
  • ban
    老师,我想问下max.poll.interval.ms两者session.timeout.ms有什么联系,可以说0.10.1.0 之前的客户端 API,相当于session.timeout.ms代替了max.poll.interval.ms吗? 比如说session.timeout.ms是5秒,如果消息处理超过5秒,也算是超时吗?

    作者回复: 嗯,我更愿意说是max.poll.interval.ms承担了session.timeout.ms的部分功能。在没有max.poll.interval.ms和单独的心跳线程之前,如果session.timeout.ms = 5s,消息处理超过了5s,那么consumer就算是超时

    2019-07-16
    2
    11
  • windcaller
    To use this mode, instead of subscribing to the topic using subscribe, you just call assign(Collection) with the full list of partitions that you want to consume. String topic = "foo"; TopicPartition partition0 = new TopicPartition(topic, 0); TopicPartition partition1 = new TopicPartition(topic, 1); consumer.assign(Arrays.asList(partition0, partition1)); Once assigned, you can call poll in a loop, just as in the preceding examples to consume records. The group that the consumer specifies is still used for committing offsets, but now the set of partitions will only change with another call to assign. Manual partition assignment does not use group coordination, so consumer failures will not cause assigned partitions to be rebalanced. Each consumer acts independently even if it shares a groupId with another consumer. To avoid offset commit conflicts, you should usually ensure that the groupId is unique for each consumer instance. 老师 standalone mode 是上面这段内容吗?

    作者回复: 是的。使用assign的consumer就是standalone consumer

    2019-07-31
    8
  • Li Shunduo
    假如broker集群整个挂掉了,过段时间集群恢复后,consumer group会自动恢复消费吗?还是需要手动重启consumer机器?

    作者回复: consumer有重连机制

    2019-07-16
    8
  • 有时也,命也,运也,如之奈何?
    老师kafka死信该怎么去实现的? 2.0之后增加了如下配置: errors.tolerance = all errors.deadletterqueue.topic.name = ""?

    作者回复: 还不够,你需要使用Kafka Connect组件才能实现。见:https://www.confluent.io/blog/kafka-connect-deep-dive-error-handling-dead-letter-queues

    2019-07-30
    6
  • 柯察金
    老师,没有设置 group.id 话,会怎么样,系统会自动生成唯一的一个值吗

    作者回复: group.id是必须要设置的,否则会抛InvalidGroupIdException异常

    2019-11-08
    5
  • windcaller
    我没在kafka官网、stackoverflow 、google 找到任何关于 standalone kafka consumer的 例子,还望老师给个链接学习学习

    作者回复: Standalone consumer的提法并未出现在官方文档中,你可以在javadoc中看到一些:https://kafka.apache.org/23/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html#manualassignment

    2019-07-30
    5
收起评论
显示
设置
留言
52
收藏
沉浸
阅读
分享
手机端
快捷键
回顶部