J.U.C并发包(2)

J.U.C并发包(2)

FutureTask

Callable接口和Runnable接口对比

  • 二者大致相似,不同的是,Runnable只有一个方法就是Run,而Callable有一个Call方法,功能也更强大,因为被线程执行后有返回值,并且可以抛出异常。

Future接口

  • Future对于具体的Runnable和Callable任务进行查询,取消或者获取结果等。可以监视目标线程调用Call的情况,当调用get()方法就会获得线程Call方法的返回值。

FutureTask类

  • FutureTask是RunnableFuture的实现类,RunnableFuture的父接口就是Runnable和Future。

  • 既可以作为Runnable被线程执行,也可以作为Future得到Call的返回值。

  • 因此FutureTask是非常便利的。

示例代码:

public class FutureTaskExample {
    private static Logger log = LoggerFactory.getLogger(FutureTaskExample.class);

    public static void main(String[] args) throws Exception {
        FutureTask<String> futureTask = new FutureTask<String>(new Callable<String>() {
            @Override
            public String call() throws Exception {
                log.info("do something in callable");
                Thread.sleep(5000);
                return "Done";
            }
        });

        new Thread(futureTask).start();
        log.info("do something in main");
        Thread.sleep(1000);
        String result = futureTask.get();
        log.info("result:{}", result);
    }
}

Fork/Join框架

  • 并行执行任务,把大任务分割为若干个子任务(Fork),然后合并子任务的执行结果(Join)。

  • 采用工作窃取算法,为了减少窃取任务线程与被窃取任务线程之间进行竞争,采用双端队列,被窃取任务线程从双端队列头部拿任务,窃取任务线程从双端队列尾部拿任务

  • 优点:充分使用线程进行并行计算,减少线程之间的竞争

  • 缺陷:还是可能存在浪费系统资源的可能。

  • 局限性:任务只能使用Fork/Join作为同步机制,如果使用其他同步机制,线程就不能执行其他任务

  • 执行的线程中的任务不可以做IO操作

  • 任务不能跑出检查异常,必须额外通过代码处理。

BlockingQueue

  • 当一个线程对一个已经填满的阻塞队列进行入队操作便会阻塞,当一个线程对一个空阻塞队列出队操作时也会发生阻塞。

  • 优点:线程安全,用于生产者消费者模式。

实现类:

  • ArrayBlockingQueue:有界,FIFO
  • DelayQueue:元素有序,通常按照元素过期时间排序
  • LinkedBlockingQueue:大小可选,可以指定大小,FIFO
  • PriorityBlockingQueue:带优先级的阻塞队列,可以插入空值,必须实现Compareable接口。
  • SynchronousQueue:只能允许容纳一个元素,当一个线程插入一个元素后就会被阻塞,除非这元素被另一个线程消费,所以也被称为无界非缓存队列。

线程池

优点:

  • 重用存在的线程,减少对象的创建、消亡的开销,性能更好
  • 可以有效控制最大并发线程数,提高系统资源利用率,同时可以避免过多的资源竞争,避免阻塞。

ThreadPoolExecutor

  • corePoolSize:核心线程数量
  • maximumPoolSize:线程最大线程数
  • workQueue:阻塞队列,存储等待执行的任务,

当一个任务进入线程池,会根据当前线程池中运行中的线程数量决定该任务的处理方式:

  • 直接切换:即SynchronousQueue
  • 使用无界队列:即LinkedBlockingQueue,线程池中能够运行的最大线程数就是corePoolSize,maximumPoolSize失效。
  • 使用有界队列:即ArrayBlockingQueue,线程池中能够运行的最大线程数是maximumPoolSize,这样可以降低资源消耗,但降低了性能。

  • keepAliveTime:线程2没执行任务最多保持多久时间终止。
  • unit:keepAliveTime的时间单位。
  • threadFactory:线程工厂,用来创建线程。

RejectedExecutionHandler:拒绝处理任务时的策略。

  • AbortPolicy抛出RejectedExecutionException异常。
  • DiscardOldestPolicy将队列最老的任务干掉
  • DiscardPolicy也是丢弃任务,但是不抛出异常
  • CallerRunsPolicy即不用线程池中的线程执行,而是交给调用方来执行

ThreadPoolExecutor


  • execute():提交任务,交给线程池执行
  • submit():提交任务,能够返回执行结果,execute加上Future
  • shutdown():关闭线程池,等待线程执行完毕,即阻塞队列中的任务也进入线程池执行完毕,不再接受新任务。
  • shutdownNow():关闭线程池,不等待任务执行完,暂停正在执行的线程,

Executors框架接口

  • Executors.newCachedThreadPool:可缓存的线程池
  • Executors.newFixedThreadPool:定长的线程池
  • Executors.newScheduledThreadPool:定长的线程池,支持定时执行,这也就有了线程调度的思想,使用schedule方法和scheduleAtFixedRate()方法。
public class ThreadPoolExample4 {

    public static void main(String[] args) {

        ScheduledExecutorService executorService = Executors.newScheduledThreadPool(1);

        executorService.schedule(new Runnable() {
            @Override
            public void run() {
                log.warn("schedule run");
            }
        }, 3, TimeUnit.SECONDS);
        executorService.shutdown();
    }
}
public class ThreadPoolExample4 {
    private static Logger log = LoggerFactory.getLogger(ThreadPoolExample4.class);
    public static void main(String[] args) {

        ScheduledExecutorService executorService = Executors.newScheduledThreadPool(5);

        executorService.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                log.warn("schedule run");
            }
        }, 1, 3, TimeUnit.SECONDS);
    }
}

  • Executors.newSingelThreadPool:单线程化线程池,只用唯一的工作线程执行,可以使线程按照顺序执行。

线程池的合理配置

  • CPU密集性任务,线程池容量设置为CPU数+1
  • IO密集性任务,线程池容量设置为CPU数 * 2
全部评论

相关推荐

点赞 评论 收藏
转发
点赞 收藏 评论
分享
牛客网
牛客企业服务