Java并发编程实战
王宝令
资深架构师
立即订阅
15046 人已学习
课程目录
已完结 50 讲
0/4登录后,你可以任选4讲全文学习。
开篇词 (1讲)
开篇词 | 你为什么需要学习并发编程?
免费
学习攻略 (1讲)
学习攻略 | 如何才能学好并发编程?
第一部分:并发理论基础 (13讲)
01 | 可见性、原子性和有序性问题:并发编程Bug的源头
02 | Java内存模型:看Java如何解决可见性和有序性问题
03 | 互斥锁(上):解决原子性问题
04 | 互斥锁(下):如何用一把锁保护多个资源?
05 | 一不小心就死锁了,怎么办?
06 | 用“等待-通知”机制优化循环等待
07 | 安全性、活跃性以及性能问题
08 | 管程:并发编程的万能钥匙
09 | Java线程(上):Java线程的生命周期
10 | Java线程(中):创建多少线程才是合适的?
11 | Java线程(下):为什么局部变量是线程安全的?
12 | 如何用面向对象思想写好并发程序?
13 | 理论基础模块热点问题答疑
第二部分:并发工具类 (14讲)
14 | Lock和Condition(上):隐藏在并发包中的管程
15 | Lock和Condition(下):Dubbo如何用管程实现异步转同步?
16 | Semaphore:如何快速实现一个限流器?
17 | ReadWriteLock:如何快速实现一个完备的缓存?
18 | StampedLock:有没有比读写锁更快的锁?
19 | CountDownLatch和CyclicBarrier:如何让多线程步调一致?
20 | 并发容器:都有哪些“坑”需要我们填?
21 | 原子类:无锁工具类的典范
22 | Executor与线程池:如何创建正确的线程池?
23 | Future:如何用多线程实现最优的“烧水泡茶”程序?
24 | CompletableFuture:异步编程没那么难
25 | CompletionService:如何批量执行异步任务?
26 | Fork/Join:单机版的MapReduce
27 | 并发工具类模块热点问题答疑
第三部分:并发设计模式 (10讲)
28 | Immutability模式:如何利用不变性解决并发问题?
29 | Copy-on-Write模式:不是延时策略的COW
30 | 线程本地存储模式:没有共享,就没有伤害
31 | Guarded Suspension模式:等待唤醒机制的规范实现
32 | Balking模式:再谈线程安全的单例模式
33 | Thread-Per-Message模式:最简单实用的分工方法
34 | Worker Thread模式:如何避免重复创建线程?
35 | 两阶段终止模式:如何优雅地终止线程?
36 | 生产者-消费者模式:用流水线思想提高效率
37 | 设计模式模块热点问题答疑
第四部分:案例分析 (4讲)
38 | 案例分析(一):高性能限流器Guava RateLimiter
39 | 案例分析(二):高性能网络应用框架Netty
40 | 案例分析(三):高性能队列Disruptor
41 | 案例分析(四):高性能数据库连接池HiKariCP
第五部分:其他并发模型 (4讲)
42 | Actor模型:面向对象原生的并发模型
43 | 软件事务内存:借鉴数据库的并发经验
44 | 协程:更轻量级的线程
45 | CSP模型:Golang的主力队员
结束语 (1讲)
结束语 | 十年之后,初心依旧
用户故事 (2讲)
用户来信 | 真好,面试考到这些并发编程,我都答对了!
3 个用户来信 | 打开一个新的并发世界
Java并发编程实战
登录|注册

23 | Future:如何用多线程实现最优的“烧水泡茶”程序?

王宝令 2019-04-20
在上一篇文章《22 | Executor 与线程池:如何创建正确的线程池?》中,我们详细介绍了如何创建正确的线程池,那创建完线程池,我们该如何使用呢?在上一篇文章中,我们仅仅介绍了 ThreadPoolExecutor 的 void execute(Runnable command) 方法,利用这个方法虽然可以提交任务,但是却没有办法获取任务的执行结果(execute() 方法没有返回值)。而很多场景下,我们又都是需要获取任务的执行结果的。那 ThreadPoolExecutor 是否提供了相关功能呢?必须的,这么重要的功能当然需要提供了。
下面我们就来介绍一下使用 ThreadPoolExecutor 的时候,如何获取任务执行结果。

如何获取任务执行结果

Java 通过 ThreadPoolExecutor 提供的 3 个 submit() 方法和 1 个 FutureTask 工具类来支持获得任务执行结果的需求。下面我们先来介绍这 3 个 submit() 方法,这 3 个方法的方法签名如下。
// 提交Runnable任务
Future<?>
submit(Runnable task);
// 提交Callable任务
<T> Future<T>
submit(Callable<T> task);
// 提交Runnable任务及结果引用
<T> Future<T>
submit(Runnable task, T result);
你会发现它们的返回值都是 Future 接口,Future 接口有 5 个方法,我都列在下面了,它们分别是取消任务的方法 cancel()、判断任务是否已取消的方法 isCancelled()、判断任务是否已结束的方法 isDone()以及2 个获得任务执行结果的 get() 和 get(timeout, unit),其中最后一个 get(timeout, unit) 支持超时机制。通过 Future 接口的这 5 个方法你会发现,我们提交的任务不但能够获取任务执行结果,还可以取消任务。不过需要注意的是:这两个 get() 方法都是阻塞式的,如果被调用的时候,任务还没有执行完,那么调用 get() 方法的线程会阻塞,直到任务执行完才会被唤醒。
取消
完成
0/1000字
划线
笔记
复制
© 版权归极客邦科技所有,未经许可不得传播售卖。 页面已增加防盗追踪,如有侵权极客邦将依法追究其法律责任。
该试读文章来自付费专栏《Java并发编程实战》,如需阅读全部文章,
请订阅文章所属专栏。
立即订阅
登录 后留言

精选留言(33)

  • vector
    最近使用CompletableFuture工具方法以及lamda表达式比较多,语言语法的变化带来编码效率的提升真的很大。
    2019-04-21
    21
  • aroll
    建议并发编程课程中的Demo代码,尽量少使用System.out.println, 因为其实现有使用隐式锁,一些情况还会有锁粗化产生

    作者回复: 好建议

    2019-04-20
    1
    20
  • Asanz
    不是不建议使用 Executors 创建线程池了吗???
    2019-04-21
    1
    12
  • undifined
    课后题:
    可以用 Future
            ExecutorService threadPoolExecutor = Executors.newFixedThreadPool(3);
            Future<R> future1 = threadPoolExecutor.submit(Test::getPriceByS1);
            Future<R> future2 = threadPoolExecutor.submit(Test::getPriceByS2);
            Future<R> future3 = threadPoolExecutor.submit(Test::getPriceByS3);
            R r1 = future1.get();
            R r2 = future2.get();
            R r3 = future3.get();

    也可以用 CompletableFuture
            CompletableFuture<R> completableFuture1 = CompletableFuture.supplyAsync(Test::getPriceByS1);
            CompletableFuture<R> completableFuture2 = CompletableFuture.supplyAsync(Test::getPriceByS2);
            CompletableFuture<R> completableFuture3 = CompletableFuture.supplyAsync(Test::getPriceByS3);
            CompletableFuture.allOf(completableFuture1, completableFuture2, completableFuture3)
                             .thenAccept(System.out::println);
     老师这样理解对吗 谢谢老师
    2019-04-20
    9
  • linqw
    课后习题,老师帮忙看下哦
    public class ExecutorExample {
    private static final ExecutorService executor;
        static {executor = new ThreadPoolExecutor(4, 8, 1, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(1000), runnable -> null, (r, executor) -> {//根据业务降级策略});
    }
    static class S1Task implements Callable<String> {
            @Override
            public String call() throws Exception {return getPriceByS1();}}
        static class S2Task implements Callable<String> {
            @Overridepublic String call() throws Exception {return getPriceByS2();}}
        static class S3Task implements Callable<String> {@Override public String call() throws Exception {return getPriceByS3();}}
        static class SaveTask implements Callable<Boolean> {private List<FutureTask<String>> futureTasks; public SaveTask(List<FutureTask<String>> futureTasks) {this.futureTasks = futureTasks;
    }
            @Override
            public Boolean call() throws Exception {
                for (FutureTask<String> futureTask : futureTasks) {
                    String data = futureTask.get(10, TimeUnit.SECONDS);
                    saveData(data);
                }
                return Boolean.TRUE;
            }
        }
        private static String getPriceByS1() {
            return "fromDb1";
        }
        private static String getPriceByS2() {
            return "fromDb2";
        }
        private static String getPriceByS3() {
            return "fromDb3";
        }
        private static void saveData(String data) {
            //save data to db
        }
        public static void main(String[] args) {
            S1Task s1Task = new S1Task();FutureTask<String> st1 = new FutureTask<>(s1Task);S2Task s2Task = new S2Task();FutureTask<String> st2 = new FutureTask<>(s2Task);S3Task s3Task = new S3Task();FutureTask<String> st3 = new FutureTask<>(s3Task);List<FutureTask<String>> futureTasks = Lists.<FutureTask<String>>newArrayList(st1, st2, st3);FutureTask<Boolean> saveTask = new FutureTask<>(new SaveTask(futureTasks));executor.submit(st1);executor.submit(st2);executor.submit(st3);executor.submit(saveTask);}}

    作者回复: 没问题,就是有点复杂,代码还可以精简一下

    2019-04-22
    1
    3
  • 三木禾
    这个可以用生产消费者模式啊
    2019-05-18
    2
  • 💪😊
    分别提交三个futuretask给线程池,然后最后分别get出结果,统一进行保存数据库
    2019-04-30
    2
  • 圆圜
    你这个不对啊,应该是executeservice.submit t2furturetask,不能直接提交t2
    2019-04-22
    2
  • QQ怪
    在实际项目中应用已经应用到了Feture,但没有使用线程池,没有那么优雅,所以算是get到了👍
    2019-04-20
    2
  • 张三
    打卡。感觉很神奇,之前完全不会用。学的知识太陈旧了,继续学习。
    2019-04-20
    2
  • 张天屹
    我不知道是不是理解错老师意思了,先分析依赖有向图,可以看到三条线,没有入度>1的节点
    那么启动三个线程即可。
    图:
    s1询价 -> s1保存
    s2询价 -> s2保存
    s3询价 -> s3保存
    代码:
            new Thread(() -> {
             r1 = getPriceByS1();
             save(r1);
            }).start();
            new Thread(() -> {
             r2 = getPriceByS2();
             save(r2);
            }).start();
            new Thread(() -> {
             r3 = getPriceByS3();
             save(r3);
            }).start();
    我觉得这里不需要future,除非询价和保存之间还有别的计算工作

    作者回复: 用线程池就用到了

    2019-04-20
    2
    2
  • 稚者
    烧水泡茶的Demo有点小问题,Task2的启动时间应该在Task1的洗茶壶之后开始,现在的代码是一起开始。
    2019-11-01
    1
  • 魏斌斌
    老师,我看了下futruerask的源码,当调用futrue.get()方法,其实最终会调用unsafe方法是当前线程阻塞。但是我不太理解线程阻塞到哪去了,也没看到锁。

    作者回复: 可以看看操作系统原理有关线程,进程的那部分

    2019-06-17
    1
  • liu
    future是阻塞的等待。发起任务后,做其他的工作。做完后,从future获取处理结果,继续进行后面的任务
    2019-04-25
    1
  • 捞鱼的搬砖奇
    Future的get()是拿到任务的执行结果不吧。为什么又说是拿到方法的入参了。
    2019-04-21
    1
  • QQ怪
    老师,在提交 Runnable 任务及结果引用的例子里面的x变量是什么?

    作者回复: 任意的东西,想成数字0也行

    2019-04-20
    1
  • henry
    现在是在主线程串行完成3个询价的任务,执行第一个任务,其它2个任务只能等待执行,如果要提高效率,这个地方需要改进,可以用老师今天讲的futuretask,三个询价任务改成futuretask并行执行,效率会提高

    作者回复: 👍

    2019-04-20
    1
  • 是我!
    老师您好:请问这样是否有问题?
    public static void main(String[] args) throws Exception {
            FutureTask t1 = new FutureTask(new Callable() {
                @Override
                public String call() {
                    return "getPriceByS1()";
                }
            });
            FutureTask t2 = new FutureTask(() -> "getPriceByS2()");
            FutureTask t3 = new FutureTask(() -> "getPriceByS3()");
            BlockingQueue<Runnable> blockingQueue = new ArrayBlockingQueue<>(3);
            ThreadPoolExecutor poolExecutor =
                    new ThreadPoolExecutor(10, 10, 10,
                            TimeUnit.SECONDS, blockingQueue);
            poolExecutor.submit(t1);
            save(t1.get().toString());
            poolExecutor.submit(t2);
            save(t2.get().toString());
            poolExecutor.submit(t3);
            save(t3.get().toString());
        }

        private static void save(String ss) {
            System.out.println("保存" + ss);
        }
    2019-11-30
  • Joker
    ```
    java
    ExecutorService futuresPool = Executors.newFixedThreadPool(3);
            Future<Price> future1 = futuresPool.submit(this::getPriceByS1);
            Future<Price> future2 = futuresPool.submit(this::getPriceByS2);
            Future<Price> future3 = futuresPool.submit(this::getPriceByS3);

            ExecutorService saveThreadPool = Executors.newFixedThreadPool(3);
            saveThreadPool.execute(() -> {
                try {
                    Price r1= future1.get();
                    save(r1);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } catch (ExecutionException e) {
                    e.printStackTrace();
                }

            });
            saveThreadPool.execute(() -> {
                try {
                    Price r2= future2.get();
                    save(r2);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } catch (ExecutionException e) {
                    e.printStackTrace();
                }

            });
            saveThreadPool.execute(() -> {
                try {
                    Price r3= future3.get();
                    save(r3);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } catch (ExecutionException e) {
                    e.printStackTrace();
                }
            });
    ```


    用三个线程把这个并行执行,麻烦老师看看,谢谢

    作者回复: 我觉得没问题。也可以用一个线程池,把查询和保存放一个线程里。

    2019-11-06
  • 245864982
    打卡
    2019-11-04
收起评论
33
返回
顶部