并发容器(非阻塞队列+阻塞队列)

非阻塞

ConcurrentHashMap

@Test
public void test(){
	ConcurrentHashMap<String,Integer> map = new ConcurrentHashMap<>();
	map.put("jordan",23);
  	map.put("kobe",24);
  
  	if(map.containsKey("jordan") && map.get("jordan").equals(23)){
		map.remove("jordan");
	}
}

CopyOnWriteArrayList

set/add/remove等写操作都使用了ReentrantLock的lock方法加锁,读操作不需要加锁

@Test
public void test(){
	CopyOnWriteArrayList<String> list = new CopyOnWriteArrayList<>();
  	list.add("jordan");
  	list.add("kobe");
  	list.add(1,"james");//下标从零开始
  	System.out.println(list.get(1));
  
  if(list.contains("kobe")){
  	Interator<String> value = list.iterator();
	while(value.hasNext()){
		System.out.println(value.next());
	}
  }
}

CopyOnWriteArraySet

CopyOnWriteArraySet<String> list = new CopyOnWriteArraySet<>();
list.add("jordan");
list.add("kobe");
if(list.contains("kobe")){
	Interator<String> value = list.iterator();
  	while(value.hasNext()){
		System.out.println(value.next());
	}
}

==========================================================

BlockingQueue 带阻塞功能的队列

ArrayBlockingQueue数组阻塞队列

先进先出,新元素插入到队尾,队列获取操作从队头开始。

@Test
public void test(){
	BlockingQueue<String> bq = new ArrayBlockintQueue<>(16);
  
  	//创建4个线程,从队列中获取数据
  	for(int i=0;i<4;i++){
		new Thread(new Runnable(){
			public void run(){
				while(true){
					try{
						String log = (String)bq.take();
					}catch(Exception e){
					
					}
				}
			}
		}).start();
	}
  
  	//向队列中添加数据
  	for(int i=0;i<16;i++){
		String log = (i+1) +"-->";
	  	bq.put(log);
	}
}

LinkedBlockingQueue链表阻塞队列

//使用方法同上述ArrayBlockingQueue一样
BlockingQueue<String> bq = new LinkedBlocking<>(16);

PriorityBlockingQueue优先级阻塞队列

优先级的判断通过构造函数传入的Compator对象决定

DelayQueue支持延时获取元素的使用优先级队列的实现的无界阻塞队列。

  • 缓存系统的设计,通过该队列保护缓存元素的有效期,使用一个线程循环查询DelayQueue,一旦从队列中获取元素时,表示缓存有效期到了。
  • 定时任务调度,通过该队列保存当天将会执行的任务和执行时间,一旦从DelayQueue中获取任务,就开始职系执行,比如TimerQueue就是使用DelayQueue实现的。
public class Student implements Delayed{
	private String name;
  	private long submitTime;//交卷时间
  	private long workTime;//考试时间
  	public String getName(){
		return this.name+"交卷用时"+workTime;
	}
  	public Stutent(String name,long submitTime){
		this.name=name;
	  	this.workTime=submitTime;
	  	this.submitTime=TimeUnit.NANOSECONDS.convert(submitTime,TimeUnit.MILLINSECONDS)+System.nanoTime();
	  	System.out.println(this.name+"交卷,用时"+workTime); 
	}
  	public long getDelay(TimeUnit unit){
		return unit.convert(submitTime-System.nanoTime(),unit.NANOSECONDS);
	}
  	public int compareTo(Delayed o){
		return submitTime>that.submitTime?1:(submitTime<that.submitTime?-1:0);
	}
}

public class DelayQueueTest{
	public void static main(String[] args){
		final DelayQueue<Student> bq = new DelayQueue<>();
	  	for(int i=0;i<5;i++){
			Student student = new Student("学生"+1,Math.round(Math.random()*10+i));
		  	System.out.println("bq.peek()"+bq.peek().getName());
		}
	}
}

SynchronousQueue不存储元素的阻塞队列

每一个put操作必须等待一个take操作,否则不能继续添加元素

public class SynchronousQueueTest{
	public static void main(){
		System.out.println("begin:"+(System.currentTimeMillis()/1000));
	  	
	  	final SynchronousQueue<String> sq = new SynchronousQueue<>();
	  	final Semaphore sem = new Semaphore(1);
	  
	  	for(int i=0;i<10;i++){//启动十个线程
			new Thread(new Runnable(){
				public void run(){
					try{
						sem.acquire();
					  	String input = sq.take();
					  	String output = TestDo.doSome(intput);
						System.out.println(Thread.currentThread().getName()+":"+output);
						sem.release();
					}catch(){}
				}
			}).start();
		  
		  for(int i=0;i<10;i++){
		  	String input = i+"";
			try{
				sq.put(input);
			}catch(){
			}
		  }
		}
	}
}

class TestDo{
	public static String doSome(String input){
		Try{
			Thread.sleep(1000);
		}catch(){}
	  	String out = input + ":" (System.currentTimeMillis()/1000);
		return output;
	}
}

LinkedBlockingDeque链表双向阻塞队列

可以从队列的两端进行插入或移除元素

与其他阻塞队列相比,多了addFirst/addLast/offerFirst/offerLast/peekFirst/peekLast

LinkedTransferQueue链表结构组成的无界传输阻塞队列

  • transfer(E e) 若当前存在一个正在等待的消费者线程,即立刻移交之;否则会插入当前元素e到队列尾部,并且等待进入阻塞状态,直到有消费者线程取走该元素
  • tryTransfer(E e),若当前存在一个正在等待获取消费者线程(使用take或poll方法),则使用该方法会立刻传输对象元素e;若不存在,则返回false,并且不进入队列,这是一个不阻塞的操作。
  • tryTransfer(E e,long timeout,TimeUnit unit),若当前存在一个正在等待获取的消费者线程,会立即传输给它,否则将插入元素e到队列尾部,并且等待被消费者线程获取消费掉。若在指定的时间内元素e无法被消费者线程获取,则返回false,同时该元素被移除
  • hasWaitingConsumer() 判断是否为终端消费者线程
  • getWaitingConsumerCount() 获取终端所有等待获取元素的消费者线程数量
  • size() 因为队列的异步特性,检测当前队列的元素个数需要逐一迭代,可能会得到

public void Consumer implements Runnable{
	private final TransferQueue<String> queue;
  	public Consumer(TransferQueue<String> queue){
		this.queue = queue;
	}
  public void run(){
  	try{
		System.out.println("Consumer" +Thread.currentThread().getName()+queue.take());
	}catch(){}
  }
}

public class Producer implements Runnable{
	private final TransferQueue<String> queue;
  	public Producer(TransferQueue<String> queue){
		this.queue = queue;
	}
  	private String produce(){
		return "your luck number"+(new Random().nextInt(100));
	}
  
  	public void run(){
		try{
			while(true){
				if(queue.hasWaitingConsumer()){
					queue.transfer(produce());
				}
			  TimeUnit.SECONDS.sleep(1);
			}
		}catch(){}
	}
}

public class LuckyNumverGenerator{
	public static void main(String[] args){
		TransferQueue<String> queue = new LinkedTransferQueue<>();
		Thread producer = new Thread();
	  	producer.setDaemon(true);//守护线程,使得线程执行结束后程序自动结束
		consumer.start();
	  
	  	try{
			Thread.sleep(1000);
		}catch(){}
	}
}

全部评论

相关推荐

点赞 收藏 评论
分享
牛客网
牛客企业服务