先给答案
- 不一定,要想在调用
ThreadPoolTaskExecutor.shutdown
方法的时候让线程池等待正在执行的任务执行完毕后再关闭,需要手动设置waitForTasksToCompleteOnShutdown
属性值为true。 - 这里讨论的是在基于JVM不关闭的情况下调用
ThreadPoolTaskExecutor.shutdown
方法 - 注意,这里讨论的是
ThreadPoolTaskExecutor
线程池,不同的线程池实现上有一些差别,但核心原理是一样的。
I)、ThreadPoolTaskExecutor的基本使用方式
一般情况下我们使用线程池都是利用配置类构件线程池所需要的参数,然后注入到spring容器中让spring容器帮我们管理这个线程池。在需要使用到线程池的地方我们直接使用
@Autowired
注入即可,如下:
@Configuration
public class ThreadPoolAutoconfiguration {
/**
* 这里的参数一般是从配置文件中读取,我这里就直接写死
*/
@Bean("testThreadPoolTaskExecutor")
public ThreadPoolTaskExecutor testThreadPoolTaskExecutor() {
ThreadPoolTaskExecutor orderThreadPoolTaskExecutor = new ThreadPoolTaskExecutor();
//核心线程数量
orderThreadPoolTaskExecutor.setCorePoolSize(2);
//最大线程数量
orderThreadPoolTaskExecutor.setMaxPoolSize(5);
//队列中最大任务数
orderThreadPoolTaskExecutor.setQueueCapacity(200);
//线程名称前缀
orderThreadPoolTaskExecutor.setThreadNamePrefix("test-thread-pool");
//线程空闲后最大空闲时间
orderThreadPoolTaskExecutor.setKeepAliveSeconds(30);
//当到达最大线程数是如何处理新任务
orderThreadPoolTaskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
return orderThreadPoolTaskExecutor;
}
}
II)、ThreadPoolTaskExecutor类的结构关系
注意这里ExecutorConfigurationSupport类也实现了InitializingBean接口,后面有用到。
从上面图中可以看到ThreadPoolTaskExecutor
类继承ExecutorConfigurationSupport
类,并且他们都持有一个ThreadPoolTaskExecutor类型的属性,那么在ThreadPoolTaskExecutor注入容器的时候这两个ThreadPoolTaskExecutor属性是在哪里创建的?这两个属性是否是指向的同一个线程池呢?看下面探究。
III)、executor和threadPoolExecutor是在哪里被赋的值
- 从上面的结构中我们虽然知道了ThreadPoolTaskExecutor持有两个属性分别是
executor
和threadPoolExecutor
,那么当ThreadPoolTaskExecutor
对象创建并被注入到容器的时候,这两个属性是在哪里赋的值呢? - 从上面的结构图中可以看到
ExecutorConfigurationSupport
类还实现了InitializingBean
接口,问题点就在这个接口的afterPropertiesSet
方法里。 - afterPropertiesSet方法在对象的属性被设置完成后会被吊起,所以看看该方法做了什么。
①、ExecutorConfigurationSupport类
public abstract class ExecutorConfigurationSupport extends CustomizableThreadFactory
implements BeanNameAware, InitializingBean, DisposableBean {
@Override
public void afterPropertiesSet() {
// 调用初始化方法
initialize();
}
public void initialize() {
// 设置线程名称前缀
if (!this.threadNamePrefixSet && this.beanName != null) {
setThreadNamePrefix(this.beanName + "-");
}
// 这里进行executor属性赋值操作
this.executor = initializeExecutor(this.threadFactory, this.rejectedExecutionHandler);
}
// 这个方法交给子类去实现,所以具体的赋值操作需要看ThreadPoolTaskExecutor类中的实现
protected abstract ExecutorService initializeExecutor(
ThreadFactory threadFactory, RejectedExecutionHandler rejectedExecutionHandler);
}
②、ThreadPoolTaskExecutor类
public class ThreadPoolTaskExecutor extends ExecutorConfigurationSupport
implements AsyncListenableTaskExecutor, SchedulingTaskExecutor {
@Override
protected ExecutorService initializeExecutor(
ThreadFactory threadFactory, RejectedExecutionHandler rejectedExecutionHandler) {
// 创建一个等待队列
BlockingQueue<Runnable> queue = createQueue(this.queueCapacity);
ThreadPoolExecutor executor;
if (this.taskDecorator != null) {
// executor属性赋值
executor = new ThreadPoolExecutor(
this.corePoolSize, this.maxPoolSize, this.keepAliveSeconds, TimeUnit.SECONDS,
queue, threadFactory, rejectedExecutionHandler) {
@Override
public void execute(Runnable command) {
Runnable decorated = taskDecorator.decorate(command);
if (decorated != command) {
decoratedTaskMap.put(decorated, command);
}
super.execute(decorated);
}
};
}
else {
// executor属性赋值
executor = new ThreadPoolExecutor(
this.corePoolSize, this.maxPoolSize, this.keepAliveSeconds, TimeUnit.SECONDS,
queue, threadFactory, rejectedExecutionHandler);
}
// 运行核心线程超时
if (this.allowCoreThreadTimeOut) {
executor.allowCoreThreadTimeOut(true);
}
// 将executor赋值给threadPoolExecutor,这里可以看到其实threadPoolExecutor和executor指向的是一个东西
this.threadPoolExecutor = executor;
return executor;
}
}
从上的源码中可以看到其实threadPoolExecutor和executor指向的是一个线程池对象,并且这两个属性的赋值时机是在afterPropertiesSet
中被赋值的。
IV)、ThreadPoolTaskExecutor的shutdown方法
根据上面的分析我们知道了ThreadPoolTaskExecutor
中其实就是组合了一个ThreadPoolExecutor
对象,对ThreadPoolExecutor
类的方法进行了包装和一些修改增强而已,最核心的方法其实还是在ThreadPoolExecutor中。下面看看ThreadPoolTaskExecutor的shutdown方法是如何关闭线程池的。
①、shutdown源码
从下面代码中可以看到,只有设置了
this.waitForTasksToCompleteOnShutdown
属性为true时,才会执行shutdown()
方法,否则执行的是shutdownNow()
方法,如果我们没有手动设置过this.waitForTasksToCompleteOnShutdown
属性值,那么该属性的值为false。所以说即使我们调用了ThreadPoolTaskExecutor线程池的shutdown方法也**【不一定会等待正在执行的任务和阻塞队列中的任务】**执行完毕后再完全关闭线程池(除非我们设置了this.waitForTasksToCompleteOnShutdown值为true)。
public void shutdown() {
if (this.executor != null) {
// 如果设置了waitForTasksToCompleteOnShutdown属性值为true,那么会调用executor.shutdown()方法关闭线程池
// 但是我们一般往spring容器中注入的时候或自己new线程池的时候一般没有设置这个属性,所以这里如果不设置,那么一定为false
if (this.waitForTasksToCompleteOnShutdown) {
this.executor.shutdown();
}
else {
// 调用executor.shutdownNow方法获取返回值,返回值是线程池等待队列中的所有worker任务
// 遍历取出的每一个worker任务,并且调用他们的cancel方法,取消这个任务。这里cancelRemainingTask方法其实
// 实质就是调用了线程的 interrupt()方法中断线程而已。
for (Runnable remainingTask : this.executor.shutdownNow()) {
cancelRemainingTask(remainingTask);
}
}
awaitTerminationIfNecessary(this.executor);
}
}
// 取消任务
protected void cancelRemainingTask(Runnable task) {
if (task instanceof Future) {
((Future<?>) task).cancel(true);
}
}
②、取消任务核心方法
// 取消线程任务,取消成功返回true,反之则返回false
public boolean cancel(boolean mayInterruptIfRunning) {
// 做一些检查
if (!(state == NEW &&
UNSAFE.compareAndSwapInt(this, stateOffset, NEW,
mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
return false;
try {
if (mayInterruptIfRunning) {
try {
Thread t = runner;
if (t != null)
// 调用线程的interrupt()方法中断线程
t.interrupt();
} finally {
// final state
UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);
}
}
} finally {
finishCompletion();
}
return true;
}
V)、源码调试验证
为了真实的还原场景,新建一个web应用来进行调试,将
ThreadPoolTaskExecutor
线程池注入到容器当中。
①、测试代码
// 注入线程池
@Autowired
@Qualifier("testThreadPoolTaskExecutor")
private ThreadPoolTaskExecutor testThreadPoolTaskExecutor;
@GetMapping("/test01")
public void testThreadPool01() throws InterruptedException {
AtomicInteger count = new AtomicInteger(0);
// 提交任务给线程池
testThreadPoolTaskExecutor.execute(() -> {
// 子线程运行时间大约18s左右
while (count.incrementAndGet() <= 8) {
System.out.println("执行executor.execute counter = " + count);
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
log.error("子线程被中断,异常信息:", e);
}
}
});
// 睡眠一秒后关闭线程池,确保线程池中的任务正在运行
TimeUnit.SECONDS.sleep(1);
testThreadPoolTaskExecutor.shutdown();
}
②、调试结果
可以清楚的看到,这里确实是执行的shutdownNow方法,而不是shutdown方法!!!
VI)、总结
- 我们在使用多线程 + 线程池设计某些场景下,如果我们希望在线程池调用shutdown的时候我们的用户线程能优雅的正常退出,或等待用户线程任务执行完毕后再关闭线程池,那么要记得设置
waitForTasksToCompleteOnShutdown
属性为true,不然的话在调用ThreadPoolTaskExecutor
的shutdown方法的时候其实中断的是所有线程(也就是说不会等到等待队列中的线程任务和正在执行的线程任务执行完) - 可以对比ThreadPoolTaskExecutor和ThreadPoolExecutor这两个线程池,可以发现
ThreadPoolTaskExecutor
线程池中包装了ThreadPoolExecutor
,只是对ThreadPoolExecutor
的一些功能做了增强和扩展而已,核心的方法都是调用的ThreadPoolExecutor
中的。