Java并发之CountDownLatch

当存在多线程执行任务,需要等待指定数量的子线程都执行完后,再执行主线程任务场景时,可以使用CountDownLatch类来实现。其静态内部类Sync继承了AbstractQueuedSynchronizer,主要实现了AQStryAcquireSharedtryReleaseShared方法。

基本使用

public class CountDownLatchTest {
    public static void main(String[] args) throws InterruptedException {
        // 定义计数器值为10的CountDownLatch对象
        CountDownLatch countDownLatch = new CountDownLatch(10);
        // 创建10个子线程执行任务,最少需要10个子线程
        for (int i = 0; i < 10; i++) {
            new Thread(() -> {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    // 任务执行完后将计数器减1
                    countDownLatch.countDown();
                }
                System.out.println(Thread.currentThread().getName() + " end...");
            }, "Thread-" + i).start();
        }
        // 在此等待所有任务完成
        countDownLatch.await();
        System.out.println("Thread-main end...");
    }
}

源码解析

  • 构造一个计数器10的CountDownLatch对象,设置state值为10
// step 1
public CountDownLatch(int count) {
    if (count < 0) throw new IllegalArgumentException("count < 0");
    this.sync = new Sync(count);
}
// step 2
Sync(int count) {
    setState(count);
}
  • countDown以及await方法分析
// step 1: 调用了sync实例的releaseShared方法
public void countDown() {
    sync.releaseShared(1);
}
// step 2: 接着调用了sync实例的tryReleaseShared方法
public final boolean releaseShared(int arg) {
    // 尝试将计数器state进行减一
    if (tryReleaseShared(arg)) {
        doReleaseShared();
        return true;
    }
    return false;
}
// step3: 接着查看`tryReleaseShared`方法,该方法具体实现是在`Sync`类中
private static final class Sync extends AbstractQueuedSynchronizer {
    // 省略不相关代码...
    protected boolean tryReleaseShared(int releases) {
        // 此处的for循环是为了解决在高并发场景下,CAS操作失败时,进行重试
        for (;;) {
            int c = getState();
            // 当state已经为0时,直接返回false。
            if (c == 0)
                return false;
            int nextc = c-1;
            // 基于CAS操作,保证state变量的原子性更新
            if (compareAndSetState(c, nextc))
                // 当state更新为0时,返回true。
                return nextc == 0;
        }
    }
}
// step 4: 根据tryReleaseShared方法的分析得知:
// 只有将state成功更新为0的线程,才会执行doReleaseShared逻辑,即step 2中的if块逻辑
// 在分析这个方法之前,先看下await方法,直接看step 5
private void doReleaseShared() {
    for (;;) {
        Node h = head;
        if (h != null && h != tail) {
            int ws = h.waitStatus;
            if (ws == Node.SIGNAL) {
                if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                    continue;            // loop to recheck cases
                unparkSuccessor(h);
            }
            else if (ws == 0 &&
                !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                continue;
        }
        if (h == head)
            break;
    }
}
// step 5: 判断子线程是否执行完,反之将主线程阻塞。
public void await() throws InterruptedException {
    sync.acquireSharedInterruptibly(1);
}
public final void acquireSharedInterruptibly(int arg) throws InterruptedException {
    // 线程是否已被中断
    if (Thread.interrupted())
        throw new InterruptedException();
    if (tryAcquireShared(arg) < 0)
        // 将主线程阻塞
        doAcquireSharedInterruptibly(arg);
}
protected int tryAcquireShared(int acquires) { // 改方法也是在CountDownLatch重写了
    // 这里逻辑很简单,就是判断state是否为0,为0时才返回1
    // 所以如果在执行await之前,state为已经为0时,不会将主线程阻塞。
    // 有且仅当还有等待的线程时,才会将主线程阻塞。
    return (getState() == 0) ? 1 : -1;
}
private void doAcquireSharedInterruptibly(int arg) throws InterruptedException {
    // 将主线程添加到阻塞队列中
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
        for (;;) {
            // 获取前继节点
            final Node p = node.predecessor();
            // 该队列里中只有一个主线程节点,所以此处为true
            if (p == head) {
                // 再次检查state是否为0
                int r = tryAcquireShared(arg);
                // 条件满足表示state为0,所需数量的子线程任务已执行完
                if (r >= 0) {
                    // 将head指针指向新加的主线程阻塞节点
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    failed = false;
                    return;
                }
            }
            // 如果子线程未执行完,将head节点设为可通知运行状态
            // 并在第二次for循环后,设置为阻塞状态,通过LockSupport$park方法设置
            // 注意:此处阻塞之后,会在子线程执行完后,进行唤醒,唤醒后,会从此处继续执行
            // 继续执行后,会再进入一次循环,下一次循环进来时,state即为0了
            // head指针也会指向主线程阻塞的那个节点,退出循环
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                throw new InterruptedException();
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}
// step 6: 再回到step 4源码处
// 此方法进入条件为state为0,即子线程执行完毕, 将唤醒主线程
private void doReleaseShared() {
    for (;;) {
        Node h = head;
        // 如果阻塞队列中存在阻塞线程节点
        if (h != null && h != tail) {
            int ws = h.waitStatus;
            // 第五步已分析到,主线程阻塞前,head的waitStatus值会被修改为Node.SIGNAL
            if (ws == Node.SIGNAL) {
                // 若设置失败,通过for循环重试
                if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                    continue;
                // 将主线程唤醒,主线程继续执行,通过LockSupport$unpark方法设置
                unparkSuccessor(h);
            }
            else if (ws == 0 &&
                !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                continue;
        }
        if (h == head)
            break;
    }
}
全部评论

相关推荐

投递美团等公司10个岗位
点赞 评论 收藏
转发
点赞 收藏 评论
分享
牛客网
牛客企业服务