Java 线程池源码解读与实践
肖文英
Java 资深研发工程师
27 人已学习
立即订阅
课程目录
已更新 28 讲/共 32 讲
并发编程基础知识 (8讲)
时长 09:08
时长 40:33
时长 21:26
时长 44:37
时长 09:11
线程池基础知识 (6讲)
时长 12:38
时长 09:28
时长 30:59
时长 44:51
时长 45:02
时长 15:15
线程池实现详解 (8讲)
时长 14:10
时长 23:21
时长 25:00
时长 13:57
时长 36:02
时长 17:28
时长 42:08
常见开源线程池 (3讲)
时长 22:14
时长 27:06
时长 25:32
Java 线程池源码解读与实践
15
15
1.0x
00:00/00:00
登录|注册

执行任务

我们首先来分析 execute 方法,这也是线程池最核心的方法,因为 submit 方法其底层也是调用 execute 方法进行执行。
线程池中的工作线程以 Worker 作为体现,真正工作的线程为 Worker 的成员变量,Worker 即是 Runnable,又是同步器(继承了AbstractQueuedSynchronizer)。Worker 从工作队列中取出任务来执行,并能通过 Worker 控制任务状态。
接下来通过 execute 方法源码来看下如何通过 Worker 完成任务的创建及运行。
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
// 线程池当前线程数<corePoolSize时调用addWorker创建worker来执行任务
if (workerCountOf(c) < corePoolSize) {
// addWorker即创建worker的过程,并且将command作为firstTask
// core==true表示采用核心线程数量限制,false采用maxinumPoolSize进行限制
// addWorker返回true表示创建worker成功,返回false表示创建失败
if (addWorker(command, true))
return;
// 获取ctl的最新值,用于下面的isRunning判断
c = ctl.get();
}
// 结合合上文得知线程池当前线程数>=corePoolSize(大概率)
// 判断线程池是否处于运行中状态:只有处于运行中状态时才向workQueue添加任务
// offer方法不会阻塞当前线程,添加成功会立刻返回true,添加失败会返回false
if (isRunning(c) && workQueue.offer(command)) {
// 获取到当前线程池的状态,赋值给recheck,是为了重新检查状态
// 因为没有使用锁,可能isRunning检查成功,但是执行offer时,线程池状态可能不是running了
int recheck = ctl.get();
// 如果isRunning返回false,那就remove掉这个任务,然后执行拒绝策略
if (!isRunning(recheck) && remove(command))
reject(command);
// 线程池是否处于running状态,我们不能得知,因为可能改变了
// 如果从工作队列移除command失败,说明这个任务已经被消费了
// 如果没有工作线程,那就创建工作线程
else if (workerCountOf(recheck) == 0)
// null表示不是firstTask,false表示创建工作线程时使用maximumPoolSize作为限制条件
addWorker(null, false);
}
// 如果线程池没有处于running状态或任务放入workQueue失败,
// 说明线程池已经关闭(或正在关闭)或者任务队列满了,则尝试通过创建更多的worker来执行任务
// 如果创建worker(创建worker时会判断线程池状态)失败,则说明线程池已经关闭(或正在关闭)或者已经饱和,会执行拒绝策略
else if (!addWorker(command, false))
reject(command);
}
// 这行代码调用了内部工作队列(workQueue)的remove方法,尝试从队列中移除指定的task。
// workQueue通常是一个线程安全的阻塞队列,它支持remove操作来移除队列中的元素。
// 如果队列中确实存在该任务,则remove方法返回true;如果不存在,则返回false。
public boolean remove(Runnable task) {
boolean removed = workQueue.remove(task);
// 后续章节详细解释
tryTerminate(); // In case SHUTDOWN and now empty
return removed;
}

1 简化的执行流程

工作线程数量小于核心数量,创建核心线程;
达到核心数量,任务放入任务队列;
任务队列满了,创建非核心线程;
工作线程达到最大数量,执行拒绝策略。
以上是没有结合线程池状态得出线程池 execute 执行的流程,但是从代码分析中我们可以知道线程池 execute 方法之所以复杂是因为我们需要时刻判断线程池状态,才能做出相应的动作。所以在前面章节中我们实现的线程池是没有考虑线程池状态的,所以代码比较简单,只能用于理解线程池,不能用于线上环境。

2 recheck 机制

在 Java 中,临界区是指一段需要线程互斥访问的代码区域,即一次只允许一个线程进入的代码段。临界区通常用于保护共享资源免被并发修改。Java 提供了几种方法来创建临界区,包括 synchronized 关键字、Lock 接口及其实现类(如 ReentrantLock)。
在上面代码中我们没有使用锁机制,所以不会存在临界区,但是 ctl 明显是一个共享变量,并且这个共享变量会在其它线程执行 shutdown 或 shutdownNow 等方法而被修改,所以我们在某一时刻根据 ctl 做出条件判断时返回 true,但是在后面执行某些动作 (workQueue.offer(command)) 时,这个条件可能就不满足了,为了解决这个问题线程池的解决方案如下:
线程池的状态转换是经过精心设计的,在前面章节中我们知道它的状态转换图是一个有向无换图,所以它的状态只能向某一个方向变化,例如当前线程池处于 running 状态,如果在后面某一时刻它变成了 shutdown 状态,那么随着代码继续执行它的状态肯定不能再次变为 running 状态。所以我们第一次检查发现线程池处于 running 状态,把任务放入工作队列,放入成功后,我们紧接着再次检查线程池状态,如果它不是 running 状态,为了满足线程池只能处于 running 时才能把任务放入任务队列这一约束,我们需要把之前放入的任务从队列中移除。
我们需要注意 ctl 是 AtomicInteger 类型的变量,它没有被 volatile 修饰,但是 AtomicInteger 本身就是线程安全的整数类,我们通过 get 方法能够获取它的最新值。

3 addWorker 调用入参不一致

第一次调用:当线程池中的线程数少于 corePoolSize 时,会尝试添加一个核心线程来执行任务。这时,addWorker 的第一个参数是 command(要执行的任务),第二个参数是 true(表示这是一个核心线程)。
第二次调用(可能发生在两个地方):如果工作队列已满,且线程池仍在运行,会尝试添加一个非核心线程来直接执行任务。这时,addWorker 的第一个参数是 command,第二个参数是 false(表示这是一个非核心线程)。如果从工作队列移除 command 失败,并且当前没有工作线程,那么会尝试添加一个非核心线程,但这次没有具体的任务给它执行,所以第一个参数是 null,第二个参数是 false。

4 是否会被阻塞

我们按照如下情况进行分析:
当提交任务到线程池时,如果当前线程数小于核心线程数,线程池会创建新线程来执行任务。execute 方法立刻返回并且任务状态处于运行中。
如果当前线程数已经达到核心线程数,任务会被放入任务队列中等待执行。execute 方法立刻返回但是任务处于未开始运行状态。
如果任务队列已满且当前线程数小于最大线程数,线程池会创建新线程来执行任务。execute 方法立刻返回并且任务状态处于运行中。
如果任务队列已满且当前线程数已经达到最大线程数,线程池会根据配置的拒绝策略来处理新提交的任务。
CallerRunsPolicy:在这种情况下,execute 方法不会阻塞,但它会在调用 execute 方法的线程上执行任务。execute 方法不会立刻返回并且任务状态处于运行中。
AbortPolicy(默认策略):在这种情况下,execute 方法会抛出一个 RejectedExecutionException 异常,而不会阻塞。任务最终不会被运行。
DiscardPolicy:在这种情况下,execute 方法会默默丢弃无法执行的任务,不会阻塞。execute 方法立刻返回,任务最终不会被运行。
DiscardOldestPolicy:在这种情况下,execute 方法会丢弃任务队列中最旧的任务,然后尝试重新执行当前任务。最坏情况该拒绝策略会被调用多次,直到没有其它线程频繁提交任务为止。在最坏情况下,execute 方法不会立刻返回,任务最终会被执行。
确认放弃笔记?
放弃后所记笔记将不保留。
新功能上线,你的历史笔记已初始化为私密笔记,是否一键批量公开?
批量公开的笔记不会为你同步至部落
公开
同步至部落
取消
完成
0/2000
荧光笔
直线
曲线
笔记
复制
AI
  • 深入了解
  • 翻译
    • 英语
    • 中文简体
    • 法语
    • 德语
    • 日语
    • 韩语
    • 俄语
    • 西班牙语
  • 解释
  • 总结
仅可试看部分内容,如需阅读全部内容,请付费购买文章所属专栏
《Java 线程池源码解读与实践》
立即购买
登录 后留言

精选留言

由作者筛选后的优质留言将会公开显示,欢迎踊跃留言。
收起评论
大纲
固定大纲
1 简化的执行流程
2 recheck 机制
3 addWorker 调用入参不一致
4 是否会被阻塞
显示
设置
留言
收藏
沉浸
阅读
分享
手机端
快捷键
回顶部