Java并发编程实战
王宝令
资深架构师
立即订阅
15151 人已学习
课程目录
已完结 50 讲
0/4登录后,你可以任选4讲全文学习。
开篇词 (1讲)
开篇词 | 你为什么需要学习并发编程?
免费
学习攻略 (1讲)
学习攻略 | 如何才能学好并发编程?
第一部分:并发理论基础 (13讲)
01 | 可见性、原子性和有序性问题:并发编程Bug的源头
02 | Java内存模型:看Java如何解决可见性和有序性问题
03 | 互斥锁(上):解决原子性问题
04 | 互斥锁(下):如何用一把锁保护多个资源?
05 | 一不小心就死锁了,怎么办?
06 | 用“等待-通知”机制优化循环等待
07 | 安全性、活跃性以及性能问题
08 | 管程:并发编程的万能钥匙
09 | Java线程(上):Java线程的生命周期
10 | Java线程(中):创建多少线程才是合适的?
11 | Java线程(下):为什么局部变量是线程安全的?
12 | 如何用面向对象思想写好并发程序?
13 | 理论基础模块热点问题答疑
第二部分:并发工具类 (14讲)
14 | Lock和Condition(上):隐藏在并发包中的管程
15 | Lock和Condition(下):Dubbo如何用管程实现异步转同步?
16 | Semaphore:如何快速实现一个限流器?
17 | ReadWriteLock:如何快速实现一个完备的缓存?
18 | StampedLock:有没有比读写锁更快的锁?
19 | CountDownLatch和CyclicBarrier:如何让多线程步调一致?
20 | 并发容器:都有哪些“坑”需要我们填?
21 | 原子类:无锁工具类的典范
22 | Executor与线程池:如何创建正确的线程池?
23 | Future:如何用多线程实现最优的“烧水泡茶”程序?
24 | CompletableFuture:异步编程没那么难
25 | CompletionService:如何批量执行异步任务?
26 | Fork/Join:单机版的MapReduce
27 | 并发工具类模块热点问题答疑
第三部分:并发设计模式 (10讲)
28 | Immutability模式:如何利用不变性解决并发问题?
29 | Copy-on-Write模式:不是延时策略的COW
30 | 线程本地存储模式:没有共享,就没有伤害
31 | Guarded Suspension模式:等待唤醒机制的规范实现
32 | Balking模式:再谈线程安全的单例模式
33 | Thread-Per-Message模式:最简单实用的分工方法
34 | Worker Thread模式:如何避免重复创建线程?
35 | 两阶段终止模式:如何优雅地终止线程?
36 | 生产者-消费者模式:用流水线思想提高效率
37 | 设计模式模块热点问题答疑
第四部分:案例分析 (4讲)
38 | 案例分析(一):高性能限流器Guava RateLimiter
39 | 案例分析(二):高性能网络应用框架Netty
40 | 案例分析(三):高性能队列Disruptor
41 | 案例分析(四):高性能数据库连接池HiKariCP
第五部分:其他并发模型 (4讲)
42 | Actor模型:面向对象原生的并发模型
43 | 软件事务内存:借鉴数据库的并发经验
44 | 协程:更轻量级的线程
45 | CSP模型:Golang的主力队员
结束语 (1讲)
结束语 | 十年之后,初心依旧
用户故事 (2讲)
用户来信 | 真好,面试考到这些并发编程,我都答对了!
3 个用户来信 | 打开一个新的并发世界
Java并发编程实战
登录|注册

36 | 生产者-消费者模式:用流水线思想提高效率

王宝令 2019-05-21
前面我们在《34 | Worker Thread 模式:如何避免重复创建线程?》中讲到,Worker Thread 模式类比的是工厂里车间工人的工作模式。但其实在现实世界,工厂里还有一种流水线的工作模式,类比到编程领域,就是生产者 - 消费者模式
生产者 - 消费者模式在编程领域的应用也非常广泛,前面我们曾经提到,Java 线程池本质上就是用生产者 - 消费者模式实现的,所以每当使用线程池的时候,其实就是在应用生产者 - 消费者模式。
当然,除了在线程池中的应用,为了提升性能,并发编程领域很多地方也都用到了生产者 - 消费者模式,例如 Log4j2 中异步 Appender 内部也用到了生产者 - 消费者模式。所以今天我们就来深入地聊聊生产者 - 消费者模式,看看它具体有哪些优点,以及如何提升系统的性能。

生产者 - 消费者模式的优点

生产者 - 消费者模式的核心是一个任务队列,生产者线程生产任务,并将任务添加到任务队列中,而消费者线程从任务队列中获取任务并执行。下面是生产者 - 消费者模式的一个示意图,你可以结合它来理解。
生产者 - 消费者模式示意图
从架构设计的角度来看,生产者 - 消费者模式有一个很重要的优点,就是解耦。解耦对于大型系统的设计非常重要,而解耦的一个关键就是组件之间的依赖关系和通信方式必须受限。在生产者 - 消费者模式中,生产者和消费者没有任何依赖关系,它们彼此之间的通信只能通过任务队列,所以生产者 - 消费者模式是一个不错的解耦方案
取消
完成
0/1000字
划线
笔记
复制
© 版权归极客邦科技所有,未经许可不得传播售卖。 页面已增加防盗追踪,如有侵权极客邦将依法追究其法律责任。
该试读文章来自付费专栏《Java并发编程实战》,如需阅读全部文章,
请订阅文章所属专栏。
立即订阅
登录 后留言

精选留言(32)

  • 在应用系统中,日志系统一般都是最后关闭的吧,因为它要为其他系统关闭提供写日志服务。所以日志系统关闭时需要把队列中所有日志都消费掉才能关闭。
    可能需要在关闭日志系统时投入一个毒丸,表示没有新的日志写入。线程池在消费到毒丸时知道没有日志写入,将所有的日志刷盘,break循环体。

    作者回复: 👍

    2019-05-21
    51
  • PK時頭髮不亂
    极客时间有好多课程, 我觉得王老师的干货是最实际最可用的, 必须要赞一个。

    作者回复: 感谢感谢,有钱难买合适:)

    2019-05-21
    1
    21
  • 聂旋
    安卓的主线程中也是采用消息队列加消息循环方式,来处理用户输入及各种事件。当应用退出时,会发送一个处理对象为null的消息给队列,消息循环遇到这样的消息时就退出了。
    2019-05-23
    2
    7
  • êwěn

    之前遇到过一个生产问题,一个服务启动一段时间后就不停的超时,后面结合线程栈发现很多阻塞在打印日志的地方(我们用的就是log4j2),后面查到机子硬盘问题,io直接100%以上,日志刷盘满导致消费速度慢,队列撑满阻塞了写,这间接说明平衡好生产和消费速度以及适当的队列大小是很有必要。

    作者回复: 能快速定位的问题👍👍

    2019-05-22
    7
  • Asanz
    看到很多示例代码都没有关闭线程池的动作,难道局部的线程池就不要关闭吗?

    作者回复: 需要

    2019-08-08
    3
  • 晓杰
    35讲说到优雅地终止线程,首先需要线程状态转换为runnable状态(在终止刷盘的方法中调用Thread.interrupt()方法)
    然后可以通过设置标志位来让线程优雅终止,具体有两种方式:
    1、通过判断线程的中断状态Thread.currentThread.isInterrupted()
    2、设置自己的线程终止标志位,该标志位volatile修饰的共享变量。(这种方式需要在终止刷盘的方法中修改该共享变量的值)
    2019-05-21
    3
  • berthav_ss
    宝令老师,如何优雅的停止线程池中某一组线程呢?例如我在线程a中启动了1-10线程,线程b中启动了2-30线程,如何优雅停止1-10线程呢

    作者回复: 可以考虑一下毒丸的方式

    2019-06-11
    2
  • 曾轼麟
    补充一下上面的留言,先通过创建的钩子去创建一个毒丸,然后释放资源
    2019-05-23
    2
  • 曾轼麟
    使用Runtime提供的钩子,然后在关闭前,先让内部任务执行完毕,再释放资源
    2019-05-23
    2
  • 苏籍
    您好老师问个最近用到的线程池使用的问题
    我的工程是springboot的,在unitTest里(@SpringBootTest) 里调用了一个service A(通过@Autowired的)中的方法,A中启用了一个线程池,执行的任务 是往数据库里插入数据。但是总抛出数据源已经被关闭的异常,我理解的是在单测主线程已经结束,所以关闭了数据源这些清理工作,而此时线程池的线程还
    没结束,这个时候去调用数据源是null 的,不知道这么理解对不对,另外这个test主线程结束,为啥线程池的线程还没结束(通过打断点看到的)。这个怎么理解,求教

    作者回复: 只有守护线程才会自动结束,线程池的线程不是守护线程

    2019-05-22
    2
  • ack
    public class Logger {
    ...

        volatile boolean stop;

        // 启动写日志线程
        void start() throws IOException {
    ...
            this.es.execute(() -> {
                try {
                    // 未刷盘日志数量
                    int curIdx = 0;
                    long preFT = System.currentTimeMillis();
                    while (!stop) {
                        ...
                    }
                } catch (InterruptedException e) {
                    // 重新设置线程中断状态
                    Thread.currentThread().interrupt();
                } catch (Exception e) {
                    e.printStackTrace();
                } finally {
    ...
                }
            });
        }
    ...
        void stop(){
            stop = true;
            es.shutdown();
        }
    }
    2019-05-21
    2
  • 泛岁月的涟漪
    1、使用线程池的shutdown或者shutdownNow关闭线程池
    2、while循环条件设置为一个volatile boolean变量
    3、可以使用interrupt,但是线程是线程池管理的,没有消费者线程的引用中断不了
    2019-05-21
    2
  • null
    private BlockingQueue<X> bq = new LinkedBlockingQueue<>(1000);

    // 从任务队列中获取批量任务
    List<X> pollTasks() throws InterruptedException{
      List<X> ts=new LinkedList<>();

      X t = bq.take();
      while (t != null) {
        ts.add(t);
        t = bq.poll();
      }

      return ts;
    }

    -----

    需求背景:(一个线程往 bq 写数据,三个线程从 bq 读数据)
    1. 线程 A 从数据库批量读取数据,每次读 1000 条记录,然后在 for 循环内写入队列 bq.put(x)。
    2. 线程 B、线程 C、线程 D 调用 pollTasks() 方法获取数据列表,然后将数据列表做为参数,调用 Y 接口获取一批数据,最后进行业务运算。

    -----

    跑 demo 时发现 pollTasks() 方法有两个地方需要注意一下(一是:获取的列表数量不均,二是:退化成单元素列表):
    1. 线程 B、C、D 调用 pollTasks() 获得的列表,数据量不均匀,例如线程 B 只读取到 10+ 个元素,而线程 C 却读取了 1000+ 个元素。
    2. 如果我上游写入队列 bq 速度较慢(通过一些复杂的运算再写入 bq),这时下游通过 pollTasks() 获取的列表,几乎都是只有一个元素的列表。


    列表数据不均,可以增加返回列表的上限,或者增加超时机制。

    退化成单元素列表:
    1. pollTasks() 的调用方主动等待片刻,再获取数据。
    2. 修改 pollTasks() 的实现,返回列表的前提条件是:列表的 size 必须 batchSizeLimit 下限,否则等待超时 System.currentTimeMillis()-startMillis>1000。
    2019-07-26
    1
  • 张三
    还是不太懂,线程池的实现是有两种模式吗? Worker Thread 和 生产者-消费者 模式 ?
    2019-05-22
    1
  • 佑儿
    声明一个volatie变量用于表示线程结束,为true时,退出循环

    作者回复: 队列中的任务就丢了

    2019-05-21
    1
  • 密码123456
    设置一个volitle 。这里中断设置不了,没有引用。我觉得一个volite关键字够了。之前说happens before的时候说,volit写,优于volit读,应该立刻可见。还要问下老师,这么理解可以吗?中断是不是一定必须的?

    作者回复: 可见,中断要看场景

    2019-05-21
    1
  • 放个屁臭到了自己
    如果使用轻量级线程,就没有必要平衡生产者和消费者的速度差异了,因为轻量级线程本身就是廉价的,

    为什么廉价就不需要平衡?

    作者回复: 廉价意味着可以轻松创建很多,线程是不能创建很多的。

    2019-11-25
    1
  • Joker
    如果没有队列的话:就会,一个生产线程就需要一个消费线程来相互跟进,所以这个就但是消费的赶不上生产的速度,就导致了两个问题:1.等待,2.多余线程创建。
    2019-11-08
  • 生活用来品
    请问一下高并发场景,四个人拼一个团,怎么拼?
    1.db里记录拼团人数,如果小于4则直接update到拼团用户表,否则创建新的拼团id,新的记录。
    2.高并发场景,怎么保证读写db的一致性?redis和db双写?
    3.期待老师高见

    作者回复: 高并发下,我觉得主要是做好限流和缓存,保护好瓶颈资源数据库,限流和缓存的方案要看流量大小和系统架构

    2019-09-18
  • 王藝明
    老师好!批量插入 SQL 的案例中,【首先是以阻塞方式获取任务队列中的一条任务,而后则是以非阻塞的...】下面的代码,感觉如果一直能取到内容,岂不是退不出循环体了

    作者回复: 取光为止

    2019-08-02
收起评论
32
返回
顶部