当前位置: 首页>后端>正文

多线程并发总结(六)--阻塞队列和线程池

1. 阻塞队列

  • 队列:是一种特殊的线性表,特殊之处在于它只允许在表的前端(front)进行删除操作,而在表的后端(rear)进行插入操作,和栈一样,队列是一种操作受限制的线性表。进行插入操作的端称为队尾,进行删除操作的端称为队头。
    在队列中插入一个队列元素称为入队,从队列中删除一个队列元素称为出队。因为队列只允许在一端插入,在另一端删除,所以只有最早进入队列的元素才能最先从队列中删除,故队列又称为先进先出(FIFO—first in first out)线性表。

  • 阻塞队列是支持阻塞插入和支持阻塞移除的队列:

    阻塞的插入:意思是当队列满时,队列会阻塞插入元素的线程,直到队列不满。
    阻塞的移除:意思是在队列为空时,获取元素的线程会等待队列变为非空。

2. 阻塞队列的用途

? 阻塞队列常用于生产者和消费者的场景,生产者是向队列里添加元素的线程,消费者是从队列里取元素的线程。阻塞队列就是生产者用来存放元素、消费者用来获取元素的容器。

2.1 为什么有生产者消费者模式

? 在并发编程中,为了平衡生产线程和消费线程的工作能力来提高程序整体处理数据的速度,因此生产者消费者模式就有了用武之地。

? 在线程世界里,生产者就是生产数据的线程,消费者就是消费数据的线程。在多线程开发中,如果生产者处理速度很快,而消费者处理速度很慢,那么生产者就必须等待消费者处理完,才能继续生产数据。同样的道理,如果消费者的处理能力大于生产者,那么消费者就必须等待生产者。

? 因此就需要将生产者和消费者隔离开,并且用一个容器将产品装好。生产者生产好产品就把产品放入容器,消费者需要的时候就从容器中拿。 而这个容器就可以用队列来实现。然而有容器为空和满的两种情况导致生产者在容器满的时候无法放入队列,消费者当容器为空的时候无法从队列获取产品。所以阻塞队列就应运而生。

2.2 阻塞队列接口(BlockingQueue)

? 下面看看BlockingQueue中定义的方法区别

方法/处理方式 抛出异常 返回特殊值 一直阻塞 超时退出
插入方法 add(E) offer(E) put(E) offer(E, time, unit)
移除方法 remove() poll() take() poll(E, time, unit)
检查队列方法 element() peek() 不可用 不可用
  • 抛出异常:当队列满时,如果再往队列里插入元素,会抛出IllegalStateException("Queuefull")异常。当队列空时,从队列里获取元素会抛出NoSuchElementException异常。

  • 返回特殊值:当往队列插入元素时,会返回元素是否插入成功,成功返回true。如果是移除方法,则是从队列里取出一个元素,如果没有则返回null。

  • 一直阻塞:当阻塞队列满时,如果生产者线程往队列里put元素,队列会一直阻塞生产者线程,直到队列可用或者响应中断退出。当队列空时,如果消费者线程从队列里take元素,队列会阻塞住消费者线程,直到队列不为空。

  • 超时退出:当阻塞队列满时,如果生产者线程往队列里插入元素,队列会阻塞生产者线程一段时间,如果超过了指定的时间,生产者线程就会退出。

2.3 有哪些阻塞队列

  • ArrayBlockingQueue:一个由数组结构组成的有界阻塞队列。

    是一个用数组实现的有界阻塞队列。此队列按照先进先出(FIFO)的原则对元素进行排序。默认情况下不保证线程公平的访问队列。在操作队列的时候使用同一把显示锁

  • LinkedBlockingQueue:一个由链表结构组成的有界阻塞队列。

    是一个用链表实现的有界阻塞队列。此队列的默认和最大长度为Integer.MAX_VALUE。此队列按照先进先出的原则对元素进行排序。在操作队列的时候使用三把锁实现:lock全局锁、放入时队列不满锁notFull和取出时队列不为空锁notEmpty。

  • PriorityBlockingQueue:一个支持优先级排序的无界阻塞队列。

    PriorityBlockingQueue是一个支持优先级的无界阻塞队列。默认情况下元素采取自然顺序升序排列。也可以自定义类实现compareTo()方法来指定元素排序规则,或者初始化PriorityBlockingQueue时,指定构造参数Comparator来对元素进行排序。需要注意的是不能保证同优先级元素的顺序。

  • DelayQueue:一个使用优先级队列实现的无界阻塞队列。

    DelayQueue是一个支持延时获取元素的无界阻塞队列。队列使用PriorityQueue来实现。队列中的元素必须实现Delayed接口,在创建元素时可以指定多久才能从队列中获取当前元素。只有在延迟期满时才能从队列中提取元素。

  • SynchronousQueue:一个不存储元素的阻塞队列。

    SynchronousQueue是一个不存储元素的阻塞队列。每一个put操作必须等待一个take操作,否则不能继续添加元素。主要可以解决生产者消费者隔离问题。SynchronousQueue的吞吐量高于LinkedBlockingQueue和ArrayBlockingQueue。

  • LinkedTransferQueue:一个由链表结构组成的无界阻塞队列。
    • (1) transfer方法
      如果当前有消费者正在等待接收元素(消费者使用take()方法或带时间限制的poll()方法时),transfer方法可以把生产者传入的元素立刻transfer(传输)给消费者。如果没有消费者在等待接收元素,transfer方法会将元素存放在队列的tail节点,并等到该元素被消费者消费了才返回。
    • (2) tryTransfer方法
      tryTransfer方法是用来试探生产者传入的元素是否能直接传给消费者。如果没有消费者等待接收元素,则返回false。和transfer方法的区别是tryTransfer方法无论消费者是否接收,方法立即返回,而transfer方法是必须等到消费者消费了才返回。
  • LinkedBlockingDeque:一个由链表结构组成的双向阻塞队列。

    LinkedBlockingDeque是一个由链表结构组成的双向阻塞队列。所谓双向队列指的是可以从队列的两端插入和移出元素。双向队列因为多了一个操作队列的入口,在多线程同时入队时,也就减少了一半的竞争。

3.线程池

3.1 使用线程池的好处

  • 降低资源消耗。通过重复利用已创建的线程降低线程创建和销毁造成的消耗。

  • 提高响应速度。当任务到达时,任务可以不需要等到线程创建就能立即执行。有时候任务执行的时间还远小于线程创建和销毁的时间。

  • 第三:提高线程的可管理性。线程是稀缺资源,如果无限制地创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一分配、调优和监控。

3.2 线程池相关的类

  • Executor是一个接口,它是Executor框架的基础,它将任务的提交与任务的执行分离开来。

  • ExecutorService接口继承了Executor,在其上做了一些shutdown()、submit()的扩展,可以说是真正的线程池接口;

  • AbstractExecutorService抽象类实现了ExecutorService接口中的大部分方法;

  • ThreadPoolExecutor是线程池的核心实现类,用来执行被提交的任务。

  • ScheduledExecutorService接口继承了ExecutorService接口,提供了带"周期执行"功能ExecutorService;

  • ScheduledThreadPoolExecutor是一个实现类,可以在给定的延迟后运行命令,或者定期执行命令。

多线程并发总结(六)--阻塞队列和线程池,第1张
线程池类关系图.png

3.3 线程池的创建各个参数含义

public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize, /*核心线程数,最大线程数*/

? long keepAliveTime, TimeUnit unit, /*空闲线程存活时间 */

? BlockingQueue<Runnable> workQueue, /*阻塞队列 */

ThreadFactory threadFactory, /*线程工厂 */

RejectedExecutionHandler handler) /*线程池拒绝策略处理器 */

从构造方法可以看出创建线程池有五类7个参数可以配置,下面来一一阐述一下

3.3.1 corePoolSize

? 线程池中的核心线程数,当提交一个任务时,线程池创建一个新线程执行任务,直到当前线程数等于corePoolSize;

? 如果当前线程数为corePoolSize,继续提交的任务被保存到阻塞队列中,等待被执行;

? 如果执行了线程池的prestartAllCoreThreads()方法,线程池会提前创建并启动所有核心线程。

3.3.2 maximumPoolSize

? 线程池中允许的最大线程数。如果当前阻塞队列满了,且继续提交任务,则创建新的线程执行任务,前提是当前线程数小于maximumPoolSize

3.3.3 keepAliveTime

? 线程空闲时的存活时间,即当线程没有任务执行时,继续存活的时间。默认情况下,该参数只在线程数大于corePoolSize时才有用

3.3.4 TimeUnit

keepAliveTime的时间单位

3.3.5 workQueue

? workQueue必须是BlockingQueue阻塞队列。当线程池中的线程数超过它的corePoolSize的时候,线程会进入阻塞队列进行阻塞等待。通过workQueue,线程池实现了阻塞功能。

? 一般来说,我们应该尽量使用有界队列,因为使用无界队列作为工作队列会对线程池带来如下影响。

  • (1)当线程池中的线程数达到corePoolSize后,新任务将在无界队列中等待,因此线程池中的线程数不会超过corePoolSize。
  • (2)由于1,使用无界队列时maximumPoolSize将是一个无效参数。
  • (3)由于1和2,使用无界队列时keepAliveTime将是一个无效参数。
  • (4)更重要的,使用无界queue可能会耗尽系统资源,有界队列则有助于防止资源耗尽,同时即使使用有界队列,也要尽量控制队列的大小在一个合适的范围。
3.3.6 threadFactory

? 创建线程的工厂,通过自定义的线程工厂可以给每个新建的线程设置一个具有识别度的线程名,当然还可以更加自由的对线程做更多的设置,比如设置所有的线程为守护线程。

? Executors静态工厂里默认的threadFactory,线程的命名规则是“pool-数字-thread-数字”。

3.3.7 RejectedExecutionHandler

? 线程池的饱和策略,当阻塞队列满了,且没有空闲的工作线程,如果继续提交任务,必须采取一种策略处理该任务,线程池提供了4种策略:

  • (1) AbortPolicy:直接抛出异常,默认策略;
  • (2) CallerRunsPolicy:用调用者所在的线程来执行任务;
  • (3) DiscardOldestPolicy:丢弃阻塞队列中靠最前的任务,并执行当前任务;
  • (4) DiscardPolicy:直接丢弃任务;

当然也可以根据应用场景实现RejectedExecutionHandler接口,自定义饱和策略,如记录日志或持久化存储不能处理的任务。

3.4 线程池工作机制

    1. 如果当前运行的线程少于corePoolSize,则创建新线程来执行任务(注意,执行这一步骤需要获取全局锁)。
    1. 如果运行的线程等于或多于corePoolSize,则将任务加入BlockingQueue。
    1. 如果无法将任务加入BlockingQueue(队列已满),则创建新的线程来处理任务。
    1. 如果创建新线程将使当前运行的线程超出maximumPoolSize,任务将被拒绝,并调用RejectedExecutionHandler.rejectedExecution()方法。
多线程并发总结(六)--阻塞队列和线程池,第2张
线程池工作机制.png

3.5 提交任务

3.5.1 execute()
execute()方法用于提交不需要返回值的任务,所以无法判断任务是否被线程池执行成功。
3.5.2 submit()

? submit()方法用于提交需要返回值的任务。线程池会返回一个future类型的对象,通过这个future对象可以判断任务是否执行成功,并且可以通过future的get()方法来获取返回值,get()方法会阻塞当前线程直到任务完成,而使用get(long timeout,TimeUnit unit)方法则会阻塞当前线程一段时间后立即返回,这时候有可能任务没有执行完。

3.6 关闭线程池

? 可以通过调用线程池的shutdown或shutdownNow方法来关闭线程池。它们的原理是遍历线程池中的工作线程,然后逐个调用线程的interrupt方法来中断线程,所以无法响应中断的任务可能永远无法终止。但是它们存在一定的区别,shutdownNow首先将线程池的状态设置成STOP,然后尝试停止所有的正在执行或暂停任务的线程,并返回等待执行任务的列表,而shutdown只是将线程池的状态设置成SHUTDOWN状态,然后中断所有没有正在执行任务的线程

? 只要调用了这两个关闭方法中的任意一个,isShutdown方法就会返回true。当所有的任务都已关闭后,才表示线程池关闭成功,这时调用isTerminaed方法会返回true。至于应该调用哪一种方法来关闭线程池,应该由提交到线程池的任务特性决定,通常调用shutdown方法来关闭线程池,如果任务不一定要执行完,则可以调用shutdownNow方法。

public void shutdown() {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        checkShutdownAccess();
        advanceRunState(SHUTDOWN);  // 将线程池状态设置成SHUTDOWN
        interruptIdleWorkers();    // 中断空闲线程
        onShutdown(); // hook for ScheduledThreadPoolExecutor
    } finally {
        mainLock.unlock();
    }
    tryTerminate();
}

public List<Runnable> shutdownNow() {
        List<Runnable> tasks;
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            checkShutdownAccess();
            advanceRunState(STOP);  // 将线程池状态设置成STOP
            interruptWorkers();     // 中断所有线程
            tasks = drainQueue();
        } finally {
            mainLock.unlock();
        }
        tryTerminate();
        return tasks;
    }

3.7 如何合理配置线程池

? 要想合理地配置线程池,就必须首先分析任务特性,可以从以下几个角度来分析。

  • 任务的性质:CPU密集型任务、IO密集型任务和混合型任务。
  • 任务的优先级:高、中和低。
  • 任务的执行时间:长、中和短。
  • 任务的依赖性:是否依赖其他系统资源,如数据库连接。

? 针对CPU密集型任务应配置尽可能小的线程,如配置Ncpu+1个线程的线程池。由于IO密集型任务线程并不是一直在执行任务,则应配置尽可能多的线程,如2*Ncpu。(可以通过Runtime.getRuntime().availableProcessors()方法获得当前设备的CPU个数)

? 针对混合型的任务,如果可以拆分,将其拆分成一个CPU密集型任务和一个IO密集型任务,只要这两个任务执行的时间相差不是太大,那么分解后执行的吞吐量将高于串行执行的吞吐量。如果这两个任务执行时间相差太大,则没必要进行分解。

? 针对优先级不同的任务可以使用优先级队列PriorityBlockingQueue来处理。它可以让优先级高的任务先执行

? 执行时间不同的任务可以交给不同规模的线程池来处理,或者可以使用优先级队列,让执行时间短的任务先执行

测试用例代码见: git@github.com:oujie123/UnderstandingOfThread.git


https://www.xamrdz.com/backend/3aa1942683.html

相关文章: