并发容器(非阻塞队列+阻塞队列)
非阻塞
ConcurrentHashMap
@Test public void test(){ ConcurrentHashMap<String,Integer> map = new ConcurrentHashMap<>(); map.put("jordan",23); map.put("kobe",24); if(map.containsKey("jordan") && map.get("jordan").equals(23)){ map.remove("jordan"); } }
CopyOnWriteArrayList
set/add/remove等写操作都使用了ReentrantLock的lock方法加锁,读操作不需要加锁
@Test public void test(){ CopyOnWriteArrayList<String> list = new CopyOnWriteArrayList<>(); list.add("jordan"); list.add("kobe"); list.add(1,"james");//下标从零开始 System.out.println(list.get(1)); if(list.contains("kobe")){ Interator<String> value = list.iterator(); while(value.hasNext()){ System.out.println(value.next()); } } }
CopyOnWriteArraySet
CopyOnWriteArraySet<String> list = new CopyOnWriteArraySet<>(); list.add("jordan"); list.add("kobe"); if(list.contains("kobe")){ Interator<String> value = list.iterator(); while(value.hasNext()){ System.out.println(value.next()); } }
==========================================================
BlockingQueue 带阻塞功能的队列
ArrayBlockingQueue数组阻塞队列
先进先出,新元素插入到队尾,队列获取操作从队头开始。
@Test public void test(){ BlockingQueue<String> bq = new ArrayBlockintQueue<>(16); //创建4个线程,从队列中获取数据 for(int i=0;i<4;i++){ new Thread(new Runnable(){ public void run(){ while(true){ try{ String log = (String)bq.take(); }catch(Exception e){ } } } }).start(); } //向队列中添加数据 for(int i=0;i<16;i++){ String log = (i+1) +"-->"; bq.put(log); } }
LinkedBlockingQueue链表阻塞队列
//使用方法同上述ArrayBlockingQueue一样 BlockingQueue<String> bq = new LinkedBlocking<>(16);
PriorityBlockingQueue优先级阻塞队列
优先级的判断通过构造函数传入的Compator对象决定
DelayQueue支持延时获取元素的使用优先级队列的实现的无界阻塞队列。
- 缓存系统的设计,通过该队列保护缓存元素的有效期,使用一个线程循环查询DelayQueue,一旦从队列中获取元素时,表示缓存有效期到了。
- 定时任务调度,通过该队列保存当天将会执行的任务和执行时间,一旦从DelayQueue中获取任务,就开始职系执行,比如TimerQueue就是使用DelayQueue实现的。
public class Student implements Delayed{ private String name; private long submitTime;//交卷时间 private long workTime;//考试时间 public String getName(){ return this.name+"交卷用时"+workTime; } public Stutent(String name,long submitTime){ this.name=name; this.workTime=submitTime; this.submitTime=TimeUnit.NANOSECONDS.convert(submitTime,TimeUnit.MILLINSECONDS)+System.nanoTime(); System.out.println(this.name+"交卷,用时"+workTime); } public long getDelay(TimeUnit unit){ return unit.convert(submitTime-System.nanoTime(),unit.NANOSECONDS); } public int compareTo(Delayed o){ return submitTime>that.submitTime?1:(submitTime<that.submitTime?-1:0); } } public class DelayQueueTest{ public void static main(String[] args){ final DelayQueue<Student> bq = new DelayQueue<>(); for(int i=0;i<5;i++){ Student student = new Student("学生"+1,Math.round(Math.random()*10+i)); System.out.println("bq.peek()"+bq.peek().getName()); } } }
SynchronousQueue不存储元素的阻塞队列
每一个put操作必须等待一个take操作,否则不能继续添加元素
public class SynchronousQueueTest{ public static void main(){ System.out.println("begin:"+(System.currentTimeMillis()/1000)); final SynchronousQueue<String> sq = new SynchronousQueue<>(); final Semaphore sem = new Semaphore(1); for(int i=0;i<10;i++){//启动十个线程 new Thread(new Runnable(){ public void run(){ try{ sem.acquire(); String input = sq.take(); String output = TestDo.doSome(intput); System.out.println(Thread.currentThread().getName()+":"+output); sem.release(); }catch(){} } }).start(); for(int i=0;i<10;i++){ String input = i+""; try{ sq.put(input); }catch(){ } } } } } class TestDo{ public static String doSome(String input){ Try{ Thread.sleep(1000); }catch(){} String out = input + ":" (System.currentTimeMillis()/1000); return output; } }
LinkedBlockingDeque链表双向阻塞队列
可以从队列的两端进行插入或移除元素
与其他阻塞队列相比,多了addFirst/addLast/offerFirst/offerLast/peekFirst/peekLast
LinkedTransferQueue链表结构组成的无界传输阻塞队列
- transfer(E e) 若当前存在一个正在等待的消费者线程,即立刻移交之;否则会插入当前元素e到队列尾部,并且等待进入阻塞状态,直到有消费者线程取走该元素
- tryTransfer(E e),若当前存在一个正在等待获取消费者线程(使用take或poll方法),则使用该方法会立刻传输对象元素e;若不存在,则返回false,并且不进入队列,这是一个不阻塞的操作。
- tryTransfer(E e,long timeout,TimeUnit unit),若当前存在一个正在等待获取的消费者线程,会立即传输给它,否则将插入元素e到队列尾部,并且等待被消费者线程获取消费掉。若在指定的时间内元素e无法被消费者线程获取,则返回false,同时该元素被移除
- hasWaitingConsumer() 判断是否为终端消费者线程
- getWaitingConsumerCount() 获取终端所有等待获取元素的消费者线程数量
- size() 因为队列的异步特性,检测当前队列的元素个数需要逐一迭代,可能会得到
public void Consumer implements Runnable{ private final TransferQueue<String> queue; public Consumer(TransferQueue<String> queue){ this.queue = queue; } public void run(){ try{ System.out.println("Consumer" +Thread.currentThread().getName()+queue.take()); }catch(){} } } public class Producer implements Runnable{ private final TransferQueue<String> queue; public Producer(TransferQueue<String> queue){ this.queue = queue; } private String produce(){ return "your luck number"+(new Random().nextInt(100)); } public void run(){ try{ while(true){ if(queue.hasWaitingConsumer()){ queue.transfer(produce()); } TimeUnit.SECONDS.sleep(1); } }catch(){} } } public class LuckyNumverGenerator{ public static void main(String[] args){ TransferQueue<String> queue = new LinkedTransferQueue<>(); Thread producer = new Thread(); producer.setDaemon(true);//守护线程,使得线程执行结束后程序自动结束 consumer.start(); try{ Thread.sleep(1000); }catch(){} } }