• yhh
    2019-07-18
    希望老师能讲讲方案2下线程池怎么管理和提交位移!!
     8
     40
  • QQ怪
    2019-07-18
    老师能否加餐spring-kafka相关知识
    
     18
  • calljson
    2019-07-18
    希望老师能对比spring-kafka源码,关于多线程管理consumer谢谢
    
     12
  • 小生向北
    2019-07-18
    能够用多线程解决的就不要用多进程,毕竟资源有限。方案2的讲解还是太浅了,同希望老师能针对方案2详细讲解一下!方案2里面在异步线程里提交offset,每个线程自己提交自己的,如果中间有offset提交失败,后面的offset又提交成功了咋办呢?而且每个线程都自己提交consumer.commit()就意味着要在多个线程里面使用consumer,如文中所说,这种情况是要报CME错误的,那究竟该如何正确的提交呢,有没有最佳实践呀?
    
     10
  • james
    2019-07-18
    方案2最核心的如何commit老师没有说,难道只能启用自动提交吗?我觉得可以用Cyclicbarrier来实现线程池执行完毕后,由consumer来commit,不用countdownlatch因为它只能记录一次,而cb可以反复用,或者用forkjoin方式,总之要等待多线程都处理完才能commit,风险就是某个消息处理太慢回导致整体都不能commit,而触发rebalance以及重复消费,而重复消费我用布隆过滤器来解决
    
     6
  • 千屿
    2019-07-20
    最近用spring cloud做了一个kafka可靠消息微服务组件,有兴趣的朋友可以看看 ,消费端是多线程模型,消费线程和业务执行分离,使用了mongodb(分片+副本集) 存储消息发送的链路,对发送失败的消息做了补偿机制。https://gitee.com/huacke/mq-kafka,有问题可以联系我。
    
     4
  • 来碗绿豆汤
    2019-07-22
    对于第二种方案,可以添加一个共享的队列,消费线程消费完一个记录,就写入队列,然后主线程可以读取这个队列,然后依次提交小号的offset
    
     3
  • KEEPUP
    2019-07-19
    希望老师讲一下sparkstreaming 消费kafka 消息的情况
    
     3
  • 注定非凡
    2019-11-05
    A :Kafka Java Consumer是单线程设计原理。
        (1)在Kafka从0.10.1.0版本开始,KafkaConsumer就变成双线程设计即:用户主线程和心跳线程。
        (2)主线程是指:启动Consumer应用程序main方法的那个线程,而新引入的心跳线程只负责定期给对应的Broker机器发送心跳请求,以标识消费者应用的存活性。

        (2)老版本中有Scala Consumer的API,是多线程架构的,每个Consumer实例在内部为所有订阅的主题分区创建对应消息获取线程,也称为Fetcher线程。老版本Consumer同时也是阻塞式的(blocking),Consumer实例启动后,内部会创建很多阻塞式的消息迭代器。
    (3)在很多场景下,Consumer端是有非阻塞需求的,如流处理应用中执行过滤(filter),连接(join),分组(group by)等操作时就不能是阻塞式的。
        所以,新版本Consumer设计了单线程+轮询的机制。这种设计能够较好的实现非阻塞式的消息获取。

    B :单线程设计优点
        (1)单线程可以较好的实现如在流处理应用中执行过滤(filter),连接(join),分组(group by)等操作。
        (2)单线程能够简化Consumer端设计。Consumer端获取到消息后,处理消息的逻辑是否采用多线程,由自己决定。
        (3)单线程设计在很多种编程中都比较易于实现,编译社区移植。

    C :多线程方案
        (1)KafkaConsumer类不是线程安全的(thread-safe)。所有的网络I/O处理都是发生在用户主线程中,所以不能在多线程中共享同一个KafkaConsumer实例,否则程序会抛ConcurrentModificationException异常。
        
        (2)方案一:
            消费者程序启动多个线程,每个线程维护专属的KafkaConsumer实例,负责完整的消息获取,消息处理流程。
            优点:
                方便实现,速度快,无线程间交互开销,易于维护分区的消息顺序
            缺点:
                占用更多的系统资源,线程数受限于主题分区数,扩展性差。线程自己处理消息容易超时,进而引发Rebalance。
        
        (3)方案二:
            消费者程序使用单或多线程获取消息,同时创建多个消费线程执行消息处理逻辑。获取消息的线程可以是多个,每个线程维护专属的KafkaConsumer实例,处理消息则交由特定的线程池来做。
            优点:
                可独立扩展消费获取线程数和worker线程数,伸缩性好
            缺点:
                难以维护分区内的消息消费顺序,处理链路拉长,不易于位移提交管理,实现难度高。
    展开
    
     2
  • 开水
    2019-07-19
    方案一用在需要精确控制消费数量的方案里,比如访问量这种日志什么的。
    方案二可以把后面处理通过数据库key做成幂等操作,用在实时处理需要随时增减消费能力的业务上面。
    
     2
  • Xiao
    2019-07-18
    胡老师,第二种方案我觉得还有个问题就是如果是自动提交,那就会容易出现消息丢失,因为异步消费消息,如果worker线程有异常,主线程捕获不到异常,就会造成消息丢失,这个是不是还需要做补偿机制;如果是手动提交,那offer set也会有可能丢失,消息重复消费,消息重复还好处理,做幂等就行。
    
     2
  • 玉剑冰锋
    2019-07-18
    Kafka重启时间比较长,每次重启一台差不多四五十分钟,日志保存12个小时,每台数据量差不多几个T,想请教一下老师有什么可以优化的参数吗?

    作者回复: 有可能是要加载的日志段数据太多导致的,可以增加num.recovery.threads.per.data.dir的值

     3
     2
  • 高志强
    2019-12-25
    老师我现在用Php多进程消费,一个topic 130个分区,我是不是该启动130个进程去消费,目前启动64个进程,但消费能力上不去,消息积压量有几十万了,怎么才能提高消费能力呢

    作者回复: 可以考虑单个进程下再开多线程的方式来增强消费能力,不必一味考虑多进程的方案

     2
     1
  • 高志强
    2019-12-13
    在发布订阅模式下,我想使用php多进程消费方式,groupid相同,topic也不变,kafkaConsumer会是多个,那么我需要指定consumer对应消费的分区么,会不会出现重新消费的情况,该如何避免,kafka在这种情况下会自动分配分区么,希望老师给予解答

    作者回复: PHP客户端的设计方式不太了解。如果是Java Consumer API的话,你不需要指定分区,只需要要消费的topic就行。如果位移管理恰当的话,通常不会发生大面积的消息重新消费。

     1
     1
  • 寂静欢喜
    2019-11-27
    老师 想问下 心跳线程是和主线程分开的,那么 第一种方案中,主线程阻塞,又怎么会导致超时Rebalance呢?

    作者回复: 应该这么说,心跳线程会定期地检查前端线程是否卡住了,一旦发现卡住了,便会发起主动离组。

    
     1
  • 丘壑
    2019-07-19
    对于老师说的第二种多线程处理的方案,我本人觉得在消息量很大的系统中比较常用,只是在使用的时候很担心出现异常后的数据问题,数据应该怎么找回,这块对消费异常设计难度较大,请老师可以考虑分享下这块的手动提交位移及异常处理的经验
    
     1
  • Hale
    2019-12-24
    如果只有一个broker,一个consumer 一个分区,上面的consumer 组成一个组,一个topic 当consumer 卡住时,协调器会将消费者踢出消费组,进行重新分区分配,但只有一个消费者,那消费者就不能接受到数据了,怎样实现消费者重连

    作者回复: 消费者会自动重连的,如果重连失败,说明网络有问题

    
    
  • YiPwInGhOnG
    2019-12-16
    老师,想请教消费者的一个问题...
    我们的业务场景是这样的:建立一个服务接收 http 请求、根据传入的参数(topic)从 Kafka 指定 topic 拉取一定数量的消息后返回。但 Kafka 的消费者是要保持轮询的,不然就只能每次建立消费者、获取分区/加入群组、请求数据后关闭消费者(但这样效率很低)。
    请问有什么比较好又可靠的实现方法吗?谢谢~

    作者回复: 持续消费一部分消息缓存到本地,http接口从本地读取消息。如果长时间不拉取,consumer先pause消费

    
    
  • 飞翔
    2019-12-04
    老师 想问一个方案1 谁负责分配线程给每个partition呀 我看您的code 只是没产生一个线程去消费一个主题 如果我有4个parition 那么我产生4个线程来消费这个主题,他会自动均匀分配嘛

    作者回复: “谁负责分配线程给每个partition呀” --- leader consumer负责分配。

    会均匀分配的,这是kafka consumer代码保证的

    
    
  • 胡家鹏
    2019-10-23
    老师及各位朋友好,问下两个问题1.上面的代码怎么没有消费位移提交,难道是设置的自动提交位移吗?2.consumer.wakeup什么时候使用,来解决什么问题呢?

    作者回复: 1. 您指哪段代码?另外如果设置了enable.auto.commit=true或没有显式设置enable.auto.commit=false,就是自动提交
    2. wakeup主要用于唤醒polling中的consumer实例。如果你使用了多线程(即把KafkaConsumer实例用于单独的线程),你需要有能力在另一个线程中“中断”KafkaConsumer所在实例的执行。wakeup就是用这个的

    
    
我们在线,来聊聊吧