当前位置: 首页>编程语言>正文

Java多线程同步-锁

Java多线程同步

前言:
本章节是参考网上文章并自行研究锁的一部分总结,由于本人从事Android开发,所以在针对锁的底层实现时,会对比x86和ARM架构下对应的实现,如有问题请及时指出;

1. Java锁

  • Lock出现之前,Java使用synchronized实现多线程同步,JDK5之后,JUC包下引入Lock接口及其实现类实现同步功能,Lock相比synchronized更灵活,拥有手动获取和释放锁、非阻塞式获取锁、响应中断、获取超时等新特点;
  • Java提供了种类丰富的锁,按照特性的不同,可以按以下宏观概念理解:
    • 乐观锁和悲观锁;
    • 自旋锁;
    • 无锁、偏向锁、轻量级锁、重量级锁;
    • 公平锁和非公平锁;
    • 可重入锁;
    • 独享锁(排他锁)和共享锁;

1.1 乐观锁VS悲观锁

  1. 悲观锁:
    对于同一数据的多线程并发操作,悲观锁认为,自己在使用数据的时候一定有其他线程来修改数据,因此get数据的时候先加锁,确保数据不会被其他线程修改,Java中的synchronized关键字和Lock的实现类都是悲观锁;
  2. 乐观锁:
    认为自己在使用数据时不会被别的线程修改,所以不会添加锁,只是在更新数据的时候去判断之前有没有其他线程更新了这个数据。如果没有被更新,当前线程write成功;如果被更新,当前线程根据不同的实现类,执行不同的操作(例如报错或自动重试)。乐观锁在Java中是通过使用无锁编程实现的,采用CAS算法,Java原子类中的递增操作就是通过CAS自旋实现的;
  3. 使用场景:
    • 悲观锁适合write场景,synchronized,ReentrantLock,都是显式的加锁后再操作同步资源或代码块;
    • 乐观锁适合read场景,AtomicInteger.incrementAndGet(),Java层不锁定,直接操作资源;
  4. 乐观锁使用CAS算法实现多线程同步:
    CAS(Compare and Swap),无锁算法,在不使用锁,并且线程没有被阻塞的情况下实现多线程之间数据同步。

    CAS算法涉及到三个操作数:需要读写的内存值V;进行比较的的值A;待写入的新值B;
    当且仅当V==A,通过原子方式让V=B("比较+更新"整体是一个原子操作),否则由一个do()while{}循环,继续从头执行上述步骤,自旋次数也是有上限的(默认10次,看具体的操作系统,可以通过JVM参数修改);java.utils.concurrent包中的原子类就是通过CAS实现的,例如AtomicInteger,后续介绍:

  5. CAS高效,因为自旋减少切换线程所需的开销,但存在几个问题:
    • ABA的问题:CAS需要在操作值之前检查内存值是否发生变化,没有变化才更新内存值;但如果内存值由A->B->A最终变回A,但其实内存值是变化过的,这种情况不能被忽略,所以在变量前添加版本号即可;
    • while循环时间长开销大:如果长时间while,会一直自旋,一直占用CPU资源,开销也很大;
    • 只能保证一个共享变量的原子操作:无法保证同时对多个共享变量的原子性操作,所以JDK1.5开始提供了AtomicReference,把多个变量放到一个对象来保证执行CAS操作时的原子性;
    • CAS无锁算法和加锁的效率,不一定谁低谁高,不同场景需要使用不同的方式解决安全问题;如果线程数和CPU核数一致,CAS效率高;但如果线程很多,远大于CPU核数,则CAS效率很低,因为CAS会占用CPU资源;

1.2 自旋锁

1.2.1 自旋锁介绍

  1. 阻塞或唤醒一个Java线程需要操作系统切换CPU状态,这种状态切换需要耗费处理器时间。如果同步代码内容很简单,状态转换消耗的时间可能比执行代码的还长,为了这种情况切换线程(线程挂起、恢复现场,状态同步),得不偿失;

  2. 如果机器有多个处理器,能让2个或以上线程同时并行执行,则可以让后来的请求锁的thread不放弃CPU的执行时间,而是通过自旋的方式"稍稍等待一下",看看当前持有锁的线程是否很快就释放锁。如果自旋完成后,前面线程释放了锁,则当前线程可以直接获取锁并且执行同步代码,而不必走"阻塞->唤醒"这样耗时耗资源的步骤,这就是自旋锁;

  3. 说白了,自旋锁可以简单理解为:如果发现锁被其他线程拿着,在当前线程,一定条件的while循环,而不是阻塞当前线程:

    Java多线程同步-锁,第1张
    Java自旋锁.png
  4. 缺点:
    不能代替线程阻塞,自旋是可以减少线程切换的开销,但会占用CPU执行时间。如果锁被占用的时间很少,那自旋锁效果很好,反之,也会浪费CPU资源。所以自旋的while是有限制的,超过自旋次数就挂起线程;

1.2.2 自旋锁具体实现原理

  1. 实现算法就是CAS,对应具体的实现类就是juc包下Atomic开头的原子类,下面从java.util.concurrent.atomic.AtomicInteger.getAndAddInt()逐步分析:
    public class AtomicInteger extends Number implements java.io.Serializable {
        private static final long serialVersionUID = 6214790243416807050L;
        // Unsafe对象可以直接获取并操作内存的数据
        private static final sun.misc.Unsafe U = sun.misc.Unsafe.getUnsafe();
        // 该成员变量对应真正value在AtomicInteger对象所在内存地址中的偏移量offset
        private static final long VALUE;
    
        static {
            try {
                VALUE = U.objectFieldOffset
                    (AtomicInteger.class.getDeclaredField("value"));
            } catch (ReflectiveOperationException e) {
                throw new Error(e);
            }
        }
        // 真正的int值,volatile修饰,保证线程之间是可见的,在volatile章节已介绍可见性的实现
        private volatile int value;
        
        ......
        
        public final int getAndAdd(int delta) {
            // 最终调用Unsafe.getAndAddInt()
            return U.getAndAddInt(this, VALUE, delta);
        }
    }
    

    CAS 本质:依赖核

  2. 接着看sun.misc.Unsafe类:
    // sun.misc.Unsafe.getAndAddInt()
    public final int getAndAddInt(Object o, long offset, int delta) {
        int v;
        do {
            // 根据value在AtomicInteger对象所在内存地址中的偏移量offset获取当前内存中的值v,然后判断内存中的值是否等于v
            v = this.getIntVolatile(o, offset);
            // 然后调用native方法compareAndSwapInt(),对比v和(v+delta)的值是否相等,如果相等则表示没有其他线程更改过这个值,接着将新值设置到内存
        } while(!this.compareAndSwapInt(o, offset, v, v + delta));
        return v;
    }
    
    可以看到,native方法compareAndSwapInt()其实包括了2个操作:比较+更新,多个操作如何保证原子性?
  3. 继续往下找,以openjdk中HotSpot虚拟机为例,查看openjdk/hotspot/src/share/vm/prims/unsafe.cpp文件,从中找到Unsafe_CompareAndSwapInt,代码如下:
    UNSAFE_ENTRY(jboolean, Unsafe_CompareAndSwapInt(JNIEnv *env, jobject unsafe, jobject obj, jlong offset, jint e, jint x))
      UnsafeWrapper("Unsafe_CompareAndSwapInt");
      // p就是内存中的Unsafe对象
      oop p = JNIHandles::resolve(obj);
      // 从Unsafe对象中,根据偏移量offset拿到真正保存int值的地址
      jint* addr = (jint *) index_oop_from_field_offset_long(p, offset);
      // 调用Atomic::cmpxchg()函数进行"比较+更新"的操作,x就是待更新的值,e是原值,Atomic::cmpxchg()的返回值
      return (jint)(Atomic::cmpxchg(x, addr, e)) == e;
    UNSAFE_END
    
  4. 上述最终调用Atomic::cmpxchg()函数实现,该函数的实现是平台相关的,不同平台实现不同,Linux x86系统的实现如下:
    inline jint     Atomic::cmpxchg(jint exchange_value, volatile jint* dest, jint compare_value) {
      // 这里mp表示当前系统是否为多核架构
      // 如果是就给总线加锁,所以同一芯片上的其他处理器就暂时不能通过总线访问内存,保证了该指令在多处理器环境下的原子性
      int mp = os::is_MP();
      // LOCK_IF_MP():判断是否是多核架构,如果是,则需要添加"lock"指令,才能保证不被其他处理器打断cmpxchgl操作;
      __asm__ volatile (LOCK_IF_MP(%4) "cmpxchgl %1,(%3)"    // 汇编模版,%1指向输入参数中的exchange_value,%3指向dest,类推...
                        : "=a" (exchange_value)              // 输出操作数,"a"对应累加寄存器EAX,下面的"r"指其他寄存器
                        : "r" (exchange_value), "a" (compare_value), "r" (dest), "r" (mp)  // 输入参数
                        : "cc", "memory");                   // 一些其他参数,"memory"
      return exchange_value;
    }
    

    这段ASM汇编代码简单翻译一下就是:汇编指令cmpxchgl会比较EAX寄存器的值(compare_value,该线程已经拿到的值)和exchange_value(待更新的值),如果相等,就把dest的值(即真实地址中的值)赋给exchange_value并返回给上层Unsafe_CompareAndSwapInt()中,在该函数中会将dest的值和当前线程已拿到的值e比较,此时不等,则CAS失败,继续do{}while{};否则,将待更新的值exchange_value写入EAX寄存器,CAS成功;

  5. 总结:
    Linux x86平台,多核架构,JDK中AtomicXX类的CAS算法是由汇编指令"lock cmpxchgl"实现的;单核架构,不添加"lock"指令;该汇编指令保证了"比较+更新"操作的原子性;
  6. 对比 Linux ARM架构的实现,代码在linux/arch/arm/include/asm/atomic.h文件中:
    ...
    
    #if __LINUX_ARM_ARCH__ >= 6
    
    /*
     * ARMv6 UP and SMP safe atomic ops.  We use load exclusive and
     * store exclusive to ensure that these are atomic.  We may loop
     * to ensure that the update happens.
     */
    
    ...                         \
    
    #define ATOMIC_OP_RETURN(op, c_op, asm_op)              \
    static inline int atomic_##op##_return_relaxed(int i, atomic_t *v)  \
    {                                   \
        unsigned long tmp;                      \
        int result;                         \
                                        \
        prefetchw(&v->counter);                     \
                                        \
        __asm__ __volatile__("@ atomic_" #op "_return\n"        \
    "1: ldrex   %0, [%3]\n"                     \
    "   " #asm_op " %0, %0, %4\n"                   \
    "   strex   %1, %0, [%3]\n"                     \
    "   teq %1, #0\n"                       \
    "   bne 1b"                         \
        : "=&r" (result), "=&r" (tmp), "+Qo" (v->counter)       \
        : "r" (&v->counter), "Ir" (i)                   \
        : "cc");                            \
                                        \
        return result;                          \
    }
    
    • 可以从上面汇编代码里看到,Linux ARMv6及之后的架构,使用ldrex和strex指令独占的访问内存,实现原子性;
    • 简单理解,这俩指令会将被访问的内存段设置"独占"标记,这俩指令也是ARM架构平台实现线程同步工具的基础;
  7. 补充LongAdder,专门用于基础数据类型递增的场景,替换AtomicXX类:
    1. 大致实现:
      在increment()操作中,创建Cell数组,分块Cell累加,累加单元和CPU核数有关,利用多核CAS运算,提高整个递增的效率,最后longValue()获取结果时,所有的Cell累加后将结果抛到调用者;
    2. 使用场景:
      累加操作,且量级很大的时候替代AtomicXX使用;

1.2.3 Unsafe魔法类

不只是AtomicXXX的实现,看过Java锁的代码后发现,所有的锁最终都会调用sun.misc.Unsafe的API来保证操作的原子性,接着了解一下黑科技Unsafe类;

  1. 主要功能如下:

    Java多线程同步-锁,第2张
    Java魔法类Unsafe.png
  2. Unsafe.java是单例,提供静态方法getUnsafe()获取Unsafe对象,当且仅当调用getUnsafe()的类为BootstrapClassLoader加载时才合法,否则抛出SecurityException异常:

    public final class Unsafe {
      // 单例对象
      private static final Unsafe theUnsafe;
    
      private Unsafe() {
      }
      @CallerSensitive
      public static Unsafe getUnsafe() {
        Class var0 = Reflection.getCallerClass();
        // 仅在引导类加载器`BootstrapClassLoader`加载时才合法
        if(!VM.isSystemDomainLoader(var0.getClassLoader())) {    
          throw new SecurityException("Unsafe");
        } else {
          return theUnsafe;
        }
      }
    }
    
  3. Unsafe类是JDK提供给Java开发者的一个操作系统后门,可以直接对操作系统的内存等资源进行操作,即:增强了Java语言对底层资源操作的能力。既然可以像C/C++一样操作内存空间,使用Unsafe是比较高风险的,需要谨慎,下面简单描述下Unsafe常见的能力:

    1. 内存操作
      • 主要包含堆外内存的分配、拷贝、释放、给定地址值操作等方法;
      • 通常Java中创建的对象都位于堆内内存(heap),heap由JVM管控,是Java进程内存,遵循JVM垃圾回收机制;而Unsafe提供的接口是对 "对外内存"进行操作,不受JVM内存管理机制约束,所以使用不当很危险;
      • 使用堆外内存的好处:1.改善垃圾回收时造成的停顿现象;2.提升I/O性能,通常在I/O通信过程中,会存在堆内内存到堆外内存的数据拷贝操作,对于需要频繁进行内存间数据拷贝且生命周期较短的暂存数据,都建议存储到堆外内存;
      • 应用:nio包下的DirectByteBuffer是Java层使用堆外内存的具体实现,内部接口就是使用Unsafe实现的;
    2. CAS
      • CAS:实现并发算法时用到的一种技术,Compare And Swap;
      • CAS操作包含3个操作数:内存值value、预期的原值old、新值new;
      • 执行CAS操作时,比较value和old,如果value==old,则更新内存值value=new,否则根据具体的实现或进行自旋,或报错等等;
      • 应用:JUC包下的AtomicXXX最终执行Unsafe.compareAndSwapXXX();还有Java AQS、CurrentHashMap等;
    3. Class相关
      • 提供Class和它的静态字段的操作相关方法,包含静态字段内存定位、定义类、定义匿名类、检验&确保初始化等;
      • 应用:从Java 8开始,JDK使用invokedynamic(JDK 7引入的运行动态语言的一条新的虚拟机指令)及VM Anonymous Class(一种模版机制)结合来实现Java语言层面上的Lambda表达式;
    4. 操作对象
      • 对对象成员属性、非常规的实例化等操作(Unsafe.allocateInstance()等);
      • 常规的对象实例化:使用new关键字,如果Foo类只有有参构造函数,且没显示声明无参构造函数,则只能调用有参构造函数创建对象;
      • 非常规对象实例化:上述Foo,可以使用Unsafe.allocateInstance()绕过JVM安全检查,直接调用Foo隐式的无参构造函数实例化对象;
      • 应用:Gson反序列化;日常开发需要注意这俩问题:Gson与Kotlin碰撞出一个不安全的操作、Gson 又搞了个坑
    5. 线程调度
      • 包括线程阻塞(park())、唤醒(unpark())、锁机制(Unsafe.monitorEnter()...)等方法;
      • Java锁的核心类AQS;
    6. 获取系统信息
      • 获取系统相关信息:返回系统指针的大小、获取内存页的大小;
      • 应用:nio包下的工具类Bits,DirectByteBuffer中使用了Bits;

1.3 无锁 VS 偏向锁 VS 轻量级锁 VS 重量级锁

  1. JDK 1.6中引入了这4个概念,具体是指锁的四种状态,针对synchronized设定的,对于Android系统的synchronized可以参考synchronized 第1.3节第9点最后的流程图;
  2. 无锁:
    没有对资源进行锁定,所有的线程都能访问并修改同一个资源,但同时只有一个线程能修改成功;
  3. 偏向锁:
    是指一段同步代码一直被一个线程所访问,那么该线程会自动获取锁,降低获取锁的代价(如果一段synchronized代码,只有一个线程在执行,每次执行都CAS操作,该操作中包含"获取锁->释放锁",没有必要)。只有遇到其他线程尝试竞争偏向锁时,持有偏向锁的线程才会释放锁,线程不会主动释放偏向锁。释放后锁的状态恢复到无锁或轻量级锁;
  4. 轻量级锁:
    是指当锁是偏向锁的时候,被另外的线程所访问,偏向锁就会升级为轻量级锁,其他线程会通过自旋的形式尝试获取锁,不会阻塞,从而提高性能;偏向锁和轻量级锁对应 Android虚拟机ART 的"瘦锁"态。
  5. 重量级锁:
    若当前只有一个等待线程,则该线程通过自旋进行等待。但是当自旋超过一定的次数,或者一个线程在持有锁,一个在自旋,又有第三个来访时,轻量级锁升级为重量级锁,这里的重量级锁对应 Android虚拟机ART 的"胖锁"态,即 使用ART封装的Mutex锁。

1.4 公平锁 VS 非公平锁

  1. 公平锁:
    公平锁是指多个线程按照申请锁的顺序来获取锁,线程直接进入队列中排队,队列中的第一个线程才能获得锁。公平锁的优点是等待锁的线程不会饿死。缺点是整体吞吐效率相对非公平锁要低,等待队列中除第一个线程以外的所有线程都会阻塞,CPU唤醒阻塞线程的开销比非公平锁大。
  2. 非公平锁:
    非公平锁是多个线程加锁时直接尝试获取锁,获取不到才会到等待队列的队尾等待。但如果此时锁刚好可用,那么这个线程可以无需阻塞直接获取到锁,所以非公平锁有可能出现后申请锁的线程先获取锁的场景。非公平锁的优点是可以减少唤起线程的开销,整体的吞吐效率高,因为线程有几率不阻塞直接获得锁,CPU不必唤醒所有线程。缺点是处于等待队列中的线程可能会饿死,或者等很久才会获得锁。
  3. Java中具体的实现,ReentrantLock可以选择创建哪种锁,默认是创建非公平锁:
    public ReentrantLock(boolean fair) {
        sync = fair new FairSync() : new NonfairSync();
    }
    
  4. 在JDK8中,对比ReentrantLock实现 公平锁/非公平锁 获取锁的代码:
    // 非公平锁的实现
    final boolean nonfairTryAcquire(int acquires) {
        final Thread current = Thread.currentThread();
        int c = getState();
        if (c == 0) {
            if (compareAndSetState(0, acquires)) {
                setExclusiveOwnerThread(current);
                return true;
            }
        }
        else if (current == getExclusiveOwnerThread()) {
            ...
            return true;
        }
        return false;
    }
    
    // 公平锁的实现
    static final class FairSync extends Sync {
        ...
        
        /**
         * Fair version of tryAcquire.  Don't grant access unless
         * recursive call or no waiters or is first.
         */
        protected final boolean tryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            int c = getState();
            if (c == 0) {
                // 区别!!!
                if (!hasQueuedPredecessors() &&
                    compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            else if (current == getExclusiveOwnerThread()) {
                ...
                return true;
            }
            return false;
        }
    }
    
    唯一区别在于,公平锁在尝试获取锁时,增加了!hasQueuedPredecessors()判断,接着查看该方法:
    public final boolean hasQueuedPredecessors() {
        ...
        Node t = tail; // Read fields in reverse initialization order
        Node h = head;
        Node s;
        return h != t &&
            ((s = h.next) == null || s.thread != Thread.currentThread());
    }
    
    该方法的作用是,判断当前线程是否位于同步队列中的第一个,如果是则返回true;即:通过同步队列来实现多个线程按照申请锁的顺序来获取锁;而非公平锁则直接尝试获取锁;

1.5 可重入锁

  1. 可重入锁也叫递归锁,指在同一线程,使用同一个锁对象时,外层方法获得锁后,再进入内层方法时,自动获得锁,不会因为外层方法没释放锁导致线程阻塞,当然可重入的次数是有上限的;
  2. synchronized是可重入锁,之前了解过,ART虚拟机,使用synchronized时,对象锁的可重入状态封装在c++代码LockWord类的成员变量uint32_t value_中,同一个线程多次获取对象锁时,这种情况说明是瘦锁状态,该value_字段的27-16位就保存了 Lock Count,即同一线程获取该对象锁的次数;
  3. ReentrantLock也是可重入锁,ReentrantLock内部通过Sync类实现所有的同步机制,而Sync继承自AbstractQueuedSynchronizer(AQS,后面介绍),AbstractQueuedSynchronizer中state变量用来记录锁被获取的次数(state是一个被volatile修饰的int类型),通过公平锁FairSync.lock()方法逐步分析,代码如下:
    static final class FairSync extends Sync {
        private static final long serialVersionUID = -3000897897090466540L;
    
        // 如果创建的是公平锁,那ReentrantLock.lock()方法最终会调到这里
        final void lock() {
            // Sync继承自AQS,该方法定义在AQS中
            acquire(1);
        }
    
        protected final boolean tryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            int c = getState();
            if (c == 0) {
                // 介绍公平锁/非公平锁的时候知道,区别就在这里,获得锁的顺序是否按照申请锁的顺序,由一个队列维护先后顺序
                ...
            }
            else if (current == getExclusiveOwnerThread()) {
                int nextc = c + acquires;
                if (nextc < 0)
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            return false;
        }
    }
    
    接着看AQS.acquire(int)方法:
    public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }
    

    简单理解就是:
    当前线程获得锁之前,调用lock.lock()时,会走到AQS.tryAcquire()中,具体实现在子类,但都是先拿到state进行判断,如果state==0,说明该锁没有被任何一个线程持有,则让当前线程安全的(通过CAS操作保证获取锁的过程是原子的)拿到该锁;
    如果state!=0,则说明该锁已经被某个线程持有,则继续判断持有锁的线程是否是当前线程;
    如果是当前线程,则state++,说明当前线程再一次拿到该锁;
    如果不是当前线程,则回到AQS.acquire()方法中,执行acquireQueued(),将本次获取锁的请求放入队列,本次获取锁的操作失败,走到后续的失败处理流程中,后续介绍;

1.6 独享锁 VS 共享锁

  • 独享锁和共享锁同样也是一种概念;

  • 是对可重入锁的进一步优化,因为单纯的可重入锁ReentrantLock,state状态记录的是包括读和写,即"读,写"都需要加锁,效率还是比较低,所以引入"独享锁(排他锁)和共享锁";

  • 独享锁也叫排他锁,是指该锁一次只能被一个线程所持有。如果线程T对共享数据A加上排它锁后,则其他线程不能再对A加任何类型的锁。获得排它锁的线程即能读数据又能修改数据。synchronized和Lock接口的实现类都是独享锁;

  • 共享锁是指该锁可被多个线程所持有。如果线程T对数据A加上共享锁后,则其他线程只能对A再加共享锁,不能加排它锁。获得共享锁的线程只能读数据,不能修改数据。

  • 具体的实现类是java.util.concurrent.locks.ReentrantReadWriteLock,实现了ReadWriteLock接口

    Java多线程同步-锁,第3张
    Java独享锁共享锁.png
  • 内部有两把锁ReadLock和WriteLock分别标记读和写,都是Sync的子类,Sync是AQS(AbstractQueuedSynchronizer)的子类,state就保存在AQS中;读锁是共享锁,写锁是独享锁。读锁可保证并发读非常高效,而读写、写读、写写的过程互斥,因为读锁和写锁是分离的。所以ReentrantReadWriteLock的并发性相比一般的互斥锁有了很大提升;

  • 其原理简单概括:将32位int类型的变量state分为【高16位】和【低16位】,分别存储读锁个数和写锁个数,先看写锁的代码tryAcquire():

    // 写锁代码
    protected final boolean tryAcquire(int acquires) {
        Thread current = Thread.currentThread();
        int c = getState(); // 取到当前ReentrantReadWriteLock锁的state
        int w = exclusiveCount(c); // 取写锁的个数w,即独享锁个数
        if (c != 0) {       // 如果已经有线程持有了锁(c!=0)
            // 如果写线程数(w)为0(换言之仅存在读锁) 或者持有锁的线程不是当前线程就返回失败
            // 当已经存在某个线程拿到读锁时,不允许其他线程再获得写锁,否则无法保证数据对所有线程都是最新的,所以此处直接返回false,表示获取写锁失败
            if (w == 0 || current != getExclusiveOwnerThread()) 
                return false;
            if (w + exclusiveCount(acquires) > MAX_COUNT)    // 如果写锁的重入数大于最大数(65535,2的16次方-1)就抛出一个Error。
                throw new Error("Maximum lock count exceeded");
            // 成功拿到写锁!
            setState(c + acquires);
            return true;
        }
        // 如果当且写线程数为0,并且当前线程需要阻塞(该方法只用于公平锁,即:因为最终调用的是hasQueuedPredecessors(),只有在等待队列的线程,才会被阻塞),则获取写锁失败;或者如果通过CAS增加写线程数失败也返回失败
        if (writerShouldBlock() || !compareAndSetState(c, c + acquires)) 
            return false;
        // 如果c=0,w=0或者c>0,w>0(重入),则设置当前线程或锁的拥有者,获得写锁成功
        setExclusiveOwnerThread(current); 
        return true;
    }
    

    接着看读锁的代码:

    // 读锁代码
    protected final int tryAcquireShared(int unused) {
        Thread current = Thread.currentThread();
        int c = getState();
        // 如果其他线程已经获取了写锁,则当前线程获取读锁失败,并进入等待状态
        if (exclusiveCount(c) != 0 &&
            getExclusiveOwnerThread() != current)
            return -1;
        // 拿到读锁个数
        int r = sharedCount(c);
        // readerShouldBlock()方法依然调用的是hasQueuedPredecessors()
        // 执行compareAndSetState()方法,更新state的值
        if (!readerShouldBlock() &&
            r < MAX_COUNT &&
            compareAndSetState(c, c + SHARED_UNIT)) {
            ...
            return 1;
        }
        return fullTryAcquireShared(current);
    }
    
  • 总结:

    • 对于ReentrantReadWriteLock,获得读锁和写锁的操作是互斥的,可能会造成线程阻塞;而获得读锁和读锁的操作是共享的,不会造成线程阻塞;
    • 对于ReentrantLock,不论创建的是公平锁还是非公平锁,不论是读操作还是写操作,添加的都是独享锁;
    • 所以ReentrantReadWriteLock在"读读操作"时的效率比ReentrantLock高;

2. AbstractQueuedSynchronizer原理

2.1 介绍

  • AQS是一种提供了原子式管理同步状态、阻塞和唤醒线程功能以及队列模型的简单框架
  • Java中的大部分同步类(Lock接口、ReadWriteLock接口、Semaphore、CountDownLatch等)都是基于AbstractQueuedSynchronizer(简称为AQS)实现的;
  • 即:Java层又实现的一套同步锁框架,使用一个阻塞队列(双向链表),解决多线程并发问题;只不过AQS通过CAS无锁算法和自旋,可以让多个线程在尽可能不暂停线程的情况下,达到抢占锁的目的;
  • 本节将从ReentrantLock的实现分析AQS;

2.2 Lock的使用方法

介绍AQS原理前补一下ReentrantLock的基本使用方法,它比synchronized更加灵活,上锁和解锁的逻辑被开发者控制:

 Lock mLock;
 
 public void testReentrantLock() {
     mLock = new ReentrantLock(true);
     
     try {
         if (mLock.tryLock(1000, TimeUnit.MILLISECONDS)) {
             // Do somethings ...
         } else {
             // Other things ...
         }
     } catch (Throwable e) {
         e.printStackTrace();
     } finally {
         mLock.unlock();
     }
 }

2.3 AQS的实现原理

  • 从最主要的ReentrantLock.lock()加锁方法着手分析,以NonfairSync实现为例,代码如下:
    final void lock() {
        if (compareAndSetState(0, 1))
            setExclusiveOwnerThread(Thread.currentThread());
        else
            acquire(1);
    }
    
    • 如果对state CAS成功,则当前线程直接获得锁成功;
    • 如果对state CAS失败,则进入acquire();方法进行后续处理;
  • 获取锁成功的逻辑,在上面已经介绍过,不同类型的锁有不同的实现;而对于获取锁失败后,肯定存在某种排队等待的机制,让线程继续拥有获得锁的机会,而不是直接结束流程;从上面代码可以知道,需要看下acquire()方法的实现逻辑,而该方法定义在java.util.concurrent.locks.AbstractQueuedSynchronizer中,所以下面分析一下AQS的原理;
  • AQS的整体结构如下图所示:
    Java多线程同步-锁,第4张
    AQS框架.png
  • AQS核心思想:如果共享资源没有被占用,则将使用共享资源的线程设置为当前线程;如果共享资源被占用,则通过阻塞、等待唤醒的机制,确保锁可以正常的被分配,线程仍然有机会获得锁;
  • 这种阻塞等待唤醒机制是依赖CLH(一种单链表)变体实现的,AQS中的队列是CLH变体的虚拟双向队列(FIFO),通过将每条请求共享资源的线程封装成一个节点Node来实现锁的分配,而每个Node都是去操作AQS的volatile成员变量state实现同步的;
  • 继续回到acquire()方法代码:
    public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }
    
    tryAcquire()方法是尝试获得锁和处理重入问题,如果失败则会继续执行acquireQueued(addWaiter(Node.EXCLUSIVE), arg))方法,该步骤内部就是将当前线程信息封装成Node对象,加入等待队列中,接着将该线程park;具体来说,是将新Node添加到双向链表的尾部,并将头节点指向刚创建的Node对象,接着看acquireQueued()方法;
    final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            // 开始自旋,对整条链表,从后往前遍历
            for (;;) {
                // 先拿到当前节点的【前驱节点】
                final Node p = node.predecessor();
                // 如果正好是头节点,则说明当前节点是等待队列的头部,轮到当前节点再次尝试拿锁
                if (p == head && tryAcquire(arg)) {
                    // 如果成功拿到锁,则指针前移,将node的前驱节点置null,即让prev出队
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    // 并且不需要让当前线程中断,因为成功拿到锁了,所以此处是唯一跳出循环的地方
                    return interrupted;
                }
                // 然后根据前驱节点,判断当前节点是否需要阻塞
                // 如果需要则通过parkAndCheckInterrupt()将当前线程阻塞,防止死循环浪费CPU资源
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }
    
    这就是获取锁时的阻塞等待机制的实现,至于被阻塞的线程什么时候被唤醒,需要了解一下解锁的过程;
  • ReentrantLock的解锁过程不区分公平锁和非公平锁,查看java.util.concurrent.locks.ReentrantLock.unlock()函数:
    public void unlock() {
        // 最终还是调用了AQS.release(int)函数
        sync.release(1);
    }
    
  • 到java.util.concurrent.locks.AbstractQueuedSynchronizer:
    public final boolean release(int arg) {
        // 已获取锁的线程执行完同步代码块后,尝试释放锁,即将当前锁的成员变量exclusiveOwnerThread置空,并通过CAS操作安全的更新AQS.state的值
        if (tryRelease(arg)) {
            // 如果lock没有被任何线程占用,则通过unparkSuccessor()函数将当前线程Node的后继节点(因为是一条链表,此时后继节点一定是被阻塞的)唤醒
            // 【注意】:为什么是当前阻塞队列head的后继节点呢?因为每个阻塞队列的head是一个Dummy(哑元)节点,是个占位节点,他的next才是阻塞队列的首个节点
            // unparkSuccessor()内部通过LockSupport.unpark(thread)将后继节点线程唤醒
            // 唤醒节点之前也会把当前线程节点状态置成初始状态0,方便后续操作将该节点从链表中移除
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }
    

2.3.1 wait/notify和await/singal和park/unpark

  • obj.wait()/notify():依托于synchronized,即底层依赖Monitor实现线程阻塞和唤醒,和synchronized配合使用;
    public void test() {
        Object o = new Object();
        synchronized (obj) {
            try {
                obj.wait();
            } cache (...) {
            }
        }
    }
    
  • ReentrantLock.ConditionObject.await()/signal():是条件变量的方法,底层还是基于park()/unpark();;即:ConditionObject内部本质就是一个双向队列(但只用了nextWaiter);
  • LockSupport.park(thread)/unpark(thread):精准的暂停或唤醒某一个thread对象,而上面的obj.notify();是随机唤醒一个thread;底层设计基于"许可",C层使用volatile int _counter保存这个许可;(以上是JDK的实现);

2.4 AQS的应用

AQS是Java中自定义线程同步锁的基础框架,在此基础上可以自定义各种同步工具:

  • ReentrantLock
  • ReentrantReadWriteLock
  • Semaphore:
    限量锁,在构造方法中设置permits最大允许多少个线程可访问一段代码,如果访问的线程超过permit,则剩下的thread等待;场景类似厕所蹲坑;
  • CountDownLatch:
    可以在构造函数中,设置门闩个数,当门闩个数为0,才会走之后的逻辑;场景类似LOL,等10个人都加载完毕,游戏才能开始;
  • ThreadPoolExecutor

2.5 总结

JDK 1.6中引入了多种多线程同步工具

  • AtomicXXX原子类,是通过CAS无锁算法,在一定程度上实现同步的,而CAS算法的实现,是通过Unsafe魔法类保证"比较+更新"操作的原子性的;
  • Lock接口、ReadWriteLock接口的实现类等,是基于AQS实现同步,AQS引入双向链表队列结构,将"申请锁"的操作封装并放入等待队列中,实现线程排队获取锁的机制,此外AQS的底层也是通过CAS算法保证操作的原子性;
  • AQS本质:原子变量(int state)+队列(双向链表)+ park/unpark阻塞唤醒线程,用上面3要素来搭建一个Java层的锁框架,真正实现还是各种和业务场景相关的子类;

参考

Java核心技术 卷I
不可不说的Java“锁”事
Java CAS 原理剖析
Java魔法类:Unsafe应用解析
从ReentrantLock的实现看AQS的原理及应用


https://www.xamrdz.com/lan/53n1995269.html

相关文章: