30 | 消息驱动:如何高效处理 Stream 中的异常?
姚秋辰
你好,我是姚秋辰。
在上节课中,我们通过 Spring Cloud Stream 和 RabbitMQ 落地了两个业务场景,实现了用户领券和删除券的操作。如果在 Consumer 消费消息的时候发生了异常,比如用户领取的优惠券超过了券模板约定的上限,或者用户想要删除一张压根不存在的券,那么 Consumer 会抛出一个运行期异常。你知道在 Stream 中有哪些优雅的异常处理方式呢?
你可以调用 deleteCoupon 接口删除一张不存在的优惠券,人为制造一个异常场景,你会观察到,在 Consumer 端的日志中,当前消费者逻辑被执行了三次。这三次执行包括首次消息消费和两次重试,这就是 Stream 默认的一种异常处理方式:消息重试。
接下来,我先带你从本地重试出发,看下如何在消费者端配置重试规则。然后再进一步带你了解消息降级和死信队列这两个异常处理手段。
消息重试
消息重试是一种简单高效的异常恢复手段,当 Consumer 端抛出异常的时候,Stream 会自动执行 2 次重试。重试次数是由 ConsumerProperties 类中的 maxAttempts 参数指定的,它设置了一个消息最多可以被 Consumer 执行几次。
公开
同步至部落
取消
完成
0/2000
荧光笔
直线
曲线
笔记
复制
AI
- 深入了解
- 翻译
- 解释
- 总结
本文深入探讨了在消息驱动的应用中高效处理异常的多种方式。首先介绍了消息重试的机制,通过配置ConsumerProperties类中的maxAttempts参数和backOff参数来指定重试次数和重试规则,并讨论了如何关闭特定类型异常的重试功能。其次,文章讨论了异常降级方法,通过spring-integration组件的@ServiceActivator注解,可以为Consumer指定一段异常处理方法,实现自定义的降级逻辑。最后,文章提到了死信队列的概念,作为那些几经重试彻底没救的消息的最终归宿。通过配置死信交换机,可以将异常消息发送到死信队列,进一步处理这些异常消息。文章还介绍了如何配置死信队列以及使用RabbitMQ插件来移动死信队列的消息,以及死信队列的一些使用心得。总的来说,本文为读者提供了丰富的技术知识和实践经验,使其能够快速了解消息驱动中异常处理的多种方式。
仅可试看部分内容,如需阅读全部内容,请付费购买文章所属专栏
《Spring Cloud 微服务项目实战》,新⼈⾸单¥59
《Spring Cloud 微服务项目实战》,新⼈⾸单¥59
立即购买
© 版权归极客邦科技所有,未经许可不得传播售卖。 页面已增加防盗追踪,如有侵权极客邦将依法追究其法律责任。
登录 后留言
全部留言(3)
- 最新
- 精选
- javaadu事务消息是针对发送者的,消费者要确保做好异常处理和重试逻辑。 基于RocketMQ或Kafka的事务消息,实现分布式事务: (1)订单系统在消息队列上开启一个事务 (2)订单系统给消息服务器发送一个“半消息”,半消息和普通消息的唯一区别是,在事务提交之前,对于消费者来说,这个消息是不可见的。 (3)半消息发送成功后,订单系统就可以执行本地事务了,在订单库中创建一条订单记录,并提交订单库的数据库事务。 (4)根据本地事务的执行结果决定提交或者回滚事务消息。如果这一步失败,kafka是直接抛出异常,让用户在业务层代码自己处理;RocketMQ 中的事务实现中,增加了事务反查的机制来解决事务消息提交失败的问题。 (5)投递消息,消费者先执行消费的逻辑,只有消费逻辑执行成果后再进行消息的ack确认
作者回复: 同学这个解释非常的赞,用到事务消息的场景基于MQ构建的居多,这里面的半消息反查机制是理解原理的关键
2022-08-07归属地:上海4 - so long老师,我在测试requeue的时候,只设置requeue-rejected: true无法生效,需要同步设置auto-bind-dlq: false,这样才能生效,是不是我的版本问题?
作者回复: 默认情况下其实不用设置auto-bind-dlq,因为默认新创建的队列不会创建死信交换机。我猜想同学这个队列是之前声明成了DLD,因此已经具备死信交换功能了,所以要手动设置下false
2022-02-252 - peter请教老师两个关于Gateway的问题: Q1:Spring Gateway和Nacos、微服务应用通信是用RPC还是Rest API? Q2:Spring Gateway内部有“微服务客户端”吗?
作者回复: Q1:openfeign是rest风格,如果用dubbo+nacos是rpc Q2:Gateway本身也是一个微服务,作为client注册到nacos
2022-02-211
收起评论