Kafka核心技术与实战
胡夕
人人贷计算平台部总监,Apache Kafka Contributor
立即订阅
8408 人已学习
课程目录
已完结 46 讲
0/4登录后,你可以任选4讲全文学习。
开篇词 (1讲)
开篇词 | 为什么要学习Kafka?
免费
Kafka入门 (5讲)
01 | 消息引擎系统ABC
02 | 一篇文章带你快速搞定Kafka术语
03 | Kafka只是消息引擎系统吗?
04 | 我应该选择哪种Kafka?
05 | 聊聊Kafka的版本号
Kafka的基本使用 (3讲)
06 | Kafka线上集群部署方案怎么做?
07 | 最最最重要的集群参数配置(上)
08 | 最最最重要的集群参数配置(下)
客户端实践及原理剖析 (14讲)
09 | 生产者消息分区机制原理剖析
10 | 生产者压缩算法面面观
11 | 无消息丢失配置怎么实现?
12 | 客户端都有哪些不常见但是很高级的功能?
13 | Java生产者是如何管理TCP连接的?
14 | 幂等生产者和事务生产者是一回事吗?
15 | 消费者组到底是什么?
16 | 揭开神秘的“位移主题”面纱
17 | 消费者组重平衡能避免吗?
18 | Kafka中位移提交那些事儿
19 | CommitFailedException异常怎么处理?
20 | 多线程开发消费者实例
21 | Java 消费者是如何管理TCP连接的?
22 | 消费者组消费进度监控都怎么实现?
深入Kafka内核 (5讲)
23 | Kafka副本机制详解
24 | 请求是怎么被处理的?
25 | 消费者组重平衡全流程解析
26 | 你一定不能错过的Kafka控制器
27 | 关于高水位和Leader Epoch的讨论
管理与监控 (12讲)
28 | 主题管理知多少?
29 | Kafka动态配置了解下?
30 | 怎么重设消费者组位移?
31 | 常见工具脚本大汇总
32 | KafkaAdminClient:Kafka的运维利器
33 | Kafka认证机制用哪家?
34 | 云环境下的授权该怎么做?
35 | 跨集群备份解决方案MirrorMaker
36 | 你应该怎么监控Kafka?
37 | 主流的Kafka监控框架
38 | 调优Kafka,你做到了吗?
39 | 从0搭建基于Kafka的企业级实时日志流处理平台
高级Kafka应用之流处理 (3讲)
40 | Kafka Streams与其他流处理平台的差异在哪里?
41 | Kafka Streams DSL开发实例
42 | Kafka Streams在金融领域的应用
结束语 (1讲)
结束语 | 以梦为马,莫负韶华!
特别放送 (2讲)
加餐 | 搭建开发环境、阅读源码方法、经典学习资料大揭秘
用户故事 | 黄云:行百里者半九十
Kafka核心技术与实战
登录|注册

19 | CommitFailedException异常怎么处理?

胡夕 2019-07-16
你好,我是胡夕。今天我来跟你聊聊 CommitFailedException 异常的处理。
说起这个异常,我相信用过 Kafka Java Consumer 客户端 API 的你一定不会感到陌生。所谓 CommitFailedException,顾名思义就是 Consumer 客户端在提交位移时出现了错误或异常,而且还是那种不可恢复的严重异常。如果异常是可恢复的瞬时错误,提交位移的 API 自己就能规避它们了,因为很多提交位移的 API 方法是支持自动错误重试的,比如我们在上一期中提到的 commitSync 方法
每次和 CommitFailedException 一起出现的,还有一段非常著名的注释。为什么说它很“著名”呢?第一,我想不出在近 50 万行的 Kafka 源代码中,还有哪个异常类能有这种待遇,可以享有这么大段的注释,来阐述其异常的含义;第二,纵然有这么长的文字解释,却依然有很多人对该异常想表达的含义感到困惑。
现在,我们一起领略下这段文字的风采,看看社区对这个异常的最新解释:
Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing max.poll.interval.ms or by reducing the maximum size of batches returned in poll() with max.poll.records.
取消
完成
0/1000字
划线
笔记
复制
© 版权归极客邦科技所有,未经许可不得传播售卖。 页面已增加防盗追踪,如有侵权极客邦将依法追究其法律责任。
该试读文章来自付费专栏《Kafka核心技术与实战》,如需阅读全部文章,
请订阅文章所属专栏。
立即订阅
登录 后留言

精选留言(26)

  • 小生向北
    max.poll.interval.ms是指两次poll()的最大间隔时间,kafka消费者以轮询的方式来拉取消息,并且一次拉取批量的消息(默认500条),而批量的大小是通过max.poll.records来控制的。两次poll()的实际时间取决于 单条消息的处理时间*一次拉取的消息量(500),当超过max.poll.interval.ms配置的时间Kafka server认为kafka consumer掉线了,于是就执行分区再均衡将这个consumer踢出消费者组。但是consumer又不知道服务端把自己给踢出了,下次在执行poll()拉取消息的时候(在poll()拉取消息之前有个自动提交offset的操作),就会触发该问题。 可见第2,3种方案是通过调整Kafka consumer的配置参数来缩短业务总的处理时间或者增加服务端判断时长,比较容易实现;第1种就跟业务有关了,比较难搞,有些业务可能就是要这么长的时间,很难再缩短;第4种方案就更复杂了,要把同步消息转换成异步,交给其它线程来处理,这时需要把auto.commit.enable=false,手动提交offset,并且consumer是线程不安全的,异步线程何时处理完,何时该提交,在哪提交,也是应用需要考虑的问题!希望胡老师针对第4种方案重点探讨一下!
    2019-07-17
    2
    14
  • ban
    老师,1、请问Standalone Consumer 的独立消费者一般什么情况会用到
    2、Standalone Consumer 的独立消费者 使用跟普通消费者组有什么区别的。

    作者回复: 1. 很多流处理框架的Kafka connector都没有使用consumer group,而是直接使用standalone consumer,因为group机制不好把控
    2. standalone consumer没有rebalance,也没有group提供的负载均衡,你需要自己实现。其他方面(比如位移提交)和group没有太大的不同

    2019-07-16
    6
  • 注定非凡
    A :定义:所谓CommitFailedException,是指Consumer客户端在提交位移时出现了错误或异常,并且并不可恢复的严重异常。

    B :导致原因:
    (1)消费者端处理的总时间超过预设的max.poll.interval.ms参数值
    (2)出现一个Standalone Consumerd的独立消费者,配置的group.id重名冲突。

    C :解决方案:
    (1)减少单条消息处理的时间
    (2)增加Consumer端允许下游系统消费一批消息的最大时长
    (3)减少下游系统一次性消费的消息总数。
    (4)下游使用多线程加速消费
    2019-11-05
    1
  • 蛋炒番茄
    Synchronous auto-commit of offsets {=OffsetAndMetadata{offset=6236, leaderEpoch=null, metadata=''}} failed: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing max.poll.interval.ms or by reducing the maximum size of batches returned in poll() with max.poll.records.
    我们这边有几个项目,当用原生的kafka客户端经常出现这个报错,max.poll.interval.ms是默认值300000,max.poll.records.是2000,但是实际上数据很少,每条数据处理的时间也很短。heartbeat.interval.ms是2000,session.timeout.ms是12000。为什么经常出现这个错误。
    重点是,另外几个项目用的是spring-kafka却重来没有出现过这样的报错,相关配置差不多,业务场景差不多。求指点?怎么样避免这样的问题

    作者回复: 还是要研究下你的环境中Rebalance多吗?

    2019-09-06
    2
    1
  • ban
    老师,我想问下max.poll.interval.ms两者session.timeout.ms有什么联系,可以说0.10.1.0 之前的客户端 API,相当于session.timeout.ms代替了max.poll.interval.ms吗?
    比如说session.timeout.ms是5秒,如果消息处理超过5秒,也算是超时吗?

    作者回复: 嗯,我更愿意说是max.poll.interval.ms承担了session.timeout.ms的部分功能。在没有max.poll.interval.ms和单独的心跳线程之前,如果session.timeout.ms = 5s,消息处理超过了5s,那么consumer就算是超时

    2019-07-16
    1
    1
  • Geek_zy
    老师有两个疑问:

    1.相同的GroupId的Consumer 不应该就是同一个Consumer Group 组下的吗,或者有其他的区分条件,比如订阅的Topic不同?

    2.如果这个standalone Consumer 再给他添加一个同组的standalone Conusmer,会发生什么?

    作者回复: 1. 设置相同group.id的consumer就是属于同一个group,你说的是对的:)
    2. 会出现位移提交失败的问题。严格来说这其实是一个问题,但是如果反馈到社区,社区会认为这不是标准用法

    2019-11-21
  • pain
    老师,没有设置 group.id 话,会怎么样,系统会自动生成唯一的一个值吗

    作者回复: group.id是必须要设置的,否则会抛InvalidGroupIdException异常

    2019-11-08
  • godtrue
    CommitFailedException 表示,Consumer 客户端在提交位移时出现了错误或异常,而且还是那种不可恢复的严重异常。
    如果框架在设计的时候,针对异常情况不但抛出异常信息,还给出相应的解决方案,那就更人性化啦!
    多谢分享。
    2019-08-17
  • windcaller
    To use this mode, instead of subscribing to the topic using subscribe, you just call assign(Collection) with the full list of partitions that you want to consume.

         String topic = "foo";
         TopicPartition partition0 = new TopicPartition(topic, 0);
         TopicPartition partition1 = new TopicPartition(topic, 1);
         consumer.assign(Arrays.asList(partition0, partition1));
     
    Once assigned, you can call poll in a loop, just as in the preceding examples to consume records. The group that the consumer specifies is still used for committing offsets, but now the set of partitions will only change with another call to assign. Manual partition assignment does not use group coordination, so consumer failures will not cause assigned partitions to be rebalanced. Each consumer acts independently even if it shares a groupId with another consumer. To avoid offset commit conflicts, you should usually ensure that the groupId is unique for each consumer instance.

    老师 standalone mode 是上面这段内容吗?

    作者回复: 是的。使用assign的consumer就是standalone consumer

    2019-07-31
  • 有时也,命也,运也,如之奈何?
    老师kafka死信该怎么去实现的?
    2.0之后增加了如下配置:
    errors.tolerance = all
    errors.deadletterqueue.topic.name = ""?

    作者回复: 还不够,你需要使用Kafka Connect组件才能实现。见:https://www.confluent.io/blog/kafka-connect-deep-dive-error-handling-dead-letter-queues

    2019-07-30
  • windcaller
    我没在kafka官网、stackoverflow 、google 找到任何关于 standalone kafka consumer的 例子,还望老师给个链接学习学习

    作者回复: Standalone consumer的提法并未出现在官方文档中,你可以在javadoc中看到一些:https://kafka.apache.org/23/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html#manualassignment

    2019-07-30
  • 老鱼
    胡老师,手动提交时,当前后两次poll时间超过期望的max.poll.interval.ms时,会触发Rebalance。 那么假如是自动提交时,会触发Rebalance吗?
    假如,手动提交场景,consumer消费端处理业务时间过长(特殊case导致的),发生了Rebalance,那么该consumer实例被踢出了,那么它永远‘死掉’了吗,还是会再次通过heartbeat检测让它复活?

    作者回复: 不会

    2019-07-26
  • 曹伟雄
    老师,请教2个问题,谢谢!
    我想问下0.10.1.0之后的session.timeout.ms还有什么作用呢?
    standalone consumer和group consumer在配置上如何区分?

    作者回复: 用于侦测会话超时。standalone consumer和group的区分体现在API上

    2019-07-19
  • 郭刚
    [2019-07-17 18:53:36,230] ERROR [ReplicaManager broker=2] Error processing append operation on partition __consumer_offsets-49 (kafka.server.ReplicaManager)
    org.apache.kafka.common.errors.NotEnoughReplicasException: The size of the current ISR Set(2) is insufficient to satisfy the min.isr requirement of 2 for partition __consumer_offsets-49
    把min.insync.replicas = 2改成1,消费者就可以运行了,这种flower失效的情况怎么处理呢?
    2019-07-17
    1
  • 郭刚
    老师,kafka报错,哪个论坛比较火?
    2019-07-17
  • juan
    请问如何计算单条消息处理的时间, 比如收到一条消息之后要调用A,B,C,D,E 五个方法处理,其中处理完B,E之后将结果发给别的kafka,处理时间是处理完A,B时间还是处理完A,B,C,D,E 的总时间?

    作者回复: 这取决于你对如何才算处理完一条消息的定义。另外从Kafka中拿到消息后剩下的事情就完全由你负责了,因此如何计算处理时间应该是你说了算的:)

    2019-07-17
  • 电光火石
    我在线上也遇到这个问题,想问一下老师:在新的consumer加入,发生repartition的时候,是否也会抱这个错,谢谢了!
    2019-07-16
  • 曾轼麟
    老师我这边遇到了一个奇怪的情况,kafka生成者发送消息能创建topic,但是消息怎么都发不上去broker。并且在kafka-logs底下有一个和刚刚那条消息key值一样的文件夹,

    并且打印出如下的日志:
    [2019-07-16 22:34:16,380] INFO [Log partition=23bb7ffd-4aa5-42e2-9d84-c90f4566c15b-2, dir=/tmp/kafka-logs] Loading producer state till offset 0 with message format version 2 (kafka.log.Log)
    [2019-07-16 22:34:16,381] INFO [Log partition=23bb7ffd-4aa5-42e2-9d84-c90f4566c15b-2, dir=/tmp/kafka-logs] Completed load of log with 1 segments, log start offset 0 and log end offset 0 in 3 ms (kafka.log.Log)
    [2019-07-16 22:34:16,383] INFO Created log for partition 23bb7ffd-4aa5-42e2-9d84-c90f4566c15b-2 in /tmp/kafka-logs with properties {compression.type -> producer, message.format.version -> 2.0-IV1, file.delete.delay.ms -> 60000, max.message.bytes -> 1000012, min.compaction.lag.ms -> 0, message.timestamp.type -> CreateTime, message.downconversion.enable -> true, min.insync.replicas -> 1, segment.jitter.ms -> 0, preallocate -> false, min.cleanable.dirty.ratio -> 0.5, index.interval.bytes -> 40

    作者回复: 这都是info级别的log,没有看出有什么问题。最好确认去leader副本所在的broker去看日志

    2019-07-16
  • Dovelol
    老师好,我上一个问题的具体描述是这样的,今天讲CommitFailedException的例子是调用consumer.commitSync();手动提交offset,确实当消息处理的总时间超过预设的max.poll.interval.ms时会报这个异常,但是如果是自动提交offset的情况下,也就是把enable.auto.commit=true,然后删除consumer.commitSync();代码,其它代码不变,也是max.poll.interval.ms=5s,然后循环中sleep(6s),发现不会报异常并且会一直重复消费,想问下这是什么原因呢?

    作者回复: 嗯,是的。这个异常只是在手动提交时抛出的。

    2019-07-16
    1
  • Li Shunduo
    假如broker集群整个挂掉了,过段时间集群恢复后,consumer group会自动恢复消费吗?还是需要手动重启consumer机器?

    作者回复: consumer有重连机制

    2019-07-16
收起评论
26
返回
顶部