16|案例:如何提升RocketMQ顺序消费性能?
RocketMQ 顺序消费实现原理
- 深入了解
- 翻译
- 解释
- 总结
RocketMQ顺序消费性能优化是本文的核心内容。作者分享了在双十一期间遇到的订单中心业务延迟问题,并介绍了RocketMQ顺序消费的实现原理和设计缺陷。文章指出RocketMQ为了实现顺序消费引入了三把锁,导致并发性能受限。为了解决这一问题,作者提出了基于关联顺序性的顺序消费改进模型,通过优化消息拉取和消费线程的设计,提高了并发度,从而优化了RocketMQ的顺序消费性能。 文章通过详细的代码实现和时序图展示了顺序消费模型的改进方案。作者提出了创建消费线程池、消费线程内部执行流程、Pull线程和消息路由机制等关键实现要点。通过降低锁的粒度,将并发度从队列级别降低到消息级别,从而显著提升了性能。这一优化方案对于理解RocketMQ顺序消费的内部机制以及解决实际应用中的性能问题具有重要意义。 总的来说,本文通过实际案例和详细的技术实现,深入探讨了RocketMQ顺序消费性能优化的方法和原理,为读者提供了宝贵的性能优化思路和方法。
《中间件核心技术与实战》,新⼈⾸单¥59
全部留言(11)
- 最新
- 精选
- james其实顺序消费速率的瓶颈在业务,也就是消费者的处理逻辑,并不在MQ, 感觉这个方案走偏了,根据阿姆达尔定律,你应该做的是改变最能提升性能的地方,而不是在一个本来就很快的地方
作者回复: 你好,这个问题我是这么看的。业务代码是要优化,但我们还是要尽量在框架层面作出更好的优化,提供一些可干预的机制,例如通过调整消费线程数能真正发挥出作用。 我在开头遇到的窘境就是单个队列积压了1000多万消息,这个时候,我们无法加快进度,那个无助感。 经过我现在的优化模型后,项目组使用相同的代码,已经可以毫无压力扛过双十一了。推广成本非常低,无需扩容,能充分发挥单台机器的资源,资源利用率也提高了,好处还是很多的。
2022-07-27归属地:上海52 - Louise你好,丁威老师能不能课后题目都有个答案,像mysql45讲专栏一样呢?因为自己这方面接触少,有些问题还真想不出来,看评论也看的不是很懂
作者回复: 嗯,好的,今年我逐步更新一下答案
2022-10-11归属地:浙江1 - 越过山丘丁威老师,请教一下,如果一批消息中有一条消息出了问题,导致阻塞了一个消费线程很长时间,按照最小位点提交的策略,这个是不是会导致位点一直不推进了
作者回复: 理解非常正确,这也是RocketMQ出现消费积压的关键所在,因为一旦出现这个问题,就会在rocketmq客户端触发限流,具体就是不再往服务器拉取新的消息,所以在broker的偏移量会越来越大,而消费位点一直无法向前推进,从而产生消费积压问题
2022-09-11归属地:上海1 - Geek_9a02e81、上报最小位点?那不是下次拉取的时候还从这个偏移量开始拉取,不是一直重复拉同一批数据了吗? 2、开始的顺序消费模型怎么保证队列顺序的?虽然取数据到提交的过程都加锁了,可是实际消费是放到消费线程池了呀,这种异步没法保证顺序的吧? 3 好多地方提到队列,我都分不清是客户端的什么队列还是服务端的什么队列了…
作者回复: 谢谢你的提问,为你的思考点赞。 1、我稍微再补充一下最小位点提交机制。例如偏移量从小到到大排序为 m1,m2,m3,m4,m5,如果m5消费成功了,此时会上报m1,如果m2,m3,m3,m5都消费了,还是提交m1,但如果m1消费完成,如果处理队列现在为空,那提交哪个位点呢,其实这个时候,就会提交位点m5,因为在拉取的时候,记录了客户端拉取的最大位点。但这种机制确实会带来部分消息的重复消费,我在这篇文章详细介绍了RocketMQ中哪些设计会带来重复消费:https://mp.weixin.qq.com/s/wWgAbFLuesdb3BhY3GjxPg 2、这个建议你重点看一下ConsumeMessageOrderlyService的submitConsumeRequest,并且多线程是只是竞争执行,获得锁后还是从队列中按顺序获取消息的,也就是顺序消费ConsumeRequest这个任务,并没有关联具体的消息,而只传入了processQueue、MessageQueue,所以可以保证顺序处理消息 3、这个可能需要你看看是哪个地方不懂。我这里也尝试补充一下,所谓的队列,通常是指消费队列,也就是MessageQueue,还有一个本地处理队列(ProcessQueue),在客户端消费时用于最小位点提交。
2023-02-07归属地:福建2 - 嘉嘉☕老师好,请问下,rocketmq实现的顺序消费 不算是关联顺序性吗?
作者回复: 你好,我觉得不算,虽然在发送阶段,会按照key选择队列,但一旦进入了队列,所有的消息就必须一条一条顺序执行,例如 存在 k1,k2,k3,k1的消息,那顺序就固定了,而文章中提出的方案,尽管k1,k2,k3,k1的消息进入的是一个队列,但我还可以让k1,k2,k3并发消费,因为他们代表不同的业务实体,例如不同的银行卡账号,相互之间无关联,可以并发,但两条k1,就必须顺序执行。
2023-01-11归属地:浙江 - 文敦复提个思路,请教下:1优化消费者代码,如果不行,那么2提升消费者数量,如果已经达到队列长度,那么3重建一个长度队列更大的主题,改原消费者代码为按照业务逻辑重新分发消息到新的主题,老的消费者代码放到新的消费者这里。不知道如何评价?😅
作者回复: 嗯,这个当然是可以的,我们也可以通过扩容的方式来减少对新消息的影响(容忍一定的顺序性语义破坏),但原先积压的消息,没那么快消耗掉,而且我记得当时业务代码比较复杂,里面各种调用dubbo接口,速度提升其他比较困难。
2022-12-15归属地:四川 - Geek_9d39c4老师请教一下顺序消费 假设现在有两个队列 order1的状态变迁发送到q1 order2的发送到q2 order3的发送到q1 此时order3是不是要等order1全部都消费完才能消费order3
作者回复: RocketMQ官方实现是的,因为一个队列中的消息,需要一条一条处理,只有前面一条处理成功,才会继续处理下一条。而且rocketmq顺序消费的重试次数为 Interger.MAX_VALUE
2022-12-02归属地:上海2 - 放不下荣华富贵所以作者的方案是:在顺序消费队列的消费者内引入线程池,再次拆分可并行的任务进行执行? 然后激进的先更新位点后分发任务,宁可丢消息,也不会大批量重复消费造成位点回溯?
作者回复: 你好,所以作者的方案是:在顺序消费队列的消费者内引入线程池,再次拆分可并行的任务进行执行?这个是对的。 但后半部分不是这样的,位点提交,还是使用最小位点提交,代码中还是维护了一个处理队列<MessageQueue, TreeMap<Long,Msg>>这样的结构,位点提交还是提交处理队列中最小位点。 数据丢失这个是不可降级的底线,一切优化必须坚持这个底线。
2022-08-18归属地:上海 - shen有两个问题咨询下老师 1,开头增加机器到64台,是单机处理不过来了吗?如果处理不过来后面改造增加线程处理也应该处理不过来吧 2,为什么不增加队列数量?集群队列是64,那么增加到128,256,是不是也可以达到相同的目的2023-10-07归属地:广东
- Geek_460f3a老师您好,为啥每次拉去完消息都要先暂停,再恢复,是为了啥 while (isRunning) { List<MessageExt> records = consumer.poll(consumerPollTimeoutMs); submitRecords(records); consumerLimitController.pause(); consumerLimitController.resume(); }2023-03-10归属地:广东