Java中的锁以及并发工具类
概念上见名知义:
- 独占锁(排他锁)VS共享锁
- 公平锁VS公平锁
- 乐观锁VS悲观锁
- 可重入锁
- 自旋锁
JDK中对锁的实现
Lock接口
- ReentrantLock互斥锁(重入锁)实现类(逻辑实现在内部类Sync中)
Sync父类AbstractQueueSynchronizer队列同步器AQS
- ReadWriteLock接口,实现类为ReentrantReadWriteLock
Lock readLock()
Lock writeLock()
- Condition接口,配合Lock进行线程的阻塞和唤醒
通过Lock实现类的对象创建,newCondition()
await()
signal()
Lock lock = new ReentrantLock(); Condition condition = lock.newCondition(); public void conditionWait() throws InterruptedException{ lock.lock(); try{ condition.await(); }finally{ lock.unlock(); } } public void conditionSignal() throws InterruptedException{ lock.lock(); try{ condition.signal(); }finally{ lock.unlock(); } }
Object监视器方法和Condition接口的对比:
总结:
ReentrantLock:读读互斥,读写互斥,写写互斥
ReentrantReadWriteLock:读读不互斥,读写互斥,写写互斥
引入JDK8新增的StampedLock
StampedLock:读读不互斥,读写不互斥,写写互斥
====================================================
除了上述锁与Condition搭配的同步方式
其他同步工具类如下:
Semaphore(也是基于AQS):控制同时访问特定资源的线程数量,协调各个线程保证合理使用公共资源
主要方法:
intavailablePermits() 返回此信号量中当前可用的许可证数
intgetQueueLength() 返回正在等待获取许可的线程数
booleanhasQueuedThreads() 是否由线程正在等待获取许可
void reducePermits(int reduction) 减少reduction个许可,protected修饰
Collection getQueuedThreads() 返回所有等待获取许可的线程集合,protected修饰
Semaphore s = new Semaphore(10,true);//10个共享资源,公平 s.acquire();//每次获取一个,如果获取不到线程会阻塞 s.release();//释放
public class SemaphoreTest{ private static final int HTREAD_COUNT = 30; private static ExecutorService threadPool = Executors.newFixedThreadPool(HTREAD_COUNT); //控制10个线程的流量 private static Semaphore s = new Semaphore(10); public static void main(String[] args){ for(int i=0;i<HTREAD_COUNT;i++){ threadPool.execute(new Runnable(){ public void run(){ try{ s.acquire();//获取许可 System.out.println("save"); s.release();//归还许可 }catch(){ } } }); } threadPool.shutdown(); } }
CountDownLatch(也是基于AQS):等待多线程完成
//一个主线程要等待10个work线程全部执行结束后才能退出 CountDownLatch cdl = new CountDownLatch(10); cdl.await();//主线程调用该方法,阻塞 cdl.countDown();//每个work执行完毕都调用1次,当减到0,主线程被唤醒
public class CountDownLatchTest{ //等待2个节点完成 static CountDownLatch c = new CountDownLatch(2); public static void main(String[] args)throws InterruptedException{ new Thread(new Runnable(){ @Override public void run(){ System.out.println(1); c.countDown();//每调用一次,节点减1 System.out.println(2); } }).start(); c.await();//阻塞线程 System.out.println("3"); } }
CylicBarrier(基于ReentrantLock+Condition实现):可循环使用屏障
来了10个线程,这10个线程互相等待,到齐后一起被唤醒,各自执行接下来的逻辑;然后,这10个线程继续互相等待,到齐后再一起 被唤醒。每一轮被称为一个Generation,就是一次同步点(屏障点)。
public class CyclicBarrierTest{ static CyclicBarrier c = new CyclicBarrier(2);//屏障拦截的线程数量是2 public static void main(String[] args){ new Thread( @Override public void run(){ try{ c.await();//已经到达了屏障,当前线程被阻塞 }catch(Exception e){ } System.out.println(1); } ).start(); //如果构造改成3,则主子线程都会等待,因为没有第三个线程执行await到达屏障 //因为主线程和子线程的调度是由CPU决定的,两个线程都有可能先执行,所以会产生两种输出(1 2 或 2 1) try{ c.await(); }catch(Exception e){ } System.out.println(2); } }
public class CyClicBarrierTest{ //该构造在线程到达屏障时,优先执行barrierAction //拦截线程数量是2,必须等待第一个线程和A都执行完,再执行主线程(先执行主线程) static CyclicBarrier c = new CyclicBarrier(2,new A()); public static void main(String[] args){ new Thread(new Runnable(){ @Override public void run(){ try{ c.await(); }catch(Exception e){ } System.out.println(1); } }).start(); try{ c.await(); }catch(Exception e){ } System.out.println(2); } static class A implements Runnable{ @Override public void run(){ System.out.println(3); //结果 3 1 2 } } }
public clas BankWaterService implements Runnable{ //创建四个屏障,优先执行当前类的线程任务 private CyclicBarrier c = new CyclicBarrier(4.this); //创建四个线程的线程池 private Executor executor = Executors.newFixedThreadPool(4); private ConcurrentHashMap<String,Integer> map = new ConcurrentHashMap<>(); private void count(){ for(int i=0;i<4;i++){ executor.execute(new Runnable(){ public void run(){ map.put(Thread.currentThread().getName(),1); c.await();//省略异常处理,阻塞 } }); } } //将四个线程的结果进行汇总 public void run(){ int result = 0; for(Entry<String,Integer> s:map.entrySet()){ result += s.getValue(); } map.put("result",result); System.out.println(result); } public static void main(String[] args){ BankWaterService bs = new BankWaterService(); bs.count(); } }
public class CyclicBarrierTest{ static CyClicBarrier c = new CyclicBarrier(2); public static void main(String[] args){ Thread t = new Thread(new Runnable(){ public void run(){ c.await();//阻塞(省略处理异常) } }); t.start(); t.interrupt(); try{ c.await(); }catch(Exception e){ //isBroken()方法用来了解阻塞的线程是否被中断 System.out.println(c.isBoken());//true } } }
Exchanger
当一个线程调用exchange准备和其他线程交换数据的时候,一种是没有其他线程要交换数据,自己只能自旋或者阻塞,等待;另一种是恰好有其他线程在 Slot 里面等着,那么和对方交换。
Exchange<String> ex = new Exchange<>(); ThreadA a = new ThreadA(ex);
public class ExchagerTest{ private static final Exchange<String> e = new Exchanger<>(); private static ExecutorService threadPool = Executors.newFixedThreadPool(2); public static void main(String[] args){ threadPool.execute(new Runnable(){ public void run(){ String a = "银行流水A" e.exchange(a);//交换数据(省略处理异常) } }); threadPool.execute(new Runnable(){ public void run(){ String b = "银行流水B" e.exchange(b);//如果两个线程有一个没有执行exchange()方法,则会一直等待 } }); } }
Phaser(替代CyclicBarrier和CountDownLatch)
Phaser ph = new Phaser(10); ph.awaitAdance(ph.getPhase());//阻塞,等待当前同步节点完成 ph.arrive();//每个线程完成任务调用1次,表达同步到达了该同步点
Phaser ph = new Phaser(10); ph.arriveAndAwaitAdvance();//第一个同步点 ph.arriveAndAwaitAdvance();//第二个同步点