Java并发编程实战
王宝令
资深架构师
立即订阅
15151 人已学习
课程目录
已完结 50 讲
0/4登录后,你可以任选4讲全文学习。
开篇词 (1讲)
开篇词 | 你为什么需要学习并发编程?
免费
学习攻略 (1讲)
学习攻略 | 如何才能学好并发编程?
第一部分:并发理论基础 (13讲)
01 | 可见性、原子性和有序性问题:并发编程Bug的源头
02 | Java内存模型:看Java如何解决可见性和有序性问题
03 | 互斥锁(上):解决原子性问题
04 | 互斥锁(下):如何用一把锁保护多个资源?
05 | 一不小心就死锁了,怎么办?
06 | 用“等待-通知”机制优化循环等待
07 | 安全性、活跃性以及性能问题
08 | 管程:并发编程的万能钥匙
09 | Java线程(上):Java线程的生命周期
10 | Java线程(中):创建多少线程才是合适的?
11 | Java线程(下):为什么局部变量是线程安全的?
12 | 如何用面向对象思想写好并发程序?
13 | 理论基础模块热点问题答疑
第二部分:并发工具类 (14讲)
14 | Lock和Condition(上):隐藏在并发包中的管程
15 | Lock和Condition(下):Dubbo如何用管程实现异步转同步?
16 | Semaphore:如何快速实现一个限流器?
17 | ReadWriteLock:如何快速实现一个完备的缓存?
18 | StampedLock:有没有比读写锁更快的锁?
19 | CountDownLatch和CyclicBarrier:如何让多线程步调一致?
20 | 并发容器:都有哪些“坑”需要我们填?
21 | 原子类:无锁工具类的典范
22 | Executor与线程池:如何创建正确的线程池?
23 | Future:如何用多线程实现最优的“烧水泡茶”程序?
24 | CompletableFuture:异步编程没那么难
25 | CompletionService:如何批量执行异步任务?
26 | Fork/Join:单机版的MapReduce
27 | 并发工具类模块热点问题答疑
第三部分:并发设计模式 (10讲)
28 | Immutability模式:如何利用不变性解决并发问题?
29 | Copy-on-Write模式:不是延时策略的COW
30 | 线程本地存储模式:没有共享,就没有伤害
31 | Guarded Suspension模式:等待唤醒机制的规范实现
32 | Balking模式:再谈线程安全的单例模式
33 | Thread-Per-Message模式:最简单实用的分工方法
34 | Worker Thread模式:如何避免重复创建线程?
35 | 两阶段终止模式:如何优雅地终止线程?
36 | 生产者-消费者模式:用流水线思想提高效率
37 | 设计模式模块热点问题答疑
第四部分:案例分析 (4讲)
38 | 案例分析(一):高性能限流器Guava RateLimiter
39 | 案例分析(二):高性能网络应用框架Netty
40 | 案例分析(三):高性能队列Disruptor
41 | 案例分析(四):高性能数据库连接池HiKariCP
第五部分:其他并发模型 (4讲)
42 | Actor模型:面向对象原生的并发模型
43 | 软件事务内存:借鉴数据库的并发经验
44 | 协程:更轻量级的线程
45 | CSP模型:Golang的主力队员
结束语 (1讲)
结束语 | 十年之后,初心依旧
用户故事 (2讲)
用户来信 | 真好,面试考到这些并发编程,我都答对了!
3 个用户来信 | 打开一个新的并发世界
Java并发编程实战
登录|注册

19 | CountDownLatch和CyclicBarrier:如何让多线程步调一致?

王宝令 2019-04-11
前几天老板突然匆匆忙忙过来,说对账系统最近越来越慢了,能不能快速优化一下。我了解了对账系统的业务后,发现还是挺简单的,用户通过在线商城下单,会生成电子订单,保存在订单库;之后物流会生成派送单给用户发货,派送单保存在派送单库。为了防止漏派送或者重复派送,对账系统每天还会校验是否存在异常订单。
对账系统的处理逻辑很简单,你可以参考下面的对账系统流程图。目前对账系统的处理逻辑是首先查询订单,然后查询派送单,之后对比订单和派送单,将差异写入差异库。
对账系统流程图
对账系统的代码抽象之后,也很简单,核心代码如下,就是在一个单线程里面循环查询订单、派送单,然后执行对账,最后将写入差异库。
while(存在未对账订单){
// 查询未对账订单
pos = getPOrders();
// 查询派送单
dos = getDOrders();
// 执行对账操作
diff = check(pos, dos);
// 差异写入差异库
save(diff);
}

利用并行优化对账系统

老板要我优化性能,那我就首先要找到这个对账系统的瓶颈所在。
目前的对账系统,由于订单量和派送单量巨大,所以查询未对账订单 getPOrders() 和查询派送单 getDOrders() 相对较慢,那有没有办法快速优化一下呢?目前对账系统是单线程执行的,图形化后是下图这个样子。对于串行化的系统,优化性能首先想到的是能否利用多线程并行处理
对账系统单线程执行示意图
所以,这里你应该能够看出来这个对账系统里的瓶颈:查询未对账订单 getPOrders() 和查询派送单 getDOrders() 是否可以并行处理呢?显然是可以的,因为这两个操作并没有先后顺序的依赖。这两个最耗时的操作并行之后,执行过程如下图所示。对比一下单线程的执行示意图,你会发现同等时间里,并行执行的吞吐量近乎单线程的 2 倍,优化效果还是相对明显的。
取消
完成
0/1000字
划线
笔记
复制
© 版权归极客邦科技所有,未经许可不得传播售卖。 页面已增加防盗追踪,如有侵权极客邦将依法追究其法律责任。
该试读文章来自付费专栏《Java并发编程实战》,如需阅读全部文章,
请订阅文章所属专栏。
立即订阅
登录 后留言

精选留言(111)

  • 探索无止境
    今天的文章很精彩,有案例有递进,一气呵成!设置线程池为单个线程可以保证对账的操作按顺序执行
    2019-04-11
    101
  • 我觉得老师的问题其实是两个:
    1.为啥要用线程池,而不是在回调函数中直接调用?
    2.线程池为啥使用单线程的?

    我的考虑:
    1.使用线程池是为了异步操作,否则回掉函数是同步调用的,也就是本次对账操作执行完才能进行下一轮的检查。
    2.线程数量固定为1,防止了多线程并发导致的数据不一致,因为订单和派送单是两个队列,只有单线程去两个队列中取消息才不会出现消息不匹配的问题。

    作者回复: 👍

    2019-05-23
    3
    52
  • 刘晓林
    老师,CyclicBarrier的回调函数在哪个线程执行啊?主线程吗?比如这里的最后一段代码中,循环会在回调的时候阻塞吗?
    如果是这样的话,那check函数岂不是可以直接作为回调函数了呀,并不需要线程池了啊

    作者回复: 好问题,CyclicBarrier的回调函数执行在一个回合里最后执行await()的线程上,而且同步调用回调函数check(),调用完check之后,才会开始第二回合。所以check如果不另开一线程异步执行,就起不到性能优化的作用了。

    2019-04-12
    40
  • undifined
    线程池大小为1是必要的,如果设置为多个,有可能会两个线程 A 和 B 同时查询,A 的订单先返回,B 的派送单先返回,造成队列中的数据不匹配;所以1个线程实现生产数据串行执行,保证数据安全

    如果用Future 的话可以更方便一些:

            CompletableFuture<List> pOrderFuture = CompletableFuture.supplyAsync(this::getPOrders);
            CompletableFuture<List> dOrderFuture = CompletableFuture.supplyAsync(this::getDOrders);
            pOrderFuture.thenCombine(dOrderFuture, this::check)
                        .thenAccept(this::save);

    老师这样理解对吗,谢谢老师

    作者回复: 对,👍👍👍

    2019-04-11
    25
  • 西西弗与卡夫卡
    回调中的线程池用单线程是为了确保从两个队列取数时可以一对一获取,避免错乱。比如说,如果有两个线程,则可能出现线程1获取PO1,线程获取PO2和DO1,线程获取DO2的乱序。

    其实线程池改成多线程也可以,要把两个remove(0)放到一个同步块中
    2019-04-11
    19
  • 空知
    老师,关于CyclicBarrier回调函数,请教下
    自己写了个 CyclicBarrier的例子,回调函数总是在计数器归0时候执行,但是线程T1 T2要等回调函数执行结束之后才会再次执行...看了下CyclicBarrier 的源码,当内部计数器 index == 0时候,

    final Runnable command = barrierCommand;
                        
    if (command != null)
                            
    command.run();
    没有开启子线程吧.也就是说 对账还是同步执行的,结束之后才是下一次的查询

    作者回复: 所以才需要线程池来异步执行回调函数,你一不小心把答案找到了😂

    2019-04-11
    16
  • 曾轼麟
    老师推荐您使用ThreadPoolExecutor去实现线程池,并且实现里面的RejectedExecutionHandler和ThreadFactory,这样可以方便当调用订单查询和派送单查询的时候出现full gc的时候 dump文件 可以快速定位出现问题的线程是哪个业务线程,如果是CountDownLatch,建议设置超时时间,避免由于业务死锁没有调用countDown()导致现线程睡死的情况

    作者回复: 好建议,所有的阻塞操作,都需要设置超时时间,这是个很好的习惯。

    2019-04-13
    14
  • 波波
    思考题中,如果生产者比较快,消费者比较慢,生产者通知的时候,消费者还在对账,这个时候会怎么处理?会不会导致消费者错失通知,导致队列满了,但是消费者却没有收到通知。

    作者回复: 有这种可能,还能oom

    2019-04-11
    2
    13
  • ... ...
    追问:如果线程池是单线程的话。那假如生产者速度快运check函数执行时间。那是不是就会出现堵塞情况了。久而久之,是不是会出现队列内存溢出

    作者回复: 会

    2019-04-12
    8
  • nanquanmama
    最后的那个例子,业务逻辑的部分已经变得很不直观,并发控制的逻辑掩盖住了业务逻辑。请问一下老师,实际项目开发中,并发控制逻辑如何做,才能和业务逻辑分离出来?

    作者回复: 放到不同的类里,这方面传统的面向对象可以解决,lambda也能解决,这个模块的最后几章能解决你说的这个问题,但是更复杂的场景还得自己设计

    2019-04-11
    7
  • xuery
    有,如果为线程池有多个线程,则由于check()函数里面的两个remove并不是原子操作,可能导致消费错乱。假设订单队列中有P1,P2;派送队列中有D1,D2;两个线程T1,T2同时执行check,可能出现T1消费到P1,D2,T2消费到P2,D1,就是T1先执行pos.remove(0), 而后T2执行pos.remove(0);dos.remov(0);然后T1才执行dos.remove(0)的场景

    作者回复: 多个线程有这个可能,所以线程池用的是单线程的

    2019-05-01
    1
    5
  • 木偶人King
    老师,最后checkAll() 这里为什么new 了两个Thread 而不是使用线程池


    作者回复: 反正也不会反复创建,用不用都没关系

    2019-04-11
    5
  • 蝴蝶效应🦋
    感谢老师,一直不太明白什么时候用CyclicBarrier,今天看到案例了,刚看到join那段我想到了CompletableFuture

    作者回复: 👍

    2019-04-11
    5
  • iron_man
    王老师,cyclicbarrier,具体是在什么时候清零计数器呢?是在所有线程await返回后还是在回调函数调用后?await和回掉函数的调用顺序是怎样的

    作者回复: 回调函数执行完之后才会唤醒等待的线程。

    2019-04-11
    5
  • Darren
    而且其实可以直接single线程池的,但是最好不要Executors提供的线程池,都有弊端,最好自定义线程池

    作者回复: 👍

    2019-04-11
    5
  • 半心人
    如果生产者比较快,消费者check还没对账完 会不会照成 队列越来越多 最后内存溢出了 ,有没有什么好的方案解决呢?

    作者回复: 方案上基本都是限流

    2019-06-14
    4
  • aguan(^・ェ・^)
    老师,问一个业务逻辑的问题,在从两个队列中分别取订单和派送单的做比较的时候,怎么保证这订单和派送单是一一对应的关系呢?如果派送单有漏单,那如何对账比较取结果时的数据是一一对应关系?

    作者回复: 一一是一组和一组等价,check的时候也是批量操作。没有就就放一个空对象做占位就可以了

    2019-04-18
    4
  • 西兹兹
    undefind同学的意思差不多对。 只有一个线程的线程池,是因为,订单队列和派单队列读取数据存在竞态条件。 如果要开多个线程,则需要一个lock进行同步那两个remove方法。 个人推荐的思路是,如果生产者速度比消费者快的情况下,放入一个双向的阻塞队列尾部,每次从双向队列头部取两个对象,根据对象属性来区别订单类型,也能开多个线程进行check操作。 但本文业务里check速度很快,所以这个场景只需要开1个线程的线程池是合理的。

    作者回复: 一次多取几个然后批量执行,这个办法非常实用!

    2019-04-11
    4
  • crazypokerk
    请教一下老师,上面说的将CyclicBarrier计数器初始值设为2,假如当T1先执行完,然后执行await时减1,此时计数器为1大于0,等待,然后T2执行await时再减1,此时计数器为0,则唤醒T3执行,与此同时,将计数器重置为2,T1、T2继续开始执行,以此循环往复,可以这样理解吗?

    作者回复: 是的

    2019-04-11
    4
  • Asanz
    最后的代码段看了好几遍并没有发现checkAll()方法被调用啊😂
    2019-07-25
    3
收起评论
99+
返回
顶部