多线程
volatile
//TODO
Synchronized
//TODO
ReentrantLock
ReentrantLock是API级别的锁,实现了Lock接口。通过方法lock()获取锁,unlock()释放锁。先通过一个简单的例子看看ReentrantLock的用法。
public class LockExp1 { private static int count=0; private Lock lock=new ReentrantLock(); public static void main(String[] args) throws InterruptedException { LockExp1 lockexp=new LockExp1(); ExecutorService executor= Executors.newCachedThreadPool(); for (int i=0;i<1000;i++){ executor.execute(()->lockexp.add()); } Thread.sleep(2000); System.out.println(count); executor.shutdown(); } public void add(){ lock.lock(); try{ count++; }finally{ lock.unlock(); } } }
lock的实现
上面代码中lock是通过ReentrantLock类向上转型创建的实例对象,所以lock对象调用lock方法要去ReentrantLock中看源码,如下:
public void lock() { sync.lock(); }
sync是Sync类创建的对象,Sync是ReentrantLock类中的一个内部类,继承自AbstractQueuedSynchronizer。
private final Sync sync; abstract static class Sync extends AbstractQueuedSynchronizer { ... }
Sync类中的lock方法如下:
abstract void lock();
此方法在Sync中是抽象方法,所以继续找实现了Sync中lock方法的子类(先看非公平锁的实现):NonfairSync
static final class NonfairSync extends Sync { ... final void lock() { if (compareAndSetState(0, 1)) setExclusiveOwnerThread(Thread.currentThread()); else acquire(1); } ... }
if语句中compareAndSetState方法其实就是调用了cas将AbstractQueuedSynchronizer类中的state从0改为1.
protected final boolean compareAndSetState(int expect, int update) { // See below for intrinsics setup to support this return unsafe.compareAndSwapInt(this, stateOffset, expect, update); }
修改成功则将当前线程为独占线程。我们主要看else中的acquire方法:
public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }
tryAcquire方法在NonfairSync中实现了,实际上是调用了Sync中的nonfairTryAcquire方法:
//tryAcquire在NonfairSync中 protected final boolean tryAcquire(int acquires) { return nonfairTryAcquire(acquires); } //nonfairTryAcquire在Sync中 final boolean nonfairTryAcquire(int acquires) { //获取当前线程和状态 final Thread current = Thread.currentThread(); int c = getState(); //若状态为0,则尝试将状态设置为1,设置成功则将当前线程设为独占线程,返回true if (c == 0) { if (compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } //若当前线程就是独占线程,状态值再次+1(acquires为1),返回true。其实就是实现可重入。 else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) // overflow throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } //其他情况返回false return false; }
回过头看acquireQueued(addWaiter(Node.EXCLUSIVE), arg)方法,先看其中的addWaiter(Node.EXCLUSIVE),
此方法用于将当前线程加入到等待队列的队尾,并返回当前线程所在的结点。
private Node addWaiter(Node mode) { //创建一个Node记录当前线程,把Node的模式设置为EXCLUSIVE(独占)。 Node node = new Node(Thread.currentThread(), mode); // Try the fast path of enq; backup to full enq on failure //队列不为空时,将node节点尾接至队列中,返回node Node pred = tail; if (pred != null) { node.prev = pred; if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } //队列为空则调用enq初始化队列. enq(node); return node; } //初始化队列 private Node enq(final Node node) { for (;;) { Node t = tail; if (t == null) { // Must initialize if (compareAndSetHead(new Node())) tail = head; } else { node.prev = t; if (compareAndSetTail(t, node)) { t.next = node; return t; } } } }
再回到acquireQueued(addWaiter(Node.EXCLUSIVE), arg)中,看看acquireQueued做了什么:
final boolean acquireQueued(final Node node, int arg) { //标记是否成功拿到资源 boolean failed = true; try { //标记等待过程中是否被中断过 boolean interrupted = false; for (;;) { final Node p = node.predecessor();//拿到前驱 //如果前驱是head,即该结点已成老二,那么便有资格去尝试获取资源(可能是老大释放完资源唤醒自己的,当然也可能被interrupt了)。 if (p == head && tryAcquire(arg)) { //将此节点设置为head,并将原head的next设为null,方便GC。 setHead(node); p.next = null; // help GC failed = false; return interrupted;//返回等待过程中是否被中断 } //如果自己可以休息了,就进入waiting状态,直到被unpark() if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } }
现在看看shouldParkAfterFailedAcquire(p, node)做了什么:
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { int ws = pred.waitStatus;//拿到前驱的状态 //如果已经告诉前驱拿完号后通知自己一下,那就可以安心休息了 if (ws == Node.SIGNAL) /* * This node has already set status asking a release * to signal it, so it can safely park. */ return true; if (ws > 0) { /* * Predecessor was cancelled. Skip over predecessors and * indicate retry. * 如果前驱放弃了,那就一直往前找,直到找到最近一个正常等待的状态,并排在它的后边。 * 注意:那些放弃的结点,由于被自己“加塞”到它们前边,它们相当于形成一个无引用链,稍后 * 就会被保安大叔赶走了(GC回收)! */ do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; } else { /* * waitStatus must be 0 or PROPAGATE. Indicate that we * need a signal, but don't park yet. Caller will need to * retry to make sure it cannot acquire before parking. */ //如果前驱正常,那就把前驱的状态设置成SIGNAL,告诉它拿完号后通知自己一下(保证下次返回true)。有可能失败,人家说不定刚刚释放完呢! compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false; }
回到if (shouldParkAfterFailedAcquire(p, node) &&parkAndCheckInterrupt())中,如果线程找好安全休息点后,那就可以安心去休息了。调用parkAndCheckInterrupt:
private final boolean parkAndCheckInterrupt() { LockSupport.park(this);//调用park()使线程进入waiting状态 return Thread.interrupted();//如果被唤醒,查看自己是不是被中断的。 }
park()会让当前线程进入waiting状态。在此状态下,有两种途径可以唤醒该线程:
1)被unpark();
2)被interrupt()。
OK,看了shouldParkAfterFailedAcquire()和parkAndCheckInterrupt(),现在让我们再回到
acquireQueued(),总结下该函数的具体流程:
- 结点进入队尾后,检查状态,找到安全休息点;
- 调用park()进入waiting状态,等待unpark()或interrupt()唤醒自己;
- 被唤醒后,看自己是不是有资格能拿到号。如果拿到,head指向当前结点,并返回从入队到拿到号的整个过程中是否被中断过;如果没拿到,继续流程1。
acquireQueued()分析完之后,我们接下来再回到acquire(),总结下它的流程吧:
- 调用自定义同步器的tryAcquire()尝试直接去获取资源,如果成功则直接返回;
- 没成功,则addWaiter()将该线程加入等待队列的尾部,并标记为独占模式;
- acquireQueued()使线程在等待队列中休息,有机会时(轮到自己,会被unpark())会去尝试获取资源。获取到资源后才返回。如果在整个等待过程中被中断过,则返回true,否则返回false。
- 如果线程在等待过程中被中断过,它是不响应的。只是获取资源后才再进行自我中断selfInterrupt(),将中断补上。
以上就是ReentrantLock中lock方法的实现,接下来看看unlock。
unlock的实现
首先还是看ReentrantLock中unlock函数源码
public void unlock() { sync.release(1); } public final boolean release(int arg) { if (tryRelease(arg)) { Node h = head;//找到头结点 if (h != null && h.waitStatus != 0) unparkSuccessor(h);//唤醒等待队列里的下一个线程 return true; } return false; }
它调用tryRelease()来释放资源。有一点需要注意的是,它是根据tryRelease()的返回值来判断该线程是否已经完成释放掉资源了。
protected final boolean tryRelease(int releases) { int c = getState() - releases;//新状态值c设置为原状态值-1(releases=1) //如果当前线程不是独占线程,抛出异常IllegalMonitorStateException if (Thread.currentThread() != getExclusiveOwnerThread()) throw new IllegalMonitorStateException(); boolean free = false;//返回值free标记是否完全释放,即c=0 if (c == 0) { free = true setExclusiveOwnerThread(null);//独占线程设置为null } setState(c);//将c设为新状态值 return free;//返回free }
再回到release方法的unparkSuccessor:
private void unparkSuccessor(Node node) { /* * If status is negative (i.e., possibly needing signal) try * to clear in anticipation of signalling. It is OK if this * fails or if status is changed by waiting thread. */ int ws = node.waitStatus;//拿到node的状态 if (ws < 0) //状态有效则尝试把node状态设为0 compareAndSetWaitStatus(node, ws, 0); /* * Thread to unpark is held in successor, which is normally * just the next node. But if cancelled or apparently null, * traverse backwards from tail to find the actual * non-cancelled successor. */ Node s = node.next;//取node后续节点s //得到node的有效后续节点 if (s == null || s.waitStatus > 0) { s = null; //从尾部向前遍历,直到node,当t的状态有效,则赋值给s,最终得到的s节点时离node最近的有效节点。 for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0) s = t; } if (s != null) LockSupport.unpark(s.thread);//调用unpark唤醒 }
这个函数并不复杂。一句话概括:用unpark()唤醒等待队列中最前边的那个未放弃线程,这里我们也用s来表示吧。此时,再和acquireQueued()联系起来,s被唤醒后,进入if (p == head &&tryAcquire(arg))的判断(即使p!=head也没关系,它会再进入shouldParkAfterFailedAcquire()寻找一个安全点。这里既然s已经是等待队列中最前边的那个未放弃线程了,那么通过shouldParkAfterFailedAcquire()的调整,s也必然会跑到head的next结点,下一次自旋p==head就成立啦),然后s把自己设置成head标杆结点,表示自己已经获取到资源了,acquire()也返回了!
//TODO