当前位置: 首页>后端>正文

ArrayBlockingQueue

1.概述
在JAVA中,阻塞队列算起来共有7种(如下图所示)。

数组阻塞队列(ArrayBlockingQueue) :底层基于数组的有界阻塞队列,初始化时需要指定队列大小;

链表阻塞队列(LinkedBlockingQueue) :以链表来存储元素,理论上只要存储空间够大,就是无界的;

同步阻塞队列(SynchronousQueue):队列中不存储元素,队列中放入元素后,只有该元素被消费完成,才能重修放入元素;

优先级无界阻塞队列(PriorityBlockingQueue):底层基于数组的无界队列,支持队列内部按照指定元素排序;

链表阻塞双端队列(LinkedBlockingDeque):底层基于链表的有界双端阻塞队列;

延迟无界阻塞队列(DelayQueue):底层是基于数组的无界延迟队列,它是在PriorityQueue基础上实现的,先按延迟优先级排序,延迟时间短的排在队列前面;

链表阻塞队列与同步阻塞队列结合(LinkedTransferQueue):基于链表的无界阻塞队列;

[图片上传失败...(image-f46846-1689141594951)]
上述队列可根据不同场景选择不同的队列进行使用,本文主要分析日常开发中使用较多的数组阻塞队列。首先通过一些案例来介绍数组队列的使用方式,其次通过分析源码来探索队列的实现原理,最后是对本文的一些总结和思考。

2.实战演练
ArrayBlockingQueue中有三对存取api,如下表所示:

操作类型 抛出异常 阻塞线程 有返回值 超时返回
存入 add put offer offer(E e, long timeout, TimeUnit unit)
取出 remove take poll poll(long timeout, TimeUnit unit)

分别阐述一下上述三组api,
抛出异常:指的是当存入元素时队列已满、取出元素时队列中为空时,均会抛出异常;
阻塞线程:指的是当存入元素时队列已满、取出元素时队列中为空时,线程会一直等待,直至获取到数据为止;
有返回值:指的是当存入元素或取出元素时,会返回特定的值(offer成功时会返回true,失败会返回false;poll成功时会返回队列中元素,失败时会返回null);
超时返回:指的是在指定时间内未能存入或取出元素时,会返回一个指定值(offer时会返回false,poll时会返回null)。
2.1 抛出异常
测试代码1:

@Slf4j
public class ArrayBlockQueueDemo1 {

    public static void main(String[] args) throws InterruptedException {

        ArrayBlockingQueue<Integer> arrayBlockingQueue = new ArrayBlockingQueue<>(3);
        boolean add1 = arrayBlockingQueue.add(1);
        log.info("add1:{}", add1);
        arrayBlockingQueue.put(2);

        boolean add3 = arrayBlockingQueue.offer(3);
        log.info("add3:{}", add3);
        log.info("arrayBlockingQueue:{}", arrayBlockingQueue.size());

        boolean add4 = arrayBlockingQueue.add(4);
        log.info("add4:{}", add4);
     }
}   

运行结果如下:

[图片上传失败...(image-99d774-1689141594951)]

测试代码2:

@Slf4j
public class ArrayBlockQueueDemo2 {

    public static void main(String[] args) throws InterruptedException {

        ArrayBlockingQueue<Integer> arrayBlockingQueue = new ArrayBlockingQueue<>(3);
        boolean add1 = arrayBlockingQueue.add(1);
        log.info("add1:{}", add1);
        arrayBlockingQueue.put(2);

        boolean add3 = arrayBlockingQueue.offer(3);
        log.info("add3:{}", add3);
        log.info("arrayBlockingQueue:{}", arrayBlockingQueue.size());
        
        Integer remove1 = arrayBlockingQueue.remove();
        log.info("remove1:{}", remove1);
        Integer remove2 = arrayBlockingQueue.remove();
        log.info("remove2:{}", remove2);
        Integer remove3 = arrayBlockingQueue.remove();
        log.info("remove3:{}", remove3);
        Integer remove4 = arrayBlockingQueue.remove();
        log.info("remove4:{}", remove4);
       }
}

```
运行结果如下:
 

[图片上传失败...(image-31b224-1689141594951)]

由上述运行结果可知,当通过构造函数设置完队列的初始长度后,调用add方法来添加元素,添加成功时会返回true,添加元素个数大于队列长度时,会抛出Queue full的异常,表明队列已满,无法继续添加元素;同理,测试代码2通过remove方法来取出队列中元素,当移除元素个数大于队列中元素个数时,会抛出异常。

2.2 阻塞线程
测试代码1:
```
@Slf4j
public class ArrayBlockQueueDemo2 {
    public static void main(String[] args) throws InterruptedException {
        ArrayBlockingQueue<Integer> arrayBlockingQueue = new ArrayBlockingQueue<>(3);
        boolean add = arrayBlockingQueue.add(1);
        log.info("add:{}", add);
        arrayBlockingQueue.put(2);
        boolean offer = arrayBlockingQueue.offer(3);
        log.info("offer:{}", offer);
        arrayBlockingQueue.put(4);
      }
}
``` 

测试代码2:
```
@Slf4j
public class ArrayBlockQueueDemo2 {
    public static void main(String[] args) throws InterruptedException {
        ArrayBlockingQueue<Integer> arrayBlockingQueue = new ArrayBlockingQueue<>(3);
        boolean add = arrayBlockingQueue.add(1);
        log.info("add:{}", add);
        arrayBlockingQueue.put(2);
        boolean offer = arrayBlockingQueue.offer(3);
        log.info("offer:{}", offer);
        Integer take1 = arrayBlockingQueue.take();
        log.info("take1:{}", take1);
        Integer take2 = arrayBlockingQueue.take();
        log.info("take2:{}", take2);
        Integer take3 = arrayBlockingQueue.take();
        log.info("take3:{}", take3);
        Integer take4 = arrayBlockingQueue.take();
        log.info("take1:{}", take4);
    }
}

``` 

上述两个测试案例表明,当队列中数据已满时,调用put方法添加元素时,会造成线程阻塞,直至该元素被放进队列;当调用take方法取出队列中元素时,若队列中元素无元素,线程会阻塞,直至取出元素。

2.3 有返回值
测试代码1:
···
@Slf4j
public class ArrayBlockQueueDemo3 {
    public static void main(String[] args) throws InterruptedException {
        ArrayBlockingQueue<Integer> arrayBlockingQueue = new ArrayBlockingQueue<>(3);
        boolean offer1 = arrayBlockingQueue.offer(1);
        log.info("offer1:{}", offer1);
        boolean offer2 = arrayBlockingQueue.offer(2);
        log.info("offer2:{}", offer2);
        boolean offer3 = arrayBlockingQueue.offer(3);
        log.info("offer3:{}", offer3);
        log.info("offer3 size:{}", arrayBlockingQueue.size());
        boolean offer4 = arrayBlockingQueue.offer(4);
        log.info("offer4:{}", offer4);
    }
}
```
 
 
测试代码2:
```
@Slf4j
public class ArrayBlockQueueDemo4 {
    public static void main(String[] args) {
        ArrayBlockingQueue<Integer> arrayBlockingQueue = new ArrayBlockingQueue<>(3);
        boolean offer1 = arrayBlockingQueue.offer(1);
        log.info("offer1:{}", offer1);
        boolean offer2 = arrayBlockingQueue.offer(2);
        log.info("offer2:{}", offer2);
        boolean offer3 = arrayBlockingQueue.offer(3);
        log.info("offer3:{}", offer3);
        boolean offer4 = arrayBlockingQueue.offer(4);
        log.info("offer4:{}", offer4);

        Integer poll1 = arrayBlockingQueue.poll();
        log.info("poll1:{}", poll1);
        Integer poll2 = arrayBlockingQueue.poll();
        log.info("poll2:{}", poll2);
        Integer poll3 = arrayBlockingQueue.poll();
        log.info("poll3:{}", poll3);
        Integer poll4 = arrayBlockingQueue.poll();
        log.info("poll4:{}", poll4);
    }
}

```
运行结果如下:

![image.png](https://p3-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/dcfb2b39fe9a484891527247ce21fda5~tplv-k3u1fbpfcp-watermark.image?)

由上述运行结果可知,利用offer来向队列中存储元素时,存储成功时会返回true,失败时会返回false;利用poll来从队列中取出元素,若队列中元素不足时,会返回null。

3.源码解析
首先看下ArrayBlockingQueue类内部的方法(如下图所示),下面针对关键的方法进行分析:


3.1 基本属性
```
    //底层数组
    final Object[] items;

    //下一次获取元素的下标
    int takeIndex;

    //下一次存入元素的下标
    int putIndex;

    //队列中元素的个数
    int count;

    //控制所有访问的锁
    final ReentrantLock lock;

    //等待取出元素的条件Condition
    private final Condition notEmpty;

    //等待加入元素的条件
    private final Condition notFull;
    
    //itrs主要用来记录集合中数据,方便集合在枚举过程中操作(删除)数据
    transient Itrs itrs = null;
```
3.2 构造方法
```
//传入参数为队列的容量,默认为非公平锁,初始化后队列长度固定,不能改变
 public ArrayBlockingQueue(int capacity) {
        this(capacity, false);
    }
//初始化传入数组长度大小,是否为公平方式获取锁
 public ArrayBlockingQueue(int capacity, boolean fair) {
 //初始化数组长度值小于0,抛出异常
        if (capacity <= 0)
            throw new IllegalArgumentException();
        //初始化长度为capacity的数组
        this.items = new Object[capacity];
        //初始化锁
        lock = new ReentrantLock(fair);
        //初始化用于阻塞的condition
        notEmpty = lock.newCondition();
        notFull =  lock.newCondition();
    }

```
3.3 add(E)
```
//添加元素,利用super调用父类AbstractQueue中add方法
public boolean add(E e) {
    return super.add(e);
}
//AbstractQueue中add方法,调用offer方法添加元素,添加成功返回true,否则返回false
public boolean add(E e) {
        if (offer(e))
            return true;
        else
            throw new IllegalStateException("Queue full");
}
```
3.4 put(E e)
```
public void put(E e) throws InterruptedException {
        //判断添加元素e是否为空,为空会抛出异常
        checkNotNull(e);
        //获取锁
        final ReentrantLock lock = this.lock;
        //加锁
        lock.lockInterruptibly();
        try {
            //队列已满,线程等待
            while (count == items.length)
                notFull.await();
            //入队方法,后面单独解释该方法
            enqueue(e);
        } finally {
            //释放锁
            lock.unlock();
        }
    }

```
3.5 offer(E e, long timeout, TimeUnit unit)
```
public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException {
        //判断元素是否为空,为空抛出异常
        checkNotNull(e);
        //超时时间转化为纳秒
        long nanos = unit.toNanos(timeout);
        //获取锁
        final ReentrantLock lock = this.lock;
        //加锁
        lock.lockInterruptibly();
        try {
            //队列已满
            while (count == items.length) {
               //队列已满,且等待时间小于等于0,直接返回false
                if (nanos <= 0)
                    return false;
                //队列已满,线程阻塞等待
                nanos = notFull.awaitNanos(nanos);
            }
            //入队列
            enqueue(e);
            return true;
        } finally {
            //释放锁
            lock.unlock();
        }
    }
```
3.6 remove(Object o)
```
//Object表示要移除的元素
 public boolean remove(Object o) {
        //判断移除元素是否为空,为空返回false
        if (o == null) return false;
        //获取数组对象
        final Object[] items = this.items;
        //获取锁
        final ReentrantLock lock = this.lock;
        //加锁
        lock.lock();
        try {
            //判断当前队列中的元素是否大于0
            if (count > 0) {
                //获取当前存入队列中元素的位置
                final int putIndex = this.putIndex;
                //获取下次移除队列中元素的位置
                int i = takeIndex;
                /**
                循环遍历对比要移除元素与队列中元素,若一致则移除并返回true,否则返回false
                **/
                do {
                 //对比要移除对象o是否存在于队列中
                    if (o.equals(items[i])) {
                    //若存在,移除并返回true
                        removeAt(i);
                        return true;
                    }
                    //如果遍历至队列尾部,重置i为0
                    if (++i == items.length)
                        i = 0;
                } while (i != putIndex);
            }
            return false;
        } finally {
        //释放锁
            lock.unlock();
        }
    }

```
3.7 removeAt(final int removeIndex)
```
void removeAt(final int removeIndex) {
        //获取队列
        final Object[] items = this.items;
        //判断要移除元素位置下标与当前即将移除的元素是否相同
        //若相同,将该位置元素删除
        if (removeIndex == takeIndex) {
            // removing front item; just advance
            items[takeIndex] = null;
            //判断下一次移除元素下标是否到达队列尾部,若到达重置下一次移除元素位置下标为0
            if (++takeIndex == items.length)
                takeIndex = 0;
            //队列中元素个数-1
            count--;
            if (itrs != null)
                itrs.elementDequeued();
        //下一次移除元素下标与当前传入下标不一致
        //依次遍历需要删除元素下标与下一入队元素下标直接的元素,若找到该该元素,执行删除操作,队列长度-1
        } else {
            // an "interior" remove
            // slide over all others up through putIndex.
            final int putIndex = this.putIndex;
            for (int i = removeIndex;;) {
                int next = i + 1;
                if (next == items.length)
                    next = 0;
                if (next != putIndex) {
                    items[i] = items[next];
                    i = next;
                } else {
                    items[i] = null;
                    this.putIndex = i;
                    break;
                }
            }
            //队列长度-1
            count--;
            if (itrs != null)
                itrs.removedAt(removeIndex);
        }
        //通知所有等待入队线程,可以进行入队操作
        notFull.signal();
    }

```
3.8 take()
```
//从队列中取出数据
public E take() throws InterruptedException {
        //获取锁
        final ReentrantLock lock = this.lock;
        //加锁
        lock.lockInterruptibly();
        try {
            //当队列中无数据,删除线程阻塞等待
            while (count == 0)
                notEmpty.await();
            //有数据,执行队头元素出队操作
            return dequeue();
        } finally {
            //释放锁
            lock.unlock();
        }
    }

```
3.9 poll(long timeout, TimeUnit unit)
```
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
        //时间转化为纳秒
        long nanos = unit.toNanos(timeout);
        //获取锁
        final ReentrantLock lock = this.lock;
        //加锁
        lock.lockInterruptibly();
        try {
        //判断队列是否为空,为空阻塞等待
            while (count == 0) {
             //如果等待时间小于0,直接返回null
                if (nanos <= 0)
                    return null;
                //阻塞等待
                nanos = notEmpty.awaitNanos(nanos);
            }
            //执行出队操作
            return dequeue();
        } finally {
            //释放锁
            lock.unlock();
        }
    }

```
3.10 enqueue(E x)
```
//入队
private void enqueue(E x) {
        //获取队列
        final Object[] items = this.items;
        //入队,往数组中添加元素
        items[putIndex] = x;
        //判断队列是否已满,如果是,把下一个元素入队的索引位置置为0,防止下次插入元素越界
        if (++putIndex == items.length)
            putIndex = 0;
        //入队成功,当前队列元素数量+1
        count++;
        //通知在等待的出队的线程,队列中已有元素,可以进行出参操作
        notEmpty.signal();
    }
```
3.11 dequeue()
```
//出队
private E dequeue() {
        // 获取队列
        final Object[] items = this.items;
        //根据下一次出队索引takeIndex拿到出队元素
        @SuppressWarnings("unchecked")
        E x = (E) items[takeIndex];
        //将队列中该1索引位置元素删除
        items[takeIndex] = null;
        //判断下一次出队索引是否等于队列长度,若等于,表明所有元素出队完成,将下一次出队索引置为0,防止越界
        if (++takeIndex == items.length)
            takeIndex = 0;
        //出队成功,当前队列元素数量-1
        count--;
        //itrs主要用来记录集合中数据,方便集合在枚举过程中操作(删除)数据
        if (itrs != null)
            itrs.elementDequeued();
        //通知等待入队的线程,队列中已有位置,可以进行入队操作
        notFull.signal();
        return x;
    }

```
3.12 drainTo(Collection<super E> c, int maxElements)
```
//拷贝队列中maxElements长度的元素到集合c中,并从集合中删除拷贝的数据
public int drainTo(Collection<super E> c, int maxElements) {
        //判断当前数组对象是否为空,为空则抛出空指针异常
        checkNotNull(c);
        //判断传入集合对象是否与当前队列一致,若一致则抛出异常
        if (c == this)
            throw new IllegalArgumentException();
        //判断要拷贝
        if (maxElements <= 0)
            return 0;
        //获取当前队列对象
        final Object[] items = this.items;
        //获取锁
        final ReentrantLock lock = this.lock;
        //加锁
        lock.lock();
        try {
            //对比需要拷贝数据长度与当前队列中元素长度大小,获取最小值(防止数组越界)
            int n = Math.min(maxElements, count);
            //获取下一次获取队列元素下标
            int take = takeIndex;
            int i = 0;
            try {
            //循环遍历队列,从数组i开始获取队列中元素存入传入集合c中,同时删除队列中已拷贝数据
                while (i < n) {
                    @SuppressWarnings("unchecked")
                    E x = (E) items[take];
                    c.add(x);
                    items[take] = null;
                    //判断下一次出队索引是否等于队列长度,若等于,表明读取所有元素完成,将下一次出队索引置为0,防止越界
                    if (++take == items.length)
                        take = 0;
                    i++;
                }
                //返回已拷贝数据总个数值
                return n;
            } finally {
                // Restore invariants even if c.add() threw
                //判断删除元素个数是否大于0,若大于0,更新队列长度值,
                if (i > 0) {
                    count -= i;
                    //更新下一次取元素下标
                    takeIndex = take;
                    if (itrs != null) {
                        if (count == 0)
                            itrs.queueIsEmpty();
                        else if (i > take)
                            itrs.takeIndexWrapped();
                    }
                    //循环遍历,通知所有入队线程,可以进行入队操作
                    for (; i > 0 && lock.hasWaiters(notFull); i--)
                        notFull.signal();
                }
            }
        } finally {
            //释放锁
            lock.unlock();
        }
    }

```
4.小结
1.ArrayBlockingQueue是限定队列长度的,且队列中不能包含null,队列长度创建后就不能更改;
2.ArrayBlockingQueue插入元素在队尾,删除元素从队头开始;
3.实现BlockingQueue接口的类必须都是线程安全的;
4.队列满的情况下,会阻塞插入线程,队列空的情况下,会阻塞删除元素线程;
5.支持公平、非公平锁,默认非公平锁,由于操作共用一把锁,在高并发场景下,可能会出现性能瓶颈。

https://www.xamrdz.com/backend/3zj1930800.html

相关文章: