消息队列高手课
李玥
美团高级技术专家
51348 人已学习
新⼈⾸单¥59
登录后,你可以任选4讲全文学习
课程目录
已完结/共 42 讲
进阶篇 (21讲)
消息队列高手课
15
15
1.0x
00:00/00:00
登录|注册

21 | Kafka Consumer源码分析:消息消费的实现过程

你好,我是李玥。
我们在上节课中提到过,用于解决消息队列一些常见问题的知识和原理,最终落地到代码上,都包含在收、发消息这两个流程中。对于消息队列的生产和消费这两个核心流程,在大部分消息队列中,它实现的主要流程都是一样的,所以,通过这两节课的学习之后,掌握了这两个流程的实现过程。无论你使用的是哪种消息队列,遇到收发消息的问题,你都可以用同样的思路去分析和解决问题。
上一节课我和你一起通过分析源代码学习了 RocketMQ 消息生产的实现过程,本节课我们来看一下 Kafka 消费者的源代码,理清 Kafka 消费的实现过程,并且能从中学习到一些 Kafka 的优秀设计思路和编码技巧。
在开始分析源码之前,我们一起来回顾一下 Kafka 消费模型的几个要点:
Kafka 的每个 Consumer(消费者)实例属于一个 ConsumerGroup(消费组);
在消费时,ConsumerGroup 中的每个 Consumer 独占一个或多个 Partition(分区);
对于每个 ConsumerGroup,在任意时刻,每个 Partition 至多有 1 个 Consumer 在消费;
每个 ConsumerGroup 都有一个 Coordinator(协调者)负责分配 Consumer 和 Partition 的对应关系,当 Partition 或是 Consumer 发生变更时,会触发 rebalance(重新分配)过程,重新分配 Consumer 与 Partition 的对应关系;
Consumer 维护与 Coordinator 之间的心跳,这样 Coordinator 就能感知到 Consumer 的状态,在 Consumer 故障的时候及时触发 rebalance。
确认放弃笔记?
放弃后所记笔记将不保留。
新功能上线,你的历史笔记已初始化为私密笔记,是否一键批量公开?
批量公开的笔记不会为你同步至部落
公开
同步至部落
取消
完成
0/2000
荧光笔
直线
曲线
笔记
复制
AI
  • 深入了解
  • 翻译
    • 英语
    • 中文简体
    • 中文繁体
    • 法语
    • 德语
    • 日语
    • 韩语
    • 俄语
    • 西班牙语
    • 阿拉伯语
  • 解释
  • 总结
仅可试看部分内容,如需阅读全部内容,请付费购买文章所属专栏
《消息队列高手课》
新⼈⾸单¥59
立即购买
登录 后留言

全部留言(29)

  • 最新
  • 精选
  • Peter
    课后作业,希望老师指正。 在基础篇03的时候讲过消费位置是消息队列服务器针对每个消费组和每个队列维护的一个位置变量。那么也就是说最终真正更新这个位置变量应该是交由服务器去执行的,而Consumer只是发送一个请求。那么顺着这个思路,我猜应该是在更新元数据的时候就应该发送这个请求,原因很简单:消费者需要知道“从哪发起”并且“发多少”,因此这时就已经知道了应该将消费位置更新为多少了,所以这时候就可以发送这个请求了。至于服务器最终会将消费位置更新为多少,还取决于客户端返回的结果。 在方法updateAssignmentMetadataIfNeeded中,最后一行return updateFetchPositions(timer); 从updateFetchPositions这个方法点进去,看到coordinator.refreshCommittedOffsetsIfNeeded(timer) 这个方法点进去之后会看到fetchCommittedOffsets方法,进这个方法,找到sendOffsetFetchRequest,点进去,最终会发现 client.send(coordinator, requestBuilder)

    作者回复: 细致👍

    2
    21
  • 鲁班大师
    老师,kafak consumer 在reblance期间,如何实现不重复消费

    作者回复: 实现“不重复消费”是非常困难的,你需要做的就是让你的consumer具备幂等性,这样即使发生重复消费,也不会对系统数据产生任何影响。

    3
    6
  • 山头
    老师,你好,broker和消费端都重启了,消费端还知道从哪个offset开始消费吗

    作者回复: 从服务端的协调者获取。服务端的协调者会记录主题的每个消费组的每个分区当前的消费位置

    4
    6
  • 凌空飞起的剪刀腿
    老师您好: kafka consumer中没有分析到心跳线程是怎么处理的,我看源代码上写的是单独开了一个后台线程负责心跳,这样处理的优势是什么啊?

    作者回复: Kafka Consumer与服务端的协调者维护心跳,而协调者所在的Broker不一定和接收消息的Broker是同一个实例。 所以,必须得分开。

    3
  • 鲁班大师
    每个 ConsumerGroup 都有一个 Coordinator(协调者)负责分配 Consumer 和 Partition 的对应关系,当 Partition 或是 Consumer 发生变更时,会触发 rebalance(重新分配)过程,重新分配 Consumer 与 Partition 的对应关系;……在rebalance期间应该是不能消费的吧

    作者回复: 是的,消费会暂停,直到Rebalance完成。

    2
  • SKang
    老师 我看完之后 可以理解为 消费组A 消费一个cc主题的消息,然后过程中我将消费组A 的名字改成消费组B后,不会出现重复消费,只会接着A的 继续消费剩下的吧 我认为毕竟A已经成功消费了 偏移量已经成功被更新了吧

    作者回复: 不是的,不同消费组的偏移量是分开记录的。

    1
  • 山头
    消费者如何从服务端拉取消息的,用for循环效率太低吧,能否说说实际的代码

    作者回复: 同学,我们这节课通篇就是讲得这个问题啊,给你们讲解使用的就是实际的代码。

    5
    1
  • 蛤蟆先生
    有个问题请教一下老师,目前我们公司某个应用在生产环境一共有两台机器,这时候有一台机器挂了,但是某个消息还是会经常消费到这台挂的机器上,导致消息没有消费成功,这是为什么呢?

    作者回复: 可以贴一下Topic的具体配置,大家一起帮你分析一下原因。

  • 鲁班大师
    多个consumer消费同一个partition会有什么问题么

    作者回复: 如果是不同的group.id,互相之间没有影响。

    3
  • lizhibo
    老师好,kafka消息要是在消费端消费出现异常了怎么办,他没有再次消费的机制,比如1分之后再去消费,这个怎么实现

    作者回复: 所以消费的时候,如果不能接受丢消息,一定不要设置成自动提交消费位置。这样下次拉取的时候,还会拉到这个位置的消息。 ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000L)); // 执行消费业务逻辑,然后再提交消费位置。 consumer.commitSync();

    4
收起评论
显示
设置
留言
29
收藏
沉浸
阅读
分享
手机端
快捷键
回顶部