当前位置: 首页>后端>正文

阻塞队列 BlockingQueue

1、简介

1-1、简介

阻塞队列是JDK的 concurrent 包下面提供的一组线程安全的队列。

其最大的特点为阻塞。

即当获取队列元素但是队列为空时,会阻塞当前线程,等待队列中有元素再返回;

当添加元素但是队列已满时,会阻塞当前线程,等待队列可以放入新元素时再放入。

1-2、应用场景

阻塞队列主要是设计用来实现生产者-消费者队列的。

典型的一个应用是用来实现线程池的等待队列(workQueue)。可以参看这里第二章的说明:java线程池的拒绝策略

1-3、主要操作

concurrent 包下面的 BlockingQueue.java 是JDK提供的接口,约定了所有阻塞队列的基本操作。

Throws exception Special value Blocks Times out
Insert add(e) offer(e) put(e) offer(e, time, unit)
Remove remove() poll() take() poll(time, unit)
Examine element() peek() not applicable not applicable

上表来源于该接口文件的javaDoc。

对于队列的操作有三类:添加(Insert)、取出(Remove)、读(Examine)。

每一类操作在不同的场景下有不同的methods供调用。

  • Throws exception:这一组操作在遇到异常时(add时队列已满;remove时队列已空)会抛出异常
  • Special value:这一组操作在遇到异常时会返回特殊值(null 或 true/false),比如offer时队列已满返回false;poll时队列已空会返回null。
  • Blocks:put在队列已满时阻塞;take在队列已空时阻塞
  • Times out:阻塞等待,直到成功或者超时(阻塞时间满time),超时后直接返回false。
1-4、注意事项
  1. BlockingQueue 不接受 null 值的插入。在“Special value”这种场景下, null 值作为poll()方法的特殊值使用,如果插入的是null,就无法区分究竟是获取失败,还是获取到的就是null。

  2. BlockingQueue 的实现都是线程安全的。但是一些实现里面提供的批量操作,如 addAll, containsAll, retainAll 和 removeAll 不一定是原子操作

2、UML

阻塞队列 BlockingQueue,第1张
BlockingQueue的UML图

可以看到,阻塞队列的绝大多数实现,都是继承 AbstractQueue 抽象类,实现 BlockingQueue 接口。

3、各种具体实现

下面逐个说明每一种阻塞队列的特点以及其具体实现。

3-1、ArrayBlockingQueue

ArrayBlockingQueue 是 BlockingQueue 接口的有界队列实现,先进先出(FIFO Queue)。

底层采用数组来实现队列功能。

ArrayBlockingQueue 在并发控制上采用可重入锁(ReentrantLock),添加和取出操作都在同一把锁的控制下进行。

与另一种阻塞队列的经典实现LinkedBlockingQueue相比,ArrayBlockingQueue的构造函数中支持指定公平锁策略。

如果指定了“公平锁” = true 的话,那么所有等待添加/取出的线程也会遵守FIFO原则,先到的先操作。公平锁可以保证等待时间最长的线程优先获取锁,防止“饿死”,但是其效率稍低,需要根据实际场景取舍。

以下为相关的源码分析,说明以注释方式写入了代码中。

关键成员变量:

    /** 实现队列的数组 */
    final Object[] items;

    /** 由于是使用数组模拟队列,因此这个变量相当于一个游标,指示队列取出时的具体位置(数组的下标) */
    int takeIndex;

    /** 指示队列添加时的具体位置(数组的下标) */
    int putIndex;

    /** 队列(数组)中当前存储的元素数量 */
    int count;

    /** 用于保证线程安全的可重入锁 */
    final ReentrantLock lock;

    /** 配合可重入锁的Condition,控制取出线程 */
    private final Condition notEmpty;

    /** 配合可重入锁的Condition,控制添加线程 */
    private final Condition notFull;

以put(e)为例,看一下具体的操作细节:

    // 添加元素
    public void put(E e) throws InterruptedException {
        checkNotNull(e);
        final ReentrantLock lock = this.lock;
        // 添加操作获取锁
        lock.lockInterruptibly();
        try {
            // 元素数量==数组长度,说明队列中元素已满,调用notFull.await()阻塞添加线程,将控制权返还
            while (count == items.length)
                notFull.await();
            // 正常获取锁,并且可以添加的情况下进行操作
            enqueue(e);
        } finally {
            // 解锁
            lock.unlock();
        }
    }
    
    // 添加操作
    private void enqueue(E x) {
        final Object[] items = this.items;
        // 根据游标添加新元素
        items[putIndex] = x;
        // 游标++;如果游标已经到达了数组最后,则返回数组的开头(利用数组模拟队列)
        if (++putIndex == items.length)
            putIndex = 0;
        // 队列中元素数量++
        count++;
        // 唤醒等待中的取出线程
        notEmpty.signal();
    }    
    
    
3-2、LinkedBlockingQueue

LinkedBlockingQueue 的特性与 ArrayedBlockingQueue 差不多。都是BlockingQueue最基础的实现。

绝大多数场景下 LinkedBlockingQueue 的存取效率优于 ArrayedBlockingQueue。

主要差别在于:

  • 底层的数据结构是一个单向列表。
  • 不提供公平锁策略
  • 可以用于实现有界队列,也可以实现无界队列,取决于使用的构造函数。设定为无界队列队列时,其元素最大数量为 Integer.MAX_VALUE。注意:无界危险,可能造成OOM,慎用!

在具体实现方面,LinkedBlockingQueue最重要的特点就是使用了两个可重入锁。由于取出只操作队列头,而添加只操作队列尾,所以可以实现“读写分离”,是“two lock queue”算法的一种变形。

(其实ABQ也完全可以改造成双锁的,进行诸如把count的类型改造成AtomicInteger之类的改造是完全可行的)

关键成员变量:

    /**
     * 节点
     */
    static class Node<E> {
        E item;

        Node<E> next;

        Node(E x) { item = x; }
    }

    /** 队列长度,默认为 Integer.MAX_VALUE */
    private final int capacity;

    /** 队列中的元素数量,注意使用了原子类型 AtomicInteger */
    private final AtomicInteger count = new AtomicInteger();

    /**
     * 头结点
     */
    transient Node<E> head;

    /**
     * 尾节点
     */
    private transient Node<E> last;

    /** take, poll 等取出操作使用的锁 */
    private final ReentrantLock takeLock = new ReentrantLock();

    /** 配合takeLock锁使用的Conditon */
    private final Condition notEmpty = takeLock.newCondition();

    /** put, offer 等添加操作使用的锁 */
    private final ReentrantLock putLock = new ReentrantLock();

    /** 配合putLock锁使用的Conditon */
    private final Condition notFull = putLock.newCondition();

仍然以put(e)为例,看一下具体的操作细节:

    // 添加元素。由于使用了两把锁,所以与ABQ相比一些细节有变化
    public void put(E e) throws InterruptedException {
        if (e == null) throw new NullPointerException();
        // 使用int类型的本地变量记录元素数量
        int c = -1;
        Node<E> node = new Node<E>(e);
        final ReentrantLock putLock = this.putLock;
        final AtomicInteger count = this.count;
        // 获取添加锁
        putLock.lockInterruptibly();
        try {
            // 元素数量=队列长度,说明队列已满,阻塞当前线程
            while (count.get() == capacity) {
                notFull.await();
            }
            // 添加元素的方法(里面直接一句 last = last.next = node; 解决问题)
            enqueue(node);
            // 原子操作,队列元素数量+1,【同时返回+1前的值给c】←注意正确理解这个操作
            c = count.getAndIncrement();
            // 元素数量<队列长度,说明还可以继续添加,唤醒其他添加线程
            if (c + 1 < capacity)
                notFull.signal();
        } finally {
            // 释放添加锁
            putLock.unlock();
        }
        // c=0,意味着这次添加元素之前,队列是空的,那么就有可能有取出线程在阻塞,所以此处有唤醒操作
        if (c == 0)
            signalNotEmpty();
    }

LBQ使用两把锁,造成它的remove()等方法略有不同,因为remove操作要移动的位置不固定,所以必须将两把锁都加锁,才能保证线程安全。

    public boolean remove(Object o) {
        if (o == null) return false;
        // 这里,两把锁都要抓,两把锁都要硬
        fullyLock();
        try {
            for (Node<E> trail = head, p = trail.next;
                 p != null;
                 trail = p, p = p.next) {
                if (o.equals(p.item)) {
                    unlink(p, trail);
                    return true;
                }
            }
            return false;
        } finally {
            fullyUnlock();
        }
    }
    
    void fullyLock() {
        putLock.lock();
        takeLock.lock();
    }    

3-plus、为什么ABQ和LBQ的锁机制不一样?

这个问题,网上去搜一下就能发现是没有定论的(包括OverStackFlow上)。

主要的讨论点肯定都是集中在性能上面,但是很难有一个明确的结论。

所以,只能去找原作者问了……

3-3、SynchronousQueue

同步队列。

这是一种比较特殊的阻塞队列。

它的同步指的是取出线程和添加线程需要同步,一个取出线程匹配一个添加线程。

也就是说一个线程往队列中添加一个元素时,添加操作不会立即返回,需要等待另一个线程来将这个元素取出;

一个取出线程操作的时候,同样需要一个相匹配的添加线程。

SynchronousQueue 队列是“虚”的,不提供任何空间(一个都没有)来存储元素。Capacity=0。它只是提供两个线程进行信息交换的场所。

数据必须从某个添加线程交给某个取出线程,而不是写到某个队列中等待被消费。

不能在 SynchronousQueue 中使用 peek 方法,peek 的语义是只读取不移除,这个方法的语义不符合 SynchronousQueue 的特征。

所以peek()直接返回null

    public E peek() {
        return null;
    }

SynchronousQueue 也不能被迭代,因为根本就没有元素可以做迭代。

SynchronousQueue的一个典型应用,是使用newCachedThreadPool()方法构建缓存线程池。
newCachedThreadPool()构建线程池时,使用的构造函数是这样的:

ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue())

也就是说线程池核心线程数量是0,即不保有固定数量的线程,随需创建;可创建的线程数量上限为Integer.MAX_VALUE;空闲的线程最多存活60秒;未消费的任务在SynchronousQueue中等待。

简单看一下SynchronousQueue的实现。

SynchronousQueue定义了一个核心的抽象类和抽象方法:

    abstract static class Transferer<E> {
        // 这个方法用于转移元素,从生产者手上转到消费者手上
        // 也可以被动地,消费者调用这个方法来从生产者手上取元素
        // 第一个参数 e 如果不是 null,代表场景为:将元素从生产者转移给消费者
        //              如果是 null,代表消费者等待生产者提供元素,然后返回值就是相应的生产者提供的元素
        // 第二个参数代表是否设置超时,如果设置超时,超时时间是第三个参数的值
        // 返回值如果是 null,代表超时,或者中断。具体是哪个通过检测中断状态得到
        abstract E transfer(E e, boolean timed, long nanos);
    }

SynchronousQueue是可以指定公平锁策略的,公平锁模式下使用TransferQueue实现Transferer,而非公平锁模式下使用TransferStack实现Transferer。


static final class TransferQueue<E> extends Transferer<E> {
    ……
}

static final class TransferStack<E> extends Transferer<E> {
    ……
}

之后的具体实现比较复杂,贴上来也意义不大,主要说一下TransferQueue中transfer方法的思路。

先看一下TransferQueue中的节点定义:

        static final class QNode {
            volatile QNode next;          // next node in queue
            volatile Object item;         // CAS'ed to or from null
            volatile Thread waiter;       // 节点中会保存线程对象的引用,用于阻塞/唤醒
            final boolean isData;         // isData = true 是写入线程,否则为取出线程

然后就是transfer方法:

  • 调用transfer法时,如果队列为空,或者队列中的节点和当前的线程操作类型一致(如当前操作是 put 操作,而队列中的元素isData = true)。这种情况下,将当前线程加入到等待队列即可。
  • 如果队列中有等待节点,而且与当前操作可以匹配(如队列中都是取出操作线程,当前线程是添加操作线程)。这种情况下,匹配等待队列的队头,出队,返回相应数据。

更简单地来说,就是读的来了,看到前面读的在排队,它就跟着在后面排队;看到都是写的在排队,它就从队伍前面领一个走。

3-4、LinkedTransferQueue(TransferQueue)

TransferQueue 是一个继承 BlockingQueue 的接口,它目前的唯一实现是 LinkedTransferQueue。

TransferQueue 是一种重要的队列,它提供了一个场所,生产者线程使用transfer方法传入一些对象并阻塞,直至这些对象被消费者线程全部取出。上面的同步队列SynchronousQueue就像是一个容量为0的TransferQueue。

从表现出来的行为上来看,TransferQueue是ConcurrentLinkedQueue, 公平锁模式的SynchronousQueue, 无界的LinkedBlockingQueue等队列的超集。

TransferQueue的实现思路其实跟上面SynchronousQueue的思路很像,这里同样不贴代码了,主要列一下其用法。

  1. transfer(E e),若当前存在一个正在等待获取的消费者线程,即立刻移交;否则会将元素e插入到队列尾部,并进入阻塞状态,直到有消费者线程取走该元素。
  2. tryTransfer(E e),若当前存在一个正在等待获取的消费者线程(使用take()或者poll()函数),即立刻移交; 否则返回false,并且不进入队列,这是一个非阻塞的操作。
  3. tryTransfer(E e, long timeout, TimeUnit unit) 若当前存在一个正在等待获取的消费者线程,即立刻移交;否则会将元素e插入到队列尾部,并且等待被消费者线程获取消费掉,若在指定的时间内元素e无法被消费者线程获取,则返回false,同时该元素被移除。
  4. hasWaitingConsumer() 判断是否存在消费者线程。
  5. getWaitingConsumerCount() 获取所有等待获取元素的消费线程数量。

下面是一个使用例,一个生产者使用transfer方法传输10个字符串,两个消费者线程则各取出5个字符串。生产者在transfer时会一直阻塞直到所有字符串被取出。

public class TransferQueueExam {
    public static void main(String[] args) {
        TransferQueue<String> queue = new LinkedTransferQueue<>();
        ExecutorService service = Executors.newCachedThreadPool();
        service.submit(new Producer("Producer1", queue));
        service.submit(new Consumer("Consumer1", queue));
        service.submit(new Consumer("Consumer2", queue));
        service.shutdown();
    }

    static class Producer implements Runnable {
        private final String name;
        private final TransferQueue<String> queue;

        Producer(String name, TransferQueue<String> queue) {
            this.name = name;
            this.queue = queue;
        }
        @Override
        public void run() {
            System.out.println("begin transfer objects");

            try {
                for (int i = 0; i < 10; i++) {
                    queue.transfer("ABCD_" + i);
                    System.out.println(name + " transfer "+"ABCD_"+i);
                }
                System.out.println("after transformation");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println(name + " is over");
        }
    }

    static class Consumer implements Runnable {
        private final String name;
        private final TransferQueue<String> queue;
        private static Random rand = new Random(System.currentTimeMillis());

        Consumer(String name, TransferQueue<String> queue) {
            this.name = name;
            this.queue = queue;
        }

        @Override
        public void run() {
            try {
                for (int i = 0; i < 5; i++) {
                    String str = queue.take();
                    System.out.println(name + " take " + str);
                    TimeUnit.SECONDS.sleep(rand.nextInt(5));
                }
                System.out.println(name + " is over");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}
3-5、PriorityBlockingQueue

优先级阻塞队列。是一种带排序的 BlockingQueue 实现。是 PriorityQueue 的线程安全版本。

PriorityBlockingQueue具有以下特点:

  • 队列为无界队列。只能指定初始的队列大小,插入元素时,如果空间不够就会自动扩容。
  • 插入队列的对象必须是可比较大小的(comparable)。
  • 队列中的元素总是按照“自然顺序”进行排序,或者根据构造函数中给定的Comparator进行排序。即“权重队列”。
  • 因为它是无界队列,所以插入操作 put 方法不会阻塞(take 方法在队列为空的时候会阻塞)。
  • 取出时根据优先级(权重)出队,而不是FIFO。

PriorityBlockingQueue 底层使用基于数组的二叉堆来存放元素,并且是小顶堆,数组第一个也是树的根节点总是最小值。

下面结合源码看一下其具体实现,主要关注点是扩容。

关键成员变量:

    /**
     * 队列的默认初始大小。至于为什么是11????不理解
     */
    private static final int DEFAULT_INITIAL_CAPACITY = 11;

    /**
     * 队列最大长度。由于要存储一些头部信息,所以有 "- 8"
     */
    private static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;

    /**
     * 基于数组的小顶堆。
     * queue[n]的左右孩子为queue[2*n+1] and queue[2*(n+1)],父节点为 queue[(n-1)/2]
     * 代码中给出的父节点index公式是这样的 int parent = (n - 1) >>> 1
     */
    private transient Object[] queue;

    /**
     * 元素数量
     */
    private transient int size;

    /**
     * 用于排序的comparator,如果为null则使用元素的“自然顺序”
     */
    private transient Comparator<super E> comparator;

    /**
     * 添加和取出操作公用的可重入锁
     */
    private final ReentrantLock lock;

    /**
     * 配合锁使用的取出Condition(添加是无条件动作,不用阻塞,因此不用Condition)
     */
    private final Condition notEmpty;

    /**
     * 扩容时,CAS操作使用的自旋锁
     */
    private transient volatile int allocationSpinLock;

put操作无阻塞,直接调用了offer方法。

    public void put(E e) {
        offer(e); // never need to block
    }

其实add和带超时的offer方法都同样直接调用了这个offer方法。

对于offer方法,最重要的是理解其中扩容时对锁的操作。

    public boolean offer(E e) {
        if (e == null)
            throw new NullPointerException();
        final ReentrantLock lock = this.lock;
        // 获得锁
        lock.lock();
        int n, cap;
        Object[] array;
        // size大于等于数组的长度时扩容
        while ((n = size) >= (cap = (array = queue).length))  // Doug Lea 你写这样的代码真的不会被打死么……
            // ☆ 扩容。注意这里传入的是quque的复制品array,这一点结合tryGrow方法来看
            tryGrow(array, cap);
        try {
            Comparator<super E> cmp = comparator;
            if (cmp == null)
                // 默认排序方法。这里面主要是堆的操作,此处不展开
                siftUpComparable(n, e, array);
            else
                // 自定义排序方法
                siftUpUsingComparator(n, e, array, cmp);
            size = n + 1;
            // 唤醒被阻塞的取出线程
            notEmpty.signal();
        } finally {
            // 释放锁
            lock.unlock();
        }
        return true;
    }

    private void tryGrow(Object[] array, int oldCap) {
        // 首先释放了刚才已经获得的独占锁,使得扩容操作和取出操作可以同时进行,提高吞吐量。
        lock.unlock(); 
        // 创建扩容后的新数组
        Object[] newArray = null;
        // 用 CAS 操作将 allocationSpinLock 由 0 变为 1,相当于获取扩容使用的自旋锁
        // 也就是说,允许其他线程执行添加操作,但是扩容只能由一个线程来操作
        if (allocationSpinLock == 0 &&
            UNSAFE.compareAndSwapInt(this, allocationSpinLockOffset,
                                     0, 1)) {
            try {
                // 根据规则计算扩容后的数组容量 newCap
                int newCap = oldCap + ((oldCap < 64) ?
                                       (oldCap + 2) : // grow faster if small
                                       (oldCap >> 1));
                if (newCap - MAX_ARRAY_SIZE > 0) {    // possible overflow
                    int minCap = oldCap + 1;
                    if (minCap < 0 || minCap > MAX_ARRAY_SIZE)
                        throw new OutOfMemoryError();
                    newCap = MAX_ARRAY_SIZE;
                }
                // 注意这里的条件 queue == array 。
                // 如果 queue != array,说明有其他线程也在做扩容,并且已经分配完内存空间,quque被指向了新空间
                if (newCap > oldCap && queue == array)
                    newArray = new Object[newCap];
            } finally {
                // 重置为0,相当于释放自旋锁
                allocationSpinLock = 0;
            }
        }
        // newArray == null 说明上面没有执行 newArray = new Object[newCap],即有其他线程正在进行扩容操作
        if (newArray == null) 
            // 扩容失败时,让出CPU控制权,线程由运行中状态变为就绪状态
            Thread.yield();
        // 重新获取最开始释放掉的独占锁
        lock.lock();
        // 扩容成功后调用native方法做数组复制
        if (newArray != null && queue == array) {
            queue = newArray;
            System.arraycopy(array, 0, newArray, 0, oldCap);
        }
        // 本线程扩容失败后,会退到外层方法offer()中,如果此时扩容仍未完成,
        // 会继续在 while ((n = size) >= (cap = (array = queue).length)) 中循环,直到扩容完成,添加元素成功
    }
3-6、DelayQueue

延时优先级阻塞队列。是一种支持延时获取元素的无界阻塞队列。

DelayQueue队列中只能存入实现了Delayed接口的对象。

public class DelayQueue<E extends Delayed> extends AbstractQueue<E>

Delayed接口的定义如下:

public interface Delayed extends Comparable<Delayed> {
    long getDelay(TimeUnit unit);
}

其继承的Comparable接口定义如下:

public interface Comparable<T> {
    public int compareTo(T o);
}

因此DelayQueue中存入的对象要同时实现getDelay和compareTo两个方法。

getDelay方法是用来检测队列中的元素是否到期;compareTo方法是用来给队列中的元素进行排序。

DelayQueue实现的思路大致如下:

持有一个PriorityQueue,每个Delayed对象实际上都放入了这个队列,队列中的对象按照优先级(按照compareTo)进行了排序,队列头部是最先超时的对象。

当队列中对象的getDelay方法返回的值小于等于0(即对象已经超时)时,才可以将对象从队列中取出。若使用take方法,则方法会一直阻塞,直到队列头部的对象超时被取出;若使用poll方法,则当没有超时对象时,直接返回null。

另外因为DelayQueue是无界队列,所以put操作是非阻塞的,直接调用offer方法。

下面是一个使用例子:

public class DelayQueueExam {

    private static Random rand = new Random(System.currentTimeMillis());

    public static void main(String[] args) throws InterruptedException {
        DelayQueue<DelayElement> queue = new DelayQueue<>();
        for (int i = 0; i < 10; i++) {
            int rndDelayTime = rand.nextInt(1000 * (i + 1));
            queue.put(new DelayElement(rndDelayTime, "DelayElement_" + i + "_" + rndDelayTime));
        }
        while (!queue.isEmpty()) {
            DelayElement delayElement = queue.take();
            System.out.println(delayElement.getName());
        }
    }

    static class DelayElement implements Delayed {
        private long expired;
        private final String name;

        DelayElement(int delay, String name) {
            this.name = name;
            expired = System.currentTimeMillis() + delay;
        }

        public String getName() {
            return name;
        }

        @Override
        public long getDelay(TimeUnit unit) {
            return unit.convert(expired - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
        }

        @Override
        public int compareTo(Delayed o) {
            long d = (getDelay(TimeUnit.MILLISECONDS) - o.getDelay(TimeUnit.MILLISECONDS));
            return (d == 0) 0 : ((d < 0) -1 : 1);
        }
    }
}

可以看到take的时候,是按照rndDelayTime的升序输出的,而不是添加元素时候的index。

DelayElement_6_62
DelayElement_0_293
DelayElement_5_328
DelayElement_1_497
DelayElement_2_1261
DelayElement_4_1945
DelayElement_3_2563
DelayElement_8_2778
DelayElement_9_3049
DelayElement_7_7060

Process finished with exit code 0

https://www.xamrdz.com/backend/3p51936794.html

相关文章: