Java并发系列笔记之Java中的锁

锁是多线程下进行同步互斥的重要实现依托,Java语言中除了提供了使用synchronized关键字提供的隐式锁,也提供了以Lock接口定义的显示锁。一般Lock的子类实现依赖于一个抽象队列同步器(AbstractQueuedSynchronizer, AQS)的子类实现。AQS的目的是提供对同步状态的获取和释放的基础同步功能给依赖者使用,而屏蔽了同步状态获取和释放的具体实现。AQS是一个抽象类,它使用模板模式定义获取和释放同步状态的模板方法,而将模板方法所依赖的获取同步,阻塞管理等其他方法交给需要不同并发策略的子类去实现。本文从AQS的具体实现讲起,接着分析不同锁机制的实现原理。

AbstractQueuedSynchronizer

同步状态status的获取和更新

开头说到“AQS的目的是提供对同步状态的获取和释放的基础同步功能给依赖者使用”,AQS用一个volatile的整型变量status表示同步状态,当status>0是表示被占用的数量,status=0表示未被占用。并且提供了getState(),setState(int)以及compareAndSetState(int expect,int update),最后一个方法用CAS的方法来更新status的值,如果更新成功,则返回true,失败则返回false。
CAS是一种乐观锁技术,其具体实现有三个参数value,expect,update,value是内存地址中当前的值,expect是value预期的值,如果value和expect相同,则将value地址的值改为update,更新成功;否则,什么也不做,更新失败。操作系统保证读-对比-更新的原子性,这个逻辑是在native代码中实现的。具体实现如下图所示,CAS是一种乐观锁技术,所以需要使用轮询来查看是否更新成功,相比重量级的加锁,在锁竞争不是特别激烈的情况下,可以获取很好的性能提升。值得一提的是,CAS在1.8 JDK的锁优化中被大量使用。

可重写尝试获取和释放同步状态的方法

可以重写的方法是实现自定义同步策略的关键,主要分为两大类独占式和共享式,动作上分为尝试获取同步状态和尝试释放共享状态。具体包括如下方法:

  • protected boolean tryAcquire(int arg)//独占式的尝试获取arg个同步状态
  • protected boolean tryRelease(int arg)//独占式的尝试释放arg个同步状态
  • protected int tryAcquireShared(int arg)//共享式的获取arg个同步状态,如果返回大于等于0,则成功,否则失败
  • protected boolean tryReleaseShared(int arg)//共享式的尝试释放arg个同步状态
  • protected boolean isHeldExclusively()//独占状态下用于判定是否被占用状态

所谓独占式和共享式是指是不是只允许单独的一个线程获取共享状态而阻塞其他的线程的获取,如果是的,则是独占式,否则则是共享式。我们可以根据不同的同步需求,继承AQS,实现不同的获取同步状态的策略。

同步控制的模板方法

一般获取同步状态有两种结果:成功获取同步状态,这种情况下线程可以愉快的继续执行;获取同步状态失败,这种情况下需要AQS来维护获取同步状态失败的线程状态。从方法上来看,AQS支持多个种模板方法的实现,主要分为两大类,独占式的获取和释放同步状态和共享式的获取和释放同步状态。其中获取同步状态有非响应中断式的(默认)和响应中断式的,等待固定时间的获取。
独占式:

  1. public final void acquire(int arg)
  2. public final void acquireInterruptibly(int arg)
  3. public final boolean tryAcquireNanos(int arg, long nanosTimeout)
  4. public final boolean release(int arg)

共享式:

  1. public final void acquireShared(int arg)
  2. public final void acquireSharedInterruptibly(int arg)
  3. public final boolean tryAcquireSharedNanos(int arg, long nanosTimeout)
  4. public final boolean releaseShared(int arg)

那么AQS是如何处理获取同步状态的呢?以独占式的acquire(int)实现为例

模板方法首先尝试获取arg个同步状态(tryAcquire),如果成功了,直接方法返回,代表成功获取同步状态;如果失败了,AQS使用一个称为同步队列的双向队列来维护获取失败的线程信息,队列中的存储单元是封装了线程,线程状态和前驱后继的节点信息,AQS持有双向队列的头部(head)和尾部(tail)节点引用。
addWaiter就是将当前未获取同步状态的线程封装成Node添加在双向队列的尾部,当然插入的过程使用了CAS并且再失败的时候进行自旋。
接着acquireQueue尝试自旋获取同步状态,当然只有前驱节点为头结点的Node等待线程才可以获取同步状态,如果不是头结点或获取同步状态失败,则判断是否应该阻塞当前线程,判断的依据是当前Node前驱节点的waitStaus。如果是应该阻塞,则使用LockSupport的park(Thread)方法阻塞当前线程,否则,继续自旋。如果前驱是头节点并且尝试获取同步状态成功,则设置当前节点为头结点(也就是说头结点是获取同步状态的),并且返回。
总结起来,独占式获取同步状态的状态图如下:

独占式释放同步状态基本思路

基本的流程是尝试使用tryRelease(int)释放同步状态,如果释放成功,则当前线程会唤醒同步队列中后继节点的线程,并且返回true,否则返回false。release做的工作相对较少。

共享式获取同步状态基本思路

acquireShared(int)首先使用tryAcquireShared尝试获取同步状态,如果返回小于0则获取失败,将失败的线程放入同步队列中,并且自旋式尝试获取同步状态,只有前驱节点是头结点才可以尝试获取同步状态,如果成功了则设置当前节点为头结点,如果失败了则判断是否应该阻塞当前线程。整个过程和独占式acquire类似,不同的是tryAcquiredShared方法在获取锁的时候不用判断当前线程是不是独占式线程,任何线程在开始的时候都可以尝试获取同步状态。而tryAquired的实现应该排斥非独占线程获取锁。

共享式释放同步状态基本思路

releaseShared(int)首先尝试释放同步状态,并且使用doReleaseShared()循环唤醒队列中多个等待的线程。

ReentrantLock

看过了队列同步器的实现,我们来看看其具体的锁是如何运用AQS实现同步操作的。ReetrantLock是可重入锁,可重入的意思是同一个线程可以多次获取锁,并且可重入锁是独占式的,也就是同一时刻只有一个线程可以获得锁。并且可重入锁支持公平和非公平两种模式,所谓公平,指的是获取锁的请求是否遵循先到先得的FIFO规则,如果是FIFO则是公平的,否则则是非公平的。ReentrantLock持有一个继承了AQS的内部类Sync,并且定义了两个子类NonfairSync和FairSync,他们重写了AQS定义的尝试获取独占锁的tryAquire(int):boolean方法。我们首先来看非公平的tryAquire的实现。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}

其基本流程是:首先判断当前状态是否为0,如果是0使用CAS将设置成acquires,并且设置当前线程为独占线程,否则,如果线程为独占线程,则将状态设置为当前同步状态+acquires,因为独占状态下,不会有其他线程修改同步状态,可以不用CAS进行设置。如果设置成功返回true,否则返回false。
对别公平的tryAcquire实现,其代码如下,唯一的区别是在同步状态为0的时候,程序会检查当前是不是头结点持有的线程,如果是的话才可以尝试CAS设置,也就是对应的hasQueuedPredecessors方法。所以,在一个新的线程尝试获取公平锁的时候,如果它不是FIFO队列的头结点,则获取会失败,从而保证了公平性。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
}

获取完同步状态后,会进行同步状态的释放,也就是重写tryRelease(int)方法,公平锁和非公平锁的共用的Sync的tryRelease方法,只有独占线程才可以释放同步状态,如果同步状态变成了0,才最终释放成功。

1
2
3
4
5
6
7
8
9
10
11
12
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}

回顾整个流程

回顾整个流程,自顶向下的看以非公平的ReentrantLock实现过程,在用户需要使用ReentrantLock调用lock()方法,lock()方法会使用Sync的lock()方法使用CAS第一次尝试获取同步状态,如果获取成功,则直接设置当前线程为独占线程,如果失败,则会调用acquire(1)方法尝试一个同步状态,acquire则会调用tryAcquire(1)尝试获取同步状态,这个tryAcquire()是我们在实现AQS的子列NofaireSync中实现的,如果获取成功了,则正常返回,如果失败了,则会调用尝试将当前线程信息打包成Node节点加入到同步队列中,并且自旋尝试获取同步状态,当然只能前驱是头结点的线程才会多次自旋尝试获取同步状态,其余的则会阻塞等待。当同步状态使用完毕后,调用unlock()释放同步状态时,unlock方法会调用release()模板方法,而release模板方法会使用tryRelease()自定义策略释放同步状态,如果成功的话,则使用LockSupport.unpark(Thread)唤醒当前节点的后继节点。
回顾了整个流程我们就清晰了AQS在ReentrantLock实现过程中的作用,其中最重要的就是在获取同步状态失败后对同步队列的管理,而AQS提供了模板方法使得我们可以实现抽象方法所依赖的尝试获取同步状态的方法来控制获取同步状态的策略。

ReentrantReadWriteLock

可重入读写锁锁包含了两个锁读写锁,这两个锁的同步状态用一个int值的高16位和低16位表示,高16位表示读锁,低16位表示写锁。其中写锁是独占锁,也就是一旦一个线程获取了写锁,其他线程对读锁和写锁的请求都会被阻塞。读锁是共享锁,也就是多个线程可以同时获取读锁,但是在有线程获取读锁的过程中,写锁会被阻塞。我们可以通过ReentrantReadWriteLock中Sync的tryAcquire(int)和tryAcquireShared(int)看到这种读写策略。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
protected final boolean tryAcquire(int acquires) {
/*
* Walkthrough:
* 1. If read count nonzero or write count nonzero
* and owner is a different thread, fail.
* 2. If count would saturate, fail. (This can only
* happen if count is already nonzero.)
* 3. Otherwise, this thread is eligible for lock if
* it is either a reentrant acquire or
* queue policy allows it. If so, update state
* and set owner.
*/
Thread current = Thread.currentThread();
int c = getState();
int w = exclusiveCount(c);
if (c != 0) {
// (Note: if c != 0 and w == 0 then shared count != 0)
if (w == 0 || current != getExclusiveOwnerThread())
return false;
if (w + exclusiveCount(acquires) > MAX_COUNT)
throw new Error("Maximum lock count exceeded");
// Reentrant acquire
setState(c + acquires);
return true;
}
if (writerShouldBlock() ||
!compareAndSetState(c, c + acquires))
return false;
setExclusiveOwnerThread(current);
return true;
}

可以看到上面的注释,在写锁的获取中,如果读锁不为0或者写锁不是当前线程,则获取失败。否则说明是已经获取写锁的独占线程,直接设置同步状态即可。否则的话,如果公平锁控制方法writerShouldBlock返回false并且尝试使用CAS设置,如果成功了,则获取写锁才成功。
读锁的过程比较简单,基本思路是如果写锁同步状态不为0并且当前线程不为写锁的持有者,则获取失败,使用与逻辑的原因是如果当前线程获取了写锁,还可以竞争读锁。然后最后使用CAS设置加后的读锁。

Condition

每个Lock实现类还实现了newCondition方法,该方法返回一个Condition对象,AQS的内部类ConditionObject实现了Condition接口,一般newCondition方法会返回一个ConditionObject实例。
Condition使用await/signal方法实现了比Object类提供的wait/notify更细粒度的线程间通信方式,一个锁可以获取多个Condition实例。那么AQS的ConditionObject是如何实现await/signal来实现线程间的通信的呢。
具体来说,每个ConditionObject对象持有一个像同步队列一样的双向队列,其中的每个节点也是Node,当一个获取了锁的线程调用await的时候,会释放同步状态并且将其从同步队列的头部移到当前Condition的等待队列,并且调用unpark方法阻塞当前线程,等待唤醒。

而signal方法则是获取当前ConditionObject等待队列的头部节点,将其移动到同步队列的尾部并且使用LockSupport.unpark唤醒它。

参考文献

本文首发表于我的博客,欢迎关注!转载须注明文章出处,作者保留文章所有权。

坚持原创技术分享,您的支持将鼓励我继续创作!