并发工具类
Semaphore限制线程并发数
- 其构造参数new Semaphore(10,true)是初始允许的线程数量,true/false决定是否公平信号量
- accquire(2) 动态添加许可数量
- acquireUninterruptibly() 使等待进入acquire()方法的线程不允许被中断
- availablePermits() 返回Semaphore对象中当前可用许可数
- drainPermits() 可获取并返回立即可用的所有许可个数,并将可用许可置0
- getQueueLength() 取得等待许可的线程个数
- hasQueuedThreads()判断有没有线程再等待这个许可
- tryAcquire() 尝试获得一个许可,如果获取不到返回false.添加参数,尝试获得X个许可
- tryAcquire(long timeout,TimeUnit unit) 在指定时间内尝试获取一个许可,如果获取不到返回false
- tryAcquire(int permits,long timeout,TimeUnit unit)在指定时间内尝试获取X个许可,如果获取不到返回false
public class Service{
//同一时间最初最多允许2个线程acquire和release之间的代码
private Semaphore semaphore = new Semaphore(2);
public void testMethod(){
try{
semaphore.acquire();//acquire(int permits)每调用一次就使用多少个许可
System.out.println(Thread.currentThread().getName()+"begin timer"+System.currentTimeMillis());
semaphore.release();
System.out.println(Thread.currentThread().getName()+"end timer"+System.currentTimeMillis());
}catch(InterruptedException e){
e.printStackTrace();
}
}
public static void main(String[] args){
Service service = new Service();
ThreadA a = new ThreadA(service);
a.setName("A");
ThreadB b = new ThreadB(service);
b.setName("B");
ThreadC c = new ThreadC(service);
c.setName("C");
a.start();
b.start();
c.start();
}
}
//创建三个线程(略)
public class ThreadA extends Thread{
private Service service;
public ThreadA(Service service){
super();
this.service = service;
}
@Override
public void run(){
service.testMethod();
}
}
Exchanger两个线程之间交换数据
- exchange() 等待其他线程来取得数据,如果没有其他线程取得数据,则一直阻塞等待。
- exchange(V x,long timeout,TimeUnit unit)在指定的时间内没有其他线程获取数据,超时异常
public class ThreadA extends Thread{
private Exchange<String> exchange;
public ThreadA(Exchanger<String> exchanger){
super();
this.exchanger = exchanger;
}
public void run(){
try{
System.out.println("在线程A中得到线程B的值="+exchanger.exchange("中国人A"));
System.out.println("A end!");
}catch(){}
}
}
public class ThreadB extends Thread{
private Exchange<String> exchange;
public ThreadA(Exchanger<String> exchanger){
super();
this.exchanger = exchanger;
}
public void run(){
try{
System.out.println("在线程B中得到线程A的值="+exchanger.exchange("中国人B"));
System.out.println("A end!");
}catch(){}
}
}
public class Run{
public static void main(String[] args){
Exchanger<String> exchanger = new Exchanger<>();
ThreadA a = new ThreadA(exchanger);
ThreadB b = new ThreadB(exchanger);
a.start();
System.out.println("main end!");
}
}
CountDownLatch 线程以组团的方式执行任务
- await()判断计数是否为0,如果不为0则等待,
- countDown()方法,其他线程调用,计数减1,当计数减到0时,等待的线程继续运行
- getCount() 获取当前的计数个数
计数无法被重置,需要重置计数,使用CyclicBarrier
public class MyService{
private CountDownLatch down = new CountDownLatch(1);
public void testMethod(){
try{
System.out.println("A");
down.await();//判断计数是否为0,不为0就等待
System.out.println("B");
}catch(InterruptedException e){
e.printStackTrace();
}
}
public void downMethod(){
System.out.println("x");
down.countDown();//其他线程调用将计数减1
}
}
public class MyThread extends Thread{
private MyService myService;
public MyThread(MyService myService){
super();
this.myService = myService;
}
public void run(){
myService.testMethod();
}
}
public class Run{
public static void main(String[] args){
MyService service = new MyService();
MyThread t = new MyThread(service);
t.start();
Thread.sleep(2000);
service.downMethod();
}
}
public class MyThread extends Thread{
private CountDownLatch maxRunner;
public MyThread(CountDownLatch maxRunner){
super();
this.maxRunner = maxRunner;
}
public void run(){
try{
Thread.sleep(2000);
maxRunner.countDown();//计数减1
}catch(Exception e){
e.printStackTrace();
}
}
}
public class Run{
public static void main(){
//初始化为10
CountDownLatch maxRunner = new CountDownLatch(10);
MyThread[] tArray = new MyThread[Integer.parseInt(""+maxRunner.getCount())];
//启动十个线程
for(int i=0;i<tArray.length;i++){
tArray[i] = new MyThread(maxRunner);
tArray[i].setName("线程"+(i+1));
tArray[i].start
}
//直到子线程全部执行,主线程才结束阻塞
maxRunner.await();
System.out.println("线程全部执行");
}
}
CyclicBarrier循环地实现一起做任务的的目标
一组线程互相等待直到达到某个公共屏障点,其公共屏障点可以重用。
- getNumberWaiting() 获取有几个线程已经到达屏障点
- isBroken() 查询此屏障是否处于损坏状态
- await(long timeout,TimeUnit unit) 指定的时间内达到Parties数量则继续向下运行,否则出现超时,则抛出TimeoutException异常。cyclicBarrier.await(2,TimeUnit.SECONDS);
- getNumverWaiting() 有几个线程已经到达了屏障点
- getParties() 取得parties个数
- reset() 重置屏障,线程会出现Broken异常
public class MyThread extends Thread{
private CyclicBarrier cb;
public MyThread(CyclicBarrier cb){
super();
this.cb = cb;
}
public void run(){
try{
Thread.sleep((int)(Math.random()*1000));
System.out.println(Thread.currentThread().getName()+"到了"+System.currentTimeMillis());
cb.await();//等待(5个线程都执行了该await方法才能继续执行)
}catch(Exception e){
e.printStackTrace();
}
}
}
public class Run{
public static void main(String[] args){
CyclicBarrier cb = new CyclicBarrier(5,new Runnable(){
public void run(){
System.out.println("全部到齐");
}
});
//创建5个子线程
MyThread[] threadArray = new MyThread[5];
for(int i=0;i<threadArray.length;i++){
threadArray[i] = new MyThread(cb);
}
//启动
for(int i=0;i<threadArray.length;i++){
threadArray[i].start();
}
}
}
public class MyThread extends Thread{
private CyclicBarrier cb;
public MyThread(CyclicBarrier cb){
super();
this.cb = cb;
}
public void run(){
try{
System.out.println(Thread.currentThread().getName()+"等待凑齐2个继续运行"+System.currentTimeMillis());
cb.await();
System.out.println(Thread.currentThread().getName()+"已经凑齐两个"+System.currentTimeMillis());
}catch(Exception e){
e.printStackTrace();
}
}
}
public class Test{
public static void main(String[] args){
CyclicBarrier cb = new CyclicBarrier(2,new Runnable(){
public void run(){
System.out.println("全来了");
}
});
for(int i=0;i<4;i++){
ThreadA threadA = new ThreadA(cb);
threadA.start();
Thread.sleep(2000);
}
}
}
public class ThreadA extends Thread{
private CyclicBarrier cb;
public ThreadA(CyclicBarrier cb){
this.cb = cb;
}
public void run(){
try{
cb.await();
}catch(Exception e){
e.printStackTrace();
}
}
}
public class Test{
public static void main(String[] args){
CyclicBarrier cb = new CyclicBarrier(3);
ThreadA threadA1 = new ThreadA(cb);
threadA1.start();
Thread.sleep(500);
System.out.println(cb.getNumberWaiting());//1
ThreadA threadA2 = new ThreadA(cb);
threadA2.start();
Thread.sleep(500);
System.out.println(cb.getNumberWaiting());//2
ThreadA threadA3 = new ThreadA(cb);
threadA3.start();
Thread.sleep(500);
System.out.println(cb.getNumberWaiting());//0 到达屏障点的线程为0,即重置
ThreadA threadA4 = new ThreadA(cb);
threadA4.start();
Thread.sleep(500);
System.out.println(cb.getNumberWaiting());//1
}
}
CyClicBarrier与CountDownLatch的区别:
- CountDownLatch:一个线程或者多个线程,等待另外一个线程或多个线程完成某个事情之后才能继续执行
- CyClicBarrier:多个线程之间相互等待,任何一个线程完成之前,所有的线程都必须等待。重点是其中任何一个线程没有完成,则所有线程都必须等待。(计数属于加法操作)
Phaser JDK7新增 对计数器的操作是加法操作 设置多屏障的功能
- arriveAndAwaitAdvance() 计数不足,线程呈阻塞状态,不能继续向下运行
- arriveAndDeregister() 当前线程退出,使parties减1
- getPhase() 获取已经到达第几个屏障了
- onAdvance() 通过新的屏障时被调用
- getRegisteredParties() 获得注册的parties数量
- register() 每执行一次就动态添加一个parties值
- bulkRegister() 批量增加parties数量(参数中添加需要增加的数量)
- getArrivedParties() 获得未被使用的parties个数
- getUnarrivedParties()
- arrive() 使parties值加1,并且不在屏障出等待,直接向下面的代码继续运行
- awaitAdvance(int phase) 如果传入参数phase值和当前getPhase()方法返回值一样,则在屏障处等待否则向下运行。
- awaitAdvanceInterruptibly(int) 不可中断
- awaitAdvanceInterruptibly(int,long,TimeUnit)指定的栏数等待最大的单位时间,如果在指定的时间内栏数未变,则出现异常,否则向下运行
- forceTermination() 使Phaser对象的屏障功能失效
- isTerminated() 判断Phaser对象是否呈销毁状态
public class PrintTools{
public static Phaser phaser;
public static void methodA(){
//A1 begin
phaser.arriveAndAwaitAdvance();
//A1 end
//A2 begin
phaser.arriveAndAwaitAdvance();
//A2 end
}
public static void methodA(){
//A1 begin
Thread.sleep(5000);
phaser.arriveAndAwaitAdvance();
//A1 end
//A2 begin
Thread.sleep(5000);
phaser.arriveAndAwaitAdvance();
//A2 end
}
}
//定义三个线程A,B,C 三个线程的run方法调用上面的methodA和methodB (略)
//主类中Phaser phaser = new Phaser(3),并启动三个线程
public static void method1(){
//...业务处理
System.out.println(phaser.getRegisteredParties());
phaser.arriveAndDeregister();//使parties减1
System.out.println(phaser.getRegisteredParties());
}
public void run(){
phaser.arriveAndAwaitAdvance();
phaser.getPhase();//1
phaser.arriveAndAwaitAdvance();
phaser.getPhase();//2
}
//onAdvance() 通过新的屏障被调用
Phaser phaser = new Phaser(2){
protected boolean onAdvance(int phase,int registeredParties){
return true;//返回true,不等待了,Phaser呈无效、销毁状态
//false 则Phaser继续工作
}
};