当前位置: 首页>编程语言>正文

深入AQS AbstractQueuedSynchronizer

所谓的AQS即是抽象的队列式的同步器,内部定义了很多锁相关的方法,我们熟知的ReentrantLock、ReentrantReadWriteLock、CountDownLatch、Semaphore等都是基于AQS来实现的。

我们先看下AQS相关的UML图:

深入AQS AbstractQueuedSynchronizer,第1张
图片

1 AQS实现原理

AQS中 维护了一个volatile int state(代表共享资源)和一个FIFO线程等待队列(多线程争用资源被阻塞时会进入此队列)。

这里volatile能够保证多线程下的可见性,当state=1则代表当前对象锁已经被占有,其他线程来加锁时则会失败,加锁失败的线程会被放入一个FIFO的等待队列中,比列会被UNSAFE.park()操作挂起,等待其他获取锁的线程释放锁才能够被唤醒。

另外state的操作都是通过CAS来保证其并发修改的安全性。

具体原理我们可以用一张图来简单概括:

深入AQS AbstractQueuedSynchronizer,第2张
图片

AQS 中提供了很多关于锁的实现方法,

getState():获取锁的标志state
setState():设置锁的标志state值
tryAcquire(int):独占方式获取锁。尝试获取资源,成功则返回true,失败则返回false。
tryRelease(int):独占方式释放锁。尝试释放资源,成功则返回true,失败则返回false。
这里还有一些方法并没有列出来,接下来我们以ReentrantLock作为突破点通过源码和画图的形式一步步了解AQS内部实现原理。

  1. 结构

文章准备模拟多线程竞争锁、释放锁的场景来进行分析AQS源码:
三个线程(线程一、线程二、线程三)同时来加锁/释放锁

目录如下:

线程一加锁成功时AQS内部实现
线程二/三加锁失败时AQS中等待队列的数据模型
线程一释放锁及线程二获取锁实现原理
通过线程场景来讲解公平锁具体实现原理
通过线程场景来讲解Condition中await()和signal()实现原理
这里会通过画图来分析每个线程加锁、释放锁后AQS内部的数据结构和实现原理

线程一加锁成功

如果同时有三个线程并发抢占锁,此时线程一抢占锁成功,线程二线程三抢占锁失败,具体执行流程如下:

深入AQS AbstractQueuedSynchronizer,第3张
图片

此时AQS内部数据为:

深入AQS AbstractQueuedSynchronizer,第4张
图片

线程二线程三加锁失败:

深入AQS AbstractQueuedSynchronizer,第5张
图片

有图可以看出,等待队列中的节点Node是一个双向链表,这里SIGNALNodewaitStatus属性,Node中还有一个nextWaiter属性,这个并未在图中画出来,这个到后面Condition会具体讲解的。

具体看下抢占锁代码实现:

java.util.concurrent.locks.ReentrantLock .NonfairSync:

static final class NonfairSync extends Sync {
        private static final long serialVersionUID = 7316153563782823691L;

        /**
         * Performs lock.  Try immediate barge, backing up to normal
         * acquire on failure.
         */
        final void lock() {
            if (compareAndSetState(0, 1))
                setExclusiveOwnerThread(Thread.currentThread());
            else
                acquire(1);
        }

        protected final boolean tryAcquire(int acquires) {
            return nonfairTryAcquire(acquires);
        }
    }

这里使用的ReentrantLock非公平锁,线程进来直接利用CAS尝试抢占锁,如果抢占成功state值回被改为1,且设置对象独占锁线程为当前线程。如下所示:

 /**
     * Atomically sets synchronization state to the given updated
     * value if the current state value equals the expected value.
     * This operation has memory semantics of a {@code volatile} read
     * and write.
     *
     * @param expect the expected value
     * @param update the new value
     * @return {@code true} if successful. False return indicates that the actual
     *         value was not equal to the expected value.
     */
    protected final boolean compareAndSetState(int expect, int update) {
        // See below for intrinsics setup to support this
        return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
    }


 protected final void setExclusiveOwnerThread(Thread thread) {
        exclusiveOwnerThread = thread;
    }

线程二抢占锁失败

我们按照真实场景来分析,线程一抢占锁成功后,state变为1,线程二通过CAS修改state变量必然会失败。此时AQSFIFO(First In First Out 先进先出)队列中数据如图所示:

深入AQS AbstractQueuedSynchronizer,第6张
图片

我们将线程二执行的逻辑一步步拆解来看:

java.util.concurrent.locks.AbstractQueuedSynchronizer.acquire():

 /**
     * Acquires in exclusive mode, ignoring interrupts.  Implemented
     * by invoking at least once {@link #tryAcquire},
     * returning on success.  Otherwise the thread is queued, possibly
     * repeatedly blocking and unblocking, invoking {@link
     * #tryAcquire} until success.  This method can be used
     * to implement method {@link Lock#lock}.
     *
     * @param arg the acquire argument.  This value is conveyed to
     *        {@link #tryAcquire} but is otherwise uninterpreted and
     *        can represent anything you like.
     */
    public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }

先看看tryAcquire()的具体实现:java.util.concurrent.locks.ReentrantLock .nonfairTryAcquire():

final boolean nonfairTryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            int c = getState();
            if (c == 0) {
                if (compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            else if (current == getExclusiveOwnerThread()) {
                int nextc = c + acquires;
                if (nextc < 0) // overflow
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            return false;
        }

nonfairTryAcquire()方法中首先会获取state的值,如果不为0则说明当前对象的锁已经被其他线程所占有,接着判断占有锁的线程是否为当前线程,如果是则累加state值,这就是可重入锁的具体实现,累加state值,释放锁的时候也要依次递减state值。

如果state为0,则执行CAS操作,尝试更新state值为1,如果更新成功则代表当前线程加锁成功。

以线程二为例,因为线程一已经将state修改为1,所以线程二通过CAS修改state的值不会成功。加锁失败。

线程二执行tryAcquire()后会返回false,接着执行addWaiter(Node.EXCLUSIVE)逻辑,将自己加入到一个FIFO等待队列中,代码实现如下:

java.util.concurrent.locks.AbstractQueuedSynchronizer.addWaiter():

/**
     * Creates and enqueues node for current thread and given mode.
     *
     * @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared
     * @return the new node
     */
    private Node addWaiter(Node mode) {
        Node node = new Node(Thread.currentThread(), mode);
        // Try the fast path of enq; backup to full enq on failure
        Node pred = tail;
        if (pred != null) {
            node.prev = pred;
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        enq(node);
        return node;
    }

这段代码首先会创建一个和当前线程绑定的Node节点,Node为双向链表。此时等待对内中的tail指针为空,直接调用enq(node)方法将当前线程加入等待队列尾部:

 /**
     * Inserts node into queue, initializing if necessary. See picture above.
     * @param node the node to insert
     * @return node's predecessor
     */
    private Node enq(final Node node) {
        for (;;) {
            Node t = tail;
            if (t == null) { // Must initialize
                if (compareAndSetHead(new Node()))
                    tail = head;
            } else {
                node.prev = t;
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }

第一遍循环时tail指针为空,进入if逻辑,使用CAS操作设置head指针,将head指向一个新创建的Node节点。此时AQS中数据:

深入AQS AbstractQueuedSynchronizer,第7张
图片

执行完成之后,headtailt都指向第一个Node元素。

接着执行第二遍循环,进入else逻辑,此时已经有了head节点,这里要操作的就是将线程二对应的Node节点挂到head节点后面。此时队列中就有了两个Node节点:

深入AQS AbstractQueuedSynchronizer,第8张
图片

addWaiter()方法执行完后,会返回当前线程创建的节点信息。继续往后执行acquireQueued(addWaiter(Node.EXCLUSIVE), arg)逻辑,此时传入的参数为线程二对应的Node节点信息:

java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireQueued():

final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }


acquireQueued()这个方法会先判断当前传入的Node对应的前置节点是否为head,如果是则尝试加锁。加锁成功过则将当前节点设置为head节点,然后空置之前的head节点,方便后续被垃圾回收掉。

如果加锁失败或者Node的前置节点不是head节点,就会通过shouldParkAfterFailedAcquire方法 将head节点的waitStatus变为了SIGNAL=-1,最后执行parkAndChecknIterrupt方法,调用LockSupport.park()挂起当前线程。

此时AQS中的数据如下图:

深入AQS AbstractQueuedSynchronizer,第9张
图片

此时线程二就静静的待在AQS的等待队列里面了,等着其他线程释放锁来唤醒它。

线程三抢占锁失败

看完了线程二抢占锁失败的分析,那么再来分析线程三抢占锁失败就很简单了,先看看addWaiter(Node mode)方法:

 /**
     * Creates and enqueues node for current thread and given mode.
     *
     * @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared
     * @return the new node
     */
    private Node addWaiter(Node mode) {
        Node node = new Node(Thread.currentThread(), mode);
        // Try the fast path of enq; backup to full enq on failure
        Node pred = tail;
        if (pred != null) {
            node.prev = pred;
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        enq(node);
        return node;
    }

此时等待队列的tail节点指向线程二,进入if逻辑后,通过CAS指令将tail节点重新指向线程三。接着线程三调用enq()方法执行入队操作,和上面线程二执行方式是一致的,入队后会修改线程二对应的Node中的waitStatus=SIGNAL。最后线程三也会被挂起。此时等待队列的数据如图:


深入AQS AbstractQueuedSynchronizer,第10张
图片

线程一释放锁

现在来分析下释放锁的过程,首先是线程一释放锁,释放锁后会唤醒head节点的后置节点,也就是我们现在的线程二,具体操作流程如下:

深入AQS AbstractQueuedSynchronizer,第11张
图片

执行完后等待队列数据如下:

深入AQS AbstractQueuedSynchronizer,第12张
图片

此时线程二已经被唤醒,继续尝试获取锁,如果获取锁失败,则会继续被挂起。如果获取锁成功,则AQS中数据如图:

深入AQS AbstractQueuedSynchronizer,第13张
图片

接着还是一步步拆解来看,先看看线程一释放锁的代码:
java.util.concurrent.locks.AbstractQueuedSynchronizer.release()

public final boolean release(int arg) {
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }

https://www.xamrdz.com/lan/5bw1995183.html

相关文章: