Java源码阅读之CyclicBarrier
源码阅读是基于JDK7,本篇主要涉及CyclicBarrier常用方法源码分析。
1.概述
CyclicBarrier是一个同步辅助类,它允许一组线程互相等待,直到所有线程都到达某个公共屏障点(也可以叫同步点),即相互等待的线程都完成调用await方法,所有被屏障拦截的线程才会继续运行await方法后面的程序。在涉及一组固定大小的线程的程序中,这些线程必须不时地互相等待,此时CyclicBarrier很有用。因为该屏障点在释放等待线程后可以重用,所以称它为循环的屏障点。CyclicBarrier支持一个可选的Runnable命令,在一组线程中的最后一个线程到达屏障点之后(但在释放所有线程之前),该命令只在所有线程到达屏障点之后运行一次,并且该命令由最后一个进入屏障点的线程执行。
2.使用样例
下面的代码演示了CyclicBarrier简单使用的样例。
public class CyclicBarrierDemo { @Test public void test() { final CyclicBarrier barrier = new CyclicBarrier(2, myThread); new Thread(new Runnable() { @Override public void run() { try { System.out.println(Thread.currentThread().getName()); barrier.await(); System.out.println(Thread.currentThread().getName()); } catch (Exception e) { e.printStackTrace(); } } }, "thread1").start(); new Thread(new Runnable() { @Override public void run() { try { System.out.println(Thread.currentThread().getName()); barrier.await(); System.out.println(Thread.currentThread().getName()); } catch (Exception e) { e.printStackTrace(); } } }, "thread2").start(); } Thread myThread = new Thread(new Runnable() { @Override public void run() { System.out.println("myThread"); } }, "thread3"); }
输出结果如下所示:
thread1 thread2 myThread thread2 thread1
3.数据结构
CyclicBarrier中声明了如下一些属性及变量:
private final ReentrantLock lock = new ReentrantLock(); private final Condition trip = lock.newCondition(); private final int parties; private final Runnable barrierCommand; private Generation generation = new Generation(); private int count;
(1)lock用于保护屏障入口的锁;
(2)trip线程等待条件;
(3)parties参与等待的线程数;
(4)barrierCommand当所有线程到达屏障点之后,首先执行的命令;
(5)count实际中仍在等待的线程数,每当有一个线程到达屏障点,count值就会减一;当一次新的运算开始后,count的值被重置为parties。
4.构造方法
提供了两个构造函数可供使用。
//创建一个CyclicBarrier实例,parties指定参与相互等待的线程数, //barrierAction指定当所有线程到达屏障点之后,首先执行的操作,该操作由最后一个进入屏障点的线程执行。 public CyclicBarrier(int parties, Runnable barrierAction) { if (parties <= 0) throw new IllegalArgumentException(); this.parties = parties; this.count = parties; this.barrierCommand = barrierAction; } //创建一个CyclicBarrier实例,parties指定参与相互等待的线程数 public CyclicBarrier(int parties) { this(parties, null); }
5.getParties方法
//返回参与相互等待的线程数 public int getParties() { return parties; }
6.await方法
//该方法被调用时表示当前线程已经到达屏障点,当前线程阻塞进入休眠状态 //直到所有线程都到达屏障点,当前线程才会被唤醒 public int await() throws InterruptedException, BrokenBarrierException { try { return dowait(false, 0L); } catch (TimeoutException toe) { throw new Error(toe); // cannot happen; } } //该方法被调用时表示当前线程已经到达屏障点,当前线程阻塞进入休眠状态 //在timeout指定的超时时间内,等待其他参与线程到达屏障点 //如果超出指定的等待时间,则抛出TimeoutException异常,如果该时间小于等于零,则此方法根本不会等待 public int await(long timeout, TimeUnit unit) throws InterruptedException, BrokenBarrierException, TimeoutException { return dowait(true, unit.toNanos(timeout)); } private int dowait(boolean timed, long nanos) throws InterruptedException, BrokenBarrierException, TimeoutException { final ReentrantLock lock = this.lock; lock.lock(); try { final Generation g = generation; if (g.broken) throw new BrokenBarrierException(); if (Thread.interrupted()) { breakBarrier(); throw new InterruptedException(); } int index = --count; if (index == 0) { // tripped boolean ranAction = false; try { final Runnable command = barrierCommand; if (command != null) command.run(); ranAction = true; //当所有参与的线程都到达屏障点,立即去唤醒所有处于休眠状态的线程,恢复执行 nextGeneration(); return 0; } finally { if (!ranAction) breakBarrier(); } } // loop until tripped, broken, interrupted, or timed out for (;;) { try { if (!timed) //让当前执行的线程阻塞,处于休眠状态 trip.await(); else if (nanos > 0L) //让当前执行的线程阻塞,在超时时间内处于休眠状态 nanos = trip.awaitNanos(nanos); } catch (InterruptedException ie) { if (g == generation && ! g.broken) { breakBarrier(); throw ie; } else { // We're about to finish waiting even if we had not // been interrupted, so this interrupt is deemed to // "belong" to subsequent execution. Thread.currentThread().interrupt(); } } if (g.broken) throw new BrokenBarrierException(); if (g != generation) return index; if (timed && nanos <= 0L) { breakBarrier(); throw new TimeoutException(); } } } finally { lock.unlock(); } } //唤醒所有处于休眠状态的线程,恢复执行 //重置count值为parties //重置中断状态为false private void nextGeneration() { // signal completion of last generation trip.signalAll(); // set up next generation count = parties; generation = new Generation(); } //唤醒所有处于休眠状态的线程,恢复执行 //重置count值为parties //重置中断状态为true private void breakBarrier() { generation.broken = true; count = parties; trip.signalAll(); }
这个等待的await方法,其实是使用ReentrantLock和Condition控制实现的。
7.isBroken方法
public boolean isBroken() { final ReentrantLock lock = this.lock; lock.lock(); try { return generation.broken; } finally { lock.unlock(); } }
判断此屏障是否处于中断状态。如果因为构造或最后一次重置而导致中断或超时,从而使一个或多个参与者摆脱此屏障点,或者因为异常而导致某个屏障操作失败,则返回true;否则返回false。
8.reset方法
//将屏障重置为其初始状态。 public void reset() { final ReentrantLock lock = this.lock; lock.lock(); try { //唤醒所有等待的线程继续执行,并设置屏障中断状态为true breakBarrier(); // break the current generation //唤醒所有等待的线程继续执行,并设置屏障中断状态为false nextGeneration(); // start a new generation } finally { lock.unlock(); } }
9.getNumberWaiting方法
//返回当前在屏障处等待的参与者数目,此方法主要用于调试和断言。 public int getNumberWaiting() { final ReentrantLock lock = this.lock; lock.lock(); try { return parties - count; } finally { lock.unlock(); } }
小结:
1.CyclicBarrier可以用于多线程计算数据,最后合并计算结果的应用场景。