1.概述
在JAVA中,阻塞队列算起来共有7种(如下图所示)。
数组阻塞队列(ArrayBlockingQueue) :底层基于数组的有界阻塞队列,初始化时需要指定队列大小;
链表阻塞队列(LinkedBlockingQueue) :以链表来存储元素,理论上只要存储空间够大,就是无界的;
同步阻塞队列(SynchronousQueue):队列中不存储元素,队列中放入元素后,只有该元素被消费完成,才能重修放入元素;
优先级无界阻塞队列(PriorityBlockingQueue):底层基于数组的无界队列,支持队列内部按照指定元素排序;
链表阻塞双端队列(LinkedBlockingDeque):底层基于链表的有界双端阻塞队列;
延迟无界阻塞队列(DelayQueue):底层是基于数组的无界延迟队列,它是在PriorityQueue基础上实现的,先按延迟优先级排序,延迟时间短的排在队列前面;
链表阻塞队列与同步阻塞队列结合(LinkedTransferQueue):基于链表的无界阻塞队列;
[图片上传失败...(image-f46846-1689141594951)]
上述队列可根据不同场景选择不同的队列进行使用,本文主要分析日常开发中使用较多的数组阻塞队列。首先通过一些案例来介绍数组队列的使用方式,其次通过分析源码来探索队列的实现原理,最后是对本文的一些总结和思考。
2.实战演练
ArrayBlockingQueue中有三对存取api,如下表所示:
操作类型 | 抛出异常 | 阻塞线程 | 有返回值 | 超时返回 |
---|---|---|---|---|
存入 | add | put | offer | offer(E e, long timeout, TimeUnit unit) |
取出 | remove | take | poll | poll(long timeout, TimeUnit unit) |
分别阐述一下上述三组api,
抛出异常:指的是当存入元素时队列已满、取出元素时队列中为空时,均会抛出异常;
阻塞线程:指的是当存入元素时队列已满、取出元素时队列中为空时,线程会一直等待,直至获取到数据为止;
有返回值:指的是当存入元素或取出元素时,会返回特定的值(offer成功时会返回true,失败会返回false;poll成功时会返回队列中元素,失败时会返回null);
超时返回:指的是在指定时间内未能存入或取出元素时,会返回一个指定值(offer时会返回false,poll时会返回null)。
2.1 抛出异常
测试代码1:
@Slf4j
public class ArrayBlockQueueDemo1 {
public static void main(String[] args) throws InterruptedException {
ArrayBlockingQueue<Integer> arrayBlockingQueue = new ArrayBlockingQueue<>(3);
boolean add1 = arrayBlockingQueue.add(1);
log.info("add1:{}", add1);
arrayBlockingQueue.put(2);
boolean add3 = arrayBlockingQueue.offer(3);
log.info("add3:{}", add3);
log.info("arrayBlockingQueue:{}", arrayBlockingQueue.size());
boolean add4 = arrayBlockingQueue.add(4);
log.info("add4:{}", add4);
}
}
运行结果如下:
[图片上传失败...(image-99d774-1689141594951)]
测试代码2:
@Slf4j
public class ArrayBlockQueueDemo2 {
public static void main(String[] args) throws InterruptedException {
ArrayBlockingQueue<Integer> arrayBlockingQueue = new ArrayBlockingQueue<>(3);
boolean add1 = arrayBlockingQueue.add(1);
log.info("add1:{}", add1);
arrayBlockingQueue.put(2);
boolean add3 = arrayBlockingQueue.offer(3);
log.info("add3:{}", add3);
log.info("arrayBlockingQueue:{}", arrayBlockingQueue.size());
Integer remove1 = arrayBlockingQueue.remove();
log.info("remove1:{}", remove1);
Integer remove2 = arrayBlockingQueue.remove();
log.info("remove2:{}", remove2);
Integer remove3 = arrayBlockingQueue.remove();
log.info("remove3:{}", remove3);
Integer remove4 = arrayBlockingQueue.remove();
log.info("remove4:{}", remove4);
}
}
```
运行结果如下:
[图片上传失败...(image-31b224-1689141594951)]
由上述运行结果可知,当通过构造函数设置完队列的初始长度后,调用add方法来添加元素,添加成功时会返回true,添加元素个数大于队列长度时,会抛出Queue full的异常,表明队列已满,无法继续添加元素;同理,测试代码2通过remove方法来取出队列中元素,当移除元素个数大于队列中元素个数时,会抛出异常。
2.2 阻塞线程
测试代码1:
```
@Slf4j
public class ArrayBlockQueueDemo2 {
public static void main(String[] args) throws InterruptedException {
ArrayBlockingQueue<Integer> arrayBlockingQueue = new ArrayBlockingQueue<>(3);
boolean add = arrayBlockingQueue.add(1);
log.info("add:{}", add);
arrayBlockingQueue.put(2);
boolean offer = arrayBlockingQueue.offer(3);
log.info("offer:{}", offer);
arrayBlockingQueue.put(4);
}
}
```
测试代码2:
```
@Slf4j
public class ArrayBlockQueueDemo2 {
public static void main(String[] args) throws InterruptedException {
ArrayBlockingQueue<Integer> arrayBlockingQueue = new ArrayBlockingQueue<>(3);
boolean add = arrayBlockingQueue.add(1);
log.info("add:{}", add);
arrayBlockingQueue.put(2);
boolean offer = arrayBlockingQueue.offer(3);
log.info("offer:{}", offer);
Integer take1 = arrayBlockingQueue.take();
log.info("take1:{}", take1);
Integer take2 = arrayBlockingQueue.take();
log.info("take2:{}", take2);
Integer take3 = arrayBlockingQueue.take();
log.info("take3:{}", take3);
Integer take4 = arrayBlockingQueue.take();
log.info("take1:{}", take4);
}
}
```
上述两个测试案例表明,当队列中数据已满时,调用put方法添加元素时,会造成线程阻塞,直至该元素被放进队列;当调用take方法取出队列中元素时,若队列中元素无元素,线程会阻塞,直至取出元素。
2.3 有返回值
测试代码1:
···
@Slf4j
public class ArrayBlockQueueDemo3 {
public static void main(String[] args) throws InterruptedException {
ArrayBlockingQueue<Integer> arrayBlockingQueue = new ArrayBlockingQueue<>(3);
boolean offer1 = arrayBlockingQueue.offer(1);
log.info("offer1:{}", offer1);
boolean offer2 = arrayBlockingQueue.offer(2);
log.info("offer2:{}", offer2);
boolean offer3 = arrayBlockingQueue.offer(3);
log.info("offer3:{}", offer3);
log.info("offer3 size:{}", arrayBlockingQueue.size());
boolean offer4 = arrayBlockingQueue.offer(4);
log.info("offer4:{}", offer4);
}
}
```
测试代码2:
```
@Slf4j
public class ArrayBlockQueueDemo4 {
public static void main(String[] args) {
ArrayBlockingQueue<Integer> arrayBlockingQueue = new ArrayBlockingQueue<>(3);
boolean offer1 = arrayBlockingQueue.offer(1);
log.info("offer1:{}", offer1);
boolean offer2 = arrayBlockingQueue.offer(2);
log.info("offer2:{}", offer2);
boolean offer3 = arrayBlockingQueue.offer(3);
log.info("offer3:{}", offer3);
boolean offer4 = arrayBlockingQueue.offer(4);
log.info("offer4:{}", offer4);
Integer poll1 = arrayBlockingQueue.poll();
log.info("poll1:{}", poll1);
Integer poll2 = arrayBlockingQueue.poll();
log.info("poll2:{}", poll2);
Integer poll3 = arrayBlockingQueue.poll();
log.info("poll3:{}", poll3);
Integer poll4 = arrayBlockingQueue.poll();
log.info("poll4:{}", poll4);
}
}
```
运行结果如下:
![image.png](https://p3-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/dcfb2b39fe9a484891527247ce21fda5~tplv-k3u1fbpfcp-watermark.image?)
由上述运行结果可知,利用offer来向队列中存储元素时,存储成功时会返回true,失败时会返回false;利用poll来从队列中取出元素,若队列中元素不足时,会返回null。
3.源码解析
首先看下ArrayBlockingQueue类内部的方法(如下图所示),下面针对关键的方法进行分析:
3.1 基本属性
```
//底层数组
final Object[] items;
//下一次获取元素的下标
int takeIndex;
//下一次存入元素的下标
int putIndex;
//队列中元素的个数
int count;
//控制所有访问的锁
final ReentrantLock lock;
//等待取出元素的条件Condition
private final Condition notEmpty;
//等待加入元素的条件
private final Condition notFull;
//itrs主要用来记录集合中数据,方便集合在枚举过程中操作(删除)数据
transient Itrs itrs = null;
```
3.2 构造方法
```
//传入参数为队列的容量,默认为非公平锁,初始化后队列长度固定,不能改变
public ArrayBlockingQueue(int capacity) {
this(capacity, false);
}
//初始化传入数组长度大小,是否为公平方式获取锁
public ArrayBlockingQueue(int capacity, boolean fair) {
//初始化数组长度值小于0,抛出异常
if (capacity <= 0)
throw new IllegalArgumentException();
//初始化长度为capacity的数组
this.items = new Object[capacity];
//初始化锁
lock = new ReentrantLock(fair);
//初始化用于阻塞的condition
notEmpty = lock.newCondition();
notFull = lock.newCondition();
}
```
3.3 add(E)
```
//添加元素,利用super调用父类AbstractQueue中add方法
public boolean add(E e) {
return super.add(e);
}
//AbstractQueue中add方法,调用offer方法添加元素,添加成功返回true,否则返回false
public boolean add(E e) {
if (offer(e))
return true;
else
throw new IllegalStateException("Queue full");
}
```
3.4 put(E e)
```
public void put(E e) throws InterruptedException {
//判断添加元素e是否为空,为空会抛出异常
checkNotNull(e);
//获取锁
final ReentrantLock lock = this.lock;
//加锁
lock.lockInterruptibly();
try {
//队列已满,线程等待
while (count == items.length)
notFull.await();
//入队方法,后面单独解释该方法
enqueue(e);
} finally {
//释放锁
lock.unlock();
}
}
```
3.5 offer(E e, long timeout, TimeUnit unit)
```
public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException {
//判断元素是否为空,为空抛出异常
checkNotNull(e);
//超时时间转化为纳秒
long nanos = unit.toNanos(timeout);
//获取锁
final ReentrantLock lock = this.lock;
//加锁
lock.lockInterruptibly();
try {
//队列已满
while (count == items.length) {
//队列已满,且等待时间小于等于0,直接返回false
if (nanos <= 0)
return false;
//队列已满,线程阻塞等待
nanos = notFull.awaitNanos(nanos);
}
//入队列
enqueue(e);
return true;
} finally {
//释放锁
lock.unlock();
}
}
```
3.6 remove(Object o)
```
//Object表示要移除的元素
public boolean remove(Object o) {
//判断移除元素是否为空,为空返回false
if (o == null) return false;
//获取数组对象
final Object[] items = this.items;
//获取锁
final ReentrantLock lock = this.lock;
//加锁
lock.lock();
try {
//判断当前队列中的元素是否大于0
if (count > 0) {
//获取当前存入队列中元素的位置
final int putIndex = this.putIndex;
//获取下次移除队列中元素的位置
int i = takeIndex;
/**
循环遍历对比要移除元素与队列中元素,若一致则移除并返回true,否则返回false
**/
do {
//对比要移除对象o是否存在于队列中
if (o.equals(items[i])) {
//若存在,移除并返回true
removeAt(i);
return true;
}
//如果遍历至队列尾部,重置i为0
if (++i == items.length)
i = 0;
} while (i != putIndex);
}
return false;
} finally {
//释放锁
lock.unlock();
}
}
```
3.7 removeAt(final int removeIndex)
```
void removeAt(final int removeIndex) {
//获取队列
final Object[] items = this.items;
//判断要移除元素位置下标与当前即将移除的元素是否相同
//若相同,将该位置元素删除
if (removeIndex == takeIndex) {
// removing front item; just advance
items[takeIndex] = null;
//判断下一次移除元素下标是否到达队列尾部,若到达重置下一次移除元素位置下标为0
if (++takeIndex == items.length)
takeIndex = 0;
//队列中元素个数-1
count--;
if (itrs != null)
itrs.elementDequeued();
//下一次移除元素下标与当前传入下标不一致
//依次遍历需要删除元素下标与下一入队元素下标直接的元素,若找到该该元素,执行删除操作,队列长度-1
} else {
// an "interior" remove
// slide over all others up through putIndex.
final int putIndex = this.putIndex;
for (int i = removeIndex;;) {
int next = i + 1;
if (next == items.length)
next = 0;
if (next != putIndex) {
items[i] = items[next];
i = next;
} else {
items[i] = null;
this.putIndex = i;
break;
}
}
//队列长度-1
count--;
if (itrs != null)
itrs.removedAt(removeIndex);
}
//通知所有等待入队线程,可以进行入队操作
notFull.signal();
}
```
3.8 take()
```
//从队列中取出数据
public E take() throws InterruptedException {
//获取锁
final ReentrantLock lock = this.lock;
//加锁
lock.lockInterruptibly();
try {
//当队列中无数据,删除线程阻塞等待
while (count == 0)
notEmpty.await();
//有数据,执行队头元素出队操作
return dequeue();
} finally {
//释放锁
lock.unlock();
}
}
```
3.9 poll(long timeout, TimeUnit unit)
```
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
//时间转化为纳秒
long nanos = unit.toNanos(timeout);
//获取锁
final ReentrantLock lock = this.lock;
//加锁
lock.lockInterruptibly();
try {
//判断队列是否为空,为空阻塞等待
while (count == 0) {
//如果等待时间小于0,直接返回null
if (nanos <= 0)
return null;
//阻塞等待
nanos = notEmpty.awaitNanos(nanos);
}
//执行出队操作
return dequeue();
} finally {
//释放锁
lock.unlock();
}
}
```
3.10 enqueue(E x)
```
//入队
private void enqueue(E x) {
//获取队列
final Object[] items = this.items;
//入队,往数组中添加元素
items[putIndex] = x;
//判断队列是否已满,如果是,把下一个元素入队的索引位置置为0,防止下次插入元素越界
if (++putIndex == items.length)
putIndex = 0;
//入队成功,当前队列元素数量+1
count++;
//通知在等待的出队的线程,队列中已有元素,可以进行出参操作
notEmpty.signal();
}
```
3.11 dequeue()
```
//出队
private E dequeue() {
// 获取队列
final Object[] items = this.items;
//根据下一次出队索引takeIndex拿到出队元素
@SuppressWarnings("unchecked")
E x = (E) items[takeIndex];
//将队列中该1索引位置元素删除
items[takeIndex] = null;
//判断下一次出队索引是否等于队列长度,若等于,表明所有元素出队完成,将下一次出队索引置为0,防止越界
if (++takeIndex == items.length)
takeIndex = 0;
//出队成功,当前队列元素数量-1
count--;
//itrs主要用来记录集合中数据,方便集合在枚举过程中操作(删除)数据
if (itrs != null)
itrs.elementDequeued();
//通知等待入队的线程,队列中已有位置,可以进行入队操作
notFull.signal();
return x;
}
```
3.12 drainTo(Collection<super E> c, int maxElements)
```
//拷贝队列中maxElements长度的元素到集合c中,并从集合中删除拷贝的数据
public int drainTo(Collection<super E> c, int maxElements) {
//判断当前数组对象是否为空,为空则抛出空指针异常
checkNotNull(c);
//判断传入集合对象是否与当前队列一致,若一致则抛出异常
if (c == this)
throw new IllegalArgumentException();
//判断要拷贝
if (maxElements <= 0)
return 0;
//获取当前队列对象
final Object[] items = this.items;
//获取锁
final ReentrantLock lock = this.lock;
//加锁
lock.lock();
try {
//对比需要拷贝数据长度与当前队列中元素长度大小,获取最小值(防止数组越界)
int n = Math.min(maxElements, count);
//获取下一次获取队列元素下标
int take = takeIndex;
int i = 0;
try {
//循环遍历队列,从数组i开始获取队列中元素存入传入集合c中,同时删除队列中已拷贝数据
while (i < n) {
@SuppressWarnings("unchecked")
E x = (E) items[take];
c.add(x);
items[take] = null;
//判断下一次出队索引是否等于队列长度,若等于,表明读取所有元素完成,将下一次出队索引置为0,防止越界
if (++take == items.length)
take = 0;
i++;
}
//返回已拷贝数据总个数值
return n;
} finally {
// Restore invariants even if c.add() threw
//判断删除元素个数是否大于0,若大于0,更新队列长度值,
if (i > 0) {
count -= i;
//更新下一次取元素下标
takeIndex = take;
if (itrs != null) {
if (count == 0)
itrs.queueIsEmpty();
else if (i > take)
itrs.takeIndexWrapped();
}
//循环遍历,通知所有入队线程,可以进行入队操作
for (; i > 0 && lock.hasWaiters(notFull); i--)
notFull.signal();
}
}
} finally {
//释放锁
lock.unlock();
}
}
```
4.小结
1.ArrayBlockingQueue是限定队列长度的,且队列中不能包含null,队列长度创建后就不能更改;
2.ArrayBlockingQueue插入元素在队尾,删除元素从队头开始;
3.实现BlockingQueue接口的类必须都是线程安全的;
4.队列满的情况下,会阻塞插入线程,队列空的情况下,会阻塞删除元素线程;
5.支持公平、非公平锁,默认非公平锁,由于操作共用一把锁,在高并发场景下,可能会出现性能瓶颈。