Java并发编程实战
王宝令
资深架构师
立即订阅
15026 人已学习
课程目录
已完结 50 讲
0/4登录后,你可以任选4讲全文学习。
开篇词 (1讲)
开篇词 | 你为什么需要学习并发编程?
免费
学习攻略 (1讲)
学习攻略 | 如何才能学好并发编程?
第一部分:并发理论基础 (13讲)
01 | 可见性、原子性和有序性问题:并发编程Bug的源头
02 | Java内存模型:看Java如何解决可见性和有序性问题
03 | 互斥锁(上):解决原子性问题
04 | 互斥锁(下):如何用一把锁保护多个资源?
05 | 一不小心就死锁了,怎么办?
06 | 用“等待-通知”机制优化循环等待
07 | 安全性、活跃性以及性能问题
08 | 管程:并发编程的万能钥匙
09 | Java线程(上):Java线程的生命周期
10 | Java线程(中):创建多少线程才是合适的?
11 | Java线程(下):为什么局部变量是线程安全的?
12 | 如何用面向对象思想写好并发程序?
13 | 理论基础模块热点问题答疑
第二部分:并发工具类 (14讲)
14 | Lock和Condition(上):隐藏在并发包中的管程
15 | Lock和Condition(下):Dubbo如何用管程实现异步转同步?
16 | Semaphore:如何快速实现一个限流器?
17 | ReadWriteLock:如何快速实现一个完备的缓存?
18 | StampedLock:有没有比读写锁更快的锁?
19 | CountDownLatch和CyclicBarrier:如何让多线程步调一致?
20 | 并发容器:都有哪些“坑”需要我们填?
21 | 原子类:无锁工具类的典范
22 | Executor与线程池:如何创建正确的线程池?
23 | Future:如何用多线程实现最优的“烧水泡茶”程序?
24 | CompletableFuture:异步编程没那么难
25 | CompletionService:如何批量执行异步任务?
26 | Fork/Join:单机版的MapReduce
27 | 并发工具类模块热点问题答疑
第三部分:并发设计模式 (10讲)
28 | Immutability模式:如何利用不变性解决并发问题?
29 | Copy-on-Write模式:不是延时策略的COW
30 | 线程本地存储模式:没有共享,就没有伤害
31 | Guarded Suspension模式:等待唤醒机制的规范实现
32 | Balking模式:再谈线程安全的单例模式
33 | Thread-Per-Message模式:最简单实用的分工方法
34 | Worker Thread模式:如何避免重复创建线程?
35 | 两阶段终止模式:如何优雅地终止线程?
36 | 生产者-消费者模式:用流水线思想提高效率
37 | 设计模式模块热点问题答疑
第四部分:案例分析 (4讲)
38 | 案例分析(一):高性能限流器Guava RateLimiter
39 | 案例分析(二):高性能网络应用框架Netty
40 | 案例分析(三):高性能队列Disruptor
41 | 案例分析(四):高性能数据库连接池HiKariCP
第五部分:其他并发模型 (4讲)
42 | Actor模型:面向对象原生的并发模型
43 | 软件事务内存:借鉴数据库的并发经验
44 | 协程:更轻量级的线程
45 | CSP模型:Golang的主力队员
结束语 (1讲)
结束语 | 十年之后,初心依旧
用户故事 (2讲)
用户来信 | 真好,面试考到这些并发编程,我都答对了!
3 个用户来信 | 打开一个新的并发世界
Java并发编程实战
登录|注册

24 | CompletableFuture:异步编程没那么难

王宝令 2019-04-23
前面我们不止一次提到,用多线程优化性能,其实不过就是将串行操作变成并行操作。如果仔细观察,你还会发现在串行转换成并行的过程中,一定会涉及到异步化,例如下面的示例代码,现在是串行的,为了提升性能,我们得把它们并行化,那具体实施起来该怎么做呢?
//以下两个方法都是耗时操作
doBizA();
doBizB();
还是挺简单的,就像下面代码中这样,创建两个子线程去执行就可以了。你会发现下面的并行方案,主线程无需等待 doBizA() 和 doBizB() 的执行结果,也就是说 doBizA() 和 doBizB() 两个操作已经被异步化了。
new Thread(()->doBizA())
.start();
new Thread(()->doBizB())
.start();
异步化,是并行方案得以实施的基础,更深入地讲其实就是:利用多线程优化性能这个核心方案得以实施的基础。看到这里,相信你应该就能理解异步编程最近几年为什么会大火了,因为优化性能是互联网大厂的一个核心需求啊。Java 在 1.8 版本提供了 CompletableFuture 来支持异步编程,CompletableFuture 有可能是你见过的最复杂的工具类了,不过功能也着实让人感到震撼。

CompletableFuture 的核心优势

为了领略 CompletableFuture 异步编程的优势,这里我们用 CompletableFuture 重新实现前面曾提及的烧水泡茶程序。首先还是需要先完成分工方案,在下面的程序中,我们分了 3 个任务:任务 1 负责洗水壶、烧开水,任务 2 负责洗茶壶、洗茶杯和拿茶叶,任务 3 负责泡茶。其中任务 3 要等待任务 1 和任务 2 都完成后才能开始。这个分工如下图所示。
取消
完成
0/1000字
划线
笔记
复制
© 版权归极客邦科技所有,未经许可不得传播售卖。 页面已增加防盗追踪,如有侵权极客邦将依法追究其法律责任。
该试读文章来自付费专栏《Java并发编程实战》,如需阅读全部文章,
请订阅文章所属专栏。
立即订阅
登录 后留言

精选留言(43)

  • 袁阳
    思考题:
    1,读数据库属于io操作,应该放在单独线程池,避免线程饥饿
    2,异常未处理

    作者回复: 👍👍

    2019-04-23
    25
  • 刘晓林
    思考题:
    1.没有进行异常处理,
    2.要指定专门的线程池做数据库查询
    3.如果检查和查询都比较耗时,那么应该像之前的对账系统一样,采用生产者和消费者模式,让上一次的检查和下一次的查询并行起来。

    另外,老师把javadoc里那一堆那一堆方法进行了分类,分成串行、并行、AND聚合、OR聚合,简直太棒了,一下子就把这些方法纳入到一个完整的结构体系里了。简直棒

    作者回复: 思考题考虑的很全面👍

    2019-04-23
    18
  • 密码123456
    我在想一个问题,明明是串行过程,直接写就可以了。为什么还要用异步去实现串行?

    作者回复: 这个简单场景没必要用

    2019-04-23
    14
  • 青莲
    1.查数据库属于io操作,用定制线程池
    2.查出来的结果做为下一步处理的条件,若结果为空呢,没有对应处理
    3.缺少异常处理机制

    作者回复: 👍👍

    2019-04-23
    8
  • 发条橙子 。
    老师 ,我有个疑问。 completableFuture 中各种关系(并行、串行、聚合),实际上就覆盖了各种需求场景。 例如 : 线程A 等待 线程B 或者 线程C 等待 线程A和B 。

    我们之前讲的并发包里面 countdownLatch , 或者 threadPoolExecutor 和future 就是来解决这些关系场景的 , 那有了 completableFuture 这个类 ,是不是以后有需求都优先考虑用 completableFuture ?感觉这个类就可以解决前面所讲的类的问题了

    作者回复: 我觉得可以优先使用CompletableFuture,当然前提是你的jdk是1.8

    2019-04-24
    5
  • tyul
    回答「密码123456」:CompletableFuture 在执行的过程中可以不阻塞主线程,支持 runAsync、anyOf、allOf 等操作,等某个时间点需要异步执行的结果时再阻塞获取。

    作者回复: 是的,复杂场景就能体现出优势了

    2019-04-23
    1
    3
  • 刘晓林
    我觉得既然都讲到CompletableFuture了,老师是不是有必要不一章ForkJoinPool呀?毕竟,ForkJoinPool和ThreadPoolExecutor还是有很多不一样的。谢谢老师

    作者回复: 后面有介绍

    2019-04-23
    3
  • Michael
    老师 你好,对文章点赞这种功能异步如何实现?

    作者回复: 喊一嗓子,让朋友点

    2019-05-23
    2
  • 笃行之
    ”如果所有 CompletableFuture 共享一个线程池,那么一旦有任务执行一些很慢的 I/O 操作,就会导致线程池中所有线程都阻塞在 I/O 操作上,从而造成线程饥饿,进而影响整个系统的性能。”老师,阻塞在io上和是不是在一个线程池没关系吧?

    作者回复: 有关系,如果系统就一个线程池,里面的线程都阻塞在io上,那么系统其他的任务都需要等待。如果其他任务有自己的线程池,就没有问题。

    2019-04-29
    1
  • 星辰
    1.我只是感觉到异常没处理

    2.没考虑到io动作需要定制线程去处理

    java异步编程看起来很有意思……
    2019-04-25
    1
  • henry
    老师我现在有个任务,和您的例子有相似的地方,是从一个库里查询多张表的数据同步到另外一个库,就有双重for循环,最外层用与多张表的遍历,内层的for循环用于批量读取某一张表的数据,因为数据量可能在几万条,我想分批次读出来再同步到另一个数据库,昨天写的时候用的是futuretask,今天正好看到老师的文章就改成了CompletableFuture,还没有用异常处理的,后面我还要看看怎么加上异常处理的。其它的不知道我用的对不对,请老师看看:
       // 初始化异步工具类,分别异步执行2个任务
            CompletableFuture<List<PBSEnergyData>> asyncAquirePBSEnergyData = new CompletableFuture();
            CompletableFuture<List<AXEEnergyData>> asyncSaveAxeEnergyData = new CompletableFuture();
            // 初始化两个线程池, 分别用于2个任务 ,1个任务一个线程池,互不干扰
            Executor aquirePBSEnergyDataExecutor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
            Executor saveAxeEnergyDataExecutor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
            queryUtils.getTableNames().forEach(tableName -> {
                int pageSize = queryUtils.getPageSize();
                //查询该表有多少条数据,每${pageSize}条一次
                int count = pbsEnergyService.getCount(tableName);
                //总页数
                int pages = count / pageSize;
                int pageNum = 0;
                final int pageNo = pageNum;
                for(pageNum = 0; pageNum <= pages; pageNum++){
                    // 异步获取PBS数据库的数据并返回结果
                    asyncAquirePBSEnergyData
                            .supplyAsync(() -> {
                        查询数据库
                        return pbsEnergyDatas;
                    },aquirePBSEnergyDataExecutor)
                            // 任务2任务1,任务1返回的结果
                            .thenApply(pbsEnergyDatas -> asyncSaveAxeEnergyData.runAsync(()->{
                        List<AXEEnergyData> axeEnergyDatas = pbsEnergyDatas.stream().map(pbsEnergyData -> {
                        //进行类型转换
                        }).collect(Collectors.toList());
                        //批量保存
                    },saveAxeEnergyDataExecutor));
                }
            });
    全部贴上去,超过字符数了,只能请老师凑合看了 :(

    作者回复: 有个地方需要注意:runAsync和supplyAsync都是静态方法。
    线程池设置的太小了,这是个IO密集型的任务
    thenApply里面的runAsync我觉得好像是没有必要,增加了复杂的了。

    如果thenApply里面需要异步,可以用thenApplyAsync

    2019-04-24
    1
  • 易儿易
    老师我有一个问题:在描述串行关系时,为什么参数没有other?这让我觉得并不是在描述两个子任务的串行关系,而是给第一个子任务追加了一个类似“回调方法”fn等……而并行关系和汇聚关系则很明确的出现了other……

    作者回复: 你也可以理解成给第一个子任务追加了一个类似“回调方法”。回调不也是在第一个任务执行完才回调吗?所以也是串行的。都是一回事,你怎么理解起来顺手就怎么理解就可以了。

    2019-04-23
    1
  • linqw
    课后习题,规则校验依赖于数据库中的规则,如果规则不是共用的,直接放在一个内部,如果规则是共用,可以在主线程进行规则获取,异步校验规则。thenApply会重新创建一个CompletableFuture
    PurchersOrder po;
    CompletableFuture<Boolean> cf =
      CompletableFuture.supplyAsync(()->{
        // 在数据库中查询规则
        r = findRuleByJdbc();
        // 规则校验
        return check(po, r);
      });
    Boolean isOk = cf.join();
    CompletableFuture的写法和rxjava的使用很类似,一个结果作为下一个的参数,链式操作等
    2019-04-23
    1
  • 木木匠
    我觉得课后思考题中,既然是先查规则再校验,这本来就是一个串行化的动作,为什么要异步呢?用异步的意义在哪?
    2019-04-23
    1
  • 放个屁臭到了自己
    如果所有 CompletableFuture 共享一个线程池,那么一旦有任务执行一些很慢的 I/O 操作,就会导致线程池中所有线程都阻塞在 I/O 操作上


    这个是不是有问题?因为线程池有多个线程,如果只有一个阻塞,那么其他的线程也是可以的吧

    作者回复: 可以,实际情况不会只提交一个慢的任务

    2019-11-21
    1
  • Sharry
    是的, RxJava 已经是 Android 开发者必不可少的一个库了
    2019-09-24
  • 静水流深
    both->and,either->or
    2019-09-16
  • 随心而至
    想要用好还是要看看源码实现
    2019-09-06
  • Mr.zhang
    老师您好,我想请问一下:(__, tf)->{ },这是一种什么用法呢?括号中的__是什么意思呢?

    作者回复: Java里的lambda表达式

    2019-08-29
    1
  • null
    问题:为什么在 CompletableFuture.supplyAsync() 方法中返回 Supplier 对象时,打印当前线程的名称是 main 线程?(同步执行?)

    发现一个“有趣”的现象:
    如果在 supplyAsync() 方法中调用的 lazySupplier() 方法返回 Supplier,惰性求值,在 thenApply() 方法中调用 Supplier.get() 方法触发真正的耗时操作。
    每次执行,总有 CompletableFuture.supplyAsync() 是在 main 线程中执行的。
    自己的观察,总是前面 2 个在 main 线程中执行。

    如果 lazySupplier() 方法返回 String,在 supplyAsync() 就触发真正耗时操作,这时结果跟期望一样,都是异步执行。
    想不明白为什么 supplyAsync + Supplier 惰性求值,就退化成同步执行了。

    谢谢老师!!


    代码和执行结果如下所示:
    public static void main(String[] args) {
      ExecutorService executor = Executors.newFixedThreadPool(3);
      List<CompletableFuture<String>> list = new ArrayList<>(3);
      for (int i = 0; i < 3; i++) {
        CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> lazySupplier(), executor)
          .thenApply(stringSupplier -> {
            System.out.println("+++ thenApply:" + Thread.currentThread().getName() + " +++");
            String s = stringSupplier.get();
            return s + ", hi";
          });
        System.out.println("+++ list.add(future:" + i + "); +++");
        list.add(future);
      }

      for (CompletableFuture<String> future : list) {
        String exportList = future.join();
        System.out.println(exportList);
      }

      executor.shutdown();
    }

    private static Supplier<String> lazySupplier() {
      return () -> {
        long l = System.currentTimeMillis()/1000;
        String name = Thread.currentThread().getName();
        System.out.println(name + ",执行惰性求值,开始:" + l);
        try {
          Thread.sleep(10000);
        } catch (InterruptedException ignored) {
        }
        System.out.println(name + ",执行惰性求值,结束:" + System.currentTimeMillis()/1000);
        return l + "";
      };
    }

    执行结果:
    +++ thenApply:main +++
    main, 执行惰性求值,开始:1565796415
    main, 执行惰性求值,结束:1565796425
    +++ list.add(future:0); +++
    +++ thenApply:main +++
    main, 执行惰性求值,开始:1565796425
    main, 执行惰性求值,结束:1565796435
    +++ list.add(future:1); +++
    +++ list.add(future:2); +++
    1565796415, hi
    1565796425, hi
    +++ thenApply:pool-1-thread-3 +++
    pool-1-thread-3, 执行惰性求值,开始:1565796435
    pool-1-thread-3, 执行惰性求值,结束:1565796445
    1565796435, hi
    2019-08-14
收起评论
43
返回
顶部