• nightmare
    2019-07-13
    老师手动提交的设计很优美,先用异步提交不影响程序的性能,再用consumer关闭时同步提交来确保位移一定提交成功。这里我有个疑问,比如我程序运行期间有多次异步提交没有成功,比如101的offset和201的offset没有提交成功,程序关闭的时候501的offset提交成功了,是不是就代表前面500条我还是消费成功了,只要最新的位移提交成功,就代表之前的消息都提交成功了?第二点 就是批量提交哪里,如果一个消费者晓得多个分区的消息,封装在一个Map对象里面消费者也能正确的对多个分区的位移都保证正确的提交吗?
     2
     20
  • 无菇朋友
    2019-07-21
    老师您好,有一个疑问,为什么poll之前的提交和按频率自动提交是一个时机,假如频率是5s提交一次,某两次poll之间的间隔是6s,这时候是怎么处理提交的?忘老师解答下,着实没想通这个地方

    作者回复: 嗯, 严格来说。提交频率指的是最小的提交间隔。比如设置5s,Kafka保证至少等待5s才会自动提交一次。

    
     6
  • aof
    2019-07-13
    先说一下,课后思考,解决的办法应该就是,将消息处理和位移提交放在一个事务里面,要么都成功,要么都失败。

    老师文章里面的举的一个例子没有很明白,能不能再解释一下。就是那个位移提交后Rebalance的例子。
     3
     4
  • Tony Du
    2019-07-13
    老师,您好~ 看了今天的教程我有两个问题想请教下,希望老师能赐教。
    1. 从文中的代码看上去,使用commitAsync提供offset,不需要等待异步执行结果再次poll就能拿到下一批消息,是那么这个offset的最新值是不是理解为其实是在consumer client的内存中管理的(因为实际commitAsync如果失败的话,offset不会写入broker中)?如果是这样的话,如果在执行到commitSync之前,consumer client进程重启了,就有可能会消费到因commitAsync失败产生的重复消息。
    2. 教程中手动提交100条消息的代码是一个同步处理的代码,我在实际工程中遇到的问题是,为了提高消息处理效率,coumser poll到一批消息后会提交到一个thread pool中处理,这种情况下,请教下怎样做到控制offset分批提交?
    谢谢
    展开
    
     4
  • 水天一色
    2019-12-07
    消费者提了异步 commit 实际还没更新完offset,消费者再不断地poll,其实会有重复消费的情况吧?

    作者回复: 只要consumer没有重启,不会发生重复消费。因为在运行过程中consumer会记录已获取的消息位移

    
     3
  • lmtoo
    2019-07-13
    对于手动同步和异步提交结合的场景,如果poll出来的消息是500条,而业务处理200条的时候,业务抛异常了,后续消息根本就没有被遍历过,finally里手动同步提交的是201还是000,还是501?

    作者回复: 如果调用没有参数的commit,那么提交的是500

     4
     2
  • 惜昔
    2019-11-08
    老师 手动提交位移的时候 如果一个某个消费者组的一个消费者实例从offset下标4开始消费的 但是消费者的消费逻辑比较重量级,处理的时间比较长,还没有提交。这时候另外一个消费者组的一个消费者实例来相同的分区来拿消息,会不会拿的是上一个消费者已经拿过的消费,从而重复消费?

    作者回复: 有可能的~

     2
     1
  • 注定非凡
    2019-11-05
    1 概念区分
        A :Consumer端的位移概念和消息分区的位移概念不是一回事。
        B :Consumer的消费位移,记录的是Consumer要消费的下一条消息的位移。

    2 提交位移
        A :Consumer 要向Kafka汇报自己的位移数据,这个汇报过程被称为提交位移(Committing Offsets)。
        B :Consumer需要为分配给它的每个分区提交各自的位移数据。

    3提交位移的作用
        A :提交位移主要是为了表征Consumer的消费进度,这样当Consumer发生故障重启后,能够从kafka中读取之前提交的位移值,从相应的位置继续消费,避免从头在消费一遍。

    4 位移提交的特点
        A :位移提交的语义保障是由你来负责的,Kafka只会“无脑”地接受你提交的位移。位移提交错误,就会消息消费错误。

    5 位移提交方式
        A :从用户的角度讲,位移提交分为自动提交和手动提交;从Consumer端的角度而言,位移提交分为同步提交和异步提交。

        B :自动提交:由Kafka consumer在后台默默的执行提交位移,用户不用管。开启简单,使用方便,但可能会出现重复消费。

        C :手动提交:好处在更加灵活,完全能够把控位移提交的时机和频率。
            (1)同步提交:在调用commitSync()时,Consumer程序会处于阻塞状态,直到远端Broker返回提交结果,这个状态才会结束。对TPS影响显著
            (2)异步提交:在调用commitAsync()时,会立即给响应,但是出问题了它不会自动重试。
            (3)手动提交最好是同步和异步结合使用,正常用异步提交,如果异步提交失败,用同步提交方式补偿提交。
        
        D :批次提交:对于一次要处理很多消费的Consumer而言,将一个大事务分割成若干个小事务分别提交。这可以有效减少错误恢复的时间,避免大批量的消息重新消费。
            (1)使用commitSync(Map<TopicPartition,Offset>)和commitAsync(Map<TopicPartition,OffsetAndMetadata>)。
    展开
    
     1
  • Algoric
    2019-09-25
    自动提交一定不会消息丢失吗,如果每次poll的数据过多,在提交时间内没有处理完,这时达到提交时间,那么Kafka还是重复提交上次poll的最大位移吗,还是讲本次poll的消息最大位移提交?

    作者回复: hmmm... 其实我一直觉得提交间隔这个参数的命名有些问题。它实际保证的是位移至少要隔一段时间才会提交,如果你是单线程处理消息,那么只有处理完消息后才会提交位移,可能远比你设置的间隔长。

     2
     1
  • 谢特
    2019-09-10
    不漏不重复很难做到,我现在都不知道怎么弄,读偏移量容易,提交太难
    
     1
  • 双叶
    2019-07-14
    我理解中的 enable.auto.commit 跟文章中说的不太一样,我理解设置为 true 的时候提供的是至多一次的语义,而不是至少一次的语义。

    我不用 java,看的是 librdkafka 的文档。enable.auto.commit 的文档说明是:Automatically and periodically commit offsets in the background. 也就是说他会定期提交 offset,但是这里没有明说提交的 offset 是什么时候记录的,我的理解是记录是由 enable.auto.offset.store 决定的。

    enable.auto.offset.store 文档说明是:Automatically store offset of last message provided to application. The offset store is an in-memory store of the next offset to (auto-)commit for each partition. 也就是说如果设置成 true(默认值),他会自动把上个提交给应用程序的offset 记录到内存中。

    也就是说,如果应用拿到一个 offset 了,librdkafka 就会把这个 offset 记录到内存中,然后默认情况下至多 5s 之后,就会提交给 broker。这时候如果应用还没有完成这个 offset 的处理时,发生了崩溃,这个 offset 就丢掉了,所以是一个至多一次的语义。

    我理解中提供至少一次语义需要关掉 enable.auto.commit 自己控制提交才行。
    展开
    
     1
  • WL
    2019-07-13
    我感觉有点不是很理解消费者端的位移概念和消息在分区中的位移为啥不是一回事,我理像是一回事,因为消费者端把自己消费的位移提交到Broker的位移主题里不就定义了下次消费的起点了吗,为啥说不是一回事呢,有啥区别呢,请老师具体指导一下。

    作者回复: 嗯嗯,我的意思是consumer提交的位移值虽然就是消息在分区中的位移值,但这个提交位移的概念和分区中的位移概念是不同的。

    
     1
  • Sparkler🎇
    2020-01-14
    文中的场景:poll方法返回5000条消息,指的是5000条消息集合,还是单条消息?如果是单条消息的话,处理完100条单条消息就提交位移,会导致Broker解开消息集合吗?
    我的猜测:不会解开消息集合,如果5000条消息在一个消息集合里,就算没有被完全提交位移,下次重新poll时,还是会返回5000条完整的消息集合,只不过此时消费位移不是从这5000条的开始位移,Consumer自然会跳过这部分已经提交过的位移数据,从中间某个位移开始消费消息。

    作者回复: 5000条消息。

    
    
  • banderSnatch🐱
    2020-01-03
    有个问题,spring-kafka框架有控制手动异步提交位移的方法吗?网上查了一些资料都没找到。只有一个Acknowledgement对象的acknowledge()方法手动提交。但是试了一下发现速度比较慢。

    作者回复: 抱歉,不太清楚spring-kafka的实现细节。。。

    
    
  • w h l
    2019-12-27
    老师,你好,请问一下consume关闭的时机是什么时候呢,这里可以解释下吗

    作者回复: 不确定百分之百地理解了您的问题。通常来说,当调用KafkaConsumer.close方法时consumer会关闭。

    
    
  • 猫哭
    2019-12-17
    业务端来保证;业务端重复消费是否是幂等的?如果是幂等,对业务无影响,重复消费没关系;如果不是幂等,在生产者给kafka发送消息的时候,给每条消息生成一个唯一ID,消费端,视业务场景而言,分两种情况:1.如果是敏感业务,如与钱相关,在数据库建一张消费消息的流水表,每次消费前到数据库去查询一下,看是否消费过,消费过就忽略,没有消费过就消费。2.非敏感业务,可以存到redis,消费前去redis查询,该条消息是否消费过
    
    
  • James
    2019-11-20
    你好,请问下分区自动提交失败,请求超时。会导致什么后果呢
    重复消费吗。

    作者回复: 偶发的提交失败也不一定就意味着重复消费,看consumer程序的运行情况。

     1
    
  • xiaoniu
    2019-11-18
    老师 你好 问个问题,Kafka对未提交的消息一定要等到消费工程重启才重新消费吗?比如:我的消费工程 支持幂等性,可以重复消费,但是由于某时刻数据库挂了,导致消息未提交,但隔了一段时间数据库又好了,之前未提交的消息可以在不重启的情况下再次消费下吗?

    作者回复: 如果不重启的话,程序里面是否显式调用了seek重新调整了offset呢?如果没有就无法重新消费那些已经消费的消息

    
    
  • 不忘初心丶方得始终
    2019-10-25
    老师你好,问个问题,目前公司要用kafka同步老数据库数据,同步过程是按照老数据库的bin.log日志顺序进行同步,但是在同步过程中,有些表是有关联的,加入将数据放到多个分区,不同分区数据消费顺序不一样,就会导致数据同步出现关联问题,如果设置一个分区……这样又太慢,有什么好的建议吗?

    作者回复: 如果是批任务,能否等到数据抽取完了再进行消费。如果是streaming任务,这是经典的table-table join,最好使用特定的流处理框架来处理

    
    
  • jc9090kkk
    2019-09-23
    感觉老师分享,对于文章中的那个自动提交的例子有点疑惑,希望老师能解答一下:
    auto.commit.interval.ms设置为5s,也就是说consumer每5秒才提交一次位移信息,那consumer如果每消费一条数据,但是没有达到自动提交的时间,这个位移信息该如何管理?consumer自己做维护吗?但是也需要跟broker端进行位移信息同步的吧? 不然可能会造成数据的重复消费?还是每5s的提交和consumer自动提交的时候都会伴随位移信息的同步?是我的理解有问题吗?

    作者回复: 如果没有达到提交时间就不会提交,自动提交完全由consumer自行维护,确实可能造成数据的重复消费。你的理解完全没有问题:)

    目前单纯依赖consumer是无法避免消息的重复消费的,Kafka默认提供的消息处理语义就是至少一次处理。

     1
    
我们在线,来聊聊吧