Java 线程池源码解读与实践
肖文英
Java 资深研发工程师
27 人已学习
立即订阅
课程目录
已更新 28 讲/共 32 讲
并发编程基础知识 (8讲)
时长 09:08
时长 40:33
时长 21:26
时长 44:37
时长 09:11
线程池基础知识 (6讲)
时长 12:38
时长 09:28
时长 30:59
时长 44:51
时长 45:02
时长 15:15
线程池实现详解 (8讲)
时长 14:10
时长 23:21
时长 25:00
时长 13:57
时长 36:02
时长 17:28
时长 42:08
常见开源线程池 (3讲)
时长 22:14
时长 27:06
时长 25:32
Java 线程池源码解读与实践
15
15
1.0x
00:00/00:00
登录|注册

添加工作线程

本章节我们继续分析 addWorker 源码,由于这个方法源码比较复杂,所以我们可以先问问自己以下问题,然后带着问题去阅读源码:
addWorker 为什么在 add 和 remove 的时候要上锁?
提交的任务会包装成 Worker 对象对吗?Worker 存到哪里了?
Worker 添加失败或者添加成功但是线程启动失败会怎样?

1 整体思路

private boolean addWorker(Runnable firstTask, boolean core) {
1. CAS模式增加线程数。
2. 加锁创建工作线程并添加到工作线程集合。
}

2 状态判断

1 全貌

// 可以设置成其它值,不要求是retry
retry:
for (;;) {
// 【1】获取ctl,ctl包含线程池状态和线程池当前活跃线程数。
int c = ctl.get();
// 【2】获取线程池状态。
int rs = runStateOf(c);
// 【3】
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty()))
return false;
// 【4】从前面CAS模式章节可以知道,在使用CAS模式时通常把要执行的逻辑放
// 在while(true)循环体中,并且只有满足特定条件时才能从死循环中返回
for (;;) {
// 【5】每次都重新获取线程池中当前活跃线程数,因为是多线程环境,所以需要重新获取。
int wc = workerCountOf(c);
// 【6】
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
// 【7】
if (compareAndIncrementWorkerCount(c))
break retry;
// 【8】重新获取ctl,能走到这一步说明上一步没有break跳出循环,
// 也就代表CAS失败了。如果CAS设置失败,原因肯定是因为多个线程竞争导致的,
// 所以重新获取线程池的状态,也就是需要重新获取ctl。
c = ctl.get();
// 【9】判断线程池状态是否发生变化,若没有变化则继续执行内层死循环,也就是继续CAS。
// 反之线程池状态发生变化,我们需要continue到外层循环,再次获取线程池状态然后进行判断逻辑
if (runStateOf(c) != rs)
continue retry;
}
}

2 拆解【3】

if (rs >= SHUTDOWN &&
!(rs == SHUTDOWN && firstTask == null && !workQueue.isEmpty()))
return false;
线程池状态取值范围是整数,并且从小到大分布,如下所示   
总结一下上面看着比较复杂的条件判断代码可以分为 3 种情况,并且这 3 种情况之间是或的关系,也就是只要某一种情况符合,那么就会返回 false:
情况 1 rs >= SHUTDOWN && rs != SHUTDOWN ,也就是 rs>=STOP,所以不能创建 worker。
情况 2 rs >= SHUTDOWN && firstTask != null (正在提交的任务) ,也就是 rs>=SHUTDOWN,所以不能创建 worker。
情况 3 rs >= SHUTDOWN && workQueue.isEmpty(),也就是 rs>=SHUTDOWN 并且任务队列为空,那么不可以创建 worker。

3 拆解【6】

if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
情况 1 wc >= CAPACITY:这个条件检查当前的工作线程数(wc)是否已经达到了某个预设的容量上限(CAPACITY)。CAPACITY 是一个静态常量,表示线程池能够容纳的最大工作线程数的绝对上限,这个上限可能由于系统资源限制或其他原因而设置。如果 wc 达到了或超过了 CAPACITY,则条件为真,方法返回 false,表示无法再添加新的工作线程。
情况 2 wc >= (core ? corePoolSize : maximumPoolSize):这是一个三元运算符表达式,它根据 core 的值来选择比较的目标:如果 core 为 true,则表达式变为 wc >= corePoolSize,这意味着如果当前的工作线程数(wc)已经达到了或超过了核心线程池的大小(corePoolSize),则条件为真。如果 core 为 false,则表达式变为 wc >= maximumPoolSize,这意味着如果当前的工作线程数(wc)已经达到了或超过了最大线程池的大小(maximumPoolSize),则条件为真。
为了避免过度创建线程,如果当前的工作线程数已经达到了容量上限(CAPACITY),或者已经达到了核心线程池的大小(如果 core 为 true),或者已经达到了最大线程池的大小(如果 core 为 false),那么就不能再添加新的工作线程了。
注意这里是跳出死循环的第一个出口。

4 拆解【7】

if (compareAndIncrementWorkerCount(c))
break retry;
代码执行到这里说明我们可以向线程池添加工作线程,线程池的设计思想和常规做法不一样,它是先把线程个数通过 CAS 方式加 1,然后在后面逻辑中才会真正的创建工作线程对象。
注意这里是跳出死循环的第二个出口。

break retry 语法分析

在 Java 中,break 关键字通常用于立即退出当前循环(如 for、while 或 do-while 循环)。然而,break 关键字后面直接跟 retry(如 break retry;)在标准的 Java 语法中是不合法的,除非 retry 是一个标记(label)的名称,并且这个标记被用在了循环或 switch 语句之前。
标记(label)是 Java 中一种不常用的特性,它允许你在一个嵌套的循环或 switch 语句中,从更深层的循环或 switch 块中直接跳出到外层循环或 switch 语句的末尾。这种用法通常应该避免,因为它会使代码难以理解和维护,但在某些特定情况下,它可能是解决问题的便捷方法。
下面是一个使用标记(label)和 break 关键字来跳出多层循环的例子:
public class Main {
public static void main(String[] args) {
outerLoop: // 这是一个标记
for (int i = 0; i < 5; i++) {
for (int j = 0; j < 5; j++) {
if (i == 2 && j == 2) {
// 当 i 和 j 都等于 2 时,跳出到标记为 outerLoop 的循环的末尾
break outerLoop;
}
System.out.println("i = " + i + ", j = " + j);
}
}
System.out.println("跳出循环");
}
}
在这个例子中,outerLoop: 是一个标记,它被放在了最外层的 for 循环之前。当 i 和 j 都等于 2 时,break outerLoop 语句会执行,导致程序跳出到标记为 outerLoop 的循环的末尾,并继续执行循环之后的代码(即打印 "跳出循环")。
然而,如果你的代码只是简单地写了 break outerLoop; 而没有相应的标记 outerLoop,那么这段代码会导致编译错误,因为 Java 编译器无法识别 retry 作为有效的跳出目标。如果你确实需要在你的代码中使用这种结构,你需要确保有一个相应的标记被定义在循环或 switch 语句之前。

5 小结

判断线程池状态是否还能继续接收任务,比如如果是 SHUTDOWN 之后的状态,则肯定不允许接收任务。
通过 CAS 方式判断线程池个数是否满足要求,如果满足才会执行真正的添加逻辑,这样做的好处时避免加锁逻辑,减少上下文切换。
双层 for 循环:外层 for 循环主要作用是判断线程池状态,只有符合状态时才会进入内层 for 循环;内层 for 循环主要是在满足条件时执行 CAS 操作,如果执行成功那么两层 for 都会结束,如果执行失败,说明线程池的状态变量 ctl 被其它线程修改了,所以需要重新执行外层 for 循环。

3 真正添加工作线程

1 全貌

// 【1】
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
// 【2】
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
// 【3】
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 【4】
int rs = runStateOf(ctl.get());
// 【5】
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive())
throw new IllegalThreadStateException();
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
// 【6】
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
// 【7】
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;

2、拆解【1】

// worker是否开始执行,也就是是否执行run方法
boolean workerStarted = false;
// worker是否被加到工作者集合里(workers,HashSet)
boolean workerAdded = false;
// 工作线程对象
Worker w = null;

3、拆解【2】

// 将任务作为属性赋值给Worker对象。
w = new Worker(firstTask);
// 获取工作线程的thread对象
final Thread t = w.thread;
//Worker构造函数
Worker(Runnable firstTask) {
// 在runWorker方法实际开始执行之前,抑制对线程的中断请求
setState(-1);
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}

4、拆解【3】

final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
加锁目的是为了保证下面的 workers.add(worker) 方法在多线程操作时候是线程安全的,因为可能会有多线程向 workers 中添加 worker,为了保证这个操作的原子性和可见性,所以需要加锁。

5、拆解【4】

// 获取线程池的状态
int rs = runStateOf(ctl.get());

6、拆解【5】

// 情况1 线程池处于Running状态
// 情况2 线程池处于Shutdown状态,但是创建的线程是用于执行任务队列剩余的任务
if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) {
// 如果线程已经被start过了,则抛出异常,不允许重复调用start(下面的【6】会start)
if (t.isAlive())
throw new IllegalThreadStateException();
// 添加工作线程到HashSet集合
workers.add(w);
int s = workers.size();
// 如果workers的长度(任务队列长度)大于阈值,则更新阈值。
// 因为上面的CAS操作不能保证原子性,所以largestPoolSize以添加工作线程后的个数为准
if (s > largestPoolSize)
largestPoolSize = s;
// 设置为添加成功
workerAdded = true;
} finally {
// 解锁
mainLock.unlock();
}

7、拆解【6】

if (workerAdded) {
t.start();
workerStarted = true;
}
如果任务添加成功了,那么 OK,启动线程,最后标记线程启动成功!

8、拆解【7】

finally {
// 【7】
if (! workerStarted)
addWorkerFailed(w);
}
如果添加任务的流程中失败了或者添加成功了,但是执行任务的线程启动失败了,则执行失败的策略。
private void addWorkerFailed(Worker w) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
if (w != null)
workers.remove(w);
decrementWorkerCount();
tryTerminate();
} finally {
mainLock.unlock();
}
}
很简单,就是要把工作线程移除(因为可能添加成功了,但是线程启动失败了,所以要 remove 掉),还需要将线程池中的工作线程数 -1(CAS 的方式进行减一)。

9、小结

通过加锁方式创建工作线程,并且只有满足特定条件时添加到 workers 这个 HashSet 中。。
执行成功,启动工作线程对应的 thread 去执行 firstTask 或从任务队列消费任务
执行失败,则将工作线程从 workers 这个 HashSet 中移除,且将线程池中线程数量 -1(CAS 的方式)。
两层 try-finally 结构:外层 try 在执行时会执行 new Worker(firstTask),这个方法可能会抛出异常,所以需要使用 finally 做兜底处理;内层 try 在执行前获取了锁,所以需要在 finally 中执行释放锁的逻辑。注意如果在执行内层 try 时抛出了 IllegalThreadStateException 异常,那么 addWorker 方法会抛出异常,最终可能会在 execute 方法执行时抛出异常。
 
确认放弃笔记?
放弃后所记笔记将不保留。
新功能上线,你的历史笔记已初始化为私密笔记,是否一键批量公开?
批量公开的笔记不会为你同步至部落
公开
同步至部落
取消
完成
0/2000
荧光笔
直线
曲线
笔记
复制
AI
  • 深入了解
  • 翻译
    • 英语
    • 中文简体
    • 法语
    • 德语
    • 日语
    • 韩语
    • 俄语
    • 西班牙语
  • 解释
  • 总结
仅可试看部分内容,如需阅读全部内容,请付费购买文章所属专栏
《Java 线程池源码解读与实践》
立即购买
登录 后留言

精选留言

由作者筛选后的优质留言将会公开显示,欢迎踊跃留言。
收起评论
大纲
固定大纲
1 整体思路
2 状态判断
1 全貌
2 拆解【3】
3 拆解【6】
4 拆解【7】
5 小结
3 真正添加工作线程
1 全貌
2、拆解【1】
3、拆解【2】
4、拆解【3】
5、拆解【4】
6、拆解【5】
7、拆解【6】
8、拆解【7】
9、小结
显示
设置
留言
收藏
沉浸
阅读
分享
手机端
快捷键
回顶部