Java 线程池源码解读与实践
肖文英
Java 资深研发工程师
27 人已学习
立即订阅
课程目录
已更新 28 讲/共 32 讲
并发编程基础知识 (8讲)
时长 09:08
时长 40:33
时长 21:26
时长 44:37
时长 09:11
线程池基础知识 (6讲)
时长 12:38
时长 09:28
时长 30:59
时长 44:51
时长 45:02
时长 15:15
线程池实现详解 (8讲)
时长 14:10
时长 23:21
时长 25:00
时长 13:57
时长 36:02
时长 17:28
时长 42:08
常见开源线程池 (3讲)
时长 22:14
时长 27:06
时长 25:32
Java 线程池源码解读与实践
15
15
1.0x
00:00/00:00
登录|注册

线程池属性

1 ctl

private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int COUNT_MASK = (1 << COUNT_BITS) - 1;
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;
它展示了线程池控制状态(ctl)的设计。这个控制状态 ctl 是一个 AtomicInteger 类型的变量,用于同时存储两个关键信息:线程池中的工作线程数量(workerCount)和线程池的运行状态(runState)。这种设计通过位操作高效地利用了单个整数的空间,避免了使用更复杂的数据结构(如 AtomicReference 或 volatile 的复合对象)来存储这两个状态。

控制状态(ctl)的设计

workerCount:表示线程池中当前有效的工作线程数量。这个值可能会暂时与实际的活动线程数不同,例如在线程创建失败或线程正在终止但仍未完全释放资源时。
runState:表示线程池的运行状态,包括 RUNNING、SHUTDOWN、STOP、TIDYING 和 TERMINATED 五种状态。这些状态通过位操作存储在 ctl 变量的高位上,允许线程池根据当前状态执行相应的操作。

控制状态的转换

线程池状态之间的转换逻辑,包括从 RUNNING 到 SHUTDOWN、SHUTDOWN 或 RUNNING 到 STOP、SHUTDOWN 到 TIDYING、STOP 到 TIDYING 以及 TIDYING 到 TERMINATED 的转换。这些转换通常发生在调用 shutdown()、shutdownNow() 方法或线程池内部逻辑处理时。需要注意的是线程池状态转换图是一个有向无环图,这样线程池的状态只能向一个方向转换,不能变成之前的某一个状态。

状态取值范围

运行中 (RUNNING):接收新任务并处理队列中的任务
关闭 (SHUTDOWN):不接受新任务,但处理队列中的任务,并中断空闲工作线程
停止 (STOP):不接受新任务,不处理队列中的任务,并尝试中断正在执行的任务和空闲工作线程
整理中 (TIDYING):所有任务已终止,工作线程数为 0,队列为空,线程转换到整理中状态将运行 terminated() 钩子方法
终止 (TERMINATED):terminated() 方法已完成

状态转换场景

运行中 -> 关闭:在调用 shutdown() 时,也可能隐式地在 finalize()(Object 的方法,垃圾回收前调用)变为关闭
运行中或关闭 -> 停止:在调用 shutdownNow() 时
关闭 -> 整理中:当任务队列和线程池都为空时变为整理中
停止 -> 整理中:当线程池为空时变为整理中
整理中 -> 终止:当 terminated() 钩子方法执行完成后变为终止

2 workQueue

private final BlockingQueue<Runnable> workQueue;
这个成员变量是一个 BlockingQueue类型的队列,它是线程安全的队列,用于在线程池的工作线程之间共享任务。当线程池接收到新的任务时,它会尝试将这些任务添加到 workQueue 中。工作线程则不断从 workQueue 中取出任务并执行,这种方式有助于实现任务的并发执行。注意 workQueue 是一个私有的、不可变的成员变量。这意味着一旦 workQueue 被初始化,它就不能再被重新赋值(即指向另一个队列)。此外,由于它是私有的,因此只能在声明它的类内部被访问和修改。

BlockingQueue

BlockingQueue:这是一个阻塞队列,用于在多个线程之间安全地传递任务。它支持两个附加的操作,即 put 和 take 方法。put 方法用于向队列中添加一个元素,如果队列满则等待;take 方法用于从队列中移除并返回队头元素,如果队列空则等待。此外,它还支持非阻塞的 poll 方法,该方法尝试从队列中检索并移除此队列的头部元素,或者在指定的等待时间之前等待可用的元素。

特殊说明

workQueue.poll() 返回 null 并不一定意味着 workQueue 为空。这是因为有些特殊的队列(如 DelayQueue)允许在元素到期之前返回 null,即使队列中实际上有尚未到期的元素。因此,在判断队列是否为空时,应该只依赖 isEmpty() 方法,而不是 poll() 方法的返回值。
在线程池从 SHUTDOWN 状态转换到 TIDYING 状态时,必须检查队列是否为空。SHUTDOWN 状态表示线程池不再接受新任务,但会等待队列中的任务执行完毕。而 TIDYING 状态则是所有任务都执行完毕后,线程池进入的一个过渡状态,在这个状态下会执行一些清理工作。
综上所述,workQueue 是线程池内部用于存储和分发任务的重要组件,它保证了任务的安全传递和并发执行。

3 mainLock

private final ReentrantLock mainLock = new ReentrantLock();
它是线程池(如 ThreadPoolExecutor)内部使用的一个关键锁。这个锁的主要作用是在访问和修改工作线程集合(workers)操作时提供同步控制。

锁的作用

保护工作线程集合:线程池内部维护了一个工作线程集合,用于管理执行任务的线程。这个集合的访问和修改必须是线程安全的,以防止多个线程同时修改导致的数据不一致问题。mainLock 就是用来确保在访问或修改这个集合时,只有一个线程能够执行相关操作。
避免不必要的中断风暴:在线程池关闭或调整大小时,可能需要中断一些空闲的线程。如果没有适当的同步控制,多个线程可能会同时尝试中断这些空闲线程,导致所谓的“中断风暴”。通过使用 mainLock,可以序列化这些中断操作,从而避免这种情况。
简化统计操作:线程池还需要维护一些统计信息,如最大线程池大小(largestPoolSize)等。这些统计信息的更新也需要是线程安全的。通过使用 mainLock,可以简化这些统计信息的更新过程,确保在更新时不会有其它线程干扰。
确保工作线程集合的稳定性:在调用 shutdown() 和 shutdownNow() 方法时,线程池需要确保工作线程集合的稳定性,以便在检查中断权限和确保实际执行中断操作时集合的状态不会发生变化。

ReentrantLock 的优势

可重入性:ReentrantLock 是可重入的,这意味着同一个线程可以多次获得同一个锁,这在递归调用或复杂的方法调用链中非常有用。
灵活性:与内置的 synchronized 关键字相比,ReentrantLock 提供了更高的灵活性,比如可以尝试非阻塞地获取锁(tryLock())、尝试在给定等待时间后获取锁(tryLock(long timeout, TimeUnit unit))等。
条件变量:ReentrantLock 可以与一个或多个 Condition 对象一起使用,以实现更复杂的线程间通信模式。

4 workers

private final HashSet<Worker> workers = new HashSet<>();
这个成员变量是一个 HashSet 集合,用于存储线程池中的所有工作线程。
private:表示 workers 集合是私有的,只能在定义它的类内部被访问和修改。这有助于封装线程池的内部状态,防止外部代码直接访问或修改这个集合,从而保持线程池的稳定性和安全性。
final:表示 workers 集合的引用一旦被初始化后就不能再被改变。这意味着你不能将整个 HashSet 实例替换为另一个集合实例。然而,这并不意味着你不能向集合中添加或移除元素;final 只是限制了引用本身的可变性,而不是集合内部元素的可变性。

5 termination

private final Condition termination = mainLock.newCondition();
这段 Java 代码定义了一个名为 termination 的 Condition 对象,它是用来支持线程池中的 awaitTermination 方法的。Condition 是 Lock 接口的一部分,它允许线程在某个条件上等待(await)或通知(signal/signalAll)其他等待该条件的线程,用于提供比 synchronized 更灵活的线程同步机制。

使用场景

在线程池的实现中,termination 这样的 Condition 对象可以用来实现 awaitTermination 方法。awaitTermination 方法允许调用者等待线程池中的任务全部完成(或者等待指定的时间),或者直到线程池被关闭。在这个过程中业务线程可能会在某个 Condition 上等待,直到线程池的状态变为“已终止”或等待超时。

6 largestPoolSize

private int largestPoolSize;
这段代码中的 largestPoolSize 是一个私有变量,用于记录线程池曾经达到过的最大线程数量。这个变量仅在获取 mainLock 锁的时候被访问和更新。简而言之,它帮助监控线程池规模的变化。

7 completedTaskCount

private long completedTaskCount;
这段代码定义了一个私有变量 completedTaskCount,用于记录已完成的任务数量。这个计数器仅在工作线程终止时更新,并且在访问时需要持有 mainLock 锁以保证线程安全。

8 threadFactory

private volatile ThreadFactory threadFactory;
threadFactory 用于创建新的线程,是线程池管理和控制工作线程生命周期的一个关键组件。这些被创建的线程通过 addWorker 方法被添加到线程池中。所有需要创建新线程的调用者都必须准备好 addWorker 可能失败的情况,这可能是因为系统或用户的策略限制了线程的数量。
我们向线程池提交任务时可能会创建工作线程,进而会调用 threadFactory 来创建 thread,如果在创建 thread 发生异常,那么 submit 方法会在调用者线程抛出异常。

成员变量声明

private:这个修饰符表明 threadFactory 是私有的,只能在其所属的类内部被访问和修改。这有助于封装线程池的内部状态,防止外部代码直接修改线程工厂,从而确保线程池的稳定性和安全性。
volatile:这个关键字确保了 threadFactory 变量的修改对所有线程都是立即可见的。在多线程环境中,如果没有适当的同步机制,一个线程对共享变量的修改可能对其他线程不可见,导致数据不一致的问题。
ThreadFactory:这是一个函数式接口,定义了一个方法 newThread(Runnable r),用于创建新的线程。通过提供自定义的 ThreadFactory,用户可以控制线程的创建过程,比如设置线程的优先级、名称、是否为守护线程等。

注意事项

错误处理:如果因为某些原因(如系统资源不足)导致无法创建线程,这并不会被视为一个错误,但可能会导致新任务被拒绝或现有任务在队列中等待而无法执行。因此,使用者需要意识到这种可能性,并相应地处理它。
错误恢复:即使在尝试创建线程时遇到如 OutOfMemoryError 这样的严重错误,线程池也应该努力保持其不变性。这意味着即使出现内存不足的错误,线程池也应该能够执行清理操作,以便安全地关闭并释放资源。由于清理代码的执行可能不需要太多内存,因此有可能在不再次遇到 OutOfMemoryError 的情况下完成清理。

9 handler

private volatile RejectedExecutionHandler handler;
handler 用于处理线程池饱和或关闭时无法执行的任务。当线程池无法接受新任务时,会调用此处理器来采取相应的策略,如丢弃任务、抛出异常等。这个变量是可变 (volatile) 的,确保多线程环境下对其访问的可见性。

defaultHandler

private static final RejectedExecutionHandler defaultHandler =
new AbortPolicy();
这行代码定义了一个静态最终(static final)的 RejectedExecutionHandler 实例,名为 defaultHandler,并将其初始化为 AbortPolicy 的一个新实例。
AbortPolicy 是 RejectedExecutionHandler 的一个实现,它会在无法处理任务时抛出一个 RejectedExecutionException 异常,这个异常是在调用者线程中抛出的。这是默认的拒绝策略,如果你没有为线程池指定其他拒绝策略,那么就会使用这个策略。

10 keepAliveTime

private volatile long keepAliveTime;

解释

private:这是一个访问修饰符,表示 keepAliveTime 是私有的,只能在定义它的类内部被访问和修改。这有助于封装线程池的内部状态,防止外部代码直接访问或修改这个变量。
volatile:这个关键字用于确保 keepAliveTime 变量的可见性和有序性。在多线程环境下,当一个线程修改了 keepAliveTime 的值时,这个修改会立即对其他线程可见。
long:这是 keepAliveTime 的数据类型,表示这个变量存储的是一个长整型值。在这个上下文中,它用于表示时间长度,单位为纳秒(nanoseconds)。

作用

当线程池中的线程数量超过核心线程数(corePoolSize)时,或者如果允许核心线程超时(allowCoreThreadTimeOut 为 true),那么这些空闲的线程就会使用 keepAliveTime 指定的超时时间来等待新任务。如果在这段时间内没有新任务到来,那么这些线程将被终止并从线程池中移除。
如果上述条件不满足(即线程数量未超过 corePoolSize 且 allowCoreThreadTimeOut 为 false),那么这些线程将无限期地等待新任务,即它们不会因为超时而被终止。
如果 keepAliveTime 设置为 0L,意味着非核心空闲线程会被立即终止。

11 allowCoreThreadTimeOut

private volatile boolean allowCoreThreadTimeOut;
这个变量的作用决定了线程池中的核心线程(core threads)在空闲时是否应该被允许超时(即被终止)。

解释

默认值:false。这意味着默认情况下,核心线程即使在空闲时也会保持存活状态,不会被终止。这是为了确保线程池能够迅速响应新的任务,因为当新任务到来时,已经存在的核心线程可以立即开始执行,而不需要重新创建线程。
设置为 true 的影响:如果将 allowCoreThreadTimeOut 设置为 true,则核心线程将使用 keepAliveTime(另一个 ThreadPoolExecutor 的配置参数)来设定超时时间。这意味着如果核心线程在指定的 keepAliveTime 时间内没有执行任何任务,它们将被终止。这种策略可以在某些情况下减少资源的使用,特别是当线程池中的任务量不足以维持所有核心线程都保持活跃状态时。
volatile 关键字:allowCoreThreadTimeOut 被声明为 volatile,这意味着这个变量的值对所有线程都是立即可见的。当某个线程修改了 allowCoreThreadTimeOut 的值时,这个修改会立即对其它线程可见,而不需要通过同步机制(如 synchronized)来确保线程间的可见性。

12 corePoolSize

private volatile int corePoolSize;
corePoolSize 是线程池需要保持存活的最小线程数。这些线程即使处于空闲状态,也不会被销毁(除非设置了 allowCoreThreadTimeOut 为 true)。这有助于确保线程池能够迅速响应新的任务,因为当新任务到来时,这些已经存在的线程可以立即开始执行,而不需要创建新的线程。

13 maximumPoolSize

private volatile int maximumPoolSize;
maximumPoolSize 用于表示线程池能够容纳的最大线程数。

用途

它决定了线程池能够同时运行的最大线程数。当线程池中的线程数达到这个限制时,新的任务可能会被放入工作队列中等待执行,或者根据线程池配置的饱和策略,可能会执行其他操作(如拒绝新任务)。因此,合理设置 maximumPoolSize 对于控制线程池的行为和性能至关重要。

校验

如下所示,如果我们设置线程池参数不合理,在构造线程池时就会抛出 IllegalArgumentException 或 NPE 异常

14 shutdownPerm

private static final RuntimePermission shutdownPerm =
new RuntimePermission("modifyThread");
这段 Java 代码定义了一个 RuntimePermission 类型的静态常量 shutdownPerm,它被初始化为一个新的 RuntimePermission 实例,权限名称为"modifyThread"。这个权限是用来控制对 Java 线程(Thread)的修改权限的,特别是与线程的中断(interrupt)和线程池的关闭(shutdown)相关的操作。

使用场景

在线程池的 shutdown 和 shutdownNow 方法中,会检查调用者是否拥有 shutdownPerm 所代表的权限。这是为了确保只有经过授权的代码才能尝试关闭线程池或中断其中的线程。如果调用者没有这些权限,线程池将不会执行关闭操作或中断线程,这有助于防止恶意代码干扰系统的正常运行。

15 Worker 内部类的属性

final Thread thread;
Runnable firstTask;
volatile long completedTasks;
thread 表示当前线程池中的一个工作线程。
firstTask 用于存储线程池中当前工作线程的第一个要执行的任务。
completedTasks 用于记录当前工作线程已经完成的任务数量。由于可能会有多个线程并发地操作这个变量,因此使用 volatile 关键字来确保线程安全是非常必要的。这样,当前工作线程修改了 completedTasks 的值,其他线程都能立即看到这个更新。
确认放弃笔记?
放弃后所记笔记将不保留。
新功能上线,你的历史笔记已初始化为私密笔记,是否一键批量公开?
批量公开的笔记不会为你同步至部落
公开
同步至部落
取消
完成
0/2000
荧光笔
直线
曲线
笔记
复制
AI
  • 深入了解
  • 翻译
    • 英语
    • 中文简体
    • 法语
    • 德语
    • 日语
    • 韩语
    • 俄语
    • 西班牙语
  • 解释
  • 总结
仅可试看部分内容,如需阅读全部内容,请付费购买文章所属专栏
《Java 线程池源码解读与实践》
立即购买
登录 后留言

精选留言

由作者筛选后的优质留言将会公开显示,欢迎踊跃留言。
收起评论
大纲
固定大纲
1 ctl
控制状态(ctl)的设计
控制状态的转换
2 workQueue
BlockingQueue
特殊说明
3 mainLock
锁的作用
ReentrantLock 的优势
4 workers
5 termination
使用场景
6 largestPoolSize
7 completedTaskCount
8 threadFactory
成员变量声明
注意事项
9 handler
defaultHandler
10 keepAliveTime
解释
作用
11 allowCoreThreadTimeOut
解释
12 corePoolSize
13 maximumPoolSize
用途
校验
14 shutdownPerm
使用场景
15 Worker 内部类的属性
显示
设置
留言
收藏
沉浸
阅读
分享
手机端
快捷键
回顶部