• QQ怪
    2019-08-10
    比较暴力的重新开个消费组从头消费😂
     3
     12
  • 小可爱
    2019-08-29
    current是回到最近提交位移处,但是消费者不是本来就从最近提交处继续消费吗

    作者回复: current主要是为了调试场景。比如这样的场景:你停掉了consumer,现在offset=50,然后修改了consumer代码重新上线,consumer开始从50消费,运行了一段时间发现你修改的代码有问题,还要继续改,那么下掉consumer,将offset调回current,改代码之后再上线,consumer从50消费。此时current策略就显得很方便了,对吧?

     2
     4
  • JasonZhi
    2019-08-30
    老师,不是还可以通过auto offset reset配置项重设位移吗?怎么这里没有说

    作者回复: 这是说的是手动设置位移的情况,自动设置位移是Kafka自动做的。当然也算是重设位移的一种

    
     2
  • cricket1981
    2019-08-12
    "最重要的是,一定要调用带长整型的 poll 方法,而不要调用 consumer.poll(Duration.ofSecond(0))。"--- 能讲下为什么吗?如果不遵守会怎么样?

    作者回复: 两个的实现方式不一样。详细设计原理差别可以看看:https://www.cnblogs.com/huxi2b/p/10773559.html

     1
     2
  • 皇甫
    2020-01-08
    老师好,有一个实际问题,为了保证实时性,消费者端该如何配置,可以在积压大于一定数量的时候,消费位移重置为最新位移,丢弃中间积压的消息,消费最新的消息,kafka版本为2.1.0,消费者端是storm,用的storm-kafka-client2.1.0,谢谢

    作者回复: 我觉得目前这些计算引擎与Kafka的connector都没有这样的功能。只能自己编写consumer实现之

     1
    
  • hxy
    2020-01-01
    请问老师,低版本的kafka将offset存储在zk上,能否通过修改zk上记录的offset来实现重设消费者组位移?

    作者回复: 理论上讲是可以的:)

    
    
  • 杨陆伟
    2019-12-19
    选择消息中间件还是kafka的部分,没太理解,能再详细的深入说说吗?谢谢

    作者回复: 您主要关心的问题是什么?

    
    
  • 水天一色
    2019-12-17
    请问,重置offset到 datetime,这个 datetime 是生产时间还是当前group的消费时间?

    作者回复: 消息的创建时间

    
    
  • 注定非凡
    2019-11-11
    1,为什么要:
    当需要实现重复消费历史数据的时候,就需要重设消费者组位移

    2,为什么能:
        A :Kafka和传统的消费引擎在设计上有很大区别,其中一个比较显著的区别是:Kafka的消息费者读取消息是可以重演的(replayable)
        B :如RabbitMQ或ActiveMQ这样的传统消息中间件,他们处理和响应消息的方式是破坏性的(destructive),一旦消息被成功处理,就会被从Broker上删除。
        C :Kafka,由于它是基于日志结构(log-based)的消息引擎,消费者在消费消息时,仅仅是从磁盘文件上读取数据,是只读操作,因此消费者不会删除消息数据。同时,由于位移数据是由消费者控制的,因此他能够很容易的修改位移值,实现重复消费历史数据的功能。

    3,重设位移策略
        A :位移维度:根据位移值来重设。直接把消费者的位移值重设成我们给定的值。
        B :时间维度:可以给定一个时间,让消费者吧位移调整成大于该时间的最小位移;亦可以给出一段时间间隔,如30分钟,然后让消费者直接将位移调回30分钟之前的位移值。
    七种策略:
    (1)Earliest:将位移调整到主题当前最早位移处。可以实现重新消费主题的所有消息
    (2)Latest:把位移重设成最新末端位移。可以跳过所有历史消息,从最新消息开始消费
    (3)Current:将位移调整成消费者当前提交的最新位移,可以实现把位移重设到消费者重启时的位置。
    (4)Specified-Offset:把位移值调整到指定的位移处。这可以实现跳过某条错误信息,避免造成消费阻塞。
    (5)Shift-By-N:相对于当前位值的位移值,可以向前或向后,跳出一段距离。
    (6)DataTime:指定一个时间,然后将位移重置到该时间之后的最早位移处。
    (7)Duration:给定相对时间间隔,让位移调整到距离当前给定时间间隔的位移处。

    4 重设的方法
        A :通过消费者API来实现
        B :通过Kafka-consumer-groups命令行脚本来实现

        消费者API注意事项:
        (1):要创建消费者程序,要禁止自动提交位移
        (2):组ID要设置成要重设的消费者组的组ID
        (3):调用seekToBeginning方法时,要一次性构造主题的所有分区对象
        (4):最重要的是,一定要调用带长整形的poll方法,而不调用consumer.poll(Duration.ofSecond(0))。
        (5):要实现DateTime策略:需要借助KafkaConsumer.offsetsForTimes方法。
        
        总之:使用Java API的方式来实现重设策略的主要入口方法,就是seek方法。

        命令行方式注意事项:
        (1)bin/kafka-consumer-groups.sh --bootstrap-server kafka-host:port --group test-group --reset-offsets --all-topics --to-earliest –execute
    展开
     1
    
  • shenbinzhuo
    2019-09-19
    bin/kafka-consumer-groups.sh --bootstrap-server kafka-host:port --group test-group --reset-offsets --all-topics --to-earliest –execute
    老师,这是把当前group下的所有topic位移提交到当前最早的位移处,当前group的某个topic怎么设置?

    作者回复: 只能使用API来实现了,脚本方式不可以

    
    
  • 无菇朋友
    2019-09-18
    老师 问一下 如果 我想针对某个分区重置位移,怎么做

    作者回复: KafkaConsumer.seek方法支持指定单个分区进行重设位移

    
    
  • cuiyunfeng
    2019-09-06
    重置位移前,执行consumer.poll(),可以让kafka强制更新担当的partition信息,以防止发生kafka rebalance后partition信息陈旧,然后再取得partition信息进行offset位移。或者也可以配合使用kafka listener来处理发生rebalance情况下,进行重置位移。
    
    
  • What for
    2019-08-31
    请问一下老师在重设位移之前为什么要调用 consumer.poll 方法?是为了连接集群吗?

    作者回复: 嗯嗯,你可以这么认为,其实还有其他作用,比如获取集群信息后台主题数据等

    
    
  • 无菇朋友
    2019-08-26
    请问老师,current这个选项的应用场景是什么?

    作者回复: 如果你上线的consumer程序有bug,需要重演自上线起处理过的消息,那么可以考虑使用这个策略

    
    
  • 曹伟雄
    2019-08-26
    接着上一个问题,我的auto-offset-reset设置为earliest,我的问题是,一般什么情况下会出现这个情况? 可以从哪方面去分析? 谢谢。
    
    
  • 曹伟雄
    2019-08-26
    老师,留言想问一个问题,我在使用kafka的时候,发现个问题,offset偶尔会自动恢复到以前的值,然后又重新消费,例如:
    LogSize=538068,Consumer Offset=538068,Lag=0
    过了2天后突然变成了:
    LogSize=538068,Consumer Offset=138068,Lag=398023

    作者回复: 目前的信息不太能够确定原因。看看日志有无其他错误吧?

    
    
  • 锋芒
    2019-08-23
    请问,用命令行重设位移,应该在当前group 的leader 节点上?

    作者回复: 不需要的

    
    
  • 此方彼方Francis
    2019-08-22
    有遇到过,之前有一条Kafka消息的crc校验值出错了(不知道为什么会出错,非常奇怪),这种状况下就只能跳过这条消息了。
    不过有个问题请教老师,重设消费者位移之前,是不是有必要让消费者停止消费?

    作者回复: 如果使用程序API的方式不必停止消费

    
    
  • godtrue
    2019-08-19
    打卡,此节讲解了

    1:为啥需要重设消费者组位移?

    当需要实现重复消费历史数据的时候,就需要重设消费者组位移。


    2:重设消费者组位移的实现?

    有两种方式七种策略(两种维度:位移和时间)

    两种方式:操作API、操作kafka命令

    七种策略:

    2-1:Earliest 策略——表示将位移调整到主题当前最早位移处。

    2-2:Latest 策略——表示把位移重设成最新末端位移。

    2-3:Current 策略——表示将位移调整成消费者当前提交的最新位移。

    2-4:Specified-Offset 策略——则是比较通用的策略,表示消费者把位移值调整到你指定的位移处。这个策略的典型使用场景是,消费者程序在处理某条错误消息时,你可以手动地“跳过”此消息的处理。

    2-5: Shift-By-N 策略——指定的是位移的相对数值,即你给出要跳过的一段消息的距离即可。这里的“跳”是双向的,你既可以向前“跳”,也可以向后“跳”。

    2-6:DateTime策略—— 允许你指定一个时间,然后将位移重置到该时间之后的最早位移处。

    2-7:Duration 策略——则是指给定相对的时间间隔,然后将位移调整到距离当前给定时间间隔的位移处,具体格式是 PnDTnHnMnS。
    展开
    
    
  • 常银玲
    2019-08-12
    老师,留言想问一个问题,现在项目有一个需求是做一套仿真系统,仿真的数据来源于之前历史数据,查询准备用es像这种情况我们可以用kafka吗?

    作者回复: 我觉得可以用啊

    
    
我们在线,来聊聊吧