Java线程池原理剖析

一、写在前面
1.1 线程池是什么
线程池(Thread Pool)是一种基于池化思想管理线程的工具,经常出现在多线程服务器中,如MySQL。

线程过多会带来额外的开销,其中包括创建销毁线程的开销、调度线程的开销等等,同时也降低了计算机的整体性能。线程池维护多个线程,等待监督管理者分配可并发执行的任务。这种做法,一方面避免了处理任务时创建销毁线程开销的代价,另一方面避免了线程数量膨胀导致的过分调度问题,保证了对内核的充分利用。

而本文描述线程池是JDK中提供的ThreadPoolExecutor类。

当然,使用线程池可以带来一系列好处:

**降低资源消耗**:通过池化技术重复利用已创建的线程,降低线程创建和销毁造成的损耗。
**提高响应速度**:任务到达时,无需等待线程创建即可立即执行。
**提高线程的可管理性**:线程是稀缺资源,如果无限制创建,不仅会消耗系统资源,还会因为线程的不合理分布导致资源调度失衡,降低系统的稳定性。使用线程池可以进行统一的分配、调优和监控。
**提供更多更强大的功能**:线程池具备可拓展性,允许开发人员向其中增加更多的功能。比如延时定时线程池ScheduledThreadPoolExecutor,就允许任务延期执行或定期执行。

1.2 Java线程池总体设计图


二、ThreadPoolExecutor核心方法
(结合源码分析,一次没看过的先看看源码和注释,否则没法理解)
1.1 构造方法
七大核心参数,不再详细阐述,想提高的同学需要着重关注的是:不同的阻塞队列/线程工厂/拒绝策略如何根据不同的业务场景进行选择
public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,//存活时间-从阻塞队列获取任务的最长等待时间(没有获取到,消除其在Hash表的引用,等待JVM垃圾回收)
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler) {
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.acc = System.getSecurityManager() == null ?
                null :
                AccessController.getContext();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }

1.2 基础方法和变量
这些变量和方法是理解线程池最重要的入口
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));//利用原子类的CAS算法维护ctl的值
private static final int COUNT_BITS = Integer.SIZE - 3;//32位,高3位保存线程池状态,低29位保存有效线程数(优点需要自己思考一下)
// Packing and unpacking ctl
private static int runStateOf(int c)     { return c & ~CAPACITY; }//计算线程池状态
private static int workerCountOf(int c)  { return c & CAPACITY; }//计算有效线程数
private static int ctlOf(int rs, int wc) { return rs | wc; }//将线程池状态和有效线程数进行位运算,转成32位

1.3 execute方法
1.3.1 说明
这是线程池的核心执行方法,先来看看总体设计(这张图不完全正确,参考了其他文献)

1.3.2 源码分析
/*
 * 该方法宏观上分为三步
 */
public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
       
        /*
         * 第一步:判断线程数是否小于核心线程数,是则执行addWorker(方法解析放在后面,先记下这个方法)
         */
        int c = ctl.get();
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        /*
         * 第二步:判断线程池状态是否处在RUNNING(线程池不同状态对任务调度的影响,自己学习,很重要),是则将任务添加到阻塞队列
         */
        if (isRunning(c) && workQueue.offer(command)) {//这里的逻辑叫double-check,线程池状态可能发生变化或者线程数=0(这种情况出现在allowCoreThreadTimeOut = true的时候,参数含义自己搜索)
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);//先自己思考为啥有这一步!!!
        }
        /*
         * 第三步:尝试创建非核心线程执行任务
         */
        else if (!addWorker(command, false))//若失败,说明线程池状态变更或者线程数超出限制
            reject(command);//这种情况出现在线程数已经超过了最大线程数,那么执行拒绝策略(4种,默认抛出异常策略)
    }

1.4 addWorker(Runnable firstTask, boolean core)
1.4.1 说明
firstTask:工作线程要执行的第一个任务
core:是否为核心线程
思想:创建工作线程并执行任务
1.4.2 源码分析
代码比较长,先说概要设计
第一步:通过CAS算法,将线程数+1(失败则自旋,直到成功);思考:为啥用用for(;;)取代while(true)做自旋?
第二步:执行任务(由Worker对象持有,对象维护在HashSet中)
    private boolean addWorker(Runnable firstTask, boolean core) {
        //环节1:CAS算法维护ctl值
        retry:
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);

            // Check if queue empty only if necessary.
            if (rs >= SHUTDOWN &&
                ! (rs == SHUTDOWN &&
                   firstTask == null &&
                   ! workQueue.isEmpty()))
                return false;

            for (;;) {
                int wc = workerCountOf(c);
                if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
                if (compareAndIncrementWorkerCount(c))//执行CAS算法,如果成功,则目的达到,break,进入环节2;失败则自旋转
                    break retry;
                c = ctl.get();  // Re-read ctl
                if (runStateOf(c) != rs)
                    continue retry;
            }
        }
        //环节2:执行任务
        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
            w = new Worker(firstTask);//Worker是Runnbale接口的实现类,同时实现了AQS接口
            final Thread t = w.thread;//所以这里拿到Worker对象持有的Thread对象,线程启动后,执行的就是Worker对象的run方法,从而引出后面的runWorker()方法
            if (t != null) {
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();//同步执行
                try {
                    int rs = runStateOf(ctl.get());

                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                        if (t.isAlive()) // precheck that t is startable
                            throw new IllegalThreadStateException();
                        workers.add(w);
                        int s = workers.size();
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
                if (workerAdded) {
                    t.start();//启动线程,执行Worker对象的runWorker方法
                    workerStarted = true;
                }
            }
        } finally {
            if (! workerStarted)
                addWorkerFailed(w);
        }
        return workerStarted;
    }

1.5 runWorker(Worker w)
1.5.1 说明
w:持有工作线程和任务的对象,是ThreadPoolExecutor的内部类
思想:1、获取任务(getTask()方法)    2、执行(就是调用Runnable的run方法)

1.5.2 源码分析
这个方法的
    final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
        w.unlock(); // 思考这里为什么调一下这个方法
        boolean completedAbruptly = true;
        try {
            while (task != null || (task = getTask()) != null) {//不断从队列获取任务:getTask()方法后面解析
                w.lock();
                // 判断线程池状态
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                try {
                    beforeExecute(wt, task);//前置方法,方便扩展、定制(类似Spring的后置处理器BeanPostProcessor)
                    Throwable thrown = null;
                    try {
                        task.run();//执行任务
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                        afterExecute(task, thrown);//后置方法,方便扩展、定制
                    }
                } finally {
                    task = null;
                    w.completedTasks++;
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
            processWorkerExit(w, completedAbruptly);//获取不到任务了,消除自身引用。还会后置判断队列中是否还有任务(防止最后一个线程消除后,队列中还有任务),如果有,创建线程去执行
        }
    }

1.6 getTask()
1.6.1 说明
思想:不断的从阻塞队列中获取任务
1.6.2 源码分析
    private Runnable getTask() {
        boolean timedOut = false; // 记录最后一次从队列获取任务是否超时

        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);

            // 判断线程池和队列状态
            if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                decrementWorkerCount();//CAS维护ctl值
                return null;
            }

            int wc = workerCountOf(c);

            // Are workers subject to culling?
            boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;//allowCoreThreadTimeOut=true时,核心线程数在空闲等待时间过后,也是可回收的,默认false

            if ((wc > maximumPoolSize || (timed && timedOut))
                && (wc > 1 || workQueue.isEmpty())) {
                if (compareAndDecrementWorkerCount(c))
                    return null;
                continue;
            }

            try {
                //从队列中获取任务
                Runnable r = timed ?
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                    workQueue.take();
                if (r != null)
                    return r;
                timedOut = true;//超时了,说明没获取到任务,记录下来
            } catch (InterruptedException retry) {
                timedOut = false;
            }
        }
    }
三、思考题答案
Q:
A:这一步的总体逻辑是,将任务提交到阻塞队列。最后还要判断一下工作线程数是否为0,是防止allowCoreThreadTimeOut=true的情况,因为这时核心线程获取不到任务之后也会被回收,那么队列中的任务就需要新起一个线程去消费,为什么这里的AddWorker的参数设置为null呢,因为为null时,才会去队列中取任务(getTask方法),

Q:这里为什么要调用一下解锁?
A:是因为这个worker还没开始执行,我们允许它被打断的,否则在自旋获取锁被阻塞的时候被打断会抛出异常,干扰到原来的正常逻辑

四、吐槽点
1.1 牛客的这个编辑器做的真不咋地,感觉灵活度好低啊。。。
1.2 线程池的原理懂了以后,如何做到动态配置和动态扩容/缩容,自己可多了解下,毕竟线上运行的项目,线程池参数不可能写死在代码里
1.3 参考了美团线程池的文章
1.4 文章字数不多,重点都写在了注释里面,会定期维护这篇文章,欢迎指正

五、后期展望
AQS框架的实现原理
#Java开发##Java##学习路径#
全部评论
第一个思考题我有另一种看法,前面addworker方法失败,除了线程池状态发生改变,也有可能线程数目>=核心线程,如果我传入的核心线程数目为0,并且阻塞队列为无界呢,我们就需要创建救急线程,workQueue.offer(command)只会把任务添加到阻塞队列中,所以这里有一步判断核心线程是否为0,如果是0,则创建救急线程
点赞 回复
分享
发布于 2022-09-14 12:03 浙江

相关推荐

2 18 评论
分享
牛客网
牛客企业服务