当前位置: 首页>大数据>正文

异步注解@Async自定义线程池

一、自定义线程池

<!--我这里使用版本 14.0.1-->

<dependency>
  <groupId>com.google.guava</groupId>
  <artifactId>guava</artifactId>
  <version>14.0.1</version>
  <type>bundle</type>
</dependency>

<!--
<dependency>
    <groupId>com.google.guava</groupId>
    <artifactId>guava</artifactId>
    <version>31.0.1-jre</version>
</dependency>
-->

二、创建异步配置类AsyncTaskConfig

package cn.deepsec.core.async;

import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;


import java.util.concurrent.*;

/**
 * @Author ds
 * @Date 4/4/23
 */

/**
 * @EnableAsync 支持异步操作
 */
@EnableAsync
@Configuration
public class AsyncTaskConfig {

    @Bean("async-executor-spring")
    public Executor AsyncExecutor(){
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        // 核心线程数
        executor.setCorePoolSize(10);
        // 线程池维护线程的最大数量,只有在缓冲队列满了之后才会申请超过核心线程数的线程
        executor.setMaxPoolSize(50);
        // 缓存队列
        executor.setQueueCapacity(20);
        // 空闲时间,当超过了核心线程数之外的线程在空闲时间到达之后会被销毁
        executor.setKeepAliveSeconds(200);
        // 异步方法内部线程名称
        executor.setThreadNamePrefix("async-executor-spring");

        /**
         * 当线程池的任务缓存队列已满并且线程池中的线程数目达到maximumPoolSize,如果还有任务到来就会采取任务拒绝策略
         * 通常有以下四种策略:
         * ThreadPoolExecutor.AbortPolicy:丢弃任务并抛出RejectedExecutionException异常。
         * ThreadPoolExecutor.DiscardPolicy:也是丢弃任务,但是不抛出异常。
         * ThreadPoolExecutor.DiscardOldestPolicy:丢弃队列最前面的任务,然后重新尝试执行任务(重复此过程)
         * ThreadPoolExecutor.CallerRunsPolicy:重试添加当前的任务,自动重复调用 execute() 方法,直到成功
         */
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        executor.initialize();
        return executor;
    }

    /**
     *  1、corePoolSize:核心线程数
     *         * 核心线程会一直存活,及时没有任务需要执行
     *         * 当线程数小于核心线程数时,即使有线程空闲,线程池也会优先创建新线程处理
     *         * 设置allowCoreThreadTimeout=true(默认false)时,核心线程会超时关闭
     *
     *     2、queueCapacity:任务队列容量(阻塞队列)
     *         * 当核心线程数达到最大时,新任务会放在队列中排队等待执行
     *
     *     3、maxPoolSize:最大线程数
     *         * 当线程数>=corePoolSize,且任务队列已满时。线程池会创建新线程来处理任务
     *         * 当线程数=maxPoolSize,且任务队列已满时,线程池会拒绝处理任务而抛出异常
     *
     *     4、 keepAliveTime:线程空闲时间
     *         * 当线程空闲时间达到keepAliveTime时,线程会退出,直到线程数量=corePoolSize
     *         * 如果allowCoreThreadTimeout=true,则会直到线程数量=0
     *
     *     5、allowCoreThreadTimeout:允许核心线程超时
     *     6、rejectedExecutionHandler:任务拒绝处理器
     *         * 两种情况会拒绝处理任务:
     *             - 当线程数已经达到maxPoolSize,切队列已满,会拒绝新任务
     *             - 当线程池被调用shutdown()后,会等待线程池里的任务执行完毕,再shutdown。如果在调用shutdown()和线程池真正shutdown之间提交任务,会拒绝新任务
     *         * 线程池会调用rejectedExecutionHandler来处理这个任务。如果没有设置默认是AbortPolicy,会抛出异常
     *         * ThreadPoolExecutor类有几个内部实现类来处理这类情况:
     *             - AbortPolicy 丢弃任务,抛运行时异常
     *             - CallerRunsPolicy 执行任务
     *             - DiscardPolicy 忽视,什么都不会发生
     *             - DiscardOldestPolicy 从队列中踢出最先进入队列(最后一个执行)的任务
     *         * 实现RejectedExecutionHandler接口,可自定义处理器
     */

    /**
     * com.google.guava中的线程池
     * @return
     */
    @Bean("async-executor-guava")
    public Executor GuavaAsyncExecutor() {
        ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("async-executor-guava").build();
        // 当前可用cpu数
        //最佳线程数可通过计算得出http://ifeve.com/how-to-calculate-threadpool-size/
        int curSystemThreads = Runtime.getRuntime().availableProcessors() * 2;
        /**
         * int corePoolSize,
         * int maximumPoolSize,
         * long keepAliveTime,
         * TimeUnit unit,
         * BlockingQueue<Runnable> workQueue,
         * ThreadFactory threadFactory
         */
        ThreadPoolExecutor threadPool = new ThreadPoolExecutor(curSystemThreads, 100,
                200, TimeUnit.SECONDS,
                new LinkedBlockingQueue<>(), threadFactory);
        //允许核心线程超时
        threadPool.allowsCoreThreadTimeOut();
        return threadPool;
    }

}

三、创建controller类

package cn.deepsec.core.async.controller;

import cn.deepsec.core.async.service.AsyncService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.PostMapping;

/**
 * @Author ds
 * @Date 4/4/23
 */
@Controller
public class AsyncController {

    @Autowired
    private AsyncService asyncService;

    @PostMapping("/AsyncMethond")
    public void AsyncMethond(){
            asyncService.asyncMethod1();
            asyncService.asyncMethod2();
    }

}

四、创建service类

package cn.deepsec.core.async.service;

/**
 * @Author ds
 * @Date 4/4/23
 */
public interface AsyncService {

    void asyncMethod1();

    void asyncMethod2();

}

五、创建serviceimpl

package cn.deepsec.core.async.service.impl;

import cn.deepsec.core.async.service.AsyncService;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;

import java.security.SecureRandom;

/**
 * @Author ds
 * @Date 4/4/23
 */

@Service
public class AsyncServiceImpl implements AsyncService {

    private static SecureRandom random = new SecureRandom();

    @Override
    @Async("async-executor-guava")
    public void asyncMethod1(){
        // TODO 这里是你的异步方法,例如下面:
        System.out.println("调用异步方法1");
        long start = System.currentTimeMillis();
        try {
            Thread.sleep(random.nextInt(10000));
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        long end = System.currentTimeMillis();
        System.out.println("完成异步方法1,耗时:" + (end - start) + "毫秒");
    }

    @Override
    @Async("async-executor-guava")
    public void asyncMethod2(){
        // TODO 这里是你的异步方法,例如下面:
        System.out.println("调用异步方法2");
        long start = System.currentTimeMillis();
        try {
            Thread.sleep(random.nextInt(10000));
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        long end = System.currentTimeMillis();
        System.out.println("完成异步方法2,耗时:" + (end - start) + "毫秒");
    }

}

# 连续访问三次,输出结果如下:
调用异步方法1
调用异步方法2
调用异步方法1
调用异步方法2
完成异步方法2,耗时:86毫秒
调用异步方法1
调用异步方法2
完成异步方法2,耗时:917毫秒
完成异步方法2,耗时:6693毫秒
完成异步方法1,耗时:2006毫秒
完成异步方法1,耗时:9865毫秒
完成异步方法1,耗时:6932毫秒
参考:
https://blog.csdn.net/wang20010104/article/details/127337665
https://www.cnblogs.com/badfisher/p/14805134.html

https://www.xamrdz.com/bigdata/7an1995578.html

相关文章: