Java并发编程实战
王宝令
资深架构师
立即订阅
15049 人已学习
课程目录
已完结 50 讲
0/4登录后,你可以任选4讲全文学习。
开篇词 (1讲)
开篇词 | 你为什么需要学习并发编程?
免费
学习攻略 (1讲)
学习攻略 | 如何才能学好并发编程?
第一部分:并发理论基础 (13讲)
01 | 可见性、原子性和有序性问题:并发编程Bug的源头
02 | Java内存模型:看Java如何解决可见性和有序性问题
03 | 互斥锁(上):解决原子性问题
04 | 互斥锁(下):如何用一把锁保护多个资源?
05 | 一不小心就死锁了,怎么办?
06 | 用“等待-通知”机制优化循环等待
07 | 安全性、活跃性以及性能问题
08 | 管程:并发编程的万能钥匙
09 | Java线程(上):Java线程的生命周期
10 | Java线程(中):创建多少线程才是合适的?
11 | Java线程(下):为什么局部变量是线程安全的?
12 | 如何用面向对象思想写好并发程序?
13 | 理论基础模块热点问题答疑
第二部分:并发工具类 (14讲)
14 | Lock和Condition(上):隐藏在并发包中的管程
15 | Lock和Condition(下):Dubbo如何用管程实现异步转同步?
16 | Semaphore:如何快速实现一个限流器?
17 | ReadWriteLock:如何快速实现一个完备的缓存?
18 | StampedLock:有没有比读写锁更快的锁?
19 | CountDownLatch和CyclicBarrier:如何让多线程步调一致?
20 | 并发容器:都有哪些“坑”需要我们填?
21 | 原子类:无锁工具类的典范
22 | Executor与线程池:如何创建正确的线程池?
23 | Future:如何用多线程实现最优的“烧水泡茶”程序?
24 | CompletableFuture:异步编程没那么难
25 | CompletionService:如何批量执行异步任务?
26 | Fork/Join:单机版的MapReduce
27 | 并发工具类模块热点问题答疑
第三部分:并发设计模式 (10讲)
28 | Immutability模式:如何利用不变性解决并发问题?
29 | Copy-on-Write模式:不是延时策略的COW
30 | 线程本地存储模式:没有共享,就没有伤害
31 | Guarded Suspension模式:等待唤醒机制的规范实现
32 | Balking模式:再谈线程安全的单例模式
33 | Thread-Per-Message模式:最简单实用的分工方法
34 | Worker Thread模式:如何避免重复创建线程?
35 | 两阶段终止模式:如何优雅地终止线程?
36 | 生产者-消费者模式:用流水线思想提高效率
37 | 设计模式模块热点问题答疑
第四部分:案例分析 (4讲)
38 | 案例分析(一):高性能限流器Guava RateLimiter
39 | 案例分析(二):高性能网络应用框架Netty
40 | 案例分析(三):高性能队列Disruptor
41 | 案例分析(四):高性能数据库连接池HiKariCP
第五部分:其他并发模型 (4讲)
42 | Actor模型:面向对象原生的并发模型
43 | 软件事务内存:借鉴数据库的并发经验
44 | 协程:更轻量级的线程
45 | CSP模型:Golang的主力队员
结束语 (1讲)
结束语 | 十年之后,初心依旧
用户故事 (2讲)
用户来信 | 真好,面试考到这些并发编程,我都答对了!
3 个用户来信 | 打开一个新的并发世界
Java并发编程实战
登录|注册

25 | CompletionService:如何批量执行异步任务?

王宝令 2019-04-25
《23 | Future:如何用多线程实现最优的“烧水泡茶”程序?》的最后,我给你留了道思考题,如何优化一个询价应用的核心代码?如果采用“ThreadPoolExecutor+Future”的方案,你的优化结果很可能是下面示例代码这样:用三个线程异步执行询价,通过三次调用 Future 的 get() 方法获取询价结果,之后将询价结果保存在数据库中。
// 创建线程池
ExecutorService executor =
Executors.newFixedThreadPool(3);
// 异步向电商S1询价
Future<Integer> f1 =
executor.submit(
()->getPriceByS1());
// 异步向电商S2询价
Future<Integer> f2 =
executor.submit(
()->getPriceByS2());
// 异步向电商S3询价
Future<Integer> f3 =
executor.submit(
()->getPriceByS3());
// 获取电商S1报价并保存
r=f1.get();
executor.execute(()->save(r));
// 获取电商S2报价并保存
r=f2.get();
executor.execute(()->save(r));
// 获取电商S3报价并保存
r=f3.get();
executor.execute(()->save(r));
上面的这个方案本身没有太大问题,但是有个地方的处理需要你注意,那就是如果获取电商 S1 报价的耗时很长,那么即便获取电商 S2 报价的耗时很短,也无法让保存 S2 报价的操作先执行,因为这个主线程都阻塞在了 f1.get() 操作上。这点小瑕疵你该如何解决呢?
估计你已经想到了,增加一个阻塞队列,获取到 S1、S2、S3 的报价都进入阻塞队列,然后在主线程中消费阻塞队列,这样就能保证先获取到的报价先保存到数据库了。下面的示例代码展示了如何利用阻塞队列实现先获取到的报价先保存到数据库。
// 创建阻塞队列
BlockingQueue<Integer> bq =
new LinkedBlockingQueue<>();
//电商S1报价异步进入阻塞队列
executor.execute(()->
bq.put(f1.get()));
//电商S2报价异步进入阻塞队列
executor.execute(()->
bq.put(f2.get()));
//电商S3报价异步进入阻塞队列
executor.execute(()->
bq.put(f3.get()));
//异步保存所有报价
for (int i=0; i<3; i++) {
Integer r = bq.take();
executor.execute(()->save(r));
}

利用 CompletionService 实现询价系统

不过在实际项目中,并不建议你这样做,因为 Java SDK 并发包里已经提供了设计精良的 CompletionService。利用 CompletionService 不但能帮你解决先获取到的报价先保存到数据库的问题,而且还能让代码更简练。
取消
完成
0/1000字
划线
笔记
复制
© 版权归极客邦科技所有,未经许可不得传播售卖。 页面已增加防盗追踪,如有侵权极客邦将依法追究其法律责任。
该试读文章来自付费专栏《Java并发编程实战》,如需阅读全部文章,
请订阅文章所属专栏。
立即订阅
登录 后留言

精选留言(53)

  • 张天屹
    我觉得问题出在return m这里需要等待三个线程执行完成,但是并没有。
    ...
    AtomicReference<Integer> m = new AtomicReference<>(Integer.MAX_VALUE);
    CountDownLatch latch = new CountDownLatch(3);
    for(int i=0; i<3; i++) {
    executor.execute(()->{
    Integer r = null;
    try {
    r = cs.take().get();
    } catch(Exception e) {}
    save(r);
    m.set(Integer.min(m.get(), r));
    latch.countDown();
    });
    latch.await();
    return m;
    }

    作者回复: 👍

    2019-04-25
    2
    35
  • 小华
    看老师的意图是要等三个比较报假的线程都执行完才能执行主线程的的return m,但是代码无法保证三个线程都执行完,和主线程执行return的顺序,因此,m的值不是准确的,可以加个线程栈栏,线程执行完计数器,来达到这效果

    作者回复: 👍

    2019-04-25
    10
  • 代码搬运工
    m.get()和m.set()不是原子性操作,正确代码是:do{int expect = m.get();int min= Integer.min(expect,r);}while(!m.compareAndSet(expect,min))。老师,是这样吗?
    2019-04-25
    10
  • 西行寺咕哒子
    试过返回值是2147483647,也就是int的最大值。没有等待操作完成就猴急的返回了。 m.set(Integer.min(m.get(), r)... 这个操作也不是原子操作。
    试着自己弄了一下:
    public Integer run(){
            // 创建线程池
            ExecutorService executor = Executors.newFixedThreadPool(3);
            // 创建 CompletionService
            CompletionService<Integer> cs = new ExecutorCompletionService<>(executor);
            AtomicReference<Integer> m = new AtomicReference<>(Integer.MAX_VALUE);
            // 异步向电商 S1 询价
            cs.submit(()->getPriceByS1());
            // 异步向电商 S2 询价
            cs.submit(()->getPriceByS2());
            // 异步向电商 S3 询价
            cs.submit(()->getPriceByS3());
            // 将询价结果异步保存到数据库
            // 并计算最低报价
            for (int i=0; i<3; i++) {
                Integer r = logIfError(()->cs.take().get());
                executor.execute(()-> save(r));
                m.getAndUpdate(v->Integer.min(v, r));
            }
            return m.get();
        }
    不知道可不可行

    作者回复: 👍

    2019-04-25
    6
  • 天涯煮酒
    先调用m.get()并跟r比较,再调用m.set(),这里存在竞态条件,线程并不安全
    2019-04-25
    1
    4
  • linqw
    老师stampedLock的获取锁源码,老师能帮忙解惑下么?阻塞的读线程cowait是挂在写节点的下方么?老师能解惑下基于的理论模型
    private long acquireWrite(boolean interruptible, long deadline) {
            WNode node = null, p;
            for (int spins = -1;;) { // spin while enqueuing
                long m, s, ns;
                //如果当前的state是无锁状态即100000000
                if ((m = (s = state) & ABITS) == 0L) {
                 //设置成写锁
                    if (U.compareAndSwapLong(this, STATE, s, ns = s + WBIT))
                        return ns;
                }
                else if (spins < 0)
                 //当前锁状态为写锁状态,并且队列为空,设置自旋值
                    spins = (m == WBIT && wtail == whead) ? SPINS : 0;
                else if (spins > 0) {
                 //自旋操作,就是让线程在此自旋
                    if (LockSupport.nextSecondarySeed() >= 0)
                        --spins;
                }
                //如果队列尾元素为空,初始化队列
                else if ((p = wtail) == null) { // initialize queue
                    WNode hd = new WNode(WMODE, null);
                    if (U.compareAndSwapObject(this, WHEAD, null, hd))
                        wtail = hd;
                }
                //当前要加入的元素为空,初始化当前元素,前置节点为尾节点
                else if (node == null)
                    node = new WNode(WMODE, p);
                //队列的稳定性判断,当前的前置节点是否改变,重新设置
                else if (node.prev != p)
                    node.prev = p;
                //将当前节点加入尾节点中
                else if (U.compareAndSwapObject(this, WTAIL, p, node)) {
                    p.next = node;
                    break;
                }
            }

            

    作者回复: 这可难倒我了,并发库的源码我只是零散得看的,看完基本也忘得差不多了,感觉自己也不是搞算法的料,放弃了😂

    2019-04-25
    3
  • Corner
    1.AtomicReference<Integer>的get方法应该改成使用cas方法
    2.最后筛选最小结果的任务是异步执行的,应该在return之前做同步,所以最好使用sumit提交该任务便于判断任务的完成
    最后请教老师一下,第一个例子中为什么主线程会阻塞在f1.get()方法呢?

    作者回复: 👍,示例代码有问题,已经改了

    2019-04-25
    3
  • undifined
    老师 用 CompletionService 和用 CompletionFuture 查询,然后用 whenComplete 或者 thenAcceptEither 这些方法的区别是什么,我觉得用 CompletionFuture 更直观些;
    老师可以在下一讲的时候说一下上一讲的思考题正确答案吗,谢谢老师
    2019-04-25
    2
  • 郑晨Cc
    executor.execute(Callable)提交任务是非阻塞的 return m;很大概率返回 Integer.Maxvalue,而且老师为了确保返回这个max还特意加入了save这个阻塞的方法
    2019-04-25
    2
  • 墨飞域
    老师,并发工具类,这整个一章,感觉听完似懂非懂的,因为实践中没用过,我要如何弥补这部分,还是说只要听说过,然后用的时候再去查看demo吗

    作者回复: 用的时候查文档就行,工具类主要是会用,知道什么场景用什么

    2019-10-23
    1
  • Sunqc
    // 获取电商 S1 报价并保存
    r=f1.get();
    executor.execute(()->save(r));

    如果把r=f1.get()放进execute里应该是也能保证先执行完的先保存

    作者回复: 是的

    2019-05-01
    1
  • 一眼万年
    课后思考如果需要等待最小结果,本来就有阻塞队列了,加了个线程池,评论还要加上栏栅,那除了炫技没啥作用
    2019-04-28
    1
  • 海鸿
    重新发过,刚刚的代码有误!
    1.for循环线程池执行属于异步导致未等比价结果就 return了,需要等待三次比价结果才能 return,可以用 CountDownLatch
    2. m. set( Integer. min( m. get(), r))存在竞态条件,可以更改为
    Integer o;
    do{
    o= m. get();
    if(o<=r){ break;}
    }
    while(! m. compareAndSet( o, r));
    3.还有一个小问题就是 try- catch捕获异常后的处理,提高程序鲁棒性
    2019-04-26
    1
  • hj
    确实,第一个例子老师为了抛出问题引出CompletionService,但是逻辑上的确不成立。不是f1.get()阻塞主线程
    2019-04-25
    1
  • 黄海峰
    我实际测试了第一段代码,确实是异步的,f1.get不会阻塞主线程。。。

    public static void main(String[] args) {
            ExecutorService executor = Executors.newFixedThreadPool(3);
            Future<Integer> f1 = executor.submit(()->getPriceByS1());
            Future<Integer> f2 = executor.submit(()->getPriceByS2());
            Future<Integer> f3 = executor.submit(()->getPriceByS3());

            executor.execute(()-> {
                try {
                    save(f1.get());
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } catch (ExecutionException e) {
                    e.printStackTrace();
                }
            });
            executor.execute(()-> {
                try {
                    save(f2.get());
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } catch (ExecutionException e) {
                    e.printStackTrace();
                }
            });
            executor.execute(()-> {
                try {
                    save(f3.get());
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } catch (ExecutionException e) {
                    e.printStackTrace();
                }
            });
        }

        private static Integer getPriceByS1() {
            try {
                Thread.sleep(10000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return 1;
        }
        private static Integer getPriceByS2() {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return 2;
        }
        private static Integer getPriceByS3() {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return 3;
        }
        private static void save(Integer i) {
            System.out.println("save " + i);
        }

    作者回复: 👍

    2019-04-25
    1
  • 苏志辉
    m.set和get存在静态条件不是原子的,可能存在设置和不是最小值
    2019-04-25
    1
  • 空空空空
    算低价的时候是用三个不同的线程去计算,是异步的,因此可能算出来并不是预期的结果
    老师,这样理解对吗?

    作者回复: 对的!

    2019-04-25
    1
  • 王昊哲
    有个疑问:老师也提到那种线程池+阻塞队列实现方式,队列里保存的是任务的结果,而completionService保存的future,那completionService的future拿出来get的时候,也阻塞在get那里了啊,那不跟跟线程池+future的实现一样的弊端了啊?
    2019-11-28
  • Joker
    老师,那个futures保存future就是为了后面取消(`cancel()`),对吧

    作者回复: 你说的也不错,为了代码行数少一点

    2019-11-06
  • 墨雨
    关于课后问题,我理解的是循环内不开启新的线程来做就可以了,本身想获取最小值就必须知道所有的值,这需要串行阻塞的。

    // 创建线程池
    ExecutorService executor =
      Executors.newFixedThreadPool(3);
    // 创建 CompletionService
    CompletionService<Integer> cs = new
      ExecutorCompletionService<>(executor);
    // 异步向电商 S1 询价
    cs.submit(()->getPriceByS1());
    // 异步向电商 S2 询价
    cs.submit(()->getPriceByS2());
    // 异步向电商 S3 询价
    cs.submit(()->getPriceByS3());
    // 将询价结果异步保存到数据库
    // 并计算最低报价
    AtomicReference<Integer> m =
      new AtomicReference<>(Integer.MAX_VALUE);
    for (int i=0; i<3; i++) {
      
        Integer r = null;
        try {
          r = cs.take().get();
        } catch (Exception e) {}
        save(r);
        m.set(Integer.min(m.get(), r));
      
    }
    return m;
    2019-11-01
收起评论
53
返回
顶部