Java之AQS框架底层原理分析

写在前面

1、内容都是原创(少量图片和官方文字有引用),是本人根据源码一行一行的研读,最后手写汇总给大家的

2、网上各种转发来转发去的AQS教程,真的看吐了,一堆错的知识点还在转发来转发去的,本人觉得这类知识还是不要看这类文章为好,自己跟着源码去理解才是正道

3、认真看完我写的每一句话,一定对你有帮助,可能会有少许错别字和疏忽点,请大家务必评论指正哈!

一、AQS框架简述

Java中的大部分同步类(Lock、Semaphore、ReentrantLock等)都是基于AbstractQueuedSynchronizer(简称为AQS)实现的。AQS是一种提供了原子式管理同步状态、阻塞和唤醒线程功能以及队列模型的简单框架。本文会从应用层逐渐深入到原理层,并通过ReentrantLock的基本特性和ReentrantLock与AQS的关联,来深入解读AQS中独占锁的逻辑、Sync Queue(下面都称"同步队列")和Condition Queue(下面都称"等待队列"),不讲述包含共享锁和的部分。

二、临界资源的保护

1、概念

临界资源最重要的就是要维护好线程安全,防止出现冲突或者脏数据等等

2、学习

案例:多线程并发插入HashMap
public static void main(String[] args) throws InterruptedException {
    //1、一个非线程安全的Map
    Map<Integer, String> map = new HashMap<>();
    //2、多线程插入
    new Thread(() -> {
        for (int i = 0; i < 10000; i++) {
            map.put(i, i+"");
        }
    }).start();

    new Thread(() -> {
        for (int i = 10000; i < 20000; i++) {
            map.put(i, i+"");
        }
    }).start();
    //3、阻塞-保证两个线程跑完
    Thread.sleep(2000);
    //4、输出结果
    System.out.println(map.size());
}
很明显,我们大概率得不到20000结果,循环越多,误差就越大(关于HashMap的插入源码,值得深入学习,这里不做过多阐述)

这时候,我们有三种解决办法

1、将map变成ConcurrentHashMap,变成线程安全的Map,这样解决了插入冲突(可以了解下jdk7和8对ConcurrentHashMap、HashMap的优化)

2、利用同步代码关键字synchronized保证临界资源map的线程安全

3、用Lock接口提供的锁方法,锁住临界资源,如下:
public static void main(String[] args) throws InterruptedException {
    //1、一个非线程安全的Map
    Map<Integer, String> map = new HashMap<>();
    Lock lock = new ReentrantLock();//创建可重入锁对象
    //2、多线程插入
    new Thread(() -> {
        for (int i = 0; i < 10000; i++) {
          	//加锁
            lock.lock();
            try {
                map.put(i, i+"");
            } finally {
              	//释放锁
                lock.unlock();
            }
        }
    }).start();

    new Thread(() -> {
        for (int i = 10000; i < 20000; i++) {
          	//加锁
            lock.lock();
            try {
                map.put(i, i+"");
            } finally {
              	//释放锁
                lock.unlock();
            }
        }
    }).start();
    //3、阻塞-保证两个线程跑完
    Thread.sleep(2000);
    //4、输出结果
    System.out.println(map.size());
}
显然,这里的输出结果是20000,因为是线程安全的

那么可重入锁ReentrantLock的lock方法下面到底是什么原理呢,可重入又是如何从代码层面去实现的呢,下面为大家一一阐述

三、AQS的几个重要状态量

1、等待状态

AQS本身是通过一种FIFO的双向队列实现的,它的好处在于节点之间可以相互影响和操作,AQS的一个重要内部类Node,我们来看看几个重要的数据域:
//1、下面两个对象分别代编代表共享锁和独占锁(排它锁),可以理解成枚举
static final Node SHARED = new Node();//共享锁
static final Node EXCLUSIVE = null;//排它锁

//2、四种等待状态
/** 表示线程获取锁的请求已经取消了 */
static final int CANCELLED =  1;
/** 下一个节点(线程)需要被唤醒 */
static final int SIGNAL    = -1;
/** 表示节点在等待队列中,节点线程等待唤醒 */
static final int CONDITION = -2;
/** 共享锁模式下才会用到 */
static final int PROPAGATE = -3;

//3、常见变量
volatile int waitStatus;//等待状态
volatile Node prev;//同步队列-前置节点
volatile Node next;//后置节点
volatile Thread thread;//节点包含的线程-类似线程池的Worker持有线程
Node nextWaiter;//指向下一个处于CONDITION状态的节点/SHARED

2、state变量。
AQS使用一个Volatile的int类型的成员变量来表示同步状态,通过内置的FIFO队列来完成资源获取的排队工作,通过CAS完成对State值的修改。

AQS核心思想是,如果被请求的共享资源空闲,那么就将当前请求资源的线程设置为有效的工作线程,将共享资源设置为锁定状态;如果共享资源被占用,就需要一定的阻塞等待唤醒机制来保证锁分配。这个机制主要用的是CLH队列的变体实现的,将暂时获取不到锁的线程加入到队列中。

state>0意味着锁已经被获取了(次数取决于state大小),state=0意味着锁已经被释放了(临界资源可以尝试被获取,0也是Node节点初始化值),简要原理图如下:

img

3、AQS整体架构图

调用顺序自上而下(引用美团的技术文档,看看就好,这里不用深究,因为有不严谨的地方)

img

四、ReentrantLock

1、概念

ReentrantLock是可重入锁,可重入的概念是线程可以重复获取临界资源、对其加锁,我们结合对ReentrantLock的代码分析,深入理解AQS

2、lock()方法

1、Sync是ReentrantLock的内部类,并且有两个实现类,分别是公平锁和非公平锁

2、所以该lock方法有两个实现
public void lock() {
    sync.lock();
}
NonfairSync.lock()

1、利用CAS算法更改锁的状态值,非公平锁上来就会来这里抢锁

2、成功,将独占锁的线程换成当前线程

3、失败,去排队获取锁
final void lock() {

    if (compareAndSetState(0, 1))//1 
        setExclusiveOwnerThread(Thread.currentThread());//2
    else
        acquire(1);
}
FairSync.lock()
1、直接排队获取锁(体现了公平)
final void lock() {
    acquire(1);
}
3、acquire(int arg)

非公平锁抢占失败/公平锁直接进入此方法,先说总体思路:

1、尝试获取锁

2、成功则结束acquire()方法结束

3、失败则将该节点加入同步队列,并且是排它锁模式==》Node.EXCLUSIVE

4、排队自旋获取锁

5、如果线程是通过打断的方式退出阻塞状态的,则再次打断当前线程(Thread标志位=true),这里涉及到Java的协作打断知识,一定要了解
public final void acquire(int arg) {
    if (!tryAcquire(arg) &&//1
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))//3、4
        selfInterrupt();//5
}//2
重点:当前节点尝试获取锁之前,先判断自己是不是头节点的下一个节点(头节点只是个空节点,作标志位)
NonfairSync.tryAcquire(int acquires)

1、判断当前锁是否被占用(state是否=0)

2、是,则CAS维护state

成功,则设置独占锁的持有线程为当前线程,结束获取锁流程;

失败,则尝试失败,进入上面的acquireQueued(addWaiter(Node.EXCLUSIVE), arg)

3、否,则判断是有锁的线程是否是当前线程

是,state值增加,代表获取锁成功

否,同 2-失败
final boolean nonfairTryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    int c = getState();
    if (c == 0) {//1-是
        if (compareAndSetState(0, acquires)) {//2
            setExclusiveOwnerThread(current);//2-成功
            return true;
        }
    }
    else if (current == getExclusiveOwnerThread()) {//1-否
        int nextc = c + acquires;
        if (nextc < 0)
            throw new Error("Maximum lock count exceeded");
        setState(nextc);//3-是
        return true;
    }
    return false;//2-失败、3-否
}

FairSync.tryAcquire(int acquires)
1、唯一的区别在hasQueuedPredecessors():true = 同步队列除了头节点外,有其他节点并且不是自己,意味着要去排队获取锁
protected final boolean tryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    int c = getState();
    if (c == 0) {
        if (!hasQueuedPredecessors() && //1也体现了公平锁的特点-前面有人就去排队
            compareAndSetState(0, acquires)) {
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    else if (current == getExclusiveOwnerThread()) {
        int nextc = c + acquires;
        if (nextc < 0)
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
    return false;
}

addWaiter(Node mode)

1、包装一个节点:当前线程对象、锁模式(共享or独占)

2、如果同步队列尾节点不为空,则进行快速操作=》指针+CAS设置同步队列的尾节

3、如果快速操作失败,进行完整入队操作:自旋+CAS
private Node addWaiter(Node mode) {
    Node node = new Node(Thread.currentThread(), mode);//1
    // Try the fast path of enq; backup to full enq on failure
    Node pred = tail;
    if (pred != null) {//2
        node.prev = pred;
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }
    enq(node);//3
    return node;
}

acquireQueued(final Node node, int arg)

自旋:只有当该节点的前驱节点是头节点时,才去尝试获取锁

1、当前节点在同步队列中的前驱节点是否是头节点

是,尝试获取锁tryAcquire(arg)

否,shouldParkAfterFailedAcquire=》判断是否阻塞该线程(阻塞条件:前驱节点的等待状态为SIGNAL)

是,则阻塞等唤醒/打断

否,返回是否被打断

2、如果失败,取消尝试获取锁,这个方法里应该不会进入cancelAcquire方法,除非其他异常
final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            final Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) {//1
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
          	//不是头节点的后驱节点&nbs***bsp;尝试获取锁失败
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);//2
    }
}

shouldParkAfterFailedAcquire()

1、判断前驱节点的等待状态是否是SIGNAL状态

是,则放心阻塞,返回true

否,判断前驱节点是否处于CANCELLED状态

是,则循环移除同步队列中处于取消状态的节点

否,则设置前驱节点的等待状态为SIGNAL
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    int ws = pred.waitStatus;
    if (ws == Node.SIGNAL)
        return true;
    if (ws > 0) {
        do {
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
    } else {
        /*
         * waitStatus must be 0&nbs***bsp;PROPAGATE.  Indicate that we
         * need a signal, but don't park yet.  Caller will need to
         * retry to make sure it cannot acquire before parking.
         */
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
}

parkAndCheckInterrupt()

1、阻塞当前线程

2、若被唤醒或者当前线程被打断,执行下一步

3、返回打断状态,并重置标志位=false(前面说过了)
private final boolean parkAndCheckInterrupt() {
    LockSupport.park(this);//1
    return Thread.interrupted();//3
}

cancelAcquire(Node node)
后面维护这个方法,有兴趣的可以自己先读读源码和注释
private void cancelAcquire(Node node) {
    // Ignore if node doesn't exist
    if (node == null)
        return;

    node.thread = null;

    // Skip cancelled predecessors
    Node pred = node.prev;
    while (pred.waitStatus > 0)
        node.prev = pred = pred.prev;

    // predNext is the apparent node to unsplice. CASes below will
    // fail if not, in which case, we lost race vs another cancel
    //&nbs***bsp;signal, so no further action is necessary.
    Node predNext = pred.next;

    // Can use unconditional write instead of CAS here.
    // After this atomic step, other Nodes can skip past us.
    // Before, we are free of interference from other threads.
    node.waitStatus = Node.CANCELLED;

    // If we are the tail, remove ourselves.
    if (node == tail && compareAndSetTail(node, pred)) {
        compareAndSetNext(pred, predNext, null);
    } else {
        // If successor needs signal, try to set pred's next-link
        // so it will get one. Otherwise wake it up to propagate.
        int ws;
        if (pred != head &&
            ((ws = pred.waitStatus) == Node.SIGNAL ||
             (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
            pred.thread != null) {
            Node next = node.next;
            if (next != null && next.waitStatus <= 0)
                compareAndSetNext(pred, predNext, next);
        } else {
            unparkSuccessor(node);
        }

        node.next = node; // help GC
    }
}

全部执行完毕之后,回到了这里
public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
      	//如果被打断了,则把当前线程打断==》Thread.currentThread().interrupt();
        selfInterrupt();
}


5、unlock()方法
加锁并执行完业务之后,自然是要释放掉锁(一定要在finally代码块中释放,理由不用多说),代码如下:
public void unlock() {
  	//调用AQS的release方法,这个方法是框架提供的
    sync.release(1);
}

6、release(int arg)
public final boolean release(int arg) {
    if (tryRelease(arg)) {//调用实现类的方法,我们看看开发者如何自定义的
        Node h = head;
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);//唤醒头节点的下一个节点==》LockSupport.unpark(h.next.thread);
        return true;
    }
    return false;
}

tryRelease(int releases)

干了两件事

1、将独占线程置空

2、维护state

同时我们发现,这个方法是写在Sync类里面的,没有区分公平/非公平锁,说明释放锁的逻辑是共用的,没有区别

该方法可以传入指定的获取锁的次数releases,这注定ReentrantLock是可重入的
protected final boolean tryRelease(int releases) {
    int c = getState() - releases;
    if (Thread.currentThread() != getExclusiveOwnerThread())
        throw new IllegalMonitorStateException();
    boolean free = false;
    if (c == 0) {
        free = true;
        setExclusiveOwnerThread(null);
    }
    setState(c);
    return free;
}

unparkSuccessor(Node node)

1、如果当前节点未取消,则清除状态-CAS失败了也无所谓

2、寻找后置节点

3、唤醒后置第一个不是取消状态的节点(采取从队列尾部向头部遍历,这个很重要,自己思考为什么不从头部向尾部遍历)
private void unparkSuccessor(Node node) {
    int ws = node.waitStatus;
    if (ws < 0)
        compareAndSetWaitStatus(node, ws, 0);//1

    Node s = node.next;//2
    if (s == null || s.waitStatus > 0) {
        s = null;
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
    if (s != null)
        LockSupport.unpark(s.thread);//3
}

7、AQS获取锁总结

AQS是一个抽象类,留了部分方法让我们自行去实现,主要是下面五个方法

方法名 描述
protected boolean isHeldExclusively() 该线程是否正在独占资源。只有用到Condition才需要去实现它。
protected boolean tryAcquire(int arg) 独占方式。arg为获取锁的次数,尝试获取资源,成功则返回True,失败则返回False。
protected boolean tryRelease(int arg) 独占方式。arg为释放锁的次数,尝试释放资源,成功则返回True,失败则返回False。
protected int tryAcquireShared(int arg) 共享方式。arg为获取锁的次数,尝试获取资源。负数表示失败;0表示成功,但没有剩余可用资源;正数表示成功,且有剩余资源。
protected boolean tryReleaseShared(int arg) 共享方式。arg为释放锁的次数,尝试释放资源,如果释放后允许唤醒后续等待结点返回True,否则返回False。

从lock到最后锁的释放,完整的调用流程是这样的:蓝色框由开发者自行实现,紫色框由AQS框架提供

img

五、Condition接口

1、属性

除了获取锁的同步队列,AQS的内部类ConditionObject实现了Condition接口,提供了一种等待队列(线程阻塞等待被唤醒的队列)

一个锁对象只有一个同步队列,却可以有多个等待队列(一个Condition对象对应一个等待队列)
/** First node of condition queue. */
private transient Node firstWaiter;
/** Last node of condition queue. */
private transient Node lastWaiter;

创建Condition对象
ReentrantLock lock = new ReentrantLock();
Condition conditionA = lock.newCondition();
Condition conditionB = lock.newCondition();

2、wait()/notify()

这两个方法是我们见到最多的等待/唤醒机制,并且是Object类中的方法

wait()

1、只有当前线程持有该对象的监视器的时候,才可以调用,也就是说要配合synchronized(方法或代码块)关键字使用,否则会抛出IllegalMonitorStateException异常

2、如果在等待唤醒状态,线程被打断,则抛出InterruptedException异常
public final void wait() throws InterruptedException {
    wait(0);
}

//使用示例
synchronized (obj) {
    while (some conditions)
        obj.wait();
       ... 
}

notify()
本地方法,随机唤醒
public final native void notify();

3、与await()/signal()的区别

字面意思都是等待/唤醒,他们与Object的wait()/notify()有以下区别(此处欢迎补充)

区别点 await()/signal() wait()/notify()
使用前提 结合Lock使用 结合synchronized使用
唤醒机制 指定唤醒 随机唤醒
谓词条件 可以多个 只有一个

4、await()方法

1、如果当前线程被打断了,抛出异常,catch并自定义后续操作

2、向该Condition对象的等待队列添加一个等待节点到队列尾部(单向队列),并返回这个节点

3、释放该节点(当前线程)持有的锁

4、循环判断当前节点是否在同步队列中(循环的意义是,防止虚假唤醒)

5、如果不在同步队列(说明在等待队列),则阻塞当前线程直至被唤醒/打断(打断处于阻塞状态的线程会抛出InterruptedException异常,但是打断调用过park()方法阻塞的线程不会抛异常,最后讲解)

6、唤醒后,进行下一步:判断interruptMode(没有被打断、唤醒前打断的、需要唤醒后打断)

7、while循环结束,当前节点开始尝试获取锁:成功后并且interruptMode != THROW_IE(抛出异常模式),设置interruptMode为REINTERRUPT:重新打断

8、该节点有后置节点,说明当前Condition对象的等待队列不止一个等待节点,清理掉处于cancelled状态的节点

9、根据打断模式interruptMode决定,抛出异常 or 重新打断当前线程(单纯给个标志位,让用户自己决定该线程后续是否应该终止)
public final void await() throws InterruptedException {
    if (Thread.interrupted())//1
        throw new InterruptedException();
    Node node = addConditionWaiter();//2
    int savedState = fullyRelease(node);//3
    int interruptMode = 0;
    while (!isOnSyncQueue(node)) {//4
        LockSupport.park(this);//5
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)//6
            break;
    }
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)//7
        interruptMode = REINTERRUPT;
    if (node.nextWaiter != null) 
        unlinkCancelledWaiters();//8
    if (interruptMode != 0)
        reportInterruptAfterWait(interruptMode);//9
}

isOnSyncQueue(Node node)

1、判断节点状态,如果是CONDITION或者前驱节点为空(同步队列上的节点的前驱节点不为空,因为至少有头节点作前驱),说明一定在等待队列(单向队列)上

2、判断节点状态,如果该节点的next不为空(next/prev指针是用在同步队列上的,等待队列上的指针是firstWaiter/lastWaiter/nextWaiter),说明一定在同步队列上

3、从后往前遍历同步队列查找是否存在该节点

final boolean isOnSyncQueue(Node node) {
    if (node.waitStatus == Node.CONDITION || node.prev == null)//1
        return false;
    if (node.next != null) //2
        return true;
  
    return findNodeFromTail(node);//3
}

checkInterruptWhileWaiting(Node node)

1、判断当前线程是否被打断

2、判断唤醒前打断还是要在唤醒后打断

3、返回三种状态值:

THROW_IE:唤醒之前就被打断了(已经被打断了)

REINTERRUPT:唤醒之后再被打断(还没被打断后面再打断)

0:没有被打断

备注:

1、阻塞状态下,打断线程会抛出异常;运行状态下打断,没有任何影响,只是更新了一下标志位=true,表示被打断,用于后置的自定义操作

2、调用Thread.interrupted()会返回标志位,并且重置标志位=false

private int checkInterruptWhileWaiting(Node node) {
    return Thread.interrupted() ?
        (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :
        0;
}

transferAfterCancelledWait(Node node)

1、CAS更改节点状态

2、成功,将该节点加入同步队列(节点状态为0,等后续节点入队之后,若该节点没取消,则状态会被设置成Node.SIGNAL),返回true
3、C失败,自旋判断当前节点是否在同步队列,若不是,则让出线程(等到节点在同步队列了,结束循环并返回false)
final boolean transferAfterCancelledWait(Node node) {
    if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) {
        enq(node);
        return true;
    }
    /*
     * If we lost out to a signal(), then we can't proceed
     * until it finishes its enq().  Cancelling during an
     * incomplete transfer is both rare and transient, so just
     * spin.
     */
    while (!isOnSyncQueue(node))
        Thread.yield();
    return false;
}

5、signal()

1、判断当前线程是否持有独占锁(是,才有资格去唤醒其他线程,否则抛出异常)

2、拿到Condition对象等待队列的第一个节点(等了最久的一个节点==最早调用await()的节点)

3、确实有等待节点

4、执行唤醒操作:只唤醒一个节点

public final void signal() {
    if (!isHeldExclusively())//1
        throw new IllegalMonitorStateException();
    Node first = firstWaiter;//2
    if (first != null)//3
        doSignal(first);//4
}


doSignal(Node first)

1、判断,如果当前等待队列只有这一个节点,则将lastWaiter置空

2、将该节点移出等待队列

3、转换节点状态(若状态转换失败并且还有后置等待节点,则尝试唤醒下一个节点,以此类推)

private void doSignal(Node first) {
    do {
        if ( (firstWaiter = first.nextWaiter) == null)
            lastWaiter = null;
        first.nextWaiter = null;
    } while (!transferForSignal(first) &&
             (first = firstWaiter) != null);
}


transferForSignal(Node node)
1、执行CAS算法,若失败,则说明该节点是取消状态(因为等待队列的节点状态只能是CONDITION or CONCELLED)

2、状态转换成功,将该节点加入同步队列并返回该节点的前置节点

3、如果前置节点处于取消状态或者CAS失败,唤醒该节点的线程
/*
 * Transfers a node from a condition queue onto sync queue.
 */
final boolean transferForSignal(Node node) {
  
    if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))//1
        return false;

    Node p = enq(node);//2
    int ws = p.waitStatus;
    if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))//3
        LockSupport.unpark(node.thread);
    return true;
}

六、interrupt()与LockSupport.park()的关系

AQS的代码看的多了之后,会发现这两个东西经常出现,实际上这两者的关系实属难理解,接下来我来细细讲解他们的关系

1、阻塞

我们知道LockSupport.park()方法调用能够阻塞当前线程,而interrupt()方法能够打破线程的阻塞状态

interrupt()方法中的注释是这样写的:
If this thread is blocked in an invocation of the 
wait(), wait(long)&nbs***bsp;wait(long, int) methods of the Object}&nbs***bsp;of the join(), join(long), join(long, int), sleep(long),&nbs***bsp;sleep(long, int) methods of Thread class, 
then its interrupt status will be cleared and it will receive an InterruptedException.
线程因为调用这些方法被阻塞时,调用interrupt()后,会抛出一个InterruptedException异常

2、异常

那么调用LockSupport.park()也会让当前线程阻塞,这时调用interrupt()会不会抛出异常呢,看看下面的代码
public static void main(String[] args) {
    Thread t1 = new Thread(() -> {
        int count = 100;
        while (count > 0) {
            LockSupport.park();//阻塞在这里,等待唤醒
            System.out.println(count--);
        }
    });

    Thread t2 = new Thread(() -> {
        try {
            while (true) {
                Thread.sleep(100);//保证t1线程先阻塞
                t1.interrupt();//打断t1线程
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    });

    t1.start();
    t2.start();
}
预期:1、抛出异常 2、每隔100ms,打印一次数字

结果:1、不会抛出异常 2、100到1几乎无间隔输出了

3、分析

1、LockSupport.park()的底层代码是这样的
class Parker : public os::PlatformParker {
private:
   //表示许可
  volatile int _counter ; 
  Parker * FreeNext ;
  JavaThread * AssociatedWith ; // Current association
public:
  Parker() : PlatformParker() {
    //初始化_counter
    _counter       = 0 ; 
    FreeNext       = NULL ;
    AssociatedWith = NULL ;
  }
protected:
  ~Parker() { ShouldNotReachHere(); }
public:
  void park(bool isAbsolute, jlong time);
  void unpark();

  // Lifecycle operators  
  static Parker * Allocate (JavaThread * t) ;
  static void Release (Parker * e) ;
private:
  static Parker * volatile FreeList ;
  static volatile int ListLock ;

};
调用这个方法之后,线程开始阻塞(因为满足当前线程还未被打断)

然后调用t1.interrupt()打断线程之后,线程退出阻塞状态,并更新标志位=true(意味着被打断了),这个状态值保留到调用Thread.interrupted()为止(重置标志位=false)

睡眠100ms的时候,t1线程肯定先进入park()方法了,但是isInterrupted()返回标志位=true,不满足阻塞条件,所以不会阻塞,出现了上述结果

2、LockSupport.unpark()
void Parker::unpark() {
  int s, status ;
 //给互斥量加锁,如果互斥量已经上锁,则阻塞到互斥量被解锁
 //park进入wait时,_mutex会被释放
  status = pthread_mutex_lock(_mutex);
  assert (status == 0, "invariant") ; 
  //存储旧的_counter
  s = _counter; 
//许可改为1,每次调用都设置成发放许可
  _counter = 1;
  if (s < 1) {
     //之前没有许可
     if (WorkAroundNPTLTimedWaitHang) {
      //默认执行 ,释放信号,表明条件已经满足,将唤醒等待的线程
        status = pthread_cond_signal (_cond) ;
        assert (status == 0, "invariant") ;
        //释放锁
        status = pthread_mutex_unlock(_mutex);
        assert (status == 0, "invariant") ;
     } else {
        status = pthread_mutex_unlock(_mutex);
        assert (status == 0, "invariant") ;
        status = pthread_cond_signal (_cond) ;
        assert (status == 0, "invariant") ;
     }
  } else {
   //一直有许可,释放掉自己加的锁,有许可park本身就返回了
    pthread_mutex_unlock(_mutex);
    assert (status == 0, "invariant") ;
  }
}


4、解决办法
必然是线程被唤醒之后,调用一次Thread.interrupted()清理标志位
public static void main(String[] args) {
    Thread t1 = new Thread(() -> {
        int count = 100;
        while (count > 0) {
            LockSupport.park();
            System.out.println(count--);
            Thread.interrupted();//清理标志位
        }
    });

    Thread t2 = new Thread(() -> {
        try {
            while (true) {
                Thread.sleep(100);
                t1.interrupt();
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    });

    t1.start();
    t2.start();
}

5、interrupt()到底有什么用处

具体来说,当对一个线程,调用 interrupt() 时, ① 如果线程处于被阻塞状态(例如处于sleep, wait, join 等状态),那么线程将立即退出被阻塞状态,并抛出一个InterruptedException异常。仅此而已。 ② 如果线程处于正常活动状态,那么会将该线程的中断标志设置为 true,仅此而已。被设置中断标志的线程将继续正常运行,不受影响。

interrupt() 并不能真正的中断线程,需要被调用的线程自己进行配合才行。 也就是说,一个线程如果有被中断的需求,那么就可以这样做。 ① 在正常运行任务时,经常检查本线程的中断标志位,如果被设置了中断标志就自行停止线程。 ② 在调用阻塞方法时正确处理InterruptedException异常。(例如,catch异常后就结束线程。)
Thread thread = new Thread(() -> {
    while (!Thread.interrupted()) {
        // do more work.
    }
});
thread.start();

// 一段时间以后
thread.interrupt();

七、写在后面

1、AQS框架非常重要,无论是面试还是业务中,都会用到,无处不在

2、培养看源码的习惯,一遍看不懂就多看几遍,休息几天再来啃,看懂的那一天比通便还爽~~

3、有不足之处请指正👏,虚心指教

八、下期展望

各种锁的源码分析:读写锁、闭锁、信号量以及分布式锁下的锁特性

#Java开发##笔经##秋招##Java##校招##社招##Java工程师#
全部评论

相关推荐

3 52 评论
分享
牛客网
牛客企业服务