问题背景
最近开发的项目需要多任务并行运行,然后每个任务需要多线程运行,要求如下:
- 多任务并行,一个任务可设置线程数
- 限制整个项目中所有任务开启的线程总数不超过500;达到500时,先等待已有线程执行完毕,再创建新的线程
- 大于500时等待线程结束,最多等待10分钟
注意事项:
- 可以通过复制文章的代码自己创建工程,也可以下载源码进行参考
项目创建
1 引入pom依赖
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<!-- Maven build descriptor for the taskFrame demo project. -->
<modelVersion>4.0.0</modelVersion>
<!-- Inherit from the Spring Boot 2.6.7 parent POM; the Spring Boot dependencies below declare no version of their own. -->
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.6.7</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<!-- Coordinates of this project. -->
<groupId>com.yg</groupId>
<artifactId>taskFrame</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>taskFrame</name>
<description>Demo project for Spring Boot</description>
<properties>
<!-- Java language level used for the build. -->
<java.version>1.8</java.version>
</properties>
<dependencies>
<!-- Web starter: used by the REST controller that accepts new tasks. -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!-- Lombok: provides the @Slf4j and constructor annotations used in the Java sources; declared optional. -->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<!-- Test starter, limited to the test scope. -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<configuration>
<!-- Lombok is listed in the plugin's excludes. -->
<excludes>
<exclude>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</exclude>
</excludes>
</configuration>
</plugin>
</plugins>
</build>
</project>
2 启动类
package com.yg.taskframe;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
/**
 * Spring Boot entry point of the taskFrame demo application.
 */
@SpringBootApplication
public class TaskFrameApplication {

    /**
     * Boots the Spring application with this class as the primary source.
     *
     * @param args command-line arguments, passed through to Spring Boot unchanged
     */
    public static void main(String[] args) {
        // Instance form of the static SpringApplication.run(Class, String...) convenience call.
        SpringApplication application = new SpringApplication(TaskFrameApplication.class);
        application.run(args);
    }
}
3 Runnable线程类
package com.yg.taskframe.core;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.CountDownLatch;
/**
 * Demo worker: sleeps for one second to simulate work, then counts down the
 * latch it was given so that whoever awaits the latch can observe completion.
 */
@Slf4j
public class CreateRunnable implements Runnable {

    /** Latch that is counted down exactly once when {@link #run()} finishes. */
    private final CountDownLatch countDownLatch;

    /**
     * @param countDownLatch latch to count down when this worker finishes
     */
    public CreateRunnable(CountDownLatch countDownLatch) {
        this.countDownLatch = countDownLatch;
    }

    /**
     * Simulates one second of work and then signals completion on the latch.
     * The latch is counted down even if the sleep is interrupted.
     */
    @Override
    public void run() {
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            log.error("CreateRunnable error: ", e);
            // Restore the interrupt status so the owning executor can still see the interruption.
            Thread.currentThread().interrupt();
        } finally {
            // Always release the latch slot, otherwise a waiter could block forever.
            countDownLatch.countDown();
        }
    }
}
4 线程池管理服务类
package com.yg.taskframe.core;
import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import java.util.concurrent.CountDownLatch;
/**
 * Pairs the thread pool of one task with the latch used to wait for that
 * task's workers, and shuts the pool down once the latch reaches zero.
 *
 * @Author suolong
 * @Date 2022/3/8 14:46
 * @Version 1.5
 */
@Slf4j
public class AsynThreadService {

    /** Executor that runs the task's workers; accessed directly by callers to submit work. */
    public ThreadPoolTaskExecutor asyncServiceExecutor;

    /** Latch that workers (or the caller) count down; {@link #waitComplete()} blocks on it. */
    public CountDownLatch countDownLatch;

    /**
     * @param asyncServiceExecutor executor dedicated to one task
     * @param countDownLatch       latch to await before the executor is shut down
     */
    public AsynThreadService(ThreadPoolTaskExecutor asyncServiceExecutor, CountDownLatch countDownLatch) {
        this.asyncServiceExecutor = asyncServiceExecutor;
        this.countDownLatch = countDownLatch;
    }

    /**
     * Blocks until the latch reaches zero and then shuts the executor down.
     *
     * <p>This method never throws: failures are logged. The executor is shut
     * down even when waiting is interrupted or fails, so the pool is not
     * leaked; on interruption the thread's interrupt status is restored.
     */
    public void waitComplete() {
        try {
            log.info("countDownLatch remain {}", this.countDownLatch.getCount());
            this.countDownLatch.await();
        } catch (InterruptedException e) {
            // Keep the interruption visible to the caller instead of silently absorbing it.
            Thread.currentThread().interrupt();
            log.error("AsyncServiceExecutor shutdown error: ", e);
        } catch (Exception e) {
            log.error("AsyncServiceExecutor shutdown error: ", e);
        } finally {
            shutdownExecutor();
        }
    }

    /** Shuts the executor down, logging instead of propagating any failure. */
    private void shutdownExecutor() {
        try {
            log.info("Close asynThreadService");
            asyncServiceExecutor.shutdown();
        } catch (Exception e) {
            log.error("AsyncServiceExecutor shutdown error: ", e);
        }
    }
}
5 线程池和 countDownLatch 创建类
package com.yg.taskframe.core;
import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
/**
 * Creates one thread pool plus latch per task and keeps a process-wide count
 * of the threads handed out, so that new tasks wait while the total is at or
 * above the limit.
 *
 * <p>The limit is a soft one: the capacity check and the later increment are
 * separate steps, so concurrent callers can overshoot it, and a caller that
 * has waited the maximum time proceeds regardless.
 *
 * @Author suolong
 * @Date 2022/3/8 14:43
 * @Version 2.0
 */
@Slf4j
public class AsynThreadPool {

    /** Sum of the thread counts of all tasks created and not yet released through {@link #free(int)}. */
    private static final AtomicInteger curCount = new AtomicInteger(0);

    /** Total thread count at or above which new tasks have to wait. */
    private static final int MAX_COUNT = 500;

    /** Maximum number of wait rounds before a task proceeds anyway. */
    private static final int MAX_WAIT_ROUNDS = 10;

    /** Length of one wait round; 10 rounds of 60 seconds give the 10-minute upper bound. */
    private static final long WAIT_SECONDS_PER_ROUND = 60;

    /**
     * Creates the latch and thread pool for one task and registers its threads
     * in the global counter, waiting first (up to 10 minutes) while the counter
     * is at or above the limit.
     *
     * @param taskId  task identifier, used to name the pool and its threads
     * @param threads number of threads for the task; also the initial latch count
     * @return the service wrapping the new pool and latch
     * @throws InterruptedException     if the calling thread is interrupted while waiting for capacity
     * @throws IllegalArgumentException if {@code threads} is negative
     * @throws IllegalStateException    if the thread pool could not be created
     */
    public static AsynThreadService createCountDownLatch(Long taskId, int threads) throws InterruptedException {
        CountDownLatch countDownLatch = new CountDownLatch(threads);
        // Wait before creating the pool so an interrupted wait cannot leave an initialized pool behind.
        awaitCapacity();
        ThreadPoolTaskExecutor asyncServiceExecutor = creatThreadPool(taskId, threads);
        if (asyncServiceExecutor == null) {
            // Fail before reserving capacity instead of handing out a service without an executor.
            throw new IllegalStateException("Failed to create thread pool for task " + taskId);
        }
        // Reserve the threads of this task in the global counter.
        curCount.addAndGet(threads);
        return new AsynThreadService(asyncServiceExecutor, countDownLatch);
    }

    /**
     * Sleeps in one-minute rounds, at most {@code MAX_WAIT_ROUNDS} times, while
     * the global counter is at or above the limit. A warning is logged only if
     * the counter is still at or above the limit when waiting ends.
     */
    private static void awaitCapacity() throws InterruptedException {
        for (int round = 0; round < MAX_WAIT_ROUNDS && curCount.get() >= MAX_COUNT; round++) {
            log.info("CountDownLatch count more than 500, loading over for other threads");
            TimeUnit.SECONDS.sleep(WAIT_SECONDS_PER_ROUND);
        }
        if (curCount.get() >= MAX_COUNT) {
            log.warn("Part of asynThreadService aren't finish");
        }
    }

    /**
     * Returns threads to the global counter once a task has finished.
     *
     * @param count number of threads to release; should match the value passed
     *              to {@link #createCountDownLatch(Long, int)} for that task
     */
    public static void free(int count) {
        log.info("CountDownLatch : {}, count: {}", curCount.get(), count);
        curCount.addAndGet(-count);
        log.info("CountDownLatch remain: {}", curCount.get());
    }

    /**
     * Builds and initializes a thread pool dedicated to one task.
     *
     * @param taskId  task identifier, used in the bean name and thread-name prefix
     * @param threads core pool size; the maximum pool size is {@code threads + 1}
     * @return the initialized executor, or {@code null} if creation failed (the failure is logged)
     */
    public static ThreadPoolTaskExecutor creatThreadPool(Long taskId, int threads) {
        ThreadPoolTaskExecutor asyncServiceExecutor = null;
        try {
            asyncServiceExecutor = new ThreadPoolTaskExecutor();
            // Minimum number of threads kept in the pool.
            asyncServiceExecutor.setCorePoolSize(threads);
            // Maximum number of threads in the pool.
            asyncServiceExecutor.setMaxPoolSize(threads + 1);
            // Capacity of the queue that buffers tasks while all threads are busy.
            asyncServiceExecutor.setQueueCapacity(2000);
            // String concatenation (rather than taskId.toString()) keeps a null id from failing pool creation.
            asyncServiceExecutor.setThreadNamePrefix("TaskId" + taskId + "-Thread-");
            asyncServiceExecutor.setBeanName("TaskId" + taskId);
            // Rejected tasks are dropped silently; a dropped task never runs.
            asyncServiceExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy());
            // On shutdown, let already submitted tasks finish ...
            asyncServiceExecutor.setWaitForTasksToCompleteOnShutdown(true);
            // ... but wait at most 30 seconds for them, so shutdown cannot block indefinitely.
            asyncServiceExecutor.setAwaitTerminationSeconds(30);
            asyncServiceExecutor.initialize();
        } catch (Exception e) {
            log.error("Create ThreadPoolTaskExecutor failed", e);
        }
        return asyncServiceExecutor;
    }
}
6 任务创建类
package com.yg.taskframe.core;
import lombok.AllArgsConstructor;
import lombok.NoArgsConstructor;
import lombok.extern.slf4j.Slf4j;
/**
 * Describes one task and the number of threads it should run with.
 *
 * @Author suolong
 * @Date 2022/3/8 14:52
 * @Version 2.0
 */
@Slf4j
@AllArgsConstructor
@NoArgsConstructor
public class ParallelTask {

    /** Number of threads requested for this task. */
    private int threads;

    /** Not read anywhere in this class. */
    private String url;

    /** Link to a following task; not used in this class. */
    public ParallelTask nextTask;

    /** Link to a preceding task; not used in this class. */
    public ParallelTask prevTask;

    /**
     * @param threads number of threads the task should run with
     */
    public ParallelTask(int threads) {
        this.threads = threads;
    }

    /**
     * Requests a thread pool and latch for this task from {@link AsynThreadPool},
     * which may wait while the global thread limit is reached.
     *
     * @param taskId identifier of the task
     * @return the service wrapping the task's pool and latch
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    public AsynThreadService startTask(Long taskId) throws InterruptedException {
        final int requestedThreads = this.threads;
        return AsynThreadPool.createCountDownLatch(taskId, requestedThreads);
    }
}
7 启动一个任务入口类
package com.yg.taskframe.service;
import com.yg.taskframe.core.AsynThreadPool;
import com.yg.taskframe.core.AsynThreadService;
import com.yg.taskframe.core.ParallelTask;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Future;
/**
 * Entry point for starting a task: obtains a dedicated thread pool for the
 * task, submits the work and releases the pool and its global thread
 * reservation afterwards.
 *
 * @Author suolong
 * @Date 2022/5/6 14:36
 * @Version 2.0
 */
@Slf4j
@Service
public class TaskService {

    /**
     * Starts one task on its own thread pool. Each call creates a separate
     * pool, which is how several tasks run in parallel; the global counter in
     * {@link AsynThreadPool} limits the total number of threads.
     *
     * <p>The pool is shut down and the reserved threads are released even
     * when submitting the work fails, so a failed task cannot permanently
     * consume global capacity.
     *
     * @param taskId  identifier of the task, used to name the pool
     * @param threads number of threads for the task; {@code 0} is treated as {@code 1}
     * @throws InterruptedException if the calling thread is interrupted while waiting for capacity
     */
    public void startTask(Long taskId, int threads) throws InterruptedException {
        log.info("Begin task");
        if (threads == 0) {
            threads = 1;
        }
        log.info("threads: {}", threads);
        // Describe the task with its thread count.
        ParallelTask parallelTask = new ParallelTask(threads);
        // Create the pool (sized by threads, named after taskId) and reserve the threads globally.
        AsynThreadService asynThreadService = parallelTask.startTask(taskId);
        try {
            List<String> params = new ArrayList<>();
            params.add("1");
            submit(params, asynThreadService);
        } finally {
            finishTask(asynThreadService, threads);
        }
    }

    /**
     * Drains the latch, shuts the pool down and gives the reserved threads
     * back to the global counter; the release happens even if closing fails.
     */
    private void finishTask(AsynThreadService asynThreadService, int threads) {
        try {
            // The latch starts at `threads` but only one worker is submitted, so the
            // remaining slots are released here; await() in waitComplete() then
            // returns without blocking.
            // NOTE(review): waiting for the worker itself is presumably left to the
            // executor's shutdown settings - confirm.
            int countDownLatchCount = (int) asynThreadService.countDownLatch.getCount();
            while (countDownLatchCount-- > 0) {
                asynThreadService.countDownLatch.countDown();
            }
            asynThreadService.waitComplete();
        } finally {
            AsynThreadPool.free(threads);
        }
    }

    /**
     * Submits a single asynchronous query to the task's executor. The returned
     * future is not awaited.
     *
     * @param params            query parameters handed to the worker
     * @param asynThreadService service holding the executor and latch of the task
     */
    public void submit(List<String> params, AsynThreadService asynThreadService) {
        log.info("Begin 0004 thread");
        asynThreadService.asyncServiceExecutor.submit(() ->
                submitSingle(params, asynThreadService.countDownLatch)
        );
    }

    /**
     * Body of one worker: runs the query and counts the latch down when done,
     * whether the query succeeded or failed.
     *
     * @param params         query parameters
     * @param countDownLatch latch to count down when the worker finishes
     * @return the query result, or an error text if the query threw
     */
    public String submitSingle(List<String> params, CountDownLatch countDownLatch) {
        log.info("Submit single thread");
        try {
            log.info("params size: {}", params.size());
            // Placeholder for the real work, e.g. a remote call to another service.
            String resultVOlist = doQuery(params);
            log.info("resultVOlist:{}", resultVOlist);
            return resultVOlist;
        } catch (Exception e) {
            log.error("dataxQueryController not found error", e);
            return "dataxQueryController not found error";
        } finally {
            log.info("Close one countDownLatch");
            countDownLatch.countDown();
        }
    }

    /**
     * Demo query.
     *
     * @param params query parameters
     * @return {@code "success"} when fewer than 10 parameters are given, otherwise {@code "fail"}
     */
    public String doQuery(List<String> params) {
        if (params.size() < 10) {
            return "success";
        }
        return "fail";
    }
}
8 使用接口添加任务,调用时需要设置每个线程池的线程数和任务ID
package com.yg.taskframe.controller;
import com.yg.taskframe.service.TaskService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
/**
 * REST endpoint for adding tasks.
 *
 * @Author suolong
 * @Date 2022/5/6 15:07
 * @Version 2.0
 */
@Slf4j
@RestController
public class TaskController {

    @Autowired
    TaskService taskService;

    /**
     * Starts a task with the given id and thread count. The call returns only
     * after {@link TaskService#startTask(Long, int)} has returned.
     *
     * @param taskId  identifier of the task
     * @param threads number of threads for the task's pool
     * @return {@code "success"} if the task was started without an exception, otherwise {@code "fail"}
     */
    @PostMapping("/task")
    public String addTask(@RequestParam Long taskId, @RequestParam int threads) {
        try {
            taskService.startTask(taskId, threads);
            return "success";
        } catch (InterruptedException e) {
            // Restore the interrupt status so the calling thread's owner can still observe it.
            Thread.currentThread().interrupt();
            log.error("Exception", e);
            return "fail";
        } catch (Exception e) {
            log.error("Exception", e);
            return "fail";
        }
    }
}
9 整个项目目录
总结
- 通过模板可以进行多任务并行运行,并且可以控制总线程数
作为程序员第 122 篇文章,每次写一句歌词记录一下,看看人生有几首歌的时间,wahahaha ...