作者回复: 你好,这个问题我是这么看的。业务代码是要优化,但我们还是要尽量在框架层面作出更好的优化,提供一些可干预的机制,例如通过调整消费线程数能真正发挥出作用。 我在开头遇到的窘境就是单个队列积压了1000多万消息,这个时候,我们无法加快进度,那个无助感。 经过我现在的优化模型后,项目组使用相同的代码,已经可以毫无压力扛过双十一了。推广成本非常低,无需扩容,能充分发挥单台机器的资源,资源利用率也提高了,好处还是很多的。
作者回复: 嗯,好的,今年我逐步更新一下答案
作者回复: 理解非常正确,这也是RocketMQ出现消费积压的关键所在,因为一旦出现这个问题,就会在rocketmq客户端触发限流,具体就是不再往服务器拉取新的消息,所以在broker的偏移量会越来越大,而消费位点一直无法向前推进,从而产生消费积压问题
作者回复: 谢谢你的提问,为你的思考点赞。 1、我稍微再补充一下最小位点提交机制。例如偏移量从小到到大排序为 m1,m2,m3,m4,m5,如果m5消费成功了,此时会上报m1,如果m2,m3,m3,m5都消费了,还是提交m1,但如果m1消费完成,如果处理队列现在为空,那提交哪个位点呢,其实这个时候,就会提交位点m5,因为在拉取的时候,记录了客户端拉取的最大位点。但这种机制确实会带来部分消息的重复消费,我在这篇文章详细介绍了RocketMQ中哪些设计会带来重复消费:https://mp.weixin.qq.com/s/wWgAbFLuesdb3BhY3GjxPg 2、这个建议你重点看一下ConsumeMessageOrderlyService的submitConsumeRequest,并且多线程是只是竞争执行,获得锁后还是从队列中按顺序获取消息的,也就是顺序消费ConsumeRequest这个任务,并没有关联具体的消息,而只传入了processQueue、MessageQueue,所以可以保证顺序处理消息 3、这个可能需要你看看是哪个地方不懂。我这里也尝试补充一下,所谓的队列,通常是指消费队列,也就是MessageQueue,还有一个本地处理队列(ProcessQueue),在客户端消费时用于最小位点提交。
作者回复: 你好,我觉得不算,虽然在发送阶段,会按照key选择队列,但一旦进入了队列,所有的消息就必须一条一条顺序执行,例如 存在 k1,k2,k3,k1的消息,那顺序就固定了,而文章中提出的方案,尽管k1,k2,k3,k1的消息进入的是一个队列,但我还可以让k1,k2,k3并发消费,因为他们代表不同的业务实体,例如不同的银行卡账号,相互之间无关联,可以并发,但两条k1,就必须顺序执行。
作者回复: 嗯,这个当然是可以的,我们也可以通过扩容的方式来减少对新消息的影响(容忍一定的顺序性语义破坏),但原先积压的消息,没那么快消耗掉,而且我记得当时业务代码比较复杂,里面各种调用dubbo接口,速度提升其他比较困难。
作者回复: RocketMQ官方实现是的,因为一个队列中的消息,需要一条一条处理,只有前面一条处理成功,才会继续处理下一条。而且rocketmq顺序消费的重试次数为 Interger.MAX_VALUE
作者回复: 你好,所以作者的方案是:在顺序消费队列的消费者内引入线程池,再次拆分可并行的任务进行执行?这个是对的。 但后半部分不是这样的,位点提交,还是使用最小位点提交,代码中还是维护了一个处理队列<MessageQueue, TreeMap<Long,Msg>>这样的结构,位点提交还是提交处理队列中最小位点。 数据丢失这个是不可降级的底线,一切优化必须坚持这个底线。