死磕JDK源码之CyclicBarrier

 源码分析

  1 package java.util.concurrent;
  2 
  3 import java.util.concurrent.locks.Condition;
  4 import java.util.concurrent.locks.ReentrantLock;
  5 
  6 /*
  7 一个同步辅助类,它允许一组线程互相等待,直到到达某个公共屏障点,之所以称为cyclic的barrier,是因为等待线程释放后栅栏可以重用
  8 
  9 对于失败的同步尝试,CyclicBarrier用了一种all-or-none破坏模式:
 10 如果因为中断、失败或者超时等原因,导致线程过早地离开了屏障点,那么在屏障点等待的其他所有线程也会通过BrokenBarrierException异常离开屏障点,
 11 如果几乎同时被中断,那么会通过InterruptedException以反常的方式离开
 12 
 13 实现思路:就是设置一个计数,每当有线程达到时,计数count-1,Condition.await进入阻塞,当count=0,那么可以signalAll,让所有线程得以唤醒,唤醒后立即重置
 14 */
 15 public class CyclicBarrier {
 16     /*
 17     屏障的每次use都意味着一次generation instance。当屏障重置时,generation会发生改变,在用屏障的过程中,可能有许多代与线程相关联,
 18     这是因为锁分配给等待线程的方式的不确定性,但是在同一个时间只能有一个generation处于活动状态,而其他所有的都会被破坏
 19     */
 20     private static class Generation {
 21         boolean broken = false;
 22     }
 23 
 24     // 守护屏障入口的锁
 25     private final ReentrantLock lock = new ReentrantLock();
 26     // Condition即条件谓词
 27     private final Condition trip = lock.newCondition();
 28     // 屏障释放前需要等待的线程数量
 29     private final int parties;
 30     // 屏障突破后,要执行的命令
 31     private final Runnable barrierCommand;
 32     // 轮,相当于一次集合到释放为一轮,一轮一轮的进行
 33     private Generation generation = new Generation();
 34 
 35     // 重置后count=parties,count表示还需要等待的线程数量才能结束当前轮进入下一轮
 36     private int count;
 37 
 38     // 更新屏障的状态并唤醒所有等待的线程,只有持有锁时才会被调用
 39     private void nextGeneration() {
 40         // signal completion of last generation
 41         trip.signalAll();
 42         // set up next generation
 43         count = parties;
 44         generation = new Generation();
 45     }
 46 
 47     // 设置当前屏障的generation并唤醒所有等待的线程,只有持有锁时才会被调用
 48     private void breakBarrier() {
 49         generation.broken = true;
 50         count = parties;
 51         trip.signalAll();
 52     }
 53 
 54     // timed:是否有时间限制    nanos:wait的纳秒数
 55     private int dowait(boolean timed, long nanos)
 56             throws InterruptedException, BrokenBarrierException,
 57             TimeoutException {
 58         final ReentrantLock lock = this.lock;
 59         lock.lock();
 60         try {
 61             final Generation g = generation;
 62 
 63             if (g.broken)
 64                 throw new BrokenBarrierException();
 65 
 66             if (Thread.interrupted()) {
 67                 breakBarrier();
 68                 throw new InterruptedException();
 69             }
 70 
 71             int index = --count;
 72             if (index == 0) {  // tripped
 73                 boolean ranAction = false;
 74                 try {
 75                     final Runnable command = barrierCommand;
 76                     if (command != null)
 77                         command.run();
 78                     ranAction = true;
 79                     nextGeneration();
 80                     return 0;
 81                 } finally {
 82                     if (!ranAction)
 83                         breakBarrier();
 84                 }
 85             }
 86 
 87             // loop until tripped, broken, interrupted, or timed out
 88             for (; ; ) {
 89                 try {
 90                     if (!timed)
 91                         trip.await();
 92                     else if (nanos > 0L)
 93                         nanos = trip.awaitNanos(nanos);
 94                 } catch (InterruptedException ie) {
 95                     if (g == generation && !g.broken) {
 96                         breakBarrier();
 97                         throw ie;
 98                     } else {
 99                         // We're about to finish waiting even if we had not
100                         // been interrupted, so this interrupt is deemed to
101                         // "belong" to subsequent execution.
102                         Thread.currentThread().interrupt();
103                     }
104                 }
105 
106                 if (g.broken)
107                     throw new BrokenBarrierException();
108 
109                 if (g != generation)
110                     return index;
111 
112                 if (timed && nanos <= 0L) {
113                     breakBarrier();
114                     throw new TimeoutException();
115                 }
116             }
117         } finally {
118             lock.unlock();
119         }
120     }
121 
122     // 创建一个参数为parties带有barrierAction的CyclicBarrier
123     public CyclicBarrier(int parties, Runnable barrierAction) {
124         if (parties <= 0) throw new IllegalArgumentException();
125         this.parties = parties;
126         this.count = parties;
127         this.barrierCommand = barrierAction;
128     }
129 
130     // 创建一个参数为parties的CyclicBarrier
131     public CyclicBarrier(int parties) {
132         this(parties, null);
133     }
134 
135     // 返回barrier拦截的线程数量
136     public int getParties() {
137         return parties;
138     }
139 
140     /*
141     在所有的参与者线程调用await方法之前,屏障一直阻塞已经await的线程,如果当前线程不是最后一个线程,当前线程会处于休眠状态直到:
142     1.最后一个线程到达
143     2.其他某个线程中断当前线程
144     3.其他某个线程中断另一个等待线程
145     4.其他某个线程在等待barrier时超时
146     5.其他某个线程在barrier上调用reset
147     如果当前线程在进入await方法时已经设置了中断状态,或者在等待时被中断,则抛出InterruptedException异常,并清除当前线程的已中断状态
148 
149     如果在线程处于等待状态时barrier被reset(),或者在调用await时barrier被损坏,或任意一个线程正处于等待状态,则抛出BrokenBarrierException异常
150 
151     如果任何线程在等待时被中断,则其他所有等待线程都会抛出BrokenBarrierException异常,且barrier会被置为损坏状态
152     如果当前线程是最后一个到达的线程,并且构造方法中提供了一个非空的屏障操作,则在允许其他线程继续运行之前,当前线程
153     会运行barrier action
154     如果在执行屏障操作过程中发生异常,则异常会传播到当前线程中,且barrier会被置为损坏状态
155 
156     await方法返回的是到达的当前线程的索引,其中getParties()-1表示到达的第一个线程,0表示到达的最后一个线程
157     如果当前线程在等待时被中断则抛出InterruptedException
158     如果另一个线程在当前线程等待时被中断或超时,或者重置了barrier,或者在调用await时barrier被损坏,或由于异常而导致屏障操作(如果存在)失败则抛出BrokenBarrierException
159      */
160     public int await() throws InterruptedException, BrokenBarrierException {
161         try {
162             return dowait(false, 0L);
163         } catch (TimeoutException toe) {
164             throw new Error(toe); // cannot happen
165         }
166     }
167 
168     // 带有限制时间的await方法
169     public int await(long timeout, TimeUnit unit)
170             throws InterruptedException,
171             BrokenBarrierException,
172             TimeoutException {
173         return dowait(true, unit.toNanos(timeout));
174     }
175 
176     // 查询屏障是否处于损坏状态
177     public boolean isBroken() {
178         final ReentrantLock lock = this.lock;
179         lock.lock();
180         try {
181             return generation.broken;
182         } finally {
183             lock.unlock();
184         }
185     }
186 
187     // 重置屏障为初始状态
188     public void reset() {
189         final ReentrantLock lock = this.lock;
190         lock.lock();
191         try {
192             breakBarrier();   // break the current generation
193             nextGeneration(); // start a new generation
194         } finally {
195             lock.unlock();
196         }
197     }
198 
199     // 返回当前在屏障处等待的线程数量
200     public int getNumberWaiting() {
201         final ReentrantLock lock = this.lock;
202         lock.lock();
203         try {
204             return parties - count;
205         } finally {
206             lock.unlock();
207         }
208     }
209 }

典型用法

 1 package jcip;
 2 
 3 import java.util.concurrent.BrokenBarrierException;
 4 import java.util.concurrent.CyclicBarrier;
 5 
 6 /*
 7 循环栅栏:让一组线程到达一个屏障(也叫同步点)时被阻塞,直到最后一个线程到达屏障时,屏障才会开门,所有被屏障拦截的线程才会继续执行
 8 CyclicBarrier默认的构造方法是CyclicBarrier(int parties),其参数表示屏障拦截的线程数量,每个线程调用await()方法告诉CyclicBarrier自己已经到达了屏障,然后当前线程被阻塞
 9 CyclicBarrier还提供了一个更高级的构造函数CyclicBarrier(int parties,Runnable barrierAction),用于在线程到达屏障时,优先执行barrierAction所以输出为3 1 2 true或者3 2 1 true
10 */
11 public class CyclicBarrierTest {
12     // 如果改为new CyclicBarrier(3);则主线程和子线程会永远等待,因为没有第3个线程执行await()方法,即没有第3个线程到达屏障,所以之前到达屏障的两个线程都不会继续执行
13     static CyclicBarrier c = new CyclicBarrier(2);
14     static CyclicBarrier c2 = new CyclicBarrier(2, new Runnable() {
15         @Override
16         public void run() {
17             System.out.print(3 + " ");
18         }
19     });
20     static CyclicBarrier c3 = new CyclicBarrier(2);
21 
22     public static void main(String[] args) throws Exception {
23         new Thread(new Runnable() {
24             @Override
25             public void run() {
26                 try {
27                     c.await();// c2.await();
28                 } catch (InterruptedException e) {
29                     e.printStackTrace();
30                 } catch (BrokenBarrierException e) {
31                     e.printStackTrace();
32                 }
33                 System.out.print(1 + " ");
34             }
35         }).start();
36         c.await();// c2.await();
37         System.out.print(2 + " ");
38 
39         Thread thread = new Thread(new Runnable() {
40             @Override
41             public void run() {
42                 try {
43                     c3.await();
44                 } catch (Exception e) {
45                 }
46             }
47         });
48         thread.start();
49         thread.interrupt();
50         try {
51             c3.await();
52         } catch (Exception e) {
53             System.out.println(c3.isBroken());
54         }
55     }
56 
57 }
58 /*
59 1 2 true或者2 1 true
60  */

CyclicBarrier和CountDownLatch的区别

CountDownLatch CyclicBarrier
减计数方式 加计数方式
计数为0时释放所有等待的线程 计数达到指定值时释放所有等待的线程
计数为0时,无法重置(一次性的) 计数达到指定值时,计数置为0重新开始(可重复)

调用countDown()方法计数减1,

调用await()方法只进行阻塞,对计数没任何影响

调用await()方法计数加1,

若加1后的值不等于构造方法的值,则线程阻塞

全部评论

相关推荐

点赞 收藏 评论
分享
牛客网
牛客企业服务