消息队列高手课
李玥
京东零售技术架构部资深架构师
立即订阅
8426 人已学习
课程目录
已完结 41 讲
0/4登录后,你可以任选4讲全文学习。
课前必读 (2讲)
开篇词 | 优秀的程序员,你的技术栈中不能只有“增删改查”
免费
预习 | 怎样更好地学习这门课?
基础篇 (8讲)
01 | 为什么需要消息队列?
02 | 该如何选择消息队列?
03 | 消息模型:主题和队列有什么区别?
04 | 如何利用事务消息实现分布式事务?
05 | 如何确保消息不会丢失?
06 | 如何处理消费过程中的重复消息?
07 | 消息积压了该如何处理?
08 | 答疑解惑(一) : 网关如何接收服务端的秒杀结果?
进阶篇 (21讲)
09 | 学习开源代码该如何入手?
10 | 如何使用异步设计提升系统性能?
11 | 如何实现高性能的异步网络传输?
12 | 序列化与反序列化:如何通过网络传输结构化的数据?
13 | 传输协议:应用程序之间对话的语言
14 | 内存管理:如何避免内存溢出和频繁的垃圾回收?
加餐 | JMQ的Broker是如何异步处理消息的?
15 | Kafka如何实现高性能IO?
16 | 缓存策略:如何使用缓存来减少磁盘IO?
17 | 如何正确使用锁保护共享数据,协调异步线程?
18 | 如何用硬件同步原语(CAS)替代锁?
19 | 数据压缩:时间换空间的游戏
20 | RocketMQ Producer源码分析:消息生产的实现过程
21 | Kafka Consumer源码分析:消息消费的实现过程
22 | Kafka和RocketMQ的消息复制实现的差异点在哪?
23 | RocketMQ客户端如何在集群中找到正确的节点?
24 | Kafka的协调服务ZooKeeper:实现分布式系统的“瑞士军刀”
25 | RocketMQ与Kafka中如何实现事务?
26 | MQTT协议:如何支持海量的在线IoT设备?
27 | Pulsar的存储计算分离设计:全新的消息队列设计思路
28 | 答疑解惑(二):我的100元哪儿去了?
案例篇 (7讲)
29 | 流计算与消息(一):通过Flink理解流计算的原理
30 | 流计算与消息(二):在流计算中使用Kafka链接计算任务
31 | 动手实现一个简单的RPC框架(一):原理和程序的结构
32 | 动手实现一个简单的RPC框架(二):通信与序列化
33 | 动手实现一个简单的RPC框架(三):客户端
34 | 动手实现一个简单的RPC框架(四):服务端
35 | 答疑解惑(三):主流消息队列都是如何存储消息的?
测试篇 (2讲)
期中测试丨10个消息队列热点问题自测
免费
期末测试 | 消息队列100分试卷等你来挑战!
结束语 (1讲)
结束语 | 程序员如何构建知识体系?
消息队列高手课
登录|注册

20 | RocketMQ Producer源码分析:消息生产的实现过程

李玥 2019-09-10
你好,我是李玥。
对于消息队列来说,它最核心的功能就是收发消息。也就是消息生产和消费这两个流程。我们在之前的课程中提到了消息队列一些常见问题,比如,“如何保证消息不会丢失?”“为什么会收到重复消息?”“消费时为什么要先执行消费业务逻辑再确认消费?”,针对这些问题,我讲过它们的实现原理,这些最终落地到代码上,都包含在这一收一发两个流程中。
在接下来的两节课中,我会带你一起通过分析源码的方式,详细学习一下这两个流程到底是如何实现的。你在日常使用消息队列的时候,遇到的大部分问题,更多的是跟 Producer 和 Consumer,也就是消息队列的客户端,关联更紧密。搞清楚客户端的实现原理和代码中的细节,将对你日后使用消息队列时进行问题排查有非常大的帮助。所以,我们这两节课的重点,也将放在分析客户端的源代码上。
秉着先易后难的原则,我们选择代码风格比较简明易懂的 RocketMQ 作为分析对象。一起分析 RocketMQ 的 Producer 的源代码,学习消息生产的实现过程。
在分析源代码的过程中,我们的首要目的就是搞清楚功能的实现原理,另外,最好能有敏锐的嗅觉,善于发现代码中优秀的设计和巧妙构思,学习、总结并记住这些方法。在日常开发中,再遇到类似场景,你就可以直接拿来使用。
取消
完成
0/1000字
划线
笔记
复制
© 版权归极客邦科技所有,未经许可不得传播售卖。 页面已增加防盗追踪,如有侵权极客邦将依法追究其法律责任。
该试读文章来自付费专栏《消息队列高手课》,如需阅读全部文章,
请订阅文章所属专栏。
立即订阅
登录 后留言

精选留言(23)

  • lmtoo
    这种异步方式几乎没有意义,底层的netty已经实现了异步,这里只是在选择消息队列等判断的过程加了异步,最终callback还是由netty线程来调用的
    2019-09-10
    1
    6
  • 微微一笑
    老师好,先祝您节日快乐!!!您辛苦了~
    有几个疑问需要老师解答一下:
    ①今天在看rocketMq源码过程中,发现DefaultMQProducer有个属性defaultTopicQueueNums,它是用来设置topic的ConsumeQueue的数量的吗?我之前的理解是,consumeQueue的数量是创建topic的时候指定的,跟producer没有关系,那这个参数又有什么作用呢?
    ②在RocketMq的控制台上可以创建topic,需要指定writeQueueNums,readQueueNums,perm,这三个参数是有什么用呢?这里为什么要区分写队列跟读队列呢?不应该只有一个consumeQueue吗?
    ③用户请求-->异步处理--->用户收到响应结果。异步处理的作用是:用更少的线程来接收更多的用户请求,然后异步处理业务逻辑。老师,异步处理完后,如何将结果通知给原先的用户呢?即使有回调接口,我理解也是给用户发个短信之类的处理,那结果怎么返回到定位到用户,并返回之前请求的页面上呢?需要让之前的请求线程阻塞吗?那也无法达到【用更少的线程来接收更多的用户请求】的目的丫。
    望老师能指点迷津~~~

    作者回复: A1:这个参数是控制客户端在生产消费的时候会访问同一个主题的队列数量,假设一个主题有100个队列,对于每一个客户端来说,它没必要100个队列都访问,只需要使用其中的几个队列就行了。

    A2:writeQueueNums和readQueueNums是在服务端来控制每个客户端在生产和消费的时候,分别访问多少个队列。这两个参数是服务端参数,优先级是高于客户端控制的参数defaultTopicQueueNums的。perm是设置Topic读写等权限的参数,具体如何设置你需要去看一下文档。

    A3:如果局限于:“APP/浏览器 --[http协议]-->web 服务”这样的场景,受限于http协议,前端和web服务的交互一定是单向和同步的。一定要等待结果然后返回响应,但是,这种情况仍然可以使用异步的方法,这个我在“08答疑”中解释秒杀的时候其实已经给出了答案。很多同学不理解的原因是思维被web框架给限制住了。像spring web这种框架,它把处理web请求都给你封装好了,你只要写一个handler就行了,很方便。但是,这个handler只能是一个同步方法,它必须在返回值中给出响应结果,所以导致很多同学的思维转不过来这个弯儿。

    你可以结合我们讲的异步网络IO内容想一下,http协议发一个请求到服务端,就是发了一些数据过来,服务端回响应也就是在这个连接上给它返回一些数据回去就可以了。至于什么时候往回发响应数据,哪个线程来发,有要求吗?并没有。只要在超时之前发响应就可以了。我们讲得如何来实现异步网络IO的方法处理的不就是这种情况吗?

    这个过程不是说一定要做成和web框架一样的同步处理。

    2019-09-11
    1
    4
  • 明日
    李老师节日快乐!
    关于思考题看到了源码的注释说异常处理和超时时间有问题。
    自己看的话一是异常这里抛未知的原因,不够明确。
    二是这里用的线程池默认使用了虚拟机可用的线程,可能会对其他服务造成影响。
    三是超时时间这把线程阻塞可能等待的时间也包括进去了不太合适。
    感觉代码层次使用老师说过的completablefuture处理更优雅。另外底层使用了netty,应该直接用异步io就行了吧。
    2019-09-10
    2
  • leslie
    编程语言的话Python或Go可以么?极客时间里都有购买,就是忙着其它课程的学习,一直没顾的上编程语言的学习。
           从开始一路跟到现在:算是少数一直在完全没有缺的课;前期一直遍边学习边针对开篇时的学习目标针对当下工作环境的Nosql DB和MQ使用率的低下的问题找解决思路和方案,课后笔记主要同样集中在思路以及针对思路的困惑查疑上,代码这块完全没顾上。虽然代码的思路看的懂,发现动手能力确实非常欠缺。一路学到现在梳理到现在整体方案大致定下来:以及早期的部分课程的结束;课程的主要方案自己估计在掌握思路的基础上去补强Coding能力。虽然DBA的Coding能力都比较烂,不过还是得边学边啃下来;逼自己一下总能勉强写出来,估计就是效率问题、、、MQ这块PY或GO哪种更合适,或者说都可以?
            感谢老师一路的辛勤授业:授课之余尽力去帮助学生们解惑,让我们能一路走来一路成长;愿老师教师节快乐,谢谢老师的分享。

    作者回复: 个人建议学习Java或者Go,这两种语言都有不错的生态系统,都可以用来构建大规模集群。

    相对来说,Java的生态系统更强大,Go比较年轻,有很多Java不具备的语言特性。

    Python本来只是一门脚本语言,特别适合开发机器学习程序而火起来了,如果你不是从事机器学习相关的研发,不太建议作为第一语言来学习。

    2019-09-10
    2
  • 每天晒白牙
    我总结的kafka生产消息的源码分析
    https://mp.weixin.qq.com/s/-s34_y16HU6HR5HDsSD4bg
    2019-09-10
    1
  • z.l
    老师,异步发送为什么是弃用,还是没有看懂,感觉超时时间的计算没有错啊…

    作者回复: 我理解主要的原因除了超时时间的计算不准确以外,更重要的原因是这个异步方法还有改进的空间,其实可以直接结合Netty做到不需要Executor。

    2019-11-18
  • z.l
    DefaultMQProducerImpl的start和shutdown方法没有加同步,serviceState也只是一个普通成员变量没加volatile,不会有线程安全问题吗?

    作者回复: 这两个方法没有做到线程安全,但是这两个方法的实现内部,调用的方法都是线程安全的方法。

    2019-11-14
  • PeterLu
    课后作业,请老师指正:
    从方法的注释看,说是因为异常处理和超时时间的语义不对。
    异常处理这块我觉得应该是采用统一的异常处理,而不应该是有的异常抛出,而有的异常通过回调方法返回客户端。
    再说超时时间的错误语义,严格来说应该是不准确的超时时间,因为在run方法里进行时间判断(if (timeout > costTime))实际上已经是开始执行当前线程的时间,而之前的排队时间没有算,因此我改进的方法应该是这样:
    CompletableFuture.runAsync(() -> {
                long costTime = System.currentTimeMillis() - beginStartTime;
                if (timeout > costTime) {
                    try {
                        sendDefaultImpl(msg, CommunicationMode.ASYNC, sendCallback, timeout - costTime);
                    } catch (Exception e) {
                        sendCallback.onException(e);
                    }
                } else {
                    sendCallback.onException(
                            new RemotingTooMuchRequestException("DEFAULT ASYNC send call timeout"));
                }
            }, executor);

    作者回复: 给认真思考完成作业的同学点赞👍!

    2019-11-04
    1
  • PeterLu
    老师继续请教问题:
    1.DefaultMQPullConsumer和DefaultMQPushConsumer有什么区别
    2.为什么pullConsumer的启动和producer的启动在同一个start方法里(最终都在MQClientInstance#start里)
    3.rebalanceService服务是干嘛的

    作者回复: PullConsumer:业务代码在需要的时候调用consumer.pullxxxx方法从consumer拉消息;PushConsumer:当有消息的时候,consumer会自动调用messageListener(业务处理消息的代码)。

    这两种方式主要是为了方便使用者进行线程控制,没有什么本质区别。

    2019-11-04
  • PeterLu
    老师请教问题:RocketMQ有个DefaultMQPullConsumer和DefaultMQPushConsumer,这两个类到底是干嘛的,它们到底什么关系?
    2019-11-04
  • . 。o O
    请教老师一个问题,如果异步发送的话,就是把发送逻辑封装成任务放到线程池里去处理,那么是不是就没法保证消息的顺序性了呢?哪怕是通过key哈希到一个同一个队列,但是发送消息的任务执行先后顺序没法保证吧?

    作者回复: 异步发送仍然可以保证严格顺序,但需要注意几点:

    1. 需要单线程异步发送;
    2. 需要记录一个递增流水号,保证每个发出的消息都有一个流水号,如果某个流水号的消息发送出错,需要重发这个流水号之后的所有消息。比如,连续异步发送12345这5条消息,假如已经异步发送了12345,然后异步检查发送结果的时候发现3发送失败了,需要从3开始重发。也就是按顺序重发345。
    3. 消费逻辑需要幂等,能接受2中的这种情况,也就是说,收到的消息有可能是:12(3丢了)45 345。

    2019-10-28
  • 囊子
    1. 异常处理问题:线程内部抛出的异常,比如MQBrokerException,客户端无法感知到,以为发送成功,会继续执行。
    2. 超时时间的概念问题:这里似乎去掉了线程调度的时间,将剩下的时间给了netty,个人感觉也应该包含进去。对客户端而言,调度是自己的事,不应包含在网络超时时间里。

    请老师指正。
    2019-09-28
  • Geek_71a2ee
    老师,辛苦了,能否讲讲发送端如何找到broker上的文件,队列和文件的关系,消息都放到文件上吧,能否展开梳理梳理,不然还是不懂怎么实现的
    2019-09-26
  • z.l
    看了Rocketmq producer源码,关于producer这块有个疑问不知道能否请教下?就是producer启动过程中为什么MQClientInstance mQClientFactory.start()这个方法需要被执行2次?2次的作用分别是什么?

    作者回复: 分别在哪两个地方执行的2次启动呢?

    2019-09-21
  • DFighting
    这里使用异步主要是提升消息发送的吞吐量,而在这过程中影响吞吐的有两个:磁盘IO和网络IO,而现在的这种异步方式好像并没有为这两部分设计详细的优化,似乎只是简单用了一个多线程去执行各自的操作,也就是并没有对真正的性能短板做优化。我觉得真要是异步的话,需要在这里维持一个队列、一个磁盘IO选择器和一个网络发送IO选择器,真正异步的点应该是两个选择器加并发协调处理待发送的数据。不过就像有些同学评价的,netty底层对各种IO已经做了很好的支持,这里的异步就显得很苍白无力了。不知netty是怎么设计异步来达到极致的性能的。
    2019-09-16
  • humor
    一是处理异常的代码很奇怪吧,有的异常使用sendCallback抛出,有的直接抛出;二是超时的语义有问题,现在的timeout意思是消息在线程池中排队的时间
    2019-09-16
  • 墙角儿的花
    老师 对于im服务器集群,客户端的socket均布在各个服务器,目标socket不在同一个服务器上时,服务器间需要转发消息,这个场景需要低延迟无需持久化,服务器间用redis的发布订阅,因其走内存较快,即使断电还可以走库。im服务器和入库服务间用其他mq解耦,因为这个环节需要持久化,所以选rocketmq或kafka,但kafka会延迟批量发布消息 所以选rocketmq,这两个环节的mq选型可行吗。

    作者回复: 有一个问题你需要考虑,你是不是需要为每一个会话(比如,张三和李四之间开始聊天,成为一个会话)在MQ中凑创建一个Topic呢?这样会导致MQ集群中的Topic数量非常多。假设你的系统注册用户数是n,理论上最多会需要 n x n 个Topic,这还没有计算用户拉的群。

    对于海量的Topic数量,RocketMQ和Kafka都不是太好的选择。

    2019-09-11
  • QQ怪
    老师,节日快乐🎉

    作者回复: 感谢!

    2019-09-10
  • 约书亚
    同楼上@lmtoo答案,源码一直追下去发现回调主要还是NettyRemoting做的,回调事件应该发生在netty的event executor绑定的线程内。最上层创建线程池没什么意义。改进的话是不是线程池去掉了就可以了。
    2019-09-10
  • leslie
    前期一直忙着强化和梳理一些基本功:操作系统、网络这块,学到现在发现老师的课程中的代码能看懂,大致思路也能明白;就是写不出。Python或者Go可以么? Java实在、、、Python和Go极客时间都有购买课程。
               可能目前线上的存储中间件现状比较差【许老师的课程对数据存储的定义,觉得有道理就直接现用了】,尤其是Nosql DB和MQ基本处于闲置,故而一直焦虑在这块;可是当现在初期迷惑已经解除且基本清晰时发现学习这门课和使用MQ的瓶颈就在代码能力上,毕竟DBA的Coding能力都比较差尤其是开发相关的能力;准备开始把之前报的开发语言的课程学习一遍。
             今天教师节:愿老师节日快乐,感激老师在授课之余一直如此辛勤的回帖解答我们的困惑;谢谢老师。

    作者回复: 感谢!
    代码能力这块儿,除了学习,还是多写代码,熟能生巧。

    2019-09-10
收起评论
23
返回
顶部