ThreadPoolExecutor 从线程池参数引发的“血案”(下)
Q:讲下任务提交过程吧?
> 即 execute 方法流程.
execute
/**
* 在未来的某个时刻执行给定的任务.
* 该任务可能在一个新线程中执行,也可能在池中已经存在的线程
*
* 如果任务无法被提交执行,可能原因:
* 1. 该executor已被shutdown
* 2. 已经达到其容量上限,任务被当前的RejectedExecutionHandler处理
* @param command 要执行的任务
* @throws RejectedExecutionException at discretion of
* {@code RejectedExecutionHandler}, if the task
* cannot be accepted for execution
*/
public void execute(Runnable command) {
// 不支持空任务
if (command == null)
throw new NullPointerException();
int c = ctl.get();
// 若实际运行线程数 < corePoolSize
if (workerCountOf(c) < corePoolSize) {
// 会尝试创建一个新线程,将该command作为其第一个任务(firstTask)
// 对addWorker的调用原子性地检查runState和workerCount,从而通过返回false来防止在不应该添加线程的情况下发出错误的警报。
if (addWorker(command, true))
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {
/*
* 如果一个任务入队成功,仍需双重校验:
* 1.是否需要新建一个工作线程,因为可能存在自上次校验后已死亡的工作线程
* 2.进入该方法后,线程池是否被关闭了
* 因此需要再次检查state,并分别处理以上两种情况:
*/
int recheck = ctl.get();
// 若线程池已被停止,则回滚入队
if (! isRunning(recheck) && remove(command))
reject(command);
// 若池中已无可用工作线程
else if (workerCountOf(recheck) == 0)
// 则新建一个线程
addWorker(null, false);
}
// 若任务无法入队,则尝试添加一个新的工作线程.
else if (!addWorker(command, false))
// 新建线程失败,说明线程池已停止或已饱和,因此执行拒绝策略
reject(command);
}
继续细看
addWorker 方法
Worker 类
该类主要是维护中断控制状态的线程运行的任务。 此类继承AQS,简化了获取和释放包围每个任务执行的锁。
这可以防止那些旨在唤醒一个工作线程,而不是从中断任务正在运行任务的等待中断。
因为不希望Woker的任务能够重新获取锁,因此实现为一个简单的不可重入的互斥锁,而不是使用的ReentrantLock,他们调用池控制方法,如setCorePoolSize。
此外,为了抑制中断,直到线程实际开始运行的任务,我们初始化锁定状态为负值,并且清除它在启动时(在runWorker)


可以看到构造器入参即为一个 Runnable 可运行任务.

其中字段 thread 即为该Worker正在运行的线程,通过线程工厂创建。所以一个Worker 就代表了一个工作线程.
workers集合
-
该 Set 包含了在池中的所有工作线程。 只有持有mainLock全局锁时才能被访问.只有成功添加至workers集合的才是真正的可执行任务的工作线程.
tryRelease
unLock 时将state 从-1置0.
/** * 新增工作线程需要遵守线程池控制状态规定和边界限制 * * @param core core为true时允许扩容到核心工作线程数,否则为最大工作线程数 * @return 新增成功返回true,失败返回false */ private boolean addWorker(Runnable firstTask, boolean core) { // 重试(goto 语法) retry: // 外自旋,判断线程池的运行状态 for (;;) { int c = ctl.get(); int rs = runStateOf(c); /** * 仅有两种情况允许新增线程: * 1.线程池状态为RUNNING * 2.线程池状态为SHUTDOWN,firstTask为null,队列非空 */ if (rs >= SHUTDOWN && !(rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())) return false; // 内自旋,判断实际工作线程数 wc 是否满足数量边界条件. for (;;) { // 当前工作线程数 int wc = workerCountOf(c); /** * wc 异常情况: * 1.wc >= 最大容量 * 2.wc >= 自定义配置边界,这时就需要看第二个入参了 * core为true,边界为corePoolSize * core为false,边界为maximumPoolSize */ if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) return false; // CAS自增工作线程数 if (compareAndIncrementWorkerCount(c)) // 成功,表明新增条件都满足,直接退出retry循环 break retry; // 重读原子控制量 c = ctl.get(); /** * 校验线程池运行状态是否改变 * 1.是,再次retry循环,重新校验整个新增条件 * 2.否,说明是因为wc改变了才导致CAS失败,只需重新判断新增条件中的边界条件 */ if (runStateOf(c) != rs) continue retry; } } // ===================================== // 前面都是校验线程池是否满足新增工作线程的条件 // ===================================== // 记录新增worker是否开始工作 boolean workerStarted = false; // 记录新增worker是否成功添加到workers集合 boolean workerAdded = false; Worker w = null; try { // 通过新提交的任务新建一个Worker实例 w = new Worker(firstTask); // 获取该Worker中的实际工作线程 final Thread t = w.thread; // 判断是否成功创建工作线程 if (t != null) { // 新增工作线程需要加全局锁,确保原子更新workers集合和largestPoolSize
剩余60%内容,订阅专栏后可继续查看/也可单篇购买
Java源码模拟面试解析指南 文章被收录于专栏
<p> “挨踢”业行情日益严峻,企业招聘门槛愈来愈高,大厂hc更是少之又少,而Java技术面试普遍对基础知识的掌握考察特别深,大多数同学突击所看的 Java 面试基础知识点根本达不到面试官近乎挑剔的要求。 本专刊针对如今的校招及社招痛点,深入解析 JDK 的核心源码,探究 JDK 的设计精髓及最佳实践,同时以模拟面试的场景切入,让同学们在阅读过程中也能轻松掌握面试技巧。 本专刊购买后即可解锁所有章节,故不可以退换哦~ </p>
