AQS

概述

对于并发场景,单机情况下,java通过synchronize关键字、juc并发包下的原子类、各种锁的实现,来达到多线程间的同步。

AQS维护了一个volatile int state状态(代表共享资源)和一个FIFO线程等待队列(多线程争用资源被阻塞时会进入此队列)。state的访问方式有三种:

  • getState()
  • setState()
  • compareAndSetState()  

AQS定义两种资源共享方式:Exclusive(独占,只有一个线程能执行,如ReentrantLock)和Share(共享,多个线程可同时执行,如Semaphore/CountDownLatch)。

不同的自定义同步器争用共享资源的方式也不同。自定义同步器在实现时只需要实现共享资源state的获取与释放方式即可,至于具体线程等待队列的维护(如获取资源失败入队/唤醒出队等),AQS已经在顶层实现好了。自定义同步器实现时主要实现以下几种方法:

  • isHeldExclusively():该线程是否正在独占资源。只有用到condition才需要去实现它。
  • tryAcquire(int):独占方式。尝试获取资源,成功则返回true,失败则返回false。
  • tryRelease(int):独占方式。尝试释放资源,成功则返回true,失败则返回false。
  • tryAcquireShared(int):共享方式。尝试获取资源。负数表示失败;0表示成功,但没有剩余可用资源;正数表示成功,且有剩余资源。
  • tryReleaseShared(int):共享方式。尝试释放资源,如果释放后允许唤醒后续等待结点返回true,否则返回false。

以ReentrantLock为例,state初始化为0,表示未锁定状态。A线程lock()时,会调用tryAcquire()独占该锁并将state+1。此后,其他线程再tryAcquire()时就会失败,直到A线程unlock()到state=0(即释放锁)为止,其它线程才有机会获取该锁。当然,释放锁之前,A线程自己是可以重复获取此锁的(state会累加),这就是可重入的概念。但要注意,获取多少次就要释放多么次,这样才能保证state是能回到零态的。

再以CountDownLatch以例,任务分为N个子线程去执行,state也初始化为N(注意N要与线程个数一致)。这N个子线程是并行执行的,每个子线程执行完后countDown()一次,state会CAS减1。等到所有子线程都执行完后(即state=0),会unpark()主调用线程,然后主调用线程就会从await()函数返回,继续后余动作。

一般来说,自定义同步器要么是独占方法,要么是共享方式,他们也只需实现tryAcquire-tryRelease、tryAcquireShared-tryReleaseShared中的一种即可。但AQS也支持自定义同步器同时实现独占和共享两种方式,如ReentrantReadWriteLock。

AQS应用

来看一下AQS的子类

可以看到,常用的几个锁ReentrantLock、Semaphore的内部,都有AQS类的实现,用来进行同步的管理。每个锁内部都有一个Sync对象。

AQS的几个抽象方法tryAcquire、tryRelease等方法,在其实现类中都有实现。Sync也是一个抽象类,可以由子类实现公平锁和非公平锁。

举个例子,ReentrantLock的中的Sync,有两个子类,FairSync和NonfairSync,公平所和非公平锁。

Sync实现了AQS的几个抽象方法,并将lock()方法交给子类FairSync和NonfairSync分别去实现公平锁和非公平锁。本文主要以ReentrantLock为例,看一下可重入锁在独占方式下公平锁的实现、可重入锁的实现与AQS的关系。

ReentrantLock

加锁(公平锁)

公平锁:对于请求同一个锁的多线程,按照FIFO的原则分配锁的持有权。

看一下可重入锁中的公平锁的实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14

//ReentrantLock加锁
final void lock() {
//调用AQS的aquire方法
acquire(1);
}


//AQS中阻塞获取锁
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}

AQS中获取锁的方法中,分别采用了tryAcquire、acquireQueued、addWaiter、selfInterrupt等方法:

  • tryAcquire尝试非阻塞的获取锁
  • addWaiter将当前线程放入阻塞队列中
  • acquireQueued实现阻塞队列中的当前线程阻塞的获得锁。若等待过程中被中断,则返回true
  • selfInterrupt方法,线程自我中断

加锁的整体流程如下:

  • 调用自定义同步器FairSync的tryAcquire()尝试直接去获取资源,如果成功则直接返回;
  • 没成功,则标记为独占模式,addWaiter()将该线程加入等待队列的队尾;
  • acquireQueued()使线程在等待队列中休息,有机会时(轮到自己,会被unpark())会去尝试获取资源。获取到资源后才返回。如果在整个等待过程中被中断过,则返回true,否则返回false。
  • 如果线程在等待过程中被中断过,它是不响应的。只是获取资源后才再进行自我中断selfInterrupt(),将中断补上。

以上过程都是通过AQS的acquire方法管理的。

下面来分别看一下其实现。

tryAcquire()

尝试直接去获取资源,不论成功或失败,方法都直接返回,非阻塞。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22

//FairSync实现非阻塞获取锁的方法
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (!hasQueuedPredecessors() && //是阻塞队列队首元素
compareAndSetState(0, acquires)) { //CAS修改状态
setExclusiveOwnerThread(current);
return true;
}
}//可重入,state+=acquires
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
//获取锁失败
return false;
}

addWaiter(Node mode)

如果尝试获取锁失败,则将当前线程节点标记为独占模式,放入阻塞队列队尾,并返回当前线程所在的结点。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
//AQS将当前线程节点添加到阻塞队列队尾,
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
// 尝试快速方式直接放到队尾
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}

//上一步失败则采用enq入队
enq(node);
return node;
}

//自旋将node加入队尾
private Node enq(final Node node) {
for (;;) {
Node t = tail;
// 如果队列为空,则初始化,创建一个空的标志结点作为head结点,并将tail也指向它
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else { //node放入队尾
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}


//CAS尾结点
private final boolean compareAndSetTail(Node expect, Node update) {
return unsafe.compareAndSwapObject(this, tailOffset, expect, update);
}

此处采用CAS自旋volatile变量,类似AtomicInteger.getAndIncrement()中的实现。

compareAndSetTail中传进去的是t,但只用于与tail的值相比较,实际修改的是tail的值,t用于存原本的tail节点

acquireQueued

走到这一步时,该线程通过tryAcquire()获取资源失败,然后通过addWaiter()加入阻塞队列队尾。这时候当前线程可以进入等待状态,直到有持有锁的线程释放资源并唤醒当前线程,然后尝试获取锁。看一下实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24

//AQS获取锁
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) { //一直循环到获取锁
final Node p = node.predecessor();
//前一个元素是阻塞队列队首元素 并且 尝试获取锁成功
if (p == head && tryAcquire(arg)) {
setHead(node); //拿到资源后,将head指向该节点。所以head所指的节点,就是当前获取到资源的那个结点或null。
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) && //尝试获取锁失败后,是否需要进入等待状态
parkAndCheckInterrupt()) //进入等待状态。若被唤醒,则检查等待过程中是否被中断过
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}

这里有一点需要注意,就是head指向的节点,要么是当前获取到资源的线程,要么是null。

线程如果因为调用park而阻塞的话,能够响应中断请求(中断状态被设置成true),但是不会抛出InterruptedException 。acquireInterruptibly和doAcquireInterruptibly才会抛出中断异常

至于怎么判断改线程是否可以进入等待状态,是在shouldParkAfterFailedAcquire方法中通过前驱节点的waitStatus状态来判断的:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39

/** waitStatus值表示线程已被取消 */
static final int CANCELLED = 1;
/** waitStatus值表示后续的线程需要被唤醒 */
static final int SIGNAL = -1;
/** waitStatus值表示线程正在等待状态。 */
static final int CONDITION = -2;
/** waitStatus值表示下一个获得共享应该无条件地传播。*/
static final int PROPAGATE = -3;

/** 0代表以上状态都不是*/


//AQS判断当先线程是否可以进入等待状态
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
/*
* 前驱节点的waitStatus已经设置为SIGNAL,即前驱节点线程释放锁后,会唤醒后续线程
* 因此,该线程可以放心的等着被唤醒了
*/
return true;
if (ws > 0) {
/*
* waitStatus大于0时,代表前驱节点的线程处于取消状态
* 因此,一直向前查找到最后一个没有没取消的节点,中间被取消的节点成为无效节点,
*/
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
/*
* 如果前驱正常,那就把前驱的状态设置成SIGNAL,告诉它释放锁后通知自己一下
*/
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}

如果可以进入等待状态,则调用parkAndCheckInterrupt方法,挂起当前线程。当线程被唤醒后,返回当前线程在等待过程中是否被中断过:

1
2
3
4
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}

总结

总结一下加锁的总流程:

释放锁

接下来是释放锁的过程:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
//ReentrantLock释放锁
public void unlock() {
sync.release(1);
}


//AQS中释放锁
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head; //持有资源的当前线程
if (h != null && h.waitStatus != 0)
unparkSuccessor(h); //唤醒等待队列里的下一个线程
return true;
}
return false;
}

分两步:

  1. 先释放资源
  2. 如果资源释放完毕,则唤醒等待队列里的下一个线程。

释放资源

释放资源的实现是在ReentrantLock.Sync中实现的,因为公平锁和非公平锁释放锁的方式是一样的。

tryRelease在ReentrantLock.Sync中释放锁的实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
protected final boolean tryRelease(int releases) {
//释放占有资源
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}

每次释放资源直接减去相应量的资源releases。free代表是否释放完。

如果剩余的state==0,则free置为true,利用setExclusiveOwnerThread(null)设置排它锁的拥有线程是null。代表当前线程已将资源释放完毕,把锁释放了。

否则,free为false,代表资源没有释放完。

最后返回释放的结果free。

唤醒后续线程

然后是AQS中实现的unparkSuccessor,唤醒后续线程:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
/**
* 唤醒节点的继任者,如果存在的话。
*/
private void unparkSuccessor(Node node) {
/*
* 如果状态为负值(即可能需要信号),则尝试清除信号。 允许失败或状态被等待线程修改
* @param node 刚刚释放完资源的当前线程,用户唤醒后续线程
*/
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);



// 将唤醒后继线程节点,这通常只是下一个节点。
Node s = node.next;


//但如果下一个节点为空或状态为取消,则从尾部向后遍历以找到实际未取消的后继者。
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);
}

这里找到阻塞队列中第一个未放弃的线程,然后利用unpark唤醒。

总结来说,release()是独占模式下线程释放共享资源的顶层入口。它会释放指定量的资源,如果彻底释放了(即state=0),它会唤醒等待队列里的其他线程来获取资源。

参考文章

Java并发之AQS详解

JAVA并发编程学习笔记之AQS源码分析(获取与释放)

volatile关键字与CAS操作

线程状态

坚持原创技术分享,您的支持将鼓励我继续创作!
0%