AQS--AbstractQueuedSynchronizer
-抽象的队列同步器
前置知识
- 公平锁与非公平锁
- 可重入锁
- LockSupport
- 自旋锁
- 链表
- 模板设计模式
1. AQS是什么
抽象的队列同步器
util
->concurrent
->locks
下有三个相互有关联的类
- AbstractOwnablSynchronizer
- AbstractQueuedLongSynchronizer
- AbstractQueuedSynchronizer
通常讲的AQS指的是第三个:AbstractQueuedSynchronizer
技术解释:是用来构建锁或者其他同步器组件的重量级基础框架以及整个JUC体系的基石,通过内置的FIFO队列来完成资源获取线程的排队工作,并通过一个int类型的变量表示持有锁的状态。
CLH队列,是一个单向链表,AQS中的队是CLH变体的虚拟双向队列(FIFO)
有一个head还有一个tail
2.AQS为什么是最重要的基石
和AQS相关的:
- ReentrantLock
- CountDownlatch
- ReentrantReadWriteLock
- Semaphore
这些类中都有private static final class Sync extends AbstractQueuedSynchronizer{ ...}
2.1 进一步理解锁和同步器
- 锁--面向锁的使用者:定义了程序员和锁交互的使用层面的API,隐藏了实现细节,直接调用。
- 同步者--面向锁的实现者:统一规范,并简化了锁的实现,屏蔽了同步状态管理、阻塞线程排队和通知、唤醒机制等等。
3. AQS能做什么
加了锁就会导致拥堵阻塞,抢不到资源的线程必然涉及到一种排队等候机制,但是等候线程仍然保留获取 锁的可能,并且获取锁流程仍在继续。
这个用来保证锁分配的阻塞等待唤醒机制,主要是用CLH队列的变体实现的,将暂时获取不到锁的线程加入到队列之中来,这个队列就是AQS的抽象表现。他将这些线程封装为结点(node),通过CAS、自旋以及LockSupport.park()的方式,维护state变量的状态,使并发达到同步的控制效果
4. AQS初步
AQS使用一个Volatile 的int 类型成员变量来表示同步状态,通过内置的FIFO队列来完成资源获取的排队工作,将每条要去抢占资源的线程封装成为一个Node结点来实现锁的分配,通过CAS完成对State值的修改。
4.1 AQS中的int变量
/**
* The synchronization state.
*/
private volatile int state;
4.2 AQS中的CLH队列
由单向(只有pre
)改良为双向队列
共有四个指针,头、尾、前、后
由尾部入队,由头部出队
小总结:有阻塞就需要排队,使用state自旋进入双向CLH队列
4.3 AQS中的结点Node内部类
Node<Thread>中包括pre,next,thread:双向的,存储对象是线程
Node的等待状态waitState成员变量
Node的等待状态waitState的几个可能的值
static final class Node {
//共享
static final Node SHARED = new Node();
//独占
static final Node EXCLUSIVE = null;
//表示线程获取锁的请求被取消了
static final int CANCELLED = 1;
//表示线程已经准备好了,就等资源释放了
static final int SIGNAL = -1;
//表示线程在等待队列中,等待Condition唤醒
static final int CONDITION = -2;
//共享式同步状态获取将会无条件地传播下去
//当前线程处于SHARED情况下,该字段才会使用;
static final int PROPAGATE = -3;
//初始为0,状态是上面地几种
volatile int waitStatus;
4.4 AQS的基本结构
5. 通过ReentrantLock理解AQS
- Lock接口的实现类,基本都是通过【聚合】一个【队列同步器】的子类完成线程访问控制的
通过查看ReentrantLock源码可以看到,公平锁和非公平锁的lock()方法唯一的区别就在于公平锁在获取同步状态时多了一个限制条件:
hasQueuedPredecessors()
hasQueuedPredecessors是公平锁加锁时判断等待队列中是否存在有效结点的方法。 - 公平锁,先到先得,线程获取锁的时候,如果等待队列中已经有线程正在等待,则进入等待队列。
- 非公平锁,不管是否有等待线程,如果可以获取锁,则立刻占有锁对象。也就是说,队列的第一位在unpark()后,也仍然需要竞争锁(存在竞争的情况下)
5.1. 源码解析
5.1.1 lock/acquire 部分代码
- lock()
final void lock() {
if (compareAndSetState(0, 1))//乐观CAS,如果无人占用,则上锁(改变AQS的state)
//设置当前线程为资源占有线程
setExclusiveOwnerThread(Thread.currentThread());
else
//否则,抢占一个位置
acquire(1);
}
- acquire()
public final void acquire(int arg) {
if (!tryAcquire(arg) &&//尝试抢占(1:刚好为空了,2:可重入)
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
- tryAcquire(arg)
//AbstractQueuedSynchronizer.java
protected boolean tryAcquire(int arg) {
//接口中只有一行异常,意味着所有实现或者子类必须重写(实现)这个方法
throw new UnsupportedOperationException();
}
//ReeentrantLock.java
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
...
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {//如果当前资源空闲
//同lock()方法
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
//若非空闲,那是否为当前线程本身正在占用(可重入锁机制)
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
- addWaiter(Node.EXCLUSIVE)
private Node addWaiter(Node mode) {
//准备入队,封装线程为Node结点
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;//获取队尾
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
//队尾为空时,直接入队
enq(node);
return node;
}
...
...
private Node enq(final Node node) {
for (;;) {//自旋
Node t = tail;//tail为尾指针
if (t == null) { // Must initialize,尾指针为空,则队列为空
if (compareAndSetHead(new Node()))//新建一个空的头结点(傀儡结点,哨兵结点只用于占位)
tail = head;//首尾相等,队列长度为1
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {//(乐观地设置尾结点为准备入队的结点
t.next = node;
return t;
}
}
}
}
- acquireQueued(addWaiter(Node.EXCLUSIVE),arg)
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {//自旋
//取当前结点的前一结点
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {//如果前结点已经是头节点,就意味着该结点排第一位,那就尝试抢占
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&//抢占失败之后应该park住,如果自己的前结点不为-1 就将自己的前结点设置为-1(随时等待资源释放)
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
...
final Node predecessor() throws NullPointerException {
Node p = prev;//该节点的前一个结点,这里也是设置哨兵结点的一个作用,防止只有一个有效结点时取前结点报错
if (p == null)
throw new NullPointerException();
else
return p;
}
...
...
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;//获取前结点状态
if (ws == Node.SIGNAL)
/*
* This node has already set status asking a release该前结点已经设置为等待释放的状态
* to signal it, so it can safely park.//等待唤醒
*/
return true;
if (ws > 0) {//>0意味着该前结点已经设置为了CANCELLED
/*
* Predecessor was cancelled. Skip over predecessors and indicate retry.
* 该结点被取消,忽视该结点,循环获取前置的未被取消的结点
*/
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
/*
* waitStatus must be 0 or PROPAGATE. Indicate that we
* need a signal, but don't park yet. Caller will need to
* retry to make sure it cannot acquire before parking.
*/
//将前置结点的状态设置为-1
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
...
...
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
//入队的线程park住了
return Thread.interrupted();
}
5.1.2 unlock/release 部分代码
unlock()
public void unlock() {
sync.release(1);
}
ReentrantLock
的
unlock()方法,调用了
sync的
release(1)`。其中sync就是AQS的子类
release()
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
...
...
protected boolean tryRelease(int arg) {
//子类必须实现该方法
throw new UnsupportedOperationException();
}
protected final boolean tryRelease(int releases) {
int c = getState() - releases;//1-1=0,将AQS的状态清空为0
if (Thread.currentThread() != getExclusiveOwnerThread())
//如果前来释放锁的线程并不是锁的持有线程,则报错
//IllegalMonitorStateException()
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {//可重入时是否清空了所有层的锁
free = true;
//设置当前资源线程为空,空闲可利用
setExclusiveOwnerThread(null);
}
//设置AQS的状态为可用
setState(c);
return free;
}
unparkSuccessor(h);
private void unparkSuccessor(Node node) {
/*
* If status is negative (i.e., possibly needing signal) try
* to clear in anticipation of signalling. It is OK if this
* fails or if status is changed by waiting thread.
*/
int ws = node.waitStatus;
if (ws < 0)
//头结点只会设置 waitStatus 为初期值
compareAndSetWaitStatus(node, ws, 0);
/*
* Thread to unpark is held in successor, which is normally
* just the next node. But if cancelled or apparently null,
* traverse backwards from tail to find the actual
* non-cancelled successor.
*/
Node s = node.next;
if (s == null || s.waitStatus > 0) {//清空无效结点、取消结点
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
//第二个结点才是证证幺唤醒的结点
LockSupport.unpark(s.thread);
}
release的时候永远是unpark第二个结点,而非头节点