img

什么是线程池

线程池是一种多线程处理形式,处理过程中将任务添加到队列,然后在创建线程后自动启动这些任务。线程池线程都是后台线程。每个线程都使用默认的堆栈大小,以默认的优先级运行,并处于多线程单元中。如果某个线程在托管代码中空闲(如正在等待某个事件),则线程池将插入另一个辅助线程来使所有处理器保持繁忙。如果所有线程池线程都始终保持繁忙,但队列中包含挂起的工作,则线程池将在一段时间后创建另一个辅助线程但线程的数目永远不会超过最大值。超过最大值的线程可以排队,但他们要等到其他线程完成后才启动。

为什么使用线程池

操作系统创建线程、切换线程状态、终结线程都要进行CPU调度——这是一个耗费时间和系统资源的事情。

大多数实际场景中是这样的:处理某一次请求的时间是非常短暂的,但是请求数量是巨大的。这种技术背景下,如果我们为每一个请求都单独创建一个线程,那么物理机的所有资源基本上都被操作系统创建线程、切换线程状态、销毁线程这些操作所占用,用于业务请求处理的资源反而减少了。所以最理想的处理方式是,将处理请求的线程数量控制在一个范围,既保证后续的请求不会等待太长时间,又保证物理机将足够的资源用于请求处理本身。另外,一些操作系统是有最大线程数量限制的。当运行的线程数量逼近这个值的时候,操作系统会变得不稳定。这也是我们要限制线程数量的原因。

线程池的优点

  • 降低资源消耗:通过重复利用已创建的线程降低线程创建和销毁带来的消耗。
  • 提高响应速度:当任务到达时,任务可以不需要等待线程创建就能立即执行。
  • 提高线程的可管理性:使用线程池可以统一进行线程分配、调度和监控。
  • 线程统一管理:线程池具有创建线程和销毁线程的能力,线程集中在一起比起分散开来,更加便于管理

继承关系

线程池都继承自Exceutor接口

img

Executor接口

Executor接口只有一个方法execute,传入线程任务参数

public interface Executor {
    void execute(Runnable command);
}

ExecutorService接口

ExecutorService接口继承Executor接口,并增加了submit、shutdown、invokeAll等等一系列方法。

public interface ExecutorService extends Executor {

    void shutdown();

    List<Runnable> shutdownNow();

    boolean isShutdown();

    boolean isTerminated();

    boolean awaitTermination(long timeout, TimeUnit unit)
        throws InterruptedException;

    <T> Future<T> submit(Callable<T> task);

    <T> Future<T> submit(Runnable task, T result);

    Future<?> submit(Runnable task);

    <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
        throws InterruptedException;

    <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
                                  long timeout, TimeUnit unit)
        throws InterruptedException;

    
    <T> T invokeAny(Collection<? extends Callable<T>> tasks)
        throws InterruptedException, ExecutionException;

   
    <T> T invokeAny(Collection<? extends Callable<T>> tasks,
                    long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException;
}

AbstractExecutorService抽象类

bstractExecutorService抽象类实现ExecutorService接口,并且提供了一些方法的默认实现,例如submit方法、invokeAny方法、invokeAll方法。

像execute方法、线程池的关闭方法(shutdown、shutdownNow等等)就没有提供默认的实现。

public abstract class AbstractExecutorService implements ExecutorService {

    protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
        return new FutureTask<T>(runnable, value);
    }

    protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
        return new FutureTask<T>(callable);
    }

    public Future<?> submit(Runnable task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<Void> ftask = newTaskFor(task, null);
        execute(ftask);
        return ftask;
    }

    public <T> Future<T> submit(Runnable task, T result) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<T> ftask = newTaskFor(task, result);
        execute(ftask);
        return ftask;
    }

    public <T> Future<T> submit(Callable<T> task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<T> ftask = newTaskFor(task);
        execute(ftask);
        return ftask;
    }

    private <T> T doInvokeAny(Collection<? extends Callable<T>> tasks,
                              boolean timed, long nanos)
        throws InterruptedException, ExecutionException, TimeoutException {...}

    public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
        throws InterruptedException, ExecutionException {... }

    public <T> T invokeAny(Collection<? extends Callable<T>> tasks,
                           long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException {...}

    public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
        throws InterruptedException {...}

    public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
                                         long timeout, TimeUnit unit)
        throws InterruptedException {...}

}

线程池的分类和作用

newCachedThreadPool

创建一个可根据需要创建新线程的线程池,但是在以前构造的线程可用时将重用它们,并在需要时使用提供的 ThreadFactory 创建新线程。

  1. 线程池中数量没有固定,可达到最大值(Interger. MAX_VALUE)
  2. 线程池中的线程可进行缓存重复利用和回收(回收默认时间为1分钟)
  3. 当线程池中,没有可用线程,会重新创建一个线程

newFixedThreadPool

创建一个可重用固定线程数的线程池,以共享的无界队列方式来运行这些线程。在任意点,在大多数 nThreads 线程会处于处理任务的活动状态。如果在所有线程处于活动状态时提交附加任务,则在有可用线程之前,附加任务将在队列中等待。如果在关闭前的执行期间由于失败而导致任何线程终止,那么一个新线程将代替它执行后续的任务(如果需要)。在某个线程被显式地关闭之前,池中的线程将一直存在。

  1. 线程池中的线程处于一定的量,可以很好的控制线程的并发量
  2. 线程可以重复被使用,在显示关闭之前,都将一直存在
  3. 超出一定量的线程被提交时候需在队列中等待

newSingleThreadExecutor

创建一个使用单个 worker 线程的 Executor,以无界队列方式来运行该线程。(注意,如果因为在关闭前的执行期间出现失败而终止了此单个线程,那么如果需要,一个新线程将代替它执行后续的任务)。可保证顺序地执行各个任务,并且在任意给定的时间不会有多个线程是活动的。与其他等效的 newFixedThreadPool(1) 不同,可保证无需重新配置此方法所返回的执行程序即可使用其他的线程。

  1. 线程池中最多执行1个线程,之后提交的线程活动将会排在队列中以此执行

newScheduleThreadPool

创建一个线程池,它可安排在给定延迟后运行命令或者定期地执行。

  1. 线程池中具有指定数量的线程,即便是空线程也将保留
  2. 可定时或者延迟执行线程活动

newSingleThreadScheduledExecutor

创建一个单线程执行程序,它可安排在给定延迟后运行命令或者定期地执行。

  1. 线程池中最多执行1个线程,之后提交的线程活动将会排在队列中以此执行
  2. 可定时或者延迟执行线程活动

ThreadPoolExecutor源码分析

为什么要讲ThreadPoolExector类

Exector是ThreadPoolExector的祖父类接口,ThreadPoolExector的直接父类接口是ExectorService,而我们所讲的第三点,其中的不同线程池的分类其实都是Exector中的方法,而在ThreadPoollExector中得到了实现,所以我们要构建的不同种类的线程池主要还是依赖这个类完成,接下来我们就聚焦ThreadPoolExector来看其具体的实现方法。

线程池的执行流程

img

成员变量分析

   //记录线程池状态和线程数量(总共32位,前三位表示线程池状态,后29位表示线程数量)
   private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));

//线程数量统计位数29  Integer.SIZE=32 
   private static final int COUNT_BITS = Integer.SIZE - 3;

//容量 000 11111111111111111111111111111
   private static final int CAPACITY = (1 << COUNT_BITS) - 1;

  //运行中 111 00000000000000000000000000000
   private static final int RUNNING = -1 << COUNT_BITS;

  //关闭 000 00000000000000000000000000000
   private static final int SHUTDOWN = 0 << COUNT_BITS;

   //停止 001 00000000000000000000000000000
   private static final int STOP = 1 << COUNT_BITS;

   //整理 010 00000000000000000000000000000
   private static final int TIDYING = 2 << COUNT_BITS;

   //终止 011 00000000000000000000000000000
   private static final int TERMINATED = 3 << COUNT_BITS;


   //获取运行状态(获取前3位)
   private static int runStateOf(int c)     { return c & ~CAPACITY; }
   //获取线程个数(获取后29位)
   private static int workerCountOf(int c)  { return c & CAPACITY; }
   private static int ctlOf(int rs, int wc) { return rs | wc; }



   // 存放任务的阻塞队列泛型是Runnable
   private final BlockingQueue<Runnable> workQueue;

   //可重入锁
   private final ReentrantLock mainLock = new ReentrantLock();

   //获取锁的一个条件
   private final Condition termination = mainLock.newCondition();

   //存放任务Worker 的集合
   private final HashSet<Worker> workers = new HashSet<Worker>();

   //线程池正在运行的数量
   private int largestPoolSize;

   //已完成任务的计数器
   private long completedTaskCount;

   //线程工厂,可以手工传入 自己构建线程
   private volatile ThreadFactory threadFactory;

   //拒接策略
   private volatile RejectedExecutionHandler handler;

   //默认拒绝策略为AbortPolicy
   private static final RejectedExecutionHandler defaultHandler = new ThreadPoolExecutor.AbortPolicy();

   //空闲线程等待超时时间
   private volatile long keepAliveTime;

   /**
    * 是否允许核心线程超时
    * 默认为 false
    * true 核心线程等待超时后 也将会销毁
    */
   private volatile boolean allowCoreThreadTimeOut;

   /**
    * 核心池大小 不允许超时
    * 除非allowCoreThreadTimeOut为true 这种情况下可为0
    */
   private volatile int corePoolSize;

   //最大线程池大小 最大不超过 CAPACITY
   private volatile int maximumPoolSize;

我们也可以看出我们在线程池介绍中谈到的关于coreSize和maxiumSize等参数,这些int值对线程池的中的线程池数量进行了限制,还有一些关于锁ReentrantLock的类,这是一个可重入锁,它的主要目的是锁住其操作,因为线程的操作要保证其原子性,防止冲突发生,所以在其源码中很多都对其进行了上锁操作。还有一个很重要的值的全局的变量state:

线城池的状态

//表示正在运行中
   private static final int RUNNING    = -1 << COUNT_BITS;
//表示关闭
   private static final int SHUTDOWN   =  0 << COUNT_BITS;
//表示停止
   private static final int STOP       =  1 << COUNT_BITS;
//表示整理
   private static final int TIDYING    =  2 << COUNT_BITS;
//表示结束
   private static final int TERMINATED =  3 << COUNT_BITS;

这些状态值是线程池目前所处环境的状态的体现,它采用int数字来表现,记住这些值很重要,因为后面有很多方法调用线程池的运行状态,有很多对其值进行判断。

构造方法

/**
  * 创建线程池
  * @param corePoolSize 核心线程池大小
  * @param maximumPoolSize 最大线程池大小
  * @param keepAliveTime 空闲等待时间
  * @param unit 时间单位
  * @param workQueue 传入的阻塞队列
  */
 public ThreadPoolExecutor(int corePoolSize,
                           int maximumPoolSize,
                           long keepAliveTime,
                           TimeUnit unit,
                           BlockingQueue<Runnable> workQueue) {
     //调用重载的构造方法
     this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             Executors.defaultThreadFactory(), defaultHandler);
 }

 /**
  * 创建线程池
  * @param corePoolSize 核心线程池大小
  * @param maximumPoolSize 最大线程池大小
  * @param keepAliveTime 空闲等待时间
  * @param unit 时间单位
  * @param workQueue 传入的阻塞队列
  * @param threadFactory 线程工厂
  */
 public ThreadPoolExecutor(int corePoolSize,
                           int maximumPoolSize,
                           long keepAliveTime,
                           TimeUnit unit,
                           BlockingQueue<Runnable> workQueue,
                           ThreadFactory threadFactory) {
     ///调用重载的构造方法
     this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             threadFactory, defaultHandler);
 }

 /**
  * 创建线程池
  * @param corePoolSize 核心线程池大小
  * @param maximumPoolSize 最大线程池大小
  * @param keepAliveTime 空闲等待时间
  * @param unit 时间单位
  * @param workQueue 传入的阻塞队列
  * @param handler 拒绝策略
  */
 public ThreadPoolExecutor(int corePoolSize,
                           int maximumPoolSize,
                           long keepAliveTime,
                           TimeUnit unit,
                           BlockingQueue<Runnable> workQueue,
                           RejectedExecutionHandler handler) {
     ///调用重载的构造方法
     this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             Executors.defaultThreadFactory(), handler);
 }


 /**
  * 创建线程池
  * @param corePoolSize 核心线程池大小
  * @param maximumPoolSize 最大线程池大小
  * @param keepAliveTime 空闲等待时间
  * @param unit 时间单位
  * @param workQueue 传入的阻塞队列
  * @param threadFactory 线程工厂
  * @param handler 拒绝策略
  */
 public ThreadPoolExecutor(int corePoolSize,
                           int maximumPoolSize,
                           long keepAliveTime,
                           TimeUnit unit,
                           BlockingQueue<Runnable> workQueue,
                           ThreadFactory threadFactory,
                           RejectedExecutionHandler handler) {
     //条件校验,不满足抛出异常
     if (corePoolSize < 0 ||
             maximumPoolSize <= 0 ||
             maximumPoolSize < corePoolSize ||
             keepAliveTime < 0)
         throw new IllegalArgumentException();
     // 阻塞队列,线程工厂,拒绝策略不允许为空
     if (workQueue == null || threadFactory == null || handler == null)
         throw new NullPointerException();
     //java安全模式
     this.acc = System.getSecurityManager() == null ?
             null :
             AccessController.getContext();
     this.corePoolSize = corePoolSize;
     this.maximumPoolSize = maximumPoolSize;
     this.workQueue = workQueue;
     this.keepAliveTime = unit.toNanos(keepAliveTime);
     this.threadFactory = threadFactory;
     this.handler = handler;
 }

可以看出ThreadPoolExector一共有四个构造函数,但是最后调用的都是最后一个,我们可以只看最后一个,它主要有核心池大小、最大池大小、存活时间、时间单位、阻塞队列、线程工厂这几个参数,其中又对其进行了值范围的检查,如果参数违法就抛出异常,然后构造进去。关于这几个参数,随着后面我们对其方法的讲解,会理解越来越深刻的。