Java中的锁以及并发工具类
概念上见名知义:
- 独占锁(排他锁)VS共享锁
- 公平锁VS公平锁
- 乐观锁VS悲观锁
- 可重入锁
- 自旋锁
JDK中对锁的实现
Lock接口
- ReentrantLock互斥锁(重入锁)实现类(逻辑实现在内部类Sync中)
Sync父类AbstractQueueSynchronizer队列同步器AQS
- ReadWriteLock接口,实现类为ReentrantReadWriteLock
Lock readLock()
Lock writeLock()
- Condition接口,配合Lock进行线程的阻塞和唤醒
通过Lock实现类的对象创建,newCondition()
await()
signal()
Lock lock = new ReentrantLock();
Condition condition = lock.newCondition();
public void conditionWait() throws InterruptedException{
lock.lock();
try{
condition.await();
}finally{
lock.unlock();
}
}
public void conditionSignal() throws InterruptedException{
lock.lock();
try{
condition.signal();
}finally{
lock.unlock();
}
}
Object监视器方法和Condition接口的对比:
总结:
ReentrantLock:读读互斥,读写互斥,写写互斥
ReentrantReadWriteLock:读读不互斥,读写互斥,写写互斥
引入JDK8新增的StampedLock
StampedLock:读读不互斥,读写不互斥,写写互斥
====================================================
除了上述锁与Condition搭配的同步方式
其他同步工具类如下:
Semaphore(也是基于AQS):控制同时访问特定资源的线程数量,协调各个线程保证合理使用公共资源
主要方法:
intavailablePermits() 返回此信号量中当前可用的许可证数
intgetQueueLength() 返回正在等待获取许可的线程数
booleanhasQueuedThreads() 是否由线程正在等待获取许可
void reducePermits(int reduction) 减少reduction个许可,protected修饰
Collection getQueuedThreads() 返回所有等待获取许可的线程集合,protected修饰
Semaphore s = new Semaphore(10,true);//10个共享资源,公平 s.acquire();//每次获取一个,如果获取不到线程会阻塞 s.release();//释放
public class SemaphoreTest{
private static final int HTREAD_COUNT = 30;
private static ExecutorService threadPool = Executors.newFixedThreadPool(HTREAD_COUNT);
//控制10个线程的流量
private static Semaphore s = new Semaphore(10);
public static void main(String[] args){
for(int i=0;i<HTREAD_COUNT;i++){
threadPool.execute(new Runnable(){
public void run(){
try{
s.acquire();//获取许可
System.out.println("save");
s.release();//归还许可
}catch(){
}
}
});
}
threadPool.shutdown();
}
}
CountDownLatch(也是基于AQS):等待多线程完成
//一个主线程要等待10个work线程全部执行结束后才能退出 CountDownLatch cdl = new CountDownLatch(10); cdl.await();//主线程调用该方法,阻塞 cdl.countDown();//每个work执行完毕都调用1次,当减到0,主线程被唤醒
public class CountDownLatchTest{
//等待2个节点完成
static CountDownLatch c = new CountDownLatch(2);
public static void main(String[] args)throws InterruptedException{
new Thread(new Runnable(){
@Override
public void run(){
System.out.println(1);
c.countDown();//每调用一次,节点减1
System.out.println(2);
}
}).start();
c.await();//阻塞线程
System.out.println("3");
}
}
CylicBarrier(基于ReentrantLock+Condition实现):可循环使用屏障
来了10个线程,这10个线程互相等待,到齐后一起被唤醒,各自执行接下来的逻辑;然后,这10个线程继续互相等待,到齐后再一起 被唤醒。每一轮被称为一个Generation,就是一次同步点(屏障点)。
public class CyclicBarrierTest{
static CyclicBarrier c = new CyclicBarrier(2);//屏障拦截的线程数量是2
public static void main(String[] args){
new Thread(
@Override
public void run(){
try{
c.await();//已经到达了屏障,当前线程被阻塞
}catch(Exception e){
}
System.out.println(1);
}
).start();
//如果构造改成3,则主子线程都会等待,因为没有第三个线程执行await到达屏障
//因为主线程和子线程的调度是由CPU决定的,两个线程都有可能先执行,所以会产生两种输出(1 2 或 2 1)
try{
c.await();
}catch(Exception e){
}
System.out.println(2);
}
}
public class CyClicBarrierTest{
//该构造在线程到达屏障时,优先执行barrierAction
//拦截线程数量是2,必须等待第一个线程和A都执行完,再执行主线程(先执行主线程)
static CyclicBarrier c = new CyclicBarrier(2,new A());
public static void main(String[] args){
new Thread(new Runnable(){
@Override
public void run(){
try{
c.await();
}catch(Exception e){
}
System.out.println(1);
}
}).start();
try{
c.await();
}catch(Exception e){
}
System.out.println(2);
}
static class A implements Runnable{
@Override
public void run(){
System.out.println(3); //结果 3 1 2
}
}
}
public clas BankWaterService implements Runnable{
//创建四个屏障,优先执行当前类的线程任务
private CyclicBarrier c = new CyclicBarrier(4.this);
//创建四个线程的线程池
private Executor executor = Executors.newFixedThreadPool(4);
private ConcurrentHashMap<String,Integer> map = new ConcurrentHashMap<>();
private void count(){
for(int i=0;i<4;i++){
executor.execute(new Runnable(){
public void run(){
map.put(Thread.currentThread().getName(),1);
c.await();//省略异常处理,阻塞
}
});
}
}
//将四个线程的结果进行汇总
public void run(){
int result = 0;
for(Entry<String,Integer> s:map.entrySet()){
result += s.getValue();
}
map.put("result",result);
System.out.println(result);
}
public static void main(String[] args){
BankWaterService bs = new BankWaterService();
bs.count();
}
}
public class CyclicBarrierTest{
static CyClicBarrier c = new CyclicBarrier(2);
public static void main(String[] args){
Thread t = new Thread(new Runnable(){
public void run(){
c.await();//阻塞(省略处理异常)
}
});
t.start();
t.interrupt();
try{
c.await();
}catch(Exception e){
//isBroken()方法用来了解阻塞的线程是否被中断
System.out.println(c.isBoken());//true
}
}
}
Exchanger
当一个线程调用exchange准备和其他线程交换数据的时候,一种是没有其他线程要交换数据,自己只能自旋或者阻塞,等待;另一种是恰好有其他线程在 Slot 里面等着,那么和对方交换。
Exchange<String> ex = new Exchange<>(); ThreadA a = new ThreadA(ex);
public class ExchagerTest{
private static final Exchange<String> e = new Exchanger<>();
private static ExecutorService threadPool = Executors.newFixedThreadPool(2);
public static void main(String[] args){
threadPool.execute(new Runnable(){
public void run(){
String a = "银行流水A"
e.exchange(a);//交换数据(省略处理异常)
}
});
threadPool.execute(new Runnable(){
public void run(){
String b = "银行流水B"
e.exchange(b);//如果两个线程有一个没有执行exchange()方法,则会一直等待
}
});
}
}
Phaser(替代CyclicBarrier和CountDownLatch)
Phaser ph = new Phaser(10); ph.awaitAdance(ph.getPhase());//阻塞,等待当前同步节点完成 ph.arrive();//每个线程完成任务调用1次,表达同步到达了该同步点
Phaser ph = new Phaser(10); ph.arriveAndAwaitAdvance();//第一个同步点 ph.arriveAndAwaitAdvance();//第二个同步点
