二本26届大三Java实习准备 Day6

2025 年 3 月 2 日 Day 6

今日学习AQS

AQS

AQS 是一个底层的框架, 是其他实现锁、同步器组件的基石; AQS 主要解决的是 “锁应该非配给谁” “其他线程该如何等待” “线程释放锁后如何唤醒下个线程”的问题,是并发竞争锁场景下的一个调度框架

AQS 的结构

通过一个 FIFO 队列实现线程排队; 用 state 状态位表示锁的状态; 所以 AQS 的底层就是 CHL 的双端队列+state 状态标志位

alt

alt

从 ReentrantLock 的使用开始分析

ReentrantLock 实现了 Lock 接口; ReentrantLock 中有个变量为 Sync 类型, Sync 类型有公平锁和非公平两个子类, 并且这个类继承了 AQS 也就是说具备了线程争取和等候锁的调度能力 alt

公平和非公平的实现 通过创建锁传入的布尔值来创建公平和非公平; 非公平锁在每次上锁之前都会直接尝试获取锁;

public final void acquire(int arg) {
  if (!tryAcquire(arg) &&
	  acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
	  // 如果获取锁后并且中断标记为true那么就会执行中断逻辑
	  selfInterrupt();
}

tryAcquire 是 Sync 的子类实现的方法, 由 AQS 的上层业务自定义逻辑,ReentrantLock 中分为公平和非公平

会先通过 tryAcquire 获取锁, 如果失败那么就会通过加入队列的方式获取锁 acquireQueued; 如果通过 acquireQueued 的返回值为 true 说明当前线程虽然获取到锁了但是同时也收到了中断信息

tryAcquire

// NonfairSync 是 ReentrantLock 的内部类,继承自 Sync
static final class NonfairSync extends Sync {
    // 非公平锁直接尝试抢占资源
    protected final boolean tryAcquire(int acquires) {
        return nonfairTryAcquire(acquires);  // 调用 Sync 中的非公平获取方法
    }
}

// Sync 类中的实际实现
abstract static class Sync extends AbstractQueuedSynchronizer {
    // 非公平获取锁的核心逻辑
    final boolean nonfairTryAcquire(int acquires) {
        final Thread current = Thread.currentThread();
        int c = getState();  // 获取当前锁状态(0=未锁定,>0=重入次数)
        if (c == 0) {  // 锁未被占用
            if (compareAndSetState(0, acquires)) {  // CAS 尝试直接获取锁
                setExclusiveOwnerThread(current);  // 设置当前线程为独占线程
                return true;  // 获取成功
            }
        } else if (current == getExclusiveOwnerThread()) {  // 锁已被当前线程持有(重入)
            int nextc = c + acquires;
            if (nextc < 0)  // 溢出检查(理论上不会发生)
                throw new Error("Maximum lock count exceeded");
            setState(nextc);  // 更新重入次数
            return true;  // 重入成功
        }
        return false;  // 获取失败
    }
}

尝试直接获取锁, 如果没有成功那么就尝试获取重入锁; 如果都不行那么就直接需要进入封装节点入队逻辑

addWaiter

private Node addWaiter(Node mode) {
    Node node = new Node(Thread.currentThread(), mode);
    Node pred = tail;
    // 尝试快速插入队尾
    if (pred != null) {
        node.prev = pred;
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }
    // 快速插入失败,调用enq()自旋插入
    enq(node);
    return node;
}

enq

private Node enq(final Node node) {
    for (;;) {  // 自旋(死循环),直到成功插入队列尾部
        Node t = tail;
        if (t == null) {  // 队列未初始化(头尾均为空)
            // 初始化一个哑节点(Dummy Node)作为头节点
            if (compareAndSetHead(new Node()))  
                tail = head;  // 头尾指向同一个哑节点
        } else {
            // 将新节点的前驱指向当前尾节点
            node.prev = t;
            // CAS操作:尝试将新节点设置为尾节点
            if (compareAndSetTail(t, node)) {  
                t.next = node;  // 原尾节点的后继指向新节点
                return t;       // 返回旧尾节点(前驱节点)
            }
        }
    }
}

封装成节点入队之前要确保队列已经初始化了, 意思就是如果队列为空那么就要在头部插入一个空节点充当哨兵的角色; 他的目的是为了统一逻辑, 可以将队列的第一个节点理解成当前持有锁的线程节点

acquireQueued

final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
	    boolean interrupted = false;
        for (;;) {
            final Node p = node.predecessor();
            // 如果前驱是头节点,尝试获取资源
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // 帮助GC
                failed = false;
                return interrupted;
            }
            // 检查是否需要阻塞线程(通过前驱的waitStatus)
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true; // 执行到这里说明线程需要被中断
        }
    } finally{
	    if(failed)
		    cancelAcquire(node);
    }
}

如果当前节点上队列的第二个节点那么会尝试再次直接获取锁, 如果成功了那么就直接持有锁并将当前节点晋升为新的头结点;如果不是第二个节点, 那么就要进行挂起, 但是为了确保挂起后能被唤醒那么就要和找到一个可靠的前置节点,通过 shouldParkAfterFailedAcquire

shouldParkAfterFailedAcquire

// 在获取锁失败后,判断当前线程是否需要挂起(park)
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    int ws = pred.waitStatus;  // 前驱节点的等待状态
    if (ws == Node.SIGNAL)     // 前驱节点状态为 SIGNAL(表示会通知当前节点)
        return true;           // 当前线程可以安全挂起
    if (ws > 0) {              // 前驱节点已取消(CANCELLED 状态值为 1)
        do {
            node.prev = pred = pred.prev;  // 跳过所有已取消的前驱节点,寻找有效前驱
        } while (pred.waitStatus > 0);
        pred.next = node;      // 重新链接队列,跳过已取消的节点
    } else {                   // 前驱节点状态为 0 或 PROPAGATE
        // 将前驱节点状态设为 SIGNAL(表示需要通知当前节点)
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;  // 不需要立即挂起,需再次尝试获取锁或调整队列
}

方法的作用就是要找到一个可靠的前置节点, 如果前置节点状态>0 那么说明已经取消了, 当前节点如果直接挂起那么就不会唤醒了, 所以要删除 sw>0 的节点; 如果为 0 那么说明前面的节点还不知道需要唤醒后置节点将 sw 改为-1, 让后置节点和前置节点约定好后面的节点挂起后前置节点会在未来唤醒它

parkAndCheckInterrupt

// 挂起当前线程,并在唤醒后返回中断状态
private final boolean parkAndCheckInterrupt() {
    LockSupport.park(this);         // 阻塞当前线程(直到被 unpark 或中断)
    return Thread.interrupted();    // 返回中断标志,并清除中断状态
}

挂起线程;当线程醒过来之后会返回中断信息, 判断这次醒来是因为被前置节点 unpark 唤醒, 还是因为收到了中断信号而醒来; 如果是因为中断信号那么外层会因为循环的缘故在再次循环再次进入挂起状态, 直到获取到锁才会返回, 只有获取到锁返回后才能继续处理中断逻辑; 相当于将中断处理滞后

释放锁

public void unlock() {
//进入release方法
   sync.release(1);
}

//AQS独占锁解锁核心代码
public final boolean release(int arg) {
//进入子类实现的解锁逻辑
//如果tryRelease返回true说明没有独占锁,否则好友线程持有
    if (tryRelease(arg)) {
    //当线程没有持有锁后,唤醒下一个线程执行
        Node h = head;
        //根据waitstatus判断需要唤醒哪个线程
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
} 	


//子类定义的解锁逻辑
protected final boolean tryRelease(int releases) {
//获取state - 1
    int c = getState() - releases;
    //判断当前线程是否是独占资源的线程
    if (Thread.currentThread() != getExclusiveOwnerThread())
        throw new IllegalMonitorStateException();
        //设置资源是否还被独占着
    boolean free = false;
    //如果state = 0,表示没有线程独占
    if (c == 0) {
    //设置为true
        free = true;
        //当前独占线程置空
        setExclusiveOwnerThread(null);
    }
    //cas设置state
    setState(c);
    return free;
}
private void unparkSuccessor(Node node) {
    // 1. 获取当前节点的 waitStatus
    int ws = node.waitStatus;
    if (ws < 0)
        compareAndSetWaitStatus(node, ws, 0); // 尝试将状态重置为0(初始状态)

    // 2. 从尾节点向前遍历,找到离当前节点最近的有效后继节点
    Node s = node.next;
    if (s == null || s.waitStatus > 0) { // 如果后继节点无效(已取消)
        s = null;
        for (Node t = tail; t != null && t != node; t = t.prev) {
            if (t.waitStatus <= 0)
                s = t; // 记录最近的有效节点
        }
    }

    // 3. 唤醒该节点的线程
    if (s != null)
        LockSupport.unpark(s.thread);
}

释放的逻辑很简单; 如果成功将 state 置为 0 说明释放成功, 如果当前节点的 sw 为-1 说明需要唤醒后面的节点, 现在就要唤醒当前节点的直接后继, 当然直接后继可能由于各种原因取消了, 所以通过从后往前的方式找最靠近首节点的需要唤醒 (前驱节点 sw 为-1)的节点

为什么唤醒线程时需要从后往前遍历

private Node addWaiter(Node mode) {
    Node node = new Node(Thread.currentThread(), mode);
    Node pred = tail;
    if (pred != null) {
        node.prev = pred; // Step 1: 设置新节点的 prev
        if (compareAndSetTail(pred, node)) { // Step 2: CAS 更新尾节点
            pred.next = node; // Step 3: 设置原尾节点的 next
            return node;
        }
    }
    enq(node); // 若 CAS 失败,通过 enq 自旋插入
    return node;
}

这个问题的场景是在锁释放后需要唤醒头结点后的节点, 也就是队列中的第二个节点, 但是第二个节点被取消那么就应该唤醒下个有效节点; 这是因为要考虑到节点入队的操作的并发性; 节点的入队操作是通过一个 CAS 将新节点赋值给 tail 然后成功进入 if 语句中然后将前置节点的 next 指针指向当前节点; cas 只能保证新节点赋值给 tail 的原子性但是无法保证语句块中的并发操作; 也就有可能导致从前往后遍历会出现前置节点的 next 指针为 null 的情况, 无法到达当前指针; 假设一个场景就是当前节点前的所有节点突然全部取消, 这时候锁被释放并且需要唤醒一个节点, 如果从前往后遍历的话那么新节点这个应该被唤醒的节点就无法唤醒; 如果从后 tail 指针出发往前遍历的话就不会有这种问题;

AQS 是如何实现线程的等待和唤醒的?

AQS 的等待唤醒底层是通过 LockSupport 提供的 park 和 unpark 实现的; 具体来说, 在一个线程进入队列后并不会立刻挂起, 而是要找到一个可靠的前驱节点, 将前驱节点的 waitState 改为-1 意思是我这个节点要挂起了你要记得锁释放的时候唤醒我; 当锁释放时就会从队列中查找到需要唤醒的节点通过 unpark 进行唤醒

AQS 条件队列原理

条件队列用于线程之间的协调和通信; 可以让线程在不满住条件时被挂起, 知道其他线程显示改变条件并唤醒等待在该条件上的线程; 条件队列和同步队列的区别:

  • 目的不同:同步队列是实现锁的调度; 条件队列用于等待某个条件成立后开启线程
  • 结构不同:同步队列通过标志位 +双向链表;条件队列则是单向链表

什么是 AQS 的独占模式和共享模式?

AQS 的独占模式就是同一时刻只有一个线程可以持有锁, 而共享模式可以让多个线程同时获取锁; 独占模式每次只唤醒下个有效节点, 而共享模式则可能连续唤醒多个节点; 具体使用哪种模式应该取决于我们想要保护的资源; 如果资源同一时刻只能被一个线程访问, 那么用独占; 如果资源允许被多个线程同时读取, 并且写入操作较少时那么可以使用共享模式来提高并发性能

AQS 为什么采用双向链表?

可以先从比对双向和单向链表的优缺点开始分析; 单向链表占用的内存空间更少, 双向链表则在双向遍历、中间插入和删除元素、快速定位尾节点等操作上效率更高; 是很经典的空间换时间策略;

而 AQS 选择双向链表的原因有一下几点:

  • 在插入元素时, 需要将节点插入到尾部;
  • 中间节点可能随时被取消
  • 在唤醒节点时有时候需要从后往前遍历

上面的每个需求如果通过单链表来实现都是效率十分低下的, 何况 AQS 还是在可能面对高并发的场景下所以双向链表是必然的选择

AQS 的 state 变量有什么作用?如何保证线程安全?

state 是用于标记锁资源的状态的, 可以表示锁的重入次数; 为了保证可见性和原子性通过 volatile 和 CAS 实现;

公平锁和非公平锁在 AQS 中如何实现?

主要区别就是在入队之前都会执行 tryAcquire 方法; 公平锁会进行判断队列是否有节点正在等待, 如果有则直接返回然后进行入队; 而非公平锁的 tryAcquire 则会直接通过 CAS 尝试获取锁

如何基于 AQS 实现一个自定义同步器?

首先自定义类要继承 AQS 这个抽象类; 然后重写 tryAcquire 和 tryRelease 方法; 如果是共享模式就要实现 tryAcquireShared 和 tryReleaseShared

#java##面试#
全部评论

相关推荐

点赞 评论 收藏
分享
评论
1
3
分享

创作者周榜

更多
牛客网
牛客企业服务