前言 concurrentMap 是 hashMap 在并发情况下的替代map, 是线程安全 本文针对1.8的源码分析 问题如何保证线程安全? 读加锁么,写加锁么? 在jdk1.8与jdk1.7的差别? 为什么key不能为null, value不能为null? 原理概况 通过Node[]数组来保存添加到Map中的键值对 keyhash不冲突,放在数组 Key hash冲突,以链表形式放在数组 链表长度超过8个,又小于64,扩容数组,如果数组长度大于等于64,则转换成树 扩容后,某个链表的数组长度小于6个,则会将该树转为链表 源码结构源码 元素的基本节点 static class Node implements Map.Entry { final int hash; final K key; volatile V val; volatile Node next; Node(int hash, K key, V val, Node next) { this.hash = hash; this.key = key; this.val = val; this.next = next; } // 略 树的节点 static final class TreeNode extends Node { TreeNode parent; // red-black tree links TreeNode left; TreeNode right; TreeNode prev; // needed to unlink next upon deletion boolean red; TreeNode(int hash, K key, V val, Node next, TreeNode parent) { super(hash, key, val, next); this.parent = parent; } // 略 } 重要方法 在ConcurrentHashMap中使用了unSafe方法,通过直接操作内存的方式来保证并发处理的安全性,使用的是硬件的安全机制 /* * 用来返回节点数组的指定位置的节点的原子操作 */ @SuppressWarnings("unchecked") static final Node tabAt(Node [] tab, int i) { return (Node )U.getObjectVolatile(tab, ((long)i << ASHIFT) + ABASE); } /* * cas原子操作,在指定位置设定值 */ static final boolean casTabAt(Node [] tab, int i, Node c, Node v) { return U.compareAndSwapObject(tab, ((long)i << ASHIFT) + ABASE, c, v); } put源码 /* * 当添加一对键值对的时候,首先会去判断保存这些键值对的数组是不是初始化了, * 如果没有的话就初始化数组 * 然后通过计算hash值来确定放在数组的哪个位置 * 如果这个位置为空则直接添加,如果不为空的话,则取出这个节点来 * 如果取出来的节点的hash值是MOVED(-1)的话,则表示当前正在对这个数组进行扩容,复制到新的数组,则当前线程也去帮助复制 * 最后一种情况就是,如果这个节点,不为空,也不在扩容,则通过synchronized来加锁,进行添加操作 * 然后判断当前取出的节点位置存放的是链表还是树 * 如果是链表的话,则遍历整个链表,直到取出来的节点的key来个要放的key进行比较,如果key相等,并且key的hash值也相等的话, * 则说明是同一个key,则覆盖掉value,否则的话则添加到链表的末尾 * 如果是树的话,则调用putTreeVal方法把这个元素添加到树中去 * 最后在添加完成之后,会判断在该节点处共有多少个节点(注意是添加前的个数),如果达到8个以上了的话, * 则调用treeifyBin方法来尝试将处的链表转为树,或者扩容数组 */ final V putVal(K key, V value, boolean onlyIfAbsent) { // k,v都不能为空,否则会报空指针异常 if (key == null || value == null) throw new NullPointerException(); // 取得Key的hash值 int hash = spread(key.hashCode()); // 用来计算在这个节点总共有多少个元素,用来控制扩容或者转移树 int binCount = 0; for (Node [] tab = table;;) { Node f; int n, i, fh; // 第一次Put,没有初始化,则初始化table if (tab == null || (n = tab.length) == 0) tab = initTable(); // 通过哈希计算出一个表中的位置,因为n是数组的长度,所以(n-1)&hash肯定不会出现数组越界 else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) { // 如果这个位置没有元素的话,则通过cas的方式尝试添加,注意这个时候是没有加锁的 if (casTabAt(tab, i, null, // 创建一个Node添加到数组中去,null表示的是下一个节点为空 new Node (hash, key, value, null))) break; // no lock when adding to empty bin } // 如果检测到某个节点的hash值是MOVED,则表示正在进行数组扩张的数据复制阶段 // 则当前线程也会参与去复制,通过允许多线程复制的功能,一次来减少数组的复制所带来的性能损失 else if ((fh = f.hash) == MOVED) tab = helpTransfer(tab, f); else { /* * 如果这个位置有元素的话,就采用synchronized的方式加锁 * 如果是链表的话(hash大于0),就对这个链表的所有元素进行遍历 * 如果找到了key和key的hash值都一样的节点,则把它的值替换 * 如果没找到的话,则添加在链表的最后面 * 如果是树的话,会对该节点上的关联数目进行判断 * 如果在8个以上的话,则会调用treeifyBin方法,来尝试转化为树,或者扩容 */ V oldVal = null; // 再次取出要存储的位置的元素,跟前面取出来的作比较 synchronized (f) { if (tabAt(tab, i) == f) { // 取出来的元素的hash值大于0 if (fh >= 0) { binCount = 1; // 遍历这个链表 for (Node e = f;; ++binCount) { K ek; // 如果hash,key相同,则替换value if (e.hash == hash && ((ek = e.key) == key || (ek != null && key.equals(ek)))) { oldVal = e.val; // 当使用putIfAbsent的时候,只有在这个key没有设置值的时候才设置 if (!onlyIfAbsent) e.val = value; break; } Node pred = e; // 如果不是同样的hash,同样的Key的时候,则判断该节点的下一个节点是否为空, 为空吧这个要加入的节点设置为当前节点的下一个节点 if ((e = e.next) == null) { pred.next = new Node (hash, key, value, null); break; } } } // 如果已经转成红黑树类型了 else if (f instanceof TreeBin) { Node p; binCount = 2; // 将元素添加到树中去 if ((p = ((TreeBin )f).putTreeVal(hash, key, value)) != null) { oldVal = p.val; if (!onlyIfAbsent) p.val = value; } } } } if (binCount != 0) { // 当在同一个节点的数目达到8个的时候,则扩张数组或将给节点的数据转为tree if (binCount >= TREEIFY_THRESHOLD) treeifyBin(tab, i); if (oldVal != null) return oldVal; break; } } } // 计数 addCount(1L, binCount); return null; } 扩容源码 节点个数 超过8,调用 treeifyBin 扩容。 节点数组长度小于64,扩张数组长度一倍,否则的话把链表转为树 /** * Replaces all linked nodes in bin at given index unless table is * too small, in which case resizes instead. * 当数组长度小于64的时候,扩张数组长度一倍,否则的话把链表转为树 */ private final void treeifyBin(Node [] tab, int index) { Node b; int n, sc; if (tab != null) { if ((n = tab.length) < MIN_TREEIFY_CAPACITY) //MIN_TREEIFY_CAPACITY 64 tryPresize(n << 1); // 数组扩容 else if ((b = tabAt(tab, index)) != null && b.hash >= 0) { synchronized (b) { //使用synchronized同步器,将该节点出的链表转为树 if (tabAt(tab, index) == b) { TreeNode hd = null, tl = null; //hd:树的头(head) for (Node e = b; e != null; e = e.next) { TreeNode p = new TreeNode (e.hash, e.key, e.val, null, null); if ((p.prev = tl) == null) //把Node组成的链表,转化为TreeNode的链表,头结点任然放在相同的位置 hd = p; //设置head else tl.next = p; tl = p; } setTabAt(tab, index, new TreeBin (hd));//把TreeNode的链表放入容器TreeBin中 } } } } } 读源码 读支持并发操作 /* * 相比put方法,get就很单纯了,支持并发操作, * 当key为null的时候回抛出NullPointerException的异常 * get操作通过首先计算key的hash值来确定该元素放在数组的哪个位置 * 然后遍历该位置的所有节点 * 如果不存在的话返回null */ public V get(Object key) { Node [] tab; Node e, p; int n, eh; K ek; int h = spread(key.hashCode()); if ((tab = table) != null && (n = tab.length) > 0 && (e = tabAt(tab, (n - 1) & h)) != null) { if ((eh = e.hash) == h) { if ((ek = e.key) == key || (ek != null && key.equals(ek))) return e.val; } else if (eh < 0) return (p = e.find(h, key)) != null ? p.val : null; while ((e = e.next) != null) { if (e.hash == h && ((ek = e.key) == key || (ek != null && key.equals(ek)))) return e.val; } } return null; } 1.7及以下版本原理 ConcurrentHashMap是由Segment数组结构和HashEntry数组结构组成。Segment是一种可重入锁ReentrantLock,在ConcurrentHashMap里扮演锁的角色,HashEntry则用于存储键值对数据。一个ConcurrentHashMap里包含一个Segment数组,Segment的结构和HashMap类似,是一种数组和链表结构, 一个Segment里包含一个HashEntry数组,每个HashEntry是一个链表结构的元素, 每个Segment守护着一个HashEntry数组里的元素,当对HashEntry数组的数据进行修改时,必须首先获得它对应的Segment锁 分段锁结构 static final class Segment extends ReentrantLock implements Serializable { /** * The number of elements in this segment"s region. */ transient volatileint count; /** * Number of updates that alter the size of the table. This is * used during bulk-read methods to make sure they see a * consistent snapshot: If modCounts change during a traversal * of segments computing size or checking containsValue, then * we might have an inconsistent view of state so (usually) * must retry. */ transient int modCount; /** * The table is rehashed when its size exceeds this threshold. * (The value of this field is always (int)(capacity * * loadFactor).) */ transient int threshold; /** * The per-segment table. */ transient volatile HashEntry [] table; /** * The load factor for the hash table. Even though this value * is same for all segments, it is replicated to avoid needing * links to outer object. * @serial */ final float loadFactor; Segment(float lf, int threshold, HashEntry [] tab) { this.loadFactor = lf; this.threshold = threshold; this.table = tab; } } 插入源码 先加锁,再操作 public V put(K key, V value) { Segment s; if (value == null) throw new NullPointerException(); int hash = hash(key.hashCode()); int j = (hash >>> segmentShift) & segmentMask; if ((s = (Segment )UNSAFE.getObject // nonvolatile; recheck (segments, (j << SSHIFT) + SBASE)) == null) // in ensureSegment s = ensureSegment(j); return s.put(key, hash, value, false); } final V put(K key, int hash, V value, boolean onlyIfAbsent) { HashEntry node = tryLock() ? null : scanAndLockForPut(key, hash, value); V oldValue; try { HashEntry [] tab = table; int index = (tab.length - 1) & hash; HashEntry first = entryAt(tab, index); for (HashEntry e = first;;) { if (e != null) { K k; if ((k = e.key) == key || (e.hash == hash && key.equals(k))) { oldValue = e.value; if (!onlyIfAbsent) { e.value = value; ++modCount; } break; } e = e.next; } else { if (node != null) node.setNext(first); else node = new HashEntry (hash, key, value, first); int c = count + 1; if (c > threshold && tab.length < MAXIMUM_CAPACITY) rehash(node); else setEntryAt(tab, index, node); ++modCount; count = c; oldValue = null; break; } } } finally { unlock(); } return oldValue; } 读取源码 /** * Returns the value to which the specified key is mapped, * or {@code null} if this map contains no mapping for the key. * * More formally, if this map contains a mapping from a key * {@code k} to a value {@code v} such that {@code key.equals(k)}, * then this method returns {@code v}; otherwise it returns * {@code null}. (There can be at most one such mapping.) * * @throws NullPointerException if the specified key is null */ public V get(Object key) { Segment
s; // manually integrate access methods to reduce overhead HashEntry [] tab; int h = hash(key.hashCode()); long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE; // 原子操作 if ((s = (Segment )UNSAFE.getObjectVolatile(segments, u)) != null && (tab = s.table) != null) { for (HashEntry e = (HashEntry ) UNSAFE.getObjectVolatile (tab, ((long)(((tab.length - 1) & h)) << TSHIFT) + TBASE); e != null; e = e.next) { K k; if ((k = e.key) == key || (e.hash == h && key.equals(k))) return e.value; } } return null; } 删除操作 先上锁,再操作 final V remove(Object key, int hash, Object value) { if (!tryLock()) scanAndLock(key, hash); V oldValue = null; try { HashEntry [] tab = table; int index = (tab.length - 1) & hash; HashEntry e = entryAt(tab, index); HashEntry pred = null; while (e != null) { K k; HashEntry next = e.next; if ((k = e.key) == key || (e.hash == hash && key.equals(k))) { V v = e.value; if (value == null || value == v || value.equals(v)) { if (pred == null) setEntryAt(tab, index, next); else pred.setNext(next); ++modCount; --count; oldValue = v; } break; } pred = e; e = next; } } finally { unlock(); } return oldValue; } 扩容 如何扩容。扩容的时候首先会创建一个两倍于原容量的数组,然后将原数组里的元素进行再hash后插入到新的数组里。为了高效ConcurrentHashMap不会对整个容器进行扩容,而只对某个segment进行扩容。 private void rehash(HashEntry node) { /* * Reclassify nodes in each list to new table. Because we * are using power-of-two expansion, the elements from * each bin must either stay at same index, or move with a * power of two offset. We eliminate unnecessary node * creation by catching cases where old nodes can be * reused because their next fields won"t change. * Statistically, at the default threshold, only about * one-sixth of them need cloning when a table * doubles. The nodes they replace will be garbage * collectable as soon as they are no longer referenced by * any reader thread that may be in the midst of * concurrently traversing table. Entry accesses use plain * array indexing because they are followed by volatile * table write. */ HashEntry [] oldTable = table; int oldCapacity = oldTable.length; int newCapacity = oldCapacity << 1; threshold = (int)(newCapacity * loadFactor); HashEntry [] newTable = (HashEntry []) new HashEntry[newCapacity]; int sizeMask = newCapacity - 1; for (int i = 0; i < oldCapacity ; i++) { HashEntry e = oldTable[i]; if (e != null) { HashEntry next = e.next; int idx = e.hash & sizeMask; if (next == null) // Single node on list newTable[idx] = e; else { // Reuse consecutive sequence at same slot HashEntry lastRun = e; int lastIdx = idx; for (HashEntry last = next; last != null; last = last.next) { int k = last.hash & sizeMask; if (k != lastIdx) { lastIdx = k; lastRun = last; } } newTable[lastIdx] = lastRun; // Clone remaining nodes for (HashEntry p = e; p != lastRun; p = p.next) { V v = p.value; int h = p.hash; int k = h & sizeMask; HashEntry n = newTable[k]; newTable[k] = new HashEntry (h, p.key, v, n); } } } } int nodeIndex = node.hash & sizeMask; // add the new node node.setNext(newTable[nodeIndex]); newTable[nodeIndex] = node; table = newTable; } 结论 Q:1.8版本如何保证线程安全? 写操作:同步处理主要是通过Synchronized和unsafe两种方式来完成的在取得某个位置的Node的时候,使用的都是unsafe的方法,来达到并发安全的目的当需要在某个位置设置节点的时候,则会通过Synchronized的同步机制来锁定该位置的节点。在数组扩容的时候,则通过处理的步长和fwd节点来达到并发安全的目的,通过设置hash值为MOVED当把某个位置的节点复制到扩张后的table的时候,也通过Synchronized的同步机制来保证线程安全 读操作:根本没有使用同步机制,也没有使用unsafe方法,所以读操作是支持并发操作的 Q:读加锁么,写加锁么? 在1.7版本及以前版本写使用分段锁(可重入锁)保证安全读使用UNSAFE.getObjectVolatile,保证数据一致性 在1.8版本写时,取位置使用unsafe, 设置节点用Synchronized,保证数据一致性读操作是直接支持并发操作 Q: 在jdk1.8与jdk1.7的差别? 1.7 使用分段锁保证线程安全,1.8 使用Synchronized和unsafe两种方式保证线程安全 1.7扩容时,分段锁局部扩容,hash算好后,重新分配,1.8整个扩容,然后数据重新放入 1.8引入红黑树,在链表太长时,转换,提高查询效率 Q:为什么key不能为null, value不能为null 看源码put入口判断 if (key == null || value == null) throw new NullPointerException(); 设计意图:二义性,防止歧义 如果判断value为null, 不确定这个Null是插入进去的,还是本身真的没值, 导致歧义,为了线程安全性,所以都不能为null