JDK1.8的版本中ConcurrentHashMap原理解

在多线程环境下的HashMap就无法保证线程安全了,而HashTable又因为针对整个方法加锁而变得效率低下,这时就要考虑使用java.util.concurrent.ConcurrentHashMap类。

ConcurrentHashMap类依旧采用HashMap的结构,基本变量也和HashMap一致,因此接下来只说和HashMap不同之处。

ConcurrentHashMap用到乐观锁CAS,采用的是目前不开源的sun.misc.Unsafe 类,其是直接操作底层的类,常用到方法和含义如下:

  • compareAndSwapObject(Object o, long l, Object o1, Object o2):将对象o内偏移量为l的值和o1比较,如果一致则将原本值替换为o2
  • compareAndSwapInt(Object o, long l, int i, int i1):和compareAndSwapObject一致。
  • getObjectVolatile(Object o, long l):获取对象o偏移l位置的值,这个方法常常用来定位给定键在Table表内的位置,即定位到在哪个桶。

重要的变量

整个ConcurrentHashMap中的变量都加上了volatile,使得任何操作有可见,即每次读都可以保证是最新更新的的结果(即使是在读的过程中刚好写入)。

ConcurrentHashMap中新增了控制变量:

//MOVED是表示该节点处于转移状态,处于转移状态的node的hash值会变为-1。通常在增删节点的时候会用到。
static final int MOVED     = -1; 

//表示table中该节点是红黑树节点
static final int TREEBIN   = -2; 

//某个桶正在被扩容(resize)操作所保留,而不会存储任何元素。但是本身内部节点依旧可以被查询
static final int RESERVED  = -3; 

//只是用于计算Hash值
static final int HASH_BITS = 0x7fffffff; 

针对节点数的计算则引入了baseCountsizeCtl等变量:

//它在插入和删除元素时会被更新,用于统计当前哈希表的大小。但是在并发操作时可能不是实时更新的,估算用
private transient volatile long baseCount;

//大于0是表示扩容阈值,通常是table大小与Threshold的乘积,为负数则表示处于扩容操作中
private transient volatile int sizeCtl;

//CounterCell类是对于桶内元素的计数,该数组可以减少在高并发的删减节点是对于整体节点数修改的冲突
private transient volatile CounterCell[] counterCells;

ConcurrentHashMap存储节点还是像HashMap一样底层采用Node<K,V>数组的方式:

transient volatile Node<K,V>[] table;

但是新增了控制转移的节点类:ForwardingNode,其内部只有一个指向下一次扩容的数组:nextTable

static final class ForwardingNode<K,V> extends Node<K,V> {
    final Node<K,V>[] nextTable;
    ……
}

关键方法

  1. putVal方法 调用put方法时,本质都是调用putval方法:

    final V putVal(K key, V value, boolean onlyIfAbsent) {
        if (key == null || value == null) throw new NullPointerException();
        int hash = spread(key.hashCode());
        int binCount = 0;
        //进入死循环,即自旋
        for (Node<K,V>[] tab = table;;) {
            Node<K,V> f; int n, i, fh;
            //1.懒加载,第一次初始化数据
            if (tab == null || (n = tab.length) == 0)
                tab = initTable();
            //2.且要存入的节点在找到的桶内且该桶无数据,用Unsafe类中compareAndSwapObject方法加入节点
            else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
                if (casTabAt(tab, i, null,new Node<K,V>(hash, key, value, null)))
                    break;   
            }
            //3.如果该节点是控制扩容的节点,则尝试去扩容
            else if ((fh = f.hash) == MOVED)
                tab = helpTransfer(tab, f);
            else {
                V oldVal = null;
                //4.针对单个节点加锁
                synchronized (f) {
                    if (tabAt(tab, i) == f) {
                        //链表节点
                        if (fh >= 0) {
                            binCount = 1;
                            for (Node<K,V> e = f;; ++binCount) {
                                K ek;
                                if (e.hash == hash &&
                                    ((ek = e.key) == key ||
                                     (ek != null && key.equals(ek)))) {
                                    oldVal = e.val;
                                    if (!onlyIfAbsent)
                                        e.val = value;
                                    break;
                                }
                                Node<K,V> pred = e;
                                if ((e = e.next) == null) {
                                    pred.next = new Node<K,V>(hash, key,
                                                              value, null);
                                    break;
                                }
                            }
                        }
                        //该桶为红黑树结构,调用红黑树的putTreeVal方法。
                        else if (f instanceof TreeBin) {
                            Node<K,V> p;
                            binCount = 2;
                            if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
                                                           value)) != null) {
                                oldVal = p.val;
                                if (!onlyIfAbsent)
                                    p.val = value;
                            }
                        }
                    }
                }
                if (binCount != 0) {
                    if (binCount >= TREEIFY_THRESHOLD)
                        treeifyBin(tab, i);
                    if (oldVal != null)
                        return oldVal;
                    break;
                }
            }
        }
        addCount(1L, binCount);
        return null;
    }
    

    可以看出putval的流程和HashMap一致。但是在刚开始就进入了自旋状态(即for无线循环),插入过程中,通过 CAS 操作尝试更新节点,失败则重试;而在注释4处,其是针对单个桶加锁,即table数据中的单个节点。这也就意味着最多可以有table.size()个线程同时对ConcurrentHashMap进行处理。事实上,ConcurrentHashMap在控制锁粒度方面都是尽量做到最小。

    在锁的内部则是根据桶节点的hash值正负来判断是链表还是红黑树结构,然后就是老套的遍历+替换/新增节点了。

    在最后则是判断是否树化和节点数量自增并判断是否扩容。

    再对整个流程关键点逐个解析:

    • ConCurrentHashMap中将初始化和扩容方法分开(HashMap中是在方法 resize() 中的)。initTable() 是初始化方法:

      private final Node<K,V>[] initTable() {
          Node<K,V>[] tab; int sc;
          //只有第一次存数据时才会被调用
          while ((tab = table) == null || tab.length == 0) {
              //sizeCtl为负数是表示有别的线程在操作table数据,线程暂停
              if ((sc = sizeCtl) < 0)
                  Thread.yield(); 
              else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
                  try {
                      if ((tab = table) == null || tab.length == 0) {
                          int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
                          @SuppressWarnings("unchecked")
                          Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
                          table = tab = nt;
                          sc = n - (n >>> 2);
                      }
                  } finally {
                      sizeCtl = sc;
                  }
                  break;
              }
          }
          return tab;
      }
      

      这里U.compareAndSwapInt(this, SIZECTL, sc, -1) 才是精髓,因为最开始的时候就将sizeCtl的值赋给了sc,所以这里两者是一定相等的,该方法直接将SIZECTL置为-1,使得其他线程调用该方法是会调用Thread.yield(),使其直接进入就绪状态,等到其再度争抢到cpu资源时获取ConCurrentHashMap已经被初始化了。

      随后就是根据原sizeCtl或者默认值的值创建数组并修改sizeCtl的值。

    • 在注释第2点里直接调用tabAt方法是直接根据键的hash定位到该键属于哪一个桶,而随后调用的casTabAt是直接在该位置直接创建新的节点,两个方法都是调用的Unsafe的方法:

      static final <K,V> Node<K,V> tabAt(Node<K,V>[] tab, int i) {
          return (Node<K,V>)U.getObjectVolatile(tab, ((long)i << ASHIFT) + ABASE);
      }
      
      static final <K,V> boolean casTabAt(Node<K,V>[] tab, int i,
                                          Node<K,V> c, Node<K,V> v) {
      
    • 注释第3点的helpTransfer方法,其是协助去扩容,ConCurrentHashMap的扩容是可以多线程的。但是多线程数有上限。

        final Node<K,V>[] helpTransfer(Node<K,V>[] tab, Node<K,V> f) {
            Node<K,V>[] nextTab; int sc;
            //当节点是ForwardingNode类且当前table数组和新生成的数组都存在时才会判断是否需要协助扩容
            if (tab != null && (f instanceof ForwardingNode) &&
                (nextTab = ((ForwardingNode<K,V>)f).nextTable) != null) {
                //生成扩容戳记,多线程下唯一
                int rs = resizeStamp(tab.length);
                //当前table数组处于扩容状态
                while (nextTab == nextTable && table == tab &&
                       (sc = sizeCtl) < 0) {
                        if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
                        sc == rs + MAX_RESIZERS || transferIndex <= 0)
                        break;
                    if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) {
                        //扩容方法
                        transfer(tab, nextTab);
                        break;
                    }
                }
                return nextTab;
            }
            return table;
        }
      

      该方法是针对节点是否是控制节点ForwardingNode进行判断的,开头也提及了,该类存放的是新的Table数组。

  2. replaceNode方法,该方法主要被repalce和remove方法调用。remove调用传入的是null

    //cv是作为期待的比较值存在的,虽然调用replace和remove是传入都是null,但是一旦传入就必须在满足cv=传入键的节点值才会执行替换操作
    final V replaceNode(Object key, V value, Object cv) {
        int hash = spread(key.hashCode());
        for (Node<K,V>[] tab = table;;) {
            Node<K,V> f; int n, i, fh;
            if (tab == null || (n = tab.length) == 0 ||
                (f = tabAt(tab, i = (n - 1) & hash)) == null)
                break;
            else if ((fh = f.hash) == MOVED)
                tab = helpTransfer(tab, f);
            else {
                V oldVal = null;
                boolean validated = false;
                synchronized (f) {
                    if (tabAt(tab, i) == f) {
                        //链表桶
                        if (fh >= 0) {
                            validated = true;
                            for (Node<K,V> e = f, pred = null;;) {
                                K ek;
                                if (e.hash == hash &&
                                    ((ek = e.key) == key ||
                                     (ek != null && key.equals(ek)))) {
                                    V ev = e.val;
                                    if (cv == null || cv == ev ||
                                        (ev != null && cv.equals(ev))) {
                                        oldVal = ev;
                                        if (value != null)
                                            e.val = value;
                                        //传入的value为null,则将该节点的下个节点赋给上个节点的next
                                        else if (pred != null) 
                                            pred.next = e.next;
                                        else
                                            setTabAt(tab, i, e.next);
                                    }
                                    break;
                                }
                                pred = e;
                                if ((e = e.next) == null)
                                    break;
                            }
                        }
                        else if (f instanceof TreeBin) {
                            validated = true;
                            TreeBin<K,V> t = (TreeBin<K,V>)f;
                            TreeNode<K,V> r, p;
                            if ((r = t.root) != null &&
                                (p = r.findTreeNode(hash, key, null)) != null) {
                                V pv = p.val;
                                if (cv == null || cv == pv ||
                                    (pv != null && cv.equals(pv))) {
                                    oldVal = pv;
                                    if (value != null)
                                        p.val = value;
                                    else if (t.removeTreeNode(p))
                                        setTabAt(tab, i, untreeify(t.first));
                                }
                            }
                        }
                    }
                }
                //找到指定key键的节点后会将validated置为true,然后节点数减一
                if (validated) {
                    if (oldVal != null) {
                        if (value == null)
                            addCount(-1L, -1);
                        return oldVal;
                    }
                    break;
                }
            }
        }
        return null;
    }
    

    逻辑上和putval的类似,排除掉当前table或当前键不存在的场景、或是需要扩容场景。在替换则是分为了链表和红黑树节点。

  3. get就很简单了,ConcurrentHashMap的读操作是不加锁的

     public V get(Object key) {
         Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
         int h = spread(key.hashCode());
         //先确定key的桶位置
         if ((tab = table) != null && (n = tab.length) > 0 &&
             (e = tabAt(tab, (n - 1) & h)) != null) {
             //桶的节点就直接返回
             if ((eh = e.hash) == h) {
                 if ((ek = e.key) == key || (ek != null && key.equals(ek)))
                     return e.val;
             }
             //红黑树节点调用其find方法
             else if (eh < 0)
                 return (p = e.find(h, key)) != null ? p.val : null;
             //链表节点从头遍历
             while ((e = e.next) != null) {
                 if (e.hash == h &&
                     ((ek = e.key) == key || (ek != null && key.equals(ek))))
                     return e.val;
             }
         }
         return null;
     }
    

从这里来看,1.8版本的ConcurrentHashMap采用乐观锁结构,针对改操作加锁,读操作不加锁。且针对单个桶加锁,即满足了多线程的安全性又提高了并发性。因此涉及到多线程操作HashMap的时候更推荐用ConcurrentHashMap。

当然,要注意ConcurrentHashMap和ThreadLocal<HashMap>的区别。前者是多线程共享一个ConcurrentHashMap对象,后者是针对HashMap做线程内副本。

全部评论

相关推荐

评论
点赞
收藏
分享

创作者周榜

更多
牛客网
牛客企业服务