Java 业务开发常见错误 100 例
朱晔
贝壳金服资深架构师
52944 人已学习
新⼈⾸单¥59
登录后,你可以任选4讲全文学习
课程目录
已完结/共 48 讲
代码篇 (23讲)
Java 业务开发常见错误 100 例
15
15
1.0x
00:00/00:00
登录|注册

25 | 异步处理好用,但非常容易用错

解决死信无限重复进入队列的问题
实现用户服务需要广播消息给会员服务和营销服务的逻辑
实现会员服务监听用户服务发出的新用户注册消息的逻辑
补偿Job定期进行消息补偿
用户注册后异步发送欢迎消息的场景
消息广播
实现模块解耦
应对流量洪峰
用户不需要实时看到结果的流程
服务于主流程的分支流程
使用DLX实现死信消息的重投递
用户注册后发送消息到MQ,会员服务查询数据库问题
别让死信堵塞了消息队列
注意消息模式是广播还是工作队列
异步处理需要消息补偿闭环
死信消息堵塞队列的问题
消息发送模式的区分问题
可靠性问题
功能优势
适用场景
由同步处理、异步处理和定时任务处理三种模式相辅相成实现
互联网应用不可或缺的一种架构模式
思考与讨论
代码案例结合RabbitMQ
异步处理容易犯的错
异步处理
异步处理好用,但写Java业务代码时非常容易用错

该思维导图由 AI 生成,仅供参考

你好,我是朱晔。今天,我来和你聊聊好用但容易出错的异步处理。
异步处理是互联网应用不可或缺的一种架构模式,大多数业务项目都是由同步处理、异步处理和定时任务处理三种模式相辅相成实现的。
区别于同步处理,异步处理无需同步等待流程处理完毕,因此适用场景主要包括:
服务于主流程的分支流程。比如,在注册流程中,把数据写入数据库的操作是主流程,但注册后给用户发优惠券或欢迎短信的操作是分支流程,时效性不那么强,可以进行异步处理。
用户不需要实时看到结果的流程。比如,下单后的配货、送货流程完全可以进行异步处理,每个阶段处理完成后,再给用户发推送或短信让用户知晓即可。
同时,异步处理因为可以有 MQ 中间件的介入用于任务的缓冲的分发,所以相比于同步处理,在应对流量洪峰、实现模块解耦和消息广播方面有功能优势。
不过,异步处理虽然好用,但在实现的时候却有三个最容易犯的错,分别是异步处理流程的可靠性问题、消息发送模式的区分问题,以及大量死信消息堵塞队列的问题。今天,我就用三个代码案例结合目前常用的 MQ 系统 RabbitMQ,来和你具体聊聊。
今天这一讲的演示,我都会使用 Spring AMQP 来操作 RabbitMQ,所以你需要先引入 amqp 依赖:
确认放弃笔记?
放弃后所记笔记将不保留。
新功能上线,你的历史笔记已初始化为私密笔记,是否一键批量公开?
批量公开的笔记不会为你同步至部落
公开
同步至部落
取消
完成
0/2000
荧光笔
直线
曲线
笔记
复制
AI
  • 深入了解
  • 翻译
    • 英语
    • 中文简体
    • 中文繁体
    • 法语
    • 德语
    • 日语
    • 韩语
    • 俄语
    • 西班牙语
    • 阿拉伯语
  • 解释
  • 总结

本文通过三个代码案例结合RabbitMQ系统,使用Spring AMQP来操作RabbitMQ,详细讨论了异步处理流程中的消息发送、监听和补偿操作。文章分析了用户注册后异步发送欢迎消息的场景,以及如何处理可能出现的消息丢失情况。同时介绍了补偿Job的定时任务实现,以及如何处理补偿任务的并发和去重操作。强调了补偿闭环处理的最高标准,即能够达到补偿全量数据的吞吐量,确保流程都能正常执行。文章内容涵盖了异步处理的优势、常见错误及解决方法,以及对RabbitMQ的实际操作,为读者实现异步处理提供了全面的指导和参考。同时,文章还介绍了如何处理死信消息的问题,通过实例演示了如何使用Spring AMQP来处理死信消息,以及如何设置重试策略和死信队列的处理程序。这些内容对读者在实际业务代码中实现异步处理提供了有益的指导和参考。文章还提到了通过设置concurrentConsumers参数来增加消费线程,以避免性能问题,以及动态调整消费者线程数的方法。文章内容丰富,对读者实现异步处理提供了全面的指导和参考。文章还强调了在使用异步处理时需要考虑的四个方面问题,包括消息丢失、消息重复、消息路由配置和死信消息处理,为读者提供了思考和讨论的角度。

仅可试看部分内容,如需阅读全部内容,请付费购买文章所属专栏
《Java 业务开发常见错误 100 例》
新⼈⾸单¥59
立即购买
登录 后留言

全部留言(22)

  • 最新
  • 精选
  • vivi
    我之前做过一个demo 是基于canal做mysql数据同步,需要将解析好的数据发到kafka里面,再进行处理。在使用的时候发现这么一个问题,就是kafka多partition消费时不能保证消息的顺序消费,进而导致mysql数据同步异常。 由于kafka可以保证在同一个partition内消息有序,于是我自定义了一个分区器,将数据的id取hashcode然后根据partition的数量取余作为分区号,保证同一条数据的binlog能投递到同一个partition中,从而达到消息顺序消费的目的。

    作者回复: 这个实现很赞

    2020-05-12
    7
    35
  • 每天晒白牙
    老师,我理解的异步处理不仅仅是通过 MQ 来实现,还有其他方式 比如开新线程执行,返回 Future 还有各种异步框架,比如 Vertx,它是通过 callback 的方式实现 思考题 1.可能是数据写到了主库,然后查询了从库。但因为主从同步有延迟,导致没有查询到

    作者回复: 是的,或许本文标题可以改为消息队列:XXX 😀,不过文中的一些点是可以泛化到你提到的两种异步处理的 思考题一是我真实遇到的问题,当时倒不是因为主从的问题,而是因为业务代码把保存数据和发MQ消息放在了一个事务中,有概率收到消息的时候事务还没有提交完成,当时开发同学的处理方式是收MQ消息的时候sleep 1秒,或许应该是先提交事务,完成后再发MQ消息,但是这又出来一个问题MQ消息发送失败怎么办?所以后来演化为建立本地消息表来确保MQ消息可补偿,把业务处理和保存MQ消息到本地消息表操作在相同事务内处理,然后异步发送和补偿发送消息表中的消息到MQ

    2020-05-12
    9
    29
  • Darren
    第一个问题: 每天晒白牙大佬的回答和老师的回复已经很棒了,我就不班门弄斧了。 第二个问题: 自定义的私信队列,其实是发送失败,主要是生产者发送到mq的时候,发送失败,进了自定义的私信队列; DLX的方式的方式其实解决已到了mq,但是因为各种原因,无法到达正常的队列中,大概分类下面几种吧: 消息消费时被拒绝(basic.reject / basic.nack),并且requeue = false 消息TTL过期 队列达到最大长度 分享一下之前在公司内部分享的RabbitMQ的资料,欢迎大家交流 github上传不上去,只能用有道云笔记,请大家见谅 资料主要从: MQ选型及特点; AMQP与RabbitMQ模型; RabbitMQ核心概念; RabbitMQ相关机制; 这几个点去分析的,请大家多多指教。 http://note.youdao.com/noteshare?id=e9f2f88c6c7fcb7ac690463eb230650a

    作者回复: 感谢分享 👍🏻

    2020-05-12
    6
    20
  • 203。
    老师 我这里有个问题 关于Stream的,业务需求里需要按某几个字段去重(acctId,billingCycleId,prodInstId,offerId) 我这里想到了遍历集合areaDatas 后用contains方法判断 重写AcctItemYzfBean实体类的equals方法实现, 请问有没有更好的方法? 代码如下 List<AcctItemYzfBean> newList = new CopyOnWriteArrayList<>(); //循环过滤、增强翼支付数据 Optional.ofNullable(areaDatas)//集合判空 .orElse(new ArrayList<>()) .stream()//转化为流 便于下面过滤和增强数据 .filter(Objects::nonNull)//元素判空 .filter(yzfBean -> this.judgeIfOfferId(yzfBean))//判断销售品ID是否相同 .filter(yzfBean -> this.enhanceYzfBean(yzfBean))//增强过滤accNbr和acctId .filter(yzfBean -> this.judgeIfArrears(yzfBean))//判断是否不欠费 .filter(yzfBean -> this.judgeIfCancel(yzfBean))//判断是否销账金额大于0 .filter(yzfBean -> this.judgeIfReturn(yzfBean))//判断是否上月未返还 .forEach(yzfBean -> { //去重 重写AcctItemYzfBean.equals方法 if(!newList.contains(yzfBean)) { //增强latnName yzfBean.setLatnName(commonRegionMap.get(yzfBean.getRegionId())); //增强areaCode yzfBean.setAreaCode(areaCode); //数据封装 newList.add(yzfBean); } }); 重写的equals方法 @Override public boolean equals(Object yzfBeanObj) { if(yzfBeanObj instanceof AcctItemYzfBean) { AcctItemYzfBean yzfBean = (AcctItemYzfBean) yzfBeanObj; if(Tools.isEmpty(yzfBean.getAcctId(), yzfBean.getBillingCycleId(), yzfBean.getProdInstId(), yzfBean.getOfferId())) { return false; } if(yzfBean.getAcctId().equals(this.acctId) && yzfBean.getBillingCycleId().equals(this.billingCycleId) && yzfBean.getProdInstId().equals(this.prodInstId) && yzfBean.getOfferId().equals(this.offerId)) { return true; } } return super.equals(yzfBeanObj); }

    作者回复: 比如下面的类,id1和id2重复认为是重复的,id3不需要考虑 @Data @Builder @NoArgsConstructor @AllArgsConstructor static class Test { private String id1; private String id2; @EqualsAndHashCode.Exclude private String id3; } 通过Set去重或者通过distinct去重即可: List<Test> list = new ArrayList<>(); list.add(new Test("a","b","c")); list.add(new Test("a","b","d")); System.out.println(list.stream().collect(Collectors.toSet())); System.out.println(list.stream().distinct().collect(Collectors.toList()));

    2020-05-12
    2
    7
  • 似曾相识
    老师 1.如果实际生产中用使用 ConcurrentHashMap 来存放那些发过短信的用户 ID 实现幂等,如何一直往map中增加,会不会oom呢? 2.如果数据量巨大 使用ConcurrentSkipListMap 跳表会不会更好一些呢?

    作者回复: 这只是demo生产应用肯定用数据库做幂等的

    2020-05-17
    6
  • 王鹏
    mq发信息写到了事务中,导致了mq的消费时,事务还没有提交

    作者回复: 是,我遇到的就是这个情况

    2020-05-12
    3
    4
  • 郭石龙
    老师,你好,如果有多个补偿实例,会不会造成消息重复?

    作者回复: 补偿需要配合幂等

    2020-05-12
    4
  • 鲁鸣
    在event sourcing的时候,有一种方案是outbox,在发送方维持一个数据表,这样可以保证消息和业务数据在一个事务中,也是一种消息发送记录的方式

    作者回复: 这就是消息事务表模式

    2020-09-25
    1
  • G小调
    第一个问题,是否可以这样解决 1.先保存用户注册的数据,同时记录下要发送mq的消息,入库在一个事务里 2.通过异步任务定时拉取mq的消息表,发送到mq,进行处理 但这个有个问题,异步任务就能执行mq的的业务,那mq的价值是不是减少了

    作者回复: 其实这就是本地事务消息的实现 第二步不一定需要定时任务拉取 第一步完成后直接发mq即可 定时任务拉取只用来补偿

    2020-05-15
    2
    1
  • walle斌
    对了 许多同时用@Async标签实现 希望能够做到,但是又不用100%保证的解耦动作。实际也有隐患,不放把@Async标签底层实现更换为mq,是不是更合适?

    作者回复: @Async和MQ原理完全不同

    2021-07-12
    2
收起评论
显示
设置
留言
22
收藏
沉浸
阅读
分享
手机端
快捷键
回顶部