Kafka 核心技术与实战
胡夕
Apache Kafka Committer,老虎证券技术总监
52815 人已学习
新⼈⾸单¥68
登录后,你可以任选4讲全文学习
课程目录
已完结/共 47 讲
开篇词 (1讲)
结束语 (1讲)
Kafka 核心技术与实战
15
15
1.0x
00:00/00:00
登录|注册

30 | 怎么重设消费者组位移?

Duration策略
DateTime策略
Shift-By-N策略
Specified-Offset策略
Current策略
Latest策略
Earliest策略
推荐的重设方法
支持的重设策略和方法
重设位移的目的
实现Duration策略
实现DateTime策略
实现Shift-By-N策略
实现Specified-Offset策略
实现Current策略
实现Latest策略
实现Earliest策略
kafka-consumer-groups脚本
实现Duration策略
实现DateTime策略
实现Shift-By-N策略
实现Specified-Offset策略
实现Current策略
实现Latest策略
实现Earliest策略
seek方法
时间维度
位移维度
适用场景选择:传统消息中间件 vs. Kafka
Kafka和传统消息引擎的设计区别
开放讨论
小结
命令行方式设置
消费者API方式设置
重设位移策略
为什么要重设消费者组位移?
怎么重设Kafka消费者组位移?

该思维导图由 AI 生成,仅供参考

你好,我是胡夕。今天我要跟你分享的主题是:如何重设消费者组位移。

为什么要重设消费者组位移?

我们知道,Kafka 和传统的消息引擎在设计上是有很大区别的,其中一个比较显著的区别就是,Kafka 的消费者读取消息是可以重演的(replayable)。
像 RabbitMQ 或 ActiveMQ 这样的传统消息中间件,它们处理和响应消息的方式是破坏性的(destructive),即一旦消息被成功处理,就会被从 Broker 上删除。
反观 Kafka,由于它是基于日志结构(log-based)的消息引擎,消费者在消费消息时,仅仅是从磁盘文件上读取数据而已,是只读的操作,因此消费者不会删除消息数据。同时,由于位移数据是由消费者控制的,因此它能够很容易地修改位移的值,实现重复消费历史数据的功能。
对了,之前有很多同学在专栏的留言区提问:在实际使用场景中,我该如何确定是使用传统的消息中间件,还是使用 Kafka 呢?我在这里统一回答一下。如果在你的场景中,消息处理逻辑非常复杂,处理代价很高,同时你又不关心消息之间的顺序,那么传统的消息中间件是比较合适的;反之,如果你的场景需要较高的吞吐量,但每条消息的处理时间很短,同时你又很在意消息的顺序,此时,Kafka 就是你的首选。
确认放弃笔记?
放弃后所记笔记将不保留。
新功能上线,你的历史笔记已初始化为私密笔记,是否一键批量公开?
批量公开的笔记不会为你同步至部落
公开
同步至部落
取消
完成
0/2000
荧光笔
直线
曲线
笔记
复制
AI
  • 深入了解
  • 翻译
    • 英语
    • 中文简体
    • 中文繁体
    • 法语
    • 德语
    • 日语
    • 韩语
    • 俄语
    • 西班牙语
    • 阿拉伯语
  • 解释
  • 总结

Kafka消费者组位移的重设是Kafka中重要的技术操作之一。本文介绍了为什么需要重设消费者组位移以及重设位移的策略。Kafka的消费者读取消息是可以重演的,这是Kafka与传统消息引擎的显著区别。文章列举了7种重设策略,包括Earliest、Latest、Current、Specified-Offset、Shift-By-N、DateTime和Duration策略,以及通过消费者API和kafka-consumer-groups命令行脚本两种方式来实现重设消费者组位移。通过Java API的方式来重设位移,需要调用KafkaConsumer的seek方法,或者是它的变种方法seekToBeginning和seekToEnd。对于命令行方式设置,针对不同策略有对应的参数。总的来说,本文提供了清晰的技术指导,帮助读者了解Kafka消费者组位移的重设操作及其实现方式。文章内容涵盖了技术细节,适合读者快速了解Kafka消费者组位移的重设操作及其实现方式。

仅可试看部分内容,如需阅读全部内容,请付费购买文章所属专栏
《Kafka 核心技术与实战》
新⼈⾸单¥68
立即购买
登录 后留言

全部留言(67)

  • 最新
  • 精选
  • 小可爱
    current是回到最近提交位移处,但是消费者不是本来就从最近提交处继续消费吗

    作者回复: current主要是为了调试场景。比如这样的场景:你停掉了consumer,现在offset=50,然后修改了consumer代码重新上线,consumer开始从50消费,运行了一段时间发现你修改的代码有问题,还要继续改,那么下掉consumer,将offset调回current,改代码之后再上线,consumer从50消费。此时current策略就显得很方便了,对吧?

    2019-08-29
    9
    38
  • Curry
    老师,为什么要poll(0)一下?是为了获取元数据吗?

    作者回复: 是的,去拿元数据

    2020-05-05
    2
    17
  • 喜欢地球的阿培同学
    老师,问一个问题: 像RocketMQ这样的消息引擎,如果消费者消费某条消息一直失败,会将这条消息放到 “死信”队列里,然后消费者继续消费下一条消息。在kafka中,如果消费者消费某条消息一直失败,会怎么处理呢?难道程序会一直消费这条消息,然后失败.. 继续消费这条消息 .. 然后继续失败 ......

    作者回复: consumer端可以选择跳过该消息。的确这方面Kafka没有提供开箱即用的dead letter queue~

    2020-05-27
    2
    10
  • 水天一色
    请问,重置offset到 datetime,这个 datetime 是生产时间还是当前group的消费时间?

    作者回复: 消息的创建时间

    2019-12-17
    2
    8
  • cricket1981
    "最重要的是,一定要调用带长整型的 poll 方法,而不要调用 consumer.poll(Duration.ofSecond(0))。"--- 能讲下为什么吗?如果不遵守会怎么样?

    作者回复: 两个的实现方式不一样。详细设计原理差别可以看看:https://www.cnblogs.com/huxi2b/p/10773559.html

    2019-08-12
    5
    8
  • Li Shunduo
    试了下开着console consumer的时候去调整offset,遇到以下错误: Error: Assignments can only be reset if the group 'test_group' is inactive, but the current state is Stable. 停掉console consumer之后,就可以调整offset了。 好像不能动态调整?

    作者回复: 嗯,必须是非active的group才行

    2019-08-12
    6
  • What for
    请问一下老师在重设位移之前为什么要调用 consumer.poll 方法?是为了连接集群吗?

    作者回复: 嗯嗯,你可以这么认为,其实还有其他作用,比如获取集群信息后台主题数据等

    2019-08-31
    4
  • JasonZhi
    老师,不是还可以通过auto offset reset配置项重设位移吗?怎么这里没有说

    作者回复: 这是说的是手动设置位移的情况,自动设置位移是Kafka自动做的。当然也算是重设位移的一种

    2019-08-30
    4
  • James
    老师,你好.存在以下问题,麻烦解答下: 1.对文中解释的Latest与Current感觉区分不清楚,老能能详细举例介绍下; 文中是禁止提交位移,那么这两个应该都是一样,最新的位移位置. 要是可以提交位移呢;不是特别懂 2.第一条评论中,current主要是为了调试场景,要是有提交位移(50->100),那么重新上线使用current策略,最新提交位移是不是100;

    作者回复: Latest 策略表示把位移重设成最新末端位移,也就是LEO。Current 策略表示将位移调整成消费者当前提交的最新位移,是消费者已经提交的位移,后者必然小于等于LEO

    2020-07-02
    2
    3
  • 无菇朋友
    老师 问一下 如果 我想针对某个分区重置位移,怎么做

    作者回复: KafkaConsumer.seek方法支持指定单个分区进行重设位移

    2019-09-18
    3
收起评论
显示
设置
留言
67
收藏
沉浸
阅读
分享
手机端
快捷键
回顶部