消息队列高手课
李玥
京东零售技术架构部资深架构师
立即订阅
8426 人已学习
课程目录
已完结 41 讲
0/4登录后,你可以任选4讲全文学习。
课前必读 (2讲)
开篇词 | 优秀的程序员,你的技术栈中不能只有“增删改查”
免费
预习 | 怎样更好地学习这门课?
基础篇 (8讲)
01 | 为什么需要消息队列?
02 | 该如何选择消息队列?
03 | 消息模型:主题和队列有什么区别?
04 | 如何利用事务消息实现分布式事务?
05 | 如何确保消息不会丢失?
06 | 如何处理消费过程中的重复消息?
07 | 消息积压了该如何处理?
08 | 答疑解惑(一) : 网关如何接收服务端的秒杀结果?
进阶篇 (21讲)
09 | 学习开源代码该如何入手?
10 | 如何使用异步设计提升系统性能?
11 | 如何实现高性能的异步网络传输?
12 | 序列化与反序列化:如何通过网络传输结构化的数据?
13 | 传输协议:应用程序之间对话的语言
14 | 内存管理:如何避免内存溢出和频繁的垃圾回收?
加餐 | JMQ的Broker是如何异步处理消息的?
15 | Kafka如何实现高性能IO?
16 | 缓存策略:如何使用缓存来减少磁盘IO?
17 | 如何正确使用锁保护共享数据,协调异步线程?
18 | 如何用硬件同步原语(CAS)替代锁?
19 | 数据压缩:时间换空间的游戏
20 | RocketMQ Producer源码分析:消息生产的实现过程
21 | Kafka Consumer源码分析:消息消费的实现过程
22 | Kafka和RocketMQ的消息复制实现的差异点在哪?
23 | RocketMQ客户端如何在集群中找到正确的节点?
24 | Kafka的协调服务ZooKeeper:实现分布式系统的“瑞士军刀”
25 | RocketMQ与Kafka中如何实现事务?
26 | MQTT协议:如何支持海量的在线IoT设备?
27 | Pulsar的存储计算分离设计:全新的消息队列设计思路
28 | 答疑解惑(二):我的100元哪儿去了?
案例篇 (7讲)
29 | 流计算与消息(一):通过Flink理解流计算的原理
30 | 流计算与消息(二):在流计算中使用Kafka链接计算任务
31 | 动手实现一个简单的RPC框架(一):原理和程序的结构
32 | 动手实现一个简单的RPC框架(二):通信与序列化
33 | 动手实现一个简单的RPC框架(三):客户端
34 | 动手实现一个简单的RPC框架(四):服务端
35 | 答疑解惑(三):主流消息队列都是如何存储消息的?
测试篇 (2讲)
期中测试丨10个消息队列热点问题自测
免费
期末测试 | 消息队列100分试卷等你来挑战!
结束语 (1讲)
结束语 | 程序员如何构建知识体系?
消息队列高手课
登录|注册

21 | Kafka Consumer源码分析:消息消费的实现过程

李玥 2019-09-12
你好,我是李玥。
我们在上节课中提到过,用于解决消息队列一些常见问题的知识和原理,最终落地到代码上,都包含在收、发消息这两个流程中。对于消息队列的生产和消费这两个核心流程,在大部分消息队列中,它实现的主要流程都是一样的,所以,通过这两节课的学习之后,掌握了这两个流程的实现过程。无论你使用的是哪种消息队列,遇到收发消息的问题,你都可以用同样的思路去分析和解决问题。
上一节课我和你一起通过分析源代码学习了 RocketMQ 消息生产的实现过程,本节课我们来看一下 Kafka 消费者的源代码,理清 Kafka 消费的实现过程,并且能从中学习到一些 Kafka 的优秀设计思路和编码技巧。
在开始分析源码之前,我们一起来回顾一下 Kafka 消费模型的几个要点:
Kafka 的每个 Consumer(消费者)实例属于一个 ConsumerGroup(消费组);
在消费时,ConsumerGroup 中的每个 Consumer 独占一个或多个 Partition(分区);
对于每个 ConsumerGroup,在任意时刻,每个 Partition 至多有 1 个 Consumer 在消费;
每个 ConsumerGroup 都有一个 Coordinator(协调者)负责分配 Consumer 和 Partition 的对应关系,当 Partition 或是 Consumer 发生变更时,会触发 rebalance(重新分配)过程,重新分配 Consumer 与 Partition 的对应关系;
Consumer 维护与 Coordinator 之间的心跳,这样 Coordinator 就能感知到 Consumer 的状态,在 Consumer 故障的时候及时触发 rebalance。
取消
完成
0/1000字
划线
笔记
复制
© 版权归极客邦科技所有,未经许可不得传播售卖。 页面已增加防盗追踪,如有侵权极客邦将依法追究其法律责任。
该试读文章来自付费专栏《消息队列高手课》,如需阅读全部文章,
请订阅文章所属专栏。
立即订阅
登录 后留言

精选留言(15)

  • 业余草
    我们需要一些速成的实战经验,比如消息队列突然延迟2个小时该如何解决?
    希望带着这些类似的问题,结合设计原理,帮助我们层层解析。。。
    2019-09-12
    12
  • DFighting
    offset的提交不知道是不是在kafkaConsumer.commitAsync中调用coordinator.commitOffsetsAsync(offsets,callback)
    1、这里设计成异步方式一开始我是比较奇怪的,他是如何保证offset不丢失呢?看了代码才知道在异步返回前会等待ConcurrentLinkQueue<offsetCommitCompletetion>中没有其他的待处理的其他的offset的commit后,才会返回,这里的非阻塞队列是线程安全的,可以避免当前提交冲掉其他的offset的提交
    2、真正进行提交的时候也不是调用什么具体操作net的接口,而是向另一个ConcurrentLinkedQueue中注册了一个RequestFutureListener的监听者,当然注册之前使用了AtomicInteger来保证并发安全。
    3、每个监听者应该都会由相应的Coordinator轮询处理队列中的待提交请求,将offset提交从具体的Consumer中解耦到每个组的Coordinator中。
    当然以上只是个人理解,如有不当欢迎指正。
    读完这个代码,我发现kafka在这里保证并发数据一致性时,使用了安全的数据结构+CAS的数据访问,灵活且大大降低了锁机制的粗粒度带来的性能损耗,只是这个代码真不容易写好,真是大牛作品!!
    2019-09-17
    2
  • leslie
    先打卡:代码慢慢研究;老师今天讲述了研究代码的目的:
       1.消息队列实现的主要流程都一样,掌握流程的实现过程;遇到收发消息的问题,都可以用同样的思路去分析和解决问题。
       2.看一下源代码,理清消费生产的实现过程,从中学习一些优秀的设计思路和编码技巧。
       这个算是一个小的总结吧:明天中秋休息刚好可以啃代码,好好研究代码去体会。
       明天是中秋佳节:愿老师节日快乐^_^
    2019-09-12
    1
  • 碧落惊鸿LY
    当 Partition 或是 Consumer 发生变更是,会触发 reblance(重新分配)过程。
    这句话有两个错别字。。。

    作者回复: 我联系编辑同学修改,感谢!

    2019-11-17
  • PeterLu
    课后作业,希望老师指正。
    在基础篇03的时候讲过消费位置是消息队列服务器针对每个消费组和每个队列维护的一个位置变量。那么也就是说最终真正更新这个位置变量应该是交由服务器去执行的,而Consumer只是发送一个请求。那么顺着这个思路,我猜应该是在更新元数据的时候就应该发送这个请求,原因很简单:消费者需要知道“从哪发起”并且“发多少”,因此这时就已经知道了应该将消费位置更新为多少了,所以这时候就可以发送这个请求了。至于服务器最终会将消费位置更新为多少,还取决于客户端返回的结果。
    在方法updateAssignmentMetadataIfNeeded中,最后一行return updateFetchPositions(timer);
    从updateFetchPositions这个方法点进去,看到coordinator.refreshCommittedOffsetsIfNeeded(timer)
    这个方法点进去之后会看到fetchCommittedOffsets方法,进这个方法,找到sendOffsetFetchRequest,点进去,最终会发现 client.send(coordinator, requestBuilder)

    作者回复: 细致👍

    2019-11-01
  • 旭东
    上一节课我和你一起通过分析源代码学习了 RocketMQ 消息生产的实现过程,本节课我们来看一下 Kafka 消费者的源代码,理清 Kafka 消费的实现过程,并且能从中学习到一些 Kafka 的优秀设计思路和编码技巧。

    老师,为什么上一讲是Rabbit这一讲是kafka?

    作者回复: 因为我们这个课需要照顾到使用不同消息队列和使用不同语言的小伙伴啊。

    2019-10-20
  • 亚洲舞王.尼古拉斯赵四
    这篇文章真的好深,看了好几遍才明白,异步的设计好复杂,现在明白前面为什么老师讲那些基础了
    2019-10-10
  • Geek_71a2ee
    老师,你好,broker和消费端都重启了,消费端还知道从哪个offset开始消费吗

    作者回复: 从服务端的协调者获取。服务端的协调者会记录主题的每个消费组的每个分区当前的消费位置

    2019-09-30
    1
  • Geek_71a2ee
    消费端重启的时候,他会从哪里拿offset呢?他是从头还是拿消息还是从上次消费的地方?这个offset能讲讲吗?
    2019-09-30
  • Geek_71a2ee
    老师,主题和分区,以及分区和ConsumerGroup到底是几对几的关系
    2019-09-25
  • Geek_71a2ee
    ConsumerGroup,和分区什么关系,有多少个ConsumerGroup呢
    2019-09-25
  • z.l
    老师好,看了Rocketmq consumer源码,为啥启动consumer实例的时候也会初始化producer,是为了提交消费位点吗?

    作者回复: 你能说一下在代码中哪个地方初始化了Producer吗?

    2019-09-21
    2
  • 米兰铁匠
    用代码跟类图来讲解一个系统的实现还是太过于抽象,难以把握住,最好的方式是用老师理解的东西然后转换成通俗易懂的原理图来讲解

    作者回复: 我还是希望同学们能通过学习,具备独立分析代码,独立设计能力,这样才能提高。

    2019-09-15
  • Geek_71a2ee
    消费者如何从服务端拉取消息的,用for循环效率太低吧,能否说说实际的代码

    作者回复: 同学,我们这节课通篇就是讲得这个问题啊,给你们讲解使用的就是实际的代码。

    2019-09-13
    2
  • 微微一笑
    老师好,先祝您节日快乐!!!您辛苦了~
    有几个疑问需要老师解答一下:
    ①今天在看rocketMq源码过程中,发现DefaultMQProducer有个属性defaultTopicQueueNums,它是用来设置topic的ConsumeQueue的数量的吗?我之前的理解是,consumeQueue的数量是创建topic的时候指定的,跟producer没有关系,那这个参数又有什么作用呢?
    ②在RocketMq的控制台上可以创建topic,需要指定writeQueueNums,readQueueNums,perm,这三个参数是有什么用呢?这里为什么要区分写队列跟读队列呢?不应该只有一个consumeQueue吗?
    ③用户请求-->异步处理--->用户收到响应结果。异步处理的作用是:用更少的线程来接收更多的用户请求,然后异步处理业务逻辑。老师,异步处理完后,如何将结果通知给原先的用户呢?即使有回调接口,我理解也是给用户发个短信之类的处理,那结果怎么返回到定位到用户,并返回之前请求的页面上呢?需要让之前的请求线程阻塞吗?那也无法达到【用更少的线程来接收更多的用户请求】的目的丫。
    望老师能指点迷津~~~
    2019-09-12
收起评论
15
返回
顶部