当前位置: 首页>大数据>正文

Java的AQS源码浅析

最近面试问过很多候选人Java锁有关的知识,有的人回答的很好,有的人回答的不好,但是可以感受到的是,大家的理解基本都停留在“八股文”的阶段,实质上对Java的锁以及多线程的同步机制这种底层原理,理解的不是很好。网上这类文章已经很多了,但是看了下有好多文章都是相互抄的,而且都是过时的,典型的例如AQS里的addWaiter方法在JDK16里就没见到,或许代码进行了重构了。通过文章也梳理也我分享一下一般看源代码的习惯是怎样的。

AQS全称是AbstractQueuedSynchronizer,首先他是一个抽象类,其次他使用了队列来进行排队,然后作用是用来做线程间的同步的。他是Java里所有锁的基础,包括CountDownLatch以及读写锁,可重入锁等等都是基于AQS实现的。我们从ReentrantLock入手来管中窥豹,大概得看看AQS的源代码。

首先你要了解的是使用方法,一个典型的ReentrantLock使用方法写在了这个类的注释里

接着,你要有一个鸟瞰图,从大的层面看一看他的实现,ReentrantLock其实只是一层外层的包装,实际上内部具体是由Sync这个类来实现的,而这个类又有两个子类分别是FairSync和NonfairSync,可以看到这两个子类覆写的是initialTryLock和trayAcquire这两个方法,这说明这两个方法在实现上会有一些差异,也仅仅在这有些差别。

那么现在我们从两个方法入手,分别是lock和tryLock方法,这二者的区别是带try的只是试一下,如果能获取到最好,获取不到就算了,返回一个false告诉你没拿到锁。lock的实现如下

可以看到他调用的正是子类的initialTryLock方法,接着看看initialTryLock方法,我们取的是NoFairSync,也就是他的默认实现。方法也很简单,通过CAS来尝试着获取锁,如果成功了就把当前的owner设置成本线程,否则的话如果失败了,check下当前的owner是不是本线程,如果是的话,直接state+1,实现了计数,也就是可重入的能力,如果都不是就返回false,获取锁失败了。失败了,就会调用acquire(1)。

这个方法则是由抽象类AQS提供的,方法如下 ,调用了子类的tryAcquire方法,我们还是看看NoFiairSync的实现

这里会再次调用CAS来再次尝试一次

如果还是失败了,则就进入了AQS的acquire的实现,这个方法最复杂也最难懂,是AQS的核心,一眼从参数上看上去就非常复杂,他兼容了share or not share,是否可中断,是否超时等等各种情况,因此复杂度就上来了,我们还是聚焦在当前的场景下,既他传入的是acquire(null, arg, false, false, false, 0L);不share,不中断,不超时....

方法体如下,贴了一部分,不贴全了

? for (;;) {

? ? ? ? ? ? if (!first && (pred = (node == null) null : node.prev) != null &&

? ? ? ? ? ? ? ? !(first = (head == pred))) {

? ? ? ? ? ? ? ? if (pred.status < 0) {

? ? ? ? ? ? ? ? ? ? cleanQueue();? ? ? ? ? // predecessor cancelled

? ? ? ? ? ? ? ? ? ? continue;

? ? ? ? ? ? ? ? } else if (pred.prev == null) {

? ? ? ? ? ? ? ? ? ? Thread.onSpinWait();? ? // ensure serialization

? ? ? ? ? ? ? ? ? ? continue;

? ? ? ? ? ? ? ? }

? ? ? ? ? ? }

? ? ? ? ? ? if (first || pred == null) {

? ? ? ? ? ? ? ? boolean acquired;

? ? ? ? ? ? ? ? try {

? ? ? ? ? ? ? ? ? ? if (shared)

? ? ? ? ? ? ? ? ? ? ? ? acquired = (tryAcquireShared(arg) >= 0);

? ? ? ? ? ? ? ? ? ? else

? ? ? ? ? ? ? ? ? ? ? ? acquired = tryAcquire(arg);

? ? ? ? ? ? ? ? } catch (Throwable ex) {

? ? ? ? ? ? ? ? ? ? cancelAcquire(node, interrupted, false);

? ? ? ? ? ? ? ? ? ? throw ex;

? ? ? ? ? ? ? ? }

? ? ? ? ? ? ? ? if (acquired) {

? ? ? ? ? ? ? ? ? ? if (first) {

? ? ? ? ? ? ? ? ? ? ? ? node.prev = null;

? ? ? ? ? ? ? ? ? ? ? ? head = node;

? ? ? ? ? ? ? ? ? ? ? ? pred.next = null;

? ? ? ? ? ? ? ? ? ? ? ? node.waiter = null;

? ? ? ? ? ? ? ? ? ? ? ? if (shared)

? ? ? ? ? ? ? ? ? ? ? ? ? ? signalNextIfShared(node);

? ? ? ? ? ? ? ? ? ? ? ? if (interrupted)

? ? ? ? ? ? ? ? ? ? ? ? ? ? current.interrupt();

? ? ? ? ? ? ? ? ? ? }

? ? ? ? ? ? ? ? ? ? return 1;

? ? ? ? ? ? ? ? }

? ? ? ? ? ? }

? ? ? ? ? ? if (node == null) {? ? ? ? ? ? ? ? // allocate; retry before enqueue

? ? ? ? ? ? ? ? if (shared)

? ? ? ? ? ? ? ? ? ? node = new SharedNode();

? ? ? ? ? ? ? ? else

? ? ? ? ? ? ? ? ? ? node = new ExclusiveNode();

? ? ? ? ? ? } else if (pred == null) {? ? ? ? ? // try to enqueue

? ? ? ? ? ? ? ? node.waiter = current;

? ? ? ? ? ? ? ? Node t = tail;

? ? ? ? ? ? ? ? node.setPrevRelaxed(t);? ? ? ? // avoid unnecessary fence

? ? ? ? ? ? ? ? if (t == null)

? ? ? ? ? ? ? ? ? ? tryInitializeHead();

? ? ? ? ? ? ? ? else if (!casTail(t, node))

? ? ? ? ? ? ? ? ? ? node.setPrevRelaxed(null);? // back out

? ? ? ? ? ? ? ? else

? ? ? ? ? ? ? ? ? ? t.next = node;

? ? ? ? ? ? } else if (first && spins != 0) {

? ? ? ? ? ? ? ? --spins;? ? ? ? ? ? ? ? ? ? ? ? // reduce unfairness on rewaits

? ? ? ? ? ? ? ? Thread.onSpinWait();

? ? ? ? ? ? } else if (node.status == 0) {

? ? ? ? ? ? ? ? node.status = WAITING;? ? ? ? ? // enable signal and recheck

? ? ? ? ? ? } else {

? ? ? ? ? ? ? ? long nanos;

? ? ? ? ? ? ? ? spins = postSpins = (byte)((postSpins << 1) | 1);

? ? ? ? ? ? ? ? if (!timed)

? ? ? ? ? ? ? ? ? ? LockSupport.park(this);

? ? ? ? ? ? ? ? else if ((nanos = time - System.nanoTime()) > 0L)

? ? ? ? ? ? ? ? ? ? LockSupport.parkNanos(this, nanos);

? ? ? ? ? ? ? ? else

? ? ? ? ? ? ? ? ? ? break;

? ? ? ? ? ? ? ? node.clearStatus();

? ? ? ? ? ? ? ? if ((interrupted |= Thread.interrupted()) && interruptible)

? ? ? ? ? ? ? ? ? ? break;

? ? ? ? ? ? }

? ? ? ? }

他搞了一个死循环通过多次的循环+if条件来实现一个函数多个功能的目的。

第一次循环先走到这里 , 先初始化一个node

if (node == null) {? ? ? ? ? ? ? ? // allocate; retry before enqueue

? ? ? ? ? ? ? ? if (shared)

? ? ? ? ? ? ? ? ? ? node = new SharedNode();

? ? ? ? ? ? ? ? else

? ? ? ? ? ? ? ? ? ? node = new ExclusiveNode();

? ? ? ? ? ? }

第二次循环到这里,将当前节点加入到tail尾巴上,并把prev指向上一个node,同时把上一个node的next指向当前节点,可以看到这个实现了一个链表

else if (pred == null) {? ? ? ? ? // try to enqueue

? ? ? ? ? ? ? ? node.waiter = current;

? ? ? ? ? ? ? ? Node t = tail;

? ? ? ? ? ? ? ? node.setPrevRelaxed(t);? ? ? ? // avoid unnecessary fence

? ? ? ? ? ? ? ? if (t == null)

? ? ? ? ? ? ? ? ? ? tryInitializeHead();

? ? ? ? ? ? ? ? else if (!casTail(t, node))

? ? ? ? ? ? ? ? ? ? node.setPrevRelaxed(null);? // back out

? ? ? ? ? ? ? ? else

? ? ? ? ? ? ? ? ? ? t.next = node;

? ? ? ? ? ? }

这里插一张网络上的图来说明,更助于理解这段代码,在AQS里有一个数据结构Node,这个数据结构有一个prev和next属性,实现了一个双向链表的功能,每一个调用lock的线程进来,如果拿不到锁就会加入这个队列进行等待,那么这个队列的操作过程就在上面的代码里。

如果队列里没有了节点,当前就是头节点,又来了尝试去获取锁,就是下面这段代码。否则就调用park,挂起当前的线程。

if (first || pred == null) {

? ? ? ? ? ? ? ? boolean acquired;

? ? ? ? ? ? ? ? try {

? ? ? ? ? ? ? ? ? ? if (shared)

? ? ? ? ? ? ? ? ? ? ? ? acquired = (tryAcquireShared(arg) >= 0);

? ? ? ? ? ? ? ? ? ? else

? ? ? ? ? ? ? ? ? ? ? ? acquired = tryAcquire(arg);

? ? ? ? ? ? ? ? } catch (Throwable ex) {

? ? ? ? ? ? ? ? ? ? cancelAcquire(node, interrupted, false);

? ? ? ? ? ? ? ? ? ? throw ex;

? ? ? ? ? ? ? ? }

? ? ? ? ? ? ? ? if (acquired) {

? ? ? ? ? ? ? ? ? ? if (first) {

? ? ? ? ? ? ? ? ? ? ? ? node.prev = null;

? ? ? ? ? ? ? ? ? ? ? ? head = node;

? ? ? ? ? ? ? ? ? ? ? ? pred.next = null;

? ? ? ? ? ? ? ? ? ? ? ? node.waiter = null;

? ? ? ? ? ? ? ? ? ? ? ? if (shared)

? ? ? ? ? ? ? ? ? ? ? ? ? ? signalNextIfShared(node);

? ? ? ? ? ? ? ? ? ? ? ? if (interrupted)

? ? ? ? ? ? ? ? ? ? ? ? ? ? current.interrupt();

? ? ? ? ? ? ? ? ? ? }

? ? ? ? ? ? ? ? ? ? return 1;

? ? ? ? ? ? ? ? }

? ? ? ? ? ? }

以上就是里面的比较核心的内容,如果是你来写,是不是很简单,如果是你会怎么写呢?这段代码比较诟病的地方就是同一个函数做了很多事情,包含初始化node节点,队列的更新,并且是通过多次循环+if来实现了,看起来很难懂。

最后,我们问自己几个问题

锁释放的时候,会如何呢?代码也比较简单,如下,unLock的时候,传入1

每一次调用release(1)就会扣减一次,对应就是可重入锁的计数器的减少。同时考虑了异常调用,当别的线程试着去释放当前锁的时候,抛错。同时如果锁释放完了,将state设置为0,将当前线程置为空,彻底释放锁。

接下来就是SinalNext(head),这里使用的是LockSupport.unpark(s.waiter);唤醒我们的头结点,就是插入队列的第一个节点。

公平锁和非公平锁有什么差别呢?看了半天,我们发现无论是公平锁还是非公平锁,逻辑似乎都是一样的,都要入队列,都维持了队列,那么说好的非公平锁体现在哪里呢?

原来在公平锁的获取锁的过程中,总是事先判断下队列是否为空,如果不为空则加入等待队列,而非公平锁则不一样,不管你队列空不空,先抢一把再说,有很大概率当前线程就直接抢到了锁,他就没有进入等待队列,这对于先到的线程来说,肯定不公平。所以很多候选人都回答,你公平锁因为要维持队列,所以性能更差显然是不准确的。即使非公平锁他们大概率都要入队列,维持队列,不同的仅仅是获得锁的等待时间不同而已,机会不一样。

总结一下,全篇的内容,主要介绍了AQS的部分实现机制,并通过ReentrantLock的实现简单讲解了AQS的源代码。Java的锁机制也都是通过一个volatile修饰的state变量,通过底层的CAS操作设置这个值实现了锁的功能。同时对于无法获取锁的线程则通过一个双向队列的维持,借助Java自身的park对这些线程设置为等待。在锁释放的时候,再去队列头通过unpark来唤醒该线程继续工作,就是这么简单,没什么神秘的。

最后用chatgpt生成的代码注释帮助理解

/**

* 尝试获取锁或信号量,如果成功获取则返回1,否则继续尝试获取直至成功或被中断或超时

*

* @param node? ? ? ? ? 当前节点

* @param arg? ? ? ? ? 获取锁或信号量时的参数

* @param shared? ? ? ? 是否是共享模式

* @param interruptible 是否允许被中断

* @param timed? ? ? ? 是否使用定时等待

* @param time? ? ? ? ? 定时等待的超时时间

* @return 成功获取返回1,超时返回0,被中断返回负数

*/

final int acquire(Node node, int arg, boolean shared,

? ? ? ? ? ? ? ? ? boolean interruptible, boolean timed, long time) {

? ? // 获取当前线程

? ? Thread current = Thread.currentThread();

? ? // 用于重试计数的变量

? ? byte spins = 0, postSpins = 0;

? ? // 用于标记当前线程是否被中断以及当前节点是否是队列中的首节点

? ? boolean interrupted = false, first = false;

? ? // 当前节点的前驱节点

? ? Node pred = null;

? ? // 循环尝试获取锁或信号量

? ? for (;;) {

? ? ? ? // 检查是否是队列中的首节点

? ? ? ? if (!first && (pred = (node == null) null : node.prev) != null &&

? ? ? ? ? ? !(first = (head == pred))) {

? ? ? ? ? ? // 非首节点,检查前驱节点是否已取消或是有新的前驱节点

? ? ? ? ? ? if (pred.status < 0) {

? ? ? ? ? ? ? ? // 前驱节点已取消,清理队列

? ? ? ? ? ? ? ? cleanQueue();

? ? ? ? ? ? ? ? continue; // 重新开始循环尝试获取锁或信号量

? ? ? ? ? ? } else if (pred.prev == null) {

? ? ? ? ? ? ? ? // 确保串行化,避免过度自旋

? ? ? ? ? ? ? ? Thread.onSpinWait();

? ? ? ? ? ? ? ? continue; // 重新开始循环尝试获取锁或信号量

? ? ? ? ? ? }

? ? ? ? }

? ? ? ? // 尝试获取锁或信号量

? ? ? ? if (first || pred == null) {

? ? ? ? ? ? // 首节点或前驱节点为null,说明当前节点还未入队列

? ? ? ? ? ? boolean acquired;

? ? ? ? ? ? try {

? ? ? ? ? ? ? ? // 尝试获取锁或信号量

? ? ? ? ? ? ? ? if (shared)

? ? ? ? ? ? ? ? ? ? acquired = (tryAcquireShared(arg) >= 0);

? ? ? ? ? ? ? ? else

? ? ? ? ? ? ? ? ? ? acquired = tryAcquire(arg);

? ? ? ? ? ? } catch (Throwable ex) {

? ? ? ? ? ? ? ? // 取消获取操作,抛出异常

? ? ? ? ? ? ? ? cancelAcquire(node, interrupted, false);

? ? ? ? ? ? ? ? throw ex;

? ? ? ? ? ? }

? ? ? ? ? ? // 如果成功获取锁或信号量

? ? ? ? ? ? if (acquired) {

? ? ? ? ? ? ? ? // 更新头节点和前驱节点的引用,设置节点状态,唤醒其他等待线程

? ? ? ? ? ? ? ? if (first) {

? ? ? ? ? ? ? ? ? ? node.prev = null;

? ? ? ? ? ? ? ? ? ? head = node;

? ? ? ? ? ? ? ? ? ? pred.next = null;

? ? ? ? ? ? ? ? ? ? node.waiter = null;

? ? ? ? ? ? ? ? ? ? if (shared)

? ? ? ? ? ? ? ? ? ? ? ? signalNextIfShared(node);

? ? ? ? ? ? ? ? ? ? if (interrupted)

? ? ? ? ? ? ? ? ? ? ? ? current.interrupt();

? ? ? ? ? ? ? ? }

? ? ? ? ? ? ? ? // 返回成功

? ? ? ? ? ? ? ? return 1;

? ? ? ? ? ? }

? ? ? ? }

? ? ? ? // 尝试入队或重试

? ? ? ? if (node == null) {

? ? ? ? ? ? // 未分配节点,分配新节点并重试

? ? ? ? ? ? if (shared)

? ? ? ? ? ? ? ? node = new SharedNode();

? ? ? ? ? ? else

? ? ? ? ? ? ? ? node = new ExclusiveNode();

? ? ? ? } else if (pred == null) {

? ? ? ? ? ? // 前驱节点为null,说明当前节点还未入队列,尝试将当前节点入队列

? ? ? ? ? ? node.waiter = current;

? ? ? ? ? ? Node t = tail;

? ? ? ? ? ? node.setPrevRelaxed(t); // 避免不必要的内存屏障

? ? ? ? ? ? if (t == null)

? ? ? ? ? ? ? ? tryInitializeHead();

? ? ? ? ? ? else if (!casTail(t, node))

? ? ? ? ? ? ? ? node.setPrevRelaxed(null); // 入队失败,回退

? ? ? ? ? ? else

? ? ? ? ? ? ? ? t.next = node;

? ? ? ? } else if (first && spins != 0) {

? ? ? ? ? ? // 重试次数限制,减少对先前线程的不公平竞争

? ? ? ? ? ? --spins;

? ? ? ? ? ? // 进行自旋等待

? ? ? ? ? ? Thread.onSpinWait();

? ? ? ? } else if (node.status == 0) {

? ? ? ? ? ? // 设置状态为等待中,以便其他线程进行唤醒

? ? ? ? ? ? node.status = WAITING;

? ? ? ? } else {

? ? ? ? ? ? // 需要定时等待

? ? ? ? ? ? long nanos;

? ? ? ? ? ? spins = postSpins = (byte) ((postSpins << 1) | 1);

? ? ? ? ? ? if (!timed)

? ? ? ? ? ? ? ? LockSupport.park(this); // 非定时等待

? ? ? ? ? ? else if ((nanos = time - System.nanoTime()) > 0L)

? ? ? ? ? ? ? ? LockSupport.parkNanos(this, nanos); // 定时等待

? ? ? ? ? ? else

? ? ? ? ? ? ? ? break; // 超时,结束等待

? ? ? ? ? ? // 清除节点的状态

? ? ? ? ? ? node.clearStatus();

? ? ? ? ? ? // 检查线程是否被中断,并根据需要退出循环

? ? ? ? ? ? if ((interrupted |= Thread.interrupted()) && interruptible)

? ? ? ? ? ? ? ? break;

? ? ? ? }

? ? }

? ? // 取消获取操作,并根据中断状态返回相应结果

? ? return cancelAcquire(node, interrupted, interruptible);

}


https://www.xamrdz.com/bigdata/7ck1995091.html

相关文章: