线程池的线程复用原理

前言

线程池有核心线程和非核心线程之分:

  • 核心线程是一直存活在线程池中的
  • 非核心线程是在执行完任务之后超时销毁的

当Thread执行完Runnable任务之后就会销毁,而且就算执行完任务之后把线程挂起也没有办法再去执行其他任务,那线程池是如何做到核心线程复用的呢?

首先来看一下执行线程任务的方法,实际上就是根据工作线程数量去执行不同的策略

execute()

public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        int c = ctl.get();
        /*
         * 如果当前活跃线程数小于核心线程数,就会添加一个worker来执行任务;
         * 具体来说,新建一个核心线程放入线程池中,并把任务添加到该线程中。
         */
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }

      //程序执行到这里,说明要么活跃线程数大于核心线程数;要么addWorker()失败

        /*
         * 如果当前线程池是运行状态,会把任务添加到队列
         */
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }

       //程序执行到这里,说明要么线程状态不是RUNNING;要么workQueue队列已经满了

        //调用addWorker方法去创建非核心线程,
        //如果当前线程数已经达到 maximumPoolSize,执行拒绝策略
        else if (!addWorker(command, false))
            reject(command);
    }

里面分成了3种情况,但是都会执行addWoker()方法

这个方法就和名字一样,添加一个工人来完成任务;而这个工人就是Thread,任务就是Runnable。

addWorker()

private boolean addWorker(Runnable firstTask, boolean core) {
       ...省略一些不重要的

        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
            //Worker是实现了Runnable接口的包装类
            w = new Worker(firstTask);
            //Thread是在Worker构造方法创建的
            final Thread t = w.thread;
            if (t != null) {
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();
                try {
                    // Recheck while holding lock.
                    // Back out on ThreadFactory failure or if
                    // shut down before lock acquired.
                    int rs = runStateOf(ctl.get());

                    //检查线程池状态,分为2种情况
                    //1、线程池处于RUNNING
                   //2、线程池处于SHUTDOWN并且firstTask==null
                   //这2种情况都会创建Worker来执行队列中的任务
                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                        if (t.isAlive()) // precheck that t is startable
                            throw new IllegalThreadStateException();
                        workers.add(w);
                        int s = workers.size();
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        //重新设置标识位
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
                if (workerAdded) {
                    //启动线程
                    t.start();
                    workerStarted = true;
                }
            }
        } finally {
            if (! workerStarted)
                addWorkerFailed(w);
        }
        return workerStarted;
    }

由代码,大概知道是通过创建Worker来执行任务的,而且线程是在Worker内部创建的,所以想到Thread需要的Runnable应该也在Worker类内部

Worker()

private final class Worker
        extends AbstractQueuedSynchronizer
        implements Runnable
    {

        final Thread thread;
        /** Initial task to run.  Possibly null. */
        Runnable firstTask;
        /** Per-thread task counter */
        volatile long completedTasks;

        /**
         * Creates with given first task and thread from ThreadFactory.
         * @param firstTask the first task (null if none)
         */
        Worker(Runnable firstTask) {
            setState(-1); // inhibit interrupts until runWorker
            this.firstTask = firstTask;
            this.thread = getThreadFactory().newThread(this);
        }

        /** Delegates main run loop to outer runWorker  */
        public void run() {
            runWorker(this);
        }

Worker是一个Runnable任务,而且Worker的构造方法中创建了Thread对象。

这样的话,在之前的addWorker()方法中调用t.start(); 就会执行到Worker的run()方法。

runWorker()

final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        //这个就是addWorker传进来的Runnable
        Runnable task = w.firstTask;
        w.firstTask = null;
        w.unlock(); // allow interrupts
        boolean completedAbruptly = true;
        try {
            //如果task不为null或从workQueue中获取任务不为null
            //就会一直执行
            while (task != null || (task = getTask()) != null) {
                w.lock();
                // If pool is stopping, ensure thread is interrupted;
                // if not, ensure thread is not interrupted.  This
                // requires a recheck in second case to deal with
                // shutdownNow race while clearing interrupt

                //检查线程池状态,如果线程池处于中断状态,将调用interrupt将线程中断。 
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    //中断线程
                    wt.interrupt();
                try {
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                        //线程任务执行啦
                        task.run();
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                        afterExecute(task, thrown);
                    }
                } finally {
                    task = null;
                    w.completedTasks++;
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
            processWorkerExit(w, completedAbruptly);
        }
    }

这里的关键在于这个while()条件判断,当第一次创建Worker时就有任务,当执行完这个任务后,这个方法并没有结束,而是不断地调用getTask()方法从阻塞队列中获取任务然后调用task.run()执行任务

getTask()

private Runnable getTask() {
        boolean timedOut = false; // Did the last poll() time out?

        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);

            // Check if queue empty only if necessary.
            if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                decrementWorkerCount();
                return null;
            }

            int wc = workerCountOf(c);

            // 1.allowCoreThreadTimeOut表示是否允许核心线程超时销毁,默认是false,也就是说核心线程即使空闲也不会被销毁
          //当然,如果设置为true,核心线程是会销毁的
          //这样的话,只有正在工作的线程数大于核心线程数才会为true,否则返回false
            boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

            if ((wc > maximumPoolSize || (timed && timedOut))
                && (wc > 1 || workQueue.isEmpty())) {
                if (compareAndDecrementWorkerCount(c))
                    return null;
                continue;
            }

            try {
                //2.如果timed为true,通过poll取任务;如果为false,通过take取任务
                Runnable r = timed ?
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                    workQueue.take();
                if (r != null)
                    return r;
                timedOut = true;
            } catch (InterruptedException retry) {
                timedOut = false;
            }
        }
    }

这个方法是通过一个死循环取任务,取任务的话是通过workQueue这个阻塞队列去完成的,在默认不改变allowCoreThreadTimeOut的前提下,

  • 如果工作线程数大于核心线程数,则通过poll()从队列取任务,取完队列后,线程销毁;
  • 如果工作线程数小于核心线程数,则通过take()从队列取任务,取完队列后,线程阻塞;

这2个方法的区别是:take()取任务时,如果队列中没有任务了会调用await()阻塞当前线程

总结

  • 当Thread的run方法执行完一个任务之后,会循环地从阻塞队列中取任务来执行,这样执行完一个任务之后就不会立即销毁了;
  • 当工作线程数小于核心线程数,那些空闲的核心线程再去队列取任务的时候,如果队列中的Runnable数量为0,就会阻塞当前线程,这样线程就不会回收了。线程的唤醒是在execute时,当调用workQueue.offer()方法,将任务放入阻塞队列时,会调用Condition.signal()方法唤醒一个之前阻塞的线程。

微信公众号:程序员WeiG
个人博客:wggz.top
语雀主页:yuque.com/weig

#后端开发##学习路径#
全部评论
厉害了 好多知识点!
点赞 回复
分享
发布于 2022-01-27 20:13

相关推荐

2 3 评论
分享
牛客网
牛客企业服务