【下】分享自己整理的Java并发基础笔记(附实例代码)

计算机网络:https://www.nowcoder.com/discuss/342320
MySQL:https://www.nowcoder.com/discuss/353707
Java并发上:https://www.nowcoder.com/discuss/355081
我的csdn源地址:https://blog.csdn.net/qq_41112238/article/details/103545803


六 原子变量类

什么是原子变量类

意义

  • 多线程访问同一变量,需要加锁保证线程安全,而锁比较消耗性能。
  • 原子变量比锁的粒度更细,量级更轻,对于在多处理器上实现高性能的并发代码十分关键。原子变量将发生竞争的范围缩小到单个变量上,更新原子变量的快速(非竞争)路径不会比获取锁的路径慢,并且通常会更快,而它的慢速路径也比锁的慢速路径快,因为它不需要挂起或重新调度线程。在使用基于原子变量而非锁的算法中,线程在执行时更不易出现延迟,并且遇到竞争也更容易恢复。

概念

  • 原子变量类相当于一种泛化的volatile变量,能够支持原子的和有条件的读-改-写操作。
  • 这些类位于java.util.concurrent.atomic包中,到JDK1.8,该包共有17个类,依据作用分为四种:标量类、更新器类、数组类以及复合变量类。

标量类

AtomicBoolean,AtomicInteger,AtomicLong,AtomicReference这四种基本类型用来处理布尔,整数,长整数,对象四种数据。
其内部实现使用更为高效的CAS+volatile和native方法,避免了synchronized的高开销,执行效率大为提升。

  • AtomicInteger

AtomicInteger类表示一个int类型的值,并提供了get和set方法,这些volatile类型的int变量在读取和写入上有着相同的内存语义。
它还提供了一个原子性的CompareAndSet方法(如果该方法成功执行,那么将实现与读取/写入一个volatile变量相同的内存效果),以及原子的添加、递增和递减等方法。
AtomicInteger类非常像一个扩展的Counter类,但在发生竞争时能提供更高的可伸缩性,因为它直接利用了硬件对并发的支持。

回顾一下之前说过的一个非线程安全增加int值类:

public class UnsafeThread {
    private static int num;

    private  static void inCreate(){
        num++;
    }

    public static void main(String[] args) throws InterruptedException {
        //创建100个线程,执行对num的加1操作
        for(int i=0;i<100;i++){
            new Thread(()->{
                for (int j = 0; j < 100; j++) {
                    inCreate();
                }
            }).start();
        }
        Thread.sleep(1000);
        System.out.println(num);//结果不为10000
    }
}

可使用AtomicInteger以保证线程安全:

public class SafeThread {
    private static AtomicInteger num=new AtomicInteger(0);//创建原子变量类,并赋初值0

    private static void inCreate(){
        num.incrementAndGet();//对num值加1
    }

    public static void main(String[] args) throws InterruptedException {
        for(int i=0;i<100;i++){
            new Thread(()->{
                for (int j = 0; j < 100; j++) {
                    inCreate();
                }
            }).start();
        }
        Thread.sleep(1000);
        System.out.println(num);//结果一定为10000
    }
}
  • AtomicLong
  • AtomicBoolean
  • AtomicReference

开启多线程修改引用类型

public class AtomicRef {
    public static void main(String[] args) {
        Student s1=new Student(0,"a");
        Student s2=new Student(1,"b");
        AtomicReference<Student> atomicRef = new AtomicReference<>(s1);//设置初始值s1
        for (int i = 0; i <5 ; i++) {
                new Thread(()->{
                    System.out.println(Thread.currentThread().getName()+"当前值:"+atomicRef.get());
                    if(atomicRef.compareAndSet(s1,s2)){//CAS算法,如果当前值为s1,则改为s2
                        System.out.println(Thread.currentThread().getName()+"->"+atomicRef.get());
                    }else {
                        System.out.println(Thread.currentThread().getName()+"修改失败,当前值:"+atomicRef.get());
                    }
                }).start();
        }
    }
}

运行结果:

Thread-1当前值:Student{id=0, name='a'}
Thread-3当前值:Student{id=0, name='a'}
Thread-4当前值:Student{id=0, name='a'}
Thread-2当前值:Student{id=0, name='a'}
Thread-0当前值:Student{id=0, name='a'}
Thread-2修改失败,当前值:Student{id=1, name='b'}
Thread-4修改失败,当前值:Student{id=1, name='b'}
Thread-3修改失败,当前值:Student{id=1, name='b'}
Thread-1->Student{id=1, name='b'}
Thread-0修改失败,当前值:Student{id=1, name='b'}

我们可以看到,只能有一个线程成功修改所引用对象的值。

更新器类

AtomicLongFieldUpdater,AtomicIntegerFieldUpdater,AtomicReferenceFieldUpdater是基于反射的实用工具,可以对指定类的指定volatile字段进行原子更新。
使用约束:

  • 字段必须是volatile类型的
  • 字段的描述类型是与调用者与操作对象字段的关系一致即调用者能够直接操作对象字段,那么就可以反射进行原子操作。
    但是对于父类的字段,子类是不能直接操作的,尽管子类可以访问父类的字段。
  • 只能是实例变量,不能是类变量,即不能加static关键字。
  • 不能是final变量,实际上final的语义和volatile是有冲突的,这两个关键字不能同时存在。
  • 对于AtomicIntegerFieldUpdater 和AtomicLongFieldUpdater只能修改int/long类型的字段,不能修改其包装类型(Integer/Long),如果要修改包装类型就需要使用AtomicReferenceFieldUpdater。
  • AtomicIntegerFieldUpdater

看一下AtomicIntegerFieldUpdater的newUpdater方法,主要构建一个int类型的更新器

    @CallerSensitive//此方法的调用者必须有权限
    //第一个参数为域所属的对象的类对应class,第二个参数为域名,实例见下
    public static <U> AtomicIntegerFieldUpdater<U> newUpdater(Class<U> tclass,
                                                              String fieldName) {

        return new AtomicIntegerFieldUpdaterImpl<U>
            (tclass, fieldName, Reflection.getCallerClass());
    }
  • AtomicLongFieldUpdater
  • AtomicReferenceFieldUpdater

AtomicReferenceFieldUpdater的newUpdater方法构建一个引用类型的更新器

@CallerSensitive
//第一个参数为域所属对象的类的class,第二个参数为要修改的域的引用类型,第三个参数为域名
public static <U,W> AtomicReferenceFieldUpdater<U,W> newUpdater(Class<U> tclass,
                                                                    Class<W> vclass,
                                                                    String fieldName) {
        return new AtomicReferenceFieldUpdaterImpl<U,W>
            (tclass, vclass, fieldName, Reflection.getCallerClass());
    }

举一个引用类型线程不安全的例子,先构建一个Student类

class Student{
    private int id;
    private String name;

    public Student(int id, String name) {
        this.id = id;
        this.name = name;
    }

    public int getId() {
        return id;
    }

    public void setId(int id) {
        this.id = id;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    @Override
    public String toString() {
        return "Student{" +
                "id=" + id +
                ", name='" + name + '\'' +
                '}';
    }
}

不安全的操作类:

public class UnsafeRef {

    public static void main(String[] args) throws InterruptedException {
        Student student=new Student(1,"unsafe");
        for (int i = 0; i <100 ; i++) {
            for (int j = 0; j <100 ; j++) {
                new Thread(()->{
                    student.setId(student.getId()+1);
                    student.setName("a"+student.getId());
                }).start();
            }
        }
        Thread.sleep(1000);
        System.out.println(student);
    }
}

输出为:Student{id=9962, name='a9962'}
使用AtomicReferenceFieldUpdater构建线程安全的对象操作类:

public class UnsafeRef {

    public static void main(String[] args) throws InterruptedException {
        Student student=new Student(0,"unsafe");
        AtomicIntegerFieldUpdater<Student> idUpdater = AtomicIntegerFieldUpdater.newUpdater(Student.class, "id");
        AtomicReferenceFieldUpdater<Student, String> nameUpdater = AtomicReferenceFieldUpdater.newUpdater(Student.class, String.class, "name");
        for (int i = 0; i <100 ; i++) {
            for (int j = 0; j <100 ; j++) {
                new Thread(()->{
                    idUpdater.addAndGet(student,1);
                    nameUpdater.getAndSet(student,"a"+idUpdater.get(student));
                }).start();
            }
        }
        Thread.sleep(1000);
        System.out.println(student);
    }
}

输出为:Student{id=10000, name='a10000'}
注意:
使用更新器时域必须是volatile修饰的,且必须保证调用者有权限,如果是private的,则不可修改。

    volatile int id;
    volatile String name;

数组类

AtomicIntegerArray,AtomicLongArray还有AtomicReferenceArray类进一步扩展了原子操作,对这些类型的数组提供了支持。
其内部并不是像AtomicInteger一样维持一个valatile变量,而是全部由native方法实现

  • AtomicIntegerArray
    类似之前的非安全自增类,我们来看一个线程不安全的数组操作类

    public class UnsafeArray {
      private int[] arr=new int[2];
    
      private void add(){
          arr[0]++;
          arr[1]++;
      }
      public static void main(String[] args) throws InterruptedException {
          UnsafeArray unsafeMul = new UnsafeArray();
          for(int i=0;i<100;i++){
              new Thread(()->{
                  for (int j = 0; j <100 ; j++) {
                      unsafeMul.add();
                  }
              }).start();
          }
          Thread.sleep(1000);
          System.out.println(unsafeMul.arr[0]+"->"+unsafeMul.arr[1]);//输出结果不为10000->10000
      }
    }

    使用AtomicIntegerArray将其改造为线程安全的

    public class SafeArray {
      private AtomicIntegerArray arr=new AtomicIntegerArray(new int[2]);
    
      private void add(){
          arr.addAndGet(0,1);//相当于arr[0]++;
          arr.addAndGet(1,1);//相当于arr[1]++;
          //或者使用accumulateAndGet 第一个参数为下标 第二个参数为更新值 第三个参数为操作方法
          //arr.accumulateAndGet(0,1, Integer::sum);
          //arr.accumulateAndGet(1,1, Integer::sum);
      }
      public static void main(String[] args) throws InterruptedException {
          SafeArray safeArray = new SafeArray();
          for(int i=0;i<100;i++){
              new Thread(()->{
                  for (int j = 0; j <100 ; j++) {
                      safeArray.add();
                  }
              }).start();
          }
          Thread.sleep(1000);
          System.out.println(safeArray.arr.get(0)+"->"+safeArray.arr.get(1));
      }
    }
  • AtomicLongArray

  • AtomicReferenceArray

    复合变量类

    AtomicReference的功能增强版,AtomicMarkableReference能够把引用跟一个boolean型绑定,AtomicStampedReference能够把引用和一个int型的版本号绑定来实现时间戳的功能

  • AtomicMarkableReference

  • AtomicStampedReference

    JDK1.8新增的类

  • DoubleAccumulator 、 LongAccumulator

支持自定义运算,将运算值作为更新值取代旧值

LongAcculator构造方法第一个参数传入运算方法,第二个参数传入初值

public LongAccumulator(LongBinaryOperator accumulatorFunction,
                           long identity) {
        this.function = accumulatorFunction;
        base = this.identity = identity;
    }

例,多线程求1-10000的和

public class Mul {
    private LongAccumulator num=new LongAccumulator(Long::sum,0);

    public static void main(String[] args) throws InterruptedException {
        Mul mul = new Mul();
        LongAccumulator num = mul.num;
        for (int i = 1; i <=10000; i++) {
            int finalI = i;
            new Thread(()->{
                num.accumulate(finalI);
            }).start();
        }
        Thread.sleep(1000);
        System.out.println(num.get());
    }
}
  • DoubleAdder、LongAdder

对 Double、Long的原子类性能进行优化提升,在高并发环境下更高效

  • Striped64

七 容器

同步容器

同步容器类包括Vector和Hashtable,二者是早期JDK的一部分,此外还包括在JDK1.2中添加的一些功能相似的类,这些同步的封装器类是由Collections.synchronizedXxx等工厂方法创建的。
这些类实现线程安全的方式是:将它们的状态封装起来,并对每个公有方法进行同步,使得每次只能有一个线程访问容器的状态。

  • 同步容器类的问题

    单独使用同步容器类中的方法是安全的,但若进行复合操作时不加锁,其他线程并发修改容器可能产生错误。
    Java并发实战举了getLast和removeLast的方法,百度大多也是直接拿来用这两个,但我在API中并未找到此方法,于是进行一个类似模拟。
    当getLast得到的index为1时,仍会报出数组越界异常错误,因为此时removeLast移除了唯一一个元素,于是CPU时间片再分给getLast线程时就会出错了。

public class VectorDemo {
    Vector<Integer> vector=new Vector<>();

    public Object getLast(Vector list){
        int index=list.size()-1;
        try {//模拟线程不安全操作
            Thread.sleep(100);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return list.get(index);
    }
    public void deleteLast(Vector list){
        int index=list.size()-1;
        list.remove(index);
    }

    public static void main(String[] args) {
        VectorDemo vectorDemo = new VectorDemo();
        Vector<Integer> vector = vectorDemo.vector;
        vector.add(1);
        new Thread(()->{
            vectorDemo.getLast(vector);
        }).start();
        new Thread(()->{
            vectorDemo.deleteLast(vector);
        }).start();
    }

}

单线程时,若在使用foreach循环中进行了修改容器的操作,会抛出ConcurrentModificationException异常,如果修改为使用迭代器则不会出错,但如果是并发情况,迭代更新操作也会抛出异常。

单线程foreach中修改容器(不安全):

public class VectorDemo {

    public static void main(String[] args) {
        Vector<Integer> vector=new Vector<>();
        for (int i = 0; i <10 ; i++) {
            vector.add(i);
        }
        vector.forEach(e->{
            if (e.equals(5))
                vector.remove(e);
        });
    }
}

单线程迭代器修改容器(安全):

public class VectorDemo {

    public static void main(String[] args) {
        Vector<Integer> vector=new Vector<>();
        for (int i = 0; i <10 ; i++) {
            vector.add(i);
        }
        Iterator<Integer> iterator = vector.iterator();
        while(iterator.hasNext()){
            Integer next = iterator.next();
            if(next.equals(5)){
                iterator.remove();
            }
        }
    }
}

多线程迭代修改容器(不安全):

public class VectorDemo {

    public static void main(String[] args) {
       Vector<Integer> vector=new Vector<>();
        for (int i = 0; i <10 ; i++) {
            vector.add(i);
        }
        for (int i = 0; i <5 ; i++) {
            new Thread(()->{
                Iterator<Integer> iterator = vector.iterator();
                while(iterator.hasNext()){
                    Integer next = iterator.next();
                    if(next.equals(5)){
                        iterator.remove();
                    }
                }
            }).start();
        }
    }
}

并发容器

Java5.0提供了多种并发容器类来改进同步容器的性能,同步容器将所有对容器状态的访问都串行化,以实现它们的线程安全性。这种方法的代价是严重降低并发性,当多个线程竞争容器的锁时,吞吐量将严重降低。
Java5.0中增加了ConcurrentHashMap用来替代同步且基于散列的Map(在新的ConcurrentMap接口中增加了一些对常见复合操作的支持,例如若没有则添加、替换以及有条件删除等。),以及CopyOnWriteArrayList用于在遍历操作为主要操作的情况下代替同步的List。
Java5.0增加了两种新的容器类:Queue(底层用LinkedList实现)和BlockingQueue。Queue用来临时保存一组待处理元素,它提供了几种实现,包括:ConcurrentLinkedQueue,这是一个传统的FIFO队列,以及PriorityQueue,这是一个非并发的优先队列。Queue上的操作不会阻塞,如果队列为空获取元素的操作返回空值。虽然可用List模拟Queue,但Queue能去掉List的随机访问需求,实现更高效的并发。
BlockingQueue扩展了Queue,增加了可阻塞的插入和获取等操作。如果队列为空,那么获取元素的操作将一直阻塞,直到队列中出现一个可用的元素。如果队列已满(有界队列),那么插入元素的操作将一直阻塞,直到队列出现可用的空间。在生产者消费者模式中,阻塞队列非常有用。

  • CopyOnWriteArrayList

    用于替代同步List,在某些情况下提供了更好的并发性能,并且在迭代期间不需要对容器进行加锁或复制。
    “写入时复制COW”容器的线程安全性在于,只要正确发布一个事实不可变的对象,那么在访问该对象时就不再需要进一步的同步,在每次修改时,都会创建并重新发布一个新的容器副本,从而实现可变性。COW容器返回的迭代器不会抛出ConcurrentModificationException异常,并且返回的元素与迭代器创建时的元素完全一致,不必考虑修改操作之后带来的影响。
    每次修改容器时都会复制底层数组,这需要一定开销,仅当迭代操作远多于修改操作时,才应该使用“写入时复制”容器。

    ArrayList不安全的add方法,有可能线程A在ensureCapacityInternal后用完了时间片,线程B添加了一个元素,A再继续执行,就会出错。
    public boolean add(E e) {
          ensureCapacityInternal(size + 1);  // Increments modCount!!
          elementData[size++] = e;
          return true;
      }
    CopyOnWriteArrayList线程安全的add方法,加了一把ReentrantLock锁。
    public boolean add(E e) {
          final ReentrantLock lock = this.lock;
          lock.lock();
          try {
              Object[] elements = getArray();
              int len = elements.length;
              Object[] newElements = Arrays.copyOf(elements, len + 1);
              newElements[len] = e;
              setArray(newElements);
              return true;
          } finally {
              lock.unlock();
          }
      }
  • ConcurrentHashMap
    线程不安全的HashMap
public class UnsafeDemo {

    public static void main(String[] args) throws InterruptedException {
        HashMap<Integer,Integer> map=new HashMap<>();
        for (int i = 0; i <10 ; i++) {
            int finalI = i;
            new Thread(()->{
                    map.put(finalI,finalI);
            }).start();
        }
        Thread.sleep(1000);
        System.out.println("size:"+map.size());
        Set<Integer> integers = map.keySet();
        for(int key:integers){
            System.out.println(key+"->"+map.get(key));
        }
    }
}

输出结果:

size:10
8->8
9->9
2->2
3->3
4->4
5->null
6->null
7->null

将HashMap改为ConcurrentHashMap后运行结果正常,那么为什么ConcurrentHashMap是线程安全的?
JDK1.8之前

  • ConcurrentHashMap并不是将每个方法都在同一个锁上同步并使每次只有一个线程访问容器,而是使用一种粒度更细的加锁机制来实现更大程度的共享,这种机制成为分段锁。在这种机制中,任意数量的读线程可以并发访问Map,并且一定数量的写线程可以并发地修改Map。
  • ConcurrentHashMap在并发访问环境下将实现更高效的吞吐量,而在单线程环境中只损失非常小的性能。

JDK1.8

  • JDK1.8前是采用Segment + HashEntry + ReentrantLock的方式保证线程安全的,而JDK1.8中放弃了Segment臃肿的设计,使用了红黑树的数据结构进行了优化,采用Node+CAS + Synchronized来保证线程安全,synchronized只锁定当前链表或红黑树的首节点,这样只要hash不冲突,就不会产生并发,效率又得以提升。
  • JDK8中的实现也是锁分离的思想,它把锁分的比segment更细一些,只要hash不冲突,就不会出现并发获得锁的情况。它首先使用无锁操作CAS插入头结点,如果插入失败,说明已经有别的线程插入头结点了,再次循环进行操作。如果头结点已经存在,则通过synchronized获得头结点锁,进行后续的操作。性能比segment分段锁又再次提升。
  • get操作可以无锁是由于Node的元素val和指针next是用volatile修饰的,在多线程环境下线程A修改结点的val或者新增节点的时候是对线程B可见的。
  • BlockingQueue
    • 阻塞队列提供了可阻塞的put和take方法,以及支持定时的offer和poll方法。如果队列已经满了,那么put方法将阻塞直到有空间可用,如果队列为空,那么take方法将会阻塞直到有元素可用。队列可以有界也可以无界,无界队列永远都不会充满,因此无界队列上的put方法也不会阻塞。
    • 阻塞队列支持生产者-消费者这种设计模式,当数据生成时,生产者把数据放入队列,而当消费者准备处理数据时,将从队列中获取数据。BlockingQueue简化了生产者-消费者设计的实现过程,它支持任意数量的生产者和消费者,一种常见的生产者-消费者设计模式就是线程池与工作队列的组合,在Executor任务执行框架中就体现了这种模式。

使用阻塞队列实现的生产者消费者实例
厨师类:

public class Cook implements Runnable {

    private final LinkedBlockingQueue<Food> queue;

    public Cook(LinkedBlockingQueue queue){
        this.queue=queue;
    }

    @Override
    public void run() {
        while (true){
            if(queue.size()<10) {
                try {
                    System.out.println("开始生产食物");
                    Thread.sleep(1000);
                    queue.put(new Food());
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}

顾客类:

public class Customer implements Runnable {

    private final LinkedBlockingQueue<Food> queue;

    public Customer(LinkedBlockingQueue queue){
        this.queue=queue;
    }

    @Override
    public void run() {
        while (true){
            if(queue.size()>0){
                try {
                    queue.take();
                    Thread.sleep(1000);
                    System.out.println("吃食物完毕");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}

测试类:

public class Test {
    public static void main(String[] args) {
        LinkedBlockingQueue<Object> queue = new LinkedBlockingQueue<>(10);
        Customer customer = new Customer(queue);
        Cook cook = new Cook(queue);
        new Thread(customer).start();
        new Thread(cook).start();
    }
}

输出:

开始生产食物
开始生产食物
开始生产食物
吃食物完毕
吃食物完毕
开始生产食物
吃食物完毕
开始生产食物
吃食物完毕
开始生产食物
吃食物完毕
开始生产食物
吃食物完毕
开始生产食物...


八 同步工具类

同步工具类可以是任何一个对象,只要它根据其自身状态来协调线程的控制流。阻塞队列可以作为同步工具类,其它类型的同步工具类还包括信号量(Semaphore)、栅栏(Barrier)以及闭锁(Latch)。在平台类库中还包含其他一些同步工具类的类,如果这些类还无法满足需要,那么构建自定义的同步工具类。
所有的同步工具类都包含一些特定的结构化属性:它们封装了一些状态,这些状态将决定执行同步工具类的线程是继续执行还是等待,此外还提供了一些对状态进行操作的方法,以及另一些用于高效等待同步工具类进入预期状态的方法。

闭锁

闭锁是一种同步工具类,可以延迟线程的进度直到其到达终点状态。其作用相当于一扇门:在闭锁到达结束状态之前,这扇门一直是关闭的,并且没有任何线程能通过,当到达结束状态时,这扇门会打开并允许所有的线程通过。
当闭锁达到结束状态后,将不会再改变状态,因此这扇门将永远保持打开状态。闭锁可以用来确保某些活动直到其他活动都完成后才继续执行,例如:

  • 确保某个计算在其需要的所有资源都初始化后才执行。
  • 确保某个服务在其依赖的所有其他服务都已经启动后才启动。
  • 等待直到某个操作的所有参与者都就绪再执行(多玩家游戏中所有玩家就绪后游戏开始)。
  • CountDownLatch

一种灵活的闭锁实现,可以在上述各种情况中使用,它可以使一个或多个线程等待一组事件发生。闭锁状态包括一个计数器,该计数器被初始化为一个正数,表示需要等待的事件数量。countDown方法递减计数器,表示有一个事件已经发生了,而await方法等待计数器到达0,这表示所有需要等待的事件都已发生,如果计数器的值非0,那么await会一直阻塞直到计数器为0,或者等待中的线程中断,或者等待超时。

一个利用闭锁计算n线程并发执行某任务的工具类实例

public class CountDown {

    public long runTime(int numOfThreads,Runnable task) throws InterruptedException {
        CountDownLatch start=new CountDownLatch(1);//“开始”闭锁 计数器1 1个开始都开始
        CountDownLatch end=new CountDownLatch(numOfThreads);//“结束”闭锁 计数器n 全部结束才结束

        for(int i=0;i<numOfThreads;i++){
            new Thread(()->{
                try {
                    start.await();//等待“开始"闭锁打开
                    try{
                        task.run();//执行任务task
                    }finally {
                        end.countDown();//每结束完一个task,“结束”闭锁计数器减1
                    }
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }).start();
        }
        long startTime=System.nanoTime();//开始时间
        start.countDown();//“开始”闭锁计数器减一,打开,后续线程可执行
        end.await();//等待所有线程执行完task
        long endTime = System.nanoTime();//结束时间
        return endTime-startTime;
    }

    public static void main(String[] args) throws InterruptedException {
        Runnable r= () -> {
            //...具体任务逻辑
        };
        long l = new CountDown().runTime(10, r);
        System.out.println(l);
    }
}

信号量

  • 计数信号量用来控制同时访问某个特定资源的操作数量,或者同时执行某个指定操作的数量。计数信号量还可以用来实现某种资源池,或者对容器施加边界。
  • Semaphore中管理着一组虚拟的许可,许可的初始数值可通过构造函数来指定。在执行操作时可以首先获得许可(只要还有剩余的许可),并在使用以后释放许可。如果没有许可,
    那么acquire将阻塞直到有许可(或者直到被中断或者操作超时)。release方法将返回一个许可给信号量。
  • Semaphore可以用于实现资源池,如数据库连接池。我们可以构造一个固定长度的资源池,当池为空时,请求资源将会失败,但我们真正希望的是被阻塞而不是失败,并且当池非空时接触阻塞。如果将Semaphore的计数器初始化为池的大小,从在从池中获取一个资源之前首先调用acquire方法获得一个许可,将在资源返回给池后释放一个许可,那么acquire将一直阻塞直到资源池不为空。(也可使用阻塞队列实现),同样可使用Semaphore将任何一种容器变为有界阻塞容器。

使用信号量控制并发线程的实例,执行4线程,每次最多允许2线程执行:

public class SemaphoreTest {
    public static void main(String[] args) {
        Semaphore semaphore = new Semaphore(2);
        for (int i = 0; i <4 ; i++) {
            new Thread(()->{
                try {
                    semaphore.acquire();
                    System.out.println(Thread.currentThread().getName()+"开始执行");
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }finally {
                    System.out.println(Thread.currentThread().getName()+"结束执行");
                    semaphore.release();
                }
            }).start();
        }
    }
}

输出为:

Thread-0开始执行
Thread-1开始执行
(约1秒后)
Thread-0结束执行
Thread-1结束执行
Thread-2开始执行
Thread-3开始执行
(约1秒后)
Thread-3结束执行
Thread-2结束执行

一道leetcode上利用信号量解决的题:

现在有两种线程,氢 oxygen 和氧 hydrogen,你的目标是组织这两种线程来产生水分子。
存在一个屏障(barrier)使得每个线程必须等候直到一个完整水分子能够被产生出来。
氢和氧线程会被分别给予 releaseHydrogen 和 releaseOxygen 方法来允许它们突破屏障。
这些线程应该三三成组突破屏障并能立即组合产生一个水分子。
你必须保证产生一个水分子所需线程的结合必须发生在下一个水分子产生之前。
换句话说:
如果一个氧线程到达屏障时没有氢线程到达,它必须等候直到两个氢线程到达。
如果一个氢线程到达屏障时没有其它线程到达,它必须等候直到一个氧线程和另一个氢线程到达。
书写满足这些限制条件的氢、氧线程同步代码。

class H2O {
    private Semaphore hydrogen=new Semaphore(2);//H的信号量 初值2
    private Semaphore oxygen=new Semaphore(0);//O的信号量 初值0
    public H2O() {

    }

    public void hydrogen(Runnable releaseHydrogen) throws InterruptedException {
        hydrogen.acquire(1);//需要获取1个H许可
        releaseHydrogen.run();
        oxygen.release(1);//每次释放1个,这样释放2次O线程才能执行,满足2个H搭配1个O
    }

    public void oxygen(Runnable releaseOxygen) throws InterruptedException {
        oxygen.acquire(2);//O线程获取到2个O许可才能继续执行=需要H线程成功执行2次
        releaseOxygen.run();
        hydrogen.release(2);//释放2个H许可
    }
}

栅栏

  • 栅栏类似于闭锁,它能阻塞一组线程直到某个事件发生。栅栏与闭锁的关键区别在于,所有线程必须同时到达栅栏位置才能继续执行。闭锁一般用于某个线程等待其他线程都执行完毕才执行,不可重复使用,而栅栏用于一组线程互相等待到某个状态再执行,可重复使用。
  • CyclicBarrier可以使一定数量的参与方反复地在栅栏位置汇集,它在并行迭代算法中非常有用:这种算法通常讲一个问题拆分为一系列相互独立的子问题。当线程到达栅栏位置将调用await方法,这个方法将阻塞直到所有线程都到达栅栏位置。如果所有线程都到达了栅栏位置,那么栅栏将打开,此时所有线程都被释放,而栅栏将被重置以便下次使用。如果对await的调用超时,或者await阻塞的线程被中断,那么栅栏就被认为是打破了,所有阻塞的await调用都将终止并抛出BrokenBarrierException异常。
  • 另一种形式的栅栏是Exchanger,它是一种两方栅栏,各方在栅栏位置上交换数据。当两方执行不对称操作时,Exchanger会非常有用,例如当一个线程向缓冲区写入数据而另一个线程从缓冲区读取数据。

下面来看一个CyclicBarrier栅栏的实例

class CycTest {
    public static void main(String[] args) throws InterruptedException {
        //构造方法的第一个参数代表一共需要等待的数量
        //第二个参数表示栅栏打开后执行的任务,Runnable类型,可不写
        CyclicBarrier barrier=new CyclicBarrier(5,()->{
            System.out.println("所有人都到了");
        });
        for (int i = 0; i <5 ; i++) {
            int finalI = i;
            new Thread(()->{
                try {
                    System.out.println("第"+(finalI +1)+"个人到了");
                    barrier.await();
                } catch (InterruptedException | BrokenBarrierException e) {
                    e.printStackTrace();
                }
            }).start();
        }
    }
}

输出:

第1个人到了
第5个人到了
第4个人到了
第3个人到了
第2个人到了
所有人都到了

因此上面的H20问题也可搭配栅栏解决(效率低,但更好理解)

class H2O {
    private Semaphore hydrogen=new Semaphore(2);//初始可以获取2个H
    private Semaphore oxygen=new Semaphore(1);//初始可以获取1个O
    //栅栏打开后 再创造2个H和1个O
    private CyclicBarrier barrier=new CyclicBarrier(3,()->{
        hydrogen.release(2);
        oxygen.release(1);
    });

    public H2O() {

    }

    public void hydrogen(Runnable releaseHydrogen) throws InterruptedException {
        try{
            hydrogen.acquire();//获取1个H
            releaseHydrogen.run();
            barrier.await();//等栅栏
        }catch (Exception ignored) {
        }

    }

    public void oxygen(Runnable releaseOxygen) throws InterruptedException {
        try{
            oxygen.acquire();//获取1个O
            releaseOxygen.run();
            barrier.await();//等栅栏
        }catch (Exception ignored) {
        }
    }
}

再看一个简单的Exchanger栅栏的实例

public class ExchangerTest {
    public static void main(String[] args) {
        String str1="36D";
        String str2="DoubleA";
        Exchanger<String> exchanger = new Exchanger<>();
        new Thread(()->{
            System.out.println("交易前:"+Thread.currentThread().getName()+"'s wife's cup: "+str1);
            try {
                String ex=exchanger.exchange(str1);
                System.out.println("交易后:"+Thread.currentThread().getName()+"'s wife's cup: "+ex);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        },"A").start();
        new Thread(()->{
            System.out.println("交易前:"+Thread.currentThread().getName()+"'s wife's cup: "+str2);
            try {
                String ex=exchanger.exchange(str2);
                System.out.println("交易后:"+Thread.currentThread().getName()+"'s wife's cup: "+ex);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        },"B").start();
    }
}

输出:

交易前:A's wife's cup: 36D
交易前:B's wife's cup: DoubleA
交易后:B's wife's cup: 36D
交易后:A's wife's cup: DoubleA

注意Exchanger只能成对使用。


九 Executor框架和线程池

无限制创建线程的不足

在生产环境中,为每个任务分配一个线程这种方法存在一些缺陷,尤其是当需要创建大量的线程时:

  • 线程声明周期的开销非常高,如果请求的到达率非常高且请求的处理过程是轻量级的,例如大多数服务器应用程序就是这种情况,那么为每个请求创建一个新线程将消耗大量的计算资源。
  • 资源消耗,活跃的线程会消耗系统资源,尤其是内存,如果可运行的线程数量多于可用处理器的数量,那么有些线程将闲置。大量空闲的线程会占用许多内存,给GC带来压力,而且大量线程在竞争CPU资源时还将产生其他性能开销,如果已经有足够多的线程使所有CPU保持忙碌状态,那么再创建线程反而降低性能。
  • 稳定性,在可创建线程的数量上存在一个限制,该限制跟平台有关,并且受多个因素制约,包括JVM启动参数、Thread构造函数中请求栈的大小,以及底层操作系统对线程的限制等。如果破坏了这些限制很可能抛出内存溢出异常。

在一定范围内,增加线程可以提高系统的吞吐率,但如果超过该范围再创建更多线程只会降低程序的执行速度,并且如果过多创建一个线程,那么整个应用程序将崩溃。想要避免这种危险,就应对应用程序可以创建的线程数量进行限制,并全面对应用程序测试,确保在线程数量达到限制时程序也不会耗尽资源。
为每个任务分配一个线程的问题在于没有限制可创建线程的数量,只限制了远程用户提交HTTP请求的速率,与其他并发危险一样,在原型设计和开发阶段无限制创建线程或许可以良好运行,一旦在应用部署后并处于高负载下运行时,才会有问题不断暴露。

Executor

虽然Executor是个简单的接口,但它却为灵活而强大的异步任务执行框架提供了基础,该框架能支持多种不同类型的任务执行策略。它提供了一种标准的方法将任务的提交过程与执行过程解耦开来,并用Runnable表示任务。Executor的实现还提供了对生命周期的支持,以及统计信息收集、应用程序管理机制和性能监视等机制。
Executor基于生产者-消费者模式,提交任务的操作相当与生产者(生产待完成的工作单元),执行任务的线程则相当于消费者(执行完这些工作单元)。如果要在程序中实现一个生产者-消费者的设计,最简单的方式通常就是使用Executor。

执行策略

通过将任务的提交与执行解耦开来,从而无需太大的困难就可为某种类型的任务制定和修改执行策略。在执行策略中定义了任务执行的What、Where、When、How等方面,包括:

  • 在什么(What)线程中执行任务?
  • 任务按照什么(What)顺序执行(FIFO、LIFO、优先级)?
  • 有多少个(How Many)任务能并发执行?
  • 在队列中有多少个(How Many)任务在等待执行?
  • 如果系统由于过载需要拒绝一个任务,那么应该选择哪一个(Which)任务?另外,如何(How)通知应用程序有任务被拒绝?
  • 在执行一个任务之前或之后,应该进行哪些(What)动作?

各种执行策略都是一种资源管理工具,最佳策略取决于可用的计算资源以及对服务质量的需求。通过限制并发任务的数量,可用确保应用程序不会由于资源耗尽而失败,或者由于在稀缺资源上发生竞争而严重降低性能。通过将任务的提交与任务的执行策略分离开来,有助于在部署阶段选择与可用硬件资源最匹配的执行策略。
每当看到new Thread(runnable).start()这种代码,并且希望获得一种更灵活的执行策略时,考虑使用Executor来代替Thread。

线程池

从字面含义看,是指管理一组同构工作线程的资源池。线程池是与工作队列密切相关的,其中在工作队列中保存了所有等待执行的任务。工作者线程(Worker)的任务很简单,从工作队列获取一个任务,执行任务,然后返回线程池并等待下一个任务。

  • 使用线程池的好处

    在线程池中执行任务比为每个任务分配一个线程优势更多,通过重用现有的线程而不是创建新线程,可以在处理多个请求时分摊在线程创建和撤销过程中产生的巨大开销。另一个好处是:当请求到达时,工作线程通常已经存在,因此不会由于等待线程创建而延迟任务的执行,从而提高了响应性。通过适当调整线程池的大小,可以创建足够多的线程以便使处理器保持忙碌状态,同时还可以防止过多线程相互竞争资源而使应用程序耗尽内存或失败。

  • 线程池的创建

ThreadPoolExecutor的通用构造方法

public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,
                              TimeUnit unit,BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,RejectedExecutionHandler handler) {
        //....
    }

构造方法参数含义

(1)corePoolSize: 线程池基本大小,也就是线程池的目标大小,即在没有任务执行时线程池的大小,并且只有在工作队列满了的情况下才会创建超出这个数量的线程。
(2)maximumPoolSize: 线程池最大大小,表示可同时活动的线程数量的上限。
(3)keepAliveTime:存活时间,如果某个线程的空闲时间超过了存活时间,那么将被标记为可回收的,并且当线程池的当前大小超过基本大小时,这个线程将被终止。
(4)unit: 线程池维护线程所允许的空闲时间的单位。 unit可选的参数为java.util.concurrent.TimeUnit中的几个静态属性: NANOSECONDS、MICROSECONDS、MILLISECONDS、SECONDS。
(5)workQueue: 线程池所使用的阻塞队列。阻塞队列的选择:

  • ArrayBlockingQueue:一个有边界的阻塞队列,它的内部实现是一个数组。它的容量在初始化时就确定不变。
  • LinkedBlockingQueue:阻塞队列大小的配置是可选的,其内部实现是一个链表。
  • PriorityBlockingQueue:是一个没有边界的队列,所有插入到PriorityBlockingQueue的对象必须实现java.lang.Comparable接口,队列优先级的排序就是按照我们对这个接口的实现来定义的。
  • SynchronousQueue队列内部仅允许容纳一个元素。当一个线程插入一个元素后会被阻塞,除非这个元素被另一个线程消费。

(6)thread factory:使用的创建线程工厂方法,可省略,将使用默认工厂
(7)handler:所用的拒绝执行处理策略,可省略,将使用默认拒绝执行策略。handler有四个选择:

  • AbortPolicy(): 抛出java.util.concurrent.RejectedExecutionException异常
  • CallerRunsPolicy(): 重试添加当前的任务,他会自动重复调用execute()方法
  • DiscardOldestPolicy(): 抛弃旧的任务
  • DiscardPolicy(): 抛弃当前的任务

可以通过调用Executors中的静态工厂方法之一来创建一个线程池

以下4种静态工厂方法内部实现都是调用了ThreadPoolExecutor方法的某个重载方法。

  • newFixedThreadPool,创建一个固定长度的线程池,每当提交一个任务就创建一个线程,直到达到线程池的最大数量,这是线程池的规模将不再变化(如果某个线程由于发生了未预期的异常而结束,那么线程池会补充一个新的线程)。
    将线程池的基本大小和最大大小设置为参数中指定的值,而且创建的线程不会超时。
  • newCachedThreadPool,创建一个可缓存的线程池,如果线程池的当前规模超过了处理器需求,那么将回收空闲的线程,而当需求增加时,可以添加新的线程,线程池的规模不存在任何限制。
    将线程池的最大大小设置为Integer.MAX_VALUE,而将基本大小设置为0,并将超时设为1分钟,这种方法创建出的线程池可被无限扩展,并当需求降低时自动收缩。
  • newFixedThreadPoolnewCachedThreadPool这两个工厂方法返回通用的ThreadPoolExecutor实例,这些实例可以直接用来构造专门用途的executor。
  • newSingleThreadExecutor,一个单线程的Executor,创建单个工作者线程来执行任务,如果这个线程异常结束,会创建另一个线程来代替。确保依照任务在队列中的顺序来串行执行。
    • newScheduledThreadPool,创建一个固定长度的线程池,而且以延迟或定时的方式来执行任务,类似于Timer。
  • 线程池工作原理

在这里插入图片描述

通过 execute(Runnable)方法被添加到线程池,任务就是一个 Runnable类型的对象,任务的执行方法就是
Runnable类型对象的run()方法。

1 线程池刚创建时,里面没有一个线程。任务队列是作为参数传进来的。即使队列里面有任务,线程池也不会马上执行它们。
2 当一个任务通过execute(Runnable)方法欲添加到线程池时:

  • 1 :如果workerCount<corePoolSize,那么创建并启动一个线程执行新提交的任务。
  • 2:如果workerCount>=corePoolSize,且线程池内的阻塞队列未满,那么将这个任务放入队列。
  • 3:如果workerCount>=corePoolSize,且阻塞队列已满,若满足workerCount<maximumPoolSize,那么还是要创建并启动一个线程执行新提交的任务。
  • 4:若阻塞队列已满,并且workerCount>=maximumPoolSize,则根据 handler所指定的策略来处理此任务,默认的处理方式直接抛出异常。
  • 也就是处理任务的优先级为: 核心线程corePoolSize、任务队列workQueue、最大线程maximumPoolSize,如果三者都满了,使用handler处理被拒绝的任务。

3 当一个线程完成任务时,它会从队列中取下一个任务来执行。
4 当一个线程无事可做,超过一定的时间(keepAliveTime)时,线程池会判断,如果当前运行的线程数大于corePoolSize时,那么这个线程会被停用掉,所以线程池的所有任务完成后,它最终会收缩到corePoolSize的大小。

线程池实例1——使用ThreadPoolExecutor创建线程池

public class ThreadPoolTest {
    public static void main(String[] args) {
        BlockingDeque<Runnable> queue=new LinkedBlockingDeque<>(2);
        ThreadPoolExecutor threadPoolExecutor =
                new ThreadPoolExecutor(2,4,1, TimeUnit.SECONDS,queue);
        for (int i = 0; i <6 ; i++) {
            threadPoolExecutor.submit(()->{
                try {
                    System.out.println(Thread.currentThread().getName()+"开始执行");
                    Thread.sleep(500);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }finally {
                    System.out.println(Thread.currentThread().getName()+"执行结束");
                }
            });
        }
    }
}

运行结果:

pool-1-thread-1开始执行
pool-1-thread-4开始执行
pool-1-thread-3开始执行
pool-1-thread-2开始执行
pool-1-thread-4执行结束
pool-1-thread-1执行结束
pool-1-thread-2执行结束
pool-1-thread-3执行结束
pool-1-thread-2开始执行
pool-1-thread-4开始执行
pool-1-thread-4执行结束
pool-1-thread-2执行结束

可以看到,最多有4个线程执行,这跟我们设置的线程池最大大小有关。
线程池实例2——使用newFixedThreadPool创建线程池

public class ThreadPoolTest {
    public static void main(String[] args) {
        ExecutorService threadPool = Executors.newFixedThreadPool(2);
        for (int i = 0; i <6 ; i++) {
            threadPool.submit(()->{
                try {
                    System.out.println(Thread.currentThread().getName()+"开始执行");
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }finally {
                    System.out.println(Thread.currentThread().getName()+"执行结束");
                }
            });
        }
    }
}

运行结果:

pool-1-thread-2开始执行
pool-1-thread-1开始执行
pool-1-thread-1执行结束
pool-1-thread-2执行结束
pool-1-thread-1开始执行
pool-1-thread-2开始执行
pool-1-thread-2执行结束
pool-1-thread-1执行结束
pool-1-thread-2开始执行
pool-1-thread-1开始执行
pool-1-thread-2执行结束
pool-1-thread-1执行结束

可以看到,每次只有2个线程在执行任务,2即我们设置的固定线程池大小。

线程池实例3——使用newCachedThreadPool创建线程池

public class ThreadPoolTest {
    public static void main(String[] args) {
        ExecutorService threadPool = Executors.newCachedThreadPool();
        for (int i = 0; i <6 ; i++) {
            threadPool.submit(()->{
                try {
                    System.out.println(Thread.currentThread().getName()+"开始执行");
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }finally {
                    System.out.println(Thread.currentThread().getName()+"执行结束");
                }
            });
        }
    }
}

运行结果:

pool-1-thread-1开始执行
pool-1-thread-6开始执行
pool-1-thread-4开始执行
pool-1-thread-5开始执行
pool-1-thread-3开始执行
pool-1-thread-2开始执行
pool-1-thread-2执行结束
pool-1-thread-4执行结束
pool-1-thread-6执行结束
pool-1-thread-5执行结束
pool-1-thread-1执行结束
pool-1-thread-3执行结束

可以看到,newCachedThreadPool线程池的大小是没有限制的。

线程池实例4——使用newSingleThreadPool创建线程池

public class ThreadPoolTest {
    public static void main(String[] args) {
        ExecutorService threadPool = Executors.newSingleThreadExecutor();
        for (int i = 0; i <6 ; i++) {
            threadPool.submit(()->{
                try {
                    System.out.println(Thread.currentThread().getName()+"开始执行");
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }finally {
                    System.out.println(Thread.currentThread().getName()+"执行结束");
                }
            });
        }
    }
}

运行结果:

pool-1-thread-1开始执行
pool-1-thread-1执行结束
pool-1-thread-1开始执行
pool-1-thread-1执行结束
pool-1-thread-1开始执行
pool-1-thread-1执行结束
pool-1-thread-1开始执行
pool-1-thread-1执行结束
pool-1-thread-1开始执行
pool-1-thread-1执行结束
pool-1-thread-1开始执行
pool-1-thread-1执行结束

可见,每次只有一个线程执行任务,是单线程的。
线程池实例5——使用newScheduledThreadPool创建线程池

public class ThreadPoolTest {
    public static void main(String[] args) {
        ScheduledExecutorService service = Executors.newScheduledThreadPool(2);
        for (int i = 0; i <3 ; i++) {
            service.schedule(()->{
                System.out.println(Thread.currentThread().getName()+"开始执行");
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            },3,TimeUnit.SECONDS);
        }
    }
}

运行结果:

(3秒后)//设置定时时间为3秒
pool-1-thread-1开始执行
pool-1-thread-2开始执行
(1秒后)//任务休眠1秒 因为设置固定线程池大小为2 需要等待另一线程执行完毕
pool-1-thread-1开始执行

只有2个线程是创建线程时我们传入构造方法的参数决定,间隔3秒是schedule延时执行方法我们传入的参数3和单位Seconds决定。

生命周期

  • Executor的实现通常会创建线程来执行任务,但JVM只有在所有非守护线程全部终止后才会退出,因此如果无法正确关闭Executor,那么JVM将无法结束。
  • 由于Executor以异步方式执行任务,因此在任何时刻,之前提交任务的状态不是立即可见的。有些任务可能已经完成,有些可能正在运行,而其他的任务可能在队列中等待执行。当关闭应用程序时,可能采用最平缓的关闭方式(完成所有已启动的任务,并且不再接受任何新任务),也可能采取最粗暴的关闭形式(直接关掉电源),以及其他各种可能的形式。既然Executor是为应用程序提供服务的,因而它们也是可关闭的(无论采取平缓的还是粗暴的形式),并将在关闭操作中受影响的任务的状态反馈给应用程序。
  • 为了解决服务的生命周期问题,Executor扩展了ExecutorService接口,添加了一些用于生命周期管理的方法(同时还有一些用于任务提交的遍历方法)。
  • ExecutorService的生命周期有3种状态:运行、关闭和已终止。
  • ExecutorService在初始创建时处于运行状态。
    shutdown方法将执行平缓的关闭过程:不再接受新的任务,同时等待已经提交的任务执行完成——包括那些还未开始执行的任务。
    shutdownNow方法将执行粗暴的关闭过程:它将尝试取消所有运行中的任务,并且不再启动队列中尚未开始执行的任务。
  • 在ExecutorService关闭后提交的任务将有“拒绝执行处理器REH”来处理,它会抛弃任务,或者使得execute方法抛出一个未检查的RejectedExecutionException。
    等所有任务都完成后,ExecutorService将转入终止状态。
    可以调用awaitTermination来等待ExecutorService到达终止状态,或者通过调用isTerminated来轮询ExecutorService是否已终止。
    通常在调用awaitTermination后会理解调用shutdown,从而产生同步地关闭ExecutorService的效果。

携带结果的任务Callable和Future

  • Executor框架使用Runnable作为其基本的任务表示形式,Runnbale是一种有很大局限的抽象,虽然run能写入到日志文件或者将结果放入某个共享的数据结构,但它不能返回一个值或抛出一个受检查的异常。
  • 许多任务实际上都是存在延迟的计算——执行数据库查询,从网络上获取资源,或者计算某个复杂功能。对于这些任务,Callable是一种更好的抽象:它认为主入口点(call)将返回一个值,并可能抛出一个异常。
  • 在Executor中包含了一些辅助方法能将其他类型的任务封装为一个Callable,例如Runnable和java.security.PrivilegedAction。
  • Runnable和Callable描述的都是抽象的计算任务,这些任务通常是有范围的,即都有一个明确的起始点,并且最终会结束。Executor执行的任务有4个生命周期阶段:创建、提交、开始和完成。由于有些任务可能要执行很长时间,因此通常希望能取消这些任务。在Executor框架中,已提交但尚未开始的任务可以取消,但对于已经开始执行的任务,只有当它们能响应中断时才能取消。取消一个已经完成的任务不会有任何影响。
  • Future表示一个任务的生命周期,并提供了响应的方法来判断是否已经完成或取消,以及获取任务的结果和取消任务等。在Future规范中包含的隐含意义是,任务的生命周期只能前进,不能后退,就像ExecutorService的生命周期一样。当某个任务完成后,它就永远停留在”完成“状态上。

步骤

1.创建Callable的实现类,并冲写call()方法,该方法为线程执行体,并且该方法有返回值

2.创建Callable的实例,并用FutuerTask类来包装Callable对象,该FutuerTask封装了Callable对象call()方法的返回值
3.实例化FutuerTask类,参数为FutuerTask接口实现类的对象来启动线程
4.通过FutuerTask类的对象的get()方法来获取线程结束后的返回值

FutureTask实现了两个接口,Runnable和Future,所以它既可以作为Runnable被线程执行,又可以作为Future得到Callable的返回值,那么这个组合的使用有什么好处呢?
假设有一个很耗时的返回值需要计算,并且这个返回值不是立刻需要的话,那么就可以使用这个组合,用另一个线程去计算返回值,而当前线程在使用这个返回值之前可以做其它的操作,等到需要这个返回值时,再通过Future得到。

实例:

public class ThreadPoolTest {
    public static void main(String[] args) throws Exception {
        Call call = new Call();
        FutureTask<Integer> futureTask = new FutureTask<>(call);
        new Thread(futureTask).start();
        try{
            Thread.sleep(5000);//模拟耗时操作
            System.out.println("取得结果:"+futureTask.get());
        }catch (Exception e){
            e.printStackTrace();
        }
    }
}

class Call implements Callable<Integer>{

    @Override
    public Integer call() throws Exception {
        int sum=0;
        for(int i=1;i<=10;i++){
            sum+=i;
        }
        return sum;
    }
}
------------------------------------------------------
也可使用线程池的submit执行Callable并取得结果
public class ThreadPoolTest {
    public static void main(String[] args) throws Exception {
        Call call = new Call();
        ExecutorService threadExecutor = Executors.newSingleThreadExecutor();
        Future<Integer> future = threadExecutor.submit(call);
        try{
            Thread.sleep(5000);//模拟耗时操作
            System.out.println("取得结果:"+future.get());
        }catch (Exception e){
            e.printStackTrace();
        }
    }
}

运行结果:

(5秒后)
取得结果:55

实现自己的线程池

  • 创建Executor接口
    public interface MyExecutor {
      void execute(Runnable command);
    }
  • 创建拒绝策略接口
    public interface RejectHandler {
      void reject(Runnable task, MyThreadPoolExecutor myThreadPoolExecutor);
    }
  • Executor的实现类
public class MyThreadPoolExecutor implements MyExecutor{
    //线程池的基本大小
    private int coreSize;
    //线程池最大大小
    private int maxSize;
    //线程池任务阻塞队列
    private BlockingQueue<Runnable> taskQueue;
    //线程池拒绝策略
    private RejectHandler handler;
    //当前正在运行的线程数,初始为0
    private AtomicInteger runningCount = new AtomicInteger(0);
    //构造方法
    public MyThreadPoolExecutor(int coreSize, int maxSize, BlockingQueue<Runnable> taskQueue) {
        this.coreSize = coreSize;
        this.maxSize = maxSize;
        this.taskQueue = taskQueue;
        this.handler = new MyRejectHandler();
    }
    //线程池的执行
    @Override
    public void execute(Runnable task) {
        int count = runningCount.get();// 正在运行的线程数
        if (count < coreSize) {// 如果workerCount<corePoolSize,那么创建并启动一个线程执行新提交的任务。
            if (addWorker(task, true)) {// 这里不一定添加成功,addWorker()方法里面还要判断一次是不是确实小
                return;
            }
        }
        // 如果workerCount>=corePoolSize,且线程池内的阻塞队列未满,那么将这个任务放入队列。
        if (!taskQueue.offer(task)) {//offer失败会返回false
            // 如果workerCount>=corePoolSize,且阻塞队列已满,若满足workerCount<maximumPoolSize,那么还是要创建并启动一个线程执行新提交的任务
            if (!addWorker(task, false)) {
                // 若阻塞队列已满,并且workerCount>=maximumPoolSize,则根据 handler所指定的策略来处理此任务
                handler.reject(task, this);
            }
        }
    }
    //添加线程到阻塞队列
    private boolean addWorker(Runnable newTask, boolean core) {
        // 自旋判断是不是真的可以创建一个线程
        for (; ; ) {
            int count = runningCount.get();// 正在运行的线程数
            int max = core ? coreSize : maxSize; // 核心线程还是非核心线程
            // 不满足创建线程的条件,直接返回false
            if (count >= max) {
                return false;
            }
            // 修改runningCount成功,可以创建线程
            if (runningCount.compareAndSet(count, count + 1)) {
                // 创建线程并启动
                new Thread(() -> {
                    // 运行的任务
                    Runnable task = newTask;
                    // 不断从任务队列中取任务执行,如果取出来的任务为null,则跳出循环,线程也就结束了
                    while (task != null || (task = getTask()) != null) {
                        try {
                            task.run(); // 执行任务
                        } finally {
                            task = null; // 任务执行完成,置为空
                        }
                    }
                }).start();
                break;
            }
        }
        return true;
    }
    //获取任务
    private Runnable getTask() {
        try {
            return taskQueue.take();// take()方***一直阻塞直到取到任务为止
        } catch (InterruptedException e) {
            runningCount.decrementAndGet();// 当前线程结束,把runningCount的数量减一
            return null;// 线程中断了,返回null可以结束当前线程
        }
    }
    //拒绝策略实现类
    public static class MyRejectHandler implements RejectHandler {
        public void reject(Runnable task, MyThreadPoolExecutor myThreadPoolExecutor) {
            System.out.println(task.toString()+"拒绝执行");
        }
    }
}
  • 测试类
    跟之前创建线程池的实例区别不大,只是把new ThreadPoolExecutor改为new MyThreadPoolExecutor。
    public class Test {
      public static void main(String[] args) {
          MyThreadPoolExecutor threadPool =new MyThreadPoolExecutor(2, 4, new LinkedBlockingQueue<>(2));
          for (int i = 0; i < 6; i++) {
              threadPool.execute(()->{
                  System.out.println(Thread.currentThread().getName()+"执行");
                  try {
                      Thread.sleep(500);
                  } catch (InterruptedException e) {
                      e.printStackTrace();
                  }
              });
          }
      }
    }

补充知识

Java中的线程是内核线程,Thread的start、join、sleep、yield等方法的实现都是通过本地的native方法,JVM不能直接操作CPU资源。
在这里插入图片描述
内核空间并不会维护用户级线程的线程表,对内核来说无论进程创建了多少线程,都只给进程分配时间片,而不知道进程中有多少线程。
在这里插入图片描述

  • 证明Java线程是内核级线程
      //一个测试类:
      public class Test {
          public static void main(String[] args) {
              for(int i=0;i<100;i++){
                  new Thread(()->{
                      try {
                          Thread.sleep(10000);
                      } catch (InterruptedException e) {
                          e.printStackTrace();
                      }
                  }).start();
              }
          }
      }
    (1)当测试类未启动时
    在这里插入图片描述
    (2)测试类启动后
    在这里插入图片描述

测试类启动前线程数在2300左右浮动,而启动后骤增至2500,可以看出Java线程是内核级线程,可***作系统感知到。

  • Java线程创建依赖于系统内核,通过JVM调用系统库创建内核线程,内核线程与Java-Thread是1:1的映射关系。
    在这里插入图片描述)在这里插入图片描述
#Java工程师##学习路径##笔记#
全部评论
👴给自己挽个尊
1 回复
分享
发布于 2019-12-17 20:13
加油 ,跟在大佬屁股后面学😅
点赞 回复
分享
发布于 2019-12-17 21:05
淘天集团
校招火热招聘中
官网直投
🆙
点赞 回复
分享
发布于 2019-12-17 21:55
加油~太厉害了😊
点赞 回复
分享
发布于 2020-01-04 14:01

相关推荐

点赞 评论 收藏
转发
10 49 评论
分享
牛客网
牛客企业服务