当前位置: 首页>后端>正文

阻塞队列--概述

什么是阻塞队列

首先通过接口类BlockingQueue中的注释来简单了解阻塞队列。
阻塞队列是一个支持附加操作的特殊队列:在队列为空时回收元素会阻塞等待直到队列非空,或在队列已满时插入元素,会阻塞等待直到队列不满。

阻塞队列的方法提供了四种不同的处理方式:抛异常、返回特殊值(null或false)、阻塞当前线程直到操作成功以及阻塞一段时间,超时退出。这四种处理方式分别对应不同的函数接口:

具体操作 抛异常 返回特殊值 无限阻塞 有时限阻塞
Insert add(e) offer(e) put(e) offer(e, time, unit)
Remove remove() poll() take() poll(time, unit)
Examine element() peek() not applicable not applicable
  • 阻塞队列存储的元素不能为空,null值保留作为poll操作失败的返回值。
  • 阻塞队列可以是有界的,也可以是无界的。对于无界阻塞队列,不可能会出现队列满的情况,所以使用put或offer方法永远不会被阻塞,且使用offer方法时,永远返回true。
  • 阻塞队列主要用于生产者消费者场景,同时也附加的支持集合接口。可通过调用remove(x)的方法来移除队列中的任意元素,但这种方法并不高效,且并不常用,一般只用于取消排队的消息。
  • 阻塞队列是线程安全的,所有排队方法通过使用内部锁或其他方式的同步控制来实现原子性。然而,批量集合操作如addAll、containsAll、retainAll和removeAll则是非原子性的,除非在实现中另行定义。所以,调用addAll(c),可能在添加部分c中元素后失败抛异常。
  • 阻塞队列本质上不支持任何close或shutdown等操作来停止元素的添加。这种需求可通过由生产者插入特殊元素的方式实现,消费者拿到这个元素之后就获取到了元素插入已终止的消息。
    对于无界阻塞队列,队列不可能会出现满的情况,所以使用put或offer方法永远不会被阻塞,且使用offer方法永远返回true。
  • 内存一致性,同其他并发容器一样,线程对元素执行的操作在元素插入队列之前执行。
  • 内存可见性,遵循happens-before,在另一个线程对阻塞队列中某个元素访问或删除之后操作。

JDK提供的7个阻塞队列

ArrayBlockingQueue

是一个用数组实现的有界阻塞队列,按先进先出的原则对元素进行排序。put和take方法分别为添加和删除的阻塞方法。默认情况下不保证线程公平。
ArrayBlockingQueue内部使用一把重入锁ReentrantLock来保证多个线程之间的插入删除元素的同步;同时使用两个条件对象Condition来实现阻塞逻辑,调用其await和signal方法来实现线程的等待和唤醒。
由于使用了ReentrantLock,所以ArrayBlockingQueue存在线程公平与不公平两种选择。
插入删除元素的具体执行逻辑:ArrayBlockingQueue
PS:这7个阻塞队列本来想着一个一个解析的,但看了下源码,逻辑其实没有很复杂,所以后面几个就只记一下内部实现的主要点。

LinkedBlockingQueue

用链表实现的有界阻塞队列,默认和最大长度为Integer.MAX_VALUE,队列按照先进先出的原则对元素排序。
插入元素在表尾,删除元素在表头。
LinkedBlockingQueue内部使用了两把重入锁ReentrantLock,分别用来保护插入操作和删除操作。
同样也是使用两个条件对象来实现阻塞逻辑。

PriorityBlockingQueue

支持优先级的无界阻塞队列,默认按元素自然顺序升序排列,可通过自定义类实现compareTo方法或指定构造参数Comparator来指定元素排序规则,不保证同优先级元素的顺序。
PriorityBlockingQueue内部使用的是数组对象来存储元素,且数组容量初始化为11。
其内部只使用了一把重入锁ReentrantLock,和一个条件对象Condition,只用于阻塞和唤醒删除元素操作的线程。
当插入元素时,若此时数组已满,也不需要等待,它会尝试扩容,因此插入操作也不会有阻塞的可能。
PriorityBlockingQueue内部还有一个allocationSpinLock自旋锁,用于扩容时的同步保护,在执行扩容操作前,需先自旋尝试将allocationSpinLock置为1,设置成功后才能继续往下执行。

DelayQueue
  • 支持延时获取元素的无界阻塞队列,内部使用优先级队列PriorityQueue来存储元素。队列中的元素必须实现Delayed接口,在创建元素时可以指定多久才能从队列中获取当前元素,只有过了delay时间才能从队列中提取元素。
    DelayQueue就是往优先级队列中添加元素,然后与元素的delay(过期值)作为排序因素,从而实现先过期的元素排在队首,每次从队列中取出的元素都是最先要过期的元素。

  • 应用场景:
    1)订单,下单一段时间后没有付款就取消订单;
    2)关闭空闲连接,服务器中,关闭有一段时间空闲的客户端连接;
    3)缓存对象,超过了缓存时间,则清除;
    4)任务超时处理,处理超时未响应的请求等。

  • DelayQueue内部也是由一把重入锁ReentrantLock实现线程间的同步。

  • DelayQueue内部还有一个Thread类型的对象leader,用来记录等待队头元素的线程。使用leader可以减少不必要的等待时间。
    当多个线程调用take方法去取元素,如果当前leader非空,说明有线程在取 ,则当前线程等待;如果leader为空,则将当前线程设置为leader。
    当一个线程成为leader,它只需要等待下一个delay时间过去,但其他线程将会无限等待,leader线程必须从take()或poll(...)返回之前向其他线程发出信号,除非在这个期间,某个线程成为了leader线程。
    当队头元素被更早到期的元素替换时,leader字段都会被重置为null,并向一些等待线程(不一定是当前的leader)发出信号。所以,等待线程在等待过程中可能会得到或失去leadership。
    这整个过程的等待唤醒逻辑就是通过调用DelayQueue的类型为Condition的成员变量available的await和signal等方法实现的。

SynchronousQueue

一个不存储元素的阻塞队列,每一个put的线程会阻塞到直到有一个take线程取走元素为止,每一个take的线程会阻塞到直到有一个put的线程放入元素为止。
由于SynchronousQueue不存储元素,所以类似peek操作或者迭代器操作都是无效的。
支持公平访问队列,默认情况下线程采用非公平性策略访问队列。
SynchronousQueue只是一个对外的封装层,其真正的实现逻辑在其类型为Transferer的成员变量transferer的transfer方法中;抽象类Transferer有两个具体的实现类:TransferStack和TransferQueue,分别在非公平和公平的模式下使用。
Transferer类内部是通过自旋锁及CAS操作实现多个线程间的同步。
SynchronousQueue可当做一个传递中介,负责将生产者线程处理的数据直接传递给消费者线程,适用于传递性场景,吞吐量高。
其内部的具体实现逻辑可参考:SynchronousQueue

LinkedTransferQueue
  • 由链表结构组成的无界阻塞TransferQueue队列。相比其他阻塞队列,多了tryTransfer和transfer方法。
  • LinkedTransferQueue可以认为是LinkedBlockingQueue和SynchronousQueue的结合体。LinkedBlockingQueue内部使用了ReentrantLock,性能不高,而SynchronousQueue则无法存储元素,
  • LinkedTransferQueue实现了TransferQueue接口,该接口包含的方法如下:
public interface TransferQueue<E> extends BlockingQueue<E> {
    //如果当前有消费者正在等待接收元素,则将生产者传入的元素立即transfer给消费者,否则返回false
    boolean tryTransfer(E e);
    //如果当前有消费者正在等待接收元素,transfer方法可以把生产者传入的元素立即transfer给消费者;
    //如果没有消费者等待接收,transfer方法则将元素存放在队列的tail结点,等该元素被消费了才返回
    void transfer(E e) throws InterruptedException;
    //tryTransfer方法增加了超时时间
    boolean tryTransfer(E e, long timeout, TimeUnit unit)
        throws InterruptedException;
    //若当前至少有一位消费者在等待,则返回true
    boolean hasWaitingConsumer();
    //返回当前等待的消费者线程数
    int getWaitingConsumerCount();

LinkedTransferQueue内部是通过自旋以及CAS操作来实现线程间的同步。

LinkedBlockingDeque

有链表结构组成的双向阻塞队列,即可以从队列的两端插入或移除元素。
其内部同样拥有一把重入锁ReentrantLock,两个条件对象notEmpty和notFull。整体逻辑同LinkedBlockingQueue相似。
在初始化LinkedBlockingDeque时可以设置容量防止其过度膨胀,双向阻塞队列可以运用在“工作窃取”模式中。


https://www.xamrdz.com/backend/3jg1930488.html

相关文章: