HashMap原理

数据结构上:数组+(链表和红黑树)

HashMap线程不安全问题

HashMap 是线程不安全的,原因就在于 HashMap 的 rehashrehash 是 HashMap 扩容过程种的一个步骤。 HashMap 的容量是有限的。当经过多次元素插入,使得 HashMap 达到一定饱和度时,Key 映射位置发生冲突的几率会逐渐提高。 这时候,HashMap 需要扩展它的长度,也就是进行 Resize。 影响发生 Resize 的因素有两个:

1.Capacity

HashMap 的当前长度。上一期曾经说过,HashMap 的长度是 2 的幂。

2.LoadFactor

HashMap 负载因子,默认值为 0.75f。

衡量 HashMap 是否进行 Resize 的 条件如下:

HashMap.Size >= Capacity * LoadFactor

HashMap 的扩容主要分为两步:

  1. 扩容

    创建一个新的 Entry 空数组,长度是原数组的 2 倍

  2. ReHash

    遍历原 Entry 数组,把所有的 Entry 重新 Hash 到新数组。为什么要重新 Hash 呢?因为长度扩大以后,Hash 的规则也随之改变。

    让我们回顾一下 Hash 公式:

    index = HashCode(Key) & (Length - 1)**(计算哈希索引)**

    当原数组长度为 8 时,Hash 运算是和 111B 做与运算;新数组长度为 16,Hash 运算是和 1111B 做与运算。Hash 结果显然不同。

    ReHash 的 Java 代码如下:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    /**
    * Transfers all entries from current table to newTable.
    */
    void transfer(Entry[] newTable, boolean rehash) {
    int newCapacity = newTable.length;
    for (Entry<K,V> e : table) {
    while(null != e) {
    Entry<K,V> next = e.next;
    if (rehash) {
    e.hash = null == e.key ? 0 : hash(e.key);
    }
    int i = indexFor(e.hash, newCapacity);
    e.next = newTable[i];
    newTable[i] = e;
    e = next;
    }
    }
    }

    以上过程单线程下不会出现问题,但是当两个线程同时触发resize的时候就有可能出现问题

    假设一个HashMap已经到了Resize的临界点。此时有两个线程A和B,在同一时刻对HashMap进行Put操作:

    img

    此时达到Resize条件,两个线程各自进行Rezie的第一步,也就是扩容:

    img

    这时候,两个线程都走到了ReHash的步骤。让我们回顾一下ReHash的代码:

    1
    Entry<K,V> next = e.next;

    假如此时线程B遍历到Entry3对象,刚执行完红框里的这行代码,线程就被挂起。对于线程B来说:

    e = Entry3

    next = Entry2

    这时候线程A畅通无阻地进行着Rehash,当ReHash完成后,结果如下(图中的e和next,代表线程B的两个引用):

    img

    直到这一步,看起来没什么毛病。接下来线程B恢复,继续执行属于它自己的ReHash。线程B刚才的状态是:

    e = Entry3

    next = Entry2

    1
    int i = indexFor(e.hash, newCapacity);

    当执行到上面这一行时,显然 i = 3,因为刚才线程A对于Entry3的hash结果也是3。

    1
    2
    newTable[i] = e;
    e = next;

    我们继续执行到这两行,Entry3放入了线程B的数组下标为3的位置,并且e指向了Entry2。此时e和next的指向如下:

    e = Entry2

    next = Entry2

    整体情况如图所示:

    img

    接着是新一轮循环,又执行到红框内的代码行:

    1
    Entry<K,V> next = e.next;

    e = Entry2

    next = Entry3

    整体情况如图所示:

    img

    接下来执行下面的三行,用头插法把Entry2插入到了线程B的数组的头结点:

    1
    2
    3
    e.next = newTable[i];
    newTable[i] = e;
    e = next;

    整体情况如图所示:

    img

    第三次循环开始,又执行到红框的代码:

    e = Entry3

    next = Entry3.next = null

    最后一步,当我们执行下面这一行的时候,见证奇迹的时刻来临了:

    1
    e.next = newTable[i];

    newTable[i] = Entry2

    e = Entry3

    Entry2.next = Entry3

    Entry3.next = Entry2

    链表出现了环形!

    整体情况如图所示:

    img

    此时,问题还没有直接产生。当调用Get查找一个不存在的Key,而这个Key的Hash结果恰好等于3的时候,由于位置3带有环形链表,所以程序将会进入死循环!**(此问题在JDK8中已经解决,上图过程看看就好)**

    主要的问题是,两个线程在同一个数组索引下标添加元素时,比如A添加key2,B添加key3,会可能导致key2或者key3丢失。

ConCurrentHashMap安全吗,是怎么保证安全的?

目前有如下一些方式可以获得线程安全的HashMap:

  • Collections.synchronizedMap
  • HashTable
  • ConcurrentHashMap

其中,前两种方式由于全局锁的问题,存在很严重的性能问题。所以,著名的并发编程大师Doug Lea在JDK1.5的java.util.concurrent包下面添加了一大堆并发工具。其中就包含ConcurrentHashMap这个线程安全的HashMap。

ConcurrentHashMap在JDK7和JDK8中的实现方式上有较大的不同。首先我们先来大概回顾一下ConcurrentHashMap在JDK7中的原理是怎样的。

1.分段锁技术(JDK7)

针对HashTable会锁整个hash表的问题,ConcurrentHashMap提出了分段锁的解决方案。

分段锁的思想就是:锁的时候不锁整个hash表,而是只锁一部分

如何实现呢?这就用到了ConcurrentHashMap中最关键的Segment。

ConcurrentHashMap中维护着一个Segment数组,每个Segment可以看做是一个HashMap。

而Segment本身继承了ReentrantLock,它本身就是一个锁。

在Segment中通过HashEntry数组来维护其内部的hash表。

每个HashEntry就代表了map中的一个K-V,用HashEntry可以组成一个链表结构,通过next字段引用到其下一个元素。

上述内容在源码中的表示如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
public class ConcurrentHashMap<K, V> extends AbstractMap<K, V>
implements ConcurrentMap<K, V>, Serializable {

// ... 省略 ...
/**
* The segments, each of which is a specialized hash table.
*/
final Segment<K,V>[] segments;

// ... 省略 ...

/**
* Segment是ConcurrentHashMap的静态内部类
*
* Segments are specialized versions of hash tables. This
* subclasses from ReentrantLock opportunistically, just to
* simplify some locking and avoid separate construction.
*/
static final class Segment<K,V> extends ReentrantLock implements Serializable {
// ... 省略 ...
/**
* The per-segment table. Elements are accessed via
* entryAt/setEntryAt providing volatile semantics.
*/
transient volatile HashEntry<K,V>[] table;
// ... 省略 ...
}
// ... 省略 ...

/**
* ConcurrentHashMap list entry. Note that this is never exported
* out as a user-visible Map.Entry.
*/
static final class HashEntry<K,V> {
final int hash;
final K key;
volatile V value;
volatile HashEntry<K,V> next;
// ... 省略 ...
}
}

img

由上图可见,只要我们的hash值足够分散,那么每次put的时候就会put到不同的segment中去。 而segment自己本身就是一个锁,put的时候,当前segment会将自己锁住,此时其他线程无法操作这个segment, 但不会影响到其他segment的操作。这个就是锁分段带来的好处。

2.线程安全的put

ConcurrentHashMap的put方法源码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public V put(K key, V value) {
Segment<K,V> s;
if (value == null)
throw new NullPointerException();
int hash = hash(key);
int j = (hash >>> segmentShift) & segmentMask;

// 根据key的hash定位出一个segment,如果指定index的segment还没初始化,则调用ensureSegment方法初始化
if ((s = (Segment<K,V>)UNSAFE.getObject // nonvolatile; recheck
(segments, (j << SSHIFT) + SBASE)) == null) // in ensureSegment
s = ensureSegment(j);
// 调用segment的put方法
return s.put(key, hash, value, false);
}

最终会调用segment的put方法,将元素put到HashEntry数组中,这里的注释中只给出锁相关的说明

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
final V put(K key, int hash, V value, boolean onlyIfAbsent) {
// 因为segment本身就是一个锁
// 这里调用tryLock尝试获取锁
// 如果获取成功,那么其他线程都无法再修改这个segment
// 如果获取失败,会调用scanAndLockForPut方法根据key和hash尝试找到这个node,如果不存在,则创建一个node并返回,如果存在则返回null
// 查看scanAndLockForPut源码会发现他在查找的过程中会尝试获取锁,在多核CPU环境下,会尝试64次tryLock(),如果64次还没获取到,会直接调用lock()
// 也就是说这一步一定会获取到锁
HashEntry<K,V> node = tryLock() ? null :
scanAndLockForPut(key, hash, value);
V oldValue;
try {
HashEntry<K,V>[] tab = table;
int index = (tab.length - 1) & hash;
HashEntry<K,V> first = entryAt(tab, index);
for (HashEntry<K,V> e = first;;) {
if (e != null) {
K k;
if ((k = e.key) == key ||
(e.hash == hash && key.equals(k))) {
oldValue = e.value;
if (!onlyIfAbsent) {
e.value = value;
++modCount;
}
break;
}
e = e.next;
}
else {
if (node != null)
node.setNext(first);
else
node = new HashEntry<K,V>(hash, key, value, first);
int c = count + 1;
if (c > threshold && tab.length < MAXIMUM_CAPACITY)
// 扩容
rehash(node);
else
setEntryAt(tab, index, node);
++modCount;
count = c;
oldValue = null;
break;
}
}
} finally {
// 释放锁
unlock();
}
return oldValue;
}

3.线程安全的扩容(Rehash)

HashMap的线程安全问题大部分出在扩容(rehash)的过程中。

ConcurrentHashMap的扩容只针对每个segment中的HashEntry数组进行扩容。

由上述put的源码可知,ConcurrentHashMap在rehash的时候是有锁的,所以在rehash的过程中,其他线程无法对segment的hash表做操作,这就保证了线程安全。

1.JDK8中ConcurrentHashMap的初始化

以无参数构造函数为例,来看一下ConcurrentHashMap类初始化的时候会做些什么。

1
ConcurrentHashMap<String, String> map = new ConcurrentHashMap<>();

首先会执行静态代码块和初始化类变量。 主要会初始化以下这些类变量:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
这里用到了Unsafe类,其中objectFieldOffset方法用于获取指定Field(例如sizeCtl)在内存中的偏移量。// Unsafe mechanics
private static final sun.misc.Unsafe U;
private static final long SIZECTL;
private static final long TRANSFERINDEX;
private static final long BASECOUNT;
private static final long CELLSBUSY;
private static final long CELLVALUE;
private static final long ABASE;
private static final int ASHIFT;

static {
try {
U = sun.misc.Unsafe.getUnsafe();
Class<?> k = ConcurrentHashMap.class;
SIZECTL = U.objectFieldOffset
(k.getDeclaredField("sizeCtl"));
TRANSFERINDEX = U.objectFieldOffset
(k.getDeclaredField("transferIndex"));
BASECOUNT = U.objectFieldOffset
(k.getDeclaredField("baseCount"));
CELLSBUSY = U.objectFieldOffset
(k.getDeclaredField("cellsBusy"));
Class<?> ck = CounterCell.class;
CELLVALUE = U.objectFieldOffset
(ck.getDeclaredField("value"));
Class<?> ak = Node[].class;
ABASE = U.arrayBaseOffset(ak);
int scale = U.arrayIndexScale(ak);
if ((scale & (scale - 1)) != 0)
throw new Error("data type scale not a power of two");
ASHIFT = 31 - Integer.numberOfLeadingZeros(scale);
} catch (Exception e) {
throw new Error(e);
}
}这里用到了Unsafe类,其中objectFieldOffset方法用于获取指定Field(例如sizeCtl)在内存中的偏移量。

这里用到了Unsafe类,其中objectFieldOffset方法用于获取指定Field(例如sizeCtl)在内存中的偏移量。

2.内部数据结构

先来从源码角度看一下JDK8中是怎么定义的存储结构。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
/**
* The array of bins. Lazily initialized upon first insertion.
* Size is always a power of two. Accessed directly by iterators.
*
* hash表,在第一次put数据的时候才初始化,他的大小总是2的倍数。
*/
transient volatile Node<K,V>[] table;

/**
* 用来存储一个键值对
*
* Key-value entry. This class is never exported out as a
* user-mutable Map.Entry (i.e., one supporting setValue; see
* MapEntry below), but can be used for read-only traversals used
* in bulk tasks. Subclasses of Node with a negative hash field
* are special, and contain null keys and values (but are never
* exported). Otherwise, keys and vals are never null.
*/
static class Node<K,V> implements Map.Entry<K,V> {
final int hash;
final K key;
volatile V val;
volatile Node<K,V> next;
}

可以发现,JDK8与JDK7的实现由较大的不同,JDK8中不在使用Segment的概念,他更像HashMap的实现方式。

img

3.线程安全的hash表初始化

由上文可知ConcurrentHashMap是用table这个成员变量来持有hash表的。

table的初始化采用了延迟初始化策略,他会在第一次执行put的时候初始化table。

put方法源码如下(省略了暂时不相关的代码):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
/**
* Maps the specified key to the specified value in this table.
* Neither the key nor the value can be null.
*
* <p>The value can be retrieved by calling the {@code get} method
* with a key that is equal to the original key.
*
* @param key key with which the specified value is to be associated
* @param value value to be associated with the specified key
* @return the previous value associated with {@code key}, or
* {@code null} if there was no mapping for {@code key}
* @throws NullPointerException if the specified key or value is null
*/
public V put(K key, V value) {
return putVal(key, value, false);
}

/** Implementation for put and putIfAbsent */
final V putVal(K key, V value, boolean onlyIfAbsent) {
if (key == null || value == null) throw new NullPointerException();
// 计算key的hash值
int hash = spread(key.hashCode());
int binCount = 0;
for (Node<K,V>[] tab = table;;) {
Node<K,V> f; int n, i, fh;
// 如果table是空,初始化之
if (tab == null || (n = tab.length) == 0)
tab = initTable();
// 省略...
}
// 省略...
}

initTable源码如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
/**
* Initializes table, using the size recorded in sizeCtl.
*/
private final Node<K,V>[] initTable() {
Node<K,V>[] tab; int sc;
// #1
while ((tab = table) == null || tab.length == 0) {
// sizeCtl的默认值是0,所以最先走到这的线程会进入到下面的else if判断中
// #2
if ((sc = sizeCtl) < 0)
Thread.yield(); // lost initialization race; just spin
// 尝试原子性的将指定对象(this)的内存偏移量为SIZECTL的int变量值从sc更新为-1
// 也就是将成员变量sizeCtl的值改为-1
// #3
else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
try {
// 双重检查,原因会在下文分析
// #4
if ((tab = table) == null || tab.length == 0) {
int n = (sc > 0) ? sc : DEFAULT_CAPACITY; // 默认初始容量为16
@SuppressWarnings("unchecked")
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
// #5
table = tab = nt; // 创建hash表,并赋值给成员变量table
sc = n - (n >>> 2);
}
} finally {
// #6
sizeCtl = sc;
}
break;
}
}
return tab;
}

成员变量sizeCtl在ConcurrentHashMap中的其中一个作用相当于HashMap中的threshold,当hash表中元素个数超过sizeCtl时,触发扩容; 他的另一个作用类似于一个标识,例如,当他等于-1的时候,说明已经有某一线程在执行hash表的初始化了,一个小于-1的值表示某一线程正在对hash表执行resize。

这个方法首先判断sizeCtl是否小于0,如果小于0,直接将当前线程变为就绪状态的线程。

当sizeCtl大于等于0时,当前线程会尝试通过CAS的方式将sizeCtl的值修改为-1。修改失败的线程会进入下一轮循环,判断sizeCtl<0了,被yield住;修改成功的线程会继续执行下面的初始化代码。

在new Node[]之前,要再检查一遍table是否为空,这里做双重检查的原因在于,如果另一个线程执行完#1代码后挂起,此时另一个初始化的线程执行完了#6的代码,此时sizeCtl是一个大于0的值,那么再切回这个线程执行的时候,是有可能重复初始化的。关于这个问题会在下图的并发场景中说明。

然后初始化hash表,并重新计算sizeCtl的值,最终返回初始化好的hash表。

4 .线程安全的put

put操作可分为以下两类 当前hash表对应当前key的index上没有元素时 当前hash表对应当前key的index上已经存在元素时(hash碰撞)

4.1 hash表上没有元素时

对应源码如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
if (casTabAt(tab, i, null,
new Node<K,V>(hash, key, value, null)))
break; // no lock when adding to empty bin
}

static final <K,V> Node<K,V> tabAt(Node<K,V>[] tab, int i) {
return (Node<K,V>)U.getObjectVolatile(tab, ((long)i << ASHIFT) + ABASE);
}

static final <K,V> boolean casTabAt(Node<K,V>[] tab, int i,
Node<K,V> c, Node<K,V> v) {
return U.compareAndSwapObject(tab, ((long)i << ASHIFT) + ABASE, c, v);
}

tabAt方法通过Unsafe.getObjectVolatile()的方式获取数组对应index上的元素,getObjectVolatile作用于对应的内存偏移量上,是具备volatile内存语义的。

如果获取的是空,尝试用cas的方式在数组的指定index上创建一个新的Node。

4.2 hash碰撞时

对应源码如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
else {
V oldVal = null;
// 锁f是在4.1中通过tabAt方法获取的
// 也就是说,当发生hash碰撞时,会以链表的头结点作为锁
synchronized (f) {
// 这个检查的原因在于:
// tab引用的是成员变量table,table在发生了rehash之后,原来index上的Node可能会变
// 这里就是为了确保在put的过程中,没有收到rehash的影响,指定index上的Node仍然是f
// 如果不是f,那这个锁就没有意义了
if (tabAt(tab, i) == f) {
// 确保put没有发生在扩容的过程中,fh=-1时表示正在扩容
if (fh >= 0) {
binCount = 1;
for (Node<K,V> e = f;; ++binCount) {
K ek;
if (e.hash == hash &&
((ek = e.key) == key ||
(ek != null && key.equals(ek)))) {
oldVal = e.val;
if (!onlyIfAbsent)
e.val = value;
break;
}
Node<K,V> pred = e;
if ((e = e.next) == null) {
// 在链表后面追加元素
pred.next = new Node<K,V>(hash, key,
value, null);
break;
}
}
}
else if (f instanceof TreeBin) {
Node<K,V> p;
binCount = 2;
if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
value)) != null) {
oldVal = p.val;
if (!onlyIfAbsent)
p.val = value;
}
}
}
}
if (binCount != 0) {
// 如果链表长度超过8个,将链表转换为红黑树,与HashMap相同,相对于JDK7来说,优化了查找效率
if (binCount >= TREEIFY_THRESHOLD)
treeifyBin(tab, i);
if (oldVal != null)
return oldVal;
break;
}
}

不同于JDK7中segment的概念,JDK8中直接用链表的头节点做为锁。 JDK7中,HashMap在多线程并发put的情况下可能会形成环形链表,ConcurrentHashMap通过这个锁的方式,使同一时间只有有一个线程对某一链表执行put,解决了并发问题。

5 线程安全的扩容

put方法的最后一步是统计hash表中元素的个数,如果超过sizeCtl的值,触发扩容。

扩容的代码略长,可大致看一下里面的中文注释,再参考下面的分析。 其实我们主要的目的是弄明白ConcurrentHashMap是如何解决HashMap的并发问题的。 带着这个问题来看源码就好。关于HashMap存在的问题,参考本文一开始说的笔者的另一篇文章即可。

其实HashMap的并发问题多半是由于put和扩容并发导致的。

这里我们就来看一下ConcurrentHashMap是如何解决的。

扩容涉及的代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
/**
* The array of bins. Lazily initialized upon first insertion.
* Size is always a power of two. Accessed directly by iterators.
* 业务中使用的hash表
*/
transient volatile Node<K,V>[] table;

/**
* The next table to use; non-null only while resizing.
* 扩容时才使用的hash表,扩容完成后赋值给table,并将nextTable重置为null。
*/
private transient volatile Node<K,V>[] nextTable;

/**
* Adds to count, and if table is too small and not already
* resizing, initiates transfer. If already resizing, helps
* perform transfer if work is available. Rechecks occupancy
* after a transfer to see if another resize is already needed
* because resizings are lagging additions.
*
* @param x the count to add
* @param check if <0, don't check resize, if <= 1 only check if uncontended
*/
private final void addCount(long x, int check) {
// ----- 计算键值对的个数 start -----
CounterCell[] as; long b, s;
if ((as = counterCells) != null ||
!U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {
CounterCell a; long v; int m;
boolean uncontended = true;
if (as == null || (m = as.length - 1) < 0 ||
(a = as[ThreadLocalRandom.getProbe() & m]) == null ||
!(uncontended =
U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) {
fullAddCount(x, uncontended);
return;
}
if (check <= 1)
return;
s = sumCount();
}
// ----- 计算键值对的个数 end -----
// ----- 判断是否需要扩容 start -----
if (check >= 0) {
Node<K,V>[] tab, nt; int n, sc;
// 当上面计算出来的键值对个数超出sizeCtl时,触发扩容,调用核心方法transfer
while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&
(n = tab.length) < MAXIMUM_CAPACITY) {
int rs = resizeStamp(n);
if (sc < 0) {
if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
transferIndex <= 0)
break;
// 如果有已经在执行的扩容操作,nextTable是正在扩容中的新的hash表
// 如果并发扩容,transfer直接使用正在扩容的新hash表,保证了不会出现hash表覆盖的情况
if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
transfer(tab, nt);
}
// 更新sizeCtl的值,更新成功后为负数,扩容开始
// 此时没有并发扩容的情况,transfer中会new一个新的hash表来扩容
else if (U.compareAndSwapInt(this, SIZECTL, sc,
(rs << RESIZE_STAMP_SHIFT) + 2))
transfer(tab, null);
s = sumCount();
}
}
// ----- 判断是否需要扩容 end -----
}

/**
* Moves and/or copies the nodes in each bin to new table. See
* above for explanation.
*/
private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
int n = tab.length, stride;
if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
stride = MIN_TRANSFER_STRIDE; // subdivide range
if (nextTab == null) { // initiating
try {
@SuppressWarnings("unchecked")
// 初始化新的hash表,大小为之前的2倍,并赋值给成员变量nextTable
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
nextTab = nt;
} catch (Throwable ex) { // try to cope with OOME
sizeCtl = Integer.MAX_VALUE;
return;
}
nextTable = nextTab;
transferIndex = n;
}
int nextn = nextTab.length;
ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
boolean advance = true;
boolean finishing = false; // to ensure sweep before committing nextTab
for (int i = 0, bound = 0;;) {
Node<K,V> f; int fh;
while (advance) {
int nextIndex, nextBound;
if (--i >= bound || finishing)
advance = false;
else if ((nextIndex = transferIndex) <= 0) {
i = -1;
advance = false;
}
else if (U.compareAndSwapInt
(this, TRANSFERINDEX, nextIndex,
nextBound = (nextIndex > stride ?
nextIndex - stride : 0))) {
bound = nextBound;
i = nextIndex - 1;
advance = false;
}
}
if (i < 0 || i >= n || i + n >= nextn) {
int sc;
// 扩容完成时,将成员变量nextTable置为null,并将table替换为rehash后的nextTable
if (finishing) {
nextTable = null;
table = nextTab;
sizeCtl = (n << 1) - (n >>> 1);
return;
}
if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
return;
finishing = advance = true;
i = n; // recheck before commit
}
}
else if ((f = tabAt(tab, i)) == null)
advance = casTabAt(tab, i, null, fwd);
else if ((fh = f.hash) == MOVED)
advance = true; // already processed
else {
// 接下来是遍历每个链表,对每个链表的元素进行rehash
// 仍然用头结点作为锁,所以在扩容的时候,无法对这个链表执行put操作
synchronized (f) {
if (tabAt(tab, i) == f) {
Node<K,V> ln, hn;
if (fh >= 0) {
int runBit = fh & n;
Node<K,V> lastRun = f;
for (Node<K,V> p = f.next; p != null; p = p.next) {
int b = p.hash & n;
if (b != runBit) {
runBit = b;
lastRun = p;
}
}
if (runBit == 0) {
ln = lastRun;
hn = null;
}
else {
hn = lastRun;
ln = null;
}
for (Node<K,V> p = f; p != lastRun; p = p.next) {
int ph = p.hash; K pk = p.key; V pv = p.val;
if ((ph & n) == 0)
ln = new Node<K,V>(ph, pk, pv, ln);
else
hn = new Node<K,V>(ph, pk, pv, hn);
}
// setTabAt方法调用了Unsafe.putObjectVolatile来完成hash表元素的替换,具备volatile内存语义
setTabAt(nextTab, i, ln);
setTabAt(nextTab, i + n, hn);
setTabAt(tab, i, fwd);
advance = true;
}
else if (f instanceof TreeBin) {
TreeBin<K,V> t = (TreeBin<K,V>)f;
TreeNode<K,V> lo = null, loTail = null;
TreeNode<K,V> hi = null, hiTail = null;
int lc = 0, hc = 0;
for (Node<K,V> e = t.first; e != null; e = e.next) {
int h = e.hash;
TreeNode<K,V> p = new TreeNode<K,V>
(h, e.key, e.val, null, null);
if ((h & n) == 0) {
if ((p.prev = loTail) == null)
lo = p;
else
loTail.next = p;
loTail = p;
++lc;
}
else {
if ((p.prev = hiTail) == null)
hi = p;
else
hiTail.next = p;
hiTail = p;
++hc;
}
}
ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
(hc != 0) ? new TreeBin<K,V>(lo) : t;
hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
(lc != 0) ? new TreeBin<K,V>(hi) : t;
setTabAt(nextTab, i, ln);
setTabAt(nextTab, i + n, hn);
setTabAt(tab, i, fwd);
advance = true;
}
}
}
}
}
}

根据上述代码,对ConcurrentHashMap是如何解决HashMap并发问题这一疑问进行简要说明。

  • 首先new一个新的hash表(nextTable)出来,大小是原来的2倍。后面的rehash都是针对这个新的hash表操作,不涉及原hash表(table)。
  • 然后会对原hash表(table)中的每个链表进行rehash,此时会尝试获取头节点的锁。这一步就保证了在rehash的过程中不能对这个链表执行put操作。
  • 通过sizeCtl控制,使扩容过程中不会new出多个新hash表来。
  • 最后,将所有键值对重新rehash到新表(nextTable)中后,用nextTable将table替换。这就避免了HashMap中get和扩容并发时,可能get到null的问题。
  • 在整个过程中,共享变量的存储和读取全部通过volatile或CAS的方式,保证了线程安全。

[ ]转载:HashMap原理及线程不安全详解 - 木子H的个人空间 - OSCHINA - 中文开源技术交流社区

[]参考:Java8中ConcurrentHashMap是如何保证线程安全的 - 知乎 (zhihu.com)