ConcurrentHashMap源码解读(一)

作者 uunnfly 日期 2021-02-26
ConcurrentHashMap源码解读(一)

1.7的ConcurrentHashMap

ConcurrentHashMap是由Segment数组结构和HashEntry数组结构组成。Segment实际继承自可重入锁(ReentrantLock),在ConcurrentHashMap里扮演锁的角色;HashEntry则用于存储键值对数据。一个ConcurrentHashMap里包含一个Segment数组,每个Segment里包含一个HashEntry数组,我们称之为table,每个HashEntry是一个链表结构的元素。

img

锁分离: 使用了多个锁来控制对hash表的不同部分进行的修改, 每个段其实就是一个小的hash table,只要多个修改操作发生在不同的段上,它们就可以并发进行。

并发度:默认16。并发度可以理解为程序运行时能够同时更新ConccurentHashMap且不产生锁竞争的最大线程数,实际上就是ConcurrentHashMap中的分段锁个数,即Segment[]的数组长度。如果并发度设置的过小,会带来严重的锁竞争问题;如果并发度设置的过大,原本位于同一个Segment内的访问会扩散到不同的Segment中,CPU cache命中率会下降,从而引起程序性能下降。

1.8的ConcurrentHashMap

抛弃了Segment分段锁机制,利用CAS+Synchronized来保证并发更新的安全,底层采用数组+链表+红黑树的存储结构。

img

  • table:默认为null,初始化发生在第一次插入操作,默认大小为16的数组,用来存储Node节点数据,扩容时大小总是2的幂次方。

  • nextTable:默认为null,扩容时新生成的数组,其大小为原数组的两倍。

  • sizeCtl :size control , 默认为0,用来控制table的初始化和扩容操作

  1. -1: table正在初始化
  2. -N(N>1):正在扩容(并非像jdk注释所写:有N-1个线程在扩容),意义如下表所示:
    | 高RESIZE_STAMP_BITS位 | 低RESIZE_STAMP_SHIFT位 |
    | :—————————— | :——————————- |
    | 扩容盖戳标记 | 并行扩容线程数 +1 |

    -N取对应的二进制的低16位数值为M,代表此时有M-1个扩容线程。而并不代表有N-1个线程

  3. 其他情况:如果table未初始化,表示table需要初始化的大小。如果table初始化完成,表示table的容量,默认是table大小的0.75倍

    注:* 0.75 : (n-(n>>>2)) ;n>>>2无符号右移2位,当n为正数时就是÷4

  • Node: 保存key,value及key的hash值的数据结构。

    1
    2
    3
    4
    5
    6
    7
    class Node<K,V> implements Map.Entry<K,V> {
    final int hash;
    final K key;
    volatile V val;
    volatile Node<K,V> next;
    ... 省略部分代码
    }
  • ForwardingNode:一个特殊的Node节点(extends Node),hash值为-1,其中存储nextTable的引用。

    只有table发生扩容的时候,ForwardingNode才会发挥作用,作为一个占位符放在table中表示当前节点为null或则已经被移动。

    1
    2
    3
    4
    5
    6
    7
    8
    9
       static final int MOVED     = -1; // hash for forwarding nodes   
    static final class ForwardingNode<K,V> extends Node<K,V> {
    final Node<K,V>[] nextTable;
    ForwardingNode(Node<K,V>[] tab) {
    super(MOVED, null, null, null);
    this.nextTable = tab;
    }
    ......
    }
  • TreeNode:当链表长度过长的时候,会转换为TreeNode。但是与HashMap不相同的是,它并不是直接转换为红黑树,而是把这些结点包装成TreeNode放在TreeBin对象中,由TreeBin完成对红黑树的包装。而且TreeNode在ConcurrentHashMap集成自Node类,而并非HashMap中的集成自LinkedHashMap.Entry类,也就是说TreeNode带有next指针,这样做的目的是方便基于TreeBin的访问

  • TreeBin:包装的很多TreeNode节点。它代替了TreeNode的根节点,也就是说在实际的ConcurrentHashMap“数组”中,存放的是TreeBin对象,而不是TreeNode对象,这是与HashMap的区别。另外这个类还带有了读写锁。

1.初始化

注意,ConcurrentHashMap在构造函数中只会初始化sizeCtl值,并不会直接初始化table,而是延缓到第一次put操作。

a. 空构造函数:默认table size是16

1
2
3
4
5
 /**
* Creates a new, empty map with the default initial table size (16).
*/
public ConcurrentHashMap() {
}

b. 传入initialCapacity:

1
2
3
4
5
6
7
8
9
   private static final int MAXIMUM_CAPACITY = 1 << 30;    
public ConcurrentHashMap(int initialCapacity) {
if (initialCapacity < 0)
throw new IllegalArgumentException();
int cap = ((initialCapacity >= (MAXIMUM_CAPACITY >>> 1)) ?
MAXIMUM_CAPACITY :
tableSizeFor(initialCapacity + (initialCapacity >>> 1) + 1));
this.sizeCtl = cap;
}

最大容量为 2^30 (1 << 30) ,并且会调整到2的幂,比如传入100调整到256

initialCapacity + (initialCapacity >>> 1) + 1): +1是为了弥补向下取整,效果是initialCapacity * 1.5,这是一个bug,因为默认的负载因子是0.75,而不是2/3。见https://bugs.openjdk.java.net/browse/JDK-8202422

tableSizeFor: 计算c的上舍入到2的n次幂

1
2
3
4
5
6
7
8
9
private static final int tableSizeFor(int c) {
int n = c - 1;
n |= n >>> 1;
n |= n >>> 2;
n |= n >>> 4;
n |= n >>> 8;
n |= n >>> 16;
return (n < 0) ? 1 : (n >= MAXIMUM_CAPACITY) ? MAXIMUM_CAPACITY : n + 1;
}

目标是让n所有的二进制位都变成1, 此时n为2^a - 1,再+1就是2的幂了

情况一:c = 1, n=0,最后n = 0,return 1

情况二:c >=2, 则n的二进制至少有一个1,假设是10000000(实际一共32位,且第一位是符号位)

n >>> 1是01000000

按位与操作可以将指定位赋值为1,n | n >>>1 后就是11000000,与原来的n相比多了1个1。

第二次运算过后就是11110000,又多了2个1,以此类推,可以把最高位1后面的位数全都改成1。最后一次可以多16个1,因此最多支持1+1+2+4+8+16 = 1 + 1 (1-2^6)/(1-2) = 32位,也就是如果n是1 >>> 31, 最后会变成1111…..(32个)

情况三:c <= 0, n的第一个符号位是1,而且后面的操作只会增加“1”,不会去掉1,所以最后n必然 <0,return 1

注:后来代码又修改了:http://hg.openjdk.java.net/jdk/jdk/rev/3c3ff151c75e

1
2
3
4
5
6
7
8
9
10
     private static final int tableSizeFor(int c) {
- int n = c - 1;
- n |= n >>> 1;
- n |= n >>> 2;
- n |= n >>> 4;
- n |= n >>> 8;
- n |= n >>> 16;
+ int n = -1 >>> Integer.numberOfLeadingZeros(c - 1);
return (n < 0) ? 1 : (n >= MAXIMUM_CAPACITY) ? MAXIMUM_CAPACITY : n + 1;
}

Integer.numberOfLeadingZeros(c-1) 可以求出c-1的前导0的个数,如10000000的前导0个数是24个,而-1的二进制是111111111111111111111111111…(32个1), 右移24位后就是11111111。即这步操作是从右往左寻找第一个1,然后把这个1右边的全部变成1

c. 传入initialCapacity, loadFactor,concurrencyLevel

1
2
3
4
5
6
7
8
9
10
11
public ConcurrentHashMap(int initialCapacity,
float loadFactor, int concurrencyLevel) {
if (!(loadFactor > 0.0f) || initialCapacity < 0 || concurrencyLevel <= 0)
throw new IllegalArgumentException();
if (initialCapacity < concurrencyLevel) // Use at least as many bins
initialCapacity = concurrencyLevel; // as estimated threads
long size = (long)(1.0 + (long)initialCapacity / loadFactor);
int cap = (size >= (long)MAXIMUM_CAPACITY) ?
MAXIMUM_CAPACITY : tableSizeFor((int)size);
this.sizeCtl = cap;
}

concurrencyLevel:估计同步更新此map的线程个数,帮助初始化容量

long size = (long)(1.0 + (long)initialCapacity / loadFactor):+1为了向上取整

2. put

1
2
3
public V put(K key, V value) {
return putVal(key, value, false);
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
/** Implementation for put and putIfAbsent */
final V putVal(K key, V value, boolean onlyIfAbsent) {
if (key == null || value == null) throw new NullPointerException();
int hash = spread(key.hashCode());//把hashCode结果分散
int binCount = 0;//桶的数量
for (Node<K,V>[] tab = table;;) {//自旋
Node<K,V> f; int n, i, fh;
if (tab == null || (n = tab.length) == 0)
tab = initTable(); //初始化表
//f是table在目标位置的node
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {//f为null,即目标位置没有node,直接新增node
if (casTabAt(tab, i, null,
new Node<K,V>(hash, key, value, null)))
break; // no lock when adding to empty bin
}
else if ((fh = f.hash) == MOVED)//目标位置有一个MOVED节点,说明此时正在扩容迁移,帮助迁移节点
tab = helpTransfer(tab, f);
else {//目标位置已有一个正常节点
V oldVal = null;
synchronized (f) {//实际在put时,会锁住当前的桶
if (tabAt(tab, i) == f) {//类似于双重检查锁,synchronized之后检查一下目标位置是否还是这个f。如果不是,会进入下一次循环
if (fh >= 0) {//链表
binCount = 1;
for (Node<K,V> e = f;; ++binCount) {
K ek;
if (e.hash == hash &&
((ek = e.key) == key ||
(ek != null && key.equals(ek)))) {
oldVal = e.val;
if (!onlyIfAbsent)
e.val = value;
break;
}
Node<K,V> pred = e;
if ((e = e.next) == null) {
pred.next = new Node<K,V>(hash, key,
value, null);
break;
}
}
}
else if (f instanceof TreeBin) {//红黑树
Node<K,V> p;
binCount = 2;
if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
value)) != null) {
oldVal = p.val;
if (!onlyIfAbsent)
p.val = value;
}
}
}
}
if (binCount != 0) {
if (binCount >= TREEIFY_THRESHOLD)
treeifyBin(tab, i);
if (oldVal != null)
return oldVal;
break;
}
}
}
addCount(1L, binCount);//put成功且数量+1
return null;
}

看一下如何分散hash

1
2
3
4
   static final int HASH_BITS = 0x7fffffff; // usable bits of normal node hash    
static final int spread(int h) {
return (h ^ (h >>> 16)) & HASH_BITS;
}

0x7fffffff 即 0111111…(31个1), 因为桶的最大数量只有 1 << 30,而且第一位是符号位。

而a ^ 0 = a, a ^ 1 = !a, 把 h >>> 16的结果分为两种情况考虑,把从右往左数直到最后一个1的位数称为有效位数

情况一:h 的有效位数不足16位,h >>> 16 = 0, 所以 h ^ (h >>> 16) = h

情况二: h >>> 16 > 0,实际就是把h的高16位移到了低16位,并把高16位补上0。这个结果再和h异或,则h的高16位不变,低16位会改变。

jdk的考虑是:因为表大小限制(一般hash表大小不会达到1 << 16),所以高位都是被浪费的,这样异或一下可以让高位也发挥作用,而且对系统消耗少,又能分散hash

如何初始化table?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
   // Unsafe mechanics
private static final sun.misc.Unsafe U;
private static final long SIZECTL;
static {
SIZECTL = U.objectFieldOffset
(k.getDeclaredField("sizeCtl"));
}

/**
* Initializes table, using the size recorded in sizeCtl.
*/
private final Node<K,V>[] initTable() {
Node<K,V>[] tab; int sc;
while ((tab = table) == null || tab.length == 0) {
if ((sc = sizeCtl) < 0)
Thread.yield(); // lost initialization race; just spin 自旋
else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
try {
if ((tab = table) == null || tab.length == 0) {
int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
@SuppressWarnings("unchecked")
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
table = tab = nt;
sc = n - (n >>> 2);
}
} finally {
sizeCtl = sc;
}
break;
}
}
return tab;
}

Thread.yield(): 它让掉当前线程 CPU 的时间片,使正在运行中的线程重新变成就绪状态,并重新竞争 CPU 的调度权。它可能会获取到,也有可能被其他线程获取到。

当sizeCtl < 0时说明有线程正在初始化或者扩容,因此该线程让出时间片。而且下面逻辑是在else里面的,外层又套了一个while,所以后面如果该线程又获取到时间片但table未初始化好且sizeCtl < 0,则会继续yield,这就是一种自旋操作

sun.misc.Unsafe能够提供硬件级别的原子操作,U.compareAndSwapInt是一个CAS操作,this是操作的对象,SIZECTL是内存位置offset,如果SIZECTL处的值等于sc则将SIZECTL处的值设为-1,失败就返回false。SIZECTL是sizeCtl的位置

最终经过重重“考验”,new了一个Node数组,长度为sc,这个sc正常情况就是默认的16或者初始化时传入的容量大小计算后的结果,但从这段代码看也可能=0,不过实际经过初始化的步骤不会出现这种情况。

sc = n - (n >>> 2) 是将node的容量大小 * 0.75,再赋值给sizeCtl, 所以初始化表之后的sizeCtl是触发扩容的最大容量。

初始化之后再次循环到达第一个else if, 实际这是一个死循环,等工作全部完成再break

再看tabAt(tab, i = (n - 1) & hash))

i就是将要放置的节点的位置,n在这里是table的长度,而且n一定是2的幂,所以n-1的二进制是11111111… ,(n-1) % hash 的效果就是hash % n

n为2的幂,则 (n-1) % hash = hash % n

继续看tabAt函数,获得tab指定位的值

1
2
3
4
5
6
7
8
9
10
11
12
13
14
       private static final int ASHIFT;
private static final long ABASE;
static {
Class<?> ak = Node[].class;
ABASE = U.arrayBaseOffset(ak);
int scale = U.arrayIndexScale(ak);
if ((scale & (scale - 1)) != 0)
throw new Error("data type scale not a power of two");//scale一定是2的幂
ASHIFT = 31 - Integer.numberOfLeadingZeros(scale); //scale是2的几次幂
}

static final <K,V> Node<K,V> tabAt(Node<K,V>[] tab, int i) {
return (Node<K,V>)U.getObjectVolatile(tab, ((long)i << ASHIFT) + ABASE);
}

U.arrayBaseOffset获取数组首元素的偏移地址base, arrayIndexScale获取数组单个元素大小因子scale

(scale & (scale - 1)): 将scale最后一个1改成0,如果scale是2的幂,那它只有一个1,结果就是0,否则结果必然不是0

n&(n-1)作用:将n的二进制表示中的最低位为1的改为0

分析:减去1时如果当前位是0会变成1再向前一位减1,如果当前位是1会变成0且不用再往前减,也就是说从右往左减到第一个1为止,最右边所有0变成了1,而0 & 1 = 0,1 & 1= 1, 所以n & (n-1)会将n的最低位为1的位改成0,并且不会影响其他的1。比如 n = 10100, n -1 = 10011, n & (n-1) = 10000

31 - Integer.numberOfLeadingZeros(scale):scale是2的几次幂

U.getObjectVolatile:从对象的指定偏移量处获取变量的引用,使用volatile的加载语义。

ABASE是数组起始地址, i << ASHIFT = i * 2^ASHIFT ,是第i个元素的偏移量

1
2
3
4
5
6
casTabAt(tab, i, null, new Node<K,V>(hash, key, value, null))

static final <K,V> boolean casTabAt(Node<K,V>[] tab, int i,
Node<K,V> c, Node<K,V> v) {
return U.compareAndSwapObject(tab, ((long)i << ASHIFT) + ABASE, c, v);
}

cas:如果当前值为null的话,将tab的i下标的元素设为value,并返回true,否则返回false

如果用cas操作成功的话,这个值已经put进去了,可以break。加入到空桶这个操作也是没有锁的(因为用的是cas)

如果cas操作不成功,说明有其他线程在初始化桶,需要继续下一次循环。

桶初始化成功且没有在扩容的情况下就可以真正put

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
synchronized (f) {//锁上要操作的桶
if (tabAt(tab, i) == f) {//再次确认tab的i下标就是前面拿到的桶(防止这中间发生扩容)
if (fh >= 0) {// fh<0说明是特殊节点: forwarding nodes,roots of trees,transient reservations
binCount = 1;
for (Node<K,V> e = f;; ++binCount) {//binCount每次循环都会 + 1
K ek;
//如果当前节点的hash值和key都满足要求,把值存进去(如果onlyIfAbsent就不存)
if (e.hash == hash &&
((ek = e.key) == key ||
(ek != null && key.equals(ek)))) {
oldVal = e.val;
if (!onlyIfAbsent)
e.val = value;
break;
}
Node<K,V> pred = e;
//如果当前节点是最后一个,把新节点加入,否则继续往后找
if ((e = e.next) == null) {
pred.next = new Node<K,V>(hash, key,
value, null);
break;
}
}
}
//TreeBin的hash值是常量-2
else if (f instanceof TreeBin) {
Node<K,V> p;
binCount = 2;
if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
value)) != null) {
oldVal = p.val;
if (!onlyIfAbsent)
p.val = value;
}
}
}
}
//put操作对数据产生了影响
if (binCount != 0) {
if (binCount >= TREEIFY_THRESHOLD)
treeifyBin(tab, i); //binCount超过8个(只会在链表那里达到8个),转成红黑树
if (oldVal != null)//如果oldVal不为空,说明没有对元素个数产生影响,也就不可能触发扩容。
return oldVal;
break;
}
1
2
//走到这里说明插入了一个新节点,给binCount +1      
addCount(1L, binCount);
addCount

先把数量加到了baseCount上,如果CAS操作失败说明有并发冲突,于是写到CounterCell里面,这步交给fullAddCount处理(CounterCell的初始化、扩容、并发)

之后判断数量是否超过容量大小(table * 0.75),超过的话就扩容。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
    /**
* Base counter value, used mainly when there is no contention,
* but also as a fallback during table initialization
* races. Updated via CAS.
*/
//总元素个数,如果有冲突的元素是放到counterCells里
private transient volatile long baseCount;

/**
* Table of counter cells. When non-null, size is a power of 2.
*/
//存放冲突元素的个数
private transient volatile CounterCell[] counterCells;

private final void addCount(long x, int check) {
CounterCell[] as; long b, s;
//cas在if括号里,如果操作成功且counterCells为空,说明没有冲突,直接去下面的逻辑了
//if里面说明有多线程冲突
if ((as = counterCells) != null ||
//尝试修改baseCount
!U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {
CounterCell a; long v; int m;
boolean uncontended = true;
if (as == null || (m = as.length - 1) < 0 ||
(a = as[ThreadLocalRandom.getProbe() & m]) == null ||
!(uncontended =
U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) {
fullAddCount(x, uncontended);
return;
}
if (check <= 1)
return;
s = sumCount();
}
//在putVal()里一定是 > 0的,check就是binCount
if (check >= 0) {
Node<K,V>[] tab, nt; int n, sc;
//如果上面的sumCount 大于现有的容量的话就触发扩容
while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&
(n = tab.length) < MAXIMUM_CAPACITY) {
int rs = resizeStamp(n);
//有线程在扩容
if (sc < 0) {
//这里的条件有个bug
if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
transferIndex <= 0)
break;
if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
transfer(tab, nt);
}
//注意此处,sc < 0就是这里赋值的
else if (U.compareAndSwapInt(this, SIZECTL, sc,
(rs << RESIZE_STAMP_SHIFT) + 2))
transfer(tab, null);
s = sumCount();
}
}
}

sumCount 就是把baseCount和CounterCell里面的值全加起来

(rs << RESIZE_STAMP_SHIFT) + 2: rs << RESIZE_STAMP_SHIFT之后是形如 1xxx xxx xxxx xxxx 0000 0000 0000 0000的格式,低RESIZE_STAMP_SHIFT位上会加上2(不会影响到高RESIZE_STAMP_BITS位),所以低RESIZE_STAMP_SHIFT位存储的就是并发线程数 + 1。

sc == rs + 1 || sc == rs + MAX_RESIZERS: 这个判断条件是一个bug https://bugs.java.com/bugdatabase/view_bug.do?bug_id=JDK-8214427 ,应该改为 sc == ( rs<<RESIZE_STAMP_SHIFT ) +1 || sc == ( rs<<RESIZE_STAMP_SHIFT ) + MAX_RESIZERS

fullAddCount 用来把因为并发没能写入的数量写入CounterCell: //todo

helpTransfer

桶初始化后也可能遇到table在扩容的情况,此时需要等扩容完,所以需要帮助元素迁移。

1
2
3
if ((fh = f.hash) == MOVED) {
tab = helpTransfer(tab, f);
}

f是桶的头节点,hash是spread后的hash值,当hash值是-1时, 说明这个头节点是ForwardingNode, table此时正在扩容。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
      /**
* The maximum number of threads that can help resize.
* Must fit in 32 - RESIZE_STAMP_BITS bits.
*/
private static final int MAX_RESIZERS = (1 << (32 - RESIZE_STAMP_BITS)) - 1;
/**
* The number of bits used for generation stamp in sizeCtl.
* Must be at least 6 for 32bit arrays.
*/
private static int RESIZE_STAMP_BITS = 16;
/**
* The bit shift for recording size stamp in sizeCtl.
*/
private static final int RESIZE_STAMP_SHIFT = 32 - RESIZE_STAMP_BITS;
/**
* The next table index (plus one) to split while resizing.
*/
private transient volatile int transferIndex;

/**
* Helps transfer if a resize is in progress.
*/
final Node<K,V>[] helpTransfer(Node<K,V>[] tab, Node<K,V> f) {
Node<K,V>[] nextTab; int sc;//nextTab:扩容后的table
if (tab != null && (f instanceof ForwardingNode) &&
(nextTab = ((ForwardingNode<K,V>)f).nextTable) != null) {
int rs = resizeStamp(tab.length);
while (nextTab == nextTable && table == tab &&
(sc = sizeCtl) < 0) {//正在nextTab上扩容
if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
sc == rs + MAX_RESIZERS || transferIndex <= 0)
break;
if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) {
transfer(tab, nextTab);
break;
}
}
return nextTab;
}
return table;
}

/**
* Returns the stamp bits for resizing a table of size n.
* Must be negative when shifted left by RESIZE_STAMP_SHIFT.
*/
static final int resizeStamp(int n) {
return Integer.numberOfLeadingZeros(n) | (1 << (RESIZE_STAMP_BITS - 1));
}

resizeStamp:计算n的前导零的个数(<= 32),并将第16位置为1,结果是 0000 0000 0000 0000 1xxx xxx xxxx xxxx, 左移16位显然为负数(符号位为1)(Must be negative when shifted left by RESIZE_STAMP_SHIFT.)

Integer.numberOfLeadingZeros(n)的结果是一种标记:因为n一定是2的幂,所以不同的n得到的结果必然不同,由n的前导零个数就可以代表 不同的n。再将第16位设为1,成为一个盖戳标记。后面可以看到扩容时的sizeCtl意义如下:

高RESIZE_STAMP_BITS位 低RESIZE_STAMP_SHIFT位
扩容盖戳标记 并行扩容线程数 +1

sc >>> RESIZE_STAMP_SHIFT:sc为扩容时的sizeCtl,其高位RESIZE_STAMP_SHIFT位就是数组长度n的盖戳标记,如果不相等,说明当前正在发生的扩容跟此线程认为的扩容不是同一个,所以可以break

sc == rs + 1 || sc == rs + MAX_RESIZERS ://todo

transferIndex <= 0: transferIndex 是当扩容时下一个要split的table下标

U.compareAndSwapInt(this, SIZECTL, sc, sc + 1): 尝试将sizeCtl +1,若成功,辅助扩容;若失败,自旋。

transfer
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
    /**
* Minimum number of rebinnings per transfer step. Ranges are
* subdivided to allow multiple resizer threads. This value
* serves as a lower bound to avoid resizers encountering
* excessive memory contention. The value should be at least
* DEFAULT_CAPACITY.
*/
private static final int MIN_TRANSFER_STRIDE = 16;
/** Number of CPUS, to place bounds on some sizings */
static final int NCPU = Runtime.getRuntime().availableProcessors();
/**
* The next table index (plus one) to split while resizing.
*/
private transient volatile int transferIndex;

private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
int n = tab.length, stride;
if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
stride = MIN_TRANSFER_STRIDE; // subdivide range
if (nextTab == null) { // initiating
try {
@SuppressWarnings("unchecked")
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];//每次扩容是 * 2
nextTab = nt;
} catch (Throwable ex) { // try to cope with OOME
sizeCtl = Integer.MAX_VALUE;
return;
}
nextTable = nextTab;
transferIndex = n;//注意此处,transferIndex现在是数组长度
}
int nextn = nextTab.length;
ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
boolean advance = true;
boolean finishing = false; // to ensure sweep before committing nextTab
for (int i = 0, bound = 0;;) {
Node<K,V> f; int fh;
while (advance) {
int nextIndex, nextBound;//本次迁移的范围,下标从nextBound到nextIndex
if (--i >= bound || finishing)
advance = false;
else if ((nextIndex = transferIndex) <= 0) {
i = -1;
advance = false;
}
else if (U.compareAndSwapInt
(this, TRANSFERINDEX, nextIndex,
nextBound = (nextIndex > stride ?
nextIndex - stride : 0))) {//是从后往前分配的
bound = nextBound;
i = nextIndex - 1;//i的初始值
advance = false;
}
}
if (i < 0 || i >= n || i + n >= nextn) {
int sc;
if (finishing) {
nextTable = null;
table = nextTab;
sizeCtl = (n << 1) - (n >>> 1);
return;
}
if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
return;
finishing = advance = true;
i = n; // recheck before commit, 重新检查一遍,从n开始
}
}
else if ((f = tabAt(tab, i)) == null)
advance = casTabAt(tab, i, null, fwd);
else if ((fh = f.hash) == MOVED)
advance = true; // already processed
else {
synchronized (f) {
if (tabAt(tab, i) == f) {//与put时类似,只不过锁住f之后是迁移
Node<K,V> ln, hn;
if (fh >= 0) {
int runBit = fh & n;
Node<K,V> lastRun = f;
for (Node<K,V> p = f.next; p != null; p = p.next) {
int b = p.hash & n;
if (b != runBit) {
runBit = b;
lastRun = p;
}
}
if (runBit == 0) {
ln = lastRun;
hn = null;
}
else {
hn = lastRun;
ln = null;
}
for (Node<K,V> p = f; p != lastRun; p = p.next) {
int ph = p.hash; K pk = p.key; V pv = p.val;
if ((ph & n) == 0)
ln = new Node<K,V>(ph, pk, pv, ln);
else
hn = new Node<K,V>(ph, pk, pv, hn);
}
setTabAt(nextTab, i, ln);
setTabAt(nextTab, i + n, hn);
setTabAt(tab, i, fwd);
advance = true;
}
else if (f instanceof TreeBin) {
TreeBin<K,V> t = (TreeBin<K,V>)f;
TreeNode<K,V> lo = null, loTail = null;
TreeNode<K,V> hi = null, hiTail = null;
int lc = 0, hc = 0;
for (Node<K,V> e = t.first; e != null; e = e.next) {
int h = e.hash;
TreeNode<K,V> p = new TreeNode<K,V>
(h, e.key, e.val, null, null);
if ((h & n) == 0) {
if ((p.prev = loTail) == null)
lo = p;
else
loTail.next = p;
loTail = p;
++lc;
}
else {
if ((p.prev = hiTail) == null)
hi = p;
else
hiTail.next = p;
hiTail = p;
++hc;
}
}
ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
(hc != 0) ? new TreeBin<K,V>(lo) : t;
hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
(lc != 0) ? new TreeBin<K,V>(hi) : t;
setTabAt(nextTab, i, ln);
setTabAt(nextTab, i + n, hn);
setTabAt(tab, i, fwd);
advance = true;
}
}
}
}
}
}

第一步:确定迁移的桶位个数,即每个线程处理的桶的个数

1
2
if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
stride = MIN_TRANSFER_STRIDE;

如果cpu个数 > 1: stride = 当前table长度/ 8 / cpu个数(不能小于16)

如果cpu个数 == 1: stride = 当前table长度(不能小于16)

第二步:如果未初始化扩容后的数组对象,则初始化

1
2
3
4
5
6
7
8
9
10
11
12
if (nextTab == null) {            // initiating
try {
@SuppressWarnings("unchecked")
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
nextTab = nt;
} catch (Throwable ex) { // try to cope with OOME
sizeCtl = Integer.MAX_VALUE;
return;
}
nextTable = nextTab;
transferIndex = n;
}

第三步:

阶段一:计算迁移的边界

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
boolean advance = true;
boolean finishing = false;
ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
for (int i = 0, bound = 0;;) {
Node<K,V> f; int fh;
while (advance) {
int nextIndex, nextBound;//本次迁移的范围,下标从nextBound到nextIndex,左闭右开
if (--i >= bound || finishing)
advance = false;
else if ((nextIndex = transferIndex) <= 0) {
i = -1;
advance = false;
}
else if (U.compareAndSwapInt
(this, TRANSFERINDEX, nextIndex,
nextBound = (nextIndex > stride ?
nextIndex - stride : 0))) {
bound = nextBound;
i = nextIndex - 1;
advance = false;
}
}
//阶段二代码
//阶段三代码
}

transferIndex在第二步被赋值为table长度,在第一个else if括号内nextIndex被赋值为transferIndex。第一次在while内的循环直接看第二个else if,如果nextIndex - stride就只迁移stride到nextIndex的部分,否则全部迁移

—i 暗示了迁移是从桶的尾部开始迁移

阶段二:判断迁移是否完成

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
//进入if内部说明迁移完成
if (i < 0 || i >= n || i + n >= nextn) {
int sc;
if (finishing) {
nextTable = null;
table = nextTab;
sizeCtl = (n << 1) - (n >>> 1);// 2n - n/2 = 2n * 0.75
return;
}
if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
//确认是否是最后一个退出迁移的线程,若不是则return,若是则finishing = true
if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
return;
finishing = advance = true;
//如果是最后一个退出迁移的线程,i = n,然后执行一次总体循环,检查每一个桶位是否迁移完毕
i = n; // recheck before commit
}
}

i 由阶段一可知是需要迁移的下标, < 0时说明迁移完成。用CAS长度减少迁移线程数量。

从上面代码可以猜测:advance是此线程是否要继续扩容的标志,finishing是本次扩容是否完成的标志

阶段三:迁移桶位

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
else if ((f = tabAt(tab, i)) == null)
advance = casTabAt(tab, i, null, fwd);//fwd就是ForwardingNode
else if ((fh = f.hash) == MOVED)
advance = true;
else {
synchronized (f) {
if (tabAt(tab, i) == f) {
Node<K,V> ln, hn;
if (fh >= 0) {
//省略代码,从链表中迁移数据到新数组
setTabAt(tab, i, fwd);
advance = true;
}
else if (f instanceof TreeBin) {
//省略代码,从红黑树中读取元素放入新数组
setTabAt(tab, i, fwd);
advance = true;
}
}
}
}

逐个桶位进行判断,这个是通过外层最大的for循环来执行的。针对每一个桶位:

  • 如果桶位为null,则尝试通过CAS将一个标识迁移的特殊节点,ForwardingNode放入桶位。

  • 如果桶位上的节点已经是ForwardingNode,则忽略,寻找下一个桶位。

  • 不是以上两种情况,则对桶位节点加锁。成功后,执行数据迁移,迁移完毕后,将桶位节点设置为ForwardingNode,用以标识迁移完毕。

    由此可见,放置ForwardingNode要么是本身桶位是null,要么迁移完毕。

看一下如何从链表中迁移数据到新数组

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
//runBit只可能是0或者1,如果是1表示需要run
int runBit = fh & n;

ConcurrentHashMap.Node<K, V> lastRun = f;
for (ConcurrentHashMap.Node<K, V> p = f.next; p != null; p = p.next)
{
int b = p.hash & n;
if (b != runBit)
{
runBit = b;
lastRun = p;
}
}
if (runBit == 0)
{
//ln是下标不变的头节点,hn是下标需要增加的头节点
ln = lastRun;
hn = null;
}
else
{
hn = lastRun;
ln = null;
}
for (ConcurrentHashMap.Node<K, V> p = f; p != lastRun; p = p.next)
{
int ph = p.hash;
K pk = p.key;
V pv = p.val;
if ((ph & n) == 0)
{
//头插法
ln = new ConcurrentHashMap.Node<K, V>(ph, pk, pv, ln);
}
else
{
hn = new ConcurrentHashMap.Node<K, V>(ph, pk, pv, hn);
}
}
setTabAt(nextTab, i, ln);
setTabAt(nextTab, i + n, hn);

对于数组长度为n,下标在i上的节点而言,执行2倍扩容后,其下标一部分仍然为i,一部分为i+n。比如hash值为5,13的两个元素,table.length为8时,都在5这个桶,扩容为16后,5还在5这个桶,13去了13这个桶。

迁移后下标值不一致和迁移后下标值一致,并且以一致的首节点作为分界线,也就是lastRun变量。runBit为0,意味着lastRun和之后的部分,迁移后下标不变;runBit不为0,意味着lastRun和之后的部分,迁移后下标变为i+n。但是lastRun之前的节点不一定要不要变下标

遍历首节点到lastRun节点之间的部分,计算其迁移后的下标,构建新的node对象,并且形成链表。而后添加到新的数组中

为什么hash值& n = 1 就是去hn呢? hash值13:1101,hash值21:1 0101, 扩容时容量大小 8:1000, 扩容后容量大小16: 1 0000 。

如果n是2的幂,则 hash | (n -1) = hash % n

1
2
3
4
5
13 | (8 - 1) = 1101 | 111 = 101; //13 % 8 = 5
21 | ( 8 -1) = 10101 | 111 = 101; //21 & 8 = 5

13 | (16 - 1) = 1101 | 1111 = 1101; // 13 % 16 = 13
23 | ( 16 -1) = 10101 | 1111 = 0101; //21 % 16 = 5

在上面的例子中,在n= 2 ^3^时,hash值的低3位就是放置的桶下标;在n = 2 ^4^ 时,hash值的低4位就是桶下标。所以n = 2 ^k^ 时,桶下标是hash值的低k位。

而在二进制(共k个有效位)最前面加个0则值不变,加个1 则下标 +2^k^ 。扩容时需要取hash值的低 k+1位作为桶下标,那么只要看第 k+1 位就行了:是1则下标 + 2^k^ ,是0则不变。而n的二进制只有第k+1位为1,做按位与操作就知道了hash值的情况。

扩容图解

触发扩容的操作

img

CPU核数与迁移任务hash桶数量分配的关系

img

单线程下线程的任务分配与迁移操作

img

多线程如何分配任务?

img

普通链表如何迁移?

img

什么是 lastRun 节点?

img

红黑树如何迁移?

img

hash桶迁移中以及迁移后如何处理存取请求?

img

多线程迁移任务完成后的操作

img

img

参考链接

ConcurrentHashMap源码走读

JUC之ConcurrentHashMap源码之扩容(二)

ConcurrentHashMap源码分析(03)-扩容方法