当前位置: 首页>后端>正文

线程池任务执行分析

背景

我们自己创建的线程其只能start()执行一次,一旦执行完毕或被中断,即走terminated终止状态结束线程了,你难道没有这样的疑问为何线程池中的线程却可以一直执行?核心及非核心线程是如何实现的呢?

线程池回顾

  1. 如何创建一个线程池
//实际调用对象
public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler)
  • corePoolSize:核心线程池
  • maxinumPoolSize:表示最大允许被创建的线程数
  • keepAliveTime,unit: 非核心线程数的存活时间
  • workQueue: 用来暂时保存任务的工作队列
  • threadFactory:用来创建线程,可定义名称及优先级
  • handler: 任务拒绝策略
    1. 当调用shutdown 等方法关闭线程池后,这时候即使线程池内部还有没执行完的任务正在执行,但是由于线程池已经关闭,我们再继续想线程池提交任务就会遭到拒绝
    2. 当达到最大线程数,线程池已经没有能力继续处理新提交的任务时,这是也就拒绝。
  1. 参考此分享文章:深入理解线程池线程池任务执行流程如下:
    线程池任务执行分析,第1张
  2. 对应线程池运行流程图如下:


    线程池任务执行分析,第2张

任务调度

  • 当用户提交一个任务会通过Executor.execute()方法执行,他的步骤上方已经总结过了,我们直接看新增线程执行任务的addWorker(Runnable firstTask, boolean core)方法,参数Runnable:运行任务,core是否为核心线程
 private final HashSet<Worker> workers = new HashSet<>(); //线程池中的所有工作线程
private boolean addWorker(Runnable firstTask, boolean core) {
//根据当前状态,判断是否添加成功,上方执行方法中的addWorker两个参数firstTask = null ,core = true /false 具体分析
        retry:
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c); //获取运行状态

            // Check if queue empty only if necessary.
            if (rs >= SHUTDOWN && //状态 > shutDown 表示此时已经不再接受任务
            //shutdown状态不接受新任务,但可以执行已经加入队列中的任务,所以当进入shutdown状态,且传进来的任务为null时,并且任务队列不为null时,是允许添加新线程的,把这个条件取反就是不允许
                ! (rs == SHUTDOWN &&
                   firstTask == null &&
                   ! workQueue.isEmpty())) 
                return false;

            for (;;) { //使用CAS操作避免加锁
                int wc = workerCountOf(c); //获取工作线程
                if (wc >= CAPACITY || 
                    wc >= (core corePoolSize : maximumPoolSize))
                    return false; //大于线程最大容量2的29次方量(所以newCacheExecutor并不能得到Integer.MAX_Value的),或者大于最大允许线程量则不能添加啦
                if (compareAndIncrementWorkerCount(c)) //可添加就CAS操作线程数+1,成功说明可添加
                    break retry; //break跳出retry对应的循环,执行循环后面的添加worker逻辑
                c = ctl.get();  // Re-read ctl 重新读取状态
                if (runStateOf(c) != rs) 
                    continue retry; //状态改变了,跳到外层循环继续重新执行循环
                // else CAS failed due to workerCount change; retry inner loop
                //在内存层循环中不停的尝试CAS操作增加线程数
            }
        }
        //找了上方break retry可以正常使用CAS新增线程数
        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
            w = new Worker(firstTask); //通过Worker包装runnable任务,稍后我们分析
            final Thread t = w.thread;
            if (t != null) {
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock(); //加锁
                try {
                
                    int rs = runStateOf(ctl.get());
                    //如果线程池状态rs < Shutdown即只能是Running
                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) { //或者shutDown状态但是没有新任务
                        if (t.isAlive()) // 线程已经启动,并且当前没有任何异常的话,则是true,否则为false
                            throw new IllegalThreadStateException(); //我还没有启动呢
                        workers.add(w); //正常添加到线程池中workers工作线程
                        int s = workers.size();
                        if (s > largestPoolSize) //largestPoolSize:记录着线程池中出现过最大线程数量
                            largestPoolSize = s;
                        workerAdded = true; //可以正常工作的标记
                    }
                } finally {
                    mainLock.unlock();
                }
                if (workerAdded) { //如果正常工作,则开启线程任务
                    t.start();
                    workerStarted = true; //开始工作标记
                }
            }
        } finally {
            if (! workerStarted) //该任务没有开始,则添加到失败
                addWorkerFailed(w); 
        }
        return workerStarted;
    }
  1. 真正的执行工作交给了Worker(firstTask)类完成的
private final class Worker
        extends AbstractQueuedSynchronizer
        implements Runnable //实现了Runnable接口,因此t.start()执行的就是worker的run方法啊
         {
     
        final Thread thread;
       
        Runnable firstTask;
        
        volatile long completedTasks;

        Worker(Runnable firstTask) {
            setState(-1); // inhibit interrupts until runWorker
            this.firstTask = firstTask;
            this.thread = getThreadFactory().newThread(this);  //创建thread(this:Worker) ,则t.start()调用worker的run,同时原来的Runnable被封装为Worker的属性firstTask
        }

        /** Delegates main run loop to outer runWorker  */
        public void run() {
            runWorker(this);
        }
        
    //getThreadFactory即为ThreadPoolExecutor创建thread工厂(实现ThreadFactory)可修改Thread名称,优先级等操作实现的
    public ThreadFactory getThreadFactory() {
        return threadFactory;
    }
  • 创建线程交由线程池设定的ThreadFactory
  • 当线程执行thread.start()其实就是执行worker.run() 调用runWorker
final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask; //这个就是我们执行线程池executor.execute()方法时候的runnable
        w.firstTask = null;
        w.unlock(); // allow interrupts
        boolean completedAbruptly = true;
        try {
        //如果task不为null,并且从workQueue中获取任务不为null,则会一直执行下去
            while (task != null || (task = getTask()) != null) { //task是需要执行的任务,不一定是刚刚添加的那个了,这样其实worker线程并没有完成工作,自然也就不会销毁了
                w.lock();
               
                if ((runStateAtLeast(ctl.get(), STOP) || //检查线程状态,若线程池处于中断状态,调用interrupt将线程中断
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt(); //中断线程
                try {
                    beforeExecute(wt, task); //可以在任务真正执行之前做点啥,空实现
                    Throwable thrown = null;
                    try {
                        task.run(); //执行execute()方法中的run方法,在t.start()线程内,这只是一个方法执行哈!
                    } catch (Throwable x) {
                    } finally {
                        afterExecute(task, thrown); //线程之后可以做啥,空实现
                    }
                } finally {
                    task = null;
                    w.completedTasks++; //该线程执行完成任务+1
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
            processWorkerExit(w, completedAbruptly);
        }
    }
  • runWorker执行过程如下:
    1. while循环调取getTask()获取task,若不为null,则一直执行下去
    2. 检查线程是否被中断
    3. beforeExecute:可以在任务真正执行之前做点啥,空实现
    4. 执行task.run() 即执行execute()方法中的run方法,在t.start()线程内,这只是一个方法执行哦!
    5. afterExecute:任务执行之后可以做啥,空实现
  1. 重点逻辑是while循环,其是线程池中线程能够一直运行的原因,当我们第一次创建worker并执行任务后,并没有结束线程,而是通过while循环调用getTask()方法从阻塞队列中去task继续调用task.run()执行任务,注意这里run()只是一个普通的方法调用,并不是start()哦!运行线程就是Worker线程中
private Runnable getTask() {
        boolean timedOut = false; // Did the last poll() time out?
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);
            // 对应ShutDown虽然不添加任务,但是可以执行阻塞队列中的,Stop以后就不能子在执行任务了
            if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                decrementWorkerCount();
                return null; //返回null,停止执行任务
            }

            int wc = workerCountOf(c);
            // allowCoreThreadTimeOut 表示是否允许核心线程超时销毁,默认false不销毁.若设置成true,核心线程也会销毁的
            //只有正在工作的线程数大于核心线程数才会为true,佛足额返回false
            boolean timed = allowCoreThreadTimeOut || wc > corePoolSize; //
        
            if ((wc > maximumPoolSize || (timed && timedOut))
                && (wc > 1 || workQueue.isEmpty())) {
                if (compareAndDecrementWorkerCount(c))
                    return null;
                continue;
            }

            try {
            //如果timed为true(wx > 核心线程),通过poll取任务,如果为false,通过take取任务
                Runnable r = timed ?
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : //这两个参数就是创建线程池中保存时间量
                    workQueue.take();
                if (r != null) //如果有任务就退出死循环,返回任务交给上方的worker线程运行
                    return r;
                timedOut = true;
            } catch (InterruptedException retry) {
                timedOut = false;
            }
        }
    }
  • 通过以上代码分析:根据wc记录已运行线程数与核心线程数比较
    1. 若wc > 核心线程数,则通过poll()从队列中取任务
    2. 若wc <= 核心线程数,则通过take()取任务
  • 那么poll()与take()区别是什么呢?workQueue创建线程池时设置的阻塞队列,即实现BlockingQueue接口常用的ArrayBlockingQueue,LinkedBlockingQueue,PriorityQueue,SynchronizedQueue;
  1. 以ArrayBlockingQueue为例查看其poll和take方法
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
        long nanos = unit.toNanos(timeout);
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            while (count == 0) { //是否队列中的元素个数为0,说明空队列
                if (nanos <= 0L) //等待时间到了,队列中还未有数据加入,则返回null,
                    return null;
                /**
                * 调用该方法的前提是,当前线程已经成功获得与该条件对象绑定的重入锁,否* * 则调用该方法时会抛出IllegalMonitorStateException。
                * nanosTimeout指定该方法等待信号的的最大时间(单位为纳秒)。若指定时间* * 内收到signal()或signalALL()则返回nanosTimeout减去已经等待的时间;
                *若指定时间内有其它线程中断该线程,则抛出InterruptedException并清除当前线程的打断状态;
                * 若指定时间内未收到通知,则返回0或负数。 
                */
                nanos = notEmpty.awaitNanos(nanos);  //每次signal唤醒重新等待
            }
            return dequeue(); //如果有元素取出
        } finally {
            lock.unlock();
        }
    }
//如果poll超时返回null,则回调到
f ((wc > maximumPoolSize || (timed && timedOut)) //true
                && (wc > 1 || workQueue.isEmpty())) { //队列也是空的,走进去
                if (compareAndDecrementWorkerCount(c)) //CAS可以减少c的个数
                    return null; //返回了null,该线程不能再上方的while循环中继续获取就结束线程啦,非核心线程就over啦,嘿嘿!
                continue;
            }
  • 分析以上流程
    1. poll()中参数表示超时等待时间
    2. 若超过该时间阻塞队列依然为空,则返回null,退出while循环,线程执行结束,非核心线程被销毁
    3. 线程获取lock锁,执行awaitNanos最大等待nanos秒之内若收到signal()或signalALL()被唤醒,执行dequeue去阻塞队列中取任务执行
  1. take方法执行核心线程
public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            while (count == 0)  //不能使用if,避免虚假唤醒
                notEmpty.await();  //一旦count队列为空,会一致await阻塞在这里的,直到workQueue.offer()添加元素时唤醒
            return dequeue(); //取出队头元素
        } finally {
            lock.unlock();
        }
}
  • await不设置超时等待时间,notEmpty.await()一直阻塞,那这个阻塞又是何时被唤醒的呢?
  1. 当然是下一个任务达到的时候也就是调用execute的时候添加一个新的任务Task;
//这个就是调用当前核心线程已经满了,则添加到阻塞队列中,
//刚刚上方的核心线程在等待任务,添加以后肯定就调用notEmpty.signal()唤醒等待线程取任务执行啦
if (isRunning(c) && workQueue.offer(command)) 
  • 我们来验证一下我们的想法:workQueue就是选择的队列,这里看ArrayBlockingQueue,当然对于其他队列也是相同的
 public boolean offer(E e) {
        checkNotNull(e);
        final ReentrantLock lock = this.lock; //获取锁,跟上方加锁时同一把锁
        lock.lock();
        try {
            if (count == items.length)
                return false; //如果当前队列已满,不能再加入了false
            else {
                enqueue(e); //正常添加到队列中
                return true;
            }
        } finally {
            lock.unlock();
        }
    }
   
//enqueue添加到数组循环队列中后调用notEmpty.signal()唤醒一个await线程取任务开始工作啦!
private void enqueue(E x) {
        // assert lock.getHoldCount() == 1;
        // assert items[putIndex] == null;
        final Object[] items = this.items;
        items[putIndex] = x;
        if (++putIndex == items.length)
            putIndex = 0;
        count++;
        notEmpty.signal();
    }
  • 通过生成-消费者模式,将execute加入队列的任务通知等待的核心线程取阻塞队列中的任务开始执行!

总结

  1. 线程池核心线程一直在运行,不会终止的原因是由于使用while循环轮训阻塞队列中是否存在任务,若没有CPU不会空转而是调用await()等待函数,当阻塞队列中添加任务时会被唤醒,去取任务继续执行while循环;
  2. 核心和非核心是由等待函数决定的,设置是否有等待超时时间,若超时后返回null退出while循环,线程执行结束被销毁;

参考文档

  1. 深入理解线程池
  2. ThreadPoolExecutor线程池源码和典型问题

https://www.xamrdz.com/backend/3nk1926067.html

相关文章: