• 张天屹
    2019-04-25
    我觉得问题出在return m这里需要等待三个线程执行完成,但是并没有。
    ...
    AtomicReference<Integer> m = new AtomicReference<>(Integer.MAX_VALUE);
    CountDownLatch latch = new CountDownLatch(3);
    for(int i=0; i<3; i++) {
        executor.execute(()->{
            Integer r = null;
            try {
                r = cs.take().get();
            } catch(Exception e) {}
            save(r);
            m.set(Integer.min(m.get(), r));
            latch.countDown();
        });
        latch.await();
        return m;
    }
    展开

    作者回复: 👍

     6
     40
  • 小华
    2019-04-25
    看老师的意图是要等三个比较报假的线程都执行完才能执行主线程的的return m,但是代码无法保证三个线程都执行完,和主线程执行return的顺序,因此,m的值不是准确的,可以加个线程栈栏,线程执行完计数器,来达到这效果

    作者回复: 👍

    
     12
  • 代码搬运工
    2019-04-25
    m.get()和m.set()不是原子性操作,正确代码是:do{int expect = m.get();int min= Integer.min(expect,r);}while(!m.compareAndSet(expect,min))。老师,是这样吗?
    
     12
  • 西行寺咕哒子
    2019-04-25
    试过返回值是2147483647,也就是int的最大值。没有等待操作完成就猴急的返回了。 m.set(Integer.min(m.get(), r)... 这个操作也不是原子操作。
    试着自己弄了一下:
    public Integer run(){
            // 创建线程池
            ExecutorService executor = Executors.newFixedThreadPool(3);
            // 创建 CompletionService
            CompletionService<Integer> cs = new ExecutorCompletionService<>(executor);
            AtomicReference<Integer> m = new AtomicReference<>(Integer.MAX_VALUE);
            // 异步向电商 S1 询价
            cs.submit(()->getPriceByS1());
            // 异步向电商 S2 询价
            cs.submit(()->getPriceByS2());
            // 异步向电商 S3 询价
            cs.submit(()->getPriceByS3());
            // 将询价结果异步保存到数据库
            // 并计算最低报价
            for (int i=0; i<3; i++) {
                Integer r = logIfError(()->cs.take().get());
                executor.execute(()-> save(r));
                m.getAndUpdate(v->Integer.min(v, r));
            }
            return m.get();
        }
    不知道可不可行
    展开

    作者回复: 👍

     1
     6
  • 天涯煮酒
    2019-04-25
    先调用m.get()并跟r比较,再调用m.set(),这里存在竞态条件,线程并不安全
     1
     5
  • linqw
    2019-04-25
    老师stampedLock的获取锁源码,老师能帮忙解惑下么?阻塞的读线程cowait是挂在写节点的下方么?老师能解惑下基于的理论模型
    private long acquireWrite(boolean interruptible, long deadline) {
            WNode node = null, p;
            for (int spins = -1;;) { // spin while enqueuing
                long m, s, ns;
                //如果当前的state是无锁状态即100000000
                if ((m = (s = state) & ABITS) == 0L) {
                    //设置成写锁
                    if (U.compareAndSwapLong(this, STATE, s, ns = s + WBIT))
                        return ns;
                }
                else if (spins < 0)
                    //当前锁状态为写锁状态,并且队列为空,设置自旋值
                    spins = (m == WBIT && wtail == whead) ? SPINS : 0;
                else if (spins > 0) {
                    //自旋操作,就是让线程在此自旋
                    if (LockSupport.nextSecondarySeed() >= 0)
                        --spins;
                }
                //如果队列尾元素为空,初始化队列
                else if ((p = wtail) == null) { // initialize queue
                    WNode hd = new WNode(WMODE, null);
                    if (U.compareAndSwapObject(this, WHEAD, null, hd))
                        wtail = hd;
                }
                //当前要加入的元素为空,初始化当前元素,前置节点为尾节点
                else if (node == null)
                    node = new WNode(WMODE, p);
                //队列的稳定性判断,当前的前置节点是否改变,重新设置
                else if (node.prev != p)
                    node.prev = p;
                //将当前节点加入尾节点中
                else if (U.compareAndSwapObject(this, WTAIL, p, node)) {
                    p.next = node;
                    break;
                }
            }

            
    展开

    作者回复: 这可难倒我了,并发库的源码我只是零散得看的,看完基本也忘得差不多了,感觉自己也不是搞算法的料,放弃了😂

    
     3
  • Corner
    2019-04-25
    1.AtomicReference<Integer>的get方法应该改成使用cas方法
    2.最后筛选最小结果的任务是异步执行的,应该在return之前做同步,所以最好使用sumit提交该任务便于判断任务的完成
    最后请教老师一下,第一个例子中为什么主线程会阻塞在f1.get()方法呢?

    作者回复: 👍,示例代码有问题,已经改了

    
     3
  • 郑晨Cc
    2019-04-25
    executor.execute(Callable)提交任务是非阻塞的 return m;很大概率返回 Integer.Maxvalue,而且老师为了确保返回这个max还特意加入了save这个阻塞的方法
    
     3
  • 墨飞域
    2019-10-23
    老师,并发工具类,这整个一章,感觉听完似懂非懂的,因为实践中没用过,我要如何弥补这部分,还是说只要听说过,然后用的时候再去查看demo吗

    作者回复: 用的时候查文档就行,工具类主要是会用,知道什么场景用什么

    
     2
  • Sunqc
    2019-05-01
    // 获取电商 S1 报价并保存
    r=f1.get();
    executor.execute(()->save(r));

    如果把r=f1.get()放进execute里应该是也能保证先执行完的先保存

    作者回复: 是的

    
     2
  • 一眼万年
    2019-04-28
    课后思考如果需要等待最小结果,本来就有阻塞队列了,加了个线程池,评论还要加上栏栅,那除了炫技没啥作用
     1
     2
  • 黄海峰
    2019-04-25
    我实际测试了第一段代码,确实是异步的,f1.get不会阻塞主线程。。。

    public static void main(String[] args) {
            ExecutorService executor = Executors.newFixedThreadPool(3);
            Future<Integer> f1 = executor.submit(()->getPriceByS1());
            Future<Integer> f2 = executor.submit(()->getPriceByS2());
            Future<Integer> f3 = executor.submit(()->getPriceByS3());

            executor.execute(()-> {
                try {
                    save(f1.get());
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } catch (ExecutionException e) {
                    e.printStackTrace();
                }
            });
            executor.execute(()-> {
                try {
                    save(f2.get());
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } catch (ExecutionException e) {
                    e.printStackTrace();
                }
            });
            executor.execute(()-> {
                try {
                    save(f3.get());
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } catch (ExecutionException e) {
                    e.printStackTrace();
                }
            });
        }

        private static Integer getPriceByS1() {
            try {
                Thread.sleep(10000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return 1;
        }
        private static Integer getPriceByS2() {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return 2;
        }
        private static Integer getPriceByS3() {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return 3;
        }
        private static void save(Integer i) {
            System.out.println("save " + i);
        }
    展开

    作者回复: 👍

    
     2
  • undifined
    2019-04-25
    老师 用 CompletionService 和用 CompletionFuture 查询,然后用 whenComplete 或者 thenAcceptEither 这些方法的区别是什么,我觉得用 CompletionFuture 更直观些;
    老师可以在下一讲的时候说一下上一讲的思考题正确答案吗,谢谢老师
    
     2
  • helloworld
    2019-08-30
    老师,冒昧的问下:在文章刚开始的例子,无论是三个询价任务(通过submit方法提交),还是保存询价任务(通过execute方法提交)都是异步的执行执行的啊!如果s1询价的时间过长的话,也不会影响到s2保存保价的先执行啊!他只影响到s1保存询价的动作。老师不知道我说的有么有道理,有问题请老师帮忙指正

    作者回复: 问题出在f1.get()不是异步的,它会阻塞线程

    
     1
  • 海鸿
    2019-04-26
    重新发过,刚刚的代码有误!
    1.for循环线程池执行属于异步导致未等比价结果就 return了,需要等待三次比价结果才能 return,可以用 CountDownLatch
    2. m. set( Integer. min( m. get(), r))存在竞态条件,可以更改为
    Integer o;
    do{
    o= m. get();
    if(o<=r){ break;}
    }
    while(! m. compareAndSet( o, r));
    3.还有一个小问题就是 try- catch捕获异常后的处理,提高程序鲁棒性
    展开
    
     1
  • hj
    2019-04-25
    确实,第一个例子老师为了抛出问题引出CompletionService,但是逻辑上的确不成立。不是f1.get()阻塞主线程
    
     1
  • 苏志辉
    2019-04-25
    m.set和get存在静态条件不是原子的,可能存在设置和不是最小值
    
     1
  • 空空空空
    2019-04-25
    算低价的时候是用三个不同的线程去计算,是异步的,因此可能算出来并不是预期的结果
    老师,这样理解对吗?

    作者回复: 对的!

    
     1
  • supermouse
    2020-02-09
    老师您好,为什么我写的这个用CompletionService创建四个线程计算从1加到n的程序在打印出最后的结果后没有退出呢?
    public class TestCompletionService {
        private static long addRange(long start, long end){
            long sum = 0;
            for (long i = start; i <= end; i++) {
                sum += i;
            }
            return sum;
        }
        public static void main(String[] args) throws InterruptedException, ExecutionException {
            long n = 3000000000L;
            Long res = 0L;

            ExecutorService executor = Executors.newFixedThreadPool(4); //创建线程池
            CompletionService<Long> cs = new ExecutorCompletionService<>(executor);

            cs.submit(() -> addRange(1L, n >> 2));
            cs.submit(() -> addRange((n >> 2) + 1, n >> 1));
            cs.submit(() -> addRange((n >> 1) + 1, (n >> 2) * 3));
            cs.submit(() -> addRange((n >> 2) * 3 + 1, n));

            for (int i = 0; i < 4; i++) {
                res += cs.take().get();
            }
            System.out.println(res);
        }
    }
    展开
    
    
  • 旅途
    2020-02-06
    老师 问一下 用 CompletionService 实现 Dubbo 中的 Forking Cluster 这个例子 为什么使用了一个future集合 像之前的例子一样 直接用CompletionService对象的take不行吗
    
    
我们在线,来聊聊吧