Kafka核心技术与实战
胡夕
人人贷计算平台部总监,Apache Kafka Contributor
立即订阅
8408 人已学习
课程目录
已完结 46 讲
0/4登录后,你可以任选4讲全文学习。
开篇词 (1讲)
开篇词 | 为什么要学习Kafka?
免费
Kafka入门 (5讲)
01 | 消息引擎系统ABC
02 | 一篇文章带你快速搞定Kafka术语
03 | Kafka只是消息引擎系统吗?
04 | 我应该选择哪种Kafka?
05 | 聊聊Kafka的版本号
Kafka的基本使用 (3讲)
06 | Kafka线上集群部署方案怎么做?
07 | 最最最重要的集群参数配置(上)
08 | 最最最重要的集群参数配置(下)
客户端实践及原理剖析 (14讲)
09 | 生产者消息分区机制原理剖析
10 | 生产者压缩算法面面观
11 | 无消息丢失配置怎么实现?
12 | 客户端都有哪些不常见但是很高级的功能?
13 | Java生产者是如何管理TCP连接的?
14 | 幂等生产者和事务生产者是一回事吗?
15 | 消费者组到底是什么?
16 | 揭开神秘的“位移主题”面纱
17 | 消费者组重平衡能避免吗?
18 | Kafka中位移提交那些事儿
19 | CommitFailedException异常怎么处理?
20 | 多线程开发消费者实例
21 | Java 消费者是如何管理TCP连接的?
22 | 消费者组消费进度监控都怎么实现?
深入Kafka内核 (5讲)
23 | Kafka副本机制详解
24 | 请求是怎么被处理的?
25 | 消费者组重平衡全流程解析
26 | 你一定不能错过的Kafka控制器
27 | 关于高水位和Leader Epoch的讨论
管理与监控 (12讲)
28 | 主题管理知多少?
29 | Kafka动态配置了解下?
30 | 怎么重设消费者组位移?
31 | 常见工具脚本大汇总
32 | KafkaAdminClient:Kafka的运维利器
33 | Kafka认证机制用哪家?
34 | 云环境下的授权该怎么做?
35 | 跨集群备份解决方案MirrorMaker
36 | 你应该怎么监控Kafka?
37 | 主流的Kafka监控框架
38 | 调优Kafka,你做到了吗?
39 | 从0搭建基于Kafka的企业级实时日志流处理平台
高级Kafka应用之流处理 (3讲)
40 | Kafka Streams与其他流处理平台的差异在哪里?
41 | Kafka Streams DSL开发实例
42 | Kafka Streams在金融领域的应用
结束语 (1讲)
结束语 | 以梦为马,莫负韶华!
特别放送 (2讲)
加餐 | 搭建开发环境、阅读源码方法、经典学习资料大揭秘
用户故事 | 黄云:行百里者半九十
Kafka核心技术与实战
登录|注册

30 | 怎么重设消费者组位移?

胡夕 2019-08-10
你好,我是胡夕。今天我要跟你分享的主题是:如何重设消费者组位移。

为什么要重设消费者组位移?

我们知道,Kafka 和传统的消息引擎在设计上是有很大区别的,其中一个比较显著的区别就是,Kafka 的消费者读取消息是可以重演的(replayable)。
像 RabbitMQ 或 ActiveMQ 这样的传统消息中间件,它们处理和响应消息的方式是破坏性的(destructive),即一旦消息被成功处理,就会被从 Broker 上删除。
反观 Kafka,由于它是基于日志结构(log-based)的消息引擎,消费者在消费消息时,仅仅是从磁盘文件上读取数据而已,是只读的操作,因此消费者不会删除消息数据。同时,由于位移数据是由消费者控制的,因此它能够很容易地修改位移的值,实现重复消费历史数据的功能。
对了,之前有很多同学在专栏的留言区提问:在实际使用场景中,我该如何确定是使用传统的消息中间件,还是使用 Kafka 呢?我在这里统一回答一下。如果在你的场景中,消息处理逻辑非常复杂,处理代价很高,同时你又不关心消息之间的顺序,那么传统的消息中间件是比较合适的;反之,如果你的场景需要较高的吞吐量,但每条消息的处理时间很短,同时你又很在意消息的顺序,此时,Kafka 就是你的首选。
取消
完成
0/1000字
划线
笔记
复制
© 版权归极客邦科技所有,未经许可不得传播售卖。 页面已增加防盗追踪,如有侵权极客邦将依法追究其法律责任。
该试读文章来自付费专栏《Kafka核心技术与实战》,如需阅读全部文章,
请订阅文章所属专栏。
立即订阅
登录 后留言

精选留言(28)

  • QQ怪
    比较暴力的重新开个消费组从头消费😂
    2019-08-10
    3
    11
  • 小可爱
    current是回到最近提交位移处,但是消费者不是本来就从最近提交处继续消费吗

    作者回复: current主要是为了调试场景。比如这样的场景:你停掉了consumer,现在offset=50,然后修改了consumer代码重新上线,consumer开始从50消费,运行了一段时间发现你修改的代码有问题,还要继续改,那么下掉consumer,将offset调回current,改代码之后再上线,consumer从50消费。此时current策略就显得很方便了,对吧?

    2019-08-29
    1
    3
  • JasonZhi
    老师,不是还可以通过auto offset reset配置项重设位移吗?怎么这里没有说

    作者回复: 这是说的是手动设置位移的情况,自动设置位移是Kafka自动做的。当然也算是重设位移的一种

    2019-08-30
    2
  • cricket1981
    "最重要的是,一定要调用带长整型的 poll 方法,而不要调用 consumer.poll(Duration.ofSecond(0))。"--- 能讲下为什么吗?如果不遵守会怎么样?

    作者回复: 两个的实现方式不一样。详细设计原理差别可以看看:https://www.cnblogs.com/huxi2b/p/10773559.html

    2019-08-12
    1
    2
  • 注定非凡
    1,为什么要:
    当需要实现重复消费历史数据的时候,就需要重设消费者组位移

    2,为什么能:
    A :Kafka和传统的消费引擎在设计上有很大区别,其中一个比较显著的区别是:Kafka的消息费者读取消息是可以重演的(replayable)
    B :如RabbitMQ或ActiveMQ这样的传统消息中间件,他们处理和响应消息的方式是破坏性的(destructive),一旦消息被成功处理,就会被从Broker上删除。
    C :Kafka,由于它是基于日志结构(log-based)的消息引擎,消费者在消费消息时,仅仅是从磁盘文件上读取数据,是只读操作,因此消费者不会删除消息数据。同时,由于位移数据是由消费者控制的,因此他能够很容易的修改位移值,实现重复消费历史数据的功能。

    3,重设位移策略
    A :位移维度:根据位移值来重设。直接把消费者的位移值重设成我们给定的值。
    B :时间维度:可以给定一个时间,让消费者吧位移调整成大于该时间的最小位移;亦可以给出一段时间间隔,如30分钟,然后让消费者直接将位移调回30分钟之前的位移值。
    七种策略:
    (1)Earliest:将位移调整到主题当前最早位移处。可以实现重新消费主题的所有消息
    (2)Latest:把位移重设成最新末端位移。可以跳过所有历史消息,从最新消息开始消费
    (3)Current:将位移调整成消费者当前提交的最新位移,可以实现把位移重设到消费者重启时的位置。
    (4)Specified-Offset:把位移值调整到指定的位移处。这可以实现跳过某条错误信息,避免造成消费阻塞。
    (5)Shift-By-N:相对于当前位值的位移值,可以向前或向后,跳出一段距离。
    (6)DataTime:指定一个时间,然后将位移重置到该时间之后的最早位移处。
    (7)Duration:给定相对时间间隔,让位移调整到距离当前给定时间间隔的位移处。

    4 重设的方法
    A :通过消费者API来实现
    B :通过Kafka-consumer-groups命令行脚本来实现

    消费者API注意事项:
    (1):要创建消费者程序,要禁止自动提交位移
    (2):组ID要设置成要重设的消费者组的组ID
    (3):调用seekToBeginning方法时,要一次性构造主题的所有分区对象
    (4):最重要的是,一定要调用带长整形的poll方法,而不调用consumer.poll(Duration.ofSecond(0))。
    (5):要实现DateTime策略:需要借助KafkaConsumer.offsetsForTimes方法。

    总之:使用Java API的方式来实现重设策略的主要入口方法,就是seek方法。

    命令行方式注意事项:
    (1)bin/kafka-consumer-groups.sh --bootstrap-server kafka-host:port --group test-group --reset-offsets --all-topics --to-earliest –execute
    2019-11-11
    1
  • shenbinzhuo
    bin/kafka-consumer-groups.sh --bootstrap-server kafka-host:port --group test-group --reset-offsets --all-topics --to-earliest –execute
    老师,这是把当前group下的所有topic位移提交到当前最早的位移处,当前group的某个topic怎么设置?

    作者回复: 只能使用API来实现了,脚本方式不可以

    2019-09-19
  • 无菇朋友
    老师 问一下 如果 我想针对某个分区重置位移,怎么做

    作者回复: KafkaConsumer.seek方法支持指定单个分区进行重设位移

    2019-09-18
  • cuiyunfeng
    重置位移前,执行consumer.poll(),可以让kafka强制更新担当的partition信息,以防止发生kafka rebalance后partition信息陈旧,然后再取得partition信息进行offset位移。或者也可以配合使用kafka listener来处理发生rebalance情况下,进行重置位移。
    2019-09-06
  • What for
    请问一下老师在重设位移之前为什么要调用 consumer.poll 方法?是为了连接集群吗?

    作者回复: 嗯嗯,你可以这么认为,其实还有其他作用,比如获取集群信息后台主题数据等

    2019-08-31
  • 无菇朋友
    请问老师,current这个选项的应用场景是什么?

    作者回复: 如果你上线的consumer程序有bug,需要重演自上线起处理过的消息,那么可以考虑使用这个策略

    2019-08-26
  • 曹伟雄
    接着上一个问题,我的auto-offset-reset设置为earliest,我的问题是,一般什么情况下会出现这个情况? 可以从哪方面去分析? 谢谢。
    2019-08-26
  • 曹伟雄
    老师,留言想问一个问题,我在使用kafka的时候,发现个问题,offset偶尔会自动恢复到以前的值,然后又重新消费,例如:
    LogSize=538068,Consumer Offset=538068,Lag=0
    过了2天后突然变成了:
    LogSize=538068,Consumer Offset=138068,Lag=398023

    作者回复: 目前的信息不太能够确定原因。看看日志有无其他错误吧?

    2019-08-26
  • 锋芒
    请问,用命令行重设位移,应该在当前group 的leader 节点上?

    作者回复: 不需要的

    2019-08-23
  • 此方彼方Francis
    有遇到过,之前有一条Kafka消息的crc校验值出错了(不知道为什么会出错,非常奇怪),这种状况下就只能跳过这条消息了。
    不过有个问题请教老师,重设消费者位移之前,是不是有必要让消费者停止消费?

    作者回复: 如果使用程序API的方式不必停止消费

    2019-08-22
  • godtrue
    打卡,此节讲解了

    1:为啥需要重设消费者组位移?

    当需要实现重复消费历史数据的时候,就需要重设消费者组位移。


    2:重设消费者组位移的实现?

    有两种方式七种策略(两种维度:位移和时间)

    两种方式:操作API、操作kafka命令

    七种策略:

    2-1:Earliest 策略——表示将位移调整到主题当前最早位移处。

    2-2:Latest 策略——表示把位移重设成最新末端位移。

    2-3:Current 策略——表示将位移调整成消费者当前提交的最新位移。

    2-4:Specified-Offset 策略——则是比较通用的策略,表示消费者把位移值调整到你指定的位移处。这个策略的典型使用场景是,消费者程序在处理某条错误消息时,你可以手动地“跳过”此消息的处理。

    2-5: Shift-By-N 策略——指定的是位移的相对数值,即你给出要跳过的一段消息的距离即可。这里的“跳”是双向的,你既可以向前“跳”,也可以向后“跳”。

    2-6:DateTime策略—— 允许你指定一个时间,然后将位移重置到该时间之后的最早位移处。

    2-7:Duration 策略——则是指给定相对的时间间隔,然后将位移调整到距离当前给定时间间隔的位移处,具体格式是 PnDTnHnMnS。
    2019-08-19
  • 常银玲
    老师,留言想问一个问题,现在项目有一个需求是做一套仿真系统,仿真的数据来源于之前历史数据,查询准备用es像这种情况我们可以用kafka吗?

    作者回复: 我觉得可以用啊

    2019-08-12
  • Li Shunduo
    试了下开着console consumer的时候去调整offset,遇到以下错误:
    Error: Assignments can only be reset if the group 'test_group' is inactive, but the current state is Stable.
    停掉console consumer之后,就可以调整offset了。
    好像不能动态调整?

    作者回复: 嗯,必须是非active的group才行

    2019-08-12
  • 边城
    老师您好,请教您两个问题。

    如果把位移调整成指定位移,是不是每次消费都需要持久化位移点?

    这样的话,发布应用的时候,我应该怎么停止消费程序进程呢?

    感谢!

    作者回复: 你不需要停止程序也能动态调整位移

    2019-08-12
    1
  • 陈华应
    生产环境中遇到过这种情况,
    现象是:本来正常消费的一个topic,突然因为业务调整,大数据侧向此topic推送了大量的另一个平台的历史数据,而这些数据对现在使用此topic的业务场景是无效数据,并且评估到按现有能力吧历史数据消化完需要几个小时时间,业务上是不能接受的
        最终就是通过调整位移来“越过”历史数据,消费最新的数据,解决了可能是故障的一次
    2019-08-11
  • 我自成魔
    老师,我当前使用的Kafka是0.10,重置offset使用seek方式,但是在实际使用过程中,发现使用subscribe加poll消费消息,无法消费到消息,程序也不报错,而使用seek方式重新指定各分区的offset进行消费就可以,麻烦老师解惑,希望老师可以讲一下Kafka如何去消费消息的,谢谢!

    作者回复: 是不是没有数据可以消费了呢?另外0.10版本的新consumer不建议使用,这个时候bug还比较多。建议至少到0.10.2或0.11之后再切换到新版本consumer

    2019-08-11
收起评论
28
返回
顶部