当前位置: 首页>前端>正文

十、【Java 并发】抽象同步队列 AQS

AbstractQueuedSynchronizer 抽象队列同步器

抽象队列同步器 AbstractQueuedSynchronizer,简称 AQS,是用来构建锁或者其他同步组件的基础框架,它使用了一个 int 成员变量表示同步状态,通过内置的 FIFO 队列来完成资源获取线程的排队工作。它是实现同步组件的基础, 并发包中锁的底层就是使用 AQS 实现的。

AQS 的主要使用方式是继承,子类通过继承 AQS 并实现它的抽象方法来管理同步状态,在抽象方法的实现过程中免不了要对同步状态进行更改,这时就需要使用 AQS 提供的3个方法 gestate、setstate(int new State)和 compareAndSetState(int expect, int update)
来进行操作,因为它们能够保证状态的改变是安全的。子类推荐被定义为自定义同步组件的静态内部类,AQS 自身没有实现任何同步接口,它仅仅是定义了若干同步状态获取和释放的方法来供自定义同步组件使用,AQS 既可以支持独占式地获取同步状态,也可以支持共享式地获取同步状态,这样就可以方便实现不同类型的同步组件( Reentrantlock、ReentrantReadWriteLock 和 CountDownLatch 等)。

AQS 是实现锁(也可以是任意同步组件)的关键,在锁的实现中聚合同步器,利用 AQS 实现锁的语义。可以这样理解二者之间的关系:锁是面向使用者的,它定义了使用者与锁交互的接口(比如可以允许两个线程并行访问),隐藏了实现细节; AQS 面向的是锁的实现者,它简化了锁的实现方式,屏蔽了同步状态管理、线程的排队、等待与唤醒等底层操作。锁和 AQS 很好地隔离了使用者和实现者所需关注的领域。大多数开发者可能永远不会直接使用AQS,但是知道其原理对于架构设计还是很有帮助的。

AQS 原理 & 架构

AQS 的设计是基于模板方法模式,使用者需要继承 AQS 并重写指定的方法,将 AQS 组合在自定义同步组件的实现中,并调用 AQS 提供的模板方法,而这些模板方法将会调用使用者重写的方法。

AQS 模板方法

AQS 可重写的模板方法

方法名称 描述
protected boolean tryAcquire(int arg) 独占式获取同步状态,实现该方法需要查询当前状态, 并判断同步状态是否符合预期,然后再进行 CAS 设置同步状态
protected boolean tryRelease(int arg) 独占式释放同步状态,等待获取同步状态的线程将有机会获取到同步状态
protected int tryAcquireShared(int arg) 共享式获取同步状态,返回大于等于0的值,表示获取成功,反之获取失败
protected boolean tryReleaseShared(int arg) 共享式释放同步状态
protected boolean isHeldExclusively() 当前同步器是否在独占模式下被线程占用,一般该方法表示 是否被当前线程所独占

AQS 提供的模板方法

方法名称 描述
void acquire(int arg) 独占式获取同步状态,如果当前线程获取同步状态成功,则由该方法返回, 否则将会进入同步队列等待,该方法将会调用重写的 tryAcquire(int arg)方法
void acquireInterruptibly(int arg) 与 acquire(int arg)相同,但是该方法响应中断,当前线程未获取到 同步状态而进入同步队列中,如果当前线程被中断,则该方法会抛出 InterruptedException 并返回
boolean tryAcquireNanos(int arg) 在 acquireInterruptibly(int arg)基础上增加了超时限制, 如果当前线程在超时时间内没有获取到同步状态, 那么将会返回 false, 如果获取到了返回true
void acquireShared(int arg) 共享式的获取同步状态,如果当前线程未获取到同步状态,将会进入同步队列 等待,与独占式获取的主要区别是在同一时刻可以有多个线程获取到同步状态
void acquireSharedInterruptibly(int arg) 与 acquireShared(int arg)相同,该方法响应中断
boolean tryAcquireSharedNanos(int arg, long nanos) 在 acquireSharedInterruptibly (int arg)基础上增加了超时限制
boolean release(int arg) 独占式的释放同步状态,该方法会在释放同步状态之后, 将同步队列中第一个节点包含的线程唤醒
boolean releaseShared(int arg) 共享式的释放同步状态
Collection<Thread> getQueuedThreads() 获取等待在同步队列上的线程集合

AQS 提供的模板方法可以分为下面几类:

  • 独占式获取与释放同步状态
  • 共享式获取与释放同步状态
  • 查询同步队列中的等待线程情况

自定义同步组件使用同步器提供的模板方法来实现自己的同步语义。只有掌握了同步器的工作原理才能更加深人地理解并发包中其他的并发组件,所以下面
通过 AQS 源码中提供的一个独占锁的示例来深入了解一下同步器的工作原理。

顾名思义,独占锁就是在同一时刻只能有一个线程获取到锁,而其他获取锁的线程只能处于同步队列中等待,只有获取锁的线程释放了锁,后继的线程才能够获取锁,代码如下:

class Mutex implements Lock, java.io.Serializable {

    /**
     * 静态内部类 自定义同步器
     */
    private static class Sync extends AbstractQueuedSynchronizer {

        /**
         * 是否处于占用状态
         * @return
         */
        @Override
        protected boolean isHeldExclusively() {
            return getState() == 1;
        }

        /**
         * 当状态为 0 的时候获取锁
         * @param acquires
         * @return
         */
        @Override
        public boolean tryAcquire(int acquires) {
            assert acquires == 1; // Otherwise unused
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }

        /**
         * 释放锁,将状态设置为 0
         * @param releases
         * @return
         */
        @Override
        protected boolean tryRelease(int releases) {
            assert releases == 1; // Otherwise unused
            if (getState() == 0) throw new IllegalMonitorStateException();
            setExclusiveOwnerThread(null);
            setState(0);
            return true;
        }

        // 返回一个 Condition,每个 condition 都包含了一个 condition 队列
        Condition newCondition() {
            return new ConditionObject();
        }

        // 反序列化
        private void readObject(ObjectInputStream s)
                throws IOException, ClassNotFoundException {
            s.defaultReadObject();
            setState(0); // 重置为解锁状态
        }
    }

    /**
     * 将操作通过 Sync 上即可
     */
    private final Sync sync = new Sync();

    @Override
    public void lock() {
        sync.acquire(1);
    }

    @Override
    public boolean tryLock() {
        return sync.tryAcquire(1);
    }

    @Override
    public void unlock() {
        sync.release(1);
    }

    @Override
    public Condition newCondition() {
        return sync.newCondition();
    }

    public boolean isLocked() {
        return sync.isHeldExclusively();
    }

    public boolean hasQueuedThreads() {
        return sync.hasQueuedThreads();
    }

    @Override
    public void lockInterruptibly() throws InterruptedException {
        sync.acquireInterruptibly(1);
    }

    @Override
    public boolean tryLock(long timeout, TimeUnit unit)
            throws InterruptedException {
        return sync.tryAcquireNanos(1, unit.toNanos(timeout));
    }
}

Mutex 是一个自定义独占锁同步组件,它在同一时刻只允许一个线程占有锁。Mutex 中定义了一个静态内部类,该内部类继承了 AQS 并实现了独占式获取和释放同步状态。在 tryAcquire(int releases) 方法中,如果经过 CAS 设置成功,也就是同步状态值设置为 1,则代表获取了同步状态,而在 tryRelease(int releases) 方法中只是将同步状态值重置为 0。

使用 Mutex 时并不会直接和内部 AQS 的实现打交道,而是调用 Mutex 提供的方法,在 Mutex 的实现中,以获取锁的 lock() 方法为例,只需要在方法实现中调用同步器的模板方法 acquire(int args) 即可,当前线程调用该方法获取同步状态失败后会被加入到同步队列中等待,这样就大大降低了实现一个可靠自定义同步组件的门槛。

AQS 同步队列

AQS 依赖内部的一个 FIFO 双向同步队列来完成同步状态的管理,当前线程获
取同步状态失败时,AQS 会将当前线程以及等待状态等信息构造成为一个 Node 并将其加入同步队列,同时会阻塞当前线程,当同步状态释放时,会把首节点中的线程唤醒,使其再次尝试获取同步状态。

同步队列中的节点( Node)用来保存获取同步状态失败的线程引用、等待状态以及前驱和后继节点,节点的属性类型与名称以及描述。源码如下

static final class Node {
        /** 一种标识,表示被标识的结点是共享式 */
        static final Node SHARED = new Node();
        /** 一种标识,表示被标识的结点是独占式 */
        static final Node EXCLUSIVE = null;

        static final int CANCELLED =  1;
        static final int SIGNAL    = -1;
        static final int CONDITION = -2;
        static final int PROPAGATE = -3;

        /**
         * 等待状态,初始值为 0 , 状态值如下:
         * CANCELLED: 被中断或者获取同步状态超时会被置为该状态,且在该状态下的线程不再被阻塞,从同步队列中取消等待,节点进入该状态将不再改变
         * SIGNAL: 线程如果释放了或者取消了同步状态,则会将对应的结点置为该状态,用于通知下一个节点,准备获取同步状态
         * CONDITION: 当前节点在 Condition 等待队列中,当其他线程对 Condition 调用了signal 方法后,
         *            该节点将会从等待队列中转移到 AQS 同步队列中,等待获取同步锁
         * PROPAGATE: 与共享式获取同步状态有关,处于该状态的节点中的线程处于可运行的状态, 
         *            表示下一次共享式同步状态获取将会无条件地被传播下去,
         *            释放共享资源时需要通知其他节点
         */
        volatile int waitStatus;

        /**
         * 前驱节点, 当节点加入同步队列时被设置
         */
        volatile Node prev;

        /**
         * 后继节点
         */
        volatile Node next;

        /**
         * 获取同步状态的线程.
         */
        volatile Thread thread;

        /**
         * 当前节点在Condition中等待队列上的下一个节点(给Condition等待队列使用)
         */
        Node nextWaiter;

        /**
         * 当前节点是否是在共享模式下等待
         */
        final boolean isShared() {
            return nextWaiter == SHARED;
        }

       
        final Node predecessor() throws NullPointerException {
            Node p = prev;
            if (p == null)
                throw new NullPointerException();
            else
                return p;
        }

        Node() {    // Used to establish initial head or SHARED marker
        }

        Node(Thread thread, Node mode) {     // Used by addWaiter
            this.nextWaiter = mode;
            this.thread = thread;
        }

        Node(Thread thread, int waitStatus) { // Used by Condition
            this.waitStatus = waitStatus;
            this.thread = thread;
        }
    }

Node 是构成同步队列的基础,AQS 拥有首节点 head 和尾结点 tail,没有成功获取同步状态的线程将会成为节点加入该队列的尾部

十、【Java 并发】抽象同步队列 AQS,第1张

AQS 包含了两个节点类型的引用,一个指向头节点,而另-一个指向尾节
点。当一个线程成功地获取了同步状态(或者锁),其他线程将无法获取到同步状态,转而被构造成为节点并加人到同步队列中,而这个加入队列的过程必须要保证线程安全,因此同步器提供了一个基于CAS的设置尾节点的方法:compareAndSetTail(Node expect, Node update),它需要传递当前线程认为的尾节点和当前节点,只有设置成功后,当前节点才正式与之前的尾节点建立关联。

十、【Java 并发】抽象同步队列 AQS,第2张

同步队列遵循FIFO,首节点是获取同步状态成功的节点,首节点的线程在释放同步状态时,将会唤醒后继节点,而后继节点将会在获取同步状态成功时将自己设置为首节点。

十、【Java 并发】抽象同步队列 AQS,第3张

如上图,设置首节点是通过获取同步状态成功的线程来完成的,由于只有一个线程能够成功获取到同步状态,因此设置头节点的方法并不需要使用CAS来保证,它只需要将首节点设置成为原首节点的后继节点并断开原首节点的next引用即可。

独占式同步状态获取与释放

acquire 方法

通过调用 AQS 的 acquire(int arg) 方法可以获取同步状态,该方法对中断不敏感,也就是由于线程获取同步状态失败后进入同步队列中,后续对线程进行中断操作时,线程不会从同步队列中移出,方法代码如下:

/**
 * 独占式获取同步状态,
 * 如果当前线程获取同步状态成功,则由该方法返回,
 * 否则将会进入同步队列等待,该方法将会调用重写的 tryAcquire(int arg)方法
 * @param arg
 */
public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

上面方法主要完成了同步状态获取、节点构造、加入同步队列以及在同步队列中自旋等待的相关工作,其主要逻辑是:首先调用自定义同步器实现的tryAcquire(int arg)方法,该方法保证线程安全的获取同步状态,如果同步状态获取失败,则构造同步节点(独占式 Node.EXCLUSIVE, 同一时刻只能有一个线程成功获取同步状态)并通过addWaiter(Node node)方法将该节点加入到同步队列的尾部,最后调用acquireQueuedQNode node, int arg)方法,使得该节点以“死循环”的方式获取同步状态。如果获取不到则阻塞节点中的线程,而被阻塞线程的唤醒主要依靠前驱节点的出队或阻塞线程被中断来实现。

下面来详细分析一下整个流程。首先是节点的构造以及加入同步队列,代码如下:

/**
 * 为当前线程以给定模式创建节点并加入同步队列中
 *
 * @param mode 指定的节点模式  Node.EXCLUSIVE 独占式, Node.SHARED 共享式
 * @return 返回创建好的节点对象
 */
private Node addWaiter(Node mode) {
    //将当前线程封装成 Node 节点
    Node node = new Node(Thread.currentThread(), mode);
    // 快速尝试在尾部添加
    Node pred = tail;
    if (pred != null) {
        node.prev = pred;
        if (compareAndSetTail(pred, node)) {
            //成功加入直接返回
            pred.next = node;
            return node;
        }
    }
    //调用此方法继续尝试加入同步队列
    enq(node);
    return node;
}
/**
 * 通过自旋方式保证入队列成功
 * @param node 入队列节点
 * @return 入队成功返回前置节点
 */
private Node enq(final Node node) {
    for (;;) {
        Node t = tail;
        //如果队尾为空说明队列为空
        if (t == null) {
            // 初始化队列,将头结点设置为空节点,头结点即表示当前正在运行的节点
            if (compareAndSetHead(new Node()))
                //再将尾节点值设置成头结点,继续下次循环尾结点就不会为空了
                tail = head;
        } else {
            //将当前队列的尾节点设置成当前节点的前置节点
            node.prev = t;
            //CAS 操作当前节点加入队尾,如果失败的话,继续循环,直到成功返回
            if (compareAndSetTail(t, node)) {
                //设置成功将当前节点设置成前尾节点的后继节点
                t.next = node;
                return t;
            }
        }
    }
}

节点加入同步队列之后,就会进入一个自旋的过程,每个节点中的线程都在自省地观察,当条件满足,获取到了同步状态,就可以从自旋中退出。

/**
 * 通过自旋不断的调用 tryAcquire 尝试获取同步状态
 *
 * @param node the node
 * @param arg the acquire argument
 * @return {@code true} if interrupted while waiting
 */
final boolean acquireQueued(final Node node, int arg) {
    // 标示是否成功获取同步状态
    boolean failed = true;
    try {
        //标示线程中断状态
        boolean interrupted = false;
        for (;;) {
            //获取节点的前置节点
            final Node p = node.predecessor();
            //只有当前置节点为首节点才会尝试获取同步状态
            if (p == head && tryAcquire(arg)) {
                //获取同步状态成功后,设置当前节点为首节点
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            // 获取同步状态失败,判断当前线程是否需要阻塞
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        // 如果自旋结束,failed 标示还为 true 则代表获取同步状态失败,那么就将该节点从同步队列中移除,同时唤醒下一节点
        if (failed)
            cancelAcquire(node);
    }
}

在 acquireQueued(final Node node, int arg) 方法中,当前线程在自旋中尝试获取同步状态,只有前置结点是头节点才能够舱室获取同步状态,这样设计原因有如下两个:

  • 头结点是成功获取到同步状态的节点,而头结点的线程释放了同步状态之后,将会唤醒其后继节点,后继节点的线程被唤醒后需要检查自己的前置节点是否是头结点
  • 维护同步队列的 FIFO 原则。
/**
 * 将节点从同步队列中移除
 *
 * @param node the node
 */
private void cancelAcquire(Node node) {
    // 如果节点不存在了直接返回
    if (node == null)
        return;
    //设置该节点线程为空
    node.thread = null;

    // 找到状态不为 CANCELLED 的前置节点
    Node pred = node.prev;
    while (pred.waitStatus > 0)
        node.prev = pred = pred.prev;

    // 获取前置节点的下一个节点
    Node predNext = pred.next;

    // 设置当前中断的节点状态为 CANCELLED
    node.waitStatus = Node.CANCELLED;

    // 如果当前中断的节点是尾节点,则将尾节点重新指向
    if (node == tail && compareAndSetTail(node, pred)) {
        //尾结点的下个节点设置为 null
        compareAndSetNext(pred, predNext, null);
    } else {
        // 如果前置节点的状态为 SINGAL 或者即将 SINGAL,那么将当前中断节点移除
        int ws;
        if (pred != head &&
            ((ws = pred.waitStatus) == Node.SIGNAL ||
             (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
            pred.thread != null) {
            Node next = node.next;
            if (next != null && next.waitStatus <= 0)
                compareAndSetNext(pred, predNext, next);
        } else {
            //将节点移除,并唤醒下一个节点
            unparkSuccessor(node);
        }

        node.next = node; // help GC
    }
}
/**
 * 将节点移除,同时唤醒下一个节点
 *
 * @param node the node
 */
private void unparkSuccessor(Node node) {

    // 重置该节点为初始化状态
    int ws = node.waitStatus;
    if (ws < 0)
        compareAndSetWaitStatus(node, ws, 0);

    // 获取中断节点的后继节点
    Node s = node.next;
    // 判断后继节点的状态,如果为 Node.CANCELED 状态, 回溯获取最近的 waitStatus <= 0 的节点
    if (s == null || s.waitStatus > 0) {
        s = null;
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
    // 如果该节点不为 null,则通过 LockSupport 唤醒该节点中的线程
    if (s != null)
        LockSupport.unpark(s.thread);
}

acquire(int arg)方法调用流程,也就是独占式同步状态获取流程,整个流程可以参考下图:

[图片上传失败...(image-980765-1630650770495)]

当线程获取同步状态成功并执行了相应逻辑之后,就需要释放同步状态,使得其后续节点能够继续获取同步状态。通过调用同步器的 release(int arg) 方法可以释放同步状态,该方法在释放同步状态之后,会唤醒其后继节点,进而使后继节点重新尝试获取同步状态,代码如下:

/**
     * 独占式的释放同步状态,该方法会在释放同步状态之后, 将同步队列中第一个节点包含的线程唤醒
     * @param arg
     * @return
     */
    public final boolean release(int arg) {
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }

release 方法执行时,会唤醒头结点的后继节点,unparkSuccessor(Node node)方法使用 LockSupport 来唤醒处于等待状态的线程。

总结

在获取同步状态时,AQS 维护一个同步队列,获取状态失败的线程都会被加入到队列中并在队列中进行自旋。移出队列(或停止自旋)的条件是前驱节点为头节点且成功获取了同步状态。在释放同步状态时,同步器调用 tryRelease(int arg) 方法释放同步状态,然后唤醒头节点的后继节点。

共享式同步状态获取与释放

共享式获取与独占式获取最主要的区别在于同一时刻能否有多个线程同时获取到同步状态。以文件的读写为例,如果一个程序在对文件进行读操作,那么这一时刻对于该文件的写操作均被阻塞,而读操作能够同时进行。写操作要求对资源的独占式访问,而读操作可以是共享式访问。

通过调用 AQS 的 acquireShared(int arg)方法可以共享式地获取同步状态,代码如下:

/**
 * 共享式的获取同步状态,如果当前线程未获取到同步状态,将会进入同步队列等待
 * 与独占式获取的主要区别是在同一时刻可以有多个线程获取到同步状态
 * @param arg
 */
public final void acquireShared(int arg) {
    if (tryAcquireShared(arg) < 0)
        doAcquireShared(arg);
}

private void doAcquireShared(int arg) {
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            final Node p = node.predecessor();
            if (p == head) {
                int r = tryAcquireShared(arg);
                if (r >= 0) {
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    if (interrupted)
                        selfInterrupt();
                    failed = false;
                    return;
                }
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

在 acquireShared(int arg) 方法中,同步器调用 tryAcquireShared(int arg) 方法尝试获取同步状态, tryAcquireShared(int arg) 方法返回值为 int 类型,当返回值大于等于 0 时,表示能够获取到同步状态。因此,在共享式获取的自旋过程中,成功获取到同步状态并退出自旋的条件就是 tryAcquireShared(int arg) 方法返回值大于等于 0。可以看到,在 doAcquireShared(int arg) 方法的自旋过程中,如果当前节点的前驱为头节点时,尝试获取同步状态,如果返回值大于等于 0 ,表示该次获取同步状态成功并从自旋过程中退出。与独占式一样,共享式获取也需要释放同步状态,通过调用 releaseShared(int arg) 方法释放同步状态,该方法代码如下:

/**
 * 共享式的释放同步状态
 * @param arg
 * @return
 */
public final boolean releaseShared(int arg) {
    if (tryReleaseShared(arg)) {
        doReleaseShared();
        return true;
    }
    return false;
}

该方法在释放同步状态之后,将会唤醒后续处于等待状态的节点。对于能够支持多个线程同时访问的并发组件(比如Semaphore),它和独占式主要区别在于 tryReleaseShared(int arg) 方法必须确保同步状态(或者资源数)线程安全释放,一般是通过循环和 CAS 来保证的,因为释放同步状态的操作会同时来自多个线程。


https://www.xamrdz.com/web/2j71993894.html

相关文章: