手写简单线程池
线程池
线程池:
问题:在JDK
中什么代表线程池?Executor
线程池其实就是一个生产者消费者模型
生产者:提交任务的线程
消费者:处理任务的线程
产品:任务问题:大家以前是如何创建线程池的?
Executors
里面的静态方法
注意事项:
工作中不要使用Executors
里面的静态方法去创建线程里。
原因:这样创建的线程池,里面的阻塞队列都是LinkedBlockingDeque
, 可以’无限’缓存任务。
在并发量比较高的场景中,容易导致OOM
.
最好使用ThreadPoolExecutor
去创建线程池
public class MyThreadPool implements Executor {
private static final int DEFAULT_CAPACITY = 100;
private static final int MAX_CAPACITY = Integer.MAX_VALUE - 8;
private BlockingQueue<Runnable> tasks;
private int size = 10; // 处理任务的线程数目
public MyThreadPool() {
tasks = new ArrayBlockingQueue<>(DEFAULT_CAPACITY);
init(); // 创建线程,并就绪
}
public MyThreadPool(int capacity) {
if (capacity <= 0 || capacity > MAX_CAPACITY) {
throw new IllegalArgumentException("capacity=" + capacity);
}
tasks = new ArrayBlockingQueue<>(capacity);
init();
}
private void init() {
for (int i = 0; i < size; i++) {
new WorkThread().start();
}
}
@Override
public void execute(Runnable command) {
try {
tasks.put(command); // 提交任务
} catch (InterruptedException e) {
e.printStackTrace();
}
}
private class WorkThread extends Thread{
@Override
public void run() {
while (true) { // 死循环:回收线程
try {
tasks.take().run();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
}
/* JDK:BlockingQueue<E> |-- ArrayBlockingQueue: 容量大小固定 |-- LinkedBlockingDeque: 容量大小步固定,除非指定大小。 */
public class MyBlockingQueue<E> {
private static final int DEFAULT_CAPACITY = 10;
private static final int MAX_CAPACITY = Integer.MAX_VALUE - 8;
// 属性
private int front;
private int rear;
private int size;
private E[] elements;
@SuppressWarnings("unchecked")
public MyBlockingQueue() {
elements = (E[]) new Object[DEFAULT_CAPACITY];
}
@SuppressWarnings("unchecked")
public MyBlockingQueue(int initialCapacity) {
if (initialCapacity <= 0 || initialCapacity > MAX_CAPACITY) {
throw new IllegalArgumentException("initialCapacity=" + initialCapacity);
}
elements = (E[]) new Object[initialCapacity];
}
public synchronized void enqueue(E e) {
// 判断队列是否满
// if (size == elements.length) {
while (size == elements.length) { // Caution! 不能够使用if
try {
wait();// t1, t2
} catch (InterruptedException e1) {
e1.printStackTrace();
}
}
// 添加元素
elements[rear] = e;
rear = (rear + 1) % elements.length;
size++;
// 队列不空, 需要唤醒其它线程
notifyAll();
}
public synchronized E dequeue() {
// 判断队列是否为空
// if (size == 0) {
while (size == 0) {
try {
wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
// 删除元素
E removeValue = elements[front];
elements[front] = null;
front = (front + 1) % elements.length;
size--;
// 队列不满,唤醒其它线程
notifyAll();
return removeValue;
}
public synchronized E peek() {
while (size == 0) {
try {
wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
return elements[front];
}
public synchronized boolean isEmpty() {
return size == 0;
}
public synchronized int size() {
return size;
}
}