JAVA线程池实现02-提交任务
JAVA线程池实现02-提交任务
submit提交任务
/**
* 提交一个带有返回值的任务
* @param task 任务
* @param result 结果
* @param <T> 泛型
* @return Future
*/
public <T> Future<T> submit(Runnable task, T result) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task, result);
//调用execute进行执行
execute(ftask);
return ftask;
}
/**
* 创建一个FutureTask
* @param runnable 运行的任务
* @param value 返回结果
* @param <T> 泛型
* @return FutureTask
*/
protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
return new FutureTask<T>(runnable, value);
}
流程步骤如下
- 调用submit方法,传入Runnable或者Callable对象
- 判断传入的对象是否为null,为null则抛出异常,不为null继续流程
- 将传入的对象转换为RunnableFuture对象
- 执行execute方法,传入RunnableFuture对象
- 返回RunnableFuture对象
execute 执行线程
/**
* 在未来执行任务
* 任务将新建或者现有的线程池中执行
* 如果线程池关闭或者线程池满了将执行拒绝策略
* @param command
*/
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
/**
* 1、运行线程数少于核心线程数,则调用addWorker启动一个新的线程
* 需要检查否应该添加线程
*/
if (workerCountOf(c) < corePoolSize) {
//添加线程
if (addWorker(command, true)) {
return;
}
c = ctl.get();
}
/**
* 运行线程数量大于核心线程数量时,上面的if分支针对大于corePoolSize,并且缓存队列加入任务操作成功的情况。
* 运行中并且将任务加入缓冲队列成功,正常来说这样已经完成了处理逻辑。
* 但是为了保险起见,增加了状态出现异常的确认判断,如果状态出现异常会继续remove操作,如果执行true,则按照拒绝处理策略驳回任务;
*/
//线程运行状态,并且添加进队列成功
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
//线程未运行并且删除成功
if (! isRunning(recheck) && remove(command))
//拒绝任务
reject(command);
//线程正在运行中
else if (workerCountOf(recheck) == 0)
//添加任务
addWorker(null, false);
}
/**
* 这里针对运行线程数量超过了corePoolSize,并且缓存队列也已经放满的情况。
* 注意第二个参数是false,可以在下面addWorker方法看到,就是针对线程池最大线程数量maximumPoolSize的判断。
*/
else if (!addWorker(command, false))
//拒绝任务
reject(command);
}
其实从上面代码注释中可以看出就三个判断,
- 核心线程数是否已满
- 队列是否已满
- 线程池是否已满
- 调用execute方法,传入Runable对象
- 判断传入的对象是否为null,为null则抛出异常,不为null继续流程
- 获取当前线程池的状态和线程个数变量
- 判断当前线程数是否小于核心线程数,是走流程5,否则走流程6
- 添加线程数,添加成功则结束,失败则重新获取当前线程池的状态和线程个数变量,
- 判断线程池是否处于RUNNING状态,是则添加任务到阻塞队列,否则走流程10,添加任务成功则继续流程7
- 重新获取当前线程池的状态和线程个数变量
- 重新检查线程池状态,不是运行状态则移除之前添加的任务,有一个false走流程9,都为true则走流程11
- 检查线程池线程数量是否为0,否则结束流程,是调用addWorker(null, false),然后结束
- 调用!addWorker(command, false),为true走流程11,false则结束
- 调用拒绝策略reject(command),结束
addWorker 增加工作线程
/**
* 添加工作线程
* @param firstTask 任务
* @param core 是否是核心线程
* @return
*/
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
//自旋
for (;;) {
int c = ctl.get();
//获取运行状态
int rs = runStateOf(c);
// 检查当前线程池状态是否是SHUTDOWN、STOP、TIDYING或者TERMINATED
// 且!(当前状态为SHUTDOWN、且传入的任务为null,且队列不为null)
// 条件都成立则返回false
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty())) {
return false;
}
//有一个自旋
for (;;) {
//获取工作线程数
int wc = workerCountOf(c);
/**
* 工作线程数 >= 队列容量 返回fasle
* 如果是核心线程 工作线程数>=核心线程数 返回false
* 如果不是核心线程 工作线程数>=最大线程数 返回false
*/
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize)) {
return false;
}
//CAS增加c,成功则跳出retry
if (compareAndIncrementWorkerCount(c)) {
break retry;
}
c = ctl.get(); // Re-read ctl
//CAS失败执行下面方法,查看当前线程数是否变化,变化则继续retry循环,没变化则继续内部循环
if (runStateOf(c) != rs) {
continue retry;
}
// else CAS failed due to workerCount change; retry inner loop
}
}
//CAS成功
//工作线程状态
boolean workerStarted = false;
//工作线程添加状态
boolean workerAdded = false;
Worker w = null;
try {
//创建一个工作线程
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
//获取重入锁
final ReentrantLock mainLock = this.mainLock;
//加锁
mainLock.lock();
try {
//重新检查线程池状态
//避免ThreadFactory退出故障或者在锁获取前线程池被关闭
int rs = runStateOf(ctl.get());
//再次检查线程池状态 ???
if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) {
//检查thread的状态
if (t.isAlive()) { // precheck that t is startable
throw new IllegalThreadStateException();
}
//任务列表添加任务
workers.add(w);
//获取任务列表大小
int s = workers.size();
//最大线程数 计数
if (s > largestPoolSize) {
largestPoolSize = s;
}
//线程添加成功
workerAdded = true;
}
} finally {
//解锁
mainLock.unlock();
}
//判断worker是否添加成功,成功则启动线程,然后将workerStarted设置为true
if (workerAdded) {
t.start();
//启动状态成功
workerStarted = true;
}
}
} finally {
//判断线程有没有启动成功,没有则调用addWorkerFailed方法
if (! workerStarted) {
addWorkerFailed(w);
}
}
//返回任务启动状态
return workerStarted;
}
这里可以将addWorker分为两部分,第一部分增加线程池个数,第二部分是将任务添加到workder里面并执行。
第一部分主要是两个循环,外层循环主要是判断线程池状态
rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty())
展开!运算后等价于
s >= SHUTDOWN &&
(rs != SHUTDOWN ||
firstTask != null ||
workQueue.isEmpty())
也就是说下面几种情况下会返回false:
- 当前线程池状态为STOP,TIDYING,TERMINATED
- 当前线程池状态为SHUTDOWN并且已经有了第一个任务
- 当前线程池状态为SHUTDOWN并且任务队列为空
内层循环作用是使用cas增加线程个数,如果线程个数超限则返回false,否者进行cas,cas成功则退出双循环,否者cas失败了,要看当前线程池的状态是否变化了,如果变了,则重新进入外层循环重新获取线程池状态,否者进入内层循环继续进行cas尝试。
到了第二部分说明CAS成功了,也就是说线程个数加一了,但是现在任务还没开始执行,这里使用全局的独占锁来控制workers里面添加任务,其实也可以使用并发安全的set,但是性能没有独占锁好(这个从注释中知道的)。这里需要注意的是要在获取锁后重新检查线程池的状态,这是因为其他线程可可能在本方法获取锁前改变了线程池的状态,比如调用了shutdown方法。添加成功则启动任务执行。
所以这里也将流程图分为两部分来描述
第一部分流程图
第二部分流程图
这里面有一个核心的工作类 Worker
AQS的Worker工作任务
这个类继承了抽象队列同步器 是标准的AQS线程安全的类。
/**
* 工作任务对象
* 继承了AQS 抽象队列同步器 以及 Runnable 接口
*/
private final class Worker extends AbstractQueuedSynchronizer implements Runnable {
/**
* This class will never be serialized, but we provide a
* serialVersionUID to suppress a javac warning.
*/
private static final long serialVersionUID = 6138294804551838833L;
/**
* Thread this worker is running in. Null if factory fails.
*/
//正在运行的线程,工厂创建线程失败则为null
final Thread thread;
/**
* Initial task to run. Possibly null.
*/
//运行的初始任务,可能为null
Runnable firstTask;
/**
* Per-thread task counter
*/
//完成任务的计数器
volatile long completedTasks;
/**
* Creates with given first task and thread from ThreadFactory.
*
* @param firstTask the first task (null if none)
*/
//构造方法
Worker(Runnable firstTask) {
//设置状态为未运行
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
//使用线程工厂创建线程
this.thread = getThreadFactory().newThread(this);
}
/**
* Delegates main run loop to outer runWorker
*/
//实现Runnable的run方法
@Override
public void run() {
//运行任务方法
runWorker(this);
}
// Lock methods
//
// The value 0 represents the unlocked state.
// The value 1 represents the locked state.
/*是否是独占的
* @return 0 未锁 1 已锁定
*/
protected boolean isHeldExclusively() {
return getState() != 0;
}
/**
* 尝试获取占用权
* @param unused
* @return
*/
protected boolean tryAcquire(int unused) {
//CAS 设置锁定状态
if (compareAndSetState(0, 1)) {
//设置持有者是当前线程
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
/**
* 尝试释放锁
* @param unused
* @return
*/
protected boolean tryRelease(int unused) {
//设置是持有者为null
setExclusiveOwnerThread(null);
//设置锁定状态为 未锁定
setState(0);
return true;
}
/**
* 加锁
*/
public void lock() {
acquire(1);
}
/**
* 尝试获取锁
* @return
*/
public boolean tryLock() {
return tryAcquire(1);
}
//释放锁
public void unlock() {
release(1);
}
/**
* 释放
* @return
*/
public boolean isLocked() {
return isHeldExclusively();
}
/**
* 中断启动
*/
void interruptIfStarted() {
Thread t;
if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
}
}
}
}
这个类很值得学习,里面最核心的方法是 runWorker 方法
runWorker方法
运行任务的主体,通过循环从阻塞队列中拿任务,进行执行
/**
* 运行任务
* @param w 任务
*/
final void runWorker(Worker w) {
//获取当前线程
Thread wt = Thread.currentThread();
//获取任务 task
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
//是否突然完成任务(异常,或者其他情况)
boolean completedAbruptly = true;
try {
//循环获取任务
while (task != null || (task = getTask()) != null) {
//加锁
w.lock();
// 当线程池是处于STOP状态或者TIDYING、TERMINATED状态时,设置当前线程处于中断状态
// 如果不是,当前线程就处于RUNNING或者SHUTDOWN状态,确保当前线程不处于中断状态
// 重新检查当前线程池的状态是否大于等于STOP状态
if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) {
wt.interrupt();
}
try {
//线程执行前执行一些任务,在ThreadPoolExecutor是空实现
beforeExecute(wt, task);
Throwable thrown = null;
try {
//运行任务
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
//完成任务后执行一些任务,在ThreadPoolExecutor是空实现
afterExecute(task, thrown);
}
} finally {
//完成任务task置为空,交给GC处理
task = null;
//完成任务计数器+1
w.completedTasks++;
//解锁
w.unlock();
}
}
/**
* 正常完成任务为false
* 否则completedAbruptly 为true
*/
completedAbruptly = false;
} finally {
//整个线程结束时调用,线程退出操作。统计整个线程池完成的任务个数之类的工作
processWorkerExit(w, completedAbruptly);
}
}
这里面有两个核心方法
- getTask:从队列中获取任务
- processWorkerExit:处任务并退出
我们先从getTask开始
getTask 方法
/**
* 获取待执行的任务
*
* @return
*/
private Runnable getTask() {
//最后一次poll()是否超时
boolean timedOut = false;
//自旋
for (; ; ) {
int c = ctl.get();
//获取运行状态
int rs = runStateOf(c);
//线程不在运行状态并且队列为空
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
//使用CAS进行工作任务数-1
decrementWorkerCount();
return null;
}
//获取当前工作任务数
int wc = workerCountOf(c);
/**
* 是否进行任务淘汰 如果 allowCoreThreadTimeOut为true 就一直淘汰下去
*/
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
//(当前线程数是否大于最大线程数或者)
//且(线程数大于1或者任务队列为空)
//这里有个问题(timed && timedOut)timedOut = false,好像(timed && timedOut)一直都是false吧
if ((wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || workQueue.isEmpty())) {
//CAS方式进行工作线程-1
if (compareAndDecrementWorkerCount(c)) {
return null;
}
continue;
}
try {
/**
* 如果需要淘汰淘汰从工作先队列中在指定keepAliveTime时间内获取一个工作线程否则返回null
* 否则工作线程池为空就一直等待
*/
Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take();
if (r != null) {
return r;
}
//如果获取超时设置超时时间为true
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
接下来我们分析下processWorkerExit方法
processWorkerExit 方法
/**
* 处理完成后续的线程统计工作
* 删除完成工作的线程
* @param w 工作线程
* @param completedAbruptly 是否突然完成(异常情况)
*/
private void processWorkerExit(Worker w, boolean completedAbruptly) {
//如果突然完成,工作线程数统计未统计
if (completedAbruptly) { // If abrupt, then workerCount wasn't adjusted
//重新对工作线程数-1
decrementWorkerCount();
}
//获取锁
final ReentrantLock mainLock = this.mainLock;
//加锁
mainLock.lock();
try {
//完成任务数统计
completedTaskCount += w.completedTasks;
//从工作任务队列删除队列
workers.remove(w);
} finally {
//解锁
mainLock.unlock();
}
//尝试终止线程池
tryTerminate();
int c = ctl.get();
//正在运行或者停止
if (runStateLessThan(c, STOP)) {
//没有突然完成
if (!completedAbruptly) {
// 计算最小工作线程,如果allowCoreThreadTimeOut为true 就是 0 否则就是核心线程数
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
//如果最小线程为0并且工作任务队列不为空则设置最小线程数为1
if (min == 0 && !workQueue.isEmpty()) {
min = 1;
}
//如果工作线程数>=最小线程数返回
if (workerCountOf(c) >= min)
return; // replacement not needed
}
addWorker(null, false);
}
}
到这里为止,submit 和 execute已经分析完成了。
本博客所有文章除特别声明外,均采用 CC BY-NC-SA 4.0 许可协议。转载请注明来自 牧马人的忧伤!
评论