JDK1.8的版本中ConcurrentHashMap原理解
在多线程环境下的HashMap就无法保证线程安全了,而HashTable又因为针对整个方法加锁而变得效率低下,这时就要考虑使用java.util.concurrent.ConcurrentHashMap
类。
ConcurrentHashMap类依旧采用HashMap的结构,基本变量也和HashMap一致,因此接下来只说和HashMap不同之处。
ConcurrentHashMap用到乐观锁CAS,采用的是目前不开源的sun.misc.Unsafe 类,其是直接操作底层的类,常用到方法和含义如下:
compareAndSwapObject(Object o, long l, Object o1, Object o2)
:将对象o内偏移量为l的值和o1比较,如果一致则将原本值替换为o2compareAndSwapInt(Object o, long l, int i, int i1)
:和compareAndSwapObject一致。getObjectVolatile(Object o, long l)
:获取对象o偏移l位置的值,这个方法常常用来定位给定键在Table表内的位置,即定位到在哪个桶。
重要的变量
整个ConcurrentHashMap中的变量都加上了volatile,使得任何操作有可见,即每次读都可以保证是最新更新的的结果(即使是在读的过程中刚好写入)。
ConcurrentHashMap中新增了控制变量:
//MOVED是表示该节点处于转移状态,处于转移状态的node的hash值会变为-1。通常在增删节点的时候会用到。
static final int MOVED = -1;
//表示table中该节点是红黑树节点
static final int TREEBIN = -2;
//某个桶正在被扩容(resize)操作所保留,而不会存储任何元素。但是本身内部节点依旧可以被查询
static final int RESERVED = -3;
//只是用于计算Hash值
static final int HASH_BITS = 0x7fffffff;
针对节点数的计算则引入了baseCount
、sizeCtl
等变量:
//它在插入和删除元素时会被更新,用于统计当前哈希表的大小。但是在并发操作时可能不是实时更新的,估算用
private transient volatile long baseCount;
//大于0是表示扩容阈值,通常是table大小与Threshold的乘积,为负数则表示处于扩容操作中
private transient volatile int sizeCtl;
//CounterCell类是对于桶内元素的计数,该数组可以减少在高并发的删减节点是对于整体节点数修改的冲突
private transient volatile CounterCell[] counterCells;
ConcurrentHashMap存储节点还是像HashMap一样底层采用Node<K,V>数组的方式:
transient volatile Node<K,V>[] table;
但是新增了控制转移的节点类:ForwardingNode,其内部只有一个指向下一次扩容的数组:nextTable
static final class ForwardingNode<K,V> extends Node<K,V> {
final Node<K,V>[] nextTable;
……
}
关键方法
-
putVal方法 调用put方法时,本质都是调用putval方法:
final V putVal(K key, V value, boolean onlyIfAbsent) { if (key == null || value == null) throw new NullPointerException(); int hash = spread(key.hashCode()); int binCount = 0; //进入死循环,即自旋 for (Node<K,V>[] tab = table;;) { Node<K,V> f; int n, i, fh; //1.懒加载,第一次初始化数据 if (tab == null || (n = tab.length) == 0) tab = initTable(); //2.且要存入的节点在找到的桶内且该桶无数据,用Unsafe类中compareAndSwapObject方法加入节点 else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) { if (casTabAt(tab, i, null,new Node<K,V>(hash, key, value, null))) break; } //3.如果该节点是控制扩容的节点,则尝试去扩容 else if ((fh = f.hash) == MOVED) tab = helpTransfer(tab, f); else { V oldVal = null; //4.针对单个节点加锁 synchronized (f) { if (tabAt(tab, i) == f) { //链表节点 if (fh >= 0) { binCount = 1; for (Node<K,V> e = f;; ++binCount) { K ek; if (e.hash == hash && ((ek = e.key) == key || (ek != null && key.equals(ek)))) { oldVal = e.val; if (!onlyIfAbsent) e.val = value; break; } Node<K,V> pred = e; if ((e = e.next) == null) { pred.next = new Node<K,V>(hash, key, value, null); break; } } } //该桶为红黑树结构,调用红黑树的putTreeVal方法。 else if (f instanceof TreeBin) { Node<K,V> p; binCount = 2; if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key, value)) != null) { oldVal = p.val; if (!onlyIfAbsent) p.val = value; } } } } if (binCount != 0) { if (binCount >= TREEIFY_THRESHOLD) treeifyBin(tab, i); if (oldVal != null) return oldVal; break; } } } addCount(1L, binCount); return null; }
可以看出putval的流程和HashMap一致。但是在刚开始就进入了自旋状态(即for无线循环),插入过程中,通过 CAS 操作尝试更新节点,失败则重试;而在注释4处,其是针对单个桶加锁,即table数据中的单个节点。这也就意味着最多可以有table.size()个线程同时对ConcurrentHashMap进行处理。事实上,ConcurrentHashMap在控制锁粒度方面都是尽量做到最小。
在锁的内部则是根据桶节点的hash值正负来判断是链表还是红黑树结构,然后就是老套的遍历+替换/新增节点了。
在最后则是判断是否树化和节点数量自增并判断是否扩容。
再对整个流程关键点逐个解析:
-
ConCurrentHashMap中将初始化和扩容方法分开(HashMap中是在方法 resize() 中的)。initTable() 是初始化方法:
private final Node<K,V>[] initTable() { Node<K,V>[] tab; int sc; //只有第一次存数据时才会被调用 while ((tab = table) == null || tab.length == 0) { //sizeCtl为负数是表示有别的线程在操作table数据,线程暂停 if ((sc = sizeCtl) < 0) Thread.yield(); else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) { try { if ((tab = table) == null || tab.length == 0) { int n = (sc > 0) ? sc : DEFAULT_CAPACITY; @SuppressWarnings("unchecked") Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n]; table = tab = nt; sc = n - (n >>> 2); } } finally { sizeCtl = sc; } break; } } return tab; }
这里U.compareAndSwapInt(this, SIZECTL, sc, -1) 才是精髓,因为最开始的时候就将sizeCtl的值赋给了sc,所以这里两者是一定相等的,该方法直接将SIZECTL置为-1,使得其他线程调用该方法是会调用Thread.yield(),使其直接进入就绪状态,等到其再度争抢到cpu资源时获取ConCurrentHashMap已经被初始化了。
随后就是根据原sizeCtl或者默认值的值创建数组并修改sizeCtl的值。
-
在注释第2点里直接调用tabAt方法是直接根据键的hash定位到该键属于哪一个桶,而随后调用的casTabAt是直接在该位置直接创建新的节点,两个方法都是调用的Unsafe的方法:
static final <K,V> Node<K,V> tabAt(Node<K,V>[] tab, int i) { return (Node<K,V>)U.getObjectVolatile(tab, ((long)i << ASHIFT) + ABASE); } static final <K,V> boolean casTabAt(Node<K,V>[] tab, int i, Node<K,V> c, Node<K,V> v) {
-
注释第3点的helpTransfer方法,其是协助去扩容,ConCurrentHashMap的扩容是可以多线程的。但是多线程数有上限。
final Node<K,V>[] helpTransfer(Node<K,V>[] tab, Node<K,V> f) { Node<K,V>[] nextTab; int sc; //当节点是ForwardingNode类且当前table数组和新生成的数组都存在时才会判断是否需要协助扩容 if (tab != null && (f instanceof ForwardingNode) && (nextTab = ((ForwardingNode<K,V>)f).nextTable) != null) { //生成扩容戳记,多线程下唯一 int rs = resizeStamp(tab.length); //当前table数组处于扩容状态 while (nextTab == nextTable && table == tab && (sc = sizeCtl) < 0) { if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 || sc == rs + MAX_RESIZERS || transferIndex <= 0) break; if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) { //扩容方法 transfer(tab, nextTab); break; } } return nextTab; } return table; }
该方法是针对节点是否是控制节点ForwardingNode进行判断的,开头也提及了,该类存放的是新的Table数组。
-
-
replaceNode方法,该方法主要被repalce和remove方法调用。remove调用传入的是null
//cv是作为期待的比较值存在的,虽然调用replace和remove是传入都是null,但是一旦传入就必须在满足cv=传入键的节点值才会执行替换操作 final V replaceNode(Object key, V value, Object cv) { int hash = spread(key.hashCode()); for (Node<K,V>[] tab = table;;) { Node<K,V> f; int n, i, fh; if (tab == null || (n = tab.length) == 0 || (f = tabAt(tab, i = (n - 1) & hash)) == null) break; else if ((fh = f.hash) == MOVED) tab = helpTransfer(tab, f); else { V oldVal = null; boolean validated = false; synchronized (f) { if (tabAt(tab, i) == f) { //链表桶 if (fh >= 0) { validated = true; for (Node<K,V> e = f, pred = null;;) { K ek; if (e.hash == hash && ((ek = e.key) == key || (ek != null && key.equals(ek)))) { V ev = e.val; if (cv == null || cv == ev || (ev != null && cv.equals(ev))) { oldVal = ev; if (value != null) e.val = value; //传入的value为null,则将该节点的下个节点赋给上个节点的next else if (pred != null) pred.next = e.next; else setTabAt(tab, i, e.next); } break; } pred = e; if ((e = e.next) == null) break; } } else if (f instanceof TreeBin) { validated = true; TreeBin<K,V> t = (TreeBin<K,V>)f; TreeNode<K,V> r, p; if ((r = t.root) != null && (p = r.findTreeNode(hash, key, null)) != null) { V pv = p.val; if (cv == null || cv == pv || (pv != null && cv.equals(pv))) { oldVal = pv; if (value != null) p.val = value; else if (t.removeTreeNode(p)) setTabAt(tab, i, untreeify(t.first)); } } } } } //找到指定key键的节点后会将validated置为true,然后节点数减一 if (validated) { if (oldVal != null) { if (value == null) addCount(-1L, -1); return oldVal; } break; } } } return null; }
逻辑上和putval的类似,排除掉当前table或当前键不存在的场景、或是需要扩容场景。在替换则是分为了链表和红黑树节点。
-
get就很简单了,ConcurrentHashMap的读操作是不加锁的
public V get(Object key) { Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek; int h = spread(key.hashCode()); //先确定key的桶位置 if ((tab = table) != null && (n = tab.length) > 0 && (e = tabAt(tab, (n - 1) & h)) != null) { //桶的节点就直接返回 if ((eh = e.hash) == h) { if ((ek = e.key) == key || (ek != null && key.equals(ek))) return e.val; } //红黑树节点调用其find方法 else if (eh < 0) return (p = e.find(h, key)) != null ? p.val : null; //链表节点从头遍历 while ((e = e.next) != null) { if (e.hash == h && ((ek = e.key) == key || (ek != null && key.equals(ek)))) return e.val; } } return null; }
从这里来看,1.8版本的ConcurrentHashMap采用乐观锁结构,针对改操作加锁,读操作不加锁。且针对单个桶加锁,即满足了多线程的安全性又提高了并发性。因此涉及到多线程操作HashMap的时候更推荐用ConcurrentHashMap。
当然,要注意ConcurrentHashMap和ThreadLocal<HashMap>的区别。前者是多线程共享一个ConcurrentHashMap对象,后者是针对HashMap做线程内副本。