并发工具类
Semaphore限制线程并发数
- 其构造参数new Semaphore(10,true)是初始允许的线程数量,true/false决定是否公平信号量
- accquire(2) 动态添加许可数量
- acquireUninterruptibly() 使等待进入acquire()方法的线程不允许被中断
- availablePermits() 返回Semaphore对象中当前可用许可数
- drainPermits() 可获取并返回立即可用的所有许可个数,并将可用许可置0
- getQueueLength() 取得等待许可的线程个数
- hasQueuedThreads()判断有没有线程再等待这个许可
- tryAcquire() 尝试获得一个许可,如果获取不到返回false.添加参数,尝试获得X个许可
- tryAcquire(long timeout,TimeUnit unit) 在指定时间内尝试获取一个许可,如果获取不到返回false
- tryAcquire(int permits,long timeout,TimeUnit unit)在指定时间内尝试获取X个许可,如果获取不到返回false
public class Service{ //同一时间最初最多允许2个线程acquire和release之间的代码 private Semaphore semaphore = new Semaphore(2); public void testMethod(){ try{ semaphore.acquire();//acquire(int permits)每调用一次就使用多少个许可 System.out.println(Thread.currentThread().getName()+"begin timer"+System.currentTimeMillis()); semaphore.release(); System.out.println(Thread.currentThread().getName()+"end timer"+System.currentTimeMillis()); }catch(InterruptedException e){ e.printStackTrace(); } } public static void main(String[] args){ Service service = new Service(); ThreadA a = new ThreadA(service); a.setName("A"); ThreadB b = new ThreadB(service); b.setName("B"); ThreadC c = new ThreadC(service); c.setName("C"); a.start(); b.start(); c.start(); } } //创建三个线程(略) public class ThreadA extends Thread{ private Service service; public ThreadA(Service service){ super(); this.service = service; } @Override public void run(){ service.testMethod(); } }
Exchanger两个线程之间交换数据
- exchange() 等待其他线程来取得数据,如果没有其他线程取得数据,则一直阻塞等待。
- exchange(V x,long timeout,TimeUnit unit)在指定的时间内没有其他线程获取数据,超时异常
public class ThreadA extends Thread{ private Exchange<String> exchange; public ThreadA(Exchanger<String> exchanger){ super(); this.exchanger = exchanger; } public void run(){ try{ System.out.println("在线程A中得到线程B的值="+exchanger.exchange("中国人A")); System.out.println("A end!"); }catch(){} } } public class ThreadB extends Thread{ private Exchange<String> exchange; public ThreadA(Exchanger<String> exchanger){ super(); this.exchanger = exchanger; } public void run(){ try{ System.out.println("在线程B中得到线程A的值="+exchanger.exchange("中国人B")); System.out.println("A end!"); }catch(){} } } public class Run{ public static void main(String[] args){ Exchanger<String> exchanger = new Exchanger<>(); ThreadA a = new ThreadA(exchanger); ThreadB b = new ThreadB(exchanger); a.start(); System.out.println("main end!"); } }
CountDownLatch 线程以组团的方式执行任务
- await()判断计数是否为0,如果不为0则等待,
- countDown()方法,其他线程调用,计数减1,当计数减到0时,等待的线程继续运行
- getCount() 获取当前的计数个数
计数无法被重置,需要重置计数,使用CyclicBarrier
public class MyService{ private CountDownLatch down = new CountDownLatch(1); public void testMethod(){ try{ System.out.println("A"); down.await();//判断计数是否为0,不为0就等待 System.out.println("B"); }catch(InterruptedException e){ e.printStackTrace(); } } public void downMethod(){ System.out.println("x"); down.countDown();//其他线程调用将计数减1 } } public class MyThread extends Thread{ private MyService myService; public MyThread(MyService myService){ super(); this.myService = myService; } public void run(){ myService.testMethod(); } } public class Run{ public static void main(String[] args){ MyService service = new MyService(); MyThread t = new MyThread(service); t.start(); Thread.sleep(2000); service.downMethod(); } }
public class MyThread extends Thread{ private CountDownLatch maxRunner; public MyThread(CountDownLatch maxRunner){ super(); this.maxRunner = maxRunner; } public void run(){ try{ Thread.sleep(2000); maxRunner.countDown();//计数减1 }catch(Exception e){ e.printStackTrace(); } } } public class Run{ public static void main(){ //初始化为10 CountDownLatch maxRunner = new CountDownLatch(10); MyThread[] tArray = new MyThread[Integer.parseInt(""+maxRunner.getCount())]; //启动十个线程 for(int i=0;i<tArray.length;i++){ tArray[i] = new MyThread(maxRunner); tArray[i].setName("线程"+(i+1)); tArray[i].start } //直到子线程全部执行,主线程才结束阻塞 maxRunner.await(); System.out.println("线程全部执行"); } }
CyclicBarrier循环地实现一起做任务的的目标
一组线程互相等待直到达到某个公共屏障点,其公共屏障点可以重用。
- getNumberWaiting() 获取有几个线程已经到达屏障点
- isBroken() 查询此屏障是否处于损坏状态
- await(long timeout,TimeUnit unit) 指定的时间内达到Parties数量则继续向下运行,否则出现超时,则抛出TimeoutException异常。cyclicBarrier.await(2,TimeUnit.SECONDS);
- getNumverWaiting() 有几个线程已经到达了屏障点
- getParties() 取得parties个数
- reset() 重置屏障,线程会出现Broken异常
public class MyThread extends Thread{ private CyclicBarrier cb; public MyThread(CyclicBarrier cb){ super(); this.cb = cb; } public void run(){ try{ Thread.sleep((int)(Math.random()*1000)); System.out.println(Thread.currentThread().getName()+"到了"+System.currentTimeMillis()); cb.await();//等待(5个线程都执行了该await方法才能继续执行) }catch(Exception e){ e.printStackTrace(); } } } public class Run{ public static void main(String[] args){ CyclicBarrier cb = new CyclicBarrier(5,new Runnable(){ public void run(){ System.out.println("全部到齐"); } }); //创建5个子线程 MyThread[] threadArray = new MyThread[5]; for(int i=0;i<threadArray.length;i++){ threadArray[i] = new MyThread(cb); } //启动 for(int i=0;i<threadArray.length;i++){ threadArray[i].start(); } } }
public class MyThread extends Thread{ private CyclicBarrier cb; public MyThread(CyclicBarrier cb){ super(); this.cb = cb; } public void run(){ try{ System.out.println(Thread.currentThread().getName()+"等待凑齐2个继续运行"+System.currentTimeMillis()); cb.await(); System.out.println(Thread.currentThread().getName()+"已经凑齐两个"+System.currentTimeMillis()); }catch(Exception e){ e.printStackTrace(); } } } public class Test{ public static void main(String[] args){ CyclicBarrier cb = new CyclicBarrier(2,new Runnable(){ public void run(){ System.out.println("全来了"); } }); for(int i=0;i<4;i++){ ThreadA threadA = new ThreadA(cb); threadA.start(); Thread.sleep(2000); } } }
public class ThreadA extends Thread{ private CyclicBarrier cb; public ThreadA(CyclicBarrier cb){ this.cb = cb; } public void run(){ try{ cb.await(); }catch(Exception e){ e.printStackTrace(); } } } public class Test{ public static void main(String[] args){ CyclicBarrier cb = new CyclicBarrier(3); ThreadA threadA1 = new ThreadA(cb); threadA1.start(); Thread.sleep(500); System.out.println(cb.getNumberWaiting());//1 ThreadA threadA2 = new ThreadA(cb); threadA2.start(); Thread.sleep(500); System.out.println(cb.getNumberWaiting());//2 ThreadA threadA3 = new ThreadA(cb); threadA3.start(); Thread.sleep(500); System.out.println(cb.getNumberWaiting());//0 到达屏障点的线程为0,即重置 ThreadA threadA4 = new ThreadA(cb); threadA4.start(); Thread.sleep(500); System.out.println(cb.getNumberWaiting());//1 } }
CyClicBarrier与CountDownLatch的区别:
- CountDownLatch:一个线程或者多个线程,等待另外一个线程或多个线程完成某个事情之后才能继续执行
- CyClicBarrier:多个线程之间相互等待,任何一个线程完成之前,所有的线程都必须等待。重点是其中任何一个线程没有完成,则所有线程都必须等待。(计数属于加法操作)
Phaser JDK7新增 对计数器的操作是加法操作 设置多屏障的功能
- arriveAndAwaitAdvance() 计数不足,线程呈阻塞状态,不能继续向下运行
- arriveAndDeregister() 当前线程退出,使parties减1
- getPhase() 获取已经到达第几个屏障了
- onAdvance() 通过新的屏障时被调用
- getRegisteredParties() 获得注册的parties数量
- register() 每执行一次就动态添加一个parties值
- bulkRegister() 批量增加parties数量(参数中添加需要增加的数量)
- getArrivedParties() 获得未被使用的parties个数
- getUnarrivedParties()
- arrive() 使parties值加1,并且不在屏障出等待,直接向下面的代码继续运行
- awaitAdvance(int phase) 如果传入参数phase值和当前getPhase()方法返回值一样,则在屏障处等待否则向下运行。
- awaitAdvanceInterruptibly(int) 不可中断
- awaitAdvanceInterruptibly(int,long,TimeUnit)指定的栏数等待最大的单位时间,如果在指定的时间内栏数未变,则出现异常,否则向下运行
- forceTermination() 使Phaser对象的屏障功能失效
- isTerminated() 判断Phaser对象是否呈销毁状态
public class PrintTools{ public static Phaser phaser; public static void methodA(){ //A1 begin phaser.arriveAndAwaitAdvance(); //A1 end //A2 begin phaser.arriveAndAwaitAdvance(); //A2 end } public static void methodA(){ //A1 begin Thread.sleep(5000); phaser.arriveAndAwaitAdvance(); //A1 end //A2 begin Thread.sleep(5000); phaser.arriveAndAwaitAdvance(); //A2 end } } //定义三个线程A,B,C 三个线程的run方法调用上面的methodA和methodB (略) //主类中Phaser phaser = new Phaser(3),并启动三个线程
public static void method1(){ //...业务处理 System.out.println(phaser.getRegisteredParties()); phaser.arriveAndDeregister();//使parties减1 System.out.println(phaser.getRegisteredParties()); }
public void run(){ phaser.arriveAndAwaitAdvance(); phaser.getPhase();//1 phaser.arriveAndAwaitAdvance(); phaser.getPhase();//2 } //onAdvance() 通过新的屏障被调用 Phaser phaser = new Phaser(2){ protected boolean onAdvance(int phase,int registeredParties){ return true;//返回true,不等待了,Phaser呈无效、销毁状态 //false 则Phaser继续工作 } };