Java 业务开发常见错误 100 例
朱晔
贝壳金服资深架构师
51940 人已学习
新⼈⾸单¥59
登录后,你可以任选4讲全文学习
课程目录
已完结/共 48 讲
代码篇 (23讲)
Java 业务开发常见错误 100 例
15
15
1.0x
00:00/00:00
登录|注册

25 | 异步处理好用,但非常容易用错

你好,我是朱晔。今天,我来和你聊聊好用但容易出错的异步处理。
异步处理是互联网应用不可或缺的一种架构模式,大多数业务项目都是由同步处理、异步处理和定时任务处理三种模式相辅相成实现的。
区别于同步处理,异步处理无需同步等待流程处理完毕,因此适用场景主要包括:
服务于主流程的分支流程。比如,在注册流程中,把数据写入数据库的操作是主流程,但注册后给用户发优惠券或欢迎短信的操作是分支流程,时效性不那么强,可以进行异步处理。
用户不需要实时看到结果的流程。比如,下单后的配货、送货流程完全可以进行异步处理,每个阶段处理完成后,再给用户发推送或短信让用户知晓即可。
同时,异步处理因为可以有 MQ 中间件的介入用于任务的缓冲的分发,所以相比于同步处理,在应对流量洪峰、实现模块解耦和消息广播方面有功能优势。
不过,异步处理虽然好用,但在实现的时候却有三个最容易犯的错,分别是异步处理流程的可靠性问题、消息发送模式的区分问题,以及大量死信消息堵塞队列的问题。今天,我就用三个代码案例结合目前常用的 MQ 系统 RabbitMQ,来和你具体聊聊。
今天这一讲的演示,我都会使用 Spring AMQP 来操作 RabbitMQ,所以你需要先引入 amqp 依赖:
确认放弃笔记?
放弃后所记笔记将不保留。
新功能上线,你的历史笔记已初始化为私密笔记,是否一键批量公开?
批量公开的笔记不会为你同步至部落
公开
同步至部落
取消
完成
0/2000
荧光笔
直线
曲线
笔记
复制
AI
  • 深入了解
  • 翻译
    • 英语
    • 中文简体
    • 中文繁体
    • 法语
    • 德语
    • 日语
    • 韩语
    • 俄语
    • 西班牙语
    • 阿拉伯语
  • 解释
  • 总结
仅可试看部分内容,如需阅读全部内容,请付费购买文章所属专栏
《Java 业务开发常见错误 100 例》
新⼈⾸单¥59
立即购买
登录 后留言

全部留言(22)

  • 最新
  • 精选
  • vivi
    我之前做过一个demo 是基于canal做mysql数据同步,需要将解析好的数据发到kafka里面,再进行处理。在使用的时候发现这么一个问题,就是kafka多partition消费时不能保证消息的顺序消费,进而导致mysql数据同步异常。 由于kafka可以保证在同一个partition内消息有序,于是我自定义了一个分区器,将数据的id取hashcode然后根据partition的数量取余作为分区号,保证同一条数据的binlog能投递到同一个partition中,从而达到消息顺序消费的目的。

    作者回复: 这个实现很赞

    7
    35
  • 每天晒白牙
    老师,我理解的异步处理不仅仅是通过 MQ 来实现,还有其他方式 比如开新线程执行,返回 Future 还有各种异步框架,比如 Vertx,它是通过 callback 的方式实现 思考题 1.可能是数据写到了主库,然后查询了从库。但因为主从同步有延迟,导致没有查询到

    作者回复: 是的,或许本文标题可以改为消息队列:XXX 😀,不过文中的一些点是可以泛化到你提到的两种异步处理的 思考题一是我真实遇到的问题,当时倒不是因为主从的问题,而是因为业务代码把保存数据和发MQ消息放在了一个事务中,有概率收到消息的时候事务还没有提交完成,当时开发同学的处理方式是收MQ消息的时候sleep 1秒,或许应该是先提交事务,完成后再发MQ消息,但是这又出来一个问题MQ消息发送失败怎么办?所以后来演化为建立本地消息表来确保MQ消息可补偿,把业务处理和保存MQ消息到本地消息表操作在相同事务内处理,然后异步发送和补偿发送消息表中的消息到MQ

    9
    27
  • Darren
    第一个问题: 每天晒白牙大佬的回答和老师的回复已经很棒了,我就不班门弄斧了。 第二个问题: 自定义的私信队列,其实是发送失败,主要是生产者发送到mq的时候,发送失败,进了自定义的私信队列; DLX的方式的方式其实解决已到了mq,但是因为各种原因,无法到达正常的队列中,大概分类下面几种吧: 消息消费时被拒绝(basic.reject / basic.nack),并且requeue = false 消息TTL过期 队列达到最大长度 分享一下之前在公司内部分享的RabbitMQ的资料,欢迎大家交流 github上传不上去,只能用有道云笔记,请大家见谅 资料主要从: MQ选型及特点; AMQP与RabbitMQ模型; RabbitMQ核心概念; RabbitMQ相关机制; 这几个点去分析的,请大家多多指教。 http://note.youdao.com/noteshare?id=e9f2f88c6c7fcb7ac690463eb230650a

    作者回复: 感谢分享 👍🏻

    6
    20
  • 203。
    老师 我这里有个问题 关于Stream的,业务需求里需要按某几个字段去重(acctId,billingCycleId,prodInstId,offerId) 我这里想到了遍历集合areaDatas 后用contains方法判断 重写AcctItemYzfBean实体类的equals方法实现, 请问有没有更好的方法? 代码如下 List<AcctItemYzfBean> newList = new CopyOnWriteArrayList<>(); //循环过滤、增强翼支付数据 Optional.ofNullable(areaDatas)//集合判空 .orElse(new ArrayList<>()) .stream()//转化为流 便于下面过滤和增强数据 .filter(Objects::nonNull)//元素判空 .filter(yzfBean -> this.judgeIfOfferId(yzfBean))//判断销售品ID是否相同 .filter(yzfBean -> this.enhanceYzfBean(yzfBean))//增强过滤accNbr和acctId .filter(yzfBean -> this.judgeIfArrears(yzfBean))//判断是否不欠费 .filter(yzfBean -> this.judgeIfCancel(yzfBean))//判断是否销账金额大于0 .filter(yzfBean -> this.judgeIfReturn(yzfBean))//判断是否上月未返还 .forEach(yzfBean -> { //去重 重写AcctItemYzfBean.equals方法 if(!newList.contains(yzfBean)) { //增强latnName yzfBean.setLatnName(commonRegionMap.get(yzfBean.getRegionId())); //增强areaCode yzfBean.setAreaCode(areaCode); //数据封装 newList.add(yzfBean); } }); 重写的equals方法 @Override public boolean equals(Object yzfBeanObj) { if(yzfBeanObj instanceof AcctItemYzfBean) { AcctItemYzfBean yzfBean = (AcctItemYzfBean) yzfBeanObj; if(Tools.isEmpty(yzfBean.getAcctId(), yzfBean.getBillingCycleId(), yzfBean.getProdInstId(), yzfBean.getOfferId())) { return false; } if(yzfBean.getAcctId().equals(this.acctId) && yzfBean.getBillingCycleId().equals(this.billingCycleId) && yzfBean.getProdInstId().equals(this.prodInstId) && yzfBean.getOfferId().equals(this.offerId)) { return true; } } return super.equals(yzfBeanObj); }

    作者回复: 比如下面的类,id1和id2重复认为是重复的,id3不需要考虑 @Data @Builder @NoArgsConstructor @AllArgsConstructor static class Test { private String id1; private String id2; @EqualsAndHashCode.Exclude private String id3; } 通过Set去重或者通过distinct去重即可: List<Test> list = new ArrayList<>(); list.add(new Test("a","b","c")); list.add(new Test("a","b","d")); System.out.println(list.stream().collect(Collectors.toSet())); System.out.println(list.stream().distinct().collect(Collectors.toList()));

    2
    7
  • 似曾相识
    老师 1.如果实际生产中用使用 ConcurrentHashMap 来存放那些发过短信的用户 ID 实现幂等,如何一直往map中增加,会不会oom呢? 2.如果数据量巨大 使用ConcurrentSkipListMap 跳表会不会更好一些呢?

    作者回复: 这只是demo生产应用肯定用数据库做幂等的

    6
  • 王鹏
    mq发信息写到了事务中,导致了mq的消费时,事务还没有提交

    作者回复: 是,我遇到的就是这个情况

    3
    4
  • 郭石龙
    老师,你好,如果有多个补偿实例,会不会造成消息重复?

    作者回复: 补偿需要配合幂等

    4
  • 鲁鸣
    在event sourcing的时候,有一种方案是outbox,在发送方维持一个数据表,这样可以保证消息和业务数据在一个事务中,也是一种消息发送记录的方式

    作者回复: 这就是消息事务表模式

    1
  • G小调
    第一个问题,是否可以这样解决 1.先保存用户注册的数据,同时记录下要发送mq的消息,入库在一个事务里 2.通过异步任务定时拉取mq的消息表,发送到mq,进行处理 但这个有个问题,异步任务就能执行mq的的业务,那mq的价值是不是减少了

    作者回复: 其实这就是本地事务消息的实现 第二步不一定需要定时任务拉取 第一步完成后直接发mq即可 定时任务拉取只用来补偿

    2
    1
  • walle斌
    对了 许多同时用@Async标签实现 希望能够做到,但是又不用100%保证的解耦动作。实际也有隐患,不放把@Async标签底层实现更换为mq,是不是更合适?

    作者回复: @Async和MQ原理完全不同

    2
收起评论
显示
设置
留言
22
收藏
沉浸
阅读
分享
手机端
快捷键
回顶部