18.3.5 线程池核心参数与调优策略
1. 线程池基础概念
1.1 ThreadPoolExecutor核心参数
public class ThreadPoolBasics {
/*
* ThreadPoolExecutor构造参数:
*
* corePoolSize: 核心线程数
* maximumPoolSize: 最大线程数
* keepAliveTime: 非核心线程空闲存活时间
* unit: 时间单位
* workQueue: 工作队列
* threadFactory: 线程工厂
* handler: 拒绝策略
*/
public void demonstrateBasicThreadPool() {
ThreadPoolExecutor executor = new ThreadPoolExecutor(
2, // corePoolSize
4, // maximumPoolSize
60L, // keepAliveTime
TimeUnit.SECONDS, // unit
new LinkedBlockingQueue<>(10), // workQueue
Executors.defaultThreadFactory(), // threadFactory
new ThreadPoolExecutor.AbortPolicy() // handler
);
System.out.println("核心线程数: " + executor.getCorePoolSize());
System.out.println("最大线程数: " + executor.getMaximumPoolSize());
// 提交任务测试
for (int i = 1; i <= 15; i++) {
final int taskId = i;
try {
executor.submit(() -> {
System.out.println("任务" + taskId + "执行,线程: " + Thread.currentThread().getName());
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
System.out.println("提交任务" + taskId + " - 线程数: " + executor.getPoolSize() +
", 队列: " + executor.getQueue().size());
} catch (RejectedExecutionException e) {
System.out.println("任务" + taskId + "被拒绝");
}
}
executor.shutdown();
}
}
1.2 线程池执行流程
public class ThreadPoolExecutionFlow {
/*
* 线程池任务执行流程:
*
* 1. 如果当前线程数 < corePoolSize,创建新线程执行任务
* 2. 如果当前线程数 >= corePoolSize,将任务加入队列
* 3. 如果队列已满且当前线程数 < maximumPoolSize,创建新线程
* 4. 如果队列已满且当前线程数 >= maximumPoolSize,执行拒绝策略
*/
public void demonstrateExecutionFlow() {
ThreadPoolExecutor executor = new ThreadPoolExecutor(
2, 4, 60L, TimeUnit.SECONDS,
new ArrayBlockingQueue<>(3),
new CustomThreadFactory("demo"),
new ThreadPoolExecutor.CallerRunsPolicy()
);
for (int i = 1; i <= 8; i++) {
final int taskId = i;
executor.submit(() -> {
System.out.println("任务" + taskId + "执行 - " + Thread.currentThread().getName());
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
System.out.println("提交任务" + taskId + " - 线程: " + executor.getPoolSize() +
", 队列: " + executor.getQueue().size());
}
executor.shutdown();
}
static class CustomThreadFactory implements ThreadFactory {
private final AtomicInteger threadNumber = new AtomicInteger(1);
private final String namePrefix;
CustomThreadFactory(String namePrefix) {
this.namePrefix = namePrefix;
}
@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(r, namePrefix + "-thread-" + threadNumber.getAndIncrement());
t.setDaemon(false);
return t;
}
}
}
2. 工作队列详解
2.1 不同队列类型对比
public class WorkQueueComparison {
/*
* 常用工作队列类型:
*
* 1. ArrayBlockingQueue: 有界阻塞队列,FIFO
* 2. LinkedBlockingQueue: 可选有界阻塞队列,FIFO
* 3. SynchronousQueue: 不存储元素的阻塞队列
* 4. PriorityBlockingQueue: 优先级阻塞队列
*/
public void demonstrateQueueTypes() {
testArrayBlockingQueue();
testLinkedBlockingQueue();
testSynchronousQueue();
testPriorityBlockingQueue();
}
private void testArrayBlockingQueue() {
System.out.println("--- ArrayBlockingQueue测试 ---");
ThreadPoolExecutor executor = new ThreadPoolExecutor(
1, 2, 60L, TimeUnit.SECONDS,
new ArrayBlockingQueue<>(3), // 有界队列
new ThreadPoolExecutor.AbortPolicy()
);
for (int i = 1; i <= 6; i++) {
final int taskId = i;
try {
executor.submit(() -> {
System.out.println("ArrayBlocking任务" + taskId + "执行");
try { Thread.sleep(2000); } catch (InterruptedException e) {}
});
System.out.println("提交任务" + taskId + " - 队列: " + executor.getQueue().size());
} catch (RejectedExecutionException e) {
System.out.println("任务" + taskId + "被拒绝");
}
}
executor.shutdown();
}
private void testSynchronousQueue() {
System.out.println("--- SynchronousQueue测试 ---");
ThreadPoolExecutor executor = new ThreadPoolExecutor(
1, 5, 60L, TimeUnit.SECONDS,
new SynchronousQueue<>(), // 直接交换队列
new ThreadPoolExecutor.CallerRunsPolicy()
);
for (int i = 1; i <= 8; i++) {
final int taskId = i;
executor.submit(() -> {
System.out.println("Synchronous任务" + taskId + "执行 - " + Thread.currentThread().getName());
try { Thread.sleep(2000); } catch (InterruptedException e) {}
});
System.out.println("提交任务" + taskId + " - 线程数: " + executor.getPoolSize());
}
executor.shutdown();
}
private void testPriorityBlockingQueue() {
System.out.println("--- PriorityBlockingQueue测试 ---");
ThreadPoolExecutor executor = new ThreadPoolExecutor(
1, 1, 60L, TimeUnit.SECONDS,
new PriorityBlockingQueue<>(),
new ThreadPoolExecutor.AbortPolicy()
);
int[] priorities = {3, 1, 4, 2, 5};
for (int priority : priorities) {
executor.submit(new PriorityTask(priority));
System.out.println("提交优先级" + priority + "任务");
}
executor.shutdown();
}
static class PriorityTask implements Runnable, Comparable<PriorityTask> {
private final int priority;
public PriorityTask(int priority) {
this.priority = priority;
}
@Override
public void run() {
System.out.println("执行优先级" + priority + "任务");
try { Thread.sleep(1000); } catch (InterruptedException e) {}
}
@Override
public int compareTo(PriorityTask other) {
return Integer.compare(other.priority, this.priority);
}
}
}
3. 拒绝策略详解
3.1 四种内置拒绝策略
public class RejectionPolicyDemo {
/*
* 四种内置拒绝策略:
*
* 1. AbortPolicy: 抛出RejectedExecutionException异常
* 2. CallerRunsPolicy: 由调用线程执行任务
* 3. DiscardPolicy: 静默丢弃任务
* 4. DiscardOldestPolicy: 丢弃队列中最老的任务
*/
public void demonstrateRejectionPolicies() {
testAbortPolicy();
testCallerRunsPolicy();
testDiscardPolicy();
testCustomRejectionHandler();
}
private void testAbortPolicy() {
System.out.println("--- AbortPolicy测试 ---");
ThreadPoolExecutor executor = new ThreadPoolExecutor(
1, 1, 0L, TimeUnit.MILLISECONDS,
new ArrayBlockingQueue<>(1),
new ThreadPoolExecutor.AbortPolicy()
);
for (int i = 1; i <= 3; i++) {
final int taskId = i;
try {
executor.submit(() -> {
System.out.println("Abort任务" + taskId + "执行");
try { Thread.sleep(2000); } catch (InterruptedException e) {}
});
剩余60%内容,订阅专栏后可继续查看/也可单篇购买
Java面试圣经 文章被收录于专栏
Java面试圣经,带你练透java圣经
查看6道真题和解析