从ReentrantLock类看AQS的原理及实践

从ReentrantLock类看AQS的原理及实践

一、学习AQS的前置知识

大名鼎鼎的AQS抽象队列同步器是JUC并发包下许多同步组件的基础框架,它的设计可以说考虑了方方面面的需求显得尤为复杂,从ReentrantLock的lock方法来看AQS,线程因为无法获取锁进入同步队列阻塞等待所使用的就是LockSupport的part方法,这也是我们要说的第一个前置知识。

1.LockSupport工具类

image-20240905220638103

LockSupport是一个工具类,提供了基本的线程阻塞唤醒功能(相较于wait、notify、notifyAll可以做到精准唤醒指定的线程),它是创建锁和其他同步组件的基础工具,内部是使用sun.misc.Unsafe类实现的。LockSupport和使用它的线程都会关联一个许可permmit,park方法表示消耗一个许可,调用park方法时,如果许可可用则park方法返回,如果没有许可则一直阻塞直到许可可用。unpark方法表示增加一个许可,多次调用并不会积累许可,因为许可数最大值为1。当然这里所讲的permit在LockSupport的代码当中没有体现,需要到HotSpot的源码当中查看,以下为截图供查看:

1、/share/vm/runtime/park.hpp中许可的定义字段_counter

img

2、park()的实现(部分截图)【os/bsd/vm/os_bsd.cpp】

img

3、unpark()实现(部分截图)【os/bsd/vm/os_bsd.cpp】

img

在Java 6中,LockSupport增加了park(Object blocker)、parkNanos(Object blocker,long nanos)和parkUntil(Object blocker,long deadline)3个方法,用于实现阻塞当前线程的功能,其中参数blocker是用来标识当前线程在等待的对象(以下称为阻塞对象),该对象主要用于问题排查和系统监控

从表的线程dump结果可以看出,代码片段的内容都是阻塞当前线程10秒,但从线程dump结果可以看出,有阻塞对象的parkNanos方法能够传递给开发人员更多的现场信息。这是由于在Java 5之前,当线程阻塞(使用synchronized关键字)在一个对象上时,通过线程dump能够查看到该线程的阻塞对象,方便问题定位,而Java 5推出的Lock等并发工具时却遗漏了这一点,致使在线程dump时无法提供阻塞对象的信息。因此,在Java 6中,LockSupport新增了上述3个含有阻塞对象的park方法,用以替代原有的park方法。

image-20240905221052742

2.Java中断机制

image-20231118213227293

注意:线程调用LockSupport的part方法会进入WAITING状态,如果此时对该线程调用interrupt方法,线程会立即苏醒恢复。

3.模板设计模式

二、ReentrantLock类的简介

ReentrantLock类是可重入互斥锁,与使用同步方法和语句访问的隐式监视器锁具有相同的基本行为和语义,但具有扩展功能。ReentrantLock由最后一次成功锁定但尚未解锁的线程拥有,当锁不被另一个线程拥有时,调用加锁的线程将返回并成功获取锁。如果当前线程已经拥有锁,该lock方法将立即返回,这可以使用isHeldByCurrentThreadgetHoldCount方法进行检查。
此类的构造函数接受可选的公平参数,当设置为true时,在争用情况下,锁倾向于授予等待时间最长的线程访问权限(即同步队列的队头虚节点的下一个节点),否则该锁不保证任何特定的访问顺序。使用由许多线程访问的公平锁的程序可能会显示比使用默认设置的程序更低的总体吞吐量(即更慢,通常慢得多),但在获取锁的时间上具有较小的差异并保证不会出现饥饿

关于公平与非公平的额外说明:

在AQS持有锁的线程释放锁以后,可能来竞争锁的线程有两种类型,一种就是在同步队列中线程,准确的说就是队列中第一个等待的线程,因为只有它才有争抢锁的资格(FIFO规则)。另外一种想要争抢锁的线程就是队列外的线程,也就是没有排队的线程。如果释放的锁可以被这些没有排队的线程优先抢到的话,那对于排队的线程来说就是非公平的,如果我们优先队列中的线程获取到锁的话,那对于他们就是公平的。说到底就是允不允许争抢锁的时候插队,当然这里的插队,是指插在队头!允许插队就是不公平的,不允许就是公平的。

为了帮助大家更好地理解ReentrantLock的特性,我们先将ReentrantLock跟常用的Synchronized进行比较,其特性如下(蓝色部分为本篇文章主要剖析的点):

img

三、ReentrantLock类的结构

1.ReentrantLock类的总体架构

img

借鉴网上的一张ReentrantLock类的结构图,可以看出ReentrantLock首先继承了Lock接口并实现了其中的6个方法,其次ReentrantLock包含了一个内部类对象sync是AQS的实现类Sync,根据构造函数的公平参数sync可以Sync的两个实现子类FairSyncNonfairSync之一。

2.Lock接口

我们先来看一下Lock接口的信息:

image-20231117111109191

Lock实现提供了比使用synchronized方法和语句更广泛的锁定操作。它们允许更灵活的结构,可能具有完全不同的属性,并且可能支持多个关联的Condition对象

虽然synchronized方法和语句的作用域机制使使用监视器锁进行编程变得更加容易,并且有助于避免许多涉及锁的常见编程错误,但有时您需要以更灵活的方式使用锁。例如,一些遍历并发访问数据结构的算法需要使用hand-over-handchain locking:获取节点A的锁,然后获取节点B的锁,然后释放A并获取C,然后释放B并获得D等。Lock接口的实现允许在不同范围内获取和释放锁,并允许以任何顺序获取和释放多个锁,从而允许使用此类技术。随着灵活性的增加,责任也随之增加。在大多数情况下,应该使用以下习惯用法:

image-20231117120915463

当加锁和解锁发生在不同的作用域时,必须注意确保持有锁时执行的所有代码都受到try-finallytry-catch的保护,以确保在必要时释放锁。==Lock实现通过提供非阻塞尝试获取锁 (tryLock())、获取可中断锁 (lockInterruptically) 以及尝试获取可以超时的锁(tryLock(long, TimeUnit)),从而提供了使用同步方法和语句的附加功能。==

ReentrantLock作为Lock接口的实现类,实现了Lock接口定义的所有方法,并且都是委托给内部的sync对象实现,以lock方法为例,其余方法都是同理:

image-20231117122652255

3.Sync内部类

本篇不会涉及到AQS的条件队列,因此为了便于理解,不相关的方法不会贴出,这里我们也只是简单的解释其中出现的方法,后面设计到ReentrantLock的加锁和解锁流程时涉及到的会展开详细解释:

公平性锁保证了锁的获取按照FIFO原则,而代价是进行大量的线程切换。非公平性锁虽然可能造成线程饥饿,但极少的线程切换,保证了其更大的吞吐量

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
abstract static class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = -5179523762034025860L;

// 由于ReentrantLock有公平和非公平两种模式,因此lock方法需要留给FairSync和NonfairSync单独实现
abstract void lock();

// 尝试获取锁的逻辑在公平锁和非公平锁模式下都是同样的,首先判断当前锁是否被持有,如果没有被持有则通过CAS加锁(加锁成功后会设置锁的独占线程),如果锁已经被持有了则判断当前线程是否是锁持有线程,如果是同一个线程的重复加锁则进行锁可重入次数的递增
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
// 发现锁还是空闲状态就直接通过CAS尝试获取锁
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
// ReentrantLock是可重入锁
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}

// 解锁的逻辑无论公平还是非公平模式都是一致的,Sync直接定义好了,首先判断锁持有线程是否是当前线程,如果不是则抛出IllegalMonitorStateException异常,否则锁重入次数递减,如果递减为0则表明锁被完全释放了,清空锁的独占线程
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
// 只有持有锁的线程才能释放锁
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
// 是否[完全]释放锁的标记
boolean free = false;
if (c == 0) {
// 可重入锁被完全释放,设置锁的独占线程为空
free = true;
setExclusiveOwnerThread(null);
}
// 更新锁的重入次数
setState(c);
return free;
}

// 检查当前线程是否是独占锁的持有者
protected final boolean isHeldExclusively() {
return getExclusiveOwnerThread() == Thread.currentThread();
}

// 获取ReentrantLock独占锁的持有者线程
final Thread getOwner() {
return getState() == 0 ? null : getExclusiveOwnerThread();
}

// 获取当前线程锁重入的次数
final int getHoldCount() {
return isHeldExclusively() ? getState() : 0;
}

// 检查锁是否已经被占用
final boolean isLocked() {
return getState() != 0;
}
}

image-20231117155101378

为什么nonfairTryAcquire方法是在Sync类中定义的而不是NonfairSync类定义的呢?其实是因为ReentrantLock类提供给开发者的tryLock方法需要使用到nonfairTryAcquire方法,无论是公平模式还是非公平模式下的ReentrantLock都是如此,因此作者选择将nonfairTryAcquire方法放入NonfairSync和FairSync的父类Sync中实现,这一点后面也会看到。

image-20240829162839826

当然,ReentrantLock所依靠的真正实现类其实还是Sync的两个实现类FairSyncNonfairSync

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
static final class NonfairSync extends Sync {
private static final long serialVersionUID = 7316153563782823691L;

// 非公平加锁,后续的线程可以直接插队参与竞争锁
final void lock() {
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}

protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
}

static final class FairSync extends Sync {
private static final long serialVersionUID = -3000897897090466540L;

// 公平加锁,符合FIFO队列的顺序加锁特点
final void lock() {
acquire(1);
}

protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
// 加入了同步队列中当前节点是否有前驱节点的判断,如果该方法返回true,则表示有线程比当前线程更早地请求获取锁,因此需要等待前驱线程获取并释放锁之后才能继续获取锁。
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
}

三、AQS的重点原理

看了这么多,最终还是需要AQS框架的介入,首先,我们通过下面的架构图来整体了解一下AQS框架:

img

  • 上图中有颜色的为Method,无颜色的为Attribution。
  • 总的来说,AQS框架共分为五层,自上而下由浅入深,从AQS对外暴露的API到底层基础数据。
  • 当有自定义同步器接入时,只需重写第一层所需要的部分方法即可,不需要关注底层具体的实现流程。当自定义同步器进行加锁或者解锁操作时,先经过第一层的API进入AQS内部方法,然后经过第二层进行锁的获取,接着对于获取锁失败的流程,进入第三层和第四层的等待队列处理,而这些处理方式均依赖于第五层的基础数据提供层。

下面我们会从整体到细节,从流程到方法逐一剖析AQS框架,主要分析过程如下:

img

1.AQS的原理概述

AQS核心思想是:如果被请求的共享资源空闲,那么就将当前请求资源的线程设置为有效工作线程,将共享资源设置为锁定状态;如果共享资源被占用,就需要一定的阻塞等待唤醒机制来保证锁分配。这个机制主要用的是CLH队列的变体实现的,将暂时获取不到锁的线程加入到队列中。

CLH:Craig、Landin and Hagersten队列,是单向链表,AQS中的队列是CLH变体的虚拟双向队列(FIFO),AQS是通过将每条请求共享资源的线程封装成一个节点来实现锁的分配

img

AQS使用一个volatile的int类型的成员变量state来表示同步状态,通过内置的FIFO队列来完成资源获取的排队工作,通过CAS完成对state值的修改

2.AQS的数据结构

先来看下AQS中最基本的数据结构——Node,Node即为上面CLH变体队列中的节点。

img

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
static final class Node {
// 共享锁的标记
static final Node SHARED = new Node();
// 独占锁的标记
static final Node EXCLUSIVE = null;

// 表明节点因为超时或中断而取消
static final int CANCELLED = 1;
// 表明后继节点需要被唤醒
static final int SIGNAL = -1;
// 条件队列中才会使用到
static final int CONDITION = -2;
// 共享锁模式下才会使用到
static final int PROPAGATE = -3;

// 同步队列中节点的当前状态,初始化为0
volatile int waitStatus;

// 前驱节点
volatile Node prev;

// 后继节点
volatile Node next;

// 入队等待的线程
volatile Thread thread;

// 双重作用,既可以指示链接的下一个Condition节点,又可以表示锁的两种模式:SHARED或EXCLUSIVE
Node nextWaiter;

final boolean isShared() {
return nextWaiter == SHARED;
}

final Node predecessor() throws NullPointerException {
Node p = prev;
if (p == null)
throw new NullPointerException();
else
return p;
}

Node() { // Used to establish initial head or SHARED marker
}

Node(Thread thread, Node mode) { // Used by addWaiter
this.nextWaiter = mode;
this.thread = thread;
}

Node(Thread thread, int waitStatus) { // Used by Condition
this.waitStatus = waitStatus;
this.thread = thread;
}
}

在了解数据结构后,接下来了解一下AQS的同步状态——State。AQS中维护了一个名为state的字段,意为同步状态,是由volatile修饰的,用于展示当前临界资源的获锁情况。

image-20231117161623954

这几个方法都是final修饰的,说明子类中无法重写它们。我们可以通过修改state字段表示的同步状态来实现多线程的独占模式和共享模式(加锁过程)。

img

img

对于我们自定义的同步工具,需要自定义获取同步状态和释放状态的方式,也就是AQS架构图中的第一层:API层。

3.AQS重要方法与ReentrantLock的关联

从架构图中可以得知,AQS提供了大量用于自定义同步器实现的protected方法。自定义同步器实现的相关方法也只是为了通过修改state字段来实现多线程的独占模式或者共享模式。自定义同步器需要实现以下方法(ReentrantLock需要实现的方法如下,并不是全部):

方法名 描述
protected boolean isHeldExclusively() 该线程是否正在独占资源。只有用到Condition才需要去实现它。
protected boolean tryAcquire(int arg) 独占方式。arg为获取锁的次数,尝试获取资源,成功则返回True,失败则返回False。
protected boolean tryRelease(int arg) 独占方式。arg为释放锁的次数,尝试释放资源,成功则返回True,失败则返回False。
protected int tryAcquireShared(int arg) 共享方式。arg为获取锁的次数,尝试获取资源。负数表示失败;0表示成功,但没有剩余可用资源;正数表示成功,且有剩余资源。
protected boolean tryReleaseShared(int arg) 共享方式。arg为释放锁的次数,尝试释放资源,如果释放后允许唤醒后续等待结点返回True,否则返回False。

一般来说,自定义同步器要么是独占方式,要么是共享方式,它们也只需实现tryAcquire-tryRelease、tryAcquireShared-tryReleaseShared中的一种即可。AQS也支持自定义同步器同时实现独占和共享两种方式,如ReentrantReadWriteLock。ReentrantLock是独占锁,所以实现了tryAcquire-tryRelease。

AQS使用模板设计模式封装了很多同步组件都需要的步骤,仅仅预留了tryAcquire、tryRelease、tryAcquireShared、tryReleaseShared和isHeldExclusively这五个方法为protected方法,那么为什么AQS本身没有任何抽象方法但AQS类本身却被声明为抽象类呢?为什么AQS被声明为了抽象类其内部却没有抽象方法呢?

1.第一个问题的答案就是AQS本身的初衷是为其他众多的同步组件提供强大的基础框架,本身不希望被直接拿来使用,因此表明为抽象类。

2.第二个问题的答案是AQS作为并发包下的基础性框架因为要兼顾很多同步组件的需求,因此内容非常丰富如支持独占和共享两种模式,而很多组件如ReentrantLock本身只需要实现独占模式,如果将上面的五个预留的方法声明为abstract,则ReentrantLock就必须全部实现,这显然是不合适的,因此AQS使用protected方法,具体的同步组件可以自由选择实现哪些方法。

4.AQS的应用场景

除了上边ReentrantLock的可重入性的应用,AQS作为并发编程的框架,为很多其他同步工具提供了良好的解决方案。下面列出了JUC中的几种同步工具,大体介绍一下AQS的应用场景:

同步工具 同步工具与AQS的关联
ReentrantLock 使用AQS保存锁重复持有的次数。当一个线程获取锁时,ReentrantLock记录当前获得锁的线程标识,用于检测是否重复获取,以及错误线程试图解锁操作时异常情况的处理。
Semaphore 使用AQS同步状态来保存信号量的当前计数。tryRelease会增加计数,acquireShared会减少计数。
CountDownLatch 使用AQS同步状态来表示计数。计数为0时,所有的Acquire操作(CountDownLatch的await方法)才可以通过。
ReentrantReadWriteLock 使用AQS同步状态中的16位保存写锁持有的次数,剩下的16位用于保存读锁的持有次数
ThreadPoolExecutor Worker利用AQS同步状态实现对独占线程变量的设置(tryAcquire和tryRelease)。

四、ReentrantLock类的加锁流程

我们以非公平锁为例,全流程讲解整个加锁的过程,首先我们先看加锁的入口函数:

1
2
3
public void lock() {
sync.lock();
}

所以说,ReentrantLock的加锁是委托给内部的sync对象完成的,以非公平的NonFairSync为例:

1
2
3
4
5
6
7
8
9
final void lock() {
// 新的线程直接尝试CAS获取锁,可以完成插队加锁
if (compareAndSetState(0, 1))
// 加锁成功后设置独占锁的持有者线程为当前线程
setExclusiveOwnerThread(Thread.currentThread());
else
// 加锁失败后的流程
acquire(1);
}

当第一个线程请求加锁时发现当前state值为0,锁还没有被占用,于是调用compareAndSetState加锁成功,开始处理后续业务逻辑;当然这是理想情况,如果此时恰好第二个线程提前抢占了锁,则本次compareAndSetState加锁失败,就会进入acquire方法寻求解决方案,让我们走进acquire的世界吧:

1
2
3
4
5
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}

这个AQS中的acquire方法中先后出现了四个函数,首先会调用子类(本节中就是NonfairSync)重写的tryAcquire方法,也许你会疑惑为什么在AQS中为什么不把tryAcquire方法定义为抽象方法而是保护方法,原因就是保证实现类可以根据自己的需求重写特定的方法实现功能逻辑,例如ReentrantLock中的NonfairSync只需要实现tryAcquiretryRelease方法即可,其余的像tryAcquireSharedtryReleaseShared等方法根本不需要:

image-20231117170552419

回到正题中,让我们看看NonfairSynctryAcquire方法的代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}

final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
// 此刻发现锁已经被释放了,可以进行竞争抢占
if (c == 0) {
// 加锁成功后还是设置独占锁的持有者线程为当前线程,并返回true表示加锁成功
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
// 上一步竞争抢占锁失败或者锁一直被占用着,进一步判断是不是持有锁线程的重入加锁
else if (current == getExclusiveOwnerThread()) {
// ReentrantLock锁可重入
int nextc = c + acquires;
// ReentrantLock锁有递归次数限制,不能超过整型表示的最大正数2^31-1
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
// 线程已经持有锁,不需要通过CAS方法修改state
setState(nextc);
return true;
}
// 返回false表示加锁失败
return false;
}

看到tryAcquire内部其实还是调用了父类Sync就定义好的nonfairTryAcquire方法,整个tryAcquire做的事情就是检查一下独占锁有没有被释放,如果锁被释放了就进行竞争抢占,否则判断是不是重入加锁,如果都不是表明当前线程可能只能进入同步队列等待了。

针对tryAcquire加锁成功acquire方法的判断逻辑就已经结束了,下面我们进一步查看tryAcquire加锁失败即返回false的后续操作,首当其冲的就是addWaiter方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
private Node addWaiter(Node mode) {
// 把线程封装到Node节点中,其中mode在这里是独占模式Node.EXCLUSIVE
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
// 尾节点不为空,表明同步队列已经初始化具有虚头节点,直接插入队尾即可
if (pred != null) {
node.prev = pred;
// CAS修改队尾指针成功,可能会因为并发线程修改队尾指针导致本次CAS失败,这种情况需要交给后续enq保证节点入队
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
// 负责同步队列的初始化和保证节点入队尾
enq(node);
// 返回插入的新节点
return node;
}

private Node enq(final Node node) {
// 自旋很重要!
for (;;) {
Node t = tail;
// 初始化同步队列
if (t == null) {
// CAS修改队头指针为虚节点,可能会因为并发线程进入enq方法初始化队列而CAS失败,但是自旋保证了即使发生这种情况也会使节点正确入队尾
if (compareAndSetHead(new Node()))
// 初始化的状态是队头、队尾指针都指向虚节点
tail = head;
} else {
// 新节点插入队尾
node.prev = t;
// CAS修改队尾指针为新节点,可能会因为并发线程插入队尾导致CAS失败,但是自旋保证了即使发生这种情况也会使节点正确入队尾
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}

因此,总的来说addWaiter方法利用CAS保证了同步队列的初始化和节点入队尾的并发安全性。

接着,既然节点已经插入同步队列的尾部了,那么后续的阻塞等待逻辑怎么完成的呢,这就需要我们再一次查看acquireQueued方法的代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
final boolean acquireQueued(final Node node, int arg) {
// 是否获取到锁的标记,默认为true表示没有获取到锁
boolean failed = true;
try {
// 加锁过程中是否中断的标记,ReentrantLock的lock方法并不处理中断,仅仅记录中断标记
boolean interrupted = false;
// 自旋很重要!
for (;;) {
// 新插入节点的前驱节点
final Node p = node.predecessor();
// 只有头节点的后一个节点(即第一个有效节点,符合FIFO规则)才有资格竞争锁
if (p == head && tryAcquire(arg)) {
// 新插入的节点加锁成功的话,虚节点后移一位变成当前持有锁的
setHead(node);
// help GC
p.next = null;
// 修改标记为false表示获取到锁
failed = false;
// 返回中断标记
return interrupted;
}
// 新插入的节点不是第一个有效节点,或者虽然是第一个有效节点但是尝试加锁失败
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
// 如果阻塞等待期间当前线程被中断了,修改中断标记为true
interrupted = true;
}
} finally {
if (failed)
// 发生异常,取消正在获取锁的线程节点
cancelAcquire(node);
}
}

上面的代码告诉我们只有队头节点的后一个节点才有资格去竞争锁,这也符合AQS的FIFO同步队列的规则定义,有的同学可能会认为调用ReentrantLock的lock方法加锁失败会就会立即去同步队列阻塞等待,这其实是错误的观点,不是每个节点都有资格竞争锁的,而且第一个有效节点也不会立刻就阻塞等待而是会再次尝试加锁多次,实在获取不到锁才会阻塞等待。

下面我们去看一下acquireQueued中获取不到锁处理流程的第一个函数shouldParkAfterFailedAcquire,顾名思义就是在加锁失败后判断是否应该阻塞等待的逻辑函数:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
// 前置节点的状态
int ws = pred.waitStatus;
// 前置节点状态为SIGNAL,表示前置节点可以负责后继节点的唤醒工作,当前节点可以安心阻塞等待了,返回true
if (ws == Node.SIGNAL)
/*
* This node has already set status asking a release
* to signal it, so it can safely park.
*/
return true;
// 前置节点的状态为CANCELLED,移除前面所有无效的Cancelled节点,我们本节用不到
if (ws > 0) {
/*
* Predecessor was cancelled. Skip over predecessors and
* indicate retry.
*/
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
/*
* waitStatus must be 0 or PROPAGATE. Indicate that we
* need a signal, but don't park yet. Caller will need to
* retry to make sure it cannot acquire before parking.
*/
// 节点的默认状态为0,通过CAS修改前置节点状态为SIGNAL
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
// 除了前置节点状态为SIGNAL这种情况外,其余都返回false表示当前节点不应该阻塞,需要再次尝试
return false;
}

一开始节点的默认状态都是0,因此shouldParkAfterFailedAcquire会修改前置节点状态为SIGNAL,然后再次自旋重试加锁(针对有效节点),如果还是没有获取到锁则再次进入shouldParkAfterFailedAcquire方法并返回true表示当前节点可以阻塞了,随即调用后续的parkAndCheckInterrupt方法阻塞当前线程:

1
2
3
4
5
6
private final boolean parkAndCheckInterrupt() {
// 当前线程阻塞,等待后续解锁唤醒,也可能会被中断唤醒(我们这里可以理解为虚假唤醒,因为原则上ReentrantLock不响应中断),唤醒后需要去竞争加锁,可能会被外来线程插队(这就是非公平锁的特点)
LockSupport.park(this);
// 唤醒后返回当前线程阻塞等待期间是否被中断的标识,会清除线程的中断位
return Thread.interrupted();
}

到这里,acquire方法内的前三个主要函数都已经讲解完毕,我们再次回顾该方法的逻辑:

1
2
3
4
5
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}

同步队列的第一个有效节点被唤醒并成功加锁后,返回阻塞等待期间是否被中断的标识,如果确实被中断过,此时调用selfInterrupt方法设置当前线程的中断位,因此可以看出,ReentrantLock的lock方法确实不处理中断:

1
2
3
4
static void selfInterrupt() {
// 设置中断位
Thread.currentThread().interrupt();
}

五、ReentrantLock类的解锁流程

讲完ReentrantLock的加锁流程,下面我们看一下ReentrantLock的解锁流程,首先看一下解锁的入口函数:

1
2
3
public void unlock() {
sync.release(1);
}

所以说,ReentrantLock的解锁也是委托给内部的sync对象完成的,以非公平的NonFairSync为例(其实解锁对于公平或非公平模型都是一样的操作):

1
2
3
4
5
6
7
8
9
10
11
public final boolean release(int arg) {
// 尝试[完全]释放锁成功后的情况,这里的完全指的是重入锁全部释放
if (tryRelease(arg)) {
Node h = head;
// 同步队列头的虚节点状态是SIGNAL,可以唤醒其后继节点
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}

首先解锁的第一个方法就是tryRelease方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
// 只有持有锁的线程才能释放锁,否则抛出IllegalMonitorStateException,跟synchronized相似
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
// 完全释放锁的标记,默认false表示没有完全释放锁
boolean free = false;
// 重入次数归零表明已经完全释放锁
if (c == 0) {
// 修改标记位true表示完全释放锁
free = true;
// 独占锁的持有者线程置为空
setExclusiveOwnerThread(null);
}
// 持有锁的线程修改state值不需要使用CAS操作
setState(c);
return free;
}

对于完全释放锁后唤醒同步队列中等待的第一个节点,unparkSuccessor给出了操作逻辑:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
private void unparkSuccessor(Node node) {
// 把虚节点的状态重置为0,让其退出唤醒后继节点的工作,这个操作成不成功都无所谓
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
// 虚节点的后继节点,也就是我们需要唤醒的节点
Node s = node.next;
// 从前往后找非取消节点存在可能无法完整遍历同步队列的问题,因为addWaiter方法中新节点入队的CAS操作与设置上一次尾节点的next引用不是原子性的,可能同步队列还没有来得及建立最后面节点之间的next指向,因此只能使用从后往前的遍历方式!
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
// 唤醒后继节点
if (s != null)
LockSupport.unpark(s.thread);
}

六、ReentrantLock类的其他方法

1.lockInterruptibly

该方法不同于lock方法不响应等待阻塞过程中的中断,而是一旦等待过程中发现线程中断就抛出中断异常InterruptedException,用的时机很少。

1
2
3
4
5
6
7
8
9
10
11
public void lockInterruptibly() throws InterruptedException {
sync.acquireInterruptibly(1);
}

public final void acquireInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (!tryAcquire(arg))
doAcquireInterruptibly(arg);
}
image-20231118204104012

2.tryLock尝试加锁

该方法不同于lock方法会一直阻塞等待获取锁,而是去尝试获取锁,如果拿不到锁也无所谓返回false即可,后续业务逻辑会自行决定如何处理,这也是为了避免我们的业务系统因为迟迟获取不到锁而发生严重阻塞,导致大量请求的堆积甚至系统发生崩溃。

1
2
3
public boolean tryLock() {
return sync.nonfairTryAcquire(1);
}

3.tryLock超时加锁

该方法相对于tryLock原方式增加了超时等待限制,如果获取不到锁会先等待一定时间,如果超时后还拿不到锁就返回false,增加了tryLock调用的灵活性。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
public boolean tryLock(long timeout, TimeUnit unit)
throws InterruptedException {
return sync.tryAcquireNanos(1, unit.toNanos(timeout));
}

public final boolean tryAcquireNanos(int arg, long nanosTimeout)
throws InterruptedException {
// 进入方法时检查当前线程是否被中断,如果是的话抛出InterruptedException
if (Thread.interrupted())
throw new InterruptedException();
// 尝试获取锁,如果没有获取到就调用超时版本的doAcquireNanos方法
return tryAcquire(arg) ||
doAcquireNanos(arg, nanosTimeout);
}

private boolean doAcquireNanos(int arg, long nanosTimeout)
throws InterruptedException {
// 等待时间小于等于零,直接返回false表示没有拿到锁
if (nanosTimeout <= 0L)
return false;
// 计算超时时刻
final long deadline = System.nanoTime() + nanosTimeout;
// 下面的代码逻辑与前面的tryLock方法中的一样
final Node node = addWaiter(Node.EXCLUSIVE);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return true;
}
// 计算剩余等待时间
nanosTimeout = deadline - System.nanoTime();
// 超时时刻已到,返回false表示没有拿到锁
if (nanosTimeout <= 0L)
return false;
// 调用LockSupport的超时阻塞方法,注意如果剩余阻塞等待时间还不到spinForTimeoutThreshold,那么选择自旋可能程序效率更高,因为线程状态切换也需要一定开销(操作系统调度线程从用户态到内核态需要一定时钟周期)
if (shouldParkAfterFailedAcquire(p, node) &&
nanosTimeout > spinForTimeoutThreshold)
LockSupport.parkNanos(this, nanosTimeout);
if (Thread.interrupted())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}

七、自定义同步工具-独占锁

了解AQS基本原理以后,按照上面所说的AQS知识点,自己实现一个同步工具。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
public class LeeLock  {

// 不可重入的非阻塞独占锁
private static class Sync extends AbstractQueuedSynchronizer {
@Override
protected boolean tryAcquire (int arg) {
return compareAndSetState(0, 1);
}

@Override
protected boolean tryRelease (int arg) {
setState(0);
return true;
}

@Override
protected boolean isHeldExclusively () {
return getState() == 1;
}
}

private Sync sync = new Sync();

public void lock () {
sync.acquire(1);
}

public void unlock () {
sync.release(1);
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
public class LeeMain {

static int count = 0;
static LeeLock leeLock = new LeeLock();

public static void main (String[] args) throws InterruptedException {

Runnable runnable = new Runnable() {
@Override
public void run () {
try {
leeLock.lock();
for (int i = 0; i < 10000; i++) {
count++;
}
} catch (Exception e) {
e.printStackTrace();
} finally {
leeLock.unlock();
}

}
};
Thread thread1 = new Thread(runnable);
Thread thread2 = new Thread(runnable);
thread1.start();
thread2.start();
thread1.join();
thread2.join();
System.out.println(count);
}
}

上述代码每次运行结果都会是20000。通过简单的几行代码就能实现同步功能,这就是AQS的强大之处。

八、自定义同步工具-共享锁

设计一个同步工具:该工具在同一时刻,只允许至多两个线程同时访问,超过两个线程的访问将被阻塞,我们将这个同步工具命名为TwinsLock。

首先,确定访问模式。TwinsLock能够在同一时刻支持多个线程的访问,这显然是共享式访问,因此,需要使用同步器提供的acquireShared(int args)方法等和Shared相关的方法,这就要求TwinsLock必须重写tryAcquireShared(int args)方法和tryReleaseShared(int args)方法,这样才能保证同步器的共享式同步状态的获取与释放方法得以执行。

其次,定义资源数。TwinsLock在同一时刻允许至多两个线程的同时访问,表明同步资源数为2,这样可以设置初始状态status为2,当一个线程进行获取,status减1,该线程释放,则status加1,状态的合法范围为0、1和2,其中0表示当前已经有两个线程获取了同步资源,此时再有其他线程对同步状态进行获取,该线程只能被阻塞。在同步状态变更时,需要使用compareAndSet(int expect,int update)方法做原子性保障。

最后,组合自定义同步器。前面提到,自定义同步组件通过组合自定义同步器来完成同步功能,一般情况下自定义同步器会被定义为自定义同步组件的内部类。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
public class TwinsLock implements Lock {
private final Sync sync = new Sync(2);

private static final class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = -7889272986162341211L;

Sync(int count) {
if (count <= 0) {
throw new IllegalArgumentException("count must large than zero.");
}
setState(count);
}

public int tryAcquireShared(int reduceCount) {
for (;;) {
int current = getState();
int newCount = current - reduceCount;
if (newCount < 0 || compareAndSetState(current, newCount)) {
return newCount;
}
}
}

public boolean tryReleaseShared(int returnCount) {
for (;;) {
int current = getState();
int newCount = current + returnCount;
if (compareAndSetState(current, newCount)) {
return true;
}
}
}

final ConditionObject newCondition() {
return new ConditionObject();
}
}

public void lock() {
sync.acquireShared(1);
}

public void unlock() {
sync.releaseShared(1);
}

public void lockInterruptibly() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}

public boolean tryLock() {
return sync.tryAcquireShared(1) >= 0;
}

public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
return sync.tryAcquireSharedNanos(1, unit.toNanos(time));
}

@Override
public Condition newCondition() {
return sync.newCondition();
}
}

在上述示例中,TwinsLock实现了Lock接口,提供了面向使用者的接口,使用者调用lock()方法获取锁,随后调用unlock()方法释放锁,而同一时刻只能有两个线程同时获取到锁。TwinsLock同时包含了一个自定义同步器Sync,而该同步器面向线程访问和同步状态控制。以共享式获取同步状态为例:同步器会先计算出获取后的同步状态,然后通过CAS确保状态的正确设置,当tryAcquireShared(int reduceCount)方法返回值大于等于0时,当前线程才获取同步状态,对于上层的TwinsLock而言,则表示当前线程获得了锁。

下面编写一个测试来验证TwinsLock是否能按照预期工作。在测试用例中,定义了工作者线程Worker,该线程在执行过程中获取锁,当获取锁之后使当前线程睡眠1秒(并不释放锁),随后打印当前线程名称,最后再次睡眠1秒并释放锁。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
public class TwinsLockTest {

public void test() {
final Lock lock = new TwinsLock();
class Worker extends Thread {
public void run() {
while (true) {
lock.lock();
try {
SleepUtils.second(1);
System.out.println(Thread.currentThread().getName());
SleepUtils.second(1);
} finally {
lock.unlock();
}
}
}
}
// 启动10个线程
for (int i = 0; i < 10; i++) {
Worker w = new Worker();
w.setDaemon(true);
w.start();
}
// 每隔1秒换行
for (int i = 0; i < 10; i++) {
SleepUtils.second(1);
System.out.println();
}
}
}

运行该测试用例,可以看到线程名称成对输出,也就是在同一时刻只有两个线程能够获取到锁,这表明TwinsLock可以按照预期正确工作。

九、Condition接口详解

任意一个Java对象,都拥有一组监视器方法,主要包括wait()、wait(long timeout)、notify()以及notifyAll()方法,这些方法与synchronized同步关键字配合,可以实现等待/通知模式。Condition接口也提供了类似Object的监视器方法,与Lock配合可以实现等待/通知模式,但是这两者在使用方式以及功能特性上还是有差别的。

image-20240905222014198

1.Condition接口与示例

Condition定义了等待/通知两种类型的方法,当前线程调用这些方法时,需要提前获取到Condition对象关联的锁。Condition对象是由Lock对象(调用Lock对象的newCondition()方法)创建出来的,换句话说,Condition是依赖Lock对象的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public class ConditionUseCase {
Lock lock = new ReentrantLock();
Condition condition = lock.newCondition();

public void conditionWait() throws InterruptedException {
lock.lock();
try {
condition.await();
} finally {
lock.unlock();
}
}

public void conditionSignal() throws InterruptedException {
lock.lock();
try {
condition.signal();
} finally {
lock.unlock();
}
}
}

当调用await()方法后,当前线程会释放锁并在此等待,而其他线程调用Condition对象的signal()方法,通知当前线程后,当前线程才从await()方法返回,并且在返回前已经获取了锁。

image-20240905222334246

获取一个Condition必须通过Lock的newCondition()方法。下面通过一个有界队列的示例来深入了解Condition的使用方式。有界队列是一种特殊的队列,当队列为空时,队列的获取操作将会阻塞获取线程,直到队列中有新增元素,当队列已满时,队列的插入操作将会阻塞插入线程,直到队列出现空位。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
public class BoundedQueue<T> {
private Object[] items;
// 添加的下标,删除的下标和数组当前数量
private int addIndex, removeIndex, count;
private Lock lock = new ReentrantLock();
private Condition notEmpty = lock.newCondition();
private Condition notFull = lock.newCondition();

public BoundedQueue(int size) {
items = new Object[size];
}

// 添加一个元素,如果数组满,则添加线程进入等待状态,直到有空位
public void add(T t) throws InterruptedException {
lock.lock();
try {
while (count == items.length)
notFull.await();
items[addIndex] = t;
if (++addIndex == items.length)
addIndex = 0;
++count;
notEmpty.signal();
} finally {
lock.unlock();
}
}

// 由头部删除一个元素,如果数组空,则删除线程进入等待状态,直到有新添加元素
@SuppressWarnings("unchecked")
public T remove() throws InterruptedException {
lock.lock();
try {
while (count == 0)
notEmpty.await();
Object x = items[removeIndex];
if (++removeIndex == items.length)
removeIndex = 0;
--count;
notFull.signal();
return (T) x;
} finally {
lock.unlock();
}
}
}

在添加和删除方法中使用while循环而非if判断,目的是防止过早或意外的通知,只有条件符合才能够退出循环。回想之前提到的等待/通知的经典范式,二者是非常类似的。

2.Condition的实现分析

ConditionObject是同步器AbstractQueuedSynchronizer的内部类,因为Condition的操作需要获取相关联的锁,所以作为同步器的内部类也较为合理。每个Condition对象都包含着一个队列(以下称为等待队列),该队列是Condition对象实现等待/通知功能的关键

image-20240905223051037

下面将分析Condition的实现,主要包括:等待队列、等待和通知,下面提到的Condition如果不加说明均指的是ConditionObject。

2.1等待队列

等待队列是一个FIFO的队列,在队列中的每个节点都包含了一个线程引用,该线程就是在Condition对象上等待的线程,如果一个线程调用了Condition.await()方法,那么该线程将会释放锁、构造成节点加入等待队列并进入等待状态。事实上,节点的定义复用了同步器中节点的定义,也就是说,同步队列和等待队列中节点类型都是同步器的静态内部类AbstractQueuedSynchronizer.Node。

一个Condition包含一个等待队列,Condition拥有首节点firstWaiter和尾节点lastWaiter。当前线程调用Condition.await()方法,将会以当前线程构造节点,并将节点从尾部加入等待队列,等待队列的基本结构如图所示。

image-20240905223202540

如图所示,Condition拥有首尾节点的引用,而新增节点只需要将原有的尾节点nextWaiter指向它,并且更新尾节点即可。上述节点引用更新的过程并没有使用CAS保证,原因在于调用await()方法的线程必定是获取了锁的线程,也就是说该过程是由锁来保证线程安全的。

在Object的监视器模型上,一个对象拥有一个同步队列和等待队列,而并发包中的Lock(更确切地说是同步器)拥有一个同步队列多个等待队列,其对应关系如图所示。

image-20240905223344262

如图所示,Condition的实现是同步器的内部类,因此每个Condition实例都能够访问同步器提供的方法,相当于每个Condition都拥有所属同步器的引用。

2.2等待

调用Condition的await()方法(或者以await开头的方法),会使当前线程进入等待队列并释放锁,同时线程状态变为等待状态。当从await()方法返回时,当前线程一定获取了Condition相关联的锁。

如果从队列(同步队列和等待队列)的角度看await()方法,当调用await()方法时,相当于同步队列的首节点(获取了锁的节点)移动到Condition的等待队列中

image-20240905223731507

调用该方法的线程成功获取了锁的线程,也就是同步队列中的首节点,该方法会将当前线程构造成节点并加入等待队列中,然后释放同步状态,唤醒同步队列中的后继节点,然后当前线程会进入等待状态。

当等待队列中的节点被唤醒,则唤醒节点的线程开始尝试获取同步状态。如果不是通过其他线程调用Condition.signal()方法唤醒,而是对等待线程进行中断,则会抛出InterruptedException。

image-20240905223946829

2.3通知

调用Condition的signal()方法,将会唤醒在等待队列中等待时间最长的节点(首节点),在唤醒节点之前,会将节点移到同步队列中。

image-20240905224053472

调用该方法的前置条件是当前线程必须获取了锁,可以看到signal()方法进行了isHeldExclusively()检查,也就是当前线程必须是获取了锁的线程。接着获取等待队列的首节点,将其移动到同步队列并使用LockSupport唤醒节点中的线程。

image-20240905224158458

通过调用同步器的enq(Node node)方法,等待队列中的头节点线程安全地移动到同步队列。当节点移动到同步队列后,当前线程再使用LockSupport唤醒该节点的线程。被唤醒后的线程,将从await()方法中的while循环中退出(isOnSyncQueue(Node node)方法返回true,节点已经在同步队列中),进而调用同步器的acquireQueued()方法加入到获取同步状态的竞争中。

成功获取同步状态(或者说锁)之后,被唤醒的线程将从先前调用的await()方法返回,此时该线程已经成功地获取了锁。Condition的signalAll()方法,相当于对等待队列中的每个节点均执行一次signal()方法,效果就是将等待队列中所有节点全部移动到同步队列中,并唤醒每个节点的线程。