Java并发编程实战
王宝令
资深架构师
立即订阅
15061 人已学习
课程目录
已完结 50 讲
0/4登录后,你可以任选4讲全文学习。
开篇词 (1讲)
开篇词 | 你为什么需要学习并发编程?
免费
学习攻略 (1讲)
学习攻略 | 如何才能学好并发编程?
第一部分:并发理论基础 (13讲)
01 | 可见性、原子性和有序性问题:并发编程Bug的源头
02 | Java内存模型:看Java如何解决可见性和有序性问题
03 | 互斥锁(上):解决原子性问题
04 | 互斥锁(下):如何用一把锁保护多个资源?
05 | 一不小心就死锁了,怎么办?
06 | 用“等待-通知”机制优化循环等待
07 | 安全性、活跃性以及性能问题
08 | 管程:并发编程的万能钥匙
09 | Java线程(上):Java线程的生命周期
10 | Java线程(中):创建多少线程才是合适的?
11 | Java线程(下):为什么局部变量是线程安全的?
12 | 如何用面向对象思想写好并发程序?
13 | 理论基础模块热点问题答疑
第二部分:并发工具类 (14讲)
14 | Lock和Condition(上):隐藏在并发包中的管程
15 | Lock和Condition(下):Dubbo如何用管程实现异步转同步?
16 | Semaphore:如何快速实现一个限流器?
17 | ReadWriteLock:如何快速实现一个完备的缓存?
18 | StampedLock:有没有比读写锁更快的锁?
19 | CountDownLatch和CyclicBarrier:如何让多线程步调一致?
20 | 并发容器:都有哪些“坑”需要我们填?
21 | 原子类:无锁工具类的典范
22 | Executor与线程池:如何创建正确的线程池?
23 | Future:如何用多线程实现最优的“烧水泡茶”程序?
24 | CompletableFuture:异步编程没那么难
25 | CompletionService:如何批量执行异步任务?
26 | Fork/Join:单机版的MapReduce
27 | 并发工具类模块热点问题答疑
第三部分:并发设计模式 (10讲)
28 | Immutability模式:如何利用不变性解决并发问题?
29 | Copy-on-Write模式:不是延时策略的COW
30 | 线程本地存储模式:没有共享,就没有伤害
31 | Guarded Suspension模式:等待唤醒机制的规范实现
32 | Balking模式:再谈线程安全的单例模式
33 | Thread-Per-Message模式:最简单实用的分工方法
34 | Worker Thread模式:如何避免重复创建线程?
35 | 两阶段终止模式:如何优雅地终止线程?
36 | 生产者-消费者模式:用流水线思想提高效率
37 | 设计模式模块热点问题答疑
第四部分:案例分析 (4讲)
38 | 案例分析(一):高性能限流器Guava RateLimiter
39 | 案例分析(二):高性能网络应用框架Netty
40 | 案例分析(三):高性能队列Disruptor
41 | 案例分析(四):高性能数据库连接池HiKariCP
第五部分:其他并发模型 (4讲)
42 | Actor模型:面向对象原生的并发模型
43 | 软件事务内存:借鉴数据库的并发经验
44 | 协程:更轻量级的线程
45 | CSP模型:Golang的主力队员
结束语 (1讲)
结束语 | 十年之后,初心依旧
用户故事 (2讲)
用户来信 | 真好,面试考到这些并发编程,我都答对了!
3 个用户来信 | 打开一个新的并发世界
Java并发编程实战
登录|注册

26 | Fork/Join:单机版的MapReduce

王宝令 2019-04-27
前面几篇文章我们介绍了线程池、Future、CompletableFuture 和 CompletionService,仔细观察你会发现这些工具类都是在帮助我们站在任务的视角来解决并发问题,而不是让我们纠缠在线程之间如何协作的细节上(比如线程之间如何实现等待、通知等)。对于简单的并行任务,你可以通过“线程池 +Future”的方案来解决;如果任务之间有聚合关系,无论是 AND 聚合还是 OR 聚合,都可以通过 CompletableFuture 来解决;而批量的并行任务,则可以通过 CompletionService 来解决。
我们一直讲,并发编程可以分为三个层面的问题,分别是分工、协作和互斥,当你关注于任务的时候,你会发现你的视角已经从并发编程的细节中跳出来了,你应用的更多的是现实世界的思维模式,类比的往往是现实世界里的分工,所以我把线程池、Future、CompletableFuture 和 CompletionService 都列到了分工里面。
下面我用现实世界里的工作流程图描述了并发编程领域的简单并行任务、聚合任务和批量并行任务,辅以这些流程图,相信你一定能将你的思维模式转换到现实世界里来。
从上到下,依次为简单并行任务、聚合任务和批量并行任务示意图
取消
完成
0/1000字
划线
笔记
复制
© 版权归极客邦科技所有,未经许可不得传播售卖。 页面已增加防盗追踪,如有侵权极客邦将依法追究其法律责任。
该试读文章来自付费专栏《Java并发编程实战》,如需阅读全部文章,
请订阅文章所属专栏。
立即订阅
登录 后留言

精选留言(34)

  • 爱吃回锅肉的瘦子
    https://www.liaoxuefeng.com/article/001493522711597674607c7f4f346628a76145477e2ff82000,老师,您好,我在廖雪峰网站中也看到forkjoin使用方式。讲解了,为啥不使用两次fork,分享出来给大家看看。

    作者回复: 用两次fork()在join的时候,需要用这样的顺序:a.fork(); b.fork(); b.join(); a.join();这个要求在JDK官方文档里有说明。

    如果是一不小心写成a.fork(); b.fork(); a.join(); b.join();就会有大神廖雪峰说的问题。

    建议还是用fork()+compute(),这种方式的执行过程普通人还是能理解的,fork()+fork()内部做了很多优化,我这个普通人看的实在是头痛。

    感谢分享啊。我觉得讲的挺好的。用这篇文章的例子理解fork()+compute()很到位。

    2019-04-28
    1
    28
  • CPU同一时间只能处理一个线程,所以理论上,纯cpu密集型计算任务单线程就够了。多线程的话,线程上下文切换带来的线程现场保存和恢复也会带来额外开销。但实际上可能要经过测试才知道。

    作者回复: 👍

    2019-04-27
    23
  • QQ怪
    学习了老师的分享,现在就已经在工作用到了,的确是在同事面前好好装了一次逼

    作者回复: 👍说明你很有悟性😄

    2019-05-24
    12
  • 尹圣
    看到分治任务立马就想到归并排序,用Fork/Join又重新实现了一遍,
     /**
      * Ryzen 1700 8核16线程 3.0 GHz
      */
     @Test
     public void mergeSort() {
         long[] arrs = new long[100000000];
         for (int i = 0; i < 100000000; i++) {
             arrs[i] = (long) (Math.random() * 100000000);
         }
         long startTime = System.currentTimeMillis();
         ForkJoinPool forkJoinPool = new ForkJoinPool(Runtime.getRuntime().availableProcessors());
         MergeSort mergeSort = new MergeSort(arrs);
         arrs = forkJoinPool.invoke(mergeSort);
         //传统递归
         //arrs = mergeSort(arrs);
         long endTime = System.currentTimeMillis();
         System.out.println("耗时:" + (endTime - startTime));
     }
     /**
      * fork/join
      * 耗时:13903ms
      */
     class MergeSort extends RecursiveTask<long[]> {
         long[] arrs;
         public MergeSort(long[] arrs) {
             this.arrs = arrs;
         }
         @Override
         protected long[] compute() {
             if (arrs.length < 2) return arrs;
             int mid = arrs.length / 2;
             MergeSort left = new MergeSort(Arrays.copyOfRange(arrs, 0, mid));
             left.fork();
             MergeSort right = new MergeSort(Arrays.copyOfRange(arrs, mid, arrs.length));
             return merge(right.compute(), left.join());
         }
     }
     /**
      * 传统递归
      * 耗时:30508ms
      */
     public static long[] mergeSort(long[] arrs) {
         if (arrs.length < 2) return arrs;
         int mid = arrs.length / 2;
         long[] left = Arrays.copyOfRange(arrs, 0, mid);
         long[] right = Arrays.copyOfRange(arrs, mid, arrs.length);
         return merge(mergeSort(left), mergeSort(right));
     }
     public static long[] merge(long[] left, long[] right) {
         long[] result = new long[left.length + right.length];
         for (int i = 0, m = 0, j = 0; m < result.length; m++) {
             if (i >= left.length) {
                 result[m] = right[j++];
             } else if (j >= right.length) {
                 result[m] = left[i++];
             } else if (left[i] > right[j]) {
                 result[m] = right[j++];
             } else result[m] = left[i++];
         }
         return result;
     }

    作者回复: 👍👍👍举一反三了😄

    2019-04-29
    1
    9
  • linqw
    以前在面蚂蚁金服时,也做过类似的题目,从一个目录中,找出所有文件里面单词出现的top100,那时也是使用服务提供者,从目录中找出一个或者多个文件(防止所有文件一次性加载内存溢出,也为了防止文件内容过小,所以每次都确保读出的行数10万行左右),然后使用fork/join进行单词的统计处理,设置处理的阈值为20000。
    课后习题:单核的话,使用单线程会比多线程快,线程的切换,恢复等都会耗时,并且要是机器不允许,单线程可以保证安全,可见性(cpu缓存,单个CPU数据可见),线程切换(单线程不会出现原子性)

    作者回复: 👍

    2019-04-27
    5
  • 右耳听海
    请教老师一个问题,merge函数里的mr2.compute先执行还是mr1.join先执行,这两个参数是否可交换位置

    作者回复: 我觉得不可以,如果join在前面会先首先让当前线程阻塞在join()上。当join()执行完才会执行mr2.compute(),这样并行度就下来了。

    2019-04-27
    5
  • Geek_ebda96
    如果所有的并行流计算都是 CPU 密集型计算的话,完全没有问题,但是如果存在 I/O 密集型的并行流计算,那么很可能会因为一个很慢的 I/O 计算而拖慢整个系统的性能。

    老师这里的意思是不是,如果有耗时的i/o计算,需要用单独的forkjoin pool 来处理这个计算,在程序设计的时候就要跟其他cpu密集计算的任务分开处理?

    作者回复: 是的

    2019-05-13
    4
  • Nick
    简易的MapReduce的程序跑下来不会栈溢出吗?

    作者回复: 递归程序,如果语言层面没有办法优化,都会的

    2019-06-05
    2
  • êwěn
    老师,fork是fork调用者的子任务还是表示下面new出来的任务是子任务?

    作者回复: fork是fork调用者这个子任务加入到任务队列里

    2019-04-27
    2
  • 木木匠
    单核cpu上多线程会导致线程的上下文切换,还不如单核单线程处理的效率高。
    2019-04-27
    2
  • 蓝天白云看大海
    join会阻塞线程吗?如果阻塞线程,而线程池里的线程个数又有线,那么递归几次之后所有线程不都全阻塞了吗!
    2019-06-02
    1
  • 张三
    ForkJoinTask这个抽象类的 fork() 和 join()底层是怎么实现的呢?
    2019-04-29
    1
  • 狂风骤雨
    好希望工作当中能有老师这样一位大牛,能为我答疑解惑

    作者回复: 我知道的就这些,都写出来了😂,显然我不是大牛😄

    2019-04-29
    1
  • 右耳听海
    这里用的递归调用,数据量大的时候会不会粘溢出,虽然这里用的二分,时间复杂度为logn

    作者回复: 我觉得会

    2019-04-28
    1
  • 朱晋君
    老师,请问为什么不能merge mr1.compute和mr2..compute或者mr1.join和mr2的join呢?

    作者回复: compute+compute相当于没用forkjoin,都在一个线程里跑的。如果用join+join也可以,不过jdk官方有个建议,顺序要用:a.fork(); b.fork(); b.join(); a.join();否则性能有问题。所以还是用fork+compute更简单。

    2019-04-28
    1
  • 王伟
    老师,我现在碰到一个生产问题:用户通过微信小程序进入我们平台,我们只能需要使用用户的手机号去我们商家库中查取该用户的注册信息。在只知道用户手机号的情况下我们需要切换到所有的商家库去查询。这样非常耗时。ps:我们商家库做了分库处理而且数量很多。想请教一下您,这种查询该如何做?

    作者回复: 可以加redis缓存看看,也可以加本地缓存。不要让流量直接打到数据库上

    2019-04-28
    1
  • 密码123456
    我记得之前提到过,使用线程数目大小的方法。如果io耗时过长可以多加线程数量,能够提升性能。如果io耗时过短,增加线程数量就不能,提升性能了?不知道是否能够对应,这个题目的答案?
    2019-04-28
    1
  • ban
    “如果存在 I/O 密集型的并行流计算,那么很可能会因为一个很慢的 I/O 计算而拖慢整个系统的性能。”

    老师这个问题,这句话前面的文字也看到,但是不太懂。如果共用一个线程池,但是不是有多个线程,如果一个线程操作I/O,应该不影响其他线程吧,其他线程还能继续执行,我不太理解为什么会拖慢整个系统,求老师帮我解答这个疑问。

    作者回复: 前提是有很多请求并发访问这个很慢的I/O计算,我们这的并发程序,往往都有很多请求同时访问的

    2019-04-27
    1
  • 张天屹
    对于单核CPU而言,FJ线程池默认1个线程,由于是CPU密集型,失去了线程切换的意义,平白带来上下文切换的性能损耗。
    老师我想请教下前文斐波那契数列的例子,一个30的斐波那契递归展开后是一个深度30的二叉树,每一层的一个分支由主线程执行,另一个提交FJ的线程池执行,那么可不可以理解为最后一半的任务被主线程执行了,另一半的任务被FJ 的线程池执行了呢。如果是的话,提交给FJ任务队列的任务会进入不同的任务队列吗?我对于FJ分多个任务队列的目的和原理都不太了解。

    作者回复: 不是一半被主线程执行了,fork()任务之后,这个任务会被一个线程X执行,这个线程X会就是你理解的主线程,但它不是线程池里的固定的一个,而是线程池里所有线程都有可能。我这样说不知道能不能回答到你的点上

    2019-04-27
    1
  • 张三
    打卡!
    2019-04-27
    1
收起评论
34
返回
顶部