AQS:AbstractQueuedSynchronizer
1 使用Node实现FIFO队列,可以用于构建锁或者其他同步装置的基础框架
2 利用了int类型表示状态
3 使用方法是继承
4 子类通过继承并通过实现它的方法管理器状态{acquire和release}的方法操纵状态
5 可以同时实现排它锁和共享锁模式(独占、共享)
AQS同步组件
1 CountDownLatch:闭锁,通过计数来保证线程是否需要一直阻塞
2 Semaphore:控制同一时间并发线程的数目
3 CyclicBarrier:和CountDownLatch相似,都能阻阻塞线程
4 ReentrantLock
5 Condition
6 FutureTask
CountDownLatch
CountDownLatch是一个同步辅助类,应用场景:并行运算,所有线程执行完毕才可执行
package com.ubtechinc.cruzr.lib;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
* Copyright (C) 2020, UBTECH Robotics
* Description
*
* @author jianlin.duan
* 2020/7/31, Create file
*/
public class CountDownLatchExample {
private final static int TREAD_COUNT = 10;
public static void main(String[] args) throws InterruptedException {
ExecutorService executorService = Executors.newCachedThreadPool();
CountDownLatch countDownLatch = new CountDownLatch(TREAD_COUNT);
for (int i=0;i<TREAD_COUNT;i++) {
final int latchNum = i;
executorService.execute(()->{
try {
Thread.sleep(100);
System.out.println("latchNum="+latchNum);
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
countDownLatch.countDown();
}
});
}
//此处一直等到子线程的10次countDown都执行完了,才会被唤醒继续
countDownLatch.await();
System.out.println("finish");
}
}
latchNum=8
latchNum=0
latchNum=5
latchNum=4
latchNum=2
latchNum=9
latchNum=3
latchNum=7
latchNum=6
latchNum=1
finish
Semaphore
Semaphore可以很容易控制某个资源可悲同时访问的线程个数,和CountDownLatch使用有些类似,提供acquire和release两个方法,acquire是获取一个许可,如果没有就等待,release是在操作完成后释放许可出来。Semaphore维护了当前访问的线程的个数,提供同步机制来控制同时访问的个数,Semaphore可以实现有限大小的链表,重入锁(如ReentrantLock)也可以实现这个功能,但是实现上比较复杂。
Semaphore使用场景:适用于仅能提供有限资源,如数据库连接数
package com.ubtechinc.cruzr.lib;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
/**
* Copyright (C) 2020, UBTECH Robotics
* Description
*
* @author jianlin.duan
* 2020/7/31, Create file
*/
public class SemaphoreExample {
private final static int TREAD_COUNT = 20;
public static void main(String[] args) throws InterruptedException {
ExecutorService executorService = Executors.newCachedThreadPool();
Semaphore semaphore = new Semaphore(3);
for (int i=0;i<TREAD_COUNT;i++) {
final int latchNum = i;
executorService.execute(()->{
try {
//每5s内只能执行3个子线程
semaphore.acquire();
SimpleDateFormat sdf=new SimpleDateFormat("yyyy-MM-dd mm:ss");
System.out.println(sdf.format(new Date())+" latchNum="+latchNum);
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
semaphore.release();
}
});
}
}
}
第一个5s
2020-07-31 34:37 latchNum=1
2020-07-31 34:37 latchNum=2
2020-07-31 34:37 latchNum=0
第二个5s
2020-07-31 34:42 latchNum=5
2020-07-31 34:42 latchNum=3
2020-07-31 34:42 latchNum=4
...
2020-07-31 34:47 latchNum=8
2020-07-31 34:47 latchNum=6
2020-07-31 34:47 latchNum=7
2020-07-31 34:52 latchNum=9
2020-07-31 34:52 latchNum=12
2020-07-31 34:52 latchNum=10
2020-07-31 34:57 latchNum=13
2020-07-31 34:57 latchNum=11
2020-07-31 34:57 latchNum=14
2020-07-31 35:02 latchNum=16
2020-07-31 35:02 latchNum=17
2020-07-31 35:02 latchNum=15
2020-07-31 35:07 latchNum=18
2020-07-31 35:07 latchNum=19
CyclicBarrier
与CountDownLatch相似,都是通过计数器实现,当某个线程调用await方法,该线程就进入等待状态,且计数器进行加1操作,当计数器的值达到设置的初始值,进入await等待的线程会被唤醒,继续执行他们后续的操作。由于CyclicBarrier在释放等待线程后可以重用,所以又称循环屏障。使用场景和CountDownLatch相似,可用于并发运算。
CyclicBarrier和CountDownLatch区别:
1 CountDownLatch计数器只能使用一次,CyclicBarrier的计数器可以使用reset方法重置循环使用
2 CountDownLatch主要是视线1个或n个线程需要等待其他线程完成某项操作才能继续往下执行,CyclicBarrier主要是实现多个线程之间相互等待知道所有线程都满足了条件之后才能继续执行后续的操作,CyclicBarrier能处理更复杂的场景
package com.ubtechinc.cruzr.lib;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
/**
* Copyright (C) 2020, UBTECH Robotics
* Description
*
* @author jianlin.duan
* 2020/7/31, Create file
*/
public class CyclicBarrierExample {
private final static int TREAD_COUNT = 20;
public static void main(String[] args) throws InterruptedException, BrokenBarrierException {
ExecutorService executorService = Executors.newCachedThreadPool();
SimpleDateFormat sdf=new SimpleDateFormat("yyyy-MM-dd mm:ss");
CyclicBarrier cyclicBarrier = new CyclicBarrier(10);
for (int i=0;i<9;i++) {
final int latchNum = i;
executorService.execute(()->{
try {
//等10个线程都跑到此处,才唤醒继续执行
System.out.println(sdf.format(new Date())+" latchNum="+latchNum);
cyclicBarrier.await();
System.out.println(sdf.format(new Date())+" latchNum="+latchNum);
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
});
}
Thread.sleep(5000);
cyclicBarrier.await();
System.out.println(sdf.format(new Date())+" finish");
}
}
2020-07-31 39:21 latchNum=1
2020-07-31 39:21 latchNum=4
2020-07-31 39:21 latchNum=8
2020-07-31 39:21 latchNum=7
2020-07-31 39:21 latchNum=2
2020-07-31 39:21 latchNum=6
2020-07-31 39:21 latchNum=3
2020-07-31 39:21 latchNum=5
2020-07-31 39:21 latchNum=0
前9个线程执行到等待位置
2020-07-31 39:26 finish
第十个线程也执行到等待位置,且执行完毕,此时10条线程一起继续执行
2020-07-31 39:26 latchNum=1
2020-07-31 39:26 latchNum=8
2020-07-31 39:26 latchNum=7
2020-07-31 39:26 latchNum=6
2020-07-31 39:26 latchNum=2
2020-07-31 39:26 latchNum=4
2020-07-31 39:26 latchNum=0
2020-07-31 39:26 latchNum=5
2020-07-31 39:26 latchNum=3
Condition
多线程建协调通信的工具类
package com.ubtechinc.cruzr.lib;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
/**
* Copyright (C) 2020, UBTECH Robotics
* Description
*
* @author jianlin.duan
* 2020/7/31, Create file
*/
public class ConditionExample {
public static void main(String[] args) {
SimpleDateFormat sdf=new SimpleDateFormat("yyyy-MM-dd mm:ss");
ReentrantLock reentrantLock = new ReentrantLock();
Condition condition = reentrantLock.newCondition();
new Thread(()->{
reentrantLock.lock();
try {
condition.await();
System.out.println(sdf.format(new Date())+" recevied signal");
} catch (InterruptedException e) {
e.printStackTrace();
}
reentrantLock.unlock();
}).start();
new Thread(()->{
reentrantLock.lock();
try {
System.out.println(sdf.format(new Date())+" getlock");
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(sdf.format(new Date())+" send signal");
condition.signalAll();
reentrantLock.unlock();
}).start();
}
}
2020-07-31 55:16 await signal
2020-07-31 55:16 getlock
2020-07-31 55:19 send signal
2020-07-31 55:19 recevied signal
本文参考https://blog.csdn.net/GavinZhera/article/details/86471828