当前位置: 首页>编程语言>正文

spring 延迟线程池 spring线程池shutdown后开启


先给答案

  • 不一定,要想在调用ThreadPoolTaskExecutor.shutdown方法的时候让线程池等待正在执行的任务执行完毕后再关闭,需要手动设置waitForTasksToCompleteOnShutdown属性值为true。
  • 这里讨论的是在基于JVM不关闭的情况下调用ThreadPoolTaskExecutor.shutdown方法
  • 注意,这里讨论的是ThreadPoolTaskExecutor线程池,不同的线程池实现上有一些差别,但核心原理是一样的。

I)、ThreadPoolTaskExecutor的基本使用方式

一般情况下我们使用线程池都是利用配置类构件线程池所需要的参数,然后注入到spring容器中让spring容器帮我们管理这个线程池。在需要使用到线程池的地方我们直接使用@Autowired注入即可,如下:

@Configuration
public class ThreadPoolAutoconfiguration {

   	/**
   	 * 这里的参数一般是从配置文件中读取,我这里就直接写死
   	 */
    @Bean("testThreadPoolTaskExecutor")
    public ThreadPoolTaskExecutor testThreadPoolTaskExecutor() {
        ThreadPoolTaskExecutor orderThreadPoolTaskExecutor = new ThreadPoolTaskExecutor();
        //核心线程数量
        orderThreadPoolTaskExecutor.setCorePoolSize(2);
        //最大线程数量
        orderThreadPoolTaskExecutor.setMaxPoolSize(5);
        //队列中最大任务数
        orderThreadPoolTaskExecutor.setQueueCapacity(200);
        //线程名称前缀
        orderThreadPoolTaskExecutor.setThreadNamePrefix("test-thread-pool");
        //线程空闲后最大空闲时间
        orderThreadPoolTaskExecutor.setKeepAliveSeconds(30);
        //当到达最大线程数是如何处理新任务
        orderThreadPoolTaskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        return orderThreadPoolTaskExecutor;
    }

}

II)、ThreadPoolTaskExecutor类的结构关系

注意这里ExecutorConfigurationSupport类也实现了InitializingBean接口,后面有用到。

spring 延迟线程池 spring线程池shutdown后开启,spring 延迟线程池 spring线程池shutdown后开启_java,第1张

从上面图中可以看到ThreadPoolTaskExecutor类继承ExecutorConfigurationSupport类,并且他们都持有一个ThreadPoolTaskExecutor类型的属性,那么在ThreadPoolTaskExecutor注入容器的时候这两个ThreadPoolTaskExecutor属性是在哪里创建的?这两个属性是否是指向的同一个线程池呢?看下面探究。


III)、executor和threadPoolExecutor是在哪里被赋的值

  • 从上面的结构中我们虽然知道了ThreadPoolTaskExecutor持有两个属性分别是executorthreadPoolExecutor,那么当ThreadPoolTaskExecutor对象创建并被注入到容器的时候,这两个属性是在哪里赋的值呢?
  • 从上面的结构图中可以看到ExecutorConfigurationSupport类还实现了InitializingBean接口,问题点就在这个接口的afterPropertiesSet方法里。
  • afterPropertiesSet方法在对象的属性被设置完成后会被吊起,所以看看该方法做了什么。
①、ExecutorConfigurationSupport类
public abstract class ExecutorConfigurationSupport extends CustomizableThreadFactory
		implements BeanNameAware, InitializingBean, DisposableBean {
  
    @Override
    public void afterPropertiesSet() {
      // 调用初始化方法
       initialize();
    }

    public void initialize() {
       // 设置线程名称前缀
       if (!this.threadNamePrefixSet && this.beanName != null) {
          setThreadNamePrefix(this.beanName + "-");
       }
       // 这里进行executor属性赋值操作
       this.executor = initializeExecutor(this.threadFactory, this.rejectedExecutionHandler);
    }

    // 这个方法交给子类去实现,所以具体的赋值操作需要看ThreadPoolTaskExecutor类中的实现
    protected abstract ExecutorService initializeExecutor(
          ThreadFactory threadFactory, RejectedExecutionHandler rejectedExecutionHandler);
}
②、ThreadPoolTaskExecutor类
public class ThreadPoolTaskExecutor extends ExecutorConfigurationSupport
      implements AsyncListenableTaskExecutor, SchedulingTaskExecutor {

	@Override
	protected ExecutorService initializeExecutor(
			ThreadFactory threadFactory, RejectedExecutionHandler rejectedExecutionHandler) {

        // 创建一个等待队列
		BlockingQueue<Runnable> queue = createQueue(this.queueCapacity);

		ThreadPoolExecutor executor;
		if (this.taskDecorator != null) {
            // executor属性赋值
			executor = new ThreadPoolExecutor(
					this.corePoolSize, this.maxPoolSize, this.keepAliveSeconds, TimeUnit.SECONDS,
					queue, threadFactory, rejectedExecutionHandler) {
				@Override
				public void execute(Runnable command) {
					Runnable decorated = taskDecorator.decorate(command);
					if (decorated != command) {
						decoratedTaskMap.put(decorated, command);
					}
					super.execute(decorated);
				}
			};
		}
		else {
            // executor属性赋值
			executor = new ThreadPoolExecutor(
					this.corePoolSize, this.maxPoolSize, this.keepAliveSeconds, TimeUnit.SECONDS,
					queue, threadFactory, rejectedExecutionHandler);

		}
		
        // 运行核心线程超时
		if (this.allowCoreThreadTimeOut) {
			executor.allowCoreThreadTimeOut(true);
		}

        // 将executor赋值给threadPoolExecutor,这里可以看到其实threadPoolExecutor和executor指向的是一个东西
		this.threadPoolExecutor = executor;
		return executor;
	}
}

从上的源码中可以看到其实threadPoolExecutor和executor指向的是一个线程池对象,并且这两个属性的赋值时机是在afterPropertiesSet中被赋值的。


IV)、ThreadPoolTaskExecutor的shutdown方法

根据上面的分析我们知道了ThreadPoolTaskExecutor中其实就是组合了一个ThreadPoolExecutor对象,对ThreadPoolExecutor类的方法进行了包装和一些修改增强而已,最核心的方法其实还是在ThreadPoolExecutor中。下面看看ThreadPoolTaskExecutor的shutdown方法是如何关闭线程池的。

①、shutdown源码

从下面代码中可以看到,只有设置了this.waitForTasksToCompleteOnShutdown属性为true时,才会执行shutdown()方法,否则执行的是shutdownNow()方法,如果我们没有手动设置过this.waitForTasksToCompleteOnShutdown属性值,那么该属性的值为false。

所以说即使我们调用了ThreadPoolTaskExecutor线程池的shutdown方法也**【不一定会等待正在执行的任务和阻塞队列中的任务】**执行完毕后再完全关闭线程池(除非我们设置了this.waitForTasksToCompleteOnShutdown值为true)

public void shutdown() {
   
   if (this.executor != null) {
     	// 如果设置了waitForTasksToCompleteOnShutdown属性值为true,那么会调用executor.shutdown()方法关闭线程池
      // 但是我们一般往spring容器中注入的时候或自己new线程池的时候一般没有设置这个属性,所以这里如果不设置,那么一定为false
      if (this.waitForTasksToCompleteOnShutdown) {
         this.executor.shutdown();
      }
      else {
         // 调用executor.shutdownNow方法获取返回值,返回值是线程池等待队列中的所有worker任务
         // 遍历取出的每一个worker任务,并且调用他们的cancel方法,取消这个任务。这里cancelRemainingTask方法其实
         // 实质就是调用了线程的 interrupt()方法中断线程而已。
         for (Runnable remainingTask : this.executor.shutdownNow()) {
            cancelRemainingTask(remainingTask);
         }
      }
      awaitTerminationIfNecessary(this.executor);
   }
}

// 取消任务
protected void cancelRemainingTask(Runnable task) {
		if (task instanceof Future) {
			((Future<?>) task).cancel(true);
		}
}
②、取消任务核心方法
// 取消线程任务,取消成功返回true,反之则返回false
public boolean cancel(boolean mayInterruptIfRunning) {
  	// 做一些检查
    if (!(state == NEW &&
          UNSAFE.compareAndSwapInt(this, stateOffset, NEW,
              mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
        return false;
    try {   
        if (mayInterruptIfRunning) {
            try {
                Thread t = runner;
                if (t != null)
                    // 调用线程的interrupt()方法中断线程
                    t.interrupt();
            } finally { 
                // final state
                UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);
            }
        }
    } finally {
        finishCompletion();
    }
    return true;
}

V)、源码调试验证

为了真实的还原场景,新建一个web应用来进行调试,将ThreadPoolTaskExecutor线程池注入到容器当中。

①、测试代码
// 注入线程池
@Autowired
@Qualifier("testThreadPoolTaskExecutor")
private ThreadPoolTaskExecutor testThreadPoolTaskExecutor;

@GetMapping("/test01")
public void testThreadPool01() throws InterruptedException {

    AtomicInteger count = new AtomicInteger(0);
  	
  	// 提交任务给线程池
    testThreadPoolTaskExecutor.execute(() -> {
        // 子线程运行时间大约18s左右
        while (count.incrementAndGet() <= 8) {
            System.out.println("执行executor.execute counter = " + count);
            try {
                TimeUnit.SECONDS.sleep(2);
            } catch (InterruptedException e) {
                log.error("子线程被中断,异常信息:", e);
            }
        }
    });

    // 睡眠一秒后关闭线程池,确保线程池中的任务正在运行
    TimeUnit.SECONDS.sleep(1);
    testThreadPoolTaskExecutor.shutdown();
}
②、调试结果

可以清楚的看到,这里确实是执行的shutdownNow方法,而不是shutdown方法!!!

spring 延迟线程池 spring线程池shutdown后开启,spring 延迟线程池 spring线程池shutdown后开启_赋值_02,第2张


VI)、总结

  • 我们在使用多线程 + 线程池设计某些场景下,如果我们希望在线程池调用shutdown的时候我们的用户线程能优雅的正常退出,或等待用户线程任务执行完毕后再关闭线程池,那么要记得设置waitForTasksToCompleteOnShutdown属性为true,不然的话在调用ThreadPoolTaskExecutor的shutdown方法的时候其实中断的是所有线程(也就是说不会等到等待队列中的线程任务和正在执行的线程任务执行完)
  • 可以对比ThreadPoolTaskExecutor和ThreadPoolExecutor这两个线程池,可以发现ThreadPoolTaskExecutor线程池中包装了ThreadPoolExecutor,只是对ThreadPoolExecutor的一些功能做了增强和扩展而已,核心的方法都是调用的ThreadPoolExecutor中的。



https://www.xamrdz.com/lan/5zg1937210.html

相关文章: