多线程

volatile
//TODO

Synchronized

//TODO

ReentrantLock
ReentrantLock是API级别的锁,实现了Lock接口。通过方法lock()获取锁,unlock()释放锁。先通过一个简单的例子看看ReentrantLock的用法。

public class LockExp1 {
    private static int count=0;
    private Lock lock=new ReentrantLock();
    public static void main(String[] args) throws InterruptedException {
        LockExp1 lockexp=new LockExp1();
        ExecutorService executor= Executors.newCachedThreadPool();
        for (int i=0;i<1000;i++){
            executor.execute(()->lockexp.add());
        }
        Thread.sleep(2000);
        System.out.println(count);
        executor.shutdown();
    }
    public void add(){
        lock.lock();
        try{
            count++;
        }finally{
            lock.unlock();
        }
    }
}

lock的实现
上面代码中lock是通过ReentrantLock类向上转型创建的实例对象,所以lock对象调用lock方法要去ReentrantLock中看源码,如下:

public void lock() {
        sync.lock();
    }

sync是Sync类创建的对象,Sync是ReentrantLock类中的一个内部类,继承自AbstractQueuedSynchronizer。

private final Sync sync;
abstract static class Sync extends AbstractQueuedSynchronizer {
...
}

Sync类中的lock方法如下:

abstract void lock();

此方法在Sync中是抽象方法,所以继续找实现了Sync中lock方法的子类(先看非公平锁的实现):NonfairSync

static final class NonfairSync extends Sync {
...
    final void lock() {
            if (compareAndSetState(0, 1))
                setExclusiveOwnerThread(Thread.currentThread());
            else
                acquire(1);
    }
...
}

if语句中compareAndSetState方法其实就是调用了cas将AbstractQueuedSynchronizer类中的state从0改为1.

protected final boolean compareAndSetState(int expect, int update) {
        // See below for intrinsics setup to support this
        return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}

修改成功则将当前线程为独占线程。我们主要看else中的acquire方法:

public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
}

tryAcquire方法在NonfairSync中实现了,实际上是调用了Sync中的nonfairTryAcquire方法:

//tryAcquire在NonfairSync中
protected final boolean tryAcquire(int acquires) {
            return nonfairTryAcquire(acquires);
}
//nonfairTryAcquire在Sync中
final boolean nonfairTryAcquire(int acquires) {
//获取当前线程和状态
            final Thread current = Thread.currentThread();
            int c = getState();
//若状态为0,则尝试将状态设置为1,设置成功则将当前线程设为独占线程,返回true
            if (c == 0) {
                if (compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
//若当前线程就是独占线程,状态值再次+1(acquires为1),返回true。其实就是实现可重入。
            else if (current == getExclusiveOwnerThread()) {
                int nextc = c + acquires;
                if (nextc < 0) // overflow
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
//其他情况返回false
            return false;
}

回过头看acquireQueued(addWaiter(Node.EXCLUSIVE), arg)方法,先看其中的addWaiter(Node.EXCLUSIVE),
此方法用于将当前线程加入到等待队列的队尾,并返回当前线程所在的结点。

private Node addWaiter(Node mode) {
//创建一个Node记录当前线程,把Node的模式设置为EXCLUSIVE(独占)。
        Node node = new Node(Thread.currentThread(), mode);
        // Try the fast path of enq; backup to full enq on failure
//队列不为空时,将node节点尾接至队列中,返回node
        Node pred = tail;
        if (pred != null) {
            node.prev = pred;
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
//队列为空则调用enq初始化队列.
        enq(node);
        return node;
}
//初始化队列
private Node enq(final Node node) {
        for (;;) {
            Node t = tail;
            if (t == null) { // Must initialize
                if (compareAndSetHead(new Node()))
                    tail = head;
            } else {
                node.prev = t;
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }

再回到acquireQueued(addWaiter(Node.EXCLUSIVE), arg)中,看看acquireQueued做了什么:

final boolean acquireQueued(final Node node, int arg) {
//标记是否成功拿到资源
        boolean failed = true;
        try {
//标记等待过程中是否被中断过
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();//拿到前驱
//如果前驱是head,即该结点已成老二,那么便有资格去尝试获取资源(可能是老大释放完资源唤醒自己的,当然也可能被interrupt了)。
                if (p == head && tryAcquire(arg)) {
//将此节点设置为head,并将原head的next设为null,方便GC。
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;//返回等待过程中是否被中断
                }
//如果自己可以休息了,就进入waiting状态,直到被unpark()
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
}

现在看看shouldParkAfterFailedAcquire(p, node)做了什么:

    private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        int ws = pred.waitStatus;//拿到前驱的状态
//如果已经告诉前驱拿完号后通知自己一下,那就可以安心休息了
        if (ws == Node.SIGNAL)
            /*
             * This node has already set status asking a release
             * to signal it, so it can safely park.
             */
            return true;
        if (ws > 0) {
            /*
             * Predecessor was cancelled. Skip over predecessors and
             * indicate retry.
             * 如果前驱放弃了,那就一直往前找,直到找到最近一个正常等待的状态,并排在它的后边。
             * 注意:那些放弃的结点,由于被自己“加塞”到它们前边,它们相当于形成一个无引用链,稍后
             * 就会被保安大叔赶走了(GC回收)!
             */
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {
            /*
             * waitStatus must be 0 or PROPAGATE.  Indicate that we
             * need a signal, but don't park yet.  Caller will need to
             * retry to make sure it cannot acquire before parking.
             */
//如果前驱正常,那就把前驱的状态设置成SIGNAL,告诉它拿完号后通知自己一下(保证下次返回true)。有可能失败,人家说不定刚刚释放完呢!
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
    }

回到if (shouldParkAfterFailedAcquire(p, node) &&parkAndCheckInterrupt())中,如果线程找好安全休息点后,那就可以安心去休息了。调用parkAndCheckInterrupt:

    private final boolean parkAndCheckInterrupt() {
        LockSupport.park(this);//调用park()使线程进入waiting状态
        return Thread.interrupted();//如果被唤醒,查看自己是不是被中断的。
    }

park()会让当前线程进入waiting状态。在此状态下,有两种途径可以唤醒该线程:
1)被unpark();
2)被interrupt()。

OK,看了shouldParkAfterFailedAcquire()和parkAndCheckInterrupt(),现在让我们再回到
acquireQueued(),总结下该函数的具体流程:

  1. 结点进入队尾后,检查状态,找到安全休息点;
  2. 调用park()进入waiting状态,等待unpark()或interrupt()唤醒自己;
  3. 被唤醒后,看自己是不是有资格能拿到号。如果拿到,head指向当前结点,并返回从入队到拿到号的整个过程中是否被中断过;如果没拿到,继续流程1。

acquireQueued()分析完之后,我们接下来再回到acquire(),总结下它的流程吧:

  1. 调用自定义同步器的tryAcquire()尝试直接去获取资源,如果成功则直接返回;
  2. 没成功,则addWaiter()将该线程加入等待队列的尾部,并标记为独占模式;
  3. acquireQueued()使线程在等待队列中休息,有机会时(轮到自己,会被unpark())会去尝试获取资源。获取到资源后才返回。如果在整个等待过程中被中断过,则返回true,否则返回false。
  4. 如果线程在等待过程中被中断过,它是不响应的。只是获取资源后才再进行自我中断selfInterrupt(),将中断补上。
    图片说明
    以上就是ReentrantLock中lock方法的实现,接下来看看unlock。

unlock的实现
首先还是看ReentrantLock中unlock函数源码

    public void unlock() {
        sync.release(1);
    }

    public final boolean release(int arg) {
        if (tryRelease(arg)) {
            Node h = head;//找到头结点
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);//唤醒等待队列里的下一个线程
            return true;
        }
        return false;
    }

它调用tryRelease()来释放资源。有一点需要注意的是,它是根据tryRelease()的返回值来判断该线程是否已经完成释放掉资源了。

        protected final boolean tryRelease(int releases) {
            int c = getState() - releases;//新状态值c设置为原状态值-1(releases=1)
            //如果当前线程不是独占线程,抛出异常IllegalMonitorStateException
            if (Thread.currentThread() != getExclusiveOwnerThread())
                throw new IllegalMonitorStateException();
            boolean free = false;//返回值free标记是否完全释放,即c=0
            if (c == 0) {
                free = true
                setExclusiveOwnerThread(null);//独占线程设置为null
            }
            setState(c);//将c设为新状态值
            return free;//返回free
        }

再回到release方法的unparkSuccessor:

        private void unparkSuccessor(Node node) {
        /*
         * If status is negative (i.e., possibly needing signal) try
         * to clear in anticipation of signalling.  It is OK if this
         * fails or if status is changed by waiting thread.
         */
        int ws = node.waitStatus;//拿到node的状态
        if (ws < 0)
        //状态有效则尝试把node状态设为0
            compareAndSetWaitStatus(node, ws, 0);

        /*
         * Thread to unpark is held in successor, which is normally
         * just the next node.  But if cancelled or apparently null,
         * traverse backwards from tail to find the actual
         * non-cancelled successor.
         */
        Node s = node.next;//取node后续节点s
        //得到node的有效后续节点
        if (s == null || s.waitStatus > 0) {
            s = null;
            //从尾部向前遍历,直到node,当t的状态有效,则赋值给s,最终得到的s节点时离node最近的有效节点。
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        if (s != null)
            LockSupport.unpark(s.thread);//调用unpark唤醒
    }

这个函数并不复杂。一句话概括:用unpark()唤醒等待队列中最前边的那个未放弃线程,这里我们也用s来表示吧。此时,再和acquireQueued()联系起来,s被唤醒后,进入if (p == head &&tryAcquire(arg))的判断(即使p!=head也没关系,它会再进入shouldParkAfterFailedAcquire()寻找一个安全点。这里既然s已经是等待队列中最前边的那个未放弃线程了,那么通过shouldParkAfterFailedAcquire()的调整,s也必然会跑到head的next结点,下一次自旋p==head就成立啦),然后s把自己设置成head标杆结点,表示自己已经获取到资源了,acquire()也返回了!
//TODO

全部评论

相关推荐

05-09 14:45
门头沟学院 Java
点赞 评论 收藏
分享
点赞 评论 收藏
分享
评论
点赞
收藏
分享

创作者周榜

更多
牛客网
牛客网在线编程
牛客网题解
牛客企业服务