现场论证 ConcurrentHashMap 如何成为线程安全的 HashMap
人只应当忘却自己而爱别人,这样人才能安静、幸福高尚。
——托尔斯泰《安娜•卡列尼娜》
经过周末的一顿修养和潜心研究,小a感觉已经到了出关之日了,这不又开始call 大Q 赶紧上线,面试开始~
Q:你了解ConcurrentHashMap吗,说一下?
一个支持全并发的检索和高预期并发更新的哈希表。该类遵从与Hashtable相同的功能规范,并包含了与Hashtable的每个方法对应的方法版本。然而,尽管所有的操作都是线程安全的,但检索操作并不需要锁定,也不支持以阻止所有访问的方式锁定整个表。在依赖其线程安全但不依赖其同步细节的程序中,该类完全可以与Hashtable互操作。
检索操作(包括get)一般不会阻塞,所以可能会与更新操作(包括put和remove)重叠。检索操作反映的是最近完成的更新操作在开始时持有的更新操作的结果。(更确切地说,一个给定的key的更新操作与该key的任何(非空)检索通知更新后的值有一个先行发生的关系。)。对于像putAll和clear这样的聚合操作,并发的检索可能只反映出插入或删除一些条目。同样的,迭代器、Spliterators和枚举操作也会返回反映哈希表在迭代器/枚举创建时或创建后的某个时刻的状态的元素。它们不会抛出ConcurrentModificationException。但是,迭代器被设计成同一时间只能被一个线程使用。请记住,包括size、isEmpty和containsValue在内的聚合状态方法的结果通常只有在map没有在其他线程中进行并发更新时才有用。否则,这些方法的结果反映的是瞬态状态,对于监控或估计的目的可能足够了,但对于程序控制来说并不适用。
当有太多的哈希冲突时,表会被动态扩容,预期的平均效果是每个映射保持大约两个bins(对应于0.75的负载因子阈值的调整大小)。随着映射的添加和删除,这个平均数可能会有很大的差异,但总的来说,这维持了一个普遍能接受的时间/空间权衡的哈希表。然而,调整这个或任何其他类型的散列表的大小可能是一个相对缓慢的操作。在可能的情况下,推荐提供一个大小估计值作为可选的初始化容量(initialCapacity)构造方法参数。一个额外的可选的loadFactor构造方法参数提供了进一步的自定义初始表容量的方法,通过指定表密度来计算要为给定的元素数量分配的空间量。
另外,为了与这个类的以前版本的兼容性,构造方法可以选择性地指定一个预期的并发度(concurrencyLevel)作为内部大小的额外提示。请注意,使用多个具有完全相同的hashCode()的键肯定会降低任何哈希表的性能。为了减轻影响,当键是Comparable的时候,这个类可能会使用键之间的比较顺序来帮助打破联系。
ConcurrentHashMap的Set投影可以被创建(使用newKeySet()或newKeySet(int))),或者在只有keySet(Object)时被查看(使用keySet(Object),而映射的值(也许是瞬时)没有被使用或者所有的映射值都是相同的。
ConcurrentHashMap可以通过使用LongAdder值并通过computeIfAbsent进行初始化,将一个ConcurrentHashMap作为可扩展的频率映射(直方图或多集的一种形式)。例如,要在ConcurrentHashMap<String,LongAdder> freqs中添加一个计数,可以使用freqs.computeIfAbsent(k -> new LongAdder()).crement()。
这个类和它的视图和迭代器实现了Map和迭代器接口的所有可选方法。
和HashMap类似,但与HashMap不同的是,这个类不允许使用null作为键或值。
ConcurrentHashMaps支持一组顺序和并行的批量操作,与大多数Stream方法不同的是,该类被设计成可以安全地、而且通常是合理地应用于即使是被其他线程并发更新的映射;例如,当计算共享注册表中的值的快照摘要时。有三种操作,每种操作有四种形式,接受的函数有Key、Valve、Entries和(Key、Value)参数和/或返回值。由于ConcurrentHashMap的元素没有任何特定的排序,在不同的并行执行中可能会以不同的顺序进行处理,所以提供的函数的正确性不应该依赖于任何排序,也不应该依赖于任何其他对象或值在计算过程中可能会瞬时变化的对象或值;而且除了forEach操作外,最好是无副作用的。对Map.Entry对象的批量操作不支持方法setValue。
forEach.Entry对象上的批量操作不支持方法setValue。forEach:对每个元素执行给定的操作。变体形式在执行操作前对每个元素应用给定的变换。
search(搜索):返回第一个可用的非空结果。返回在每个元素上应用给定函数的第一个可用的非空结果;跳过进一步的
- table 初始化和扩容的控制。若为负值,则表示将被初始化或扩容:
-1:初始化
-N :N-1个线程正在进行扩容 - table为null,即未初始化时,保留创建时要使用的初始表大小,或者默认为0
- table 初始化后,保留下一次要扩容的阈值
- 下一个要用的 table;仅在扩容时非空
- 扩容时要拆分的下一个表索引(加1)
- 转移过程中插入到头部的节点
Q:说说 ConcurrentHashMap 的底层数据结构?
这种问题我肯定早有准备
-
整体的结构示意图
可见底层结构和JDK8 的HashMap相同: 数组 + 链表 + 红黑树Node 结构
Key-value节点。这个类永远不会作为用户可修改的Map.Entry(即支持setValue,从源码看出这是该类所不支持的操作)导出,但可以用于批量任务中的只读遍历。 带有负哈希字段的 Node 的子类是特殊的,它包含空的键和值(但从不被导出)。 否则,key和vals永远是空的。
注意到其中有些 CAS 操作,要正确取到真实数据需要知道变量所在的内存偏移量。
- ASHIFT
tab[i]中第i个元素在相对于数组首元素的偏移量
- ASHIFT
-
ABASE
数组首元素的内存偏移地址
所以((long)i << ASHIFT) + ABASE就是i最后的地址 -
compareAndSwapObject
tab[i]和c比较,如果相等就tab[i]=v否则tab[i]=c -
tabAt 方法
在ConcurrentHashMap中仅仅保证了table的地址volatile,而这里是通过U.getObjectVolatile反射机制直接取内存地址的数据
Q:讲讲怎么添加元素的?
JDK8中摒弃了segment锁,直接将每个桶的头结点做为锁,一个锁只保护一个桶。而旧版的一个segment锁,保护了多个hash桶,锁的粒度变小了,写操作的并发性得到了极大的提升。
- 将指定的键映射到该表中的指定值。键和值都不能为空。value可以通过调用get方法,用一个与原始key相等的key来检索。
再看其中put以及putIfAbsent的具体实现的putVal方法// final 修饰,禁止子类重写,但 put 方法是可重写的 final V putVal(K key, V value, boolean onlyIfAbsent) { // 不支持空的键、值 if (key == null || value == null) throw new NullPointerException(); // 计算hash值 int hash = spread(key.hashCode()); int binCount = 0; // 自旋重试插入元素 for (Node<K,V>[] tab = table;;) { Node<K,V> f; int n, i, fh; // 判断表是否为空 if (tab == null || (n = tab.length) == 0) // 空,则初始化 tab = initTable(); // 判断索引位是否为null else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) { // 为空,则创建新节点,通过CAS设置在索引位置 if (casTabAt(tab, i, null, new Node<K,V>(hash, key, value, null))) break; // 添加进空bin时无需加锁 } // 判断当前桶是否为转移节点(该桶的点正在扩容) else if ((fh = f.hash) == MOVED) tab = helpTransfer(tab, f); // 当前桶有值,产生哈希冲突 else { V oldVal = null; // 该加锁了,确保线程安全,注意这里是锁定当前桶,而不影响其它桶! synchronized (f) { // 双重检验,是否为原头节点 if (tabAt(tab, i) == f) { // 判断当前桶的哈希值是否≥0,因为这说明才是个普通的链表结构,而负哈希值都有其它特定意义 if (fh >= 0) { binCount = 1; // 遍历链表 for (Node<K,V> e = f;; ++binCount) { K ek; // 判断 key 是否已存在 if (e.hash == hash && ((ek = e.key) == key || (ek != null && key.equals(ek)))) { oldVal = e.val; // 判断是否允许覆盖 if (!onlyIfAbsent) // 覆盖原值 e.val = value; // 结束遍历 break; } // 是新节点 Node<K,V> pred = e; if ((e = e.next) == null) { // 尾插法添加 pred.next = new Node<K,V>(hash, key, value, null); break; } } } // 判断桶节点是否是红黑树结构 else if (f instanceof TreeBin) { Node<K,V> p; binCount = 2; if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key, value)) != null) { oldVal = p.val; if (!onlyIfAbsent) p.val = value; } } } } // binCount不等于0了,说明进行过了添加元素操作 if (binCount != 0) { // 判断节点个数是否超过树化阈值 if (binCount >= TREEIFY_THRESHOLD) // 是,则将链表转红黑树 treeifyBin(tab, i); if (oldVal != null) return oldVal; break; } } } // 添加计数,检查是否需要扩容 addCount(1L, binCount); return null; }
关于sun.misc.Unsafe
// Unsafe mechanics private static final sun.misc.Unsafe U; private static final long SIZECTL; private static final long TRANSFERINDEX; private static final long BASECOUNT; private static final long CELLSBUSY; private static final long CELLVALUE; private static final long ABASE; private static final int ASHIFT; static { try { U = sun.misc.Unsafe.getUnsafe(); Class<?> k = ConcurrentHashMap.class; // 获取sizeCtl域在内存中的偏移量 SIZECTL = U.objectFieldOffset (k.getDeclaredField("sizeCtl")); TRANSFERINDEX = U.objectFieldOffset (k.getDeclaredField("transferIndex")); BASECOUNT = U.objectFieldOffset (k.getDeclaredField("baseCount")); CELLSBUSY = U.objectFieldOffset (k.getDeclaredField("cellsBusy")); Class<?> ck = CounterCell.class; CELLVALUE = U.objectFieldOffset (ck.getDeclaredField("value")); Class<?> ak = Node[].class; // 数组首元素的偏移地址 ABASE = U.arrayBaseOffset(ak); // arrayIndexScale可获取数组的转换因子,即数组中元素的增量地址 // arrayBaseOffset搭配arrayIndexScale,可定位数组中每个元素在内存中的位置 int scale = U.arrayIndexScale(ak); if ((scale & (scale - 1)) != 0) throw
剩余60%内容,订阅专栏后可继续查看/也可单篇购买
<p> “挨踢”业行情日益严峻,企业招聘门槛愈来愈高,大厂hc更是少之又少,而Java技术面试普遍对基础知识的掌握考察特别深,大多数同学突击所看的 Java 面试基础知识点根本达不到面试官近乎挑剔的要求。 本专刊针对如今的校招及社招痛点,深入解析 JDK 的核心源码,探究 JDK 的设计精髓及最佳实践,同时以模拟面试的场景切入,让同学们在阅读过程中也能轻松掌握面试技巧。 本专刊购买后即可解锁所有章节,故不可以退换哦~ </p>