• james
    2022-07-27 来自上海
    其实顺序消费速率的瓶颈在业务,也就是消费者的处理逻辑,并不在MQ, 感觉这个方案走偏了,根据阿姆达尔定律,你应该做的是改变最能提升性能的地方,而不是在一个本来就很快的地方

    作者回复: 你好,这个问题我是这么看的。业务代码是要优化,但我们还是要尽量在框架层面作出更好的优化,提供一些可干预的机制,例如通过调整消费线程数能真正发挥出作用。 我在开头遇到的窘境就是单个队列积压了1000多万消息,这个时候,我们无法加快进度,那个无助感。 经过我现在的优化模型后,项目组使用相同的代码,已经可以毫无压力扛过双十一了。推广成本非常低,无需扩容,能充分发挥单台机器的资源,资源利用率也提高了,好处还是很多的。

    共 5 条评论
    2
  • Louise
    2022-10-11 来自浙江
    你好,丁威老师能不能课后题目都有个答案,像mysql45讲专栏一样呢?因为自己这方面接触少,有些问题还真想不出来,看评论也看的不是很懂

    作者回复: 嗯,好的,今年我逐步更新一下答案

    
    1
  • 越过山丘
    2022-09-11 来自上海
    丁威老师,请教一下,如果一批消息中有一条消息出了问题,导致阻塞了一个消费线程很长时间,按照最小位点提交的策略,这个是不是会导致位点一直不推进了

    作者回复: 理解非常正确,这也是RocketMQ出现消费积压的关键所在,因为一旦出现这个问题,就会在rocketmq客户端触发限流,具体就是不再往服务器拉取新的消息,所以在broker的偏移量会越来越大,而消费位点一直无法向前推进,从而产生消费积压问题

    
    1
  • Geek_9a02e8
    2023-02-07 来自福建
    1、上报最小位点?那不是下次拉取的时候还从这个偏移量开始拉取,不是一直重复拉同一批数据了吗? 2、开始的顺序消费模型怎么保证队列顺序的?虽然取数据到提交的过程都加锁了,可是实际消费是放到消费线程池了呀,这种异步没法保证顺序的吧? 3 好多地方提到队列,我都分不清是客户端的什么队列还是服务端的什么队列了…

    作者回复: 谢谢你的提问,为你的思考点赞。 1、我稍微再补充一下最小位点提交机制。例如偏移量从小到到大排序为 m1,m2,m3,m4,m5,如果m5消费成功了,此时会上报m1,如果m2,m3,m3,m5都消费了,还是提交m1,但如果m1消费完成,如果处理队列现在为空,那提交哪个位点呢,其实这个时候,就会提交位点m5,因为在拉取的时候,记录了客户端拉取的最大位点。但这种机制确实会带来部分消息的重复消费,我在这篇文章详细介绍了RocketMQ中哪些设计会带来重复消费:https://mp.weixin.qq.com/s/wWgAbFLuesdb3BhY3GjxPg 2、这个建议你重点看一下ConsumeMessageOrderlyService的submitConsumeRequest,并且多线程是只是竞争执行,获得锁后还是从队列中按顺序获取消息的,也就是顺序消费ConsumeRequest这个任务,并没有关联具体的消息,而只传入了processQueue、MessageQueue,所以可以保证顺序处理消息 3、这个可能需要你看看是哪个地方不懂。我这里也尝试补充一下,所谓的队列,通常是指消费队列,也就是MessageQueue,还有一个本地处理队列(ProcessQueue),在客户端消费时用于最小位点提交。

    共 2 条评论
    
  • 嘉嘉☕
    2023-01-11 来自浙江
    老师好,请问下,rocketmq实现的顺序消费 不算是关联顺序性吗?

    作者回复: 你好,我觉得不算,虽然在发送阶段,会按照key选择队列,但一旦进入了队列,所有的消息就必须一条一条顺序执行,例如 存在 k1,k2,k3,k1的消息,那顺序就固定了,而文章中提出的方案,尽管k1,k2,k3,k1的消息进入的是一个队列,但我还可以让k1,k2,k3并发消费,因为他们代表不同的业务实体,例如不同的银行卡账号,相互之间无关联,可以并发,但两条k1,就必须顺序执行。

    
    
  • 文敦复
    2022-12-15 来自四川
    提个思路,请教下:1优化消费者代码,如果不行,那么2提升消费者数量,如果已经达到队列长度,那么3重建一个长度队列更大的主题,改原消费者代码为按照业务逻辑重新分发消息到新的主题,老的消费者代码放到新的消费者这里。不知道如何评价?😅

    作者回复: 嗯,这个当然是可以的,我们也可以通过扩容的方式来减少对新消息的影响(容忍一定的顺序性语义破坏),但原先积压的消息,没那么快消耗掉,而且我记得当时业务代码比较复杂,里面各种调用dubbo接口,速度提升其他比较困难。

    
    
  • Geek_9d39c4
    2022-12-02 来自上海
    老师请教一下顺序消费 假设现在有两个队列 order1的状态变迁发送到q1 order2的发送到q2 order3的发送到q1 此时order3是不是要等order1全部都消费完才能消费order3

    作者回复: RocketMQ官方实现是的,因为一个队列中的消息,需要一条一条处理,只有前面一条处理成功,才会继续处理下一条。而且rocketmq顺序消费的重试次数为 Interger.MAX_VALUE

    共 2 条评论
    
  • 放不下荣华富贵
    2022-08-18 来自上海
    所以作者的方案是:在顺序消费队列的消费者内引入线程池,再次拆分可并行的任务进行执行? 然后激进的先更新位点后分发任务,宁可丢消息,也不会大批量重复消费造成位点回溯?

    作者回复: 你好,所以作者的方案是:在顺序消费队列的消费者内引入线程池,再次拆分可并行的任务进行执行?这个是对的。 但后半部分不是这样的,位点提交,还是使用最小位点提交,代码中还是维护了一个处理队列<MessageQueue, TreeMap<Long,Msg>>这样的结构,位点提交还是提交处理队列中最小位点。 数据丢失这个是不可降级的底线,一切优化必须坚持这个底线。

    
    
  • Geek_460f3a
    2023-03-10 来自广东
    老师您好,为啥每次拉去完消息都要先暂停,再恢复,是为了啥 while (isRunning) { List<MessageExt> records = consumer.poll(consumerPollTimeoutMs); submitRecords(records); consumerLimitController.pause(); consumerLimitController.resume(); }
    
    
  • Geek_460f3a
    2023-03-07 来自广东
    代码有bug,DefaultMQLitePushConsumer的isRunning永远是false,永远不会拉数据
    
    