以前就讲过线程池的使用,本文中介绍深挖线程池中的几道面试题
在上面可以找到
当一个线程池在添加一个任务时,它是怎么分配线程去执行这个任务的
public class ThreadPoolExecutor extends AbstractExecutorService {
public void execute(Runnable command) {
// 判断是否为空
if (command == null)
throw new NullPointerException();
// 判断当前正在运行的线程数是否小于核心线程数
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
// 添加任务至线程执行,成功添加则结束
if (addWorker(command, true))
return;
c = ctl.get();
}
// 如果核心线程都有在运行,将任务放至队列中
if (isRunning(c) && workQueue.offer(command)) {
// 如果成功推入队列,将再次检查线程状态,有线程死亡则将当前任务添加至线程执行
int recheck = ctl.get();
// 检查线程状态是不是RUNNING,如果不是将会拒绝此任务
if (!isRunning(recheck) && remove(command))
reject(command);
// 检查当前的工作线程数是否为0
else if (workerCountOf(recheck) == 0)
// 添加一个null的任务
addWorker(null, false);
}
// 如果队列推入任务失败了,那将直接添加至线程执行
else if (!addWorker(command, false))
// 如果任务添加至线程失败,则将进行拒绝策略
reject(command);
}
/**
* 会从线程工厂获取线程,并添加执行任务
* @param firstTask 执行的任务
* @param core 是否可以添加至核心线程
* @return true:成功添加至线程执行
*/
private boolean addWorker(Runnable firstTask, boolean core) {
// ...
}
}
线程池的状态有哪些,状态是如何进行转换的?
注意是在提问线程池的状态,而不是线程的状态
这是ThreadPoolExecutor.java
中的源码
/**
* 主池控制状态ctl是一个原子整数,包含两个概念性字段:
* workerCount表示实际线程数,runState表示是运行中、正在关闭等状态。
* 为了将它们打包成一个整数,我们将workerCount限制为(2^29)-1(大约5亿)个线程,而不是(2^31)-1(可表示20亿)。
* 如果将来出现了问题,该变量可以改为AtomicLong,并且下面的移位/掩码常量需要调整。但在需要之前,使用int类型会更快,更简单。
*
* workerCount是已被允许启动且未被允许停止的工作线程数。
* 该值暂时可能与实际的活动线程数不同,例如当ThreadFactory无法按要求创建线程时,或者退出线程在终止之前仍在执行簿记操作。用户可见的池大小报告为工作线程集合的当前大小。
*
* runState提供了主要的生命周期控制,接受以下值:
* RUNNING:接受新任务并处理队列中的任务;
* SHUTDOWN:不接受新任务,但处理队列中的任务;
* STOP:不接受新任务,不处理队列中的任务,并中断正在处理的任务;
* TIDYING:所有任务都已终止,workerCount为零,转换到TIDYING状态的线程将运行terminated()钩子方法;
* TERMINATED:terminated()已完成。这些值之间的数值顺序很重要,才能允许有序比较。
*
* runState随时间单调递增,但不一定达到每个状态。转换如下:
* RUNNING->SHUTDOWN:调用shutdown()时,可能是隐式的(RUNNING或SHUTDOWN状态);
* RUNNING或SHUTDOWN->STOP:调用shutdownNow()时;
* SHUTDOWN->TIDYING:当队列和池都为空时;
* STOP->TIDYING:当池为空时;
* TIDYING -> TERMINATED:当terminated()钩子方法完成时。等待在awaitTermination()中的线程将在状态到达TERMINATED时返回。
* 由于在SHUTDOWN状态下队列可能在非空和空之间变化,因此检测从SHUTDOWN到TIDYING的转换不是很直观,但我们只有在看到它为空时,才能看到workerCount为0而终止(有时需要重新检查-见下文)。
*/
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
// runState的状态,RUNNING < SHUTDOWN < STOP < TIDYING < TERMINATED
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;
// 运行状态
private static int runStateOf(int c) {
return c & ~CAPACITY;
}
// 实际线程数
private static int workerCountOf(int c) {
return c & CAPACITY;
}
// ctl控制数
private static int ctlOf(int rs, int wc) {
return rs | wc;
}
由上面源码可知,线程的状态一共有5
个
RUNNING
状态调用shutdown()
方法时,进入此状态;1. 不再接受新的任务
2. 正在运行中的任务和队列中的任务会等待其执行完毕
1. 不再接受新的任务
2. 中断运行中的任务,销毁队列中的任务
TIDYING
状态进入,terminated()
方法执行完毕如果是画图的话,是下面这个样子的
terminated()
方法默认什么都不做,线程池提供这个方法,交给子类来进行扩展
protected void terminated() { }
添加任务的流程我们已经讲述完毕,那么线程池是如何分配线程去执行任务的呢?
在第一节中,有一段...
的addWorker()
方法,这里面就是执行任务的逻辑
private boolean addWorker(Runnable firstTask, boolean core) {
// 外部循环标识retry
retry:
for (;;) {
// 当前的状态值ctl及runState,第二节有讲过
int c = ctl.get();
int rs = runStateOf(c);
// 检查运行状态是不是大于等于SHUTDOWN
if (rs >= SHUTDOWN &&
// 且 (运行状态不等于SHUTDOWN 或 任务不为空 或 队列中是否有值)
!(rs == SHUTDOWN && firstTask == null && !workQueue.isEmpty()))
return false;
// 上面这段,主要就是为了检测线程池的状态,队列中是否有任务
// 如果线程处于RUNNING,就会跳过此处的return false
// 如果处于SHUTDOWN,还要额外判断当前任务是否为有值,有值也会return false
// 如果处于SHUTDOWN,且当前任务为null,还要判断当前队列是否有值;队列中没有值的话,也会return false
// 内部循环
for (;;) {
// 当前运行的线程数
int wc = workerCountOf(c);
// 判断运行的线程数是否大于(容量最大),根据是否核心,判断是否大于核心线程数 或者 最大线程数
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
// 返回false
return false;
// 使用CAS自旋锁,添加ctl的运行线程数
// 成功添加则会,跳出外层的循环
if (compareAndIncrementWorkerCount(c))
break retry;
// 如果添加没有成功,重新获取ctl
c = ctl.get();
// 得到当前线程池状态,与外部循环的线程池做一个对比;如果不一致,则退回到外部循环,重新进行loop
if (runStateOf(c) != rs)
continue retry;
// 如果状态是相等的,则在内部循环进行loop即可
}
}
// 当上面的自旋锁添加运行线程数成功后,才会进入此处
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
// 构建一个Worker对象,里面就默认分配了一个线程
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
// 加锁
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 再次获取线程池的运行状态
int rs = runStateOf(ctl.get());
// 运行状态是RUNNING 或者 (运行状态是SHUTDOWN 且 当前任务为null)
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
// 预先检查线程是否为启动状态
if (t.isAlive())
// 如果是启动状态,就有问题了,要抛出异常
throw new IllegalThreadStateException();
// 将任务添加至workers容器中,这个容器包括了运行线程的状态
// largestPoolSize 最大线程池数量
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
// 标志位改为true,代表worker已经添加至workers
workerAdded = true;
}
} finally {
// 解锁
mainLock.unlock();
}
// 判断标志位
if (workerAdded) {
// 启动线程,执行任务
t.start();
// 启动线程标志位,设置为true
workerStarted = true;
}
}
} finally {
// 是否启动线程的标志位
if (! workerStarted)
// 添加一个worker失败的处理
addWorkerFailed(w);
}
// 返回是否启动线程的标志位
return workerStarted;
}
看下图,很多调用addWorker()
方法都传递了一个null
,这是为什么呢,有什么用?
首先说结论,这是为了更快的启动队列中的任务。
大家通过上面第一节的任务添加流程就会发现,有一些任务在添加进入任务阻塞队列后就没有声音了。
那么就要看看,如果添加一个为null
的任务会出现什么情况把。
// 这中间代码省略了,第一部分是判断状态和添加任务数,第二部分是判断状态和启动任务
// 如果线程池的状态是RUNNING,那么一个任务是大概率都是可以添加成功的
private boolean addWorker(Runnable firstTask, boolean core) {
// ... 省略了,关心下面t.start();做了什么即可
if (workerAdded) {
t.start();
workerStarted = true;
}
}
// 主要还是要看启动Worker做了什么
private final class Worker extends AbstractQueuedSynchronizer implements Runnable {
// 首先构造方法
Worker(Runnable firstTask) {
setState(-1);
// 传入一个任务后,作为自己的属性
this.firstTask = firstTask;
// 将自己作为任务构建了一个线程作为自己的属性。他自己也实现了Runnable接口
this.thread = getThreadFactory().newThread(this);
}
// 当上面t.start();启动的是Worker的run方法
public void run() {
runWorker(this);
}
// 上面run();方法调用过来的
final void runWorker(Worker w) {
// 线程池里面的,当前的线程
Thread wt = Thread.currentThread();
// 当前真正要执行的任务,可能为null,本小节直接定义null
Runnable task = w.firstTask;
// 将属性变为null
w.firstTask = null;
w.unlock();
boolean completedAbruptly = true;
try {
// 重点在这里,当task==null时,它会去getTask();方法中去获取task进行判断
// 如果getTask();方法返回的是null,那么说明本次循环结束,任务运行完成
// 如果getTask();方法返回的是队列中的任务,那么进入循环体,执行任务
while (task != null || (task = getTask()) != null) {
w.lock();
// 下面就是一些判断状态和执行任务的代码了
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
// 这里才是真正执行我们任务的地方
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
// 这是从上面runWorker();方法调用而来
private Runnable getTask() {
// 最后poll()方法是否超时
boolean timedOut = false;
for (;;) {
// 获取线程池状态
int c = ctl.get();
int rs = runStateOf(c);
// 检查阻塞队列是否为空
// 当状态是RUNNING时,false
// 当状态是SHUTDOWN时,判断队列是否为空,如果有值,false
// 如果状态是后面几种状态时,无论队列是否有值,true
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
// 这里返回null,就代表task=null了
return null;
}
// 当前线程池运行的线程数
int wc = workerCountOf(c);
// allowCoreThreadTimeOut,这个讲一个,这个布尔值代表,核心线程数是否也可以被回收
// 如果为true,空闲时会保证keepAliveTime的时候,过期销毁
// 如果为false(默认),那么在空闲时也会保持活动
// 这里主要判断是否允许超时保留核心线程,用来确定下面阻塞队列的阻塞时间
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
// 加这个判断,主要是想留一个线程在这循环阻塞,加快从队列中取任务的流程步骤
if ((wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
// 获取任务
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
// 返回
if (r != null)
return r;
// 没有就一直处在循环之中,并配合上面的107行判断使用
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
}
当前线程池的解读就到这里了,如果不是为了造火箭,谁会看这么底层的东西。
有一说一,这线程池写起来真的很优雅!!!
我是半月,你我一同共勉!