25 | 异步处理好用,但非常容易用错
该思维导图由 AI 生成,仅供参考
- 深入了解
- 翻译
- 解释
- 总结
本文通过三个代码案例结合RabbitMQ系统,使用Spring AMQP来操作RabbitMQ,详细讨论了异步处理流程中的消息发送、监听和补偿操作。文章分析了用户注册后异步发送欢迎消息的场景,以及如何处理可能出现的消息丢失情况。同时介绍了补偿Job的定时任务实现,以及如何处理补偿任务的并发和去重操作。强调了补偿闭环处理的最高标准,即能够达到补偿全量数据的吞吐量,确保流程都能正常执行。文章内容涵盖了异步处理的优势、常见错误及解决方法,以及对RabbitMQ的实际操作,为读者实现异步处理提供了全面的指导和参考。同时,文章还介绍了如何处理死信消息的问题,通过实例演示了如何使用Spring AMQP来处理死信消息,以及如何设置重试策略和死信队列的处理程序。这些内容对读者在实际业务代码中实现异步处理提供了有益的指导和参考。文章还提到了通过设置concurrentConsumers参数来增加消费线程,以避免性能问题,以及动态调整消费者线程数的方法。文章内容丰富,对读者实现异步处理提供了全面的指导和参考。文章还强调了在使用异步处理时需要考虑的四个方面问题,包括消息丢失、消息重复、消息路由配置和死信消息处理,为读者提供了思考和讨论的角度。
《Java 业务开发常见错误 100 例》,新⼈⾸单¥59
全部留言(22)
- 最新
- 精选
- vivi我之前做过一个demo 是基于canal做mysql数据同步,需要将解析好的数据发到kafka里面,再进行处理。在使用的时候发现这么一个问题,就是kafka多partition消费时不能保证消息的顺序消费,进而导致mysql数据同步异常。 由于kafka可以保证在同一个partition内消息有序,于是我自定义了一个分区器,将数据的id取hashcode然后根据partition的数量取余作为分区号,保证同一条数据的binlog能投递到同一个partition中,从而达到消息顺序消费的目的。
作者回复: 这个实现很赞
2020-05-12735 - 每天晒白牙老师,我理解的异步处理不仅仅是通过 MQ 来实现,还有其他方式 比如开新线程执行,返回 Future 还有各种异步框架,比如 Vertx,它是通过 callback 的方式实现 思考题 1.可能是数据写到了主库,然后查询了从库。但因为主从同步有延迟,导致没有查询到
作者回复: 是的,或许本文标题可以改为消息队列:XXX 😀,不过文中的一些点是可以泛化到你提到的两种异步处理的 思考题一是我真实遇到的问题,当时倒不是因为主从的问题,而是因为业务代码把保存数据和发MQ消息放在了一个事务中,有概率收到消息的时候事务还没有提交完成,当时开发同学的处理方式是收MQ消息的时候sleep 1秒,或许应该是先提交事务,完成后再发MQ消息,但是这又出来一个问题MQ消息发送失败怎么办?所以后来演化为建立本地消息表来确保MQ消息可补偿,把业务处理和保存MQ消息到本地消息表操作在相同事务内处理,然后异步发送和补偿发送消息表中的消息到MQ
2020-05-12929 - Darren第一个问题: 每天晒白牙大佬的回答和老师的回复已经很棒了,我就不班门弄斧了。 第二个问题: 自定义的私信队列,其实是发送失败,主要是生产者发送到mq的时候,发送失败,进了自定义的私信队列; DLX的方式的方式其实解决已到了mq,但是因为各种原因,无法到达正常的队列中,大概分类下面几种吧: 消息消费时被拒绝(basic.reject / basic.nack),并且requeue = false 消息TTL过期 队列达到最大长度 分享一下之前在公司内部分享的RabbitMQ的资料,欢迎大家交流 github上传不上去,只能用有道云笔记,请大家见谅 资料主要从: MQ选型及特点; AMQP与RabbitMQ模型; RabbitMQ核心概念; RabbitMQ相关机制; 这几个点去分析的,请大家多多指教。 http://note.youdao.com/noteshare?id=e9f2f88c6c7fcb7ac690463eb230650a
作者回复: 感谢分享 👍🏻
2020-05-12620 - 203。老师 我这里有个问题 关于Stream的,业务需求里需要按某几个字段去重(acctId,billingCycleId,prodInstId,offerId) 我这里想到了遍历集合areaDatas 后用contains方法判断 重写AcctItemYzfBean实体类的equals方法实现, 请问有没有更好的方法? 代码如下 List<AcctItemYzfBean> newList = new CopyOnWriteArrayList<>(); //循环过滤、增强翼支付数据 Optional.ofNullable(areaDatas)//集合判空 .orElse(new ArrayList<>()) .stream()//转化为流 便于下面过滤和增强数据 .filter(Objects::nonNull)//元素判空 .filter(yzfBean -> this.judgeIfOfferId(yzfBean))//判断销售品ID是否相同 .filter(yzfBean -> this.enhanceYzfBean(yzfBean))//增强过滤accNbr和acctId .filter(yzfBean -> this.judgeIfArrears(yzfBean))//判断是否不欠费 .filter(yzfBean -> this.judgeIfCancel(yzfBean))//判断是否销账金额大于0 .filter(yzfBean -> this.judgeIfReturn(yzfBean))//判断是否上月未返还 .forEach(yzfBean -> { //去重 重写AcctItemYzfBean.equals方法 if(!newList.contains(yzfBean)) { //增强latnName yzfBean.setLatnName(commonRegionMap.get(yzfBean.getRegionId())); //增强areaCode yzfBean.setAreaCode(areaCode); //数据封装 newList.add(yzfBean); } }); 重写的equals方法 @Override public boolean equals(Object yzfBeanObj) { if(yzfBeanObj instanceof AcctItemYzfBean) { AcctItemYzfBean yzfBean = (AcctItemYzfBean) yzfBeanObj; if(Tools.isEmpty(yzfBean.getAcctId(), yzfBean.getBillingCycleId(), yzfBean.getProdInstId(), yzfBean.getOfferId())) { return false; } if(yzfBean.getAcctId().equals(this.acctId) && yzfBean.getBillingCycleId().equals(this.billingCycleId) && yzfBean.getProdInstId().equals(this.prodInstId) && yzfBean.getOfferId().equals(this.offerId)) { return true; } } return super.equals(yzfBeanObj); }
作者回复: 比如下面的类,id1和id2重复认为是重复的,id3不需要考虑 @Data @Builder @NoArgsConstructor @AllArgsConstructor static class Test { private String id1; private String id2; @EqualsAndHashCode.Exclude private String id3; } 通过Set去重或者通过distinct去重即可: List<Test> list = new ArrayList<>(); list.add(new Test("a","b","c")); list.add(new Test("a","b","d")); System.out.println(list.stream().collect(Collectors.toSet())); System.out.println(list.stream().distinct().collect(Collectors.toList()));
2020-05-1227 - 似曾相识老师 1.如果实际生产中用使用 ConcurrentHashMap 来存放那些发过短信的用户 ID 实现幂等,如何一直往map中增加,会不会oom呢? 2.如果数据量巨大 使用ConcurrentSkipListMap 跳表会不会更好一些呢?
作者回复: 这只是demo生产应用肯定用数据库做幂等的
2020-05-176 - 王鹏mq发信息写到了事务中,导致了mq的消费时,事务还没有提交
作者回复: 是,我遇到的就是这个情况
2020-05-1234 - 郭石龙老师,你好,如果有多个补偿实例,会不会造成消息重复?
作者回复: 补偿需要配合幂等
2020-05-124 - 鲁鸣在event sourcing的时候,有一种方案是outbox,在发送方维持一个数据表,这样可以保证消息和业务数据在一个事务中,也是一种消息发送记录的方式
作者回复: 这就是消息事务表模式
2020-09-251 - G小调第一个问题,是否可以这样解决 1.先保存用户注册的数据,同时记录下要发送mq的消息,入库在一个事务里 2.通过异步任务定时拉取mq的消息表,发送到mq,进行处理 但这个有个问题,异步任务就能执行mq的的业务,那mq的价值是不是减少了
作者回复: 其实这就是本地事务消息的实现 第二步不一定需要定时任务拉取 第一步完成后直接发mq即可 定时任务拉取只用来补偿
2020-05-1521 - walle斌对了 许多同时用@Async标签实现 希望能够做到,但是又不用100%保证的解耦动作。实际也有隐患,不放把@Async标签底层实现更换为mq,是不是更合适?
作者回复: @Async和MQ原理完全不同
2021-07-122