中间件核心技术与实战
丁威
中通快递资深架构师,RocketMQ 社区首席布道师
19674 人已学习
新⼈⾸单¥59
登录后,你可以任选4讲全文学习
课程目录
已完结/共 33 讲
中间件核心技术与实战
15
15
1.0x
00:00/00:00
登录|注册

16|案例:如何提升RocketMQ顺序消费性能?

你好,我是丁威。
在课程正式开始之前,我想先分享一段我的经历。我记得 2020 年双十一的时候,公司订单中心有一个业务出现了很大程度的延迟。我们的系统为了根据订单状态的变更进行对应的业务处理,使用了 RocketMQ 的顺序消费。但是经过排查,我们发现每一个队列都积压了上千万条消息。
当时为了解决这个问题,我们首先决定快速扩容消费者。因为当时主题的总队列为 64 个,所以我们一口气将消费者扩容到了 64 台。但上千万条消息毕竟还是太多了。还有其他办法能够加快消息的消费速度吗?比较尴尬的是,没有,我们当时能做的只有等待。
作为公司消息中间件的负责人,在故障发生时没有其他其他补救手段确实比较无奈。事后,我对顺序消费模型进行了反思与改善。接下来,我想和你介绍我是如何优化 RocketMQ 的顺序消费性能的。

RocketMQ 顺序消费实现原理

我们先来了解一下 RocketMQ 顺序消费的实现原理。RocketMQ 支持局部顺序消息消费,可以保证同一个消费队列上的消息顺序消费。例如,消息发送者向主题为 ORDER_TOPIC 的 4 个队列共发送 12 条消息, RocketMQ 可以保证 1、4、8 这三条按顺序消费,但无法保证消息 4 和消息 2 的先后顺序。
确认放弃笔记?
放弃后所记笔记将不保留。
新功能上线,你的历史笔记已初始化为私密笔记,是否一键批量公开?
批量公开的笔记不会为你同步至部落
公开
同步至部落
取消
完成
0/2000
荧光笔
直线
曲线
笔记
复制
AI
  • 深入了解
  • 翻译
    • 英语
    • 中文简体
    • 中文繁体
    • 法语
    • 德语
    • 日语
    • 韩语
    • 俄语
    • 西班牙语
    • 阿拉伯语
  • 解释
  • 总结

RocketMQ顺序消费性能优化是本文的核心内容。作者分享了在双十一期间遇到的订单中心业务延迟问题,并介绍了RocketMQ顺序消费的实现原理和设计缺陷。文章指出RocketMQ为了实现顺序消费引入了三把锁,导致并发性能受限。为了解决这一问题,作者提出了基于关联顺序性的顺序消费改进模型,通过优化消息拉取和消费线程的设计,提高了并发度,从而优化了RocketMQ的顺序消费性能。 文章通过详细的代码实现和时序图展示了顺序消费模型的改进方案。作者提出了创建消费线程池、消费线程内部执行流程、Pull线程和消息路由机制等关键实现要点。通过降低锁的粒度,将并发度从队列级别降低到消息级别,从而显著提升了性能。这一优化方案对于理解RocketMQ顺序消费的内部机制以及解决实际应用中的性能问题具有重要意义。 总的来说,本文通过实际案例和详细的技术实现,深入探讨了RocketMQ顺序消费性能优化的方法和原理,为读者提供了宝贵的性能优化思路和方法。

仅可试看部分内容,如需阅读全部内容,请付费购买文章所属专栏
《中间件核心技术与实战》
新⼈⾸单¥59
立即购买
登录 后留言

全部留言(11)

  • 最新
  • 精选
  • james
    其实顺序消费速率的瓶颈在业务,也就是消费者的处理逻辑,并不在MQ, 感觉这个方案走偏了,根据阿姆达尔定律,你应该做的是改变最能提升性能的地方,而不是在一个本来就很快的地方

    作者回复: 你好,这个问题我是这么看的。业务代码是要优化,但我们还是要尽量在框架层面作出更好的优化,提供一些可干预的机制,例如通过调整消费线程数能真正发挥出作用。 我在开头遇到的窘境就是单个队列积压了1000多万消息,这个时候,我们无法加快进度,那个无助感。 经过我现在的优化模型后,项目组使用相同的代码,已经可以毫无压力扛过双十一了。推广成本非常低,无需扩容,能充分发挥单台机器的资源,资源利用率也提高了,好处还是很多的。

    2022-07-27归属地:上海
    5
    2
  • Louise
    你好,丁威老师能不能课后题目都有个答案,像mysql45讲专栏一样呢?因为自己这方面接触少,有些问题还真想不出来,看评论也看的不是很懂

    作者回复: 嗯,好的,今年我逐步更新一下答案

    2022-10-11归属地:浙江
    1
  • 越过山丘
    丁威老师,请教一下,如果一批消息中有一条消息出了问题,导致阻塞了一个消费线程很长时间,按照最小位点提交的策略,这个是不是会导致位点一直不推进了

    作者回复: 理解非常正确,这也是RocketMQ出现消费积压的关键所在,因为一旦出现这个问题,就会在rocketmq客户端触发限流,具体就是不再往服务器拉取新的消息,所以在broker的偏移量会越来越大,而消费位点一直无法向前推进,从而产生消费积压问题

    2022-09-11归属地:上海
    1
  • Geek_9a02e8
    1、上报最小位点?那不是下次拉取的时候还从这个偏移量开始拉取,不是一直重复拉同一批数据了吗? 2、开始的顺序消费模型怎么保证队列顺序的?虽然取数据到提交的过程都加锁了,可是实际消费是放到消费线程池了呀,这种异步没法保证顺序的吧? 3 好多地方提到队列,我都分不清是客户端的什么队列还是服务端的什么队列了…

    作者回复: 谢谢你的提问,为你的思考点赞。 1、我稍微再补充一下最小位点提交机制。例如偏移量从小到到大排序为 m1,m2,m3,m4,m5,如果m5消费成功了,此时会上报m1,如果m2,m3,m3,m5都消费了,还是提交m1,但如果m1消费完成,如果处理队列现在为空,那提交哪个位点呢,其实这个时候,就会提交位点m5,因为在拉取的时候,记录了客户端拉取的最大位点。但这种机制确实会带来部分消息的重复消费,我在这篇文章详细介绍了RocketMQ中哪些设计会带来重复消费:https://mp.weixin.qq.com/s/wWgAbFLuesdb3BhY3GjxPg 2、这个建议你重点看一下ConsumeMessageOrderlyService的submitConsumeRequest,并且多线程是只是竞争执行,获得锁后还是从队列中按顺序获取消息的,也就是顺序消费ConsumeRequest这个任务,并没有关联具体的消息,而只传入了processQueue、MessageQueue,所以可以保证顺序处理消息 3、这个可能需要你看看是哪个地方不懂。我这里也尝试补充一下,所谓的队列,通常是指消费队列,也就是MessageQueue,还有一个本地处理队列(ProcessQueue),在客户端消费时用于最小位点提交。

    2023-02-07归属地:福建
    2
  • 嘉嘉☕
    老师好,请问下,rocketmq实现的顺序消费 不算是关联顺序性吗?

    作者回复: 你好,我觉得不算,虽然在发送阶段,会按照key选择队列,但一旦进入了队列,所有的消息就必须一条一条顺序执行,例如 存在 k1,k2,k3,k1的消息,那顺序就固定了,而文章中提出的方案,尽管k1,k2,k3,k1的消息进入的是一个队列,但我还可以让k1,k2,k3并发消费,因为他们代表不同的业务实体,例如不同的银行卡账号,相互之间无关联,可以并发,但两条k1,就必须顺序执行。

    2023-01-11归属地:浙江
  • 文敦复
    提个思路,请教下:1优化消费者代码,如果不行,那么2提升消费者数量,如果已经达到队列长度,那么3重建一个长度队列更大的主题,改原消费者代码为按照业务逻辑重新分发消息到新的主题,老的消费者代码放到新的消费者这里。不知道如何评价?😅

    作者回复: 嗯,这个当然是可以的,我们也可以通过扩容的方式来减少对新消息的影响(容忍一定的顺序性语义破坏),但原先积压的消息,没那么快消耗掉,而且我记得当时业务代码比较复杂,里面各种调用dubbo接口,速度提升其他比较困难。

    2022-12-15归属地:四川
  • Geek_9d39c4
    老师请教一下顺序消费 假设现在有两个队列 order1的状态变迁发送到q1 order2的发送到q2 order3的发送到q1 此时order3是不是要等order1全部都消费完才能消费order3

    作者回复: RocketMQ官方实现是的,因为一个队列中的消息,需要一条一条处理,只有前面一条处理成功,才会继续处理下一条。而且rocketmq顺序消费的重试次数为 Interger.MAX_VALUE

    2022-12-02归属地:上海
    2
  • 放不下荣华富贵
    所以作者的方案是:在顺序消费队列的消费者内引入线程池,再次拆分可并行的任务进行执行? 然后激进的先更新位点后分发任务,宁可丢消息,也不会大批量重复消费造成位点回溯?

    作者回复: 你好,所以作者的方案是:在顺序消费队列的消费者内引入线程池,再次拆分可并行的任务进行执行?这个是对的。 但后半部分不是这样的,位点提交,还是使用最小位点提交,代码中还是维护了一个处理队列<MessageQueue, TreeMap<Long,Msg>>这样的结构,位点提交还是提交处理队列中最小位点。 数据丢失这个是不可降级的底线,一切优化必须坚持这个底线。

    2022-08-18归属地:上海
  • shen
    有两个问题咨询下老师 1,开头增加机器到64台,是单机处理不过来了吗?如果处理不过来后面改造增加线程处理也应该处理不过来吧 2,为什么不增加队列数量?集群队列是64,那么增加到128,256,是不是也可以达到相同的目的
    2023-10-07归属地:广东
  • Geek_460f3a
    老师您好,为啥每次拉去完消息都要先暂停,再恢复,是为了啥 while (isRunning) { List<MessageExt> records = consumer.poll(consumerPollTimeoutMs); submitRecords(records); consumerLimitController.pause(); consumerLimitController.resume(); }
    2023-03-10归属地:广东
收起评论
显示
设置
留言
11
收藏
沉浸
阅读
分享
手机端
快捷键
回顶部