Java 线程池源码解读与实践
肖文英
Java 资深研发工程师
27 人已学习
立即订阅
课程目录
已更新 28 讲/共 32 讲
并发编程基础知识 (8讲)
时长 09:08
时长 40:33
时长 21:26
时长 44:37
时长 09:11
线程池基础知识 (6讲)
时长 12:38
时长 09:28
时长 30:59
时长 44:51
时长 45:02
时长 15:15
线程池实现详解 (8讲)
时长 14:10
时长 23:21
时长 25:00
时长 13:57
时长 36:02
时长 17:28
时长 42:08
常见开源线程池 (3讲)
时长 22:14
时长 27:06
时长 25:32
Java 线程池源码解读与实践
15
15
1.0x
00:00/00:00
登录|注册

运行工作线程

本章节我们继续分析 runWorker 源码,由于这个方法源码比较复杂,所以我们可以先问问自己以下问题,然后带着问题去阅读源码:
为什么 Worker 继承 AQS?
runWorker 都干了哪些事?
想在 run 方法真正执行自己业务代码的前后打印 log 怎么做?
非核心线程是怎么结束的?核心线程又是怎么长期存活不会销毁的?

1 runWorker 全貌

final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock();
boolean completedAbruptly = true;
try {
while (task != null || (task = getTask()) != null) {
w.lock();
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
// 覆写Runnable接口的run方法
public void run() {
runWorker(this);
}

2 拆解

1 大有学问的 unlock

// 获取当前线程
Thread wt = Thread.currentThread();
// 获取当前任务
Runnable task = w.firstTask;
w.firstTask = null;
// 在前面代码中没有lock,这里为什么要unklock?
w.unlock();
// Worker哪来的?来看下Worker构造器:继承了AQS
private final class Worker extends AbstractQueuedSynchronizer implements Runnable{
Worker(Runnable firstTask) {
// 设置AQS是state为-1,主要目的是为了不让中断。
setState(-1);
// 设置任务
this.firstTask = firstTask;
// 创建线程
this.thread = getThreadFactory().newThread(this);
}
}
好了,我们知道构造器里面做了三件事,但是我们这里只关注第一件事,那就是 setState(-1),state 是 AQS 的变量,-1 表示这个任务不可以被中断。因为没开始执行,我们只是 new 一个 Worker 出来,线程没启动,所以不允许中断。
那再看 unlock 做了什么?
public void unlock() {
release(1);
}
调用 AQS 的 release 方法,把 state 设置为 0,这意味着:现在任务得到了执行,我们允许中断这个 worker 了。
所以 Worker 继承 AQS 是为了使用 AQS 的中断机制。

2 while 循环拿任务

while (task != null || (task = getTask()) != null) {
...
}
task 是 w.firstTask,也就是说我们在 addWorker 方法里包装在 Worker 里的任务,第一次肯定不是 null,所以会执行 while 循环体,执行完后在 finally 里把 task 设置 null。所以task != null这个条件仅在第一次执行的时候为 true。
task = getTask():从队列里取任务,取出来后赋值给 task,如果队列里有任务从队列里 remove 然后执行循环体,如果 getTask 获取不到任务则会阻塞,因为底层是 BlockingQueue,getTask 方法会在后面章节分析。

3 中断判断

if ( // 语句1-1
(runStateAtLeast(ctl.get(), STOP) ||
// 语句1-2
(Thread.interrupted() && runStateAtLeast(ctl.get(), STOP)))
// 语句2
&& !wt.isInterrupted())
wt.interrupt();
private static boolean runStateAtLeast(int c, int s) {
return c >= s;
}
情况 1 语句 1-1 和语句 2 组合:线程池状态>=STOP && 中断标志位为 false
情况 2 语句 1-2 和语句 2 组合:当前工作线程已被中断 && 线程池状态>=STOP && 中断标志位为 false。Thread.interrupted() 会检查并清除当前线程的中断状态,所以如果这里返回 true,意味着在调用这个方法之前,当前线程确实被中断了。
如果情况 1 或情况 2 满足(线程池正在停止或已停止或者当前线程被中断且线程池也在停止或已停止状态,且目标工作线程中断标识位为 false),则调用 wt.interrupt() 来中断工作线程。
注意我们在前面执行了 w.lock(),那么这种 check-then-act 操作是线程安全的。

为什么需要这样的逻辑?

线程池停止:当线程池处于 STOP 状态时,我们希望确保所有工作线程都被中断,以便它们能够尽快停止执行并退出。
处理外部中断:在某些情况下,可能希望响应外部中断(例如,用户按下了 Ctrl+C),并尝试中断线程池中的所有线程。然而,由于线程池可能正在处理其他关闭逻辑(如优雅地关闭),因此仅当线程池也处于停止状态时,才应响应这种外部中断。
避免重复中断:中断状态是线程的一个属性,一旦设置,就会保持到该线程检查它为止(除非它被清除)。因此,在尝试中断线程之前,检查它是否已经被中断是一个好习惯。

4 beforeExecute

beforeExecute(wt, task);
在线程任务开始执行前做一些处理,可以自定义实现方法。需要注意的地方是它只被 try-finally 包起来了,没有 catch,也就是说即使 beforeExecute 抛出异常,也不会影响线程下面的工作 (finally 语句块)。
注意由于我们没有捕获异常,那么这个异常导致我们从 while 循环中退出,进而导致线程运行结束。

5 任务执行

try {
// 开始执行任务,也就是提交到线程池里的任务,且捕获异常
task.run();
}
// 注意这里是UnChecked异常,因为Checked异常我们在run方法中已经自己处理了,否则编译不能通过
catch (RuntimeException x) {
thrown = x; throw x;
}
// Error通常表示严重的系统级错误,不应该由应用程序代码捕获
//(除非你有特定的理由需要这样做)。如果task.run()抛出了Error,
// 这个catch块将执行相同的操作:将错误赋值给thrown并重新抛出。
catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
这个方法使用了一种异常设计思想:我们通过多个 catch 语句块捕获各种异常,是为了记录抛出的异常,然后我们在后面逻辑处理时可以使用这个异常。
第三个 catch 块捕获 Throwable 的所有实例,这包括了 Exception 和 Error。由于前两个 catch 块已经分别捕获了 RuntimeException 和 Error,这个 catch 块实际上只会捕获那些既不是 RuntimeException 也不是 Error 的 Throwable 实例。这个 catch 块的存在可能是为了处理一些极端情况或未来的扩展。在这个 catch 块中,捕获到的 Throwable 被封装在一个新的 Error 对象中并重新抛出。

6 afterExecute

afterExecute(task, thrown);
在线程任务执行之后做一些处理,可以自定义实现方法。由于 afterExecute 是在 finally 语句块中执行的,所以不论 try 语句块如何执行,它都会被执行。
同理如果我们在 afterExecute 抛出了异常,那么这个异常导致我们从 while 循环中退出,进而导致线程运行结束。

7 finally

两层 try finally 结构:
try {
beforeExecute(wt, task);
try {
task.run();
} catch (RuntimeException x) {
...
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
afterExecute 确实被包起来了,可以理解为被这段代码包起来了:
try {
afterExecute(task, thrown);
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
首先可以发现也没有 catch 捕获。其次就是一些辅助工作,比如 task 设置 null 来辅助最外层的 while 循环,完成的任务数 +1,解锁的工作。
剩余没讲的代码还有最后一段,那就是:processWorkerExit,在说这个方法之前,必须看下前面提到两次的 completedAbruptly 的作用。

8 completedAbruptly

// 默认是true
boolean completedAbruptly = true;
try {
while (task != null || (task = getTask()) != null) {
...
try {
beforeExecute(wt, task);
try {
task.run();
} catch (Throwable x) {
...
} finally {
afterExecute(task, thrown);
}
} finally {
...
}
}// while结束
// 这里设置成false
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
如果任务正常得到执行,没有任何异常,它就是 false,如果中途发生了异常,那就是 true,因为在前面的执行逻辑中抛出了异常,所以不会执行 completedAbruptly = false。
所以 completedAbruptly 变量被用来区分任务是正常完成还是由于发生异常被迫停止的,从而允许我们采取不同的后续行动。

9 processWorkerExit

private void processWorkerExit(Worker w, boolean completedAbruptly) {
// 如果任务执行过程中发生了异常,则CAS的方式把工作线程数-1。
if (completedAbruptly)
decrementWorkerCount();
// 加锁是为了保证completedTaskCount和workers共享变量可以在多线程下安全执行
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
completedTaskCount += w.completedTasks;
// 将任务移除队列,因为任务已经执行完了嘛
workers.remove(w);
} finally {
// 解锁
mainLock.unlock();
}
// 钩子函数,后续章节说明
tryTerminate();
int c = ctl.get();
// 调用runStateLessThan(c, STOP)方法检查当前线程池的运行状态是否小于
// STOP状态。STOP状态通常表示线程池已经完全停止,不接受新任务,也不处理队列中的任务。
// 如果当前状态小于STOP,即线程池仍在运行或正在关闭但还未完全停止),则继续执行后续逻辑。
if (runStateLessThan(c, STOP)) {
// 如果completedAbruptly为false,表示线程池其中的某个工作线程没有
// 因为异常情况而突然停止。只有在这个前提下,才会考虑是否需要添加新的工作线程。
if (!completedAbruptly) {
// 以下代码计算了需要保持的最小工作线程数。
// 如果allowCoreThreadTimeOut为true,表示允许核心线程在空闲时超时并退出,因此最小工作线程数设为0。
// 但如果工作队列(workQueue)不为空,即使允许核心线程超时,
// 也至少需要保持一个工作线程来处理队列中的任务。
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
if (min == 0 && ! workQueue.isEmpty())
min = 1;
// 获取当前的工作线程数,并与计算出的最小工作线程数进行比较。
// 如果当前工作线程数已经满足或超过了最小工作线程数,
// 则不需要添加新的工作线程,直接返回。
if (workerCountOf(c) >= min)
return;
}
// 能够执行到这里说明线程池需要有线程来执行任务,则调用
// addWorker(null, false)方法尝试添加一个新的非核心工作线程(第二个参数为false)。
// 注意这里传入的第一个参数是null,表示新线程没有初始任务要执行,它将从工作队列中取任务执行。
addWorker(null, false);
}
}
分析完 processWorkerExit 方法我们可以知道如果我们创建了最大线程数为 1 的线程池,并且需要被执行的任务可能会抛出异常,那么会导致工作线程退出,但是最终在这个方法中线程池做了兜底处理:如果线程池没有退出或正在退出但是任务队列中存在待执行的任务,那么线程池会创建新的工作线程来保证线程池是可用的,这和我们使用 Thread 处理任务是有区别的。

3 小结

先 unlock 调用 AQS 的 release 方法,让 worker 可以响应中断(因为工作线程已经开始执行了)
while 循环拿任务,没任务就阻塞,getTask 内部采取的 BlockingQueue 的阻塞方法
中断判断,线程池状态是大于等于 STOP 的话(执行了 shutdownNow),就让线程中断
线程执行前会先执行 beforeExecute,可重写
真正的任务执行
线程执行后会执行 afterExecute,可重写
从 while 循环跳出后将工作线程从 workers 里 remove 掉
如果线程池是 RUNNING 或者 SHUTDOWN 状态的话,且任务队列不是空,那么至少保证线程池中有一个线程可以执行任务
确认放弃笔记?
放弃后所记笔记将不保留。
新功能上线,你的历史笔记已初始化为私密笔记,是否一键批量公开?
批量公开的笔记不会为你同步至部落
公开
同步至部落
取消
完成
0/2000
荧光笔
直线
曲线
笔记
复制
AI
  • 深入了解
  • 翻译
    • 英语
    • 中文简体
    • 法语
    • 德语
    • 日语
    • 韩语
    • 俄语
    • 西班牙语
  • 解释
  • 总结
仅可试看部分内容,如需阅读全部内容,请付费购买文章所属专栏
《Java 线程池源码解读与实践》
立即购买
登录 后留言

精选留言

由作者筛选后的优质留言将会公开显示,欢迎踊跃留言。
收起评论
大纲
固定大纲
1 runWorker 全貌
2 拆解
1 大有学问的 unlock
2 while 循环拿任务
3 中断判断
4 beforeExecute
5 任务执行
6 afterExecute
7 finally
8 completedAbruptly
9 processWorkerExit
3 小结
显示
设置
留言
收藏
沉浸
阅读
分享
手机端
快捷键
回顶部