25 | CompletionService:如何批量执行异步任务?
王宝令
该思维导图由 AI 生成,仅供参考
在《23 | Future:如何用多线程实现最优的“烧水泡茶”程序?》的最后,我给你留了道思考题,如何优化一个询价应用的核心代码?如果采用“ThreadPoolExecutor+Future”的方案,你的优化结果很可能是下面示例代码这样:用三个线程异步执行询价,通过三次调用 Future 的 get() 方法获取询价结果,之后将询价结果保存在数据库中。
上面的这个方案本身没有太大问题,但是有个地方的处理需要你注意,那就是如果获取电商 S1 报价的耗时很长,那么即便获取电商 S2 报价的耗时很短,也无法让保存 S2 报价的操作先执行,因为这个主线程都阻塞在了 f1.get() 操作上。这点小瑕疵你该如何解决呢?
估计你已经想到了,增加一个阻塞队列,获取到 S1、S2、S3 的报价都进入阻塞队列,然后在主线程中消费阻塞队列,这样就能保证先获取到的报价先保存到数据库了。下面的示例代码展示了如何利用阻塞队列实现先获取到的报价先保存到数据库。
利用 CompletionService 实现询价系统
不过在实际项目中,并不建议你这样做,因为 Java SDK 并发包里已经提供了设计精良的 CompletionService。利用 CompletionService 不但能帮你解决先获取到的报价先保存到数据库的问题,而且还能让代码更简练。
公开
同步至部落
取消
完成
0/2000
荧光笔
直线
曲线
笔记
复制
AI
- 深入了解
- 翻译
- 解释
- 总结
CompletionService是Java SDK并发包中的一个重要组件,它能够帮助开发者批量执行异步任务,并且保证任务执行结果的有序性。通过CompletionService,开发者可以更简洁地管理批量异步任务,避免无谓的等待,并且能够快速实现诸如Forking Cluster这样的需求。 文章首先介绍了通过阻塞队列实现先获取到的报价先保存到数据库的方法,然后详细介绍了CompletionService的实现原理和使用方法。接着,文章展示了如何利用CompletionService实现Dubbo中的Forking Cluster,以及如何利用CompletionService实现询价应用的核心功能。最后,文章提出了一个课后思考问题,让读者思考并改进代码。 通过本文,读者可以了解到CompletionService的基本原理和使用方法,以及它在实际项目中的应用场景。同时,文章还提供了一个课后思考问题,引发读者的思考和讨论。 总之,CompletionService是一个强大的工具,能够帮助开发者更高效地处理批量异步任务,保证任务执行结果的有序性,以及实现复杂的集群模式。
仅可试看部分内容,如需阅读全部内容,请付费购买文章所属专栏
《Java 并发编程实战》,新⼈⾸单¥59
《Java 并发编程实战》,新⼈⾸单¥59
立即购买
© 版权归极客邦科技所有,未经许可不得传播售卖。 页面已增加防盗追踪,如有侵权极客邦将依法追究其法律责任。
登录 后留言
全部留言(87)
- 最新
- 精选
- 张天屹我觉得问题出在return m这里需要等待三个线程执行完成,但是并没有。 ... AtomicReference<Integer> m = new AtomicReference<>(Integer.MAX_VALUE); CountDownLatch latch = new CountDownLatch(3); for(int i=0; i<3; i++) { executor.execute(()->{ Integer r = null; try { r = cs.take().get(); } catch(Exception e) {} save(r); m.set(Integer.min(m.get(), r)); latch.countDown(); }); latch.await(); return m; }
作者回复: 👍
2019-04-2511133 - 小华看老师的意图是要等三个比较报假的线程都执行完才能执行主线程的的return m,但是代码无法保证三个线程都执行完,和主线程执行return的顺序,因此,m的值不是准确的,可以加个线程栈栏,线程执行完计数器,来达到这效果
作者回复: 👍
2019-04-2533 - 西行寺咕哒子试过返回值是2147483647,也就是int的最大值。没有等待操作完成就猴急的返回了。 m.set(Integer.min(m.get(), r)... 这个操作也不是原子操作。 试着自己弄了一下: public Integer run(){ // 创建线程池 ExecutorService executor = Executors.newFixedThreadPool(3); // 创建 CompletionService CompletionService<Integer> cs = new ExecutorCompletionService<>(executor); AtomicReference<Integer> m = new AtomicReference<>(Integer.MAX_VALUE); // 异步向电商 S1 询价 cs.submit(()->getPriceByS1()); // 异步向电商 S2 询价 cs.submit(()->getPriceByS2()); // 异步向电商 S3 询价 cs.submit(()->getPriceByS3()); // 将询价结果异步保存到数据库 // 并计算最低报价 for (int i=0; i<3; i++) { Integer r = logIfError(()->cs.take().get()); executor.execute(()-> save(r)); m.getAndUpdate(v->Integer.min(v, r)); } return m.get(); } 不知道可不可行
作者回复: 👍
2019-04-25731 - ipofss老师,并发工具类,这整个一章,感觉听完似懂非懂的,因为实践中没用过,我要如何弥补这部分,还是说只要听说过,然后用的时候再去查看demo吗
作者回复: 用的时候查文档就行,工具类主要是会用,知道什么场景用什么
2019-10-23217 - linqw老师stampedLock的获取锁源码,老师能帮忙解惑下么?阻塞的读线程cowait是挂在写节点的下方么?老师能解惑下基于的理论模型 private long acquireWrite(boolean interruptible, long deadline) { WNode node = null, p; for (int spins = -1;;) { // spin while enqueuing long m, s, ns; //如果当前的state是无锁状态即100000000 if ((m = (s = state) & ABITS) == 0L) { //设置成写锁 if (U.compareAndSwapLong(this, STATE, s, ns = s + WBIT)) return ns; } else if (spins < 0) //当前锁状态为写锁状态,并且队列为空,设置自旋值 spins = (m == WBIT && wtail == whead) ? SPINS : 0; else if (spins > 0) { //自旋操作,就是让线程在此自旋 if (LockSupport.nextSecondarySeed() >= 0) --spins; } //如果队列尾元素为空,初始化队列 else if ((p = wtail) == null) { // initialize queue WNode hd = new WNode(WMODE, null); if (U.compareAndSwapObject(this, WHEAD, null, hd)) wtail = hd; } //当前要加入的元素为空,初始化当前元素,前置节点为尾节点 else if (node == null) node = new WNode(WMODE, p); //队列的稳定性判断,当前的前置节点是否改变,重新设置 else if (node.prev != p) node.prev = p; //将当前节点加入尾节点中 else if (U.compareAndSwapObject(this, WTAIL, p, node)) { p.next = node; break; } }
作者回复: 这可难倒我了,并发库的源码我只是零散得看的,看完基本也忘得差不多了,感觉自己也不是搞算法的料,放弃了😂
2019-04-257 - Sunqc// 获取电商 S1 报价并保存 r=f1.get(); executor.execute(()->save(r)); 如果把r=f1.get()放进execute里应该是也能保证先执行完的先保存
作者回复: 是的
2019-05-013 - 黄海峰我实际测试了第一段代码,确实是异步的,f1.get不会阻塞主线程。。。 public static void main(String[] args) { ExecutorService executor = Executors.newFixedThreadPool(3); Future<Integer> f1 = executor.submit(()->getPriceByS1()); Future<Integer> f2 = executor.submit(()->getPriceByS2()); Future<Integer> f3 = executor.submit(()->getPriceByS3()); executor.execute(()-> { try { save(f1.get()); } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } }); executor.execute(()-> { try { save(f2.get()); } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } }); executor.execute(()-> { try { save(f3.get()); } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } }); } private static Integer getPriceByS1() { try { Thread.sleep(10000); } catch (InterruptedException e) { e.printStackTrace(); } return 1; } private static Integer getPriceByS2() { try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } return 2; } private static Integer getPriceByS3() { try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } return 3; } private static void save(Integer i) { System.out.println("save " + i); }
作者回复: 👍
2019-04-253 - Corner1.AtomicReference<Integer>的get方法应该改成使用cas方法 2.最后筛选最小结果的任务是异步执行的,应该在return之前做同步,所以最好使用sumit提交该任务便于判断任务的完成 最后请教老师一下,第一个例子中为什么主线程会阻塞在f1.get()方法呢?
作者回复: 👍,示例代码有问题,已经改了
2019-04-253 - 空空空空算低价的时候是用三个不同的线程去计算,是异步的,因此可能算出来并不是预期的结果 老师,这样理解对吗?
作者回复: 对的!
2019-04-252 - 梅小西老师讲的挺不错的,看了这个例子,有几点疑问,还希望老师说明下: // 这个是老师例子: // 创建线程池 ExecutorService executor = Executors.newFixedThreadPool(3); // 创建CompletionService CompletionService<Integer> cs = new ExecutorCompletionService<>(executor); // 异步向电商S1询价 cs.submit(()->getPriceByS1()); // 异步向电商S2询价 cs.submit(()->getPriceByS2()); // 异步向电商S3询价 cs.submit(()->getPriceByS3()); // 将询价结果异步保存到数据库 for (int i=0; i<3; i++) { Integer r = cs.take().get(); executor.execute(()->save(r)); } 首先,CompletionService应该是要绑定泛型,代表异步任务的返回结果,实际应用中,几乎不太可能所有的异步任务的返回类型是一样的,除非设置成Object这种通用型,那又会导致拿到结果后需要强转,代码看起来更难受; 其次,对于返回的结果的处理方式,实际应用中几乎也是不同的,那就要针对每一个take出来的结果做判断,这实际上也是会导致代码很难维护; 综上,CompletionService 看来能够做批量处理异步任务的事情,实际应用中,我感觉不太实用! 以上两点是个人见解,有不对之处请老师指教!
作者回复: 每种异步任务都会创建一个新的,不可能所有的异步任务都用一个。如果某些需要共用,可以包装一个类就可以了
2019-10-271
收起评论