Java并发系列笔记之并发容器ConcurrentHashMap

线程不安全的HashMap

多线程情况下,HashMap由于在扩容的时候会导致链变成环,在下一次查询的时候会使得出现死循环的出现。变成环的主要原因在于HashMap扩容的逻辑是不具有原子性的,扩容的基本逻辑包括下面三条:

  1. 当前节点为e,获取当前节点的下一个节点next
  2. 将e按照hash移到到扩容后的hash表中,置e.next等于newtable[i],头插法
  3. 将e置为next进入下一次循环

这个过程在单线程中没有问题,如果原来hash值相同的值在新的hash表中仍然相同,假设原来的链表为A-B,那么走完第一步e为A,next为B,此时该线程挂起,另一个线程进入,执行完以上三步后newtable[i]中的链表为B-A,如果第一个线程继续执行,则会使A.next指向newtable[i],而newtable[i]是B-A,此时明显出现了环。而下一次遍历该链表的时候,会出现死循环。

volatile和CAS

volatile保证可见性是指:volatile变量读取的始终是内存中最后写入的值,volatile变量的写入会使其他线程工作内存的变量失效。也就是说只保证get这个值的时候返回的是内存中的值,保证修改单一变量时候的原子性,但不保证复合操作的原子性,复合操作有可能还在使用旧值进行操作,如i++。Java只保证了基本数据类型的变量和赋值操作才是原子性的,如i=1。
CAS则是使用预期值判断内存中的值是否一致,如果一致则进行修改,否则修改失败,整个读取-对比-写入是原子操作。CAS是一种高效的乐观锁技术,它被广泛使用在jdk的并发实现中。
使用CAS轮询和volatile类型状态变量,可以实现轻量级的乐观锁技术。

线程安全的ConcurrentHashMap

jdk1.7和jdk1.8中的ConcurrentHashMap实现有了较大的区别,其中jdk1.7中使用分段锁的Segment实现,而jdk1.8则使用CAS+synchronized相结合的方式

jdk1.7中的ConcurrentHashMap关键方法

put方法

put的基本思想是将根据hash的高位获取Segment的下标(如果Segment数组是16,则取高4位),然后获取锁,使用ReentrantLock的trylock方法获取锁,如果获取成功,首先会是否超过负载值,如果超过了,就进行扩容,否则,直接插入,扩容的过程是在Segment的内部进行的,扩容会将数组扩大两倍。

get方法

get也是首先获取Segment,然后在Segment的HashEntry数组的(hash & tablesize-1)下标下进行查找。

size方法

size方法的基本策略是将所有Segment中count相加,但是这样会导致不准确,jdk的做法是首先直接统计两次,如果两次相同,则成功,如果失败,则将所有的segment加锁统计。

jdk1.8中ConcurrentHashMap实现要点

关键数据结构

Node,用来包装基本的hash,key,value值。
TreeNode树节点,确切的说是红黑树节点,继承了Node,用来作为TreeBin红黑树的节点,
ForwardingNode,继承自Node,其hash为-1,用来在扩容时指示当前节点已经完成转移。
其中ConcurrentHashMap维护一个Node的数组,在非扩容阶段,只有Node和TreeBin节点,扩容阶段会出现ForwardingNode

CAS的使用

除了对状态变量的更新使用到的CAS,对Node数组的操作也是包含了三个核心CAS方法,分别是获取数组的第i个元素,替换数组中的第i个元素,设置数组中第i个位置的值。

扩容

扩容的时机是在容量超过负载的时候进行的,这段逻辑在插入一个元素并且统计总数的方法里面实现的。jdk1.8的扩容和之前不一样之处在于允许多个线程参与扩容过程。其基本过程如下

  1. 如果newTable为空,则创建一个原来两倍的数组。
  2. 将table中的数据复制到newTable中,可以多线程操作,
  3. 复制阶段,如果遍历的节点为forward节点,则说明此节点已经复制过,调到下一个节点
  4. 如果节点不为forward节点,则使用synchronized对节点加锁,并且复制这个链表或者树到i或者i+n位置
  5. 复制结束后将table指向newtable

put方法要点总结

put方法主要有四个大的判断逻辑,也就是主要的四步

  1. table是否为空,如果为空的话,则初始化表
  2. 如果table不为空,则查看(n-1)&hash位置的值是否为null,如果为null的话,CAS插入待插入节点,插入结束
  3. 如果遍历到的节点为forward节点,则使用helpTransfer方法帮助其扩容,helpTransfer会调用扩容的transfer方法
  4. 否则的话,将当前遍历的节点加锁,根据是Node还是TreeBin执行不同的插入逻辑,如果链表的长度大于阈值,则将其转换成红黑树
  5. 最后调用addCount方法将ConcurrentHashMap的size+1
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
public V put(K key, V value) {
return putVal(key, value, false);
}
/** Implementation for put and putIfAbsent */
final V putVal(K key, V value, boolean onlyIfAbsent) {
if (key == null || value == null) throw new NullPointerException();
int hash = spread(key.hashCode());
int binCount = 0;
for (Node<K,V>[] tab = table;;) {
Node<K,V> f; int n, i, fh;
if (tab == null || (n = tab.length) == 0)
tab = initTable();
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
if (casTabAt(tab, i, null,
new Node<K,V>(hash, key, value, null)))
break; // no lock when adding to empty bin
}
else if ((fh = f.hash) == MOVED)
tab = helpTransfer(tab, f);
else {
V oldVal = null;
synchronized (f) {
if (tabAt(tab, i) == f) {
if (fh >= 0) {
binCount = 1;
for (Node<K,V> e = f;; ++binCount) {
K ek;
if (e.hash == hash &&
((ek = e.key) == key ||
(ek != null && key.equals(ek)))) {
oldVal = e.val;
if (!onlyIfAbsent)
e.val = value;
break;
}
Node<K,V> pred = e;
if ((e = e.next) == null) {
pred.next = new Node<K,V>(hash, key,
value, null);
break;
}
}
}
else if (f instanceof TreeBin) {
Node<K,V> p;
binCount = 2;
if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
value)) != null) {
oldVal = p.val;
if (!onlyIfAbsent)
p.val = value;
}
}
}
}
if (binCount != 0) {
if (binCount >= TREEIFY_THRESHOLD)
treeifyBin(tab, i);
if (oldVal != null)
return oldVal;
break;
}
}
}
addCount(1L, binCount);
return null;
}

get方法要点

get方法主要是区分是链表还是树,其中实现是通过设置TreeBin的hash为-2,以此来区分不同的实现,然后根据不同的数据结构进行查询。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public V get(Object key) {
Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
int h = spread(key.hashCode());
if ((tab = table) != null && (n = tab.length) > 0 &&
(e = tabAt(tab, (n - 1) & h)) != null) {
if ((eh = e.hash) == h) {
if ((ek = e.key) == key || (ek != null && key.equals(ek)))
return e.val;
}
else if (eh < 0)
return (p = e.find(h, key)) != null ? p.val : null;
while ((e = e.next) != null) {
if (e.hash == h &&
((ek = e.key) == key || (ek != null && key.equals(ek))))
return e.val;
}
}
return null;
}

size方法要点

为了统计ConcurrentHashMap的数量,jdk设计了那个变量,一个是baseCount整型,一个是CounterCell数组,其值都是volatile变量,获取size的过程就是要统计baseCount和CounterCell数组中所有值的和。
至于为什么要用两个数据结构,是考虑到多线程情况下,如果有多个线程在进行put操作,那么HashMap的size都要加一,首先是尝试CAS增加baseCount,如果加入成功,则成功,如果失败,再尝试CAS修改CounterCell数组,如果成功,也算成功。如果还是失败,则轮训以上两个步骤,结束。

总结

ConcurrentHashMap在jdk 1.7和1.8之间的差异主要体现在一下几个方面

  1. 基本的数据结构转换:1.7中使用Segment+HashEntry的数据结构,而1.8中使用Node数组存储,Node分为链表Node和红黑树TreeNode
  2. 1.7中以Segment为单位实现锁分离,1.8中大量使用CAS+状态位来控制同步,单位是Node,锁的粒度更小
  3. 1.7中的扩容机制是以Segment中加锁进行的,而1.8中可以有多个线程参与扩容的过程,速度更快。

参考文献

ConcurrentHashMap总结
J.U.C之Java并发容器:ConcurrentHashMap
Java并发编程艺术

声明

本文首发表于我的博客,欢迎关注!转载须注明文章出处,作者保留文章所有权。

坚持原创技术分享,您的支持将鼓励我继续创作!