ThreadPoolExecutor 从线程池参数引发的“血案”(下)

Q:讲下任务提交过程吧?

> 即 execute 方法流程.

execute

/**
 * 在未来的某个时刻执行给定的任务.
 * 该任务可能在一个新线程中执行,也可能在池中已经存在的线程
 * 
 * 如果任务无法被提交执行,可能原因:
 *     1. 该executor已被shutdown
 *     2. 已经达到其容量上限,任务被当前的RejectedExecutionHandler处理
 * @param command 要执行的任务
 * @throws RejectedExecutionException at discretion of
 *         {@code RejectedExecutionHandler}, if the task
 *         cannot be accepted for execution
 */
public void execute(Runnable command) {
    // 不支持空任务
    if (command == null)
        throw new NullPointerException();

    int c = ctl.get();
    // 若实际运行线程数 < corePoolSize
    if (workerCountOf(c) < corePoolSize) {
        // 会尝试创建一个新线程,将该command作为其第一个任务(firstTask)
        // 对addWorker的调用原子性地检查runState和workerCount,从而通过返回false来防止在不应该添加线程的情况下发出错误的警报。
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }

    if (isRunning(c) && workQueue.offer(command)) {
        /*
         * 如果一个任务入队成功,仍需双重校验:
         * 1.是否需要新建一个工作线程,因为可能存在自上次校验后已死亡的工作线程
         * 2.进入该方法后,线程池是否被关闭了
         * 因此需要再次检查state,并分别处理以上两种情况:
         */ 
        int recheck = ctl.get();
        // 若线程池已被停止,则回滚入队
        if (! isRunning(recheck) && remove(command))
            reject(command);
        // 若池中已无可用工作线程
        else if (workerCountOf(recheck) == 0)
            // 则新建一个线程     
            addWorker(null, false);
    }
    // 若任务无法入队,则尝试添加一个新的工作线程.
    else if (!addWorker(command, false))
        // 新建线程失败,说明线程池已停止或已饱和,因此执行拒绝策略
        reject(command);
}

继续细看

addWorker 方法

Worker 类

该类主要是维护中断控制状态的线程运行的任务。 此类继承AQS,简化了获取和释放包围每个任务执行的锁。
这可以防止那些旨在唤醒一个工作线程,而不是从中断任务正在运行任务的等待中断。
因为不希望Woker的任务能够重新获取锁,因此实现为一个简单的不可重入的互斥锁,而不是使用的ReentrantLock,他们调用池控制方法,如setCorePoolSize。
此外,为了抑制中断,直到线程实际开始运行的任务,我们初始化锁定状态为负值,并且清除它在启动时(在runWorker)


可以看到构造器入参即为一个 Runnable 可运行任务.

其中字段 thread 即为该Worker正在运行的线程,通过线程工厂创建。所以一个Worker 就代表了一个工作线程.

workers集合

  • 该 Set 包含了在池中的所有工作线程。 只有持有mainLock全局锁时才能被访问.只有成功添加至workers集合的才是真正的可执行任务的工作线程.

    tryRelease

    unLock 时将state 从-1置0.

    /**
    * 新增工作线程需要遵守线程池控制状态规定和边界限制
    *
    * @param core core为true时允许扩容到核心工作线程数,否则为最大工作线程数
    * @return 新增成功返回true,失败返回false
    */
    private boolean addWorker(Runnable firstTask, boolean core) {
      // 重试(goto 语法)
      retry:
      // 外自旋,判断线程池的运行状态
      for (;;) {
          int c = ctl.get();
          int rs = runStateOf(c);
          /**
           * 仅有两种情况允许新增线程:
           *     1.线程池状态为RUNNING
           *  2.线程池状态为SHUTDOWN,firstTask为null,队列非空
           */
          if (rs >= SHUTDOWN &&
              !(rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty()))
              return false;
          // 内自旋,判断实际工作线程数 wc 是否满足数量边界条件.
          for (;;) {
              // 当前工作线程数
              int wc = workerCountOf(c);
              /**
               * wc 异常情况:
               *  1.wc >= 最大容量
               *  2.wc >= 自定义配置边界,这时就需要看第二个入参了
               *     core为true,边界为corePoolSize
               *     core为false,边界为maximumPoolSize
               */
              if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize))
                  return false;
              // CAS自增工作线程数
              if (compareAndIncrementWorkerCount(c))
                  // 成功,表明新增条件都满足,直接退出retry循环
                  break retry;
              // 重读原子控制量
              c = ctl.get();
              /**
               * 校验线程池运行状态是否改变
               *  1.是,再次retry循环,重新校验整个新增条件
               *  2.否,说明是因为wc改变了才导致CAS失败,只需重新判断新增条件中的边界条件
               */
              if (runStateOf(c) != rs)
                  continue retry;
          }
      }
      // =====================================
      // 前面都是校验线程池是否满足新增工作线程的条件
      // ===================================== 
    
      // 记录新增worker是否开始工作
      boolean workerStarted = false;
      // 记录新增worker是否成功添加到workers集合
      boolean workerAdded = false;
      Worker w = null;
      try {
          // 通过新提交的任务新建一个Worker实例
          w = new Worker(firstTask);
          // 获取该Worker中的实际工作线程
          final Thread t = w.thread;
          // 判断是否成功创建工作线程
          if (t != null) {
              // 新增工作线程需要加全局锁,确保原子更新workers集合和largestPoolSize
          

剩余60%内容,订阅专栏后可继续查看/也可单篇购买

Java源码模拟面试解析指南 文章被收录于专栏

<p> “挨踢”业行情日益严峻,企业招聘门槛愈来愈高,大厂hc更是少之又少,而Java技术面试普遍对基础知识的掌握考察特别深,大多数同学突击所看的 Java 面试基础知识点根本达不到面试官近乎挑剔的要求。 本专刊针对如今的校招及社招痛点,深入解析 JDK 的核心源码,探究 JDK 的设计精髓及最佳实践,同时以模拟面试的场景切入,让同学们在阅读过程中也能轻松掌握面试技巧。 本专刊购买后即可解锁所有章节,故不可以退换哦~ </p>

全部评论

相关推荐

评论
点赞
收藏
分享

创作者周榜

更多
牛客网
牛客网在线编程
牛客网题解
牛客企业服务