• 小生向北
    2019-07-17
    max.poll.interval.ms是指两次poll()的最大间隔时间,kafka消费者以轮询的方式来拉取消息,并且一次拉取批量的消息(默认500条),而批量的大小是通过max.poll.records来控制的。两次poll()的实际时间取决于 单条消息的处理时间*一次拉取的消息量(500),当超过max.poll.interval.ms配置的时间Kafka server认为kafka consumer掉线了,于是就执行分区再均衡将这个consumer踢出消费者组。但是consumer又不知道服务端把自己给踢出了,下次在执行poll()拉取消息的时候(在poll()拉取消息之前有个自动提交offset的操作),就会触发该问题。 可见第2,3种方案是通过调整Kafka consumer的配置参数来缩短业务总的处理时间或者增加服务端判断时长,比较容易实现;第1种就跟业务有关了,比较难搞,有些业务可能就是要这么长的时间,很难再缩短;第4种方案就更复杂了,要把同步消息转换成异步,交给其它线程来处理,这时需要把auto.commit.enable=false,手动提交offset,并且consumer是线程不安全的,异步线程何时处理完,何时该提交,在哪提交,也是应用需要考虑的问题!希望胡老师针对第4种方案重点探讨一下!
    展开
     2
     15
  • ban
    2019-07-16
    老师,1、请问Standalone Consumer 的独立消费者一般什么情况会用到
    2、Standalone Consumer 的独立消费者 使用跟普通消费者组有什么区别的。

    作者回复: 1. 很多流处理框架的Kafka connector都没有使用consumer group,而是直接使用standalone consumer,因为group机制不好把控
    2. standalone consumer没有rebalance,也没有group提供的负载均衡,你需要自己实现。其他方面(比如位移提交)和group没有太大的不同

    
     10
  • ban
    2019-07-16
    老师,我想问下max.poll.interval.ms两者session.timeout.ms有什么联系,可以说0.10.1.0 之前的客户端 API,相当于session.timeout.ms代替了max.poll.interval.ms吗?
    比如说session.timeout.ms是5秒,如果消息处理超过5秒,也算是超时吗?

    作者回复: 嗯,我更愿意说是max.poll.interval.ms承担了session.timeout.ms的部分功能。在没有max.poll.interval.ms和单独的心跳线程之前,如果session.timeout.ms = 5s,消息处理超过了5s,那么consumer就算是超时

     1
     2
  • 注定非凡
    2019-11-05
    A :定义:所谓CommitFailedException,是指Consumer客户端在提交位移时出现了错误或异常,并且并不可恢复的严重异常。

    B :导致原因:
        (1)消费者端处理的总时间超过预设的max.poll.interval.ms参数值
        (2)出现一个Standalone Consumerd的独立消费者,配置的group.id重名冲突。

    C :解决方案:
        (1)减少单条消息处理的时间
        (2)增加Consumer端允许下游系统消费一批消息的最大时长
        (3)减少下游系统一次性消费的消息总数。
        (4)下游使用多线程加速消费
    展开
    
     1
  • 蛋炒番茄
    2019-09-06
    Synchronous auto-commit of offsets {=OffsetAndMetadata{offset=6236, leaderEpoch=null, metadata=''}} failed: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing max.poll.interval.ms or by reducing the maximum size of batches returned in poll() with max.poll.records.
    我们这边有几个项目,当用原生的kafka客户端经常出现这个报错,max.poll.interval.ms是默认值300000,max.poll.records.是2000,但是实际上数据很少,每条数据处理的时间也很短。heartbeat.interval.ms是2000,session.timeout.ms是12000。为什么经常出现这个错误。
    重点是,另外几个项目用的是spring-kafka却重来没有出现过这样的报错,相关配置差不多,业务场景差不多。求指点?怎么样避免这样的问题
    展开

    作者回复: 还是要研究下你的环境中Rebalance多吗?

     2
     1
  • windcaller
    2019-07-31
    To use this mode, instead of subscribing to the topic using subscribe, you just call assign(Collection) with the full list of partitions that you want to consume.

         String topic = "foo";
         TopicPartition partition0 = new TopicPartition(topic, 0);
         TopicPartition partition1 = new TopicPartition(topic, 1);
         consumer.assign(Arrays.asList(partition0, partition1));
     
    Once assigned, you can call poll in a loop, just as in the preceding examples to consume records. The group that the consumer specifies is still used for committing offsets, but now the set of partitions will only change with another call to assign. Manual partition assignment does not use group coordination, so consumer failures will not cause assigned partitions to be rebalanced. Each consumer acts independently even if it shares a groupId with another consumer. To avoid offset commit conflicts, you should usually ensure that the groupId is unique for each consumer instance.

    老师 standalone mode 是上面这段内容吗?
    展开

    作者回复: 是的。使用assign的consumer就是standalone consumer

    
     1
  • 德惠先生
    2019-07-16
    希望老师可以更加具体的说说,rebalance的细节,比如某个consumer发生full gc的场景,它的partition是怎么被分配走的,重连之后提交会发生什么

    作者回复: 假设full gc导致所有线程STW,从而心跳中断,导致被踢出group,Coordinator向其他存活consumer发送心跳response,通知它们开启新一轮rebalance。

    
     1
  • 巧克力黑
    2020-02-07
    老师你好,我在使用spark streaming消费kafka消息会遇到某个batch消费耗时长。请老师帮忙分析一下这个是什么原因,应该怎么去优化。谢谢老师。日志大体如下:
    20/02/06 21:13:18 INFO KafkaRDD: Computing topic test_topic, partition 0 offsets 470985316 -> 470988502
    20/02/06 21:13:18 DEBUG NetworkClient: Disconnecting from node 1 due to request timeout.
    20/02/06 21:13:18 DEBUG ConsumerNetworkClient: Cancelled FETCH request ClientRequest(expectResponse=true, callback=org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler@bf99562, request=RequestSend(header={api_key=1,api_version=2,correlation_id=32,client_id=consumer-2}, body={replica_id=-1,max_wait_time=3000,min_bytes=1,topics=[{topic=test_topic,partitions=[{partition=0,fetch_offset=470986525,max_bytes=10485760}]}]}), createdTimeMs=1580994741183, sendTimeMs=1580994741183) with correlation id 32 due to node 1 being disconnected
    20/02/06 21:13:18 DEBUG Fetcher: Fetch failed
    org.apache.kafka.common.errors.DisconnectException
    20/02/06 21:13:18 DEBUG NetworkClient: Initialize connection to node 2 for sending metadata request
    20/02/06 21:13:18 DEBUG NetworkClient: Initiating connection to node 2 at host1:9092.
    展开
    
    
  • Hale
    2019-12-25
    2019-12-23 16:43:56,367 consumer.py[line:792] WARNING Auto offset commit failed for group aff74e1e254e11ea9f47b827eb16d0ae: NodeNotReadyError: coordinator-1
    2019-12-23 16:43:56,471 client_async.py[line:695] WARNING <BrokerConnection node_id=coordinator-1 host=39.104.137.50:9093 <connected> [IPv4 ('39.104.137.50', 9093)]> timed out after 305000 ms. Closing connection.
    2019-12-23 16:43:56,473 client_async.py[line:327] WARNING Node coordinator-1 connection failed -- refreshing metadata
    2019-12-23 16:43:56,478 base.py[line:493] ERROR Error sending HeartbeatRequest_v1 to node coordinator-1 [[Error 7] RequestTimedOutError: Request timed out after 305000 ms]
    2019-12-23 16:43:56,479 base.py[line:714] WARNING Marking the coordinator dead (node coordinator-1) for group aff74e1e254e11ea9f47b827eb16d0ae: [Error 7] RequestTimedOutError: Request timed out after 305000 ms.
    出现上面的报错consumer就卡主了,不能接收数据了,是提交失败吧
    展开

    作者回复: 看着像网络连接失败导致的问题。网络没问题吗?是持续性地报警吗?

    
    
  • Geek_zy
    2019-11-21
    老师有两个疑问:

    1.相同的GroupId的Consumer 不应该就是同一个Consumer Group 组下的吗,或者有其他的区分条件,比如订阅的Topic不同?

    2.如果这个standalone Consumer 再给他添加一个同组的standalone Conusmer,会发生什么?

    作者回复: 1. 设置相同group.id的consumer就是属于同一个group,你说的是对的:)
    2. 会出现位移提交失败的问题。严格来说这其实是一个问题,但是如果反馈到社区,社区会认为这不是标准用法

    
    
  • pain
    2019-11-08
    老师,没有设置 group.id 话,会怎么样,系统会自动生成唯一的一个值吗

    作者回复: group.id是必须要设置的,否则会抛InvalidGroupIdException异常

    
    
  • godtrue
    2019-08-17
    CommitFailedException 表示,Consumer 客户端在提交位移时出现了错误或异常,而且还是那种不可恢复的严重异常。
    如果框架在设计的时候,针对异常情况不但抛出异常信息,还给出相应的解决方案,那就更人性化啦!
    多谢分享。
    
    
  • 有时也,命也,运也,...
    2019-07-30
    老师kafka死信该怎么去实现的?
    2.0之后增加了如下配置:
    errors.tolerance = all
    errors.deadletterqueue.topic.name = ""?

    作者回复: 还不够,你需要使用Kafka Connect组件才能实现。见:https://www.confluent.io/blog/kafka-connect-deep-dive-error-handling-dead-letter-queues

    
    
  • windcaller
    2019-07-30
    我没在kafka官网、stackoverflow 、google 找到任何关于 standalone kafka consumer的 例子,还望老师给个链接学习学习

    作者回复: Standalone consumer的提法并未出现在官方文档中,你可以在javadoc中看到一些:https://kafka.apache.org/23/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html#manualassignment

    
    
  • 老鱼
    2019-07-26
    胡老师,手动提交时,当前后两次poll时间超过期望的max.poll.interval.ms时,会触发Rebalance。 那么假如是自动提交时,会触发Rebalance吗?
    假如,手动提交场景,consumer消费端处理业务时间过长(特殊case导致的),发生了Rebalance,那么该consumer实例被踢出了,那么它永远‘死掉’了吗,还是会再次通过heartbeat检测让它复活?

    作者回复: 不会

    
    
  • 曹伟雄
    2019-07-19
    老师,请教2个问题,谢谢!
    我想问下0.10.1.0之后的session.timeout.ms还有什么作用呢?
    standalone consumer和group consumer在配置上如何区分?

    作者回复: 用于侦测会话超时。standalone consumer和group的区分体现在API上

    
    
  • 郭刚
    2019-07-17
    [2019-07-17 18:53:36,230] ERROR [ReplicaManager broker=2] Error processing append operation on partition __consumer_offsets-49 (kafka.server.ReplicaManager)
    org.apache.kafka.common.errors.NotEnoughReplicasException: The size of the current ISR Set(2) is insufficient to satisfy the min.isr requirement of 2 for partition __consumer_offsets-49
    把min.insync.replicas = 2改成1,消费者就可以运行了,这种flower失效的情况怎么处理呢?
    展开
     1
    
  • 郭刚
    2019-07-17
    老师,kafka报错,哪个论坛比较火?
    
    
  • juan
    2019-07-17
    请问如何计算单条消息处理的时间, 比如收到一条消息之后要调用A,B,C,D,E 五个方法处理,其中处理完B,E之后将结果发给别的kafka,处理时间是处理完A,B时间还是处理完A,B,C,D,E 的总时间?

    作者回复: 这取决于你对如何才算处理完一条消息的定义。另外从Kafka中拿到消息后剩下的事情就完全由你负责了,因此如何计算处理时间应该是你说了算的:)

    
    
  • 电光火石
    2019-07-16
    我在线上也遇到这个问题,想问一下老师:在新的consumer加入,发生repartition的时候,是否也会抱这个错,谢谢了!
    
    
我们在线,来聊聊吧