JDK 源码剖析 —— ThreadPoolExecutor
Java 中的线程池,一般都是围绕 ThreadPoolExecutor 展开的,其他的实现要么是基于它,要么是模仿它的思想。所以只要理解 ThreadPoolExecutor,就相当于完全理解了 Java 线程池的精髓。
我们可以提前给线程池下一个定义:提供预定义好的线程,供调用者直接执行任务的工具。
本章中的源码基于 JDK 1.8。
线程池优点
也可以说是池化的优点,可类推到各种如连接池、内存池等各种 “池” 的优点。
- 降低资源消耗:通过池化技术重复利用已创建的线程,降低线程创建和销毁造成的损耗。
- 提高响应速度:任务到达时,无需等待线程创建即可立即执行。
- 提高线程的可管理性:线程是稀缺资源,如果无限制创建,不仅会消耗系统资源,还会因为线程的不合理分布导致资源调度失衡,降低系统的稳定性。使用线程池可以进行统一的分配、调优和监控。
- 提供更多更强大的功能:线程池具备可拓展性,允许开发人员向其中增加更多的功能。比如延时定时线程池ScheduledThreadPoolExecutor,就允许任务延期执行或定期执行。
类签名及继承关系
ThreadPoolExecutor 类签名如下:
public class ThreadPoolExecutor extends AbstractExecutorService
ThreadPoolExecutor 类继承了 AbstractExecutorService 类,再向上,一个整体的继承关系如下 UML 类图所示:
ThreadPoolExecutor 实现的顶层接口是 Executor,顶层接口 Executor 提供了一种思想:将任务提交和任务执行进行解耦。用户无需关注如何创建线程,如何调度线程来执行任务,用户只需提供 Runnable 对象,将任务的运行逻辑提交到执行器(Executor)中,由 Executor 框架完成线程的调配和任务的执行部分。ExecutorService 接口增加了一些能力:
- 扩充执行任务的能力,补充可以为一个或一批异步任务生成Future的方法
- 提供了管控线程池的方法,比如停止线程池的运行。
AbstractExecutorService 则是上层的抽象类,将执行任务的流程串联了起来,保证下层的实现只需关注一个执行任务的方法即可。最下层的实现类 ThreadPoolExecutor 实现最复杂的运行部分,ThreadPoolExecutor 将会一方面维护自身的生命周期,另一方面同时管理线程和任务,使两者良好的结合从而执行并行任务。
我们可以在这里就先看一下 ThreadPoolExecutor 的运行模型,如下图(图源美团技术团队):

ThreadPoolExecutor 在内部实际上构建了一个生产者消费者模型,将任务看作产品,将任务提交和线程执行解耦。ThreadPoolExecutor 可以在逻辑上分成两个部分:任务管理和线程管理。任务管理充当生产者的角色,当有任务提交后,由线程池决定后续流转:
- 直接申请线程执行任务
- 缓冲到阻塞队列等待
- 拒绝任务
线程管理部分承担消费者的角色,它们被统一维护在线程池内,根据任务请求进行线程的分配,当线程执行完任务后则会继续获取新的任务去执行,最终当线程获取不到任务的时候,线程就会被回收。
生命周期管理
ThreadPoolExecutor 内部会随着线程池运行自行维护线程池状态。ThreadPoolExecutor 内部同时将线程数量(workerCount)和运行状态(runState)封装在一个变量中统一进行维护:
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
ctl 是 AtomicInteger 类型,其高 3 位保存 runState,低 29 位保存 workerCount。在源码中大部分情况都要同时获取这两个变量来判断状态,使用一个变量存储可以避免在改变状态时,不必去为了维护两者一致而占用锁。源码中也提供了一些方法供用户获得线程池当前的运行状态、线程个数。这些方法一般都是通过位运算:
// mask
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
// Packing and unpacking ctl
private static int runStateOf(int c) { return c & ~CAPACITY; }
private static int workerCountOf(int c) { return c & CAPACITY; }
private static int ctlOf(int rs, int wc) { return rs | wc; } 线程池将其运行状态分成五种:
// runState is stored in the high-order bits private static final int RUNNING = -1 << COUNT_BITS; private static final int SHUTDOWN = 0 << COUNT_BITS; private static final int STOP = 1 << COUNT_BITS; private static final int TIDYING = 2 << COUNT_BITS; private static final int TERMINATED = 3 << COUNT_BITS;
这里的左移实际上是因为 ctl 中只有高三位是表示运行状态的。每个状态具体如下:
| 运行状态 | 描述 |
|---|---|
| RUNNING | 接受新提交的任务,也能处理阻塞队列中的任务 |
| SHUTDOWN | 不再接受新提交任务,但是仍然能继续处理阻塞队列中的任务 |
| STOP | 不再接受新任务,也不再处理阻塞队列中的任务,同时中断正在处理任务的线程 |
| TIDYING | 所有任务都被终止,workerCount 为 0 |
| TERMINATED | TIDYING 状态时会自动调用 terminated 方法,方法调用完成后进入本状态 |
声明周期转换如下图所示:

任务调度
任务调度是线程池的入口,当用户提交了一个任务,接下来这个任务的全部过程(执行或拒绝)都由这个阶段负责。
任务调度依赖于几个很重要的参数,这些参数在线程池构造时就会被设置,ThreadPoolExecutor 最长的构造方法如下:
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) 几个参数解释如下:
corePoolSize: 线程池核心线程数(平时保留的线程数),使用时机: 在初始时刻,每次请求进来都会创建一个线程直到达到该size
maximumPoolSize: 线程池最大线程数,使用时机: 当workQueue都放不下时,启动新线程,直到最大线程数,此时到达线程池的极限
keepAliveTime/unit: 超出corePoolSize数量的线程的保留时间,unit为时间单位
workQueue: 任务队列,当核心线程数达到或者超出后,会先尝试将任务放入该队列由各线程自行消费;
ArrayBlockingQueue: 构造函数一定要传大小
LinkedBlockingQueue: 构造函数不传大小会默认为65536(Integer.MAX_VALUE ),当大量请求任务时,容易造成 内存耗尽。
SynchronousQueue: 同步队列,一个没有存储空间的阻塞队列 ,将任务同步交付给工作线程。
PriorityBlockingQueue: 优先队列
threadFactory:线程工厂,用于线程需要创建时,调用其newThread()生产新线程使用
handler: 饱和策略,当队列已放不下任务,且创建的线程已达到 maximum 后,则不能再处理任务,直接将任务交给饱和策略
AbortPolicy: 直接抛弃(默认)
CallerRunsPolicy: 用调用者的线程执行任务
DiscardOldestPolicy: 抛弃队列中最久的任务
DiscardPolicy: 抛弃当前任务整体的一个任务流转过程可以由下图表示:

总体流程总结如下:
- 判断核心线程池是否已满,如果不是,则创建线程执行任务
- 如果核心线程池满了,判断队列是否满了,如果队列没满,将任务放在队列中
- 如果队列满了,则判断线程池是否已满,如果没满,创建线程执行任务
- 如果线程池也满了,则按照拒绝策略对任务进行处理
任务调度策略的入口是 execute() 方法,它主要的工作是,检查现在线程池的运行状态、运行线程数、运行策略,决定接下来执行的流程,是直接申请线程执行,或是缓冲到队列中执行,亦或是直接拒绝该任务。
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
// 当还没有达到核心线程池的数量时,直接添加 1 个新线程,然后让其执行任务即可
if (workerCountOf(c) < corePoolSize) {
// 添加新线程,且执行 command 任务
// 添加成功,即不需要后续操作了,添加失败,则说明外部环境变化了
// addWorker 第二个参数 true 表示使用核心线程
if (addWorker(command, true))
return;
c = ctl.get();
}
// 当核心线程达到后,则尝试添加到阻塞队列中,具体添加方法由阻塞队列实现
// isRunning => c < SHUTDOWN;
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
// 添加队列成功后,还要再次检测线程池的运行状态,决定启动线程或者状态过期
// 当线程池已关闭,则将刚刚添加的任务移除,走reject策略
if (! isRunning(recheck) && remove(command))
reject(command);
// 当一个 worker 都没有时,则添加 worker
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
// 当队列满后,则直接再创建新的线程运行,addWorker 的 false 表示使用 maximumPoolSize
// 如果不能再创建线程了,则 reject
else if (!addWorker(command, false))
reject(command);
} 整个过程没有通过锁,而是仅依靠一个 AtomicInteger ctl 就保证了线程安全。
Worker 管理
线程池为了获取线程状态,维护线程生命周期,使用了工作线程 Worker 作为线程的包装,Worker 部分代码如下:
private final class Worker extends AbstractQueuedSynchronizer implements Runnable {
final Thread thread;// Worker 持有的线程
Runnable firstTask;// 初始化的任务,可以为 null
} Worker 工作线程,持有了一个线程 thread和一个初始化任务 firstTask,同时 Worker 自身也实现了 Runnable 接口。thread 是由线程池构造中的 threadFactory 创建的,而 firstTask 则在 Worker 创建时传入,如果 firstTask 不为 null,Worker 就会在创建完成后立刻执行该任务;如果 firstTask 是 null,说明该 Worker 是一个非核心线程,这个线程就需要去任务队列(workQueue)中获取任务执行。
Worker 执行任务的模型如下图:

对于非核心线程,在创建完成并且没有任务执行后,需要考虑回收的问题。线程池通过一个 HashSet 来持有 Worker 的引用:
private final HashSet<Worker> workers = new HashSet<Worker>();
这样可以通过添加引用、移除引用这样的操作来控制线程的生命周期,这时就需要判断线程是否正在运行。
Worker 通过继承了 AbstractQueuedSynchronizer(AQS)来实现独占,实现了一个不可重入锁来反映线程当前的状态(所以没有直接继承 ReentrantLock 可重入锁)。具体如下:
- lock 方法获得独占锁,表示当前线程正在执行
- 当线程执行任务完成后,会调用 unlock 释放锁
- 线程池在执行 shutdown 方法或 tryTerminate 方法时会调用 interruptIdleWorkers 方法来中断空闲的线程,interruptIdleWorkers 方***使用 tryLock 方法来尝试获得锁,以判断线程池中的线程是否是空闲状态
在线程回收的过程中就用到了上述独占锁的特性,回收过程示意如下:

增加 Worker
增加 Worker 线程主要通过 addWorker() 方法,该方法功能很单一,仅仅是增加一个 Worker,并不会判断当前的状态等,判断策略是在上个步骤(如 execute() 方法)完成的。
addWorker() 方法有两个参数:firstTask、core。firstTask 参数用于指定新增的线程执行的第一个任务,该参数可以为空;core 参数为 true 表示在新增线程时会判断当前活动线程数是否少于 corePoolSize,false 表示新增线程前需要判断当前活动线程数是否少于 maximumPoolSize。addWorker() 流程如下:

addWorker() 方法注释如下:
private boolean addWorker(Runnable firstTask, boolean core) {
// 为确保线程安全,进行CAS反复重试
retry:
for (;;) {
int c = ctl.get();
// 获取runState
int rs = runStateOf(c);
// 已经shutdown, firstTask 为空的添加并不会成功
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
int wc = workerCountOf(c);
// 如果超出最大允许创建的线程数,则直接失败
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
// CAS 更新worker+1数,成功则说明占位成功退出retry,后续的添加操作将是安全的
// 失败则说明已有其他线程变更该值
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
// runState 变更,则退出到 retry 重新循环
if (runStateOf(c) != rs)
continue retry;
}
}
// 以下为添加 worker 过程
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
// 使用 Worker 封闭 firstTask 任务,后续运行将由 Worker 接管
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
// 添加 worker 的过程,需要保证线程安全
mainLock.lock();
try {
int rs = runStateOf(ctl.get());
// SHUTDOWN 情况下还是会创建 Worker, 但是后续检测将会失败
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
// 既然是新添加的线程,就不应该是 alive 状态
if (t.isAlive())
throw new IllegalThreadStateException();
// workers 只是一个工作线程的容器,使用 HashSet 承载,以保持其引用
workers.add(w);
int s = workers.size();
// 维护一个全局达到过的最大线程数计数器
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
// worker 添加成功后,进行将worker启起来,里面应该是有一个 死循环,一直在获取任务
// 不然怎么运行添加到队列里的任务呢?
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
// 如果任务启动失败,则必须进行清理,返回失败
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
} Worker 执行任务
Worker 中的 run() 方***调用 runWorker() 来执行任务,方法执行过程如下:
- while 循环不断地通过
getTask()方法获取任务。 getTask()方法从阻塞队列中取任务。- 如果线程池正在停止,那么要保证当前线程是中断状态,否则要保证当前线程不是中断状态。
- 执行任务。
- 如果 getTask 结果为 null 则跳出循环,执行 processWorkerExit() 方法,销毁线程。
整体流程如下图所示:
runWorker() 方法注释如下:
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // 允许打断
boolean completedAbruptly = true;
try {
// 不停地从 workQueue 中获取任务,然后执行,就是这么个逻辑
// getTask() 会阻塞式获取,所以 Worker 往往不会立即退出
while (task != null || (task = getTask()) != null) {
// 执行过程中是不允许并发的,即同时只能一个 task 在运行,此时也不允许进行 interrupt
w.lock();
// 检测是否已被线程池是否停止 或者当前 worker 被中断
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
// 中断信息传递
wt.interrupt();
try {
// 任务开始前 切点,默认为空执行
beforeExecute(wt, task);
Throwable thrown = null;
try {
// 直接调用任务的run方法, 具体的返回结果,会被 FutureTask 封装到 某个变量中
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
// 任务开始后 切点,默认为空执行
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
// 正常退出,有必要的话,可能重新将 Worker 添加进来
completedAbruptly = false;
} finally {
// 处理退出后下一步操作,可能重新添加 Worker
processWorkerExit(w, completedAbruptly);
}
} Worker 回收
线程池中线程的销毁依赖 JVM 自动的回收,线程池做的工作是根据当前线程池的状态维护一定数量的线程引用,防止这部分线程被 JVM 回收,当线程池决定哪些线程需要回收时,只需要将其引用消除即可。Worker 被创建出来后,就会不断地进行轮询,然后获取任务去执行,核心线程可以无限等待获取任务,非核心线程要限时获取任务。当 Worker 无法获取到任务,也就是获取的任务为空时,循环会结束,Worker 会主动消除自身在线程池内的引用。在上一节 runWorker() 源码中就可以看到。
try {
while (task != null || (task = getTask()) != null) {
//执行任务
}
} finally {
processWorkerExit(w, completedAbruptly);//获取不到任务时,主动回收自己
} 线程回收的工作是在 processWorkerExit() 方法完成的。
大致流程如下:

代码注释如下:
private void processWorkerExit(Worker w, boolean completedAbruptly) {
if (completedAbruptly)
decrementWorkerCount();
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
completedTaskCount += w.completedTasks;
// 移出线程池
workers.remove(w);
} finally {
mainLock.unlock();
}
tryTerminate();
int c = ctl.get();
if (runStateLessThan(c, STOP)) {
// 在 Worker 正常退出的情况下,检查是否超时导致,维持最小线程数
if (!completedAbruptly) {
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
if (min == 0 && ! workQueue.isEmpty())
min = 1;
// 如果满足最小线程要求,则直接返回
if (workerCountOf(c) >= min)
return;
}
// 否则再添加一个 Worker 到线程池中备用
// 非正常退出,会直接再添加一个 Worker
addWorker(null, false);
}
} 事实上,将线程引用移出线程池就已经结束了线程销毁的部分。但由于引起线程销毁的可能性有很多,线程池还要判断是什么引发了这次销毁,是否要改变线程池的现阶段状态,是否要根据新状态,重新分配线程。
结束语
线程池算是 JDK 源码中综合性很强的部分了,对于很多项目的设计都是很有启发性的。
面试中也主要是针对 ThreadPoolExecutor 的设计理念来提问,甚至可能会扩展到让面试者自行设计。
#Java##学习路径#
