JAVA线程池实现02-提交任务

img

submit提交任务

/**
 * 提交一个带有返回值的任务
 * @param task 任务
 * @param result 结果
 * @param <T> 泛型
 * @return Future
 */
public <T> Future<T> submit(Runnable task, T result) {
    if (task == null) throw new NullPointerException();
    RunnableFuture<T> ftask = newTaskFor(task, result);
    //调用execute进行执行
    execute(ftask);
    return ftask;
}

/**
 * 创建一个FutureTask
 * @param runnable 运行的任务
 * @param value 返回结果
 * @param <T> 泛型
 * @return FutureTask
 */
protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
    return new FutureTask<T>(runnable, value);
}

流程步骤如下

  1. 调用submit方法,传入Runnable或者Callable对象
  2. 判断传入的对象是否为null,为null则抛出异常,不为null继续流程
  3. 将传入的对象转换为RunnableFuture对象
  4. 执行execute方法,传入RunnableFuture对象
  5. 返回RunnableFuture对象

img

execute 执行线程

/**
    * 在未来执行任务
    * 任务将新建或者现有的线程池中执行
    * 如果线程池关闭或者线程池满了将执行拒绝策略
    * @param command
    */
   public void execute(Runnable command) {
       if (command == null)
           throw new NullPointerException();

       int c = ctl.get();
       /**
        * 1、运行线程数少于核心线程数,则调用addWorker启动一个新的线程
        *   需要检查否应该添加线程
        */
       if (workerCountOf(c) < corePoolSize) {
           //添加线程
           if (addWorker(command, true)) {
               return;
           }
           c = ctl.get();
       }
       /**
        * 运行线程数量大于核心线程数量时,上面的if分支针对大于corePoolSize,并且缓存队列加入任务操作成功的情况。
        *  运行中并且将任务加入缓冲队列成功,正常来说这样已经完成了处理逻辑。
        *  但是为了保险起见,增加了状态出现异常的确认判断,如果状态出现异常会继续remove操作,如果执行true,则按照拒绝处理策略驳回任务;
        */
       //线程运行状态,并且添加进队列成功
       if (isRunning(c) && workQueue.offer(command)) {
           int recheck = ctl.get();
           //线程未运行并且删除成功
           if (! isRunning(recheck) && remove(command))
               //拒绝任务
               reject(command);
           //线程正在运行中
           else if (workerCountOf(recheck) == 0)
               //添加任务
               addWorker(null, false);
       }
       /**
        * 这里针对运行线程数量超过了corePoolSize,并且缓存队列也已经放满的情况。
        *  注意第二个参数是false,可以在下面addWorker方法看到,就是针对线程池最大线程数量maximumPoolSize的判断。
        */
       else if (!addWorker(command, false))
           //拒绝任务
           reject(command);
   }

其实从上面代码注释中可以看出就三个判断,

  1. 核心线程数是否已满
  2. 队列是否已满
  3. 线程池是否已满

img

  1. 调用execute方法,传入Runable对象
  2. 判断传入的对象是否为null,为null则抛出异常,不为null继续流程
  3. 获取当前线程池的状态和线程个数变量
  4. 判断当前线程数是否小于核心线程数,是走流程5,否则走流程6
  5. 添加线程数,添加成功则结束,失败则重新获取当前线程池的状态和线程个数变量,
  6. 判断线程池是否处于RUNNING状态,是则添加任务到阻塞队列,否则走流程10,添加任务成功则继续流程7
  7. 重新获取当前线程池的状态和线程个数变量
  8. 重新检查线程池状态,不是运行状态则移除之前添加的任务,有一个false走流程9,都为true则走流程11
  9. 检查线程池线程数量是否为0,否则结束流程,是调用addWorker(null, false),然后结束
  10. 调用!addWorker(command, false),为true走流程11,false则结束
  11. 调用拒绝策略reject(command),结束

img

addWorker 增加工作线程

/**
 * 添加工作线程
 * @param firstTask 任务
 * @param core 是否是核心线程
 * @return
 */
private boolean addWorker(Runnable firstTask, boolean core) {
    retry:
    //自旋
    for (;;) {
        int c = ctl.get();
        //获取运行状态
        int rs = runStateOf(c);

        // 检查当前线程池状态是否是SHUTDOWN、STOP、TIDYING或者TERMINATED
    	// 且!(当前状态为SHUTDOWN、且传入的任务为null,且队列不为null)
    	// 条件都成立则返回false
        if (rs >= SHUTDOWN &&
                ! (rs == SHUTDOWN &&
                        firstTask == null &&
                        ! workQueue.isEmpty())) {
            return false;
        }
        //有一个自旋
        for (;;) {
            //获取工作线程数
            int wc = workerCountOf(c);
            /**
             * 工作线程数 >= 队列容量 返回fasle
             * 如果是核心线程 工作线程数>=核心线程数 返回false
             * 如果不是核心线程 工作线程数>=最大线程数 返回false
             */
            if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize)) {
                return false;
            }
             //CAS增加c,成功则跳出retry
            if (compareAndIncrementWorkerCount(c)) {
                break retry;
            }
            c = ctl.get();  // Re-read ctl
            //CAS失败执行下面方法,查看当前线程数是否变化,变化则继续retry循环,没变化则继续内部循环
            if (runStateOf(c) != rs) {
                continue retry;
            }
            // else CAS failed due to workerCount change; retry inner loop
        }
    }
    //CAS成功
    
    //工作线程状态
    boolean workerStarted = false;
    //工作线程添加状态
    boolean workerAdded = false;
    Worker w = null;
    try {
        //创建一个工作线程
        w = new Worker(firstTask);
        final Thread t = w.thread;
        if (t != null) {
            //获取重入锁
            final ReentrantLock mainLock = this.mainLock;
            //加锁
            mainLock.lock();
            try {
                 //重新检查线程池状态
           		 //避免ThreadFactory退出故障或者在锁获取前线程池被关闭
                int rs = runStateOf(ctl.get());
                //再次检查线程池状态 ???
                if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) {
                    //检查thread的状态
                    if (t.isAlive()) { // precheck that t is startable
                        throw new IllegalThreadStateException();
                    }
                    //任务列表添加任务
                    workers.add(w);
                    //获取任务列表大小
                    int s = workers.size();
                    //最大线程数 计数
                    if (s > largestPoolSize) {
                        largestPoolSize = s;
                    }
                    //线程添加成功
                    workerAdded = true;
                }
            } finally {
                //解锁
                mainLock.unlock();
            }
            //判断worker是否添加成功,成功则启动线程,然后将workerStarted设置为true
            if (workerAdded) {
                t.start();
                //启动状态成功
                workerStarted = true;
            }
        }
    } finally {
        //判断线程有没有启动成功,没有则调用addWorkerFailed方法
        if (! workerStarted) {
            addWorkerFailed(w);
        }
    }
    //返回任务启动状态
    return workerStarted;
}

这里可以将addWorker分为两部分,第一部分增加线程池个数,第二部分是将任务添加到workder里面并执行。

第一部分主要是两个循环,外层循环主要是判断线程池状态

rs >= SHUTDOWN &&
              ! (rs == SHUTDOWN &&
                  firstTask == null &&
                  ! workQueue.isEmpty())

展开!运算后等价于

s >= SHUTDOWN &&
               (rs != SHUTDOWN ||
             firstTask != null ||
             workQueue.isEmpty())

也就是说下面几种情况下会返回false:

  • 当前线程池状态为STOP,TIDYING,TERMINATED
  • 当前线程池状态为SHUTDOWN并且已经有了第一个任务
  • 当前线程池状态为SHUTDOWN并且任务队列为空

内层循环作用是使用cas增加线程个数,如果线程个数超限则返回false,否者进行cas,cas成功则退出双循环,否者cas失败了,要看当前线程池的状态是否变化了,如果变了,则重新进入外层循环重新获取线程池状态,否者进入内层循环继续进行cas尝试。

到了第二部分说明CAS成功了,也就是说线程个数加一了,但是现在任务还没开始执行,这里使用全局的独占锁来控制workers里面添加任务,其实也可以使用并发安全的set,但是性能没有独占锁好(这个从注释中知道的)。这里需要注意的是要在获取锁后重新检查线程池的状态,这是因为其他线程可可能在本方法获取锁前改变了线程池的状态,比如调用了shutdown方法。添加成功则启动任务执行。

所以这里也将流程图分为两部分来描述

第一部分流程图

img

第二部分流程图

img

这里面有一个核心的工作类 Worker

AQS的Worker工作任务

这个类继承了抽象队列同步器 是标准的AQS线程安全的类。

/**
    * 工作任务对象
    * 继承了AQS 抽象队列同步器 以及 Runnable 接口
    */
   private final class Worker extends AbstractQueuedSynchronizer implements Runnable {
       /**
        * This class will never be serialized, but we provide a
        * serialVersionUID to suppress a javac warning.
        */
       private static final long serialVersionUID = 6138294804551838833L;

       /**
        * Thread this worker is running in.  Null if factory fails.
        */
       //正在运行的线程,工厂创建线程失败则为null
       final Thread thread;
       /**
        * Initial task to run.  Possibly null.
        */
       //运行的初始任务,可能为null
       Runnable firstTask;
       /**
        * Per-thread task counter
        */
       //完成任务的计数器
       volatile long completedTasks;

       /**
        * Creates with given first task and thread from ThreadFactory.
        *
        * @param firstTask the first task (null if none)
        */
       //构造方法
       Worker(Runnable firstTask) {
           //设置状态为未运行
           setState(-1); // inhibit interrupts until runWorker
           this.firstTask = firstTask;
           //使用线程工厂创建线程
           this.thread = getThreadFactory().newThread(this);
       }

       /**
        * Delegates main run loop to outer runWorker
        */
       //实现Runnable的run方法
       @Override
       public void run() {
           //运行任务方法
           runWorker(this);
       }

       // Lock methods
       //
       // The value 0 represents the unlocked state.
       // The value 1 represents the locked state.

       /*是否是独占的
        * @return 0 未锁 1 已锁定
        */
       protected boolean isHeldExclusively() {
           return getState() != 0;
       }

       /**
        * 尝试获取占用权
        * @param unused
        * @return
        */
       protected boolean tryAcquire(int unused) {
           //CAS 设置锁定状态
           if (compareAndSetState(0, 1)) {
               //设置持有者是当前线程
               setExclusiveOwnerThread(Thread.currentThread());
               return true;
           }
           return false;
       }

       /**
        * 尝试释放锁
        * @param unused
        * @return
        */
       protected boolean tryRelease(int unused) {
           //设置是持有者为null
           setExclusiveOwnerThread(null);
           //设置锁定状态为 未锁定
           setState(0);
           return true;
       }

       /**
        * 加锁
        */
       public void lock() {
           acquire(1);
       }

       /**
        * 尝试获取锁
        * @return
        */
       public boolean tryLock() {
           return tryAcquire(1);
       }
       //释放锁
       public void unlock() {
           release(1);
       }

       /**
        * 释放
        * @return
        */
       public boolean isLocked() {
           return isHeldExclusively();
       }

       /**
        * 中断启动
        */
       void interruptIfStarted() {
           Thread t;
           if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
               try {
                   t.interrupt();
               } catch (SecurityException ignore) {
               }
           }
       }
   }

这个类很值得学习,里面最核心的方法是 runWorker 方法

runWorker方法

运行任务的主体,通过循环从阻塞队列中拿任务,进行执行

/**
 * 运行任务
 * @param w 任务
 */
final void runWorker(Worker w) {
    //获取当前线程
    Thread wt = Thread.currentThread();
    //获取任务 task
    Runnable task = w.firstTask;
    w.firstTask = null;
    w.unlock(); // allow interrupts
    //是否突然完成任务(异常,或者其他情况)
    boolean completedAbruptly = true;
    try {
        //循环获取任务
        while (task != null || (task = getTask()) != null) {
            //加锁
            w.lock();

            // 当线程池是处于STOP状态或者TIDYING、TERMINATED状态时,设置当前线程处于中断状态
        	// 如果不是,当前线程就处于RUNNING或者SHUTDOWN状态,确保当前线程不处于中断状态
        	// 重新检查当前线程池的状态是否大于等于STOP状态
            if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) {
                wt.interrupt();
            }
            try {
                //线程执行前执行一些任务,在ThreadPoolExecutor是空实现
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                    //运行任务
                    task.run();
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    //完成任务后执行一些任务,在ThreadPoolExecutor是空实现
                    afterExecute(task, thrown);
                }
            } finally {
                //完成任务task置为空,交给GC处理
                task = null;
                //完成任务计数器+1
                w.completedTasks++;
                //解锁
                w.unlock();
            }
        }
        /**
         * 正常完成任务为false
         * 否则completedAbruptly 为true
         */
        completedAbruptly = false;
    } finally {
        //整个线程结束时调用,线程退出操作。统计整个线程池完成的任务个数之类的工作
        processWorkerExit(w, completedAbruptly);
    }
}

这里面有两个核心方法

  • getTask:从队列中获取任务
  • processWorkerExit:处任务并退出

我们先从getTask开始

getTask 方法
 /**
 * 获取待执行的任务
 *
 * @return
 */
private Runnable getTask() {
    //最后一次poll()是否超时
    boolean timedOut = false;
    //自旋
    for (; ; ) {
        int c = ctl.get();
        //获取运行状态
        int rs = runStateOf(c);

        //线程不在运行状态并且队列为空
        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
            //使用CAS进行工作任务数-1
            decrementWorkerCount();
            return null;
        }
        //获取当前工作任务数
        int wc = workerCountOf(c);

        /**
         * 是否进行任务淘汰 如果 allowCoreThreadTimeOut为true 就一直淘汰下去
         */
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
        //(当前线程数是否大于最大线程数或者)
   		//且(线程数大于1或者任务队列为空)
    	//这里有个问题(timed && timedOut)timedOut = false,好像(timed && timedOut)一直都是false吧
        if ((wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || workQueue.isEmpty())) {
            //CAS方式进行工作线程-1
            if (compareAndDecrementWorkerCount(c)) {
                return null;
            }
            continue;
        }

        try {
            /**
             * 如果需要淘汰淘汰从工作先队列中在指定keepAliveTime时间内获取一个工作线程否则返回null
             * 否则工作线程池为空就一直等待
             */
            Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take();
            if (r != null) {
                return r;
            }
            //如果获取超时设置超时时间为true 
            timedOut = true;
        } catch (InterruptedException retry) {
            timedOut = false;
        }
    }
}

接下来我们分析下processWorkerExit方法

processWorkerExit 方法

/**
   * 处理完成后续的线程统计工作
   * 删除完成工作的线程
   * @param w 工作线程
   * @param completedAbruptly 是否突然完成(异常情况)
   */
  private void processWorkerExit(Worker w, boolean completedAbruptly) {
      //如果突然完成,工作线程数统计未统计
      if (completedAbruptly) { // If abrupt, then workerCount wasn't adjusted
          //重新对工作线程数-1
          decrementWorkerCount();
      }
      //获取锁
      final ReentrantLock mainLock = this.mainLock;
      //加锁
      mainLock.lock();
      try {
          //完成任务数统计
          completedTaskCount += w.completedTasks;
          //从工作任务队列删除队列
          workers.remove(w);
      } finally {
          //解锁
          mainLock.unlock();
      }
      //尝试终止线程池
      tryTerminate();

      int c = ctl.get();
      //正在运行或者停止
      if (runStateLessThan(c, STOP)) {
          //没有突然完成
          if (!completedAbruptly) {
              // 计算最小工作线程,如果allowCoreThreadTimeOut为true 就是 0 否则就是核心线程数
              int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
              //如果最小线程为0并且工作任务队列不为空则设置最小线程数为1
              if (min == 0 && !workQueue.isEmpty()) {
                  min = 1;
              }
              //如果工作线程数>=最小线程数返回
              if (workerCountOf(c) >= min)
                  return; // replacement not needed
          }
          addWorker(null, false);
      }
  }

到这里为止,submit 和 execute已经分析完成了。