队列同步器AbstractQueuedSynchronizer
,是用来构建锁或者其他同步组件的基础框架,在其基础上实现了Reentrantlock
, ReentrantReadWriteLock
, CountDownLatch
等。
AQS
队列同步器AbstractQueuedSynchronizer
,AQS,使用一个整型的volatile修饰的变量state
来维护同步状态。
AQS的设计是基于模板方法模式的,子类通过继承AQS并实现它的抽象方法和使用getState()
, setState()
, compareAndSetState()
方法进行同步状态管理。
锁是面向使用者的,定义了使用者与锁交互的接口,隐藏了实现细节。AQS是面向的是锁的实现者,简化了锁的实现方式,屏蔽了同步状态管理、线程的排队、等待与唤醒等底层操作。锁和AQS很好地隔离使用者和实现者所需关注的领域。
可重写方法
方法名称 | 描述 |
---|---|
protected boolean tryAcquire(int arg) | 独占式获取同步状态,实现该方法需要查询当前状态并判断同步状态是否符合预期,然后再进行CAS设置同步状态 |
protected boolean tryRelease(int arg) | 独占式释放同步状态,等待获取同步状态的线程将有机会获取同步状态 |
protected int tryAcquireShared(int arg) | 共享式获取同步状态,返回大于等于0的值,表示获取成功,反之获取失败 |
protected boolean tryReleaseShared(int arg) | 共享式释放同步状态 |
protected boolean isHeldexClusively() | 当前同步器是否在独占模式下被线程占用,一般该方法表示是否被 当前线程所独占 |
模板方法
方法名称 | 描述 |
---|---|
void acquire(int arg) | 独占式获取同步状态。如果当前线程获取同步状态成功,则由该方法返回,否则将会进入同步队列等待。该方法将会调用重写的tryAcquire() 方法 |
void acquirelnterruptibly(int arg) | 与acquire(int arg) 相同,但是该方法响应中断,当前线程未获取到同步状态而进入同步队列中,如果当前线程被中断,则该方法会抛出InterruptedException |
boolean tryAcquireNanos(int arg, long nanos) | 在acquirelnterruptibly(int arg) 基础上增加了超时限制,如果当前线程在超时时间内没有获取到同步状态,那么将会返回 false,如果获取到了返回true |
void acquireShared(int arg) | 共享式的获取同步状态、如果当前线程未获取到同步状态、将会进入同步队列等待,与独占式获取的主要区别是在同一时刻可以有多个线程获取到同步状态。该方法将会调用重写的tryAcquireShared() 方法 |
void acquireSharedIntemuptibly(int arg) | 与acquireShared(int arg) 相同,该方法响应中断 |
boolean tryAcquireSharedNanos(in arg, long nanos) | 在acquireSharedIntemuptibly(int arg) 基础上增加了超时限制 |
boolean release(int arg) | 独占式的释放同步状态,该方法会在释放同步状态之后,将同步队列中第 一个节点包含的线程唤醒。该方法将会调用重写的tryRelease() 方法 |
boolean releaseShared(int arg) | 共享式的释放同步状态。该方法将会调用重写的tryReleaseShared() 方法 |
Collection<Thread> getQueuedTreads() | 获取等待在同步队列上的线程集合 |
同步队列
AQS依赖内部的同步队列来完成同步状态的管理,这是一个FIPO双向队列。如下图所示,当前线程获取同步状态失败时,AQS会将当前线程以及等待状态等信息构造成为一个节点并将其加入同步队列,同时会阻塞当前线程。当同步状态释放时,会把首节点中的线程唤醒,使其再次尝试获取同步状态。
- 由于线程获取同步状态存在并发情况,所以使用基于CAS的设置尾结点方法
- 首节点释放同步状态时会唤醒其后继结点,后继结点获取同步状态成功后将自己设置为首节点。由于只有一个线程能够成功获取到同步状态,因此设置头节点的方法并不需要使用CAS来保证
acquire()
AQS中的模板方法,用于独占式获取同步状态。如下所示
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
- 调用重写方法
tryAcquire()
获取同步状态 - 如果获取同步状态失败,则调用
addWaiter()
方法使用CAS方法向同步队列中添加结点 acquireQueued()
方法内是一个死循环,当其前驱结点是头结点且通过调用tryAcquire()
成功获取同步状态时设置当前结点为头结点并返回,否则通过LockSupport.park(this)
阻塞
由于存在以上流程,所以等待队列中的节点都在自旋获得同步状态(阻塞)
release()
AQS中的模板方法,用于独占式的释放同步状态,如下所示
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
- 调用重写方法
tryRelease()
释放同步状态 - 如果成功释放同步状态,则调用
unparkSuccessor()
通过LockSupport.unpark(object)
唤醒头结点的后继结点
acquireShared()
AQS中的模板方法,用于共享式的获取同步状态,如下所示
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}
private void doAcquireShared(int arg) {
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
if (interrupted)
selfInterrupt();
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
- 调用重写方法
tryAcquireShared()
获取同步状态 - 如果获取同步状态失败,则调用
doAcquireShared()
方法,与acquireQueued()
类似,采用自旋与park阻塞的方式
releaseShared()
AQS中的模板方法,用于共享式的释放同步状态。如下所示,与release()
方法类似,区别是存在并发的情况,需要通过循环和CAS保证线程安全
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
private void doReleaseShared() {
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
unparkSuccessor(h);
}
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
if (h == head) // loop if head changed
break;
}
}
ReentrantLock
ReentrantLock
的实现依赖于AQS,该锁是可重入的。通过继承AQS的静态内部类来实现公平锁FairSync
和非公平锁NonfairSync
,将ReentrantLock
的api代理到FairSync
或NonfairSync
上。无论是公平锁还是非公平锁,ReentrantLock::tryLock()
调用的是Sync.nonfairTryAcquire()
。
无论是公平锁还是非公平锁,释放锁的实现都是一致的,调用栈为ReentrantLock::unlock()
->AQS::release(int arg)
->Sync::tryRelease(int releases)
,下面分析一下Sync::tryRelease(int releases)
的源码
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
- 如果不是持有锁的线程不是当前线程,则抛出异常
- 修改
state
后,若为0才能释放锁,处理了重入的情况,返回true并设置持有锁线程为null
公平锁
公平锁保证先获取锁的线程一定先被满足,相比于非公平锁效率较低,但能减少饥饿的情况。加锁的调用栈为ReentrantLock::lock()
->FairSync::lock()
->AQS::acquire(int arg)
->FairSync::tryAcquire()
。下面分析一下FairSync::tryAcquire()
的源码
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
public final boolean hasQueuedPredecessors() {
Node t = tail;
Node h = head;
Node s;
return h != t &&
((s = h.next) == null || s.thread != Thread.currentThread());
}
- 如果
state == 0
,说明没有线程获得锁,CAS修改state
并设置持有锁的线程为当前线程。期间会调用hasQueuedPredecessors()
方法,如果有当前结点在同步队列中存在前驱结点,则返回true让出资源 - 如果获得锁的线程与当前线程一致,修改
state
以标记重入 - 其他情况返回false,不可以获得锁
非公平锁
非公平锁不能保证先获取锁的线程一定先被满足,相比于公平锁效率更高,但存在饥饿的情况。加锁的调用栈为ReentrantLock::lock()
->NonfairSync::lock()
->AQS::compareAndSetState(), setExclusiveOwnerThread(), acquire()
->NonFairSync::tryAcquire()
->Sync.nonfairTryAcquire()
。
下面分析一下NonfairSync::lock()
的源码,如果state == 0
说明没有线程获得锁,通过CAS更新state
并设置锁持有线程为当前线程并返回。否则调用AQS::acquire()
final void lock() {
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
Sync.nonfairTryAcquire()
负责处理锁被其他线程获得的情况。相比于公平锁的对应方法FairSync::tryAcquire()
,少了hasQueuedPredecessors()
的调用
ReentrantReadWriteLock
ReentrantReadWriteLock
的实现依赖于AQS,读锁是共享式的,写锁是独占式的。state
的高16位表示读锁状态,低16位表示写锁状态。下面以公平锁来分析读锁和写锁的实现
写锁
写锁是独占式锁且与读锁也是互斥的。释放锁的过程与ReentrantLock
类似。加锁的调用栈为WriteLock::lock()
->FairSync::lock()
->AQS::acquire(int arg)
->Sync::tryAcquire()
。下面分析一下Sync::tryAcquire()
的源码
protected final boolean tryAcquire(int acquires) {
Thread current = Thread.currentThread();
// c: 读锁状态+w
int c = getState();
// w: 写锁状态
int w = exclusiveCount(c);
if (c != 0) {
if (w == 0 || current != getExclusiveOwnerThread())
return false;
if (w + exclusiveCount(acquires) > MAX_COUNT)
throw new Error("Maximum lock count exceeded");
// 可重入
setState(c + acquires);
return true;
}
if (writerShouldBlock() ||
!compareAndSetState(c, c + acquires))
return false;
setExclusiveOwnerThread(current);
return true;
}
- 如果c!=0且w=0,说明存在读锁并且当前线程没有获得读锁,此时写锁不能获得
- 如果c!=0且w!=0且
current != getExclusiveOwnerThread()
,说明当前线程获得了读锁和写锁,此时处理重入的情况 - 如果c==0,说明读锁和写锁都没有被获得,则可以获得读锁
读锁
读锁是共享式锁且与写锁是互斥的。加锁的调用栈为ReadLock::lock()
->AQS::acquireShared()
->Sync::tryAcquireShared(int arg)
。由于代码过长,这里不进行源码分析,直接给出分析结果
- 如果写状态为0时,读锁总会被成功地获取,线程安全的增加读状态
- 如果当前线程已经获取了读锁,则线程安全的增加读状态
- 如果写状态不为0且不是被当前线程获取时,则进入等待状态
锁降级
锁降级指的是写锁降级为读锁,为了保证数据的可见性。下面用一个小例子说明
class CachedData {
Object data;
volatile boolean cacheValid;
final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();
void processCachedData() {
rwl.readLock().lock();
if (!cacheValid) {
// 必须先释放读锁
rwl.readLock().unlock();
// 必须持有写锁
rwl.writeLock().lock();
try {
if (!cacheValid) {
// 准备数据
data = ...
cacheValid = true;
}
// 如果之前没有持有写锁, 那么其他线程可能获得写锁并修改准备好的数据
// 获得读锁
rwl.readLock().lock();
} finally {
rwl.writeLock().unlock();
}
// 完成锁降级, 阻止其他线程改变准备好的数据, 保证准备数据的可见性在本线程内
}
try {
use(data);
} finally {
rwl.readLock().unlock();
}
}
}
Condition
Condition
定义了等待/通知两种类型的方法,依赖于Lock
对象。ConditionObject
是AQS的内部类,因为Condition
的操作需要获取相关锁。
等待队列
等待队列是一个FIFO的队列,在队列中的每个节点都包含了一个 线程引用,该线程就是在Condition
对象上等待的线程。由于Condition
的调用方法必定受到了锁的保护,所以等待队列新增结点没有CAS保证。
如图所示,对于一个AQS对象来说,拥有一个同步队列和多个属于不同Condition
对象的等待队列。
await()
将当前线程阻塞在此Condition
上直到收到了signal()
唤醒或者中断。代码如下所示
public final void await() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
// 当前线程加入等待队列
Node node = addConditionWaiter();
// 释放锁, 即移出同步队列
long savedState = fullyRelease(node);
int interruptMode = 0;
// 是否是等待队列的头结点, 循环+park
while (!isOnSyncQueue(node)) {
LockSupport.park(this);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
// 获取同步状态, 竞争锁
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null)
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
signal()
唤醒一个阻塞在此Condition
上的线程,必须获得与此Condition
有关的锁。代码如下所示
public final void signal() {
// 当前线程必须是获取锁的线程
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
// 唤醒等待队列的第一个结点
Node first = firstWaiter;
if (first != null)
doSignal(first);
}
其他并发工具类
CountdownLatch
: 允许一个或多个线程等待其他线程完成操作CyclicBarrier
: 让一组线程到达一个屏障时被阻塞,直到最后一个线程到达屏障时,所有被屏障拦截的线程才会继续运行Semaphore
: 控制同时访问特定资源的线程数量。它通过协调各个线程,以保证合理的使用公共资源Exchanger
: 进行线程间的数据交换。两个线程通过exchange方法交换数据,如果第一个线程先执行exchange方法,它会一直等待第二个线程也执行exchange方法,当两个线程都到达同步点时,才进行数据交换。
CountdownLatch
的计数器只能使用一次,而CyclicBarrier
的计数器可以使用reset方法重置。所以CyclicBarrier
能处理更为复杂的业务场景。
REFERENCE
- Java并发编程的艺术
文档信息
- 本文作者:wzx
- 本文链接:https://masterwangzx.com/2021/01/11/java-aqs/
- 版权声明:自由转载-非商用-非衍生-保持署名(创意共享3.0许可证)