1. 并发编程基础
1.1 什么是线程
线程是进程中的一个实体,线程本身是不会独立存在的。进程是代码在数据集合上的一次运行活动,是系统进行资源分配和调度的基本单位,线程则是进程的一个执行路径,一个进程中至少有一个线程,进程中的多个线程共享进程的资源
操作系统在分配资源时是把资源分配给进程的,但是 CPU 资源比较特殊,它是被分配到线程的,因为要真正占用 CPU 运行的是线程,所以也说线程是 CPU 分配的基本单位
多个线程共享进程的堆和方法区资源,但每个线程有自己的程序计数器和栈区域
- 程序计数器是一块内存区域,用来记录线程当前要执行的指令地址
- 需要注意的是,如果执行的是 native 方法,那么 pc 计数器记录的是 undefined 地址,只有执行的是 Java 代码时 pc 计数器记录的才是下一条指令的地址
- 每个线程都有自己的栈资源,用于存储该线程的局部变量,这些局部变量是该线程私有的,其他线程是访问不了的,除此之外栈还用来存放线程的调用栈帧
- 堆是一个进程中最大的一块内存,堆是被进程中的所有线程共享的,是进程创建时分配的,堆里面主要存放使用 new 操作创建的对象实例
- 方法区则用来存放 JVM 加载的类、常量及静态变量等信息,也是线程共享的
1.2 线程三种创建方式的优缺点
Java 中有三种线程创建方式,分别为实现 Runnable 接口的 run 方法,继承 Thread 类并重写 run 方法,以及使用 FutureTask 方式
使用继承方式的好处是方便传参,可以在子类里添加成员变量,通过 set 方法设置参数或者通过构造函数进行传递,而如果使用 Runnable 方式,则只能使用主线程里面被声明为 final 的变量。不好的地方是 Java 不支持多继承,而如果继承了 Thread 类,那么子类不能再继承其他类,而 Runnable 则没有这个限制。前两种方式都没办法拿到任务执行的返回结果,但是 FutureTask 方式可以。
1.3 线程通知与等待
Java 中的 Object 类是所有类的父类,鉴于继承机制,Java 把所有类都需要的方法放到了 Object 类里面,其中就包含通知与等待系列的函数
wait() / wait(long timeout)
当一个线程调用一个共享变量的 wait() 方法时,该调用线程会被阻塞挂起,直到发生下面几件事情之一才返回:
1.其他线程调用了该共享对象的 notify() 或者 notifyAll() 方法
2.其他线程调用了该线程的 interrupt() 方法,该线程抛出 InterruptedException 异常返回
3.如果带有超时参数,没有在指定时间的 timeout ms 时间内被其他线程调用该共享变量的 notify() 或者 notifyAll() 方法唤醒,那么该函数还是会因为超时而返回
4.不加参数的 wait() 方法内部就是调用了 wait(0)
当线程调用共享对象的 wait() 方法时,当前线程只会释放当前共享对象的锁,当前线程持有的其他共享对象的监视器锁并不会被释放
虚假唤醒
一个线程可以从挂起状态变为可以运行状态(也就是被唤醒),即使该线程没有被其他线程调用 notify()、notifyAll() 方法进行通知,或者被中断,或者等待超时,这就是所谓的虚假唤醒 。
虚假唤醒在应用实践中很少发生,但要防患于未然,做法就是不停的测试该线程被唤醒的条件是否满足,不满足则继续等待,也就是说在一个循环中调用 wait() 方法进行防范。退出循环的条件是满足了唤醒该线程的条件。
synchronized (obj) {
while (条件不满足) {
obj.wait();
}
}
notify()
一个线程调用共享对象的 notify() 方法后,会唤醒一个在该共享变量上调用 wait 系列方法后被挂起的线程。一个共享变量上可能会有多个线程在等待,具体唤醒哪个等待的线程是随机的。这个被唤醒的线程还需要和其他线程一起竞争该锁,只有该线程竞争到了共享变量的监视器锁后才可以继续执行
notifyAll()
notifyAll() 方法会唤醒所有在该共享变量上由于调用 wait 系列方法而被挂起的线程
1.4 等待线程执行终止的 join 方法
Thread 类中的 join 方法可以用来等待多个线程全部加载完毕再汇总处理
线程 A 调用线程 B 的 join 方法后会被阻塞,当其他线程调用了线程 A 的 interrupt() 方法中断了线程 A 时,线程 A 会抛出 InterruptedException 异常而返回
1.5 让线程睡眠的 sleep 方法
Thread 类有一个静态的 sleep() 方法,当一个执行中的线程调用了 Thread 的 sleep() 方法后,调用线程会暂时让出指定的执行权,也就是在这期间不参与 CPU 的调度,但是该线程所拥有的监视器资源,比如锁还是持有不让出的。指定的睡眠时间到了后该函数会正常返回,线程就处于就绪状态,然后参与 CPU 的调度,获取到 CPU 的资源后就可以运行了。
1.6 让出 CPU 执行权的 yield 方法
当一个线程调用了 Thread 类的静态方法 yield() 时,是在告诉线程调度器自己占有的时间片中还没有使用完的部分自己不想使用了,这暗示线程调度器现在就可以进行下一轮的线程调度
sleep() 和 yield() 方法的区别在于,当线程调用 sleep() 方法时调用线程会被阻塞挂起指定的时间,在这期间线程调度器不会去调度该线程。而调用 yield() 方法时,线程只是让出自己剩余的时间片,并没有被阻塞挂起,而是处于就绪状态,线程调度器下一次调度时就有可能调度到当前线程执行。
1.7 线程中断
Java 中的线程中断是一种线程间的协作模式,通过设置线程的中断标志并不能直接终止该线程的执行,而是被中断的线程根据中断状态自行处理
- void interrupt() : 中断线程,当线程 A 运行时,线程 B 可以调用线程 A 的 interrupt() 方法来设置线程 A 的中断标志为 true 并立即返回。设置标志仅仅是设置标志,线程 A 实际并没有被中断,它会继续往下执行。
- boolean isInterrupted() : 检测当前线程是否被中断,如果是返回 true,否则返回 false
- boolean interrupted() : 检测当前线程是否被中断,如果是返回 true,否则返回 false,如果该方法发现当前线程被中断,则会清除中断标志,并且该方法是 static 方法,可以通过 Thread 类直接调用。
1.8 线程上下文切换
线程上下文切换时机有:当前线程的 CPU 时间片使用完处于就绪状态时,当前线程被其他线程中断时
1.9 线程死锁
什么是死锁
死锁是指两个或两个以上的线程在执行过程,因争夺资源而造成的互相等待的现象,在无外力作用的情况下,这些线程会一直等待而无法继续运行下去
产生死锁的条件。
死锁的产生必须具备以下四个条件:
- 互斥条件:指线程对已经获取到的资源进行排它性使用,即该资源同时只由一个线程占用。如果此时还有其他线程请求使用该资源,则请求者只能等待,直至占有资源的线程释放该资源
- 请求并持有条件:指一个线程已经持有了至少一个资源,但又提出了新的资源请求,而新资源已被其他线程占有,所以当前线程会被阻塞,但阻塞的同时并不释放自己已经获取的资源
- 不可剥夺条件:指线程获取到的资源在自己使用完之前不能被其他线程抢占,只有在自己使用完毕后才由自己释放该资源
- 环路等待条件:指在发生死锁时,必然存在一个线程一资源的环形链,即线程集合 {T0, T1, T2, ... , Tn} 中的 T0 正在等待一个 T1 占用的资源,T1 正在等待 T2 占用的资源,......Tn 正在等待已被 T0 占用的资源。
如何避免线程死锁
要想避免死锁,只需要破坏掉至少一个构造死锁的必要条件即可,但是目前只有 请求并持有 和 环路等待 条件是可以被破坏的
资源的有序分配会避免死锁,因为资源的有序性破坏了资源的请求并持有条件和环路等待条件,因此避免了死锁。
1.10 守护线程与用户线程
Java 中的线程分为两类,分别为 daemon 线程(守护线程)和 user 线程(用户线程)。在 JVM 启动时会调用 main 函数,main 函数所在的线程就是一个用户线程,而垃圾回收线程则是守护线程
守护线程和用户线程区别之一是当最后一个非守护线程结束时,JVM 会正常退出,而不管当前是否有守护线程,也就是说守护线程是否结束并不影响 JVM 的退出。言外之意,只要有一个用户线程还没结束,正常情况下 JVM 就不会退出
创建守护线程的的方式是,设置线程的 daemon 参数为 true 即可
总的来说,如果希望在主线程结束后 JVM 进程马上结束,那么在创建线程时可以将其设置为守护线程,如果希望在主线程结束后子线程继续工作,等子线程结束后再让 JVM 进程结束,那么就将子线程设置为用户线程
1.11 ThreadLocal
ThreadLocal 是 JDK 包提供的,它提供了线程本地变量,也就是如果你创建了一个 ThreadLocal 变量,那么访问这个变量的每个线程都会有这个变量的一个本地副本。当多个线程操作这个变量时,实际操作的是自己本地内存里面的变量,从而避免了线程安全问题。
ThreadLocal 是一个 HashMap 结构,其中 key 就是当前 ThreadLocal 的实例引用,value 是通过 set 方法传递的值。ThreadLocal 变量在父线程中被设置值后,在子线程中是获取不到的。
2. 并发编程的其他基础知识
2.1 为什么要进行多线程并发编程
多核 CPU 时代的到来打破了单核 CPU 对多线程效能的限制。多个 CPU 意味着每个线程可以使用自己的 CPU 运行,这减少了线程上下文切换的开销,但随着对应用系统性能和吞吐量要求的提高,出现了处理海量数据和请求的要求,这些都会高并发编程有着迫切的需求。
2.2 Java 中的线程安全问题
- 共享资源:就是说该资源被多个线程所持有或者说多个线程都可以去访问该资源
线程安全问题是指当多个线程同时读写一个共享资源并且没有任何同步措施时,导致出现脏数据或者其他不可预见的结果的问题
2.3 Java 中共享变量的内存可见性问题
当一个线程操作共享变量时,它首先从主内存复制共享变量到自己的工作内存,然后对工作内存里的变量进行处理,处理完后将变量值更新到主内存
假如线程 A 和线程 B 使用不同的 CPU 执行,此时由于 Cache 的存在,将会导致内存不可见问题
2.4 synchronized
2.4.1 synchronized 关键字介绍
synchronized 块是 Java 提供的一种原子性内置锁,Java 中的每个对象都可以把它当做一个同步锁来使用,这些 Java 内置的使用者看不到的锁被称为 内部锁,也叫做 监视器锁 。
内置锁是排它锁,也就是当一个线程获取这个锁后,其他线程必须等待该线程释放锁后才能获取该锁。
另外,由于 Java 中的线程是与操作系统中的原生线程一一对应的,所以当阻塞一个线程时,需要从用户态切换到内核态执行阻塞操作,这是很耗时的操作,而 synchronized 的使用就会导致上下文切换。
2.4.2 synchronized 的内存语义
进入 synchronized 块的内存语义是把在 synchronized 块内使用到的变量从线程的工作内存中清除,这样在 synchronized 块内使用到该变量时就不会从线程的工作内存中获取,而是直接从主内存中获取。退出 synchronized 块的内存语义是把在 synchronized 块内对共享变量的修改刷新到主内存。
除了可以解决共享变量内存可见性问题外,synchronized 经常被用来实现原子性操作。另外请注意,synchronized 关键字会引起线程上下文切换并带来线程调度开销。
2.5 volatile
对于解决内存可见性的问题,Java 还提供了一种弱形式的同步,也就是使用 volatile 关键字。该关键字可以确保对一个变量的更新对其他线程马上可见。当一个变量被声明为 volatile 时,线程在写入变量时不会把值缓存在寄存器或者其他地方,而是把值刷新回主内存。当其他线程读取该共享变量时,会从主内存重新获取最新值,而不是使用当前线程的工作内存中的值。
2.6 Java 中的原子性操作
所谓原子性操作,是指执行一系列操作时,这些操作要么全部执行,要么全部不执行,不存在只执行其中一部分的情况。
线程安全性:即内存可见性和原子性
2.7 Java 中的 CAS 操作
CAS 即 Compare and Swap,是 JDK 提供的非阻塞原子性操作,它通过硬件保证了比较 -- 更新操作的原子性。JDK 里面的 Unsafe 类提供了一系列的 compareAndSwap 方法。
// 比如说下面这个
boolean compareAndSwapLong(Object obj, long valueOffset, long expect, long update);
其中 compareAndSwap 的意思是比较并交换。
CAS 有四个操作数,分别为:对象内存位置、对象中的变量的偏移量、变量预期值和新的值。其操作含义是,如果对象 obj 中内存偏移量为 valueOffset 的变量值为 expect ,则使用新的值 update 替换旧的值 expect。这是处理器提供的一个原子性指令。
ABA 问题
CAS 操作有个经典的 ABA 问题。
ABA 问题的产生是因为变量的状态值产生了环形转换,就是变量的值可以从 A 到 B,然后再从 B 到 A 。如果变量的值只能朝着一个方向转换,比如 A 到 B,B 到 C,不构成环形,就不会存在问题。JDK 中的 AtomicStampedReference 类给每个变量的状态值都配备了一个时间戳,从而避免了 ABA 问题的产生。
2.8 Unsafe 类
- JDK 的 rt.jar 包中的 Unsafe 类提供了硬件级别的原子性操作,Unsafe 类中的方法都是 native 方法,它们使用 JNI 的方式访问本地 C++ 实现库
2.9 Java 指令重排序
Java 内存模型允许编译器和处理器对指令重排序以提高运行性能,并且只会对不存在数据依赖性的指令重排序。在单线程下重排序可以保证最终执行的结果与程序顺序执行的结果一致,但是在多线程下就会存在问题。
重排序在多线程下会导致非预期的程序执行结果,而使用 volatile 修饰变量就可以避免重排序和内存可见性问题。
写 volatile 变量时,可以确保 volatile 写之前的操作不会被编译器重排序到 volatile 写之后。读 volatile 变量时,可以确保 volatile 读之后的操作不会被编译器重排序到 volatile 读之前。
2.10 伪共享
2.10.1 什么是伪共享
为了解决主内存与 CPU 之间运行速度差的问题,会在 CPU 与主内存之间添加一级或多级高速缓冲器(Cache)。这个 Cache 一般是被集成到 CPU 内部的,所以也叫 CPU Cache 。
在 Cache 内部是按行存储的,其中每一行称为一个 Cache 行。Cache 行是 Cache 与主内存进行数据交换的单位。
由于存放到 Cache 行的是内存块而不是单个变量,所以可能会把多个变量存放到一个 Cache 行中。当多个线程同时修改一个缓存行里面的多个变量时,由于同时只能有一个线程操作缓存行,所以相比将每一个变量放到一个缓存行,性能会有所下降,这就是伪共享。
2.10.2 如何避免伪共享
在 JDK 8 之前一般都是通过字节填充的方式来避免该问题,也就是创建一个变量时使用填充字段填充该变量所在的缓存行,这就避免了将多个变量存放在同一个缓存行中。
JDK 8 提供了一个 sun.misc.Contented 注解,用来解决伪共享问题。在默认情况下,@Contented 注解只用于 Java 核心类,比如 rt 包下的类。如果用户类路径下的类需要使用这个注解,则需要添加 JVM 参数:-XX:-RestrictContented 。
总结来说,在多线程下访问同一个缓存行的多个变量时才会出现伪共享,在单线程下访问一个缓存行里面的多个变量反而会对程序运行起到加速作用
2.11 锁的概述
2.11.1 乐观锁与悲观锁
- 悲观锁是指对数据被外界修改持保守态度,认为数据很容易就会被其他线程修改,所以在数据被处理前先对数据进行加锁,并在整个数据处理过程中,使数据处于锁定状态
- 乐观锁是相对悲观锁来说的,它认为数据在一般情况下不会造成冲突,所以在访问记录前不会加排它锁,而是在进行数据提交更新时,才会对数据冲突与否进行检测
2.11.2 公平锁与非公平锁
根据线程获取锁的抢占机制,锁可以分为 公平锁 和 非公平锁
公平锁表示线程获取锁的顺序是按照线程请求锁的时间早晚来决定的,也就是最早请求锁的线程将最早获取到锁。
非公平锁则在运行时闯入,也就是先来不一定先得。
ReentrantLock 提供了公平和非公平锁的实现公平锁:ReentrantLock pairLock = new ReentrantLock(true)
非公平锁:ReentrantLock pairLock = new ReentrantLock(false) ,默认是非公平锁
在没有公平性需求的前提下尽量使用非公平锁,因为公平锁会带来性能开销
2.11.3 独占锁与共享锁
根据锁只能被单个线程持有还是能被多个线程共同持有,锁可以分为 独占锁 和 共享锁 。
独占锁保证任何时候都只有一个线程能得到锁,ReentrantLock 就是以独占方式实现的。共享锁则可以同时由多个线程持有,例如 ReadWriteLock 读写锁,它允许一个资源可以被多个线程同时进行读操作。
- 独占锁是一种悲观锁,由于每次访问资源都先加上互斥锁,这限制了并发性,因为读操作并不会影响数据的一致性,而独占锁只允许在同一时间由一个线程读取数据,其他线程必须等待当前线程释放锁才能进行读取
- 共享锁则是一种乐观锁,它放宽了加锁的条件,允许多个线程同时进行读操作
2.11.4 可重入锁
- 当一个线程要获取一个被其他线程持有的独占锁时,该线程会被阻塞,那么当一个线程再次获取它自己已经获取的锁时,如果不被阻塞,那么该锁就是可重入的。
synchronized 内部锁是可重入锁。
可重入锁的原理是在锁内部维护了一个线程标示,用来标示该锁目前被哪个线程占用,然后关联一个计数器,当计数器值为 0 时说明该锁没有被任何线程占用,当一个线程获取了该锁,计数器值会变为 1,这时其他线程再来获取锁时会发现锁的所有者不是自己而被阻塞挂起。但是当获取了该锁的线程再次获取锁时发现锁拥有者是自己,计数器值就 + 1,当释放锁后,计数器值 - 1。当计数器值为 0 时,锁里面的线程标示被重置为 null ,这时候被阻塞的线程会被唤醒来竞争获取该锁。
2.11.5 自旋锁
由于 Java 中的线程是与操作系统中的线程一一对应的,所以当一个线程在获取锁失败后,会被切换到用户态而被挂起。当该线程获取到锁时又需要将其切换到内核状态而唤醒该线程。而从用户状态切换到内核状态的开销是比较大的,在一定程度上会影响并发性能。
自旋锁则是,当前线程在获取锁时,如果发现锁已经被其他线程占有,它不马上阻塞自己,在不放弃 CPU 使用权的情况下,多次尝试获取(默认次数是 10,可以使用 -XX:PreBlockSpinsh 参数设置该值),很有可能在后面几次尝试中其他线程已经释放了锁。如果尝试指定的次数后仍没有获取到锁则当前线程才会被阻塞挂起。
由此看来自旋锁是使用 CPU 时间换取线程阻塞与调度的开销,但是很有可能这些 CPU 时间白白浪费了。
3. ThreadLocalRandom
3.1 Random 类及其局限性
每个 Random 实例里面都有一个原子性的种子变量用来记录当前的种子值,当要生成新的随机数时需要根据当前种子计算新的种子并更新会原子变量。当多线程下使用单个 Random 实例生成随机数时,当多个线程同时计算随机数来计算新的种子时,多个线程会竞争同一个原子变量的更新操作,由于原子变量的更新是 CAS 操作,同时只有一个线程会成功,所以会造成大量线程进行自旋重试,这会降低并发性能,所以 ThreadLocalRandom 应运而生。
3.2 ThreadLocalRandom
每个线程都维护一个种子变量,则每个线程生成随机数时都根据自己老的种子计算新的种子,并使用新种子更新老的种子,再根据新种子计算随机数,就不会存在竞争问题了,这会大大提高并发性能。
ThreadLocalRandom 使用 ThreadLocal 的原理,让每个线程都持有一个本地的种子变量,该种子变量只有在使用随机数时才会被初始化。在多线程下计算新种子时是根据自己线程内维护的种子变量进行更新,从而避免了竞争。
4. JUC 中的原子操作类
JUC 包提供了一系列的原子性操作类,这些类都是使用非阻塞算法 CAS 实现的,相比使用锁实现原子性操作这在性能上有很大提高。
4.1 AtomicLong
- AtomicLong 是原子性递增或递减类,其内部使用 Unsafe 来实现
因为 AtomicLong 类是在 rt.jar 包下面的,AtomicLong 类就是通过 BootStarp 类加载器进行加载的,所以其内部实现时可以直接通过 Unsafe.getUnsafe() 方法获取到 Unsafe 类的实例
在高并发情况下 AtomicLong 还会存在性能问题。JDK 8 提供了一个在高并发下性能更好的 LongAdder 类
使用 AtomicLong 时,在高并发下大量线程会同时去竞争更新同一个原子变量,但是由于同时只有一个线程的 CAS 操作会成功,这就造成了大量线程竞争失败后,会通过无限循环不断进行自旋尝试 CAS 的操作,而这会白白浪费 CPU 资源。
4.2 LongAdder
为了解决高并发下多线程对一个变量 CAS 争夺失败后进行自旋而造成的降低并发性能的问题,LongAdder 在内部维护多个 Cell 元素(一个动态 Cell 数组)来分担对单个变量进行争夺的开销,每个 Cell 里面有一个初始值为 0 的 long 型变量,这样,在同等并发量的情况下,争夺单个变量更新操作的线程量会减少。
另外,多个线程在争夺同一个 Cell 原子变量时如果失败了,它并不是在当前 Cell 变量上一直自旋 CAS 重试,而是尝试在其他 Cell 的变量上进行 CAS 尝试,这个改变增加了当前线程重试 CAS 成功的可能性。
最后,在获取 LongAdder 当前值时,是把所有 Cell 变量的 value 值累加后再加上 base 返回的。
由于 Cells 占用的内存是相对较大的,所以一开始并不创建它,而是在需要时创建,也就是 惰性加载 。
另外,数组元素 Cell 使用 @sun.misc.Contented 注解进行修饰,这避免了 Cells 数组内多个原子变量被放入同一个缓存行,也就是避免了 伪共享,这对性能也是一个提升。
LongAccumulator
LongAdder 类是 LongAccumulator 的一个特例,只是后者提供了更加强大的功能,可以让用户自定义规则。
5. CopyOnWriteArrayList
并发包中的并发 list 只有 CopyOnWriteArrayList,它是无界 list 。
CopyOnWriteArrayList 使用写时复制的策略来保证 list 的一致性,而 获取 - 修改 - 写入 三步操作并不是原子性的,所以在增删改的过程中都使用了独占锁,来保证在某个时间只有一个线程能对 list 数组进行修改。另外 CopyOnWriteArrayList 提供了弱一致性的迭代器,从而保证在获取迭代器后,其他线程对 list 的修改是不可见的,迭代器遍历的数组是一个快照。另外,CopyOnWriteArraySet 的底层就是使用它实现的。
6. JUC中锁原理
6.1 LockSupport
- LockSupport 是个工具类,它的主要作用是挂起和唤醒线程,该工具类是创建锁和其他同步类的基础。
- LockSupport 类与每个使用它的线程都会关联一个许可证,在默认情况下调用 LockSupport 类的方法的线程是不持有许可证的。LockSupport 是使用 Unsafe 类实现的。
6.1.1 void park()
- 如果调用 park 方法的线程已经拿到了与 LockSupport 关联的许可证,则调用 LockSupport.park() 时会马上返回,否则调用线程会被禁止参与线程的调度,也就是会被阻塞挂起。
6.1.2 void unpark()
- 当一个线程调用 unpark 时,如果参数 thread 线程没有持有 thread 与 LockSupport 类关联的许可证,则让 thread 线程持有。
- 如果 thread 之前因调用 park() 而被挂起,则调用 unpark() 后,该线程会被唤醒。
- 如果 thread 之前没有调用 park(),则调用 unpark 方法后,再调用 park 方法,会立刻返回。
6.1.3 其他方法
1.void parkNanos(long nanos)
2.park(Object blocker)
3.void parkNanos(Object blocker, long nanos)
4.void parkUntil(Object blocker, long deadline)
6.2 AQS
- AbstractQueuedSynchronizer 抽象同步队列简称 AQS,它是实现同步器的基础组件,并发包中锁的底层就是使用 AQS 实现
- AQS 是一个 FIFO 的双向队列,其内部通过节点 head 和 tail 记录队首和队尾元素,队列元素的类型为 Node。其中 Node 中的 thread 变量用来存放进入 AQS 队列里的线程
- 在 AQS 中维持了一个单一的状态信息 state,可以通过 getState、setState、compareAndSetState 函数修改其值。
- AQS 有个内部类 ConditionObject,用来结合锁实现线程同步。
- 对于 AQS 来说,线程同步的关键是对状态值 state 进行操作。
6.2.1 条件变量的支持
notify 和 wait ,是配合 synchronized 内置锁实现线程间同步的基础设施一样,条件变量的 signal 和 await 方法也是用来配合锁(使用 AQS 实现的锁)实现线程间同步的基础设施。
它们的不同在于,synchronized 同时只能与一个共享变量的 notify 或 wait 方法实现同步,而 AQS 的一个锁可以对应多个条件变量。
ReentrantLock lock = new ReentrantLock();
Condition notFull = lock.newCondition();
Condition notEmpty = lock.newCondition();
lock.newCondition() 的作用其实是 new 了一个在 AQS 内部声明的 ConditionObject 对象,ConditionObject 是 AQS 的内部类,可以访问 AQS 内部的变量(例如状态变量 state)和方法。在每个条件变量内部都维护了一个条件队列,用来存放调用条件变量的 await() 方法时被阻塞的线程。注意这个条件队列和 AQS 队列不是一回事。
注意不要混淆 AQS 阻塞队列与条件变量队列:
- 当多个线程同时调用 lock.lock() 方法获取锁时,只有一个线程获取到了锁,其他线程会被转换为 Node 节点插入到 lock 锁对应的 AQS 阻塞队列里面,并做自旋 CAS 尝试获取锁。
- 如果获取到锁的线程又调用了对应的条件变量的 await() 方法,则该线程会释放获取到的锁,并被转换为 Node 节点插入到条件变量对应的条件队列里面。
- 这时候因为调用 lock.lock() 方法被阻塞到 AQS 队列里面的一个线程会获取到被释放的锁,如果该线程也调用了条件变量的 await() 方法则该线程也会被放入条件变量的条件队列里面。
- 当另外一个线程调用条件变量的 signal() 或者 signalAll() 方法时,会把条件队列里面的一个或者全部 Node 节点移动到 AQS 的阻塞队列里面,等待时机获取锁。
也就是说,一个锁对应一个 AQS 阻塞队列,对应多个条件变量,每个条件变量有自己的一个条件队列。
6.3 独占锁 ReentrantLock
- ReentrantLock 是可重入的独占锁,同时只能有一个线程可以获取该锁,其他获取该锁的线程会被阻塞而被放入该锁的 AQS 阻塞队列里面。
6.3.1 获取锁
void lock()
- 调用该方法时,如果锁当前没有被其他线程占用并且当前线程之前没有获取过该锁,则当前线程会获取到该锁,然后设置当前锁的拥有者为当前线程,并设置 AQS 的状态值 state 为 1,然后直接返回。如果当前线程之前已经获取过该锁,则这次只是简单的把 AQS 的状态值加 1 后返回。如果该锁已经被其他线程持有,则调用该方法的线程会被放入 AQS 队列后阻塞挂起。
当然还有其他的获取锁的方法
- void lockInterruptibly():对中断进行响应
- boolean tryLock():尝试获取锁,如果当前该锁没有被其他线程持有,则当前线程获取该锁并返回 true,否则返回 false。注意,该方法不会引起当前线程阻塞。
6.3.2 释放锁
void unlock()
- 尝试释放锁,如果当前线程持有该锁,则调用该方法会让线程对该线程持有的 AQS 状态值减 1,如果减去 1 后当前状态值为 0 ,则当前线程会释放该锁,否则仅仅减 1 而已。如果当前线程没有持有该锁而调用了该方法则会抛出 IllegalMonitorStateException 异常。
总的来说,ReentrantLock 的底层是使用 AQS 实现的可重入独占锁。在这里 AQS 状态值为 0 表示当前锁空闲,为大于等于 1 的值则说明该锁已经被占用。该锁内部有公平与非公平实现,默认情况下是非公平的实现。
6.4 读写锁 ReentrantReadWriteLock
ReentrantReadWriteLock 的底层是使用 AQS 实现的。ReentrantReadWriteLock 巧妙的使用 AQS 的状态值的高 16 位表示获取到读锁的个数,低 16 位表示获取写锁的线程的可重入次数,并通过 CAS 对其进行操作实现了读写分离,这在读多写少的场景下比较适用。
6.5 StampedLock
StampedLock 是并发包里面 JDK8 版本新增的一个类,该锁提供了三种模式的读写控制,当调用获取锁系列函数时,会返回一个 long 型的变量,我们称之为 戳记(stamp),这个戳记代表了锁的状态。其中 try 系列获取锁的函数,当获取锁失败后会返回为 0 的 stamp值。当调用释放锁和转换锁的方法时需要传入获取锁时返回的 stamp 值。
StampedLock 提供的三种读写模式的锁:
- 写锁 writeLock:独占锁,不可重入
- 悲观读锁 readLock:共享锁,不可重入
- 乐观读锁 tryOptimisticRead:只是使用位操作进行检验,不涉及 CAS 操作,所以效率会高很多
StampedLock 提供的读写锁与 ReentrantReadWriteLock 类似,只是前者提供的是不可重入锁。但是前者通过提供乐观读锁在多线程多读的情况下提供了更好的性能,这是因为获取乐观读锁时不需要进行 CAS 操作设置锁的状态,而只是简单的测试状态。
7. Java 并发包中的并发队列
7.1 ConcurrentLinkedQueue
-
ConcurrentLinkedQueue 是线程安全的无界非阻塞队列,其底层数据结构使用单向链表实现,对于入队和出队操作使用 CAS 来实现线程安全。
7.2 LinkedBlockingQueue
- LinkedBlockingQueue 也是使用单向链表实现的,其也有两个 Node ,分别用来存放首、尾节点,并且还有一个初始值为 0 的原子变量 count ,用来记录队列元素个数。
- 另外还有两个 ReentrantLock 的实例,分别用来控制元素入队和出队的原子性,其中 takeLock 用来控制同时只有一个线程可以从队列头获取元素,其他线程必须等待;putLock 控制同时只能有一个线程可以获取锁,在队列尾部添加元素,其他线程必须等待。
- 另外,notEmpty 和 notFull 是条件变量,它们内部都有一个条件队列用来存放进队和出队时被阻塞的线程,其实这是 生产者-消费者 模型。
-
LinkedBlockingQueue 默认队列容量为 0x7fffffff,用户也可以自己指定容量,所以从一定程度上可以说 LinkedBlockingQueue 是有界阻塞队列。
7.3 ArrayBlockingQueue
LinkedBlockingQueue 是基于有界链表方式实现的阻塞队列,而 ArrayBlockingQueue 是基于基于有界数组实现的阻塞队列。
ArrayBlockingQueue 的内部有一个数组 items ,用来存放队列元素,putIndex 变量表示入队元素下标,takeIndex 是出队下标,count 统计队列元素个数。另外,有个独占锁 lock 用来保证出、入队操作的原子性,这保证了同时只有一个线程可以进行入队、出队操作。另外,notEmpty、notFull 条件变量用来进行出、入队的同步。
ArrayBlockingQueue 是有界队列,所以构造函数必须传入队列大小参数。
7.4 PriorityBlockingQueue
PriorityBlockingQueue 是带优先级的无界阻塞队列,每次出队都返回优先级最高或者最低的元素。其内部是使用平衡二叉树堆实现的,所以直接遍历队列元素不保证有序。
PriorityBlockingQueue 队列在内部使用二叉树堆维护元素优先级,使用数组作为元素存储的数据结构,这个数组是可扩容的。当当前元素个数 >= 最大容量时会通过 CAS 算法扩容,出队时始终保证出队的元素是堆树的根节点,而不是在队列里面停留时间最长的元素。使用元素的 compareTo 方法提供默认的元素优先级比较规则,用户可以自定义优先级的比较规则。
7.5 DelayQueue
DelayQueue 并发队列是一个无界阻塞延迟队列,队列中的每个元素都有个过期时间,当从队列获取元素时,只有过期元素才会出队列。队头元素是最快要过期的队列。
DelayQueue 内部使用 PriorityQueue 存放数据,使用 ReentrantLock 实现线程同步。另外队列里面的元素要实现 Delayed 接口,其中一个是获取当前元素到过期时间剩余时间的接口,在出队时判断元素是否过期了,一个是元素之间比较的接口,因为这是一个有优先级的队列。
8. ThreadPoolExecutor
8.1 介绍
线程池主要解决两个问题:一是当执行大量异步任务时线程池能够提供较好的性能。在不使用线程池时,每当需要执行异步任务时直接 new 一个线程来运行,而线程的创建和销毁是需要开销的。线程池里面的线程是可复用的,不需要每次执行异步任务时都重新创建和销毁线程。二是线程池提供了一种 资源限制 和 管理 的手段,比如可以限制线程的个数,动态新增线程等。每个 ThreadPoolExecutor 也保留了一些基本的统计数据,比如当前线程池完成的任务数目等。
另外,线程池也提供了许多可调参数和可扩展性接口,以满足不同情景的需要,程序员可以使用更方便的 Executors 的工厂方法,比如 newCachedThreadPool(线程池线程个数最多可达 Integer.MAX_VALUE,线程自动回收)、newFixedThreadPool(固定大小的线程池)和 newSingleThreadExecutor(单个线程)等来创建线程池,当然用户还可以自定义。
线程池参数
- corePoolSize:线程池核心线程个数。
- workQueue:用于保存等待执行的任务的阻塞队列,比如基于数组的有界 ArrayBlockingQueue、基于链表的无界 LinkedBlockingQueue、最多只有一个元素的同步队列 SynchronousQueue 及优先级队列 PriorityBlockingQueue 等。
- maximunPoolSize:线程池最大线程数量。
- ThreadFactory:创建线程的工厂。
- RejectedExecutionHandler:饱和策略,当队列满并且线程个数达到 maximunPoolSize 后采取的策略,比如 AbortPolicy(抛出异常)、CallerRunsPolicy(使用调用者所在线程来运行任务)、DiscardOldestPolicy(调用 poll 丢弃一个任务,执行当前任务)及 DiscardPolicy(默默丢弃,不抛出异常)。
- keepAliveTime:存活时间。如果当前线程池中的线程数量比核心线程数量多,并且是闲置状态,则这些闲置的线程能存活的最大时间。
- TimeUnit:存活时间的时间单位。
线程池类型
- newFixedThreadPool:创建一个核心线程个数和最大线程个数都为 nThreads 的线程池,并且阻塞队列长度为 Integer.MAX_VALUE。keepAliveTime = 0 说明只要线程个数比核心线程个数多并且当前空闲则回收。
- newSingleThreadExecutor:创建一个核心线程个数和最大线程个数都为 1 的线程池,并且阻塞队列长度为 Integer.MAX_VALUE。keepAliveTime = 0 说明只要线程个数比核心线程个数多并且当前空闲则回收。
- newCachedThreadPool:创建一个按需创建线程的线程池,初始线程个数为 0 ,最多线程个数为 Integer.MAX_VALUE,并且阻塞队列为同步队列。keepAliveTime = 60 说明只要当前线程在 60s 内空闲则回收。这个类型的特殊之处在于,加入同步队列的任务会被马上执行,同步队列里面最多只有一个任务。
public void execute(Runnable command)
-
execute() 方法的作用是提交任务 command 到线程池进行执行
- 从图中可以看出,ThreadPoolExecutor 的实现实际上是一个生产消费模型,当用户添加任务到线程池时相当于生产者生产元素,worker 线程工作集中的线程直接执行任务或者从任务队列里面获取任务时则相当于消费者消费元素。
总结:线程池巧妙的使用一个 Integer 类型的原子变量来记录线程池状态和线程池中的线程个数。通过线程池状态来控制任务的执行,每个 Worker 线程可以处理多个任务。线程池通过线程的复用减少了线程创建和销毁的开销。
9. ScheduledThreadPoolExecutor
Executor 其实是个工具类,它提供了好多静态方法,可根据用户的选择返回不同的线程池实例。ScheduledThreadPoolExecutor 继承了 ThreadPoolExecutor 并实现了 ScheduledExecutorService 接口。线程池队列是 DelayedWorkQueue,其和 DelayedQueue 类似,是一个延迟队列。
9.3.1 schedule(Runnable command, long delay, TimeUnit unit)
该方法的作用是提交一个延迟执行的任务,任务从提交时间算起延迟单位为 unit 的 delay 时间后开始执行。提交的任务不是周期性任务,任务只会执行一次。
9.3.2 scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit)
fixed-delay 类型的任务的执行原理为,当添加一个任务到延迟队列后,等待 initialDelay 时间,任务就会过期,过期的任务就会被从队列移除,并执行。执行完毕后,会重新设置任务的延迟时间,然后再把任务放入延迟队列,循环往复。需要注意的是,如果一个任务在执行中抛出了异常,那么这个任务就结束了,但是不影响其他任务的执行。
9.3.3 scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit)
相对于 fixed-delay 任务来说,fixed-rate 方式执行规则为,时间为 initdelday + n * period 时启动任务,但是如果当前任务还没有执行完,下一次要执行的时间到了,则不会并发执行,下次要执行的任务会延迟执行,要等到当前任务执行完毕后再执行。
总结:其内部使用 DelayQueue 来存放具体任务。任务分为三种,其中一次性执行任务执行完毕就结束了,fixed-delay 任务保证同一个任务在多次执行之间间隔固定时间,fixed-rate 任务保证按照固定的频率执行。任务类型使用 period 的值来区分。
10. Java 并发包中线程同步器--线程协作
10.1 CountDownLatch
10.1.1 CountDownLatch 与 join 方法的区别
一个区别是,调用一个子线程的 join() 方法后,该线程会一直被阻塞直到子线程运行完毕,而 CountDownLatch 则使用计数器来允许子线程运行完毕或者在运行中递减计数,也就是 CountDownLatch 可以在子线程运行的任何时候让 await() 方法返回而不一定必须等到线程结束。
另外,使用线程池来管理线程时一般都是直接添加 Runnable 到线程池,这时候就没有办法再调用线程的 join() 方法了,就是说 CountDownLatch 相比 join() 方法让我们对线程的同步有更灵活的控制。
10.1.2 原理
CountDownLatch 是使用 AQS 实现的,使用 AQS 的状态变量来存放计数器的值。首先在初始化 CountDownLatch 时设置状态值(计数器值),当多个线程调用 countDown() 方法时实际是原子性递减 AQS 的状态值。当线程调用 await() 方法后当前线程会被放入 AQS 的阻塞队列等待计数器为 0 再返回。其他线程调用 countDown() 方法让计数器值递减 1,当计数器值变为 0 时,当前线程还要调用 AQS 的 doReleaseShared 方法来激活由于调用 await() 方法而被阻塞的线程。
10.2 回环屏障 CyclicBarrier
CountDownLatch 在解决多个线程同步方面相对于调用线程的 join() 方法已经有了不少优化,但是 CountDowmLatch 的计数器是一次性的,也就是等到计数器值变为 0 后,再调用 CountDownLatch 的 await() 和 countDown() 方法都会立刻返回,这就起不到线程同步的效果了。
所以,为了满足计数器可以重置的需要,JDK 开发组提供了 CyclicBarrier 类,并且 Cyclicbarrier 类的功能并不限于 CountDownLatch 的功能。
从字面意思理解,CyclicBarrier 是回环屏障的意思,它可以让一组线程全部达到一个状态后再全部同时执行。这里之所以叫做回环是因为当所有等待线程执行完毕,并重置 CyclicBarrier 的状态后它可以被重用。之所以叫做屏障是因为线程调用 await 方法后就会被阻塞,这个阻塞点就称为屏障点,等所有线程都调用了 await 方法后,线程们就会冲破屏障,继续向下运行。
- CyclicBarrier 与 CountDowmLatch 的不同在于,前者是可以复用的,并且前者特别适合分段任务有序执行的场景。
- CyclicBarrier 通过 ReentrantLock 实现计数器原子性更新,并使用条件变量队列来实现线程同步。
10.3 信号量 Semaphore
Semaphore 信号量也是 Java 中的一个同步器,与 CountDownLatch 和 CyclicBarrier 不同的是,它内部的计数器是递增的,并且在一开始初始化 Semphore 时可以指定一个初始值,但是并不需要知道需要同步的线程个数,而是在需要同步的地方调用 acquire() 方法时指定需要同步的线程个数。
Semaphore 完全可以达到 CountDownLatch 的效果,但是 Semaphore 的计数器是不可以自动重置的,不过通过变相的改变 aquire() 方法的参数还是可以实现 CyclicBarrier 的功能的。
Semaphore 也是使用 AQS 实现的,并且获取信号量时有公平策略和非公平策略之分。
11. 并发编程实践--一些注意事项
11.1 ArrayBlockingQueue
需要注意 put 、offer 方法的使用场景以及它们之间的区别,take 方法的使用,也需要注意使用 ArrayBlockingQueue 时需要设置合理的队列大小以避免 OOM,队列满或者剩余元素比较少时,要根据具体场景制定一些抛弃策略以避免队列满时业务线程被阻塞。
- put() 方法是阻塞的,也就是说如果当前队列满,则在调用 put 方法向队列放入一个元素时调用线程会被阻塞直到队列有空余空间。
- offer() 方法是非阻塞的,如果当前队列满,则会直接返回,也就是丢弃当前元素。
- pool() 方法是从队列头部获取并移除一个元素,如果队列为空则返回 null ,该方法是不阻塞的
- take() 方法是获取当前队列头部元素并从队列里面移除它。如果队列为空则阻塞当前线程直到队列不为空然后返回元素。
11.2 ConcurrentHashMap
put(K key, V value) 方法判断如果 key 已经存在,则使用 value 覆盖原来的值并返回原来的值,如果不存在则把 value 放入并返回 null。
而 putIfAbsent(K key, V value) 方法则是如果 key 已经存在则直接返回原来对应的值并不使用 value 覆盖,如果 key 不存在则放入 value 并返回 null ,另外要注意,判断 key 是否存在和放入是原子性操作。
11.3 SimpleDateFormat
多线程共用一个 SimpleDateFormat 实例对日期进行解析或格式化会导致程序出错,因为在内部实现中,其操作步骤不是原子性的,比如说重置日期对象属性值与使用解析好的属性性设置日期对象是两个步骤,所以在多线程环境下使用同一个 SimpleDateFormat 实例会导致程序错误。
那如何解决呢?
1.第一种方式:每次使用时都 new 一个 SimpleDateFormat 的实例,这样可以保证每个实例使用自己的 Calender 实例,但是每次使用都 new 一个对象,并且使用后由于没有其他引用,又需要回收,开销会很大。
2.第二种方式:出错的原因在于其内部实现中步骤不是一个原子性操作,我们可以使用 synchronized 进行同步,这意味着多个线程要竞争锁,在高并发场景下会导致系统响应性能下降。
3.第三种方式:使用 ThreadLocal,这样每个线程只需要使用一个 SimpleDateFormat 实例,这相比第一种方式大大节省了对象的创建销毁开销,并且不需要使用多个线程同步。但要注意,使用完线程变量后,要进行清理(remove()),以避免内存泄漏。
11.4 Timer
当一个 Timer 运行多个 TimerTask 时,只要其中一个 TimerTask 在执行中向 run 方法外抛出了异常,则其他任务也会自动终止。
ScheduledThreadPoolExecutor 是并发包提供的组件,其提供的功能包含但不限于 Timer。Timer 是固定的多线程生产单线程消费,但是 ScheduledThreadPoolExecutor 是可以配置的,既可以是多线程生产多线程消费也可以是多线程生产多线程消费,所以在日常开发中使用定时器功能时应该优先使用 ScheduledThreadPoolExecutor。
11.5 创建线程和线程池时要指定与业务相关的名称
在日常开发中,当在一个应用中需要创建多个线程或者线程池时最好给每个线程或线程池根据业务类型设置具体的名称,以便在出现问题时方便进行定位。
另外,在使用线程池的情况下当程序结束时一定要记得调用 shutdown() 关闭线程池
11.6 有关 FutureTask
在线程池中使用 FutureTask 时,当拒绝策略为 DiscardPolicy 和 DiscardOldestPolicy 时,在被拒绝的任务的 FutureTask 对象上调用 get() 方法会导致调用线程一直阻塞,所以在日常开发中尽量使用带超时参数的 get() 方法以避免线程一直阻塞。
11.7 有关ThreadLocal
在线程中使用完 ThreadLocal 变量后,要及时调用 remove() 方法以避免内存泄漏。
更多实践内容请参考我的文集:《J2SE-并发编程》