19|案例:如何排查RocketMQ消息消费积压问题?
RocketMQ 的消息消费模型
- 深入了解
- 翻译
- 解释
- 总结
本文介绍了如何排查RocketMQ消息消费积压问题。首先,文章介绍了RocketMQ的消息消费模型,包括Delay和LastConsumeTime两个重要指标。然后详细解释了RocketMQ的消息消费处理模型,指出了可能导致消费积压的两个原因。接着,文章提出了限流机制来避免内存溢出,并介绍了如何排查RocketMQ消息消费积压问题,包括使用jstack命令跟踪线程栈,定位消费端慢的具体代码行。最后,总结了消费积压问题的根本原因,并分享了一些小经验。整体来说,本文通过深入分析RocketMQ消息消费积压问题的原因和排查方法,为读者提供了解决该问题的思路和方法。
《中间件核心技术与实战》,新⼈⾸单¥59
全部留言(2)
- 最新
- 精选
- 小杰因为消费位点是consumer定时发给broker的,而每次发的时候只发最小确认offset,所以有可能消费者挂了又起来,大量消息都得重新消费,老师是这个原因吗?
作者回复: 非常正确,👏👏,虽然rocketmq无法避免消息重复投送,但还是有义务尽量少发生重复推送。
2022-08-04归属地:上海23 - Sam Fu老师可以讲下消费成功或者失败 本地以及broker offset的更新机制吗? 是每条消息消费完都会更新本地offset还是拉取的一批全部处理完才会更新. 如果这一批有一条消费失败的就全部扔回吗 还是只扔消费失败的
作者回复: 好,你这个我分两个问题回答。 1、关于消息的位点提交机制,为了提高降低服务端持久化位点的压力,RocketMQ的位点提交机制是批量+异步方式。具体是消费端在处理完一条消息后,会提交位点,但这个位点提交只是将位点存储在本地内存缓存中,然后定时(默认每隔5s)将本地缓存一次性的提交到Broker,Broker收到位点提交机制后,更新内存中的位点缓存,然后每隔5s持久化到位点存储文件。 2、如果一批失败了是怎么提交,一般是指并发消费,我们可以留意一下MessageListenerConcurrently中consumeMessage这个方法声明:ConsumeConcurrentlyStatus consumeMessage(final List<MessageExt> msgs, final ConsumeConcurrentlyContext context); 一批消息对应一个处理状态,所以如果其中一条消息失败了,这个时候这一批消息都会重试。
2023-02-04归属地:北京1