• 探索无止境
    2019-04-11
    今天的文章很精彩,有案例有递进,一气呵成!设置线程池为单个线程可以保证对账的操作按顺序执行
    
     113
  • 傲
    2019-05-23
    我觉得老师的问题其实是两个:
    1.为啥要用线程池,而不是在回调函数中直接调用?
    2.线程池为啥使用单线程的?

    我的考虑:
    1.使用线程池是为了异步操作,否则回掉函数是同步调用的,也就是本次对账操作执行完才能进行下一轮的检查。
    2.线程数量固定为1,防止了多线程并发导致的数据不一致,因为订单和派送单是两个队列,只有单线程去两个队列中取消息才不会出现消息不匹配的问题。
    展开

    作者回复: 👍

     3
     73
  • 刘晓林
    2019-04-12
    老师,CyclicBarrier的回调函数在哪个线程执行啊?主线程吗?比如这里的最后一段代码中,循环会在回调的时候阻塞吗?
    如果是这样的话,那check函数岂不是可以直接作为回调函数了呀,并不需要线程池了啊

    作者回复: 好问题,CyclicBarrier的回调函数执行在一个回合里最后执行await()的线程上,而且同步调用回调函数check(),调用完check之后,才会开始第二回合。所以check如果不另开一线程异步执行,就起不到性能优化的作用了。

    
     54
  • undifined
    2019-04-11
    线程池大小为1是必要的,如果设置为多个,有可能会两个线程 A 和 B 同时查询,A 的订单先返回,B 的派送单先返回,造成队列中的数据不匹配;所以1个线程实现生产数据串行执行,保证数据安全

    如果用Future 的话可以更方便一些:

            CompletableFuture<List> pOrderFuture = CompletableFuture.supplyAsync(this::getPOrders);
            CompletableFuture<List> dOrderFuture = CompletableFuture.supplyAsync(this::getDOrders);
            pOrderFuture.thenCombine(dOrderFuture, this::check)
                        .thenAccept(this::save);

    老师这样理解对吗,谢谢老师
    展开

    作者回复: 对,👍👍👍

    
     28
  • 空知
    2019-04-11
    老师,关于CyclicBarrier回调函数,请教下
    自己写了个 CyclicBarrier的例子,回调函数总是在计数器归0时候执行,但是线程T1 T2要等回调函数执行结束之后才会再次执行...看了下CyclicBarrier 的源码,当内部计数器 index == 0时候,

    final Runnable command = barrierCommand;
                        
    if (command != null)
                            
        command.run();
    没有开启子线程吧.也就是说 对账还是同步执行的,结束之后才是下一次的查询
    展开

    作者回复: 所以才需要线程池来异步执行回调函数,你一不小心把答案找到了😂

    
     23
  • Kǎfκã²⁰²⁰
    2019-04-11
    回调中的线程池用单线程是为了确保从两个队列取数时可以一对一获取,避免错乱。比如说,如果有两个线程,则可能出现线程1获取PO1,线程获取PO2和DO1,线程获取DO2的乱序。

    其实线程池改成多线程也可以,要把两个remove(0)放到一个同步块中
    
     20
  • 曾轼麟
    2019-04-13
    老师推荐您使用ThreadPoolExecutor去实现线程池,并且实现里面的RejectedExecutionHandler和ThreadFactory,这样可以方便当调用订单查询和派送单查询的时候出现full gc的时候 dump文件 可以快速定位出现问题的线程是哪个业务线程,如果是CountDownLatch,建议设置超时时间,避免由于业务死锁没有调用countDown()导致现线程睡死的情况

    作者回复: 好建议,所有的阻塞操作,都需要设置超时时间,这是个很好的习惯。

    
     19
  • 波波
    2019-04-11
    思考题中,如果生产者比较快,消费者比较慢,生产者通知的时候,消费者还在对账,这个时候会怎么处理?会不会导致消费者错失通知,导致队列满了,但是消费者却没有收到通知。

    作者回复: 有这种可能,还能oom

     2
     14
  • ... ...
    2019-04-12
    追问:如果线程池是单线程的话。那假如生产者速度快运check函数执行时间。那是不是就会出现堵塞情况了。久而久之,是不是会出现队列内存溢出

    作者回复: 会

    
     9
  • nanquanmama
    2019-04-11
    最后的那个例子,业务逻辑的部分已经变得很不直观,并发控制的逻辑掩盖住了业务逻辑。请问一下老师,实际项目开发中,并发控制逻辑如何做,才能和业务逻辑分离出来?

    作者回复: 放到不同的类里,这方面传统的面向对象可以解决,lambda也能解决,这个模块的最后几章能解决你说的这个问题,但是更复杂的场景还得自己设计

    
     8
  • xuery
    2019-05-01
    有,如果为线程池有多个线程,则由于check()函数里面的两个remove并不是原子操作,可能导致消费错乱。假设订单队列中有P1,P2;派送队列中有D1,D2;两个线程T1,T2同时执行check,可能出现T1消费到P1,D2,T2消费到P2,D1,就是T1先执行pos.remove(0), 而后T2执行pos.remove(0);dos.remov(0);然后T1才执行dos.remove(0)的场景

    作者回复: 多个线程有这个可能,所以线程池用的是单线程的

     1
     7
  • 半心人
    2019-06-14
    如果生产者比较快,消费者check还没对账完 会不会照成 队列越来越多 最后内存溢出了 ,有没有什么好的方案解决呢?

    作者回复: 方案上基本都是限流

    
     6
  • 木偶人King
    2019-04-11
    老师,最后checkAll() 这里为什么new 了两个Thread 而不是使用线程池


    作者回复: 反正也不会反复创建,用不用都没关系

    
     6
  • Simons A Soros
    2019-04-11
    感谢老师,一直不太明白什么时候用CyclicBarrier,今天看到案例了,刚看到join那段我想到了CompletableFuture

    作者回复: 👍

    
     6
  • iron_man
    2019-04-11
    王老师,cyclicbarrier,具体是在什么时候清零计数器呢?是在所有线程await返回后还是在回调函数调用后?await和回掉函数的调用顺序是怎样的

    作者回复: 回调函数执行完之后才会唤醒等待的线程。

    
     6
  • Darren
    2019-04-11
    而且其实可以直接single线程池的,但是最好不要Executors提供的线程池,都有弊端,最好自定义线程池

    作者回复: 👍

    
     6
  • aguan(^・ェ・^)
    2019-04-18
    老师,问一个业务逻辑的问题,在从两个队列中分别取订单和派送单的做比较的时候,怎么保证这订单和派送单是一一对应的关系呢?如果派送单有漏单,那如何对账比较取结果时的数据是一一对应关系?

    作者回复: 一一是一组和一组等价,check的时候也是批量操作。没有就就放一个空对象做占位就可以了

    
     5
  • 西兹兹
    2019-04-11
    undefind同学的意思差不多对。 只有一个线程的线程池,是因为,订单队列和派单队列读取数据存在竞态条件。 如果要开多个线程,则需要一个lock进行同步那两个remove方法。 个人推荐的思路是,如果生产者速度比消费者快的情况下,放入一个双向的阻塞队列尾部,每次从双向队列头部取两个对象,根据对象属性来区别订单类型,也能开多个线程进行check操作。 但本文业务里check速度很快,所以这个场景只需要开1个线程的线程池是合理的。

    作者回复: 一次多取几个然后批量执行,这个办法非常实用!

    
     5
  • crazypokerk
    2019-04-11
    请教一下老师,上面说的将CyclicBarrier计数器初始值设为2,假如当T1先执行完,然后执行await时减1,此时计数器为1大于0,等待,然后T2执行await时再减1,此时计数器为0,则唤醒T3执行,与此同时,将计数器重置为2,T1、T2继续开始执行,以此循环往复,可以这样理解吗?

    作者回复: 是的

    
     5
  • null
    2019-05-21
    老师,您好!我有几个疑问:

    1. 文章里提到:获取订单 getPOrders() 和获取派送订 getDOrders() 是相互独立、互不依赖的。
    我们的订单系统通过 MQ 与派送系统进行数据交互,并且一个订单有可能生成多个派送单(仓库不同,拆单),想了好久,也没想到比较好的方式实现订单和派送单的查询操作可以并行处理。
    如果每次只筛选过去一个小时未对账的订单,和过去一小时的派送单,当存在漏生成派送单时,系统发现不了(不知道是漏生成派送单,还是 MQ 没消费)。

    2. 文章第二种方案,线程池多生成一个线程,专门用来处理 ( check & save ),也能够实现查询和对账并行处理。因此不太能理解“一个线程等待多个线程”和“一组线程之间的相互等待”的区别。感觉 CountDownLatch 和 CyclicBarrier 都是 (check & save) 线程在等待 getPOrders 和 getDOrders 线程。

    3.文章最后一种方案,每次只查询一条改成一次查多条,这样可以减少查询的次数。check 的时候,也批量处理,吞吐量是不是会好一点吖。

    谢谢老师!!
    展开

    作者回复: 查询数据库一定是批量查询的,先把订单的id查出来,然后用这些id就能并行查了,cyclicbarrier能循环利用,用起来更简单,你可以用countdownlatch实现一下,就知道他们的区别了

    
     4
我们在线,来聊聊吧