Java中的锁以及并发工具类

概念上见名知义:

  • 独占锁(排他锁)VS共享锁
  • 公平锁VS公平锁
  • 乐观锁VS悲观锁
  • 可重入锁
  • 自旋锁

JDK中对锁的实现

Lock接口

  • ReentrantLock互斥锁(重入锁)实现类(逻辑实现在内部类Sync中)

Sync父类AbstractQueueSynchronizer队列同步器AQS

  • ReadWriteLock接口,实现类为ReentrantReadWriteLock

Lock readLock()

Lock writeLock()

  • Condition接口,配合Lock进行线程的阻塞和唤醒

通过Lock实现类的对象创建,newCondition()

await()

signal()

Lock lock = new ReentrantLock();
Condition condition = lock.newCondition();
public void conditionWait() throws InterruptedException{
	lock.lock();
  try{
  	condition.await();
  }finally{
  	lock.unlock();
  }
}
public void conditionSignal() throws InterruptedException{
	lock.lock();
  try{
  	condition.signal();
  }finally{
  	lock.unlock();
  }
}

Object监视器方法和Condition接口的对比:

总结:

ReentrantLock:读读互斥,读写互斥,写写互斥

ReentrantReadWriteLock:读读不互斥,读写互斥,写写互斥

引入JDK8新增的StampedLock

StampedLock:读读不互斥,读写不互斥,写写互斥

====================================================

除了上述锁与Condition搭配的同步方式

其他同步工具类如下:

Semaphore(也是基于AQS):控制同时访问特定资源的线程数量,协调各个线程保证合理使用公共资源

主要方法:

intavailablePermits() 返回此信号量中当前可用的许可证数

intgetQueueLength() 返回正在等待获取许可的线程数

booleanhasQueuedThreads() 是否由线程正在等待获取许可

void reducePermits(int reduction) 减少reduction个许可,protected修饰

Collection getQueuedThreads() 返回所有等待获取许可的线程集合,protected修饰

Semaphore s = new Semaphore(10,true);//10个共享资源,公平
s.acquire();//每次获取一个,如果获取不到线程会阻塞
s.release();//释放
public class SemaphoreTest{
  private static final int HTREAD_COUNT = 30;
  private static ExecutorService threadPool = Executors.newFixedThreadPool(HTREAD_COUNT); 
  
  //控制10个线程的流量
  private static Semaphore s = new Semaphore(10);
  public static void main(String[] args){
  	for(int i=0;i<HTREAD_COUNT;i++){
		threadPool.execute(new Runnable(){
			public void run(){
				try{
				  s.acquire();//获取许可
				  System.out.println("save");
				  s.release();//归还许可
				}catch(){
				}
			}
		});
	}
	
	threadPool.shutdown();
  }
}

CountDownLatch(也是基于AQS):等待多线程完成

//一个主线程要等待10个work线程全部执行结束后才能退出
CountDownLatch cdl = new CountDownLatch(10);
cdl.await();//主线程调用该方法,阻塞
cdl.countDown();//每个work执行完毕都调用1次,当减到0,主线程被唤醒
public class CountDownLatchTest{
  
  	//等待2个节点完成
	static CountDownLatch c = new CountDownLatch(2);
  public static void main(String[] args)throws InterruptedException{
  	new Thread(new Runnable(){
		@Override
	  	public void run(){
			System.out.println(1);
		  	c.countDown();//每调用一次,节点减1
		  	System.out.println(2);
		}
	}).start();
	
	c.await();//阻塞线程
	System.out.println("3");
  }
}

CylicBarrier(基于ReentrantLock+Condition实现):可循环使用屏障

来了10个线程,这10个线程互相等待,到齐后一起被唤醒,各自执行接下来的逻辑;然后,这10个线程继续互相等待,到齐后再一起 被唤醒。每一轮被称为一个Generation,就是一次同步点(屏障点)。

public class CyclicBarrierTest{
	static CyclicBarrier c = new CyclicBarrier(2);//屏障拦截的线程数量是2
  public static void main(String[] args){
  	new Thread(
		@Override
	    public void run(){
			try{
				c.await();//已经到达了屏障,当前线程被阻塞
			}catch(Exception e){
			}
		  
		  	System.out.println(1);
		} 
	).start();
	
	//如果构造改成3,则主子线程都会等待,因为没有第三个线程执行await到达屏障
	//因为主线程和子线程的调度是由CPU决定的,两个线程都有可能先执行,所以会产生两种输出(1 2 或 2 1)
	try{
		c.await();
	}catch(Exception e){
	}
	System.out.println(2);
  }
}
public class CyClicBarrierTest{
  //该构造在线程到达屏障时,优先执行barrierAction
  //拦截线程数量是2,必须等待第一个线程和A都执行完,再执行主线程(先执行主线程)
	static CyclicBarrier c = new CyclicBarrier(2,new A());
	public static void main(String[] args){
		new Thread(new Runnable(){
			@Override
		  public void run(){
		  	try{
				c.await();
			}catch(Exception e){
			}
			System.out.println(1);
		  }
		}).start();
	  
	  try{
	  	c.await();
	  }catch(Exception e){
	  }
	  System.out.println(2);
	}
  
  static class A implements Runnable{
  	@Override
	public void run(){
		System.out.println(3);  //结果 3 1 2
	}
  }
}
public clas BankWaterService implements Runnable{
	//创建四个屏障,优先执行当前类的线程任务
  private CyclicBarrier c = new CyclicBarrier(4.this);
	
  //创建四个线程的线程池
  private Executor executor = Executors.newFixedThreadPool(4);
  private ConcurrentHashMap<String,Integer> map = new ConcurrentHashMap<>();
  
  private void count(){
  	for(int i=0;i<4;i++){
		executor.execute(new Runnable(){
			public void run(){
				map.put(Thread.currentThread().getName(),1);
			  c.await();//省略异常处理,阻塞
			}
		});
	}
  }
  
  //将四个线程的结果进行汇总
 	public void run(){
		int result = 0;
	  for(Entry<String,Integer> s:map.entrySet()){
	  	result += s.getValue();
	  }
	  map.put("result",result);
	  System.out.println(result);
	}
  
  public static void main(String[] args){
  	BankWaterService bs = new BankWaterService();
	bs.count();
  }
}
public class CyclicBarrierTest{
	static CyClicBarrier c = new CyclicBarrier(2);
  public static void main(String[] args){
  	Thread t = new Thread(new Runnable(){
		public void run(){
			c.await();//阻塞(省略处理异常)
		}
	});
	t.start();
	t.interrupt();
	
	try{
	c.await();
	}catch(Exception e){
	  //isBroken()方法用来了解阻塞的线程是否被中断
		System.out.println(c.isBoken());//true
	}
  }
}

Exchanger

当一个线程调用exchange准备和其他线程交换数据的时候,一种是没有其他线程要交换数据,自己只能自旋或者阻塞,等待;另一种是恰好有其他线程在 Slot 里面等着,那么和对方交换。

Exchange<String> ex = new Exchange<>();

ThreadA a = new ThreadA(ex);

public class ExchagerTest{
	private static final Exchange<String> e = new Exchanger<>();
 	private static ExecutorService threadPool = Executors.newFixedThreadPool(2);
  	public static void main(String[] args){
		threadPool.execute(new Runnable(){
			public void run(){
				String a = "银行流水A"
				e.exchange(a);//交换数据(省略处理异常)
			}
		});
	  
	  threadPool.execute(new Runnable(){
	  	public void run(){
			String b = "银行流水B"
			e.exchange(b);//如果两个线程有一个没有执行exchange()方法,则会一直等待
		}
	  });
	}
}

Phaser(替代CyclicBarrier和CountDownLatch

Phaser ph = new Phaser(10);
ph.awaitAdance(ph.getPhase());//阻塞,等待当前同步节点完成
ph.arrive();//每个线程完成任务调用1次,表达同步到达了该同步点
Phaser ph = new Phaser(10);
ph.arriveAndAwaitAdvance();//第一个同步点

ph.arriveAndAwaitAdvance();//第二个同步点

全部评论
是谁又狠狠学到了😋
点赞 回复
分享
发布于 2023-05-29 22:18 广东
进来学点新芝士
点赞 回复
分享
发布于 2023-05-29 22:21 陕西
滴滴
校招火热招聘中
官网直投
Java就喜欢花里胡哨的名字
点赞 回复
分享
发布于 2023-06-04 09:54 广西

相关推荐

3 3 评论
分享
牛客网
牛客企业服务