并发工具类

Semaphore限制线程并发数

  • 其构造参数new Semaphore(10,true)是初始允许的线程数量,true/false决定是否公平信号量
  • accquire(2) 动态添加许可数量
  • acquireUninterruptibly() 使等待进入acquire()方法的线程不允许被中断
  • availablePermits() 返回Semaphore对象中当前可用许可数
  • drainPermits() 可获取并返回立即可用的所有许可个数,并将可用许可置0
  • getQueueLength() 取得等待许可的线程个数
  • hasQueuedThreads()判断有没有线程再等待这个许可
  • tryAcquire() 尝试获得一个许可,如果获取不到返回false.添加参数,尝试获得X个许可
  • tryAcquire(long timeout,TimeUnit unit) 在指定时间内尝试获取一个许可,如果获取不到返回false
  • tryAcquire(int permits,long timeout,TimeUnit unit)在指定时间内尝试获取X个许可,如果获取不到返回false

public class Service{
  //同一时间最初最多允许2个线程acquire和release之间的代码
	private Semaphore semaphore = new Semaphore(2);
	public void testMethod(){
		try{
			semaphore.acquire();//acquire(int permits)每调用一次就使用多少个许可
		  	System.out.println(Thread.currentThread().getName()+"begin timer"+System.currentTimeMillis());
		   semaphore.release();
		  System.out.println(Thread.currentThread().getName()+"end timer"+System.currentTimeMillis());
		}catch(InterruptedException e){
			e.printStackTrace();
		}
	}
  
  public static void main(String[] args){
	Service service = new Service();
  	
	ThreadA a = new ThreadA(service);
	a.setName("A");
	ThreadB b = new ThreadB(service);
	b.setName("B");
	ThreadC c = new ThreadC(service);
	c.setName("C");
	
	a.start();
	b.start();
	c.start();
  }
}

//创建三个线程(略)
public class ThreadA extends Thread{
	private Service service;
  public ThreadA(Service service){
  	super();
	this.service = service;
  }
  @Override
  public void run(){
  	service.testMethod();
  }
}

Exchanger两个线程之间交换数据

  • exchange() 等待其他线程来取得数据,如果没有其他线程取得数据,则一直阻塞等待。
  • exchange(V x,long timeout,TimeUnit unit)在指定的时间内没有其他线程获取数据,超时异常

public class ThreadA extends Thread{
	private Exchange<String> exchange;
  	public ThreadA(Exchanger<String> exchanger){
		super();
	    this.exchanger = exchanger;
	}
  	public void run(){
		try{
			System.out.println("在线程A中得到线程B的值="+exchanger.exchange("中国人A"));
			System.out.println("A end!");
		}catch(){}
	}
  
}

public class ThreadB extends Thread{
	private Exchange<String> exchange;
  	public ThreadA(Exchanger<String> exchanger){
		super();
	    this.exchanger = exchanger;
	}
  	public void run(){
		try{
			System.out.println("在线程B中得到线程A的值="+exchanger.exchange("中国人B"));
			System.out.println("A end!");
		}catch(){}
	}
  
}

public class Run{
	public static void main(String[] args){
	  Exchanger<String> exchanger = new Exchanger<>();
	  ThreadA a = new ThreadA(exchanger);
	  ThreadB b = new ThreadB(exchanger);
	  
	  a.start();
	  System.out.println("main end!");
	}
}

CountDownLatch 线程以组团的方式执行任务

  • await()判断计数是否为0,如果不为0则等待,
  • countDown()方法,其他线程调用,计数减1,当计数减到0时,等待的线程继续运行
  • getCount() 获取当前的计数个数

计数无法被重置,需要重置计数,使用CyclicBarrier

public class MyService{
	private CountDownLatch down = new CountDownLatch(1);
  
  	public void testMethod(){
		try{
			System.out.println("A");
		  	down.await();//判断计数是否为0,不为0就等待
		  	System.out.println("B");
		}catch(InterruptedException e){
			e.printStackTrace();
		}
	}
  
  	public void downMethod(){
		System.out.println("x");
	    down.countDown();//其他线程调用将计数减1
	}
}

public class MyThread extends Thread{
	private MyService myService;
  	public MyThread(MyService myService){
		super();
	  	this.myService = myService;
	}
  	public void run(){
		myService.testMethod();
	}
}

public class Run{
	public static void main(String[] args){
		MyService service = new MyService();
	  	MyThread t = new MyThread(service);
		t.start();
		Thread.sleep(2000);
		service.downMethod();
	}
}
public class MyThread extends Thread{
	private CountDownLatch maxRunner;
  	public MyThread(CountDownLatch maxRunner){
		super();
	  	this.maxRunner = maxRunner;
	}
  	public void run(){
		try{
			Thread.sleep(2000);
		  	maxRunner.countDown();//计数减1
		}catch(Exception e){
			e.printStackTrace();
		}
	}
}

public class Run{
	public static void main(){
	  	//初始化为10
		CountDownLatch maxRunner = new CountDownLatch(10);
	  	MyThread[] tArray = new MyThread[Integer.parseInt(""+maxRunner.getCount())];
		
	  //启动十个线程
	  	for(int i=0;i<tArray.length;i++){
			tArray[i] = new MyThread(maxRunner);
		  	tArray[i].setName("线程"+(i+1));
		  	tArray[i].start
		}
	  //直到子线程全部执行,主线程才结束阻塞
	  maxRunner.await();
	  System.out.println("线程全部执行");
	}
}

CyclicBarrier循环地实现一起做任务的的目标

一组线程互相等待直到达到某个公共屏障点,其公共屏障点可以重用。

  • getNumberWaiting() 获取有几个线程已经到达屏障点
  • isBroken() 查询此屏障是否处于损坏状态
  • await(long timeout,TimeUnit unit) 指定的时间内达到Parties数量则继续向下运行,否则出现超时,则抛出TimeoutException异常。cyclicBarrier.await(2,TimeUnit.SECONDS);
  • getNumverWaiting() 有几个线程已经到达了屏障点
  • getParties() 取得parties个数
  • reset() 重置屏障,线程会出现Broken异常

public class MyThread extends Thread{
	private CyclicBarrier cb;
  	public MyThread(CyclicBarrier cb){
		super();
	  	this.cb = cb;
	}
  	public void run(){
		try{
			Thread.sleep((int)(Math.random()*1000));
		  	System.out.println(Thread.currentThread().getName()+"到了"+System.currentTimeMillis());
		  	cb.await();//等待(5个线程都执行了该await方法才能继续执行)
		}catch(Exception e){
		  	e.printStackTrace();
		}
	}
}

public class Run{
	public static void main(String[] args){
		CyclicBarrier cb = new CyclicBarrier(5,new Runnable(){
			public void run(){
				System.out.println("全部到齐");
			}
		});
	  
	  	//创建5个子线程
	  MyThread[] threadArray = new MyThread[5];
	  for(int i=0;i<threadArray.length;i++){
	  	threadArray[i] = new MyThread(cb);
	  }
	  //启动
	  for(int i=0;i<threadArray.length;i++){
	  	threadArray[i].start();
	  }
	}
}
public class MyThread extends Thread{
	private CyclicBarrier cb;
  	public MyThread(CyclicBarrier cb){
		super();
	  	this.cb = cb;
	}
  	public void run(){
		try{
		  	System.out.println(Thread.currentThread().getName()+"等待凑齐2个继续运行"+System.currentTimeMillis());
		  	cb.await();
		  System.out.println(Thread.currentThread().getName()+"已经凑齐两个"+System.currentTimeMillis());
		}catch(Exception e){
		  	e.printStackTrace();
		}
	}
}

public class Test{
	public static void main(String[] args){
		CyclicBarrier cb = new CyclicBarrier(2,new Runnable(){
			public void run(){
				System.out.println("全来了");
			}
		});
	  
	  for(int i=0;i<4;i++){
	  	ThreadA threadA = new ThreadA(cb);
		threadA.start();
		Thread.sleep(2000);
	  }
	}
}
public class ThreadA extends Thread{
	private CyclicBarrier cb;
  	public ThreadA(CyclicBarrier cb){
		this.cb = cb;
	}
  	public void run(){
		try{
			cb.await();
		}catch(Exception e){
			e.printStackTrace();
		}
	}
}

public class Test{
	public static void main(String[] args){
		CyclicBarrier cb = new CyclicBarrier(3);
	  
	  	ThreadA threadA1 = new ThreadA(cb);
	  	threadA1.start();
	  	Thread.sleep(500);
	  	System.out.println(cb.getNumberWaiting());//1
	  
	  ThreadA threadA2 = new ThreadA(cb);
	  	threadA2.start();
	  	Thread.sleep(500);
	  	System.out.println(cb.getNumberWaiting());//2
	  
	  ThreadA threadA3 = new ThreadA(cb);
	  	threadA3.start();
	  	Thread.sleep(500);
	  	System.out.println(cb.getNumberWaiting());//0  到达屏障点的线程为0,即重置
	  
	  ThreadA threadA4 = new ThreadA(cb);
	  	threadA4.start();
	  	Thread.sleep(500);
	  	System.out.println(cb.getNumberWaiting());//1
	}
}

CyClicBarrier与CountDownLatch的区别:

  • CountDownLatch:一个线程或者多个线程,等待另外一个线程或多个线程完成某个事情之后才能继续执行
  • CyClicBarrier:多个线程之间相互等待,任何一个线程完成之前,所有的线程都必须等待。重点是其中任何一个线程没有完成,则所有线程都必须等待。(计数属于加法操作)

Phaser JDK7新增 对计数器的操作是加法操作 设置多屏障的功能

  • arriveAndAwaitAdvance() 计数不足,线程呈阻塞状态,不能继续向下运行
  • arriveAndDeregister() 当前线程退出,使parties减1
  • getPhase() 获取已经到达第几个屏障了
  • onAdvance() 通过新的屏障时被调用
  • getRegisteredParties() 获得注册的parties数量
  • register() 每执行一次就动态添加一个parties值
  • bulkRegister() 批量增加parties数量(参数中添加需要增加的数量)
  • getArrivedParties() 获得未被使用的parties个数
  • getUnarrivedParties()
  • arrive() 使parties值加1,并且不在屏障出等待,直接向下面的代码继续运行
  • awaitAdvance(int phase) 如果传入参数phase值和当前getPhase()方法返回值一样,则在屏障处等待否则向下运行。
  • awaitAdvanceInterruptibly(int) 不可中断
  • awaitAdvanceInterruptibly(int,long,TimeUnit)指定的栏数等待最大的单位时间,如果在指定的时间内栏数未变,则出现异常,否则向下运行
  • forceTermination() 使Phaser对象的屏障功能失效
  • isTerminated() 判断Phaser对象是否呈销毁状态

public class PrintTools{
	public static Phaser phaser;
  
  	public static void methodA(){
	  	//A1 begin
		phaser.arriveAndAwaitAdvance();
	  	//A1 end
	  	//A2 begin
		phaser.arriveAndAwaitAdvance();
	  	//A2 end
	}
  	public static void methodA(){
		//A1 begin
	  	Thread.sleep(5000);
		phaser.arriveAndAwaitAdvance();
	  	//A1 end
	  	//A2 begin
	  	Thread.sleep(5000);
		phaser.arriveAndAwaitAdvance();
	  	//A2 end
	}
}

//定义三个线程A,B,C 三个线程的run方法调用上面的methodA和methodB (略)

//主类中Phaser phaser = new Phaser(3),并启动三个线程
public static void method1(){
	//...业务处理
  	System.out.println(phaser.getRegisteredParties());
  	phaser.arriveAndDeregister();//使parties减1
  	System.out.println(phaser.getRegisteredParties());
}
public void run(){
	phaser.arriveAndAwaitAdvance();
  	phaser.getPhase();//1
  
  	phaser.arriveAndAwaitAdvance();
  	phaser.getPhase();//2
}

//onAdvance()  通过新的屏障被调用
Phaser phaser = new Phaser(2){
	protected boolean onAdvance(int phase,int registeredParties){
		return true;//返回true,不等待了,Phaser呈无效、销毁状态
	  //false 则Phaser继续工作
	}
};

全部评论

相关推荐

点赞 收藏 评论
分享
牛客网
牛客企业服务