概述
对于并发场景,单机情况下,java通过synchronize关键字、juc并发包下的原子类、各种锁的实现,来达到多线程间的同步。
AQS维护了一个volatile int state状态(代表共享资源)和一个FIFO线程等待队列(多线程争用资源被阻塞时会进入此队列)。state的访问方式有三种:
- getState()
- setState()
- compareAndSetState()
AQS定义两种资源共享方式:Exclusive(独占,只有一个线程能执行,如ReentrantLock)和Share(共享,多个线程可同时执行,如Semaphore/CountDownLatch)。
不同的自定义同步器争用共享资源的方式也不同。自定义同步器在实现时只需要实现共享资源state的获取与释放方式即可,至于具体线程等待队列的维护(如获取资源失败入队/唤醒出队等),AQS已经在顶层实现好了。自定义同步器实现时主要实现以下几种方法:
- isHeldExclusively():该线程是否正在独占资源。只有用到condition才需要去实现它。
- tryAcquire(int):独占方式。尝试获取资源,成功则返回true,失败则返回false。
- tryRelease(int):独占方式。尝试释放资源,成功则返回true,失败则返回false。
- tryAcquireShared(int):共享方式。尝试获取资源。负数表示失败;0表示成功,但没有剩余可用资源;正数表示成功,且有剩余资源。
- tryReleaseShared(int):共享方式。尝试释放资源,如果释放后允许唤醒后续等待结点返回true,否则返回false。
以ReentrantLock为例,state初始化为0,表示未锁定状态。A线程lock()时,会调用tryAcquire()独占该锁并将state+1。此后,其他线程再tryAcquire()时就会失败,直到A线程unlock()到state=0(即释放锁)为止,其它线程才有机会获取该锁。当然,释放锁之前,A线程自己是可以重复获取此锁的(state会累加),这就是可重入的概念。但要注意,获取多少次就要释放多么次,这样才能保证state是能回到零态的。
再以CountDownLatch以例,任务分为N个子线程去执行,state也初始化为N(注意N要与线程个数一致)。这N个子线程是并行执行的,每个子线程执行完后countDown()一次,state会CAS减1。等到所有子线程都执行完后(即state=0),会unpark()主调用线程,然后主调用线程就会从await()函数返回,继续后余动作。
一般来说,自定义同步器要么是独占方法,要么是共享方式,他们也只需实现tryAcquire-tryRelease、tryAcquireShared-tryReleaseShared中的一种即可。但AQS也支持自定义同步器同时实现独占和共享两种方式,如ReentrantReadWriteLock。
AQS应用
来看一下AQS的子类
可以看到,常用的几个锁ReentrantLock、Semaphore的内部,都有AQS类的实现,用来进行同步的管理。每个锁内部都有一个Sync对象。
AQS的几个抽象方法tryAcquire、tryRelease等方法,在其实现类中都有实现。Sync也是一个抽象类,可以由子类实现公平锁和非公平锁。
举个例子,ReentrantLock的中的Sync,有两个子类,FairSync和NonfairSync,公平所和非公平锁。
Sync实现了AQS的几个抽象方法,并将lock()方法交给子类FairSync和NonfairSync分别去实现公平锁和非公平锁。本文主要以ReentrantLock为例,看一下可重入锁在独占方式下公平锁的实现、可重入锁的实现与AQS的关系。
ReentrantLock
加锁(公平锁)
公平锁:对于请求同一个锁的多线程,按照FIFO的原则分配锁的持有权。
看一下可重入锁中的公平锁的实现:
1 |
|
AQS中获取锁的方法中,分别采用了tryAcquire、acquireQueued、addWaiter、selfInterrupt等方法:
- tryAcquire尝试非阻塞的获取锁
- addWaiter将当前线程放入阻塞队列中
- acquireQueued实现阻塞队列中的当前线程阻塞的获得锁。若等待过程中被中断,则返回true
- selfInterrupt方法,线程自我中断
加锁的整体流程如下:
- 调用自定义同步器FairSync的tryAcquire()尝试直接去获取资源,如果成功则直接返回;
- 没成功,则标记为独占模式,addWaiter()将该线程加入等待队列的队尾;
- acquireQueued()使线程在等待队列中休息,有机会时(轮到自己,会被unpark())会去尝试获取资源。获取到资源后才返回。如果在整个等待过程中被中断过,则返回true,否则返回false。
- 如果线程在等待过程中被中断过,它是不响应的。只是获取资源后才再进行自我中断selfInterrupt(),将中断补上。
以上过程都是通过AQS的acquire方法管理的。
下面来分别看一下其实现。
tryAcquire()
尝试直接去获取资源,不论成功或失败,方法都直接返回,非阻塞。
1 |
|
addWaiter(Node mode)
如果尝试获取锁失败,则将当前线程节点标记为独占模式,放入阻塞队列队尾,并返回当前线程所在的结点。
1 | //AQS将当前线程节点添加到阻塞队列队尾, |
此处采用CAS自旋volatile变量,类似AtomicInteger.getAndIncrement()中的实现。
compareAndSetTail中传进去的是t,但只用于与tail的值相比较,实际修改的是tail的值,t用于存原本的tail节点
acquireQueued
走到这一步时,该线程通过tryAcquire()获取资源失败,然后通过addWaiter()加入阻塞队列队尾。这时候当前线程可以进入等待状态,直到有持有锁的线程释放资源并唤醒当前线程,然后尝试获取锁。看一下实现:
1 |
|
这里有一点需要注意,就是head指向的节点,要么是当前获取到资源的线程,要么是null。
线程如果因为调用park而阻塞的话,能够响应中断请求(中断状态被设置成true),但是不会抛出InterruptedException 。acquireInterruptibly和doAcquireInterruptibly才会抛出中断异常
至于怎么判断改线程是否可以进入等待状态,是在shouldParkAfterFailedAcquire方法中通过前驱节点的waitStatus状态来判断的:
1 |
|
如果可以进入等待状态,则调用parkAndCheckInterrupt方法,挂起当前线程。当线程被唤醒后,返回当前线程在等待过程中是否被中断过:
1 | private final boolean parkAndCheckInterrupt() { |
总结
总结一下加锁的总流程:
释放锁
接下来是释放锁的过程:
1 | //ReentrantLock释放锁 |
分两步:
- 先释放资源
- 如果资源释放完毕,则唤醒等待队列里的下一个线程。
释放资源
释放资源的实现是在ReentrantLock.Sync中实现的,因为公平锁和非公平锁释放锁的方式是一样的。
tryRelease在ReentrantLock.Sync中释放锁的实现:
1 | protected final boolean tryRelease(int releases) { |
每次释放资源直接减去相应量的资源releases。free代表是否释放完。
如果剩余的state==0,则free置为true,利用setExclusiveOwnerThread(null)设置排它锁的拥有线程是null。代表当前线程已将资源释放完毕,把锁释放了。
否则,free为false,代表资源没有释放完。
最后返回释放的结果free。
唤醒后续线程
然后是AQS中实现的unparkSuccessor,唤醒后续线程:
1 | /** |
这里找到阻塞队列中第一个未放弃的线程,然后利用unpark唤醒。
总结来说,release()是独占模式下线程释放共享资源的顶层入口。它会释放指定量的资源,如果彻底释放了(即state=0),它会唤醒等待队列里的其他线程来获取资源。