消息队列高手课
李玥
美团高级技术专家
52199 人已学习
新⼈⾸单¥59
登录后,你可以任选4讲全文学习
课程目录
已完结/共 42 讲
进阶篇 (21讲)
消息队列高手课
15
15
1.0x
00:00/00:00
登录|注册

20 | RocketMQ Producer源码分析:消息生产的实现过程

NettyRemotingClient
MQClientAPIImpl
MQClientInstance
sendKernelImpl()
sendSelectImpl()
MessageQueueSelector
DefaultMQProducer#start()
MQAdmin
DefaultMQProducerImpl
MQProducer
DefaultMQProducer
testSendMessageSync_SuccessWithHook
testSendMessageAsync_BodyCompressed
testSendMessageAsync
testSendMessageAsync_Success
testSendMessageSync_WithBodyCompressed
testSendMessageSync_Success
testSendMessage_NoRoute
testSendMessage_NoNameSrv
testSendMessage_ZeroMessage
terminate
init
消息发送过程
启动过程
RocketMQ的Producer入口类
RocketMQ客户端的单元测试
消息生产的实现过程
RocketMQ Producer源码分析

该思维导图由 AI 生成,仅供参考

你好,我是李玥。
对于消息队列来说,它最核心的功能就是收发消息。也就是消息生产和消费这两个流程。我们在之前的课程中提到了消息队列一些常见问题,比如,“如何保证消息不会丢失?”“为什么会收到重复消息?”“消费时为什么要先执行消费业务逻辑再确认消费?”,针对这些问题,我讲过它们的实现原理,这些最终落地到代码上,都包含在这一收一发两个流程中。
在接下来的两节课中,我会带你一起通过分析源码的方式,详细学习一下这两个流程到底是如何实现的。你在日常使用消息队列的时候,遇到的大部分问题,更多的是跟 Producer 和 Consumer,也就是消息队列的客户端,关联更紧密。搞清楚客户端的实现原理和代码中的细节,将对你日后使用消息队列时进行问题排查有非常大的帮助。所以,我们这两节课的重点,也将放在分析客户端的源代码上。
秉着先易后难的原则,我们选择代码风格比较简明易懂的 RocketMQ 作为分析对象。一起分析 RocketMQ 的 Producer 的源代码,学习消息生产的实现过程。
在分析源代码的过程中,我们的首要目的就是搞清楚功能的实现原理,另外,最好能有敏锐的嗅觉,善于发现代码中优秀的设计和巧妙构思,学习、总结并记住这些方法。在日常开发中,再遇到类似场景,你就可以直接拿来使用。
确认放弃笔记?
放弃后所记笔记将不保留。
新功能上线,你的历史笔记已初始化为私密笔记,是否一键批量公开?
批量公开的笔记不会为你同步至部落
公开
同步至部落
取消
完成
0/2000
荧光笔
直线
曲线
笔记
复制
AI
  • 深入了解
  • 翻译
    • 英语
    • 中文简体
    • 中文繁体
    • 法语
    • 德语
    • 日语
    • 韩语
    • 俄语
    • 西班牙语
    • 阿拉伯语
  • 解释
  • 总结

RocketMQ Producer源码分析文章深入介绍了消息生产的实现过程,重点分析了消息生产和消费这两个流程。通过分析RocketMQ的Producer源代码,详细讲解了消息生产的实现过程。文章强调了编写测试用例的重要性,并建议在日常开发中多写一些测试用例,以提高单元测试的覆盖率。在文章中,作者还介绍了RocketMQ使用的设计模式——门面模式,并对状态模式和单例模式进行了详细解释。通过分析源码的方式,文章详细讲解了RocketMQ Producer的实现过程,对于想深入了解消息队列的开发者具有一定的参考价值。整体而言,本文内容丰富,涵盖了消息队列的核心功能和源码实现细节,对读者快速了解RocketMQ Producer的工作原理具有很高的参考价值。文章还提到了RocketMQ客户端消息生产的实现过程,包括Producer初始化和发送消息的主流程。文章还介绍了消息发送的三种方式:单向、同步和异步,以及对应的发送流程。同时,文章还提到了几个重要的业务逻辑实现类,包括DefaultMQProducerImpl、MQClientInstance、MQClientAPIImpl和NettyRemotingClient。最后,文章提出了一个思考题,引导读者思考如何改进异步发送消息的流程。

仅可试看部分内容,如需阅读全部内容,请付费购买文章所属专栏
《消息队列高手课》
新⼈⾸单¥59
立即购买
登录 后留言

全部留言(31)

  • 最新
  • 精选
  • . 。o O
    请教老师一个问题,如果异步发送的话,就是把发送逻辑封装成任务放到线程池里去处理,那么是不是就没法保证消息的顺序性了呢?哪怕是通过key哈希到一个同一个队列,但是发送消息的任务执行先后顺序没法保证吧?

    作者回复: 异步发送仍然可以保证严格顺序,但需要注意几点: 1. 需要单线程异步发送; 2. 需要记录一个递增流水号,保证每个发出的消息都有一个流水号,如果某个流水号的消息发送出错,需要重发这个流水号之后的所有消息。比如,连续异步发送12345这5条消息,假如已经异步发送了12345,然后异步检查发送结果的时候发现3发送失败了,需要从3开始重发。也就是按顺序重发345。 3. 消费逻辑需要幂等,能接受2中的这种情况,也就是说,收到的消息有可能是:12(3丢了)45 345。

    2019-10-28
    3
    35
  • 微微一笑
    老师好,先祝您节日快乐!!!您辛苦了~ 有几个疑问需要老师解答一下: ①今天在看rocketMq源码过程中,发现DefaultMQProducer有个属性defaultTopicQueueNums,它是用来设置topic的ConsumeQueue的数量的吗?我之前的理解是,consumeQueue的数量是创建topic的时候指定的,跟producer没有关系,那这个参数又有什么作用呢? ②在RocketMq的控制台上可以创建topic,需要指定writeQueueNums,readQueueNums,perm,这三个参数是有什么用呢?这里为什么要区分写队列跟读队列呢?不应该只有一个consumeQueue吗? ③用户请求-->异步处理--->用户收到响应结果。异步处理的作用是:用更少的线程来接收更多的用户请求,然后异步处理业务逻辑。老师,异步处理完后,如何将结果通知给原先的用户呢?即使有回调接口,我理解也是给用户发个短信之类的处理,那结果怎么返回到定位到用户,并返回之前请求的页面上呢?需要让之前的请求线程阻塞吗?那也无法达到【用更少的线程来接收更多的用户请求】的目的丫。 望老师能指点迷津~~~

    作者回复: A1:这个参数是控制客户端在生产消费的时候会访问同一个主题的队列数量,假设一个主题有100个队列,对于每一个客户端来说,它没必要100个队列都访问,只需要使用其中的几个队列就行了。 A2:writeQueueNums和readQueueNums是在服务端来控制每个客户端在生产和消费的时候,分别访问多少个队列。这两个参数是服务端参数,优先级是高于客户端控制的参数defaultTopicQueueNums的。perm是设置Topic读写等权限的参数,具体如何设置你需要去看一下文档。 A3:如果局限于:“APP/浏览器 --[http协议]-->web 服务”这样的场景,受限于http协议,前端和web服务的交互一定是单向和同步的。一定要等待结果然后返回响应,但是,这种情况仍然可以使用异步的方法,这个我在“08答疑”中解释秒杀的时候其实已经给出了答案。很多同学不理解的原因是思维被web框架给限制住了。像spring web这种框架,它把处理web请求都给你封装好了,你只要写一个handler就行了,很方便。但是,这个handler只能是一个同步方法,它必须在返回值中给出响应结果,所以导致很多同学的思维转不过来这个弯儿。 你可以结合我们讲的异步网络IO内容想一下,http协议发一个请求到服务端,就是发了一些数据过来,服务端回响应也就是在这个连接上给它返回一些数据回去就可以了。至于什么时候往回发响应数据,哪个线程来发,有要求吗?并没有。只要在超时之前发响应就可以了。我们讲得如何来实现异步网络IO的方法处理的不就是这种情况吗? 这个过程不是说一定要做成和web框架一样的同步处理。

    2019-09-11
    3
    23
  • z.l
    老师,异步发送为什么是弃用,还是没有看懂,感觉超时时间的计算没有错啊…

    作者回复: 我理解主要的原因除了超时时间的计算不准确以外,更重要的原因是这个异步方法还有改进的空间,其实可以直接结合Netty做到不需要Executor。

    2019-11-18
    13
  • Peter
    课后作业,请老师指正: 从方法的注释看,说是因为异常处理和超时时间的语义不对。 异常处理这块我觉得应该是采用统一的异常处理,而不应该是有的异常抛出,而有的异常通过回调方法返回客户端。 再说超时时间的错误语义,严格来说应该是不准确的超时时间,因为在run方法里进行时间判断(if (timeout > costTime))实际上已经是开始执行当前线程的时间,而之前的排队时间没有算,因此我改进的方法应该是这样: CompletableFuture.runAsync(() -> { long costTime = System.currentTimeMillis() - beginStartTime; if (timeout > costTime) { try { sendDefaultImpl(msg, CommunicationMode.ASYNC, sendCallback, timeout - costTime); } catch (Exception e) { sendCallback.onException(e); } } else { sendCallback.onException( new RemotingTooMuchRequestException("DEFAULT ASYNC send call timeout")); } }, executor);

    作者回复: 给认真思考完成作业的同学点赞👍!

    2019-11-04
    7
    7
  • leslie
    编程语言的话Python或Go可以么?极客时间里都有购买,就是忙着其它课程的学习,一直没顾的上编程语言的学习。 从开始一路跟到现在:算是少数一直在完全没有缺的课;前期一直遍边学习边针对开篇时的学习目标针对当下工作环境的Nosql DB和MQ使用率的低下的问题找解决思路和方案,课后笔记主要同样集中在思路以及针对思路的困惑查疑上,代码这块完全没顾上。虽然代码的思路看的懂,发现动手能力确实非常欠缺。一路学到现在梳理到现在整体方案大致定下来:以及早期的部分课程的结束;课程的主要方案自己估计在掌握思路的基础上去补强Coding能力。虽然DBA的Coding能力都比较烂,不过还是得边学边啃下来;逼自己一下总能勉强写出来,估计就是效率问题、、、MQ这块PY或GO哪种更合适,或者说都可以? 感谢老师一路的辛勤授业:授课之余尽力去帮助学生们解惑,让我们能一路走来一路成长;愿老师教师节快乐,谢谢老师的分享。

    作者回复: 个人建议学习Java或者Go,这两种语言都有不错的生态系统,都可以用来构建大规模集群。 相对来说,Java的生态系统更强大,Go比较年轻,有很多Java不具备的语言特性。 Python本来只是一门脚本语言,特别适合开发机器学习程序而火起来了,如果你不是从事机器学习相关的研发,不太建议作为第一语言来学习。

    2019-09-10
    6
  • 墙角儿的花
    老师 对于im服务器集群,客户端的socket均布在各个服务器,目标socket不在同一个服务器上时,服务器间需要转发消息,这个场景需要低延迟无需持久化,服务器间用redis的发布订阅,因其走内存较快,即使断电还可以走库。im服务器和入库服务间用其他mq解耦,因为这个环节需要持久化,所以选rocketmq或kafka,但kafka会延迟批量发布消息 所以选rocketmq,这两个环节的mq选型可行吗。

    作者回复: 有一个问题你需要考虑,你是不是需要为每一个会话(比如,张三和李四之间开始聊天,成为一个会话)在MQ中凑创建一个Topic呢?这样会导致MQ集群中的Topic数量非常多。假设你的系统注册用户数是n,理论上最多会需要 n x n 个Topic,这还没有计算用户拉的群。 对于海量的Topic数量,RocketMQ和Kafka都不是太好的选择。

    2019-09-11
    3
    5
  • Peter
    老师继续请教问题: 1.DefaultMQPullConsumer和DefaultMQPushConsumer有什么区别 2.为什么pullConsumer的启动和producer的启动在同一个start方法里(最终都在MQClientInstance#start里) 3.rebalanceService服务是干嘛的

    作者回复: PullConsumer:业务代码在需要的时候调用consumer.pullxxxx方法从consumer拉消息;PushConsumer:当有消息的时候,consumer会自动调用messageListener(业务处理消息的代码)。 这两种方式主要是为了方便使用者进行线程控制,没有什么本质区别。

    2019-11-04
    1
  • fomy
    1、为什么ServiceState变量不设置成volatile呢? 2、消费者MessageQueue(readQueueNums)怎么和生产者MessageQueue(writeQueueNums)关联起来的呢?比如readQueueNums=19个,writeQueueNums=23个,它们是怎么关联的呢?

    作者回复: A1: 因为它服务的类没有设计成线程安全的,所以也没必要用volatile关键字。 A2:writeQueueNums和readQueueNums是在服务端来控制每个客户端在生产和消费的时候,分别访问多少个队列。因为对主题来说,生产者的实例数和消费者的实例数是没有关系的,所以这两个参数是不关联的。

    2020-02-16
  • z.l
    DefaultMQProducerImpl的start和shutdown方法没有加同步,serviceState也只是一个普通成员变量没加volatile,不会有线程安全问题吗?

    作者回复: 这两个方法没有做到线程安全,但是这两个方法的实现内部,调用的方法都是线程安全的方法。

    2019-11-14
  • z.l
    看了Rocketmq producer源码,关于producer这块有个疑问不知道能否请教下?就是producer启动过程中为什么MQClientInstance mQClientFactory.start()这个方法需要被执行2次?2次的作用分别是什么?

    作者回复: 分别在哪两个地方执行的2次启动呢?

    2019-09-21
收起评论
显示
设置
留言
31
收藏
沉浸
阅读
分享
手机端
快捷键
回顶部