当前位置: 首页>大数据>正文

Java并发编程——ReentrantLock实现原理

一、前言

ReentrantLock主要利用CAS+AQS队列来实现。它支持公平锁和非公平锁,两者的实现类似。

CAS:Compare and Swap,比较并交换。CAS有3个操作数:内存值V、预期值A、要修改的新值B。当且仅当预期值A和内存值V相同时,将内存值V修改为B,否则什么都不做。该操作是一个原子操作,被广泛的应用在Java的底层实现中。在Java中,CAS主要是由sun.misc.Unsafe这个类通过JNI调用CPU底层指令实现

二、AbstractQueuedSynchronizer

AbstractQueuedSynchronizer (AQS)类如其名,抽象的队列式同步容器,AQS定义类一套多线程访问共享资源的同步器,是一个用于构建锁和同步容器的框架。事实上concurrent包内许多类都是基于AQS构建,例如ReentrantLock,Semaphore,CountDownLatch,ReentrantReadWriteLock,FutureTask等。AQS解决了在实现同步容器时设计的大量细节问题。

Java并发编程——ReentrantLock实现原理,第1张

AQS使用一个FIFO的队列表示排队等待锁的线程,队列头节点称作“哨兵节点”或者“哑节点”,它不与任何线程关联。其他的节点与等待线程关联,每个节点维护一个等待状态waitStatus。

Java并发编程——ReentrantLock实现原理,第2张

AQS以模板方法模式在内部定义了获取和释放同步状态的模板方法,并留下钩子函数供子类继承时进行扩展,由子类决定在获取和释放同步状态时的细节,从而实现满足自身功能特性的需求。除此之外,AQS通过内部的同步队列管理获取同步状态失败的线程,向实现者屏蔽了线程阻塞和唤醒的细节。

2.1 AQS队列节点数据结构

Java并发编程——ReentrantLock实现原理,第3张
  • prev:指向队列中的上一个节点;
  • waitStatus:节点的等待状态,初始化为0,表示正常同步等待:
    • CANCELLED=1:节点因超时或者被中断而取消时设置为取消状态;
    • SIGNAL=-1:指示当前节点被释放后,需要调用unpark通知后面节点,如果后面节点发生竞争导致获取锁失败,也会将当前节点设置为SIGNAL;
    • CONDITION=-2:指示该线程正在进行条件等待,条件队列中会用到;
    • PROPAGATE=-3:共享模式下释放节点时设置的状态,表示无限传播下去。
  • thread:当前节点操作的线程;
  • nextWaiter:该字段在Condition条件等待中会用到,指向条件队列的下一个节点。或者链接到SHARED常量,表示节点正在以共享模式等待;
  • next:指向队列中的下一个节点。
public abstract class AbstractQueuedSynchronizer
    extends AbstractOwnableSynchronizer
    implements java.io.Serializable {
    
    /**
     * 同步等待队列(双向链表)中的节点
     */ 
    static final class Node {
    
        /** 用于指示节点正在共享模式下等待的标记 */
        static final Node SHARED = new Node();
        
        /** 用于指示节点正在独占模式下等待的标记 */
        static final Node EXCLUSIVE = null;

        /** 线程被取消了 */
        static final int CANCELLED =  1;
        
        /** 
         * 如果前驱节点的等待状态是SIGNAL,表示当前节点将来可以被唤醒,那么当前节点就可以安全的挂起了 
         * 否则,当前节点不能挂起 
         */
        static final int SIGNAL    = -1;
        
        /** 线程正在等待条件 */
        static final int CONDITION = -2;
        
        /**
         * 指示下一个acquireShared应无条件传播
         */
        static final int PROPAGATE = -3;

        //值就是前四个int值和0(CANCELLED/SIGNAL/CONDITION/PROPAGATE/0)
        volatile int waitStatus;

        /**
         * 前驱节点
         */
        volatile Node prev;

        /**
         * 后继节点
         */
        volatile Node next;

        /**
         * 节点中的线程
         */
        volatile Thread thread;


        Node nextWaiter;

        final boolean isShared() {
            return nextWaiter == SHARED;
        }

        /**
         * 返回该节点前一个节点
         */
        final Node predecessor() throws NullPointerException {
            Node p = prev;
            if (p == null)
                throw new NullPointerException();
            else
                return p;
        }

        Node() {    // Used to establish initial head or SHARED marker
        }

        Node(Thread thread, Node mode) {     // 用于addWaiter中
            this.nextWaiter = mode;
            this.thread = thread;
        }

        Node(Thread thread, int waitStatus) { // Used by Condition
            this.waitStatus = waitStatus;
            this.thread = thread;
        }
    }
}

2.2 AQS同步器数据结构

AQS中同步等待队列的实现是一个带头尾指针(这里用指针表示引用是为了后面讲解源码时可以更直观形象,况且引用本身是一种受限的指针)且不带哨兵结点(后文中的头结点表示队列首元素结点,不是指哨兵结点)的双向链表。

Java并发编程——ReentrantLock实现原理,第4张

如上图,AQS中:

  • state:所有线程通过通过CAS尝试给state设值,当state>0时表示被线程占用;同一个线程多次获取state,会叠加state的值,从而实现了可重入;
  • exclusiveOwnerThread:在独占模式下该属性会用到,当线程尝试以独占模式成功给state设值,该线程会把自己设置到exclusiveOwnerThread变量中,表明当前的state被当前线程独占了;
  • 等待队列(同步队列):等待队列中存放了所有争夺state失败的线程,是一个双向链表结构。state被某一个线程占用之后,其他线程会进入等待队列;一旦state被释放(state=0),则释放state的线程会唤醒等待队列中的线程继续尝试cas设值state;
  • head:指向等待队列的头节点,延迟初始化,除了初始化之外,只能通过setHead方法进行修改;
  • tail:指向等待队列的队尾,延迟初始化,只能通过enq方法修改tail,该方法主要是往队列后面添加等待节点。
/**
 * Head of the wait queue, lazily initialized.  Except for
 * initialization, it is modified only via method setHead.  Note:
 * If head exists, its waitStatus is guaranteed not to be
 * CANCELLED.
 */
private transient volatile Node head;//指向队列首元素的头指针

/**
 * Tail of the wait queue, lazily initialized.  Modified only via
 * method enq to add new wait node.
 */
private transient volatile Node tail;//指向队列尾元素的尾指针

为了接下来能够更好的理解加锁和解锁过程的源码,对该同步队列的特性进行简单的讲解:

  • 1、同步队列是个先进先出(FIFO)队列,获取锁失败的线程将构造结点并加入队列的尾部,并阻塞自己。如何才能线程安全的实现入队是后面讲解的重点,毕竟我们在讲锁的实现,这部分代码肯定是不能用锁的。
  • 2、队列首结点可以用来表示当前正获取锁的线程。
  • 3、当前线程释放锁后将尝试唤醒后续处结点中处于阻塞状态的线程。

同步状态变量

private volatile int state; //同步状态变量

这是一个带volatile前缀的int值,是一个类似计数器的东西。在不同的同步组件中有不同的含义。以ReentrantLock为例,state可以用来表示该锁被线程重入的次数。当state为0表示该锁不被任何线程持有;当state为1表示线程恰好持有该锁1次(未重入);当state大于1则表示锁被线程重入state次。因为这是一个会被并发访问的量,为了防止出现可见性问题要用volatile进行修饰。

持有同步状态的线程标志

/**
 * The current owner of exclusive mode synchronization.
 */
 //持有同步状态的线程标志
private transient Thread exclusiveOwnerThread;

如注释所言,这是在独占同步模式下标记持有同步状态线程的。ReentrantLock就是典型的独占同步模式,该变量用来标识锁被哪个线程持有。

AbstractQueuedSynchronizer自定义同步器实现

自定义同步器在实现时只需要实现共享资源state的获取与释放方式即可,至于具体线程等待队列的维护(如获取资源失败入队/唤醒出队等),AQS已经在顶层实现好了,自定义同步容器实现时主要实现以下几种方法:

  • 1、isHeldExclusively():该线程是否正在独占资源,只有用到condition才需要取实现它。
  • 2、tryAcquire(int):独占方式,尝试获取资源,成功则返回true,失败则返回false。
  • 3、tryRelease(int):独占方式,尝试释放资源,成功则返回true,失败则返回false。
  • 4、tryAcquireShared(int):共享方式,尝试获取资源,附属表示获取失败,0表示获取成功,但是没有剩余资源可用,其他线程不能在获取,正数表示获取成功,并且有剩余资源,也就是自己可以重复获取(可重入)。
  • 5、tryReleaseShare(int):共享方式。尝试释放资源,成功返回true,失败返回false。
  • 6、以ReentrantLock为例,state初始化为0,表示未锁定状态。A线程lock()的时候,会调用tryAcquire()独占该锁并将state+1。此后其他线程在tryAcquire()独占该锁并将state+1。此后表示其他线程再tryAcquire()时就会失败,直到A线程unlock()到state=0(几锁释放)为止,其他线程才有机会获取该锁。当然,释放锁之前,A线程自己是可以重复获取此锁的(state会累加),这就是可重入的概念。但要注意,获取多少次就要释放多么次,这样才能保证state是能回到零态的。

了解AQS的主要结构后,就可以开始进行ReentrantLock的源码解读了。

三、ReentrantLock

ReentrantLock的基本实现可以概括为:先通过CAS尝试获取锁。如果此时已经有线程占据了锁,那就加入AQS队列并且被挂起。当锁被释放之后,排在CLH队列队首的线程会被唤醒,然后CAS再次尝试获取锁。在这个时候,如果:

  • 非公平锁:如果同时还有另一个线程进来尝试获取,那么有可能会让这个线程抢先获取。

  • 公平锁:如果同时还有另一个线程进来尝试获取,当它发现自己不是在队首的话,就会排到队尾,由队首的线程获取到锁。

  • 可重入锁。可重入锁是指同一个线程可以多次获取同一把锁。ReentrantLock和synchronized都是可重入锁。

  • 可中断锁。可中断锁是指线程尝试获取锁的过程中,是否可以响应中断。synchronized是不可中断锁,而ReentrantLock则提供了中断功能。

公平锁与非公平锁。公平锁是指多个线程同时尝试获取同一把锁时,获取锁的顺序按照线程达到的顺序,而非公平锁则允许线程“插队”。

3.1 Reentrantlock类结构

public class ReentrantLock implements Lock, java.io.Serializable {

}

可以看出Reentrantlock 实现类lock接口,首先看下这个lock接口。

public interface Lock {

    //普通的获取锁的方法,lock会阻塞直到成功
    void lock();

    //与lock不同的时,它可以相应中断,
    void lockInterruptibly() throws InterruptedException;
     
    //尝试获取锁,立即返回,不阻塞,成功返回true,失败返回false
    boolean tryLock();

    //先尝试获取锁,如果成功则返回true,失败则阻塞等待,等待时间由参数决定,
    //在等待的同时相应中断,抛出中断异常,如果在等待的过程中获得类锁则返回true,
    //否则直到时间超时,则返回false。
    boolean tryLock(long time, TimeUnit unit) throws InterruptedException;

    //普通释放锁的方法    
    void unlock();

    //新建一个条件,lock可以关联多个条件,关于条件在以后篇幅中记录
    Condition newCondition();
}

可以看出,显示锁,与synchronied相比,支持非阻塞的方式获取锁,可以响应中断,可以限时,这使得它非常的灵活。

现在回到Reentrantlock类,可以看到其内部基本上定义类三个内部类:

public class ReentrantLock implements Lock, java.io.Serializable {

    private final Sync sync;
    
    abstract static class Sync extends AbstractQueuedSynchronizer {
        //......
    }
    
    //非公平锁
    static final class NonfairSync extends Sync {
        //......
    }
    
    //公平锁
    static final class FairSync extends Sync {
        //......
    }
}

很显然从上面的类结构,我们可以得出jdk利用CAS算法和AQS实现类ReentrantLock。

3.2 构造函数分析

  • ReentrantLock出于性能考虑,默认采用非公平锁。
public class ReentrantLock implements Lock, java.io.Serializable {

    private final Sync sync;
    
    /**
     * Creates an instance of {@code ReentrantLock}.
     * This is equivalent to using {@code ReentrantLock(false)}.
     */
    public ReentrantLock() {
        sync = new NonfairSync();
    }

    /**
     * Creates an instance of {@code ReentrantLock} with the
     * given fairness policy.
     *
     * @param fair {@code true} if this lock should use a fair ordering policy
     */
    public ReentrantLock(boolean fair) {
        sync = fair new FairSync() : new NonfairSync();
    }
}

四、NonfairSync非公平锁

NonfairSync非公平锁加锁流程(简化):

  • 1、基于CAS尝试将state(锁数量)从0设置为1。
    • A、如果设置成功,设置当前线程为独占锁的线程;
    • B、如果设置失败,还会再获取一次锁数量,
      • B1、如果锁数量为0,再基于CAS尝试将state(锁数量)从0设置为1一次,如果设置成功,设置当前线程为独占锁的线程;

      • B2、如果锁数量不为0或者上边的尝试又失败了,查看当前线程是不是已经是独占锁的线程了,如果是,则将当前的锁数量+1;如果不是,则将该线程封装在一个Node内,并加入到等待队列中去。等待被其前一个线程节点唤醒。

4.1 非公平模式下的加锁

public class ReentrantLock implements Lock, java.io.Serializable {

    private final Sync sync;
    
    static final class NonfairSync extends Sync {

        final void lock() {
            //直接通过CAS,通过改变共享资源 state值,成功则设置当前线程为持有锁的线程(独占),
            //失败则调用顶层AQS的acquire方法,再次尝试,失败则将线程放到阻塞队列
            if (compareAndSetState(0, 1))
                //将当前线程标记为已持有锁
                setExclusiveOwnerThread(Thread.currentThread());
            else
                acquire(1);
        }
    }
}

ReentrantLock 中使用 state 用于表示是否被锁和持有的数量,如果当前未被锁定,则立即获得锁,否则调用acquire(1);方法来获得锁,其中acquire为AQS中的方法,其实现为:

public abstract class AbstractQueuedSynchronizer
    extends AbstractOwnableSynchronizer
    implements java.io.Serializable {
    
    /**
     * 该方法主要做了三件事情:
     * 1. 调用 tryAcquire 尝试获取锁,该方法需由 AQS 的继承类实现,获取成功直接返回
     * 2. 若 tryAcquire 返回 false,则调用 addWaiter 方法,将当前线程封装成节点,
     *    并将节点放入同步队列尾部
     * 3. 调用 acquireQueued 方法让同步队列中的节点循环尝试获取锁
     */
    public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }
}

4.1.1、tryAcquire(arg)尝试去获取锁。如果尝试获取锁成功,方法直接返回。

  • 这段代码调用 tryAcquire 方法来尝试获取锁,该方法需要被子类重写,在 NonFairSync 中的实现为:
public class ReentrantLock implements Lock, java.io.Serializable {

    private final Sync sync;
    
    static final class NonfairSync extends Sync {

        protected final boolean tryAcquire(int acquires) {
            return nonfairTryAcquire(acquires);
        }
    }
    
    abstract static class Sync extends AbstractQueuedSynchronizer { 
        final boolean nonfairTryAcquire(int acquires) {
            //获取当前线程
            final Thread current = Thread.currentThread();
            //获取state变量值
            int c = getState();
            if (c == 0) {
                //没有线程占用锁
                if (compareAndSetState(0, acquires)) {
                    //占用锁成功,设置独占线程为当前线程
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            //当前线程已经占用该锁 锁重入
            else if (current == getExclusiveOwnerThread()) {
                int nextc = c + acquires;
                if (nextc < 0) // overflow
                    throw new Error("Maximum lock count exceeded");
                    
                // 更新state值为新的重入次数  
                setState(nextc);
                return true;
            }
            //获取锁失败
            return false;
        }
    }
}

非公平锁tryAcquire的流程是:检查state字段,若为0,表示锁未被占用,那么尝试占用,若不为0,检查当前锁是否被自己占用,若被自己占用,则更新state字段,表示重入锁的次数。如果以上两点都没有成功,则获取锁失败,返回false。

Java并发编程——ReentrantLock实现原理,第5张
tryAcquire()尝试获取锁

不同的锁可以有不同的tryAcquire()实现,所以,你可以看到ReentrantLock锁里面会有非公平锁和公平锁的实现方式。
ReentrantLock公平锁的实现代码在获取锁之前多了一个判断:!hasQueuedPredecessors(),这个是判断如果当前线程节点之前没有其他节点了,那么我们才可以尝试获取锁,这就是公平锁的体现。

4.1.2、进入等待队列。由于上文中提到锁已经被占用了,所以执行tryAcquire失败,并且入等待队列。如果锁一直不释放,那么后续线程就会被挂起。

  • 先看下入队的过程。先看addWaiter(Node.EXCLUSIVE)
public abstract class AbstractQueuedSynchronizer
    extends AbstractOwnableSynchronizer
    implements java.io.Serializable {
    
    /**
     * 将新节点和当前线程关联并且入队列
     * @param mode 独占/共享
     * @return 新节点
     */
    private Node addWaiter(Node mode) {
        //初始化节点,设置关联线程和模式(独占 or 共享)
        Node node = new Node(Thread.currentThread(), mode);
        // 获取尾节点引用
        Node pred = tail;
        // 尾节点不为空,说明队列已经初始化过
        if (pred != null) {
            node.prev = pred;
            // 设置新节点为尾节点
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        // 尾节点为空,说明队列还未初始化,需要初始化head节点并入队新节点
        enq(node);
        return node;
    }
}
  • 假如多个线程同时尝试入队列,由于队列尚未初始化,tail==null,故至少会有一个线程会走到enq(node)。我们假设同时走到了enq(node)里。
public abstract class AbstractQueuedSynchronizer
    extends AbstractOwnableSynchronizer
    implements java.io.Serializable {
    
    /**
     * 初始化队列并且入队新节点
     */
    private Node enq(final Node node) {
        //开始自旋
        for (;;) {
            Node t = tail;
            if (t == null) { // Must initialize
                // 如果tail为空,则新建一个head节点,并且tail指向head
                if (compareAndSetHead(new Node()))
                    tail = head;
            } else {
                node.prev = t;
                // tail不为空,将新节点入队
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }
}

这里体现了经典的自旋+CAS组合来实现非阻塞的原子操作。由于compareAndSetHead的实现使用了unsafe类提供的CAS操作,所以只有一个线程会创建head节点成功。假设线程B成功,之后B、C开始第二轮循环,此时tail已经不为空,两个线程都走到else里面。假设B线程compareAndSetTail成功,那么B就可以返回了,C由于入队失败还需要第三轮循环。最终所有线程都可以成功入队。

当线程进入等待队列后,此时AQS队列如下:


Java并发编程——ReentrantLock实现原理,第6张
Java并发编程——ReentrantLock实现原理,第7张
addWaiter(Node mode)进入等待队列

4.1.3、挂起。B和C相继执行acquireQueued(final Node node, int arg)。这个方法让已经入队的线程尝试获取锁,若失败则会被挂起。

public abstract class AbstractQueuedSynchronizer
    extends AbstractOwnableSynchronizer
    implements java.io.Serializable {
    
    /**
     * 已经入队的线程尝试获取锁
     */ 
    final boolean acquireQueued(final Node node, int arg) {
        //标记是否成功获取锁
        boolean failed = true;
        try {
            //标记线程是否被中断过
            boolean interrupted = false;
            for (;;) {
                //获取前驱节点
                final Node p = node.predecessor();
                //如果前驱是head,即该结点已成老二,那么便有资格去尝试获取锁
                if (p == head && tryAcquire(arg)) {
                    // 获取成功,将当前节点设置为head节点
                    setHead(node);
                    // 原head节点出队,在某个时间点被GC回收
                    p.next = null; // help GC
                    //获取成功
                    failed = false;
                    //返回是否被中断过
                    return interrupted;
                }
                // 判断获取失败后是否可以挂起,若可以则挂起
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    // 线程若被中断,设置interrupted为true
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }
}

code里的注释已经很清晰的说明了acquireQueued的执行流程。假设B和C在竞争锁的过程中A一直持有锁,那么它们的tryAcquire操作都会失败,因此会走到第2个if语句中。

我们再看下shouldParkAfterFailedAcquire和parkAndCheckInterrupt都做了哪些事吧。

public abstract class AbstractQueuedSynchronizer
    extends AbstractOwnableSynchronizer
    implements java.io.Serializable {
    
    /**
     * 判断当前线程获取锁失败之后是否需要挂起.
     */ 
    private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        //前驱节点的状态
        int ws = pred.waitStatus;
        if (ws == Node.SIGNAL)
            // 前驱节点状态为signal,返回true
            return true;
            
        // 前驱节点状态为CANCELLED 
        if (ws > 0) {
            // 从队尾向前寻找第一个状态不为CANCELLED的节点
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {
            // 将前驱节点的状态设置为SIGNAL
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
    }
    
    /**
     * 挂起当前线程,返回线程中断状态并重置
     */ 
    private final boolean parkAndCheckInterrupt() {
        LockSupport.park(this);
        return Thread.interrupted();
    }   
}

线程入队后能够挂起的前提是,它的前驱节点的状态为SIGNAL,所以shouldParkAfterFailedAcquire会先判断当前节点的前驱是否状态符合要求,若符合则返回true,然后调用parkAndCheckInterrupt,将自己挂起。如果不符合,再看前驱节点是否>0(CANCELLED),若是那么向前遍历直到找到第一个符合要求的前驱,若不是则将前驱节点的状态设置为SIGNAL。

整个流程中,如果前驱结点的状态不是SIGNAL,那么自己就不能安心挂起,需要去找个安心的挂起点,同时可以再尝试下看有没有机会去尝试竞争锁。

注意:
是否需要 unpark 是由当前节点的前驱节点的 waitStatus == Node.SIGNAL 来决定,而不是本节点的waitStatus 决定。

如果当前节点的prev不是head,或者争抢失败,则会将前面节点的状态设置为SIGNAL。

如果前面的节点状态大于0,表示节点被取消,这个时候会把该节点从队列中移除掉。

下图为尝试CAS争抢锁,但失败了,然后把head节点状态设置为SIGNAL


Java并发编程——ReentrantLock实现原理,第8张
acquireQueued(final Node node, int arg)

然后再会循环一次尝试获取锁,如果获取失败了,就调用LockSupport.park(this)挂起线程。

那么时候才会触发唤起线程呢?这个时候我们得先看看释放锁是怎么做的了。

五、FairSync公平锁

FairSync公平锁加锁流程(简化):

  • 1、如果锁数量为0,如果当前线程是等待队列中的头节点,基于CAS尝试将state(锁数量)从0设置为1一次,如果设置成功,设置当前线程为独占锁的线程;

  • 2、如果锁数量不为0或者当前线程不是等待队列中的头节点或者上边的尝试又失败了,查看当前线程是不是已经是独占锁的线程了,如果是,则将当前的锁数量+1;如果不是,则将该线程封装在一个Node内,并加入到等待队列中去。等待被其前一个线程节点唤醒。

5.1 公平模式下的加锁

public class ReentrantLock implements Lock, java.io.Serializable {

    private final Sync sync;

    static final class FairSync extends Sync {
        final void lock() {
            acquire(1);
        }
    }
}

调用acquire(1);方法来获得锁,其中acquire为AQS中的方法,其实现为:

public abstract class AbstractQueuedSynchronizer
    extends AbstractOwnableSynchronizer
    implements java.io.Serializable {
    
    /**
     * 该方法主要做了三件事情:
     * 1. 调用 tryAcquire 尝试获取锁,该方法需由 AQS 的继承类实现,获取成功直接返回
     * 2. 若 tryAcquire 返回 false,则调用 addWaiter 方法,将当前线程封装成节点,
     *    并将节点放入同步队列尾部
     * 3. 调用 acquireQueued 方法让同步队列中的节点循环尝试获取锁
     */
    public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }
}

5.2 FairSync:tryAcquire(int acquires)

public class ReentrantLock implements Lock, java.io.Serializable {

    private final Sync sync;

    static final class FairSync extends Sync {
        protected final boolean tryAcquire(int acquires) {
            //获取当前线程
            final Thread current = Thread.currentThread();
            //获取state变量值
            int c = getState();
            if (c == 0) {
                //如果没有线程持有锁
                //调用hasQueuedPredecessors函数判断队列中是否有优先于此线程的排队线程
                //如果没有才去尝试竞争锁
                
                // hasQueuedPredecessors()和非公平锁相比,这里多了一个判断:是否有线程在等待
                if (!hasQueuedPredecessors() &&
                    compareAndSetState(0, acquires)) {
                    //占用锁成功,设置独占线程为当前线程
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            
            //当前线程已经占用该锁 锁重入
            else if (current == getExclusiveOwnerThread()) {
                int nextc = c + acquires;
                if (nextc < 0)
                    throw new Error("Maximum lock count exceeded");
                    
                // 更新state值为新的重入次数      
                setState(nextc);
                return true;
            }
            //获取锁失败
            return false;
        }
    }
}

可以看到这里的逻辑和NonfairSync唯一的区别就是FairSync多了hasQueuedPredecessors函数的调用,在竞争锁之前需要先看队列中是否有优先于此线程的排队线程,如果有的话,那么FairSync不会去竞争锁,而是直接进入后面入队阻塞的逻辑。那么这里强调的优先于此线程的排队线程是什么意思呢?因为FairSync是公平锁的实现,获取锁总要有个先来后到。

5.3 hasQueuedPredecessors

  • 判断队列中是否有优先于此线程的排队线程
public abstract class AbstractQueuedSynchronizer
    extends AbstractOwnableSynchronizer
    implements java.io.Serializable {
    
    public final boolean hasQueuedPredecessors() {
        Node t = tail; // Read fields in reverse initialization order
        Node h = head;
        Node s;
        return h != t &&
            ((s = h.next) == null || s.thread != Thread.currentThread());
    }
}

步骤:

  • 1、如果h==t成立,h和t均为null或是同一个具体的节点,无后继节点,返回false。
  • 2、如果h!=t成立,head.next是否为null,如果为null,返回true。
    • 什么情况下h!=t的同时h.next==null?,有其他线程第一次正在入队时,可能会出现。见AQS的enq方法,compareAndSetHead(node)完成,还没执行tail=head语句时,此时tail=null,head=newNode,head.next=null。
  • 3、如果h!=t成立,head.next != null,则判断head.next是否是当前线程,如果是返回false,否则返回true(head节点是获取到锁的节点,但是任意时刻head节点可能占用着锁,也可能释放了锁(unlock()),未被阻塞的head.next节点对应的线程在任意时刻都是有必要去尝试获取锁)

六、释放锁

NonfairSync非公平锁和FairSync公平锁的释放锁代码一样。

6.1 unlock()

步骤:

  • 1、获取当前的锁数量,然后用这个锁数量减去解锁的数量(这里为1),最后得出结果c。

  • 2、判断当前线程是不是独占锁的线程,如果不是,抛出异常。

  • 3、如果c==0,说明锁被成功释放,将当前的独占线程置为null,锁数量置为0,返回true。

  • 4、如果c!=0,说明释放锁失败,锁数量置为c,返回false。

  • 5、如果锁被释放成功的话,唤醒距离头节点最近的一个非取消的节点。

6.2 ReentrantLock:unlock()

public class ReentrantLock implements Lock, java.io.Serializable {

    private final Sync sync;
    
    /**
     * 释放这个锁
     *1)如果当前线程持有这个锁,则锁数量被递减
     *2)如果递减之后锁数量为0,则锁被释放。
     *如果当前线程不持久有这个锁,抛出异常
     */
    public void unlock() {
        sync.release(1);
    }
}

public abstract class AbstractQueuedSynchronizer
    extends AbstractOwnableSynchronizer
    implements java.io.Serializable {
    
    public final boolean release(int arg) {
        //如果成功释放锁
        if (tryRelease(arg)) {
            //获取头节点:(注意:这里的头节点就是当前正在释放锁的节点)
            Node h = head;
            //头结点存在且等待状态不是取消
            if (h != null && h.waitStatus != 0)
                //唤醒距离头节点最近的一个非取消的节点
                unparkSuccessor(h);
            return true;
        }
        return false;
    }
}

6.3 Sync:tryRelease(int releases)

最后我们再看下tryRelease的执行过程

public class ReentrantLock implements Lock, java.io.Serializable {

    private final Sync sync;
    
    /**
     * 释放当前线程占用的锁
     * @param releases
     * @return 是否释放成功
     */
    abstract static class Sync extends AbstractQueuedSynchronizer { 
        protected final boolean tryRelease(int releases) {
            // 计算释放后state值
            int c = getState() - releases;

            if (Thread.currentThread() != getExclusiveOwnerThread())
                // 如果不是当前线程占用锁,那么抛出异常
                throw new IllegalMonitorStateException();
            boolean free = false;
            if (c == 0) {
                // 锁被重入次数为0,表示释放成功
                free = true;
                // 清空独占线程
                setExclusiveOwnerThread(null);
            }
            // 更新state值
            setState(c);
            return free;
        }
    }
}

6.4 AbstractQueuedSynchronizer:unparkSuccessor(Node node)

public abstract class AbstractQueuedSynchronizer
    extends AbstractOwnableSynchronizer
    implements java.io.Serializable {
    
    /**
     * 唤醒离头节点node最近的一个非取消的节点
     * @param node 头节点
     */ 
    private void unparkSuccessor(Node node) {
        /*
         * If status is negative (i.e., possibly needing signal) try
         * to clear in anticipation of signalling.  It is OK if this
         * fails or if status is changed by waiting thread.
         */
        int ws = node.waitStatus;
        if (ws < 0)
            //将ws设为0状态(即什么状态都不是)
            compareAndSetWaitStatus(node, ws, 0);

        /*
         * 获取头节点的下一个等待状态不是cancel的节点
         */
        Node s = node.next;
        if (s == null || s.waitStatus > 0) {
            s = null;
            /*
             * 注意:从后往前遍历找到离头节点最近的一个非取消的节点,
             * 从后往前遍历据说是在入队(enq())的时候,可能nodeX.next==null
             */         
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        if (s != null)
            //唤醒离头节点最近的一个非取消的节点
            LockSupport.unpark(s.thread);
    }
}

tryRelease()具体由子类实现。一般处理流程是让state减1。

如果释放锁成功,并且头节点waitStatus!=0,那么会调用unparkSuccessor()通知唤醒后续的线程节点进行处理。

注意:在遍历队列查找唤醒下一个节点的过程中,如果发现下一个节点状态是CANCELLED那么就会忽略这个节点,然后从队列尾部向前遍历,找到与头结点最近的没有被取消的节点进行唤醒操作。

Java并发编程——ReentrantLock实现原理,第9张

唤醒之后,节点对应的线程2又从acquireQueued()方法的阻塞处醒来继续参与争抢锁。并且争抢成功了,那么会把head节点的下一个节点设置为null,让自己所处的节点变为head节点:

Java并发编程——ReentrantLock实现原理,第10张

这样一个AQS独占式、非中断的抢占锁的流程就结束了。

七、完整流程

最后我们再以另一个维度的流程来演示下这个过程。

首先有4个线程争抢锁,线程1,成功了,其他三个失败了,分别依次入等待队列:

Java并发编程——ReentrantLock实现原理,第11张

线程2、线程3依次入队列:


Java并发编程——ReentrantLock实现原理,第12张

现在突然发生了点事情,假设线程3用的是带有超时时间的tryLock,超过了等待时间,线程3状态变为取消状态了,这个时候,线程4追加到等待队列中后,发现前一个节点的状态是1取消状态,那么会执行操作把线程3节点从队列中移除掉:

Java并发编程——ReentrantLock实现原理,第13张

最后,线程1释放了锁,然后把head节点ws设置为0,并且找到了离head最靠近的一个waitStatus<=0的线程并唤醒,然后参与竞争获取锁:

Java并发编程——ReentrantLock实现原理,第14张

最终,线程2获取到了锁,然后把自己变为了Head节点,并取代了原来的Head节点:

Java并发编程——ReentrantLock实现原理,第15张

接着就一直这样循环,我就不再画图了,聪明的你应该对此了如指掌了。

八、可打断原理

8.1 不可打断模式

在此模式下,即使它被打断,仍会驻留在 AQS 队列中,一直要等到获得锁后方能得知自己被打断了

public abstract class AbstractQueuedSynchronizer
    extends AbstractOwnableSynchronizer
    implements java.io.Serializable {
    
    private final boolean parkAndCheckInterrupt() {
        // 如果打断标记已经是 true, 则 park 会失效
        LockSupport.park(this);
        // interrupted 会清除打断标记
        return Thread.interrupted();
    }   
    
    final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    // 还是需要获得锁后, 才能返回打断状态
                    return interrupted;
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    // 如果是因为 interrupt 被唤醒, 返回打断状态为 true
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }   
    
    public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            // 如果打断状态为 true
            selfInterrupt();
    }   
    
    static void selfInterrupt() {
        // 重新产生一次中断
        Thread.currentThread().interrupt();
    }   
}

8.2 可打断模式

public abstract class AbstractQueuedSynchronizer
    extends AbstractOwnableSynchronizer
    implements java.io.Serializable {
    
    public final void acquireInterruptibly(int arg)
            throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
            
        // 如果没有获得到锁, 进入 ㈠   
        if (!tryAcquire(arg))
            doAcquireInterruptibly(arg);
    }   
    
    // ㈠ 可打断的获取锁流程
    private void doAcquireInterruptibly(int arg)
        throws InterruptedException {
        final Node node = addWaiter(Node.EXCLUSIVE);
        boolean failed = true;
        try {
            for (;;) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return;
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    // 在 park 过程中如果被 interrupt 会进入此
                    // 这时候抛出异常, 而不会再次进入 for (;;)    
                    throw new InterruptedException();
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }
}

九、总结:

9.1 公平锁与非公平锁对比

  • FairSync:lock()少了插队部分(即少了CAS尝试将state从0设为1,进而获得锁的过程)
  • FairSync:tryAcquire(int acquires)多了需要判断当前线程是否在等待队列首部的逻辑(实际上就是少了再次插队的过程,但是CAS获取还是有的)。

ReentrantLock是基于AbstractQueuedSynchronizer实现的,AbstractQueuedSynchronizer可以实现独占锁也可以实现共享锁,ReentrantLock只是使用了其中的独占锁模式。

相对来说,非公平锁会有更好的性能,因为它的吞吐量比较大。当然,非公平锁让获取锁的时间变得更加不确定,可能会导致在阻塞队列中的线程长期处于饥饿状态。

参考:
https://blog.csdn.net/fuyuwei2015/article/details/83719444

https://blog.51cto.com/u_3265857/2369641

https://segmentfault.com/a/1190000014769953

https://www.jb51.net/article/105762.htm

https://www.cnblogs.com/java-zhao/p/5133402.html

https://www.cnblogs.com/java-zhao/p/5131544.html

https://www.itzhai.com/articles/aqs-and-lock-implementation-in-concurrent-packages.html


https://www.xamrdz.com/bigdata/7l21995515.html

相关文章: