Java高并发编程-并发容器

学习马士兵老师的公开课,整理的笔记

  1. 火车票销售的问题引出并发问题

题目:有N张火车票,每张票都有一个编号,同时有10个窗口对外售票

实现一:使用ArrayList
因为list的remove操作是非原子性的,所以多个线程在同时remove的时候可能会操作同一张票

代码如下:

public class TicketSeller1 {

	private static List<String> tickets = new ArrayList<>();
	
	static{
		for (int i = 0; i < 10000; i++) {
			tickets.add("票编号:" + i);
		}
	}
	public static void main(String[] args) {
		for (int i = 0; i < 10; i++) {
			new Thread(()->{
				while(tickets.size() > 0){
					System.out.println("销售了>>>>>" + tickets.remove(0));
				}
			}).start();
		}
	}
}

输出如下,发生了重售:

销售了>>>>>票编号:7835
销售了>>>>>票编号:7792
销售了>>>>>票编号:7565
销售了>>>>>票编号:9999
销售了>>>>>票编号:9542
销售了>>>>>票编号:9515
销售了>>>>>票编号:9242

实现二:使用Vector或者Collections.synchronizedXXX
虽然remove方法是原子性的但是判断与remove的操作合在一起就不是原子性的所以仍然会出现问题
代码如下:

public class TicketSeller2 {

	private static Vector<String> tickets = new Vector<String>();
	
	static {
		for (int i = 0; i < 10000; i++) {
			tickets.add("票编号:" + i);
		}
	}
	
	public static void main(String[] args) {
		for (int i = 0; i < 10; i++) {
			new Thread(()->{
				while(tickets.size() > 0){
					try {
						TimeUnit.MILLISECONDS.sleep(10);
					} catch (Exception e) {
						e.printStackTrace();
					}
					System.out.println("销售了>>>" + tickets.remove(0));
				}
			}).start();
		}
	}
}

运行结果如下

销售了>>>票编号:9993
销售了>>>票编号:9992
Exception in thread "Thread-9" java.lang.ArrayIndexOutOfBoundsException: Array index out of range: 0
	at java.util.Vector.remove(Vector.java:831)
	at thread.demo_024.TicketSeller2.lambda$main$0(TicketSeller2.java:37)
	at java.lang.Thread.run(Thread.java:748)
销售了>>>票编号:9995

实现三:使用同步代码块锁住容器-保证正确性但效率低
就算操作A和B都是同步的,但A和B组成的复合操作也未必是同步的,仍然需要自己进行同步 就像这个程序,判断size和进行remove必须是一整个的原子操作
代码如下:

public class TicketSeller3 {

	private static List<String> tickets = new LinkedList<String>();
	
	static {
		for (int i = 0; i < 10000; i++) {
			tickets.add("票编号:" + i);
		}
	}
	
	public static void main(String[] args) {
		for (int i = 0; i < 10; i++) {
			new Thread(()->{
				while(true){
					synchronized(tickets){
						if(tickets.size() <= 0) break;
						
						try {
							TimeUnit.MILLISECONDS.sleep(10);
						} catch (Exception e) {
							e.printStackTrace();
						}
						
						System.out.println("销售了>>>" + tickets.remove(0));
					}
				}
			}).start();
		}
	}
}

实现四:使用ConcurrentQueue提高并发性
使用JDK1.5之后提供的并发队列ConcurrentLinkedQueue存储元素,其底层使用CAS实现而非加锁实现的,其效率较高。
并发队列ConcurrentLinkedQueuepoll()方法会尝试从队列头中取出一个元素,若获取不到,则返回null,对其返回值做判断可以实现先取票后判断,可以避免加锁。

代码如下:

public class TicketSeller4 {

	private static Queue<String> tickets = new ConcurrentLinkedQueue<String>();
	
	static {
		for (int i = 0; i < 10000; i++) {
			tickets.add("票编号:" + i);
		}
	}
	
	public static void main(String[] args) {
		for (int i = 0; i < 10; i++) {
			new Thread(()->{
				while(true){
					String s = tickets.poll();
					if(s == null) break;
					
					try {
						TimeUnit.MILLISECONDS.sleep(10);
					} catch (Exception e) {
						e.printStackTrace();
					}
					System.out.println("销售了>>>>" + s);
				}
			}).start();
		}
	}
}

2. 并发容器

Map/Set 它们比较类似 set没有value只有key而已


(1)非并发容器
HashMap,TreeMap,LinkedHashMap
(2)并发容器
HashTable,Collections.sychronizedXXX,ConcurrentMap

  • 并发量不大的时候使用HashTable,Collections.sychronizedXXX 他们同步实现的原理类似都是直接给整个容器加上锁,所以效率会比较差

Collections.sychronizedXXX 的用法是使用装饰器模式,调用其构造方法并传入一个XXX的实现类,返回一个同步的XXX容器
代码如下:

public class T03_SynchronizedMap {

	public static void main(String[] args) {
		HashMap<String> hashMap = new HashMap<String>();
		Map<String> synchronizedMap = Collections.synchronizedMap(hashMap); //此时的集合是同步的	
	}
}
  • 并发量比较高的时候使用ConcurrentMap,有两个实现类:
    • ConcurrentHashMap: 使用哈希表实现,key是无序的
    • ConcurrentSkipListMap: 使用跳表实现,key是有序的

其同步的实现原理在JDK1.8前后不同

  • 在JDK1.8以前,其实现同步使用的是分段锁,将整个容器分为16段(Segment),每次操作只锁住操作的那一段,是一种细粒度更高的锁.
  • 在JDK1.8及以后,其实现同步用的是Node+CAS.
public class T01_ConcurrentMap {

	public static void main(String[] args) {
		Map<String, String> map = new ConcurrentHashMap<String, String>();
		//Map<String, String> map = new ConcurrentSkipListMap<String, String>(); //高并发并且排序
		
		//Map<String, String> map = new Hashtable<>();
		//Map<String, String> map = new HashMap<String, String>();
		
		Random random = new Random();
		Thread[] threads = new Thread[100];
		CountDownLatch latch = new CountDownLatch(threads.length);
		long start = System.currentTimeMillis();
		for (int i = 0; i < threads.length; i++) {
			threads[i] = new Thread(()->{
				for(int j=0; j<10000;j++) map.put("a" + random.nextInt(100000), "a" + random.nextInt(100000));
				latch.countDown();
			});
		}
		
		Arrays.asList(threads).forEach(t->t.start());
		try {
			latch.await();
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
		
		long end = System.currentTimeMillis();
		System.out.println(end-start);
	}
}

Queue

低并发队列

适用于并发量比较低的队列有:VectorSynchronizedList,其中vector类似HashTable,是JDK1.2就提供的类;SynchronizedList类似SynchronizedMap使用装饰器模式,其构造函数接受一个List实现类并返回同步List,在java.util.Collections包下.
它们实现同步的原理都是将所有方法用同步代码块包裹起来.

写时复制CopyOnWriteList
CopyOnWriteArrayList位于java.util.concurrent包下,它实现同步的方式是: 当发生写操作(添加,删除,修改)时,就会复制原有容器然后对新复制出的容器进行写操作,操作完成后将引用指向新的容器.其写效率非常低,读效率非常高

  • 为什么读操作不需要加锁?因为本身CopyOnWriteList就不存在脏读的情况

优点: 读写分离,使得读操作不需要加锁,效率极高
缺点: 写操作效率极低
应用场合: 应用在读少写多的情况,如事件***

高并发队列

方法 抛出异常 返回特殊值 一直阻塞 (非阻塞队列不可用) 阻塞一段时间 (非阻塞队列不可用)
插入元素 add(element) offer(element) put(element) offer(element,time,unit)
移除首个元素 remove() poll() take() poll(time,unit)
返回首个元素 element() peek()

对于高并发队列,若使用不同的方法对空队列执行查询和删除,以及对满队列执行插入,会产生不同行为

  • 抛出异常: 使用add(),remove(),element()方法,若执行错误操作会直接抛出异常
  • 返回特殊值: 若使用offer(),poll(),peek()方法执行错误操作会返回falsenull,并放弃当前错误操作,不抛出异常.
  • 一直阻塞: 若使用put(),take()方法执行错误操作,当前线程会一直阻塞直到条件允许才唤醒线程执行操作.
  • 阻塞一段时间: 若使用offer(),poll()方法并传入时间单位,会将当前方法阻塞一段时间,若阻塞时间结束后仍不满足条件则返回falsenull,并放弃当前错误操作,不抛出异常.

非阻塞队列ConcurrentLinkedQueue
非阻塞队列使用CAS保证操作的原子性,不会因为加锁而阻塞线程.类似于ConcurrentMap

阻塞队列BlockingQueue
阻塞队列的常用实现类有LinkedBlockingQueue,ArrayBlockingQueue,DelayedQueue,TransferQueue,SynchronousQueue. 分别对应于不同的应用场景.

延迟队列DelayedQueue
它是一种无界队列,延迟队列DelayedQueue中存储的元素必须实现Delay接口,其中定义了getDelay()方法;而Delay接口继承自Comparable接口,其中定义了compareTo()方法.各方法作用如下:

  • getDelay(): 规定当前元素的延时,Delay类型的元素必须要等到其延时过期后才能从容器中取出,提前取会取不到.
  • compareTo(): 规定元素在容器中的排列顺序,按照compareTo()的结果升序排列.
public class T08_TransferQueue {

	public static void main(String[] args) throws InterruptedException {
		LinkedTransferQueue<String> strings = new LinkedTransferQueue<String>();
		
		/*new Thread(()->{
			try {
				System.out.println(strings.take());
			} catch (Exception e) {
				e.printStackTrace();
			}
		}).start();*/
		
		//strings.transfer("aaa");
		
		strings.put("aaa");
		
		new Thread(()->{
			try {
				System.out.println(strings.take());
			} catch (Exception e) {
				e.printStackTrace();
			}
		}).start();
	}
}

执行结果:等待时间长的先往外拿走

[1570266175973, 1570266177973, 1570266177473, 1570266176473, 1570266176973]
1570266175973
1570266176973
1570266176473
1570266177473

阻塞消费队列TransferQueue
TransferQueue继承自BlockingQueue,向其中添加元素的方法除了BlockingQueueadd(),offer(),put()之外,还有一个transfer()方法,该方法会使当前线程阻塞直到消费者将该线程消费为止.

  • DelayedQueue的应用场景:服务器端的消息队列

transfer()put()的区别: put()方法会阻塞直到元素成功添加进队列,transfer()方法会阻塞直到元素成功被消费.

TransferQueue特有的方法如下:

  • transfer(E): 阻塞当前线程直到元素E成功被消费者消费

  • tryTransfer(E): 尝试将当前元素送给消费者线程消费,若没有消费者接受则返回false且放弃元素E,不将其放入容器中.

  • tryTransfer(E,long,TimeUnit): 阻塞一段时间等待消费者线程消费,超时则返回false且放弃元素E,不将其放入容器中.

  • hasWaitingConsumer(): 指示是否有阻塞在当前容器上的消费者线程

  • getWaitingConsumerCount(): 返回阻塞在当前容器上的消费者线程的个数

public class T08_TransferQueue {

	public static void main(String[] args) throws InterruptedException {
		LinkedTransferQueue<String> strings = new LinkedTransferQueue<String>();
		
		/*new Thread(()->{
			try {
				System.out.println(strings.take());
			} catch (Exception e) {
				e.printStackTrace();
			}
		}).start();*/
		
		//这种情况就会一直等待消费者的出现而不执行下面的代码
		strings.transfer("aaa");
		
		//strings.put("aaa");
		
		new Thread(()->{
			try {
				System.out.println(strings.take());
			} catch (Exception e) {
				e.printStackTrace();
			}
		}).start();
	}
}

零容量的阻塞消费队列SynchronousQueue

SynchronousQueue是一种特殊的TransferQueue,特殊之处在于其容量为0. 因此对其调用add(),offer()方法都会使程序发生错误(抛出异常或阻塞线程).只能对其调用put()方法,其内部调用transfer()方法,将元素直接交给消费者而不存储在容器中。

public class T09_Synchronized {

	public static void main(String[] args) throws InterruptedException {
		BlockingQueue<String> strings = new SynchronousQueue<String>();
		
		new Thread(()->{
			try {
				System.out.println(strings.take());
			} catch (Exception e) {
				e.printStackTrace();
			}
		}).start();
		
		strings.put("aaa"); //阻塞等待消费者消费
		//strings.add("aaa");
		System.out.println(strings.size());
	}
}

全部评论

相关推荐

个人背景:学院二本计科专业&nbsp;大二开始实习个人经历:安克创新&nbsp;、理想汽车、字节跳动碎碎念:我做事只有三分钟热度。看到进了大厂的同学,我会羡慕,也会跟着努力上进;但遇到好看的小说,我又会放下手头的事沉迷其中,之前的坚持也就中断了。我有些自卑,总觉得自己学历和外貌都不够好。之前偶然在网上受到关注,我就喜欢上了上网,因为这里有很多人认可我。但我也很在意别人的评价,偶尔看到嘲讽的言论,会触发我的自卑情绪,让我感到愤怒。有时候我会强硬地回怼,有时候又会懦弱地选择无视。我也有虚荣心。不管是拿到安克、理想还是字节的机会,我在分享的时候都会带着这份心思。我会特意强调自己学历不好,是为了衬托出过程的艰难,以此显得自己更厉害。我知道,人往往会炫耀自己缺少的东西,来掩盖内心的空洞。我总想着走捷径,不太喜欢踏踏实实地做事。找实习的时候,我花了更多时间在研究面试技巧上,而不是提升专业能力。我会反复听面试录音分析技巧,看面试教程学习怎么和不同的面试官沟通,还会每天自言自语练习语言表达,同学都觉得我有点奇怪。我的实习生涯里,侥幸和运气占了很大一部分。我总在想,如果有一天我失去了这份幸运,这些特质可能会让我一蹶不振。ps:&nbsp;很多人会问我学习路线和经验&nbsp;但是就像我上面说的&nbsp;我的实习过程靠的很多是关键节点的运气&nbsp;技术上面我可能不如很多人&nbsp;&nbsp;所以请大家理性求助和理性参考我的回答&nbsp;附上我的投递记录
我的offer在哪里...:从去年看到现在,飞升哥就是榜样
我的求职进度条
点赞 评论 收藏
分享
03-23 15:00
已编辑
厦门大学 Java
xiaowl:你这个简历的问题是对于技术点、项目的描述,都是描述action的,对于面试官而言,仅能知道你干了什么,无法判断你为什么这么干,干的好不好。
点赞 评论 收藏
分享
评论
点赞
收藏
分享

创作者周榜

更多
牛客网
牛客网在线编程
牛客网题解
牛客企业服务