手把手教你实现简单而又安全的线程池

微信公众号:大黄奔跑
关注我,可了解更多有趣的面试相关问题。

写在之前

前两篇文档分别从面试角度、源码角度分析了线程池使用及原理,抛开Jdk实现的线程池,如果让你自己写一个线程池,该当如何快速实现呢?
本篇是之前面试快手时面试官给的题目,后续自己加了一些自己的思考并且手动实现了一个简单的线程池

线程池的好处之前已经赘述,可以避免频繁的创建和销毁线程,管理多个线程。Jdk本身也对线程池做了很好的实现,特别是AQS用于同步队列,简直是妙哉,但是如果想让我们手动去实现一个线程池,该如何做呢?

本文就来实现一个简单的线程池,肯定不能够保证性能媲美 AQS,但是安全性还是要保证的。

首先是定义线程池的接口:
线程池的核心是:用户不需要自己去new线程,而是将new的过程交由线程池处理。
并且线程池可以用于管理线程

/**
 * @time 2020/5/17 11:03
 * @Description  手动来实现一个简单的线程池
 */

public interface ThreadPool<Job extends Runnable{

    /**
     * 执行一个工作任务,将任务提交给线程池
     * @param job
     */

    void execute(Job job);

    /**
     * 关闭线程池
     */

    void shutdown();

    /**
     * 增加工作线程
     * @param num       增加的个数
     */

    void addWorkers(int num);

    /**
     * 减少工作者线程
     * @param num       减少的个数
     */

    void removeWorkers(int num);

    /**
     * 获取正在等待执行的任务数量
     * @return
     */

    int getJobSize();
}

定义具体的实现

/**
 * @time 2020/5/17 11:08
 * @Description
 */

public class MySelfThreadPool<Job extends Runnableimplements ThreadPool<Job>{


    /**
     * 最大的线程数       默认10
     */

    private static final int MAX_WORKER_NUMBERS =      10;

    /**
     * 默认创建的线程数
     */

    private static final int DEFAULT_WORKER_NUMBERS =  5;

    /**
     * 最小的线程数
     */

    private static final int MIN_WORKER_NUMBERS =      1;

    /**
     * 工作列表,需要向里面插入工作任务
     */

    private final LinkedList<Job> jobs = new LinkedList <>();

    /**
     * 工作者列表  利用自带的容器同步工具,也就是有多少个线程可以处理任务咯
     */

    private final List<MyWorker>  workers = Collections.synchronizedList(new ArrayList <>());

    /**
     * 工作线程的数量
     */

    private int workerNum = DEFAULT_WORKER_NUMBERS;

    /**
     * 生成线程编号
     */

    private AtomicLong threadNum = new AtomicLong();

    /**
     * 构造方法,如果不传参数的时候,初始化线程数
     */

    public MySelfThreadPool(){
        initWorkers(DEFAULT_WORKER_NUMBERS);
    }

    /**
     * 构造方法   初始化特定的线程数
     * @param num       线程数   不可以超过最大的线程数
     */

    public MySelfThreadPool(int num){
        // 如果传入的参数非法,则直接转化为范围内的值
        workerNum = num > MAX_WORKER_NUMBERS ? MAX_WORKER_NUMBERS : num < MIN_WORKER_NUMBERS ? MIN_WORKER_NUMBERS : num;
        initWorkers(workerNum);
    }

    /**
     * 初始化线程数,并且这些线程已经启动的
     * @param workerNum
     */

    private void initWorkers(int workerNum) {
        for(int i = 0;i < workerNum;i++){
            MyWorker worker = new MyWorker();
            // 因为workers队列本身是线程安全的
            workers.add(worker);
            Thread thread = new Thread(worker,"ThreadPool-worker-" + threadNum.incrementAndGet());
            thread.start();
        }
    }

    /**
     * 执行一个工作任务,将任务提交给线程池,这里直接用synchronized锁定同步容器
     * @param job
     */

    @Override
    public void execute(Job job) {
        if(job != null){
            synchronized (jobs){
                jobs.addLast(job);
                jobs.notify();
            }
        }

    }

    /**
     * 关闭线程池,那肯定需要关闭一个个线程咯
     */

    @Override
    public void shutdown() {
        for(MyWorker worker : workers){
            worker.shutdown();
        }
    }

    /**
     * 向工作队列中增加线程
     * @param num       增加的个数
     */

    @Override
    public void addWorkers(int num) {
        synchronized (jobs){
            // 防止线程数移除
            if(num + this.workerNum > MAX_WORKER_NUMBERS){
                num = MAX_WORKER_NUMBERS - this.workerNum;
            }
            initWorkers(num);
            this.workerNum += num;
        }

    }

    /**
     * 从工作队列中移除线程
     * @param num       减少的个数
     */

    @Override
    public void removeWorkers(int num) {
        synchronized (jobs){
            if(num >= this.workerNum){
                throw new IllegalArgumentException("参数有误哦");
            }

            // 一个一个移除线程
            int count = 0 ;
            while (count < num){
                MyWorker worker = workers.get(count);
                // 如果顺利移除,则关闭线程
                if(workers.remove(worker)){
                    worker.shutdown();
                    count++;
                }
            }
            // 一定要修改全局的变量
            this.workerNum -= count;
        }
    }

    @Override
    public int getJobSize() {
        return jobs.size();
    }

    /**
     * 定义自己的工作线程
     */

    class MyWorker implements Runnable{

        /**
         * 定义一个boolean类型的数据表示是否运行中……
         */

        private volatile boolean running = true;

        @Override
        public void run() {
            // 判断线程是否处于运行状态,如果运行则说明可以做后续工作
            while (running){
                Job job = null;
                // 直接锁住整个工作队列
                synchronized (jobs){
                    // 如果工作队列为空,则需要等待
                    while (jobs.isEmpty()){
                        try {
                            jobs.wait();
                        } catch (InterruptedException e) {
                            Thread.currentThread().interrupt();
                            return;
                        }
                    }
                    // 如果工作队列不为空,则取出任务
                    job = jobs.removeFirst();
                }
                if(job != null){
                    job.run();
                }

            }

        }

        /**
         * 暂停工作
         */

        public void shutdown(){
            running = false;
        }
    }
}

总结

本意想借着文章的机会给大家分享所学所得,无奈知识浅薄,文章难免很有很多纰漏,如果你发现了错误的地方,欢迎私信我。

番外

另外,关注大黄奔跑公众号,第一时间收获独家整理的面试实战记录及面试知识点总结。

我是大黄,一个只会写HelloWorld的程序员,咱们下期见。

#快手##面试题目#
全部评论

相关推荐

点赞 评论 收藏
转发
2 8 评论
分享
牛客网
牛客企业服务