Java并发编程实战
王宝令
资深架构师
立即订阅
15151 人已学习
课程目录
已完结 50 讲
0/4登录后,你可以任选4讲全文学习。
开篇词 (1讲)
开篇词 | 你为什么需要学习并发编程?
免费
学习攻略 (1讲)
学习攻略 | 如何才能学好并发编程?
第一部分:并发理论基础 (13讲)
01 | 可见性、原子性和有序性问题:并发编程Bug的源头
02 | Java内存模型:看Java如何解决可见性和有序性问题
03 | 互斥锁(上):解决原子性问题
04 | 互斥锁(下):如何用一把锁保护多个资源?
05 | 一不小心就死锁了,怎么办?
06 | 用“等待-通知”机制优化循环等待
07 | 安全性、活跃性以及性能问题
08 | 管程:并发编程的万能钥匙
09 | Java线程(上):Java线程的生命周期
10 | Java线程(中):创建多少线程才是合适的?
11 | Java线程(下):为什么局部变量是线程安全的?
12 | 如何用面向对象思想写好并发程序?
13 | 理论基础模块热点问题答疑
第二部分:并发工具类 (14讲)
14 | Lock和Condition(上):隐藏在并发包中的管程
15 | Lock和Condition(下):Dubbo如何用管程实现异步转同步?
16 | Semaphore:如何快速实现一个限流器?
17 | ReadWriteLock:如何快速实现一个完备的缓存?
18 | StampedLock:有没有比读写锁更快的锁?
19 | CountDownLatch和CyclicBarrier:如何让多线程步调一致?
20 | 并发容器:都有哪些“坑”需要我们填?
21 | 原子类:无锁工具类的典范
22 | Executor与线程池:如何创建正确的线程池?
23 | Future:如何用多线程实现最优的“烧水泡茶”程序?
24 | CompletableFuture:异步编程没那么难
25 | CompletionService:如何批量执行异步任务?
26 | Fork/Join:单机版的MapReduce
27 | 并发工具类模块热点问题答疑
第三部分:并发设计模式 (10讲)
28 | Immutability模式:如何利用不变性解决并发问题?
29 | Copy-on-Write模式:不是延时策略的COW
30 | 线程本地存储模式:没有共享,就没有伤害
31 | Guarded Suspension模式:等待唤醒机制的规范实现
32 | Balking模式:再谈线程安全的单例模式
33 | Thread-Per-Message模式:最简单实用的分工方法
34 | Worker Thread模式:如何避免重复创建线程?
35 | 两阶段终止模式:如何优雅地终止线程?
36 | 生产者-消费者模式:用流水线思想提高效率
37 | 设计模式模块热点问题答疑
第四部分:案例分析 (4讲)
38 | 案例分析(一):高性能限流器Guava RateLimiter
39 | 案例分析(二):高性能网络应用框架Netty
40 | 案例分析(三):高性能队列Disruptor
41 | 案例分析(四):高性能数据库连接池HiKariCP
第五部分:其他并发模型 (4讲)
42 | Actor模型:面向对象原生的并发模型
43 | 软件事务内存:借鉴数据库的并发经验
44 | 协程:更轻量级的线程
45 | CSP模型:Golang的主力队员
结束语 (1讲)
结束语 | 十年之后,初心依旧
用户故事 (2讲)
用户来信 | 真好,面试考到这些并发编程,我都答对了!
3 个用户来信 | 打开一个新的并发世界
Java并发编程实战
登录|注册

16 | Semaphore:如何快速实现一个限流器?

王宝令 2019-04-04
Semaphore,现在普遍翻译为“信号量”,以前也曾被翻译成“信号灯”,因为类似现实生活里的红绿灯,车辆能不能通行,要看是不是绿灯。同样,在编程世界里,线程能不能执行,也要看信号量是不是允许。
信号量是由大名鼎鼎的计算机科学家迪杰斯特拉(Dijkstra)于 1965 年提出,在这之后的 15 年,信号量一直都是并发编程领域的终结者,直到 1980 年管程被提出来,我们才有了第二选择。目前几乎所有支持并发编程的语言都支持信号量机制,所以学好信号量还是很有必要的。
下面我们首先介绍信号量模型,之后介绍如何使用信号量,最后我们再用信号量来实现一个限流器。

信号量模型

信号量模型还是很简单的,可以简单概括为:一个计数器,一个等待队列,三个方法。在信号量模型里,计数器和等待队列对外是透明的,所以只能通过信号量模型提供的三个方法来访问它们,这三个方法分别是:init()、down() 和 up()。你可以结合下图来形象化地理解。
信号量模型图
这三个方法详细的语义具体如下所示。
init():设置计数器的初始值。
down():计数器的值减 1;如果此时计数器的值小于 0,则当前线程将被阻塞,否则当前线程可以继续执行。
up():计数器的值加 1;如果此时计数器的值小于或者等于 0,则唤醒等待队列中的一个线程,并将其从等待队列中移除。
取消
完成
0/1000字
划线
笔记
复制
© 版权归极客邦科技所有,未经许可不得传播售卖。 页面已增加防盗追踪,如有侵权极客邦将依法追究其法律责任。
该试读文章来自付费专栏《Java并发编程实战》,如需阅读全部文章,
请订阅文章所属专栏。
立即订阅
登录 后留言

精选留言(88)

  • CCC
    我理解的和管程相比,信号量可以实现的独特功能就是同时允许多个线程进入临界区,但是信号量不能做的就是同时唤醒多个线程去争抢锁,只能唤醒一个阻塞中的线程,而且信号量模型是没有Condition的概念的,即阻塞线程被醒了直接就运行了而不会去检查此时临界条件是否已经不满足了,基于此考虑信号量模型才会设计出只能让一个线程被唤醒,否则就会出现因为缺少Condition检查而带来的线程安全问题。正因为缺失了Condition,所以用信号量来实现阻塞队列就很麻烦,因为要自己实现类似Condition的逻辑。

    作者回复: 👍👍👍

    2019-04-04
    82
  • 老杨同志
    需要用线程安全的vector,因为信号量支持多个线程进入临界区,执行list的add和remove方法时可能是多线程并发执行

    作者回复: 👍

    2019-04-04
    54
  • 任大鹏
    有同学认为up()中的判断条件应该>=0,我觉得有可能理解为生产者-消费者模式中的生产者了。可以这么想,>0就意味着没有阻塞的线程了,所以只有<=0的情况才需要唤醒一个等待的线程。其实down()和up()是成对出现的,并且是先调用down()获得锁,处理完成再调用up()释放锁,如果信号量初始值为1,应该是不会出现>0的情况的,除非故意调先用up(),这也失去了信号量本身的意义了。不知道我理解的对不对。

    作者回复: 对👍👍👍

    2019-04-04
    19
  • shawn
    老师能否把课程所有的完整代码放到github上,这样我们学起来更方便。包括全面几章的也发下,因为有时候根据您的代码,我没法运行
    2019-04-04
    12
  • crazypokerk
    文中,up():计数器的值加 1;如果此时计数器的值小于或者等于0,这句话应该是大于等于0吧
    2019-04-04
    1
    9
  • 缪文@场景鹿
    这个限流器实际上限的是并发量,也就是同时允许多少个请求通过,如果限制每秒请求数,不是这个实现的吧

    作者回复: 后面会介绍guava的限流器

    2019-04-06
    6
  • Alvan
    很多人对up()方法的计数器count<=0不理解,可以看下这里:
    1、反证法验证一下,假如一个线程先执行down()操作,那么此时count的值是0,接着这个线程执行up()操作,此时count的值是1,如果count应该是大于等于0,那么应该唤醒其他线程,可是此时并没有线程在睡眠呀,count的值不应该是大于等于0。
    2、假如一个线程t1执行down()操作,此时count = 0,然后t1被中断,另外的线程t2执行down()操作,此时count=-1,t2阻塞睡眠,另外的线程t3执行down()操作,count=-2,t3也睡眠。count=-2 说明有两个线程在睡眠,接着t1执行up() 操作,此时count=-1,小于等于0,唤醒t2或者t3其中一个线程,假如计数器count是大于等于0才唤醒其他线程,这明显是不对的。

    作者回复: 👍

    2019-09-09
    1
    5
  • 榣山樵客™
    换ArrayList是不行的,临界区内可能存在多个线程来执行remove操作,出现不可预知的后果。

    对于chaos同学说return之前释放的问题,我觉得可以这么理解:return的是执行后的结果,而不是“执行”。所以顺序应该是这样的:1acquire;2apply;3finally release;4return2的结果

    作者回复: 是的,感谢回复的这么详细!!!

    2019-04-04
    5
  • crazypokerk
    老师,那个计数器中得s.acquire()是需要捕获异常的。
    static int count;
        static final Semaphore s = new Semaphore(1);

        static void addOne() throws InterruptedException {
            s.acquire();
            try {
                count += 1;
            }finally {
                s.release();
            }
        }

    作者回复: 异常都被我省略了,这样代码更能专注的表达问题,如果你本地实验,加上就可以了。手机屏幕太小,折行后行数太多,看到后面忘了前面,所以我尽讲精炼代码

    2019-04-04
    5
  • ken

    public class Food {

        public String name;

        private long warmTime;

        public Food(String name, long warmTime) {
            this.name = name;
            this.warmTime = warmTime;
        }

        public String getName() {
            return name;
        }

        public long getWarmTime() {
            return warmTime;
        }
    }



    public class MicrowaveOven {

        public String name;

        public MicrowaveOven(String name) {
            this.name = name;
        }

        public Food warm(Food food) {
            long second = food.getWarmTime() * 1000;
            try {
                Thread.sleep(second);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }

            System.out.println(String.format("%s warm %s %d seconds food.", name,food.getName() ,food.getWarmTime()));
            return food;
        }

        public String getName() {
            return name;
        }
    }
    public class MicrowaveOvenPool {

        private List<MicrowaveOven> microwaveOvens;

        private Semaphore semaphore;

        public MicrowaveOvenPool(int size,@NotNull List<MicrowaveOven> microwaveOvens) {
            this.microwaveOvens = new Vector<>(microwaveOvens);
            this.semaphore = new Semaphore(size);
        }
        public Food exec(Function<MicrowaveOven, Food> func) {
            MicrowaveOven microwaveOven = null;
            try{
                semaphore.acquire();
                microwaveOven = microwaveOvens.remove(0);
                return func.apply(microwaveOven);
            }catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                microwaveOvens.add(microwaveOven);
                semaphore.release();
            }
            return null;
        }

    }

    作者回复: 👍

    2019-04-08
    4
  • master
    老师,void up()方法中的this.count判断条件是否应该为>=0
    2019-04-04
    4
  • 长眉_张永
    对于进入的多个线程资源之间,如果有公用的信息的话,是否还需要加锁操作呢?

    作者回复: 需要

    2019-04-09
    3
  • 小和尚笨南北
    semaphore底层通过AQS实现,AQS内部通过一个volatile变量间接实现同步。
    根据happen-before原则的volatile规则和传递性规则。使用arraylist也不会发生线程安全问题。

    作者回复: 不可以,有多个线程进入临界区

    2019-04-04
    3
  • Mr Q.
    创建对象池的时候都是添加的同一个对象。
    2019-05-17
    2
  • 木偶人King
    ObjPool(int size, T t){
        pool = new Vector<T>(){};
        for(int i=0; i<size; i++){
          pool.add(t);
        }
        sem = new Semaphore(size);
      }
     //--------------------------------

    老师这里pool.add(t) 一直循环添加的是同一个引用对象。没太明白。 为什么不是添加不同的t

    作者回复: 实际项目中一定是不同的

    2019-04-09
    2
  • 陈华应
    不可以,临界区会有多个线程并发执行
    2019-04-06
    2
  • QQ怪
    用初始化为1的Semaphore和管程来单单控制线程安全,哪个更有优势?为啥java不直接用信号量来实现互斥?

    作者回复: 如果仅仅是为了互斥,都可以。

    2019-04-05
    2
  • 刘彦辉
    假如有3个线程,线程A、B、C,信号量计数器为1,线程A执行down的时候变为0,不阻塞;线程B执行down,变为-1,阻塞;线程C执行down变为-2,阻塞。当线程A执行完,调用up后,变为-1,此时唤醒一个线程,那么请问唤醒之后的操作呢?唤醒之后直接就执行了业务代码了?还是唤醒之后还需要去先执行down?按分析的话应该不能执行down了,如果执行down的话,计数器变为-2,还会阻塞,所以是不是这块儿的阻塞和唤醒也是用的wait和notify呢?唤醒之后,从阻塞的代码开始继续执行,这样就可以成功执行下去了。麻烦老师解答一下哈,谢谢。

    作者回复: 唤醒后直接执行业务代码,在哪里阻塞,唤醒后就在哪里继续执行

    2019-09-20
    1
  • 倚梦流
    限流器,基于老师的代码,自己手动完善了一下。
    package com.thread.demo;

    import java.util.List;
    import java.util.Vector;
    import java.util.concurrent.Semaphore;
    import java.util.function.Function;

    public class ObjPool<T,R> {
        private List<T> pool;
        //使用信号量实现限流器
        private final Semaphore semaphore;


        ObjPool(T[] tArray){
            pool=new Vector<T>(){};
            int size=tArray.length;
            for(int i=0;i<size;i++){
                pool.add(tArray[i]);
            }
            semaphore=new Semaphore(size);
        }

        R exec(Function<T,R> func) throws InterruptedException {
            T t=null;
            semaphore.acquire();
            try{
                t=pool.remove(0);
                return func.apply(t);
            }finally {
                pool.add(t);
                semaphore.release();
            }
        }

        public static void main(String[] args){
            String[] messages=new String[10];
            for(int i=0;i<10;i++){
                messages[i]="obj_"+i;
            }
            ObjPool<String,String> pool=new ObjPool<>(messages );

            for(int i=0;i<100;i++){
                Thread thread=new Thread(() ->{
                    try {
                        pool.exec(t -> {
                            System.out.println("当前线程id:"+Thread.currentThread().getId()+",当前获取到的对象:"+t);
                            return t;
                        });
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                });
                thread.start();
                try {
                    thread.join();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }

        }


    }

    作者回复: 👍

    2019-07-07
    1
  • 古夜
    在信号量模型里,计数器和等待队列对外是透明的,所以只能通过信号量模型提供的三个方法来访问它们,这三个方法分别是:init()、down() 和 up()。

    这里应该是不透明吧?
    2019-07-02
    1
收起评论
88
返回
顶部