说一说ConcurrentHashMap的实现原理
HashMap线程不安全,而Hashtable是线程安全,但是它使用了synchronized进行方法同步,插入、读取数据都使用了synchronized,当插入数据的时候不能进行读取(相当于把整个Hashtable都锁住了,全表锁),当多线程并发的情况下,都要竞争同一把锁,导致效率极其低下。而在JDK1.5后为了改进Hashtable的痛点,ConcurrentHashMap应运而生。
ConcurrentHashMap使用的是分段锁技术,将ConcurrentHashMap锁一段一段的存储,然后给每一段数据配一把锁(segment),当一个线程占用一把锁(segment)访问其中一段数据的时候,其他段的数据也能被其它的线程访问,默认分配16个segment。默认比Hashtable效率提高16倍。
ConcurrentHashMap取消了segment分段锁,而采用CAS和synchronized来保证并发安全。数据结构跟HashMap1.8的结构一样,数组+链表/红黑二叉树。
synchronized只锁定当前链表或红黑二叉树的首节点,这样只要hash不冲突,就不会产生并发,效率又提升N倍。
HashMap是线程不安全的,我们来看下线程安全的ConcurrentHashMap,在JDK7的时候,这种安全策略采用的是分段锁的机制,ConcurrentHashMap维护了一个Segment数组,Segment这个类继承了重入锁ReentrantLock,并且该类里面维护了一个 HashEntry<K,V>[] table数组,在写操作put,remove,扩容的时候,会对Segment加锁,所以仅仅影响这个Segment,不同的Segment还是可以并发的,所以解决了线程的安全问题,同时又采用了分段锁也提升了并发的效率。
在JDK8中彻底抛弃了JDK7的分段锁的机制,新的版本主要使用了Unsafe类的CAS自旋赋值+synchronized同步+LockSupport阻塞等手段实现的高效并发。
在JDK8里面,去掉了分段锁,将锁的级别控制在了更细粒度的table元素级别,也就是说只需要锁住这个链表的head节点,并不会影响其他的table元素的读写,好处在于并发的粒度更细,影响更小,从而并发效率更好,但不足之处在于并发扩容的时候,由于操作的table都是同一个,不像JDK7中分段控制,所以这里需要等扩容完之后,所有的读写操作才能进行,所以扩容的效率就成为了整个并发的一个瓶颈。好在Doug lea大神对扩容做了优化,本来在一个线程扩容的时候,如果影响了其他线程的数据,那么其他的线程的读写操作都应该阻塞,但Doug lea说你们闲着也是闲着,不如来一起参与扩容任务,这样人多力量大,办完事你们该干啥干啥,别浪费时间,于是在JDK8的源码里面就引入了一个ForwardingNode类,在一个线程发起扩容的时候,就会改变sizeCtl这个值。
转载自:https://blog.csdn.net/qq_36174081/article/details/90606392
HashMap线程不安全,而Hashtable是线程安全,但是它使用了synchronized进行方法同步,插入、读取数据都使用了synchronized,当插入数据的时候不能进行读取(相当于把整个Hashtable都锁住了,全表锁),当多线程并发的情况下,都要竞争同一把锁,导致效率极其低下。而在JDK1.5后为了改进Hashtable的痛点,ConcurrentHashMap应运而生。
ConcurrentHashMap使用的是分段锁技术,将ConcurrentHashMap锁一段一段的存储,然后给每一段数据配一把锁(segment),当一个线程占用一把锁(segment)访问其中一段数据的时候,其他段的数据也能被其它的线程访问,默认分配16个segment。默认比Hashtable效率提高16倍。
ConcurrentHashMap取消了segment分段锁,而采用CAS和synchronized来保证并发安全。数据结构跟HashMap1.8的结构一样,数组+链表/红黑二叉树。
synchronized只锁定当前链表或红黑二叉树的首节点,这样只要hash不冲突,就不会产生并发,效率又提升N倍。
HashMap是线程不安全的,我们来看下线程安全的ConcurrentHashMap,在JDK7的时候,这种安全策略采用的是分段锁的机制,ConcurrentHashMap维护了一个Segment数组,Segment这个类继承了重入锁ReentrantLock,并且该类里面维护了一个 HashEntry<K,V>[] table数组,在写操作put,remove,扩容的时候,会对Segment加锁,所以仅仅影响这个Segment,不同的Segment还是可以并发的,所以解决了线程的安全问题,同时又采用了分段锁也提升了并发的效率。
在JDK8中彻底抛弃了JDK7的分段锁的机制,新的版本主要使用了Unsafe类的CAS自旋赋值+synchronized同步+LockSupport阻塞等手段实现的高效并发。
在JDK8里面,去掉了分段锁,将锁的级别控制在了更细粒度的table元素级别,也就是说只需要锁住这个链表的head节点,并不会影响其他的table元素的读写,好处在于并发的粒度更细,影响更小,从而并发效率更好,但不足之处在于并发扩容的时候,由于操作的table都是同一个,不像JDK7中分段控制,所以这里需要等扩容完之后,所有的读写操作才能进行,所以扩容的效率就成为了整个并发的一个瓶颈。好在Doug lea大神对扩容做了优化,本来在一个线程扩容的时候,如果影响了其他线程的数据,那么其他的线程的读写操作都应该阻塞,但Doug lea说你们闲着也是闲着,不如来一起参与扩容任务,这样人多力量大,办完事你们该干啥干啥,别浪费时间,于是在JDK8的源码里面就引入了一个ForwardingNode类,在一个线程发起扩容的时候,就会改变sizeCtl这个值。
得分点
数组+链表+红黑树、锁头节点
参考答案
标准回答
在JDK8中,ConcurrentHashMap的底层数据结构与HashMap一样,也是采用“数组+链表+红黑树”的形式。同时,它又采用锁定头节点的方式降低了锁粒度,以较低的性能代价实现了线程安全。底层数据结构的逻辑可以参考HashMap的实现,下面我重点介绍它的线程安全的实现机制。
加分回答
ConcurrentHashMap实现线程安全的难点在于多线程并发扩容,即当一个线程在插入数据时,若发现数组正在扩容,那么它就会立即参与扩容操作,完成扩容后再插入数据到新数组。在扩容的时候,多个线程共同分担数据迁移任务,每个线程负责的迁移数量是
(数组长度 >>> 3) / CPU核心数。 也就是说,为线程分配的迁移任务,是充分考虑了硬件的处理能力的。多个线程依据硬件的处理能力,平均分摊一部分槽的迁移工作。另外,如果计算出来的迁移数量小于16,则强制将其改为16,这是考虑到目前服务器领域主流的CPU运行速度,每次处理的任务过少,对于CPU的算力也是一种浪费。
延伸阅读
插入数据的关键代码如下:
final V putVal(K key, V value, boolean onlyIfAbsent) { ... for (Node<K,V>[] tab = table;;) { Node<K,V> f; int n, i, fh; // 1.若数组还未初始化,则初始化数组。 if (tab == null || (n = tab.length) == 0) tab = initTable(); // 2.若该位置头节点为空,则初始化头节点。 else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) { if (casTabAt(tab, i, null, new Node<K,V>(hash, key, value, null))) break; } // 3.若头节点哈希值为MOVED,则表示数组正在扩容,则当前线程也去参与扩容。 else if ((fh = f.hash) == MOVED) tab = helpTransfer(tab, f); // 4.若该位置头节点非空,则追加一个新节点。 else { V oldVal = null; // 加锁,锁的不是整个数组,而是头节点。 synchronized (f) { if (tabAt(tab, i) == f) { // 头节点为链表节点(参考状态常量) if (fh >= 0) { ... } // 头节点为红黑树节点 else if (f instanceof TreeBin) { ... } } } // 当链表中节点数达到8时,则扩容数组或者转为红黑树。 if (binCount != 0) { if (binCount >= TREEIFY_THRESHOLD) treeifyBin(tab, i); ... } } } // 计数,若发现数组过小则进行扩容。 addCount(1L, binCount); return null; } 数组扩容的关键代码如下:
private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) { ... // 每个CPU处理的节点数(步长) if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE) // 最少设定为16 stride = MIN_TRANSFER_STRIDE; if (nextTab == null) { ... // 扩容至2倍 Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1]; ... } // 特殊节点,标识该位置的元素已被转移,并指向新的数组。 ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab); // bound为遍历的边界,因为每个线程负责转移一部分节点,不是全部。 for (int i = 0, bound = 0;;) { Node<K,V> f; int fh; // advance表示从transferIndex-1遍历到bound位置的过程中,是否需要继续。 // 自旋 while (advance) { ... // 抢任务(CAS修改TRANSFERINDEX) else if (U.compareAndSwapInt (this, TRANSFERINDEX, nextIndex, nextBound = (nextIndex > stride ? nextIndex - stride : 0))) { ... } } // i越界,整个集合遍历完成。 if (i < 0 || i >= n || i + n >= nextn) { // 转移完成 if (finishing) { // 替换旧数组 nextTable = null; table = nextTab; // 将sizeCtl设置为扩容后的1.5倍 sizeCtl = (n << 1) - (n >>> 1); return; } ... } // tab[i]迁移完毕 else if ((f = tabAt(tab, i)) == null) advance = casTabAt(tab, i, null, fwd); // tab[i]正在迁移 else if ((fh = f.hash) == MOVED) advance = true; else { synchronized (f) { ... } } } }