线程池由两个核心数据结构组成: 1)线程集合(workers):存放执行任务的线程,是一个HashSet; 2)任务等待队列(workQueue):存放等待线程池调度执行的任务,是一个阻塞式队列BlockingQueue; 关闭线程池时有两个关键步骤: 1)修改线程池状态到SHUTDOWN,这时新提交到线程池的任务都会被直接拒绝; 2)中断线程池中的所有线程,中断任务执行回收线程集合中所有线程。 线程池有如下状态: ●RUNNING:接收新任务,处理队列任务。 ●SHUTDOWN:不接收新任务,但处理队列任务。 ●STOP:不接收新任务,也不处理队列任务,并且中断所有处理中的任务。 ●TIDYING:所有任务都被终结,有效线程为0。会触发terminated()方法。 ●TERMINATED:当terminated()方法执行结束 线程池的状态转换规则: (1)线程池创建之后状态为RUNNING。 (2)执行线程池的shutdown()实例方法,会使线程池状态从RUNNING转变为SHUTDOWN (3)执行线程池的shutdownNow()实例方法,会使线程池状态从RUNNING转变为STOP(4)当线程池处于SHUTDOWN状态时,执行其shutdownNow()方法会将其状态转变为STOP(5)等待线程池的所有工作线程停止工作队列清空之后,线程池状态会从STOP转变为TIDYIN(6)执行完terminated()钩子方法之后,线程池状态从TIDYING转变为TERMINATED。
优雅地关闭线程池主要涉及的方法:
(1)shutdown:是JUC提供的一个有序关闭线程池的方法,此方法会等待当前工作队列中的剩余任务全部执行完成之后,才会执行关闭,但是此方法被调用之后线程池的状态转为SHUTDOWN,线程池不会再接收新的任务。
(2)shutdownNow:是JUC提供的一个立即关闭线程池的方法,此方法会打断正在执行的工作线程,并且会清空当前工作队列中的剩余任务,返回的是尚未执行的任务。
(3)awaitTermination:等待线程池完成关闭。在调用线程池的shutdown()与shutdownNow()方法时,当前线程会立即返回,不会一直等待直到线程池完成关闭。如果需要等到线程池关闭完成,可以调用awaitTermination()方法。
Executor:
Executors:Exectors工厂类提供了线程池的初始化接口:
newFixedThreadPool:初始化一个指定线程数的线程池,其中corePoolSize == maximumPoolSize,使用LinkedBlockingQuene作为阻塞队列,不过当线程池没有可执行任务时,也不会释放线程。
newCachedThreadPool:1、初始化一个可以缓存线程的线程池,默认缓存60s,线程池的线程数可达到Integer.MAX_VALUE,即2147483647,内部使用SynchronousQueue作为阻塞队列; 2、和newFixedThreadPool创建的线程池不同,newCachedThreadPool在没有任务执行时,当线程的空闲时间超过keepAliveTime,会自动释放线程资源,当提交新任务时,如果没有空闲线程,则创建新线程执行任务,会导致一定的系统开销;
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler);
corePoolSize: 规定线程池有几个线程(worker)在运行。
maximumPoolSize: 当workQueue满了,不能添加任务的时候,这个参数才会生效。规定线程池最多只能有多少个线程(worker)在执行。
keepAliveTime: 超出corePoolSize大小的那些线程的生存时间,这些线程如果长时间没有执行任务并且超过了keepAliveTime设定的时间,就会消亡。
unit: 生存时间对于的单位
workQueue: 存放任务的队列
用来保存等待被执行的任务的阻塞队列,且任务必须实现Runable接口,在JDK中提供了如下阻塞队列:
1、ArrayBlockingQueue:基于数组结构的有界阻塞队列,按FIFO排序任务;
2、LinkedBlockingQuene:基于链表结构的阻塞队列,按FIFO排序任务,吞吐量通常要高于ArrayBlockingQuene;
3、SynchronousQuene:一个不存储元素的阻塞队列,每个插入操作必须等到另一个线程调用移除操作,否则插入操作一直处于阻塞状态,吞吐量通常要高于LinkedBlockingQuene;
4、priorityBlockingQuene:具有优先级的无界阻塞队列;
threadFactory: 创建线程的工厂
handler: 当workQueue已经满了,并且线程池线程数已经达到maximumPoolSize,将执行拒绝策略。
线程池提供了4种策略:
1、AbortPolicy:直接抛出异常,默认策略;
2、CallerRunsPolicy:用调用者所在的线程来执行任务;
3、DiscardOldestPolicy:丢弃阻塞队列中靠最前的任务,并执行当前任务;
4、DiscardPolicy:直接丢弃任务;
当然也可以根据应用场景实现RejectedExecutionHandler接口,自定义饱和策略,如记录日志或持久化存储不能处理的任务。
任务阻塞队列:在一个线程从一个空的阻塞队列中获取元素时线程会被阻塞,直到阻塞队列中有了元素;当队列中有元素后,被阻塞的线程会自动被唤醒(唤醒过程不需要用户程序干预)。
1)ArrayBlockingQueue:是一个数组实现的有界阻塞队列(有界队列),队列中的元素按FIFO排序。ArrayBlockingQueue在创建时必须设置大小,接收的任务超出corePoolSize数量时,任务被缓存到该阻塞队列中,任务缓存的数量只能为创建时设置的大小,若该阻塞队列已满,则会为新的任务创建线程,直到线程池中的线程总数大于maximumPoolSize。
(2)LinkedBlockingQueue:是一个基于链表实现的阻塞队列,按FIFO排序任务,可以设置容量(有界队列),不设置容量则默认使用Integer.Max_VALUE作为容量(无界队列)。该队列的吞吐量高于ArrayBlockingQueue。如果不设置LinkedBlockingQueue的容量(无界队列),当接收的任务数量超出corePoolSize时,则新任务可以被无限制地缓存到该阻塞队列中,直到资源耗尽。有两个快捷创建线程池的工厂方法Executors.newSingleThreadExecutor和Executors.newFixedThreadPool使用了这个队列,并且都没有设置容量(无界队列)。
(3)PriorityBlockingQueue:是具有优先级的无界队列。
(4)DelayQueue:这是一个无界阻塞延迟队列,底层基于PriorityBlockingQueue实现,队列中每个元素都有过期时间,当从队列获取元素(元素出队)时,只有已经过期的元素才会出队,队列头部的元素是过期最快的元素。快捷工厂方法Executors.newScheduledThreadPool所创建的线程池使用此队列。
(5)SynchronousQueue:(同步队列)是一个不存储元素的阻塞队列,每个插入操作必须等到另一个线程的调用移除操作,否则插入操作一直处于阻塞状态,其吞吐量通常高于LinkedBlockingQueue。快捷工厂方法Executors.newCachedThreadPool所创建的线程池使用此队列。与前面的队列相比,这个队列比较特殊,它不会保存提交的任务,而是直接新建一个线程来执行新来的任务。
用户通过submit提交一个任务。线程池会执行如下流程:
判断当前运行的worker数量是否超过corePoolSize,如果不超过corePoolSize。就创建一个worker直接执行该任务。—— 线程池最开始是没有worker在运行的
如果正在运行的worker数量超过或者等于corePoolSize,那么就将该任务加入到workQueue队列中去。
如果workQueue队列满了,也就是offer方法返回false的话,就检查当前运行的worker数量是否小于maximumPoolSize,如果小于就创建一个worker直接执行该任务。
如果当前运行的worker数量是否大于等于maximumPoolSize,那么就执行RejectedExecutionHandler来拒绝这个任务的提交。
====>
(1)如果当前工作线程数量小于核心线程数量,执行器总是优先创建一个任务线程,而不是从线程队列中获取一个空闲线程。
(2)如果线程池中总的任务数量大于核心线程池数量,新接收的任务将被加入阻塞队列中,一直到阻塞队列已满。在核心线程池数量已经用完、阻塞队列没有满的场景下,线程池不会为新任务创建一个新线程。
(3)当完成一个任务的执行时,执行器总是优先从阻塞队列中获取下一个任务,并开始执行,一直到阻塞队列为空,其中所有的缓存任务被取光。
(4)在核心线程池数量已经用完、阻塞队列也已经满了的场景下,如果线程池接收到新的任务,将会为新任务创建一个线程(非核心线程),并且立即开始执行新任务。
(5)在核心线程都用完、阻塞队列已满的情况下,一直会创建新线程去执行新任务,直到池内的线程总数超出maximumPoolSize。如果线程池的线程总数超过maximumPoolSize,线程池就会拒绝接收任务,当新任务过来时,会为新任务执行拒绝策略。
两个知识点:
(1)核心和最大线程数量、BlockingQueue队列等参数如果配置得不合理,可能会造成异步任务得不到预期的并发执行,造成严重的排队等待现象。
(2)线程池的调度器创建线程的一条重要的规则是:在corePoolSize已满之后,还需要等阻塞队列已满,才会去创建新的线程。
调度器的钩子方法:
ThreadPoolExecutor类提供了三个钩子方法:
//任务执行之前的钩子方法(前钩子)
protected void beforeExecute(Thread t, Runnable r) { }
//任务执行之后的钩子方法(后钩子)
protected void afterExecute(Runnable r, Throwable t) { }
//线程池终止时的钩子方法(停止钩子)
protected void terminated() { }
beforeExecute和afterExecute两个方法在每个任务执行前后被调用,
如果钩子(回调方法)引发异常,内部工作线程可能失败并突然终止。
public class CreateThreadPoolDemo{
@org.junit.Test
public void testHooks(){
ExecutorService pool = new ThreadPoolExecutor(2, //coreSize
4, //最大线程数
60,//空闲保活时长
TimeUnit.SECONDS,
new LinkedBlockingQueue<>(2)) //等待队列 {
//继承:调度器终止钩子
@Override
protected void terminated(){
Print.tco("调度器已经终止!");
}
//继承:执行前钩子
@Override
protected void beforeExecute(Thread t, Runnable target){
Print.tco( target +"前钩被执行");
//记录开始执行时间
startTime.set(System.currentTimeMillis());
super.beforeExecute(t, target);
}
//继承:执行后钩子
@Override
protected void afterExecute(Runnable target, Throwable t){
super.afterExecute(target, t);
//计算执行时长
long time = (System.currentTimeMillis() - startTime.get()) ;
Print.tco( target + " 后钩被执行, 任务执行时长(ms):" + time);
//清空本地变量
startTime.remove();
}
};
for (int i = 1; i <= 5; i++){
pool.execute(new TargetTask());
}
//等待10秒
sleepSeconds(10);
Print.tco("关闭线程池");
pool.shutdown();
}
// 省略其他
}
示例代码在beforeExecute(前钩子)方法中通过startTime线程局部
变量暂存了异步目标任务(如Runnable实例)的开始执行时间(起始时
间),在afterExecute(后钩子)方法中通过startTime线程局部变量获取
了之前暂存的起始时间,然后计算与系统当前时间(结束时间)之间的
时间差,从而得出异步目标任务的执行时长。
Executors创建线程池主要弊端:
1.使用Executors创建“固定数量的线程池”的潜在问题
public static ExecutorService newFixedThreadPool(int nThreads){
return new ThreadPoolExecutor(
nThreads, // 核心线程数
nThreads, // 最大线程数
0L, // 线程最大空闲(Idle)时长
TimeUnit.MILLISECONDS, // 时间单位:毫秒
new LinkedBlockingQueue<Runnable>() //任务的排队队列,无界队列
);
}
newFixedThreadPool工厂方法返回一个ThreadPoolExecutor实例,该线程池实例的corePoolSize数量为参数nThread,其maximumPoolSize数量也为参数nThread,其workQueue属性的值为LinkedBlockingQueue<Runnable>()无界阻塞队列。
使用Executors创建“固定数量的线程池”的潜在问题主要存在于其workQueue上,其值为LinkedBlockingQueue(无界阻塞队列)。如果任务提交速度持续大于任务处理速度,就会造成队列中大量的任务等待。如果队列很大,很有可能导致JVM出现OOM(Out Of Memory)异常,即内存资源耗尽。
2.使用Executors创建“单线程化线程池”的潜在问题
public static ExecutorService newSingleThreadExecutor(){
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(
1, // 核心线程数
1, // 最大线程数
0L, // 线程最大空闲(Idle)时长
TimeUnit.MILLISECONDS, //时间单位:毫秒
new LinkedBlockingQueue<Runnable>() //无界队列
));
}
使用Executors创建的“单线程化线程池”与“固定大小的线程池”一样,其潜在问题仍然存在于其workQueue属性上,该属性的值为LinkedBlockingQueue(无界阻塞队列)。如果任务提交速度持续大于任务处理速度,就会造成队列大量阻塞。如果队列很大,很有可能导致JVM的OOM异常,甚至造成内存资源耗尽。
3.使用Executors创建“可缓存线程池”的潜在问题
public static ExecutorService newCachedThreadPool(){
return new ThreadPoolExecutor(
0, // 核心线程数
Integer.MAX_VALUE, // 最大线程数
60L, // 线程最大空闲(Idle)时长
TimeUnit.MILLISECONDS, // 时间单位:毫秒
new SynchronousQueue<Runnable>() // 任务的排队队列,无界队列
);
}
当“可缓存线程池”有新任务到来时,新任务会被插入SynchronousQueue实例中,由于SynchronousQueue是同步队列,因此会在池中寻找可用线程来执行,若有可用线程则执行,若没有可用线程,则线程池会创建一个线程来执行该任务。SynchronousQueue是一个比较特殊的阻塞队列实现类,
SynchronousQueue没有容量,每一个插入操作都要等待对应的删除操作,反之每个删除操作都要等待对应的插入操作。也就是说,如果使用SynchronousQueue,提交的任务不会被真实地保存,而是将新任务交给空闲线程执行,如果没有空闲线程,就创建线程,如果线程数都已经大于最大线程数,就执行拒绝策略。使用这种队列需要将maximumPoolSize设置得非常大,从而使得新任务不会被拒绝。
使用Executors创建的“可缓存线程池”的潜在问题存在于其最大线程数量不设限上。由于其maximumPoolSize的值为Integer.MAX_VALUE(非常大),可以认为可以无限创建线程,如果任务提交较多,就会造成大量的线程被启动,很有可能造成OOM异常,甚至导致CPU线程资源耗尽。
4.使用Executors创建“可调度线程池”的潜在问题
public static ScheduledExecutorService newScheduledThreadPool(
int corePoolSize){
return new ScheduledThreadPoolExecutor(corePoolSize);
}
public ScheduledThreadPoolExecutor(int corePoolSize){
super(corePoolSize, // 核心线程数
Integer.MAX_VALUE, // 最大线程数
0, // 线程最大空闲(Idle)时长
NANOSECONDS,//时间单位
new DelayedWorkQueue() //任务的排队队列
);
}
使用Executors创建的“可缓存线程池”的潜在问题存在于其最大线程数量不设限上。由于其线程数量不设限,如果到期任务太多,就会导致CPU的线程资源耗尽。
线程池任务分类: (1)IO密集型任务 此类任务主要是执行IO操作。由于执行IO操作的时间较长,导致CPU的利用率不高,这类任务CPU常处于空闲状态。Netty的IO读写操为此类任务的典型例子。由于IO密集型任务的CPU使用率较低,导致线程空余时间很多,因此通常需要开CPU核心数两倍的线程。当IO线程空闲时,可以启用其他线程继续使用CPU,以提高CPU的使用率。 Netty的IO处理任务就是典型的IO密集型任务。所以,Netty的Reactor(反应器)实现类(定制版的线程池)的IO处理线程数默认正好为CPU核数的两倍,corePoolSize和maximumPoolSize保持一致,使得在接收到新任务时,如果没有空闲工作线程,就优先创建新的线程去执行新任务,而不是优先加入阻塞队列,等待现有工作线程空闲后再执行。
(2)CPU密集型任务
此类任务主要是执行计算任务。由于响应时间很快,CPU一直在运行,这种任务CPU的利用率很高。CPU密集型任务也叫计算密集型任务,其特点是要进行大量计算而需要消耗CPU资源,比如计算圆周率、对视频进行高清解码等。
CPU密集型任务虽然也可以并行完成,但是并行的任务越多,花在任务切换的时间就越多,CPU执行任务的效率就越低,所以要最高效地利用CPU,CPU密集型任务并行执行的数量应当等于CPU的核心数。
(3)混合型任务
此类任务既要执行逻辑计算,又要进行IO操作(如RPC调用、数据库访问)。相对来说,由于执行IO操作的耗时较长(一次网络往返往往在数百毫秒级别),这类任务的CPU利用率也不是太高。Web服务器的HTTP请求处理操作为此类任务的典型例子。
混合型任务既要执行逻辑计算,又要进行大量非CPU耗时操作(如RPC调用、数据库访问、网络通信等),所以混合型任务CPU的利用率不是太高,非CPU耗时往往是CPU耗时的数倍。比如在Web应用中处理HTTP请求时,一次请求处理会包括DB操作、RPC操作、缓存操作等多种耗时操作。一般来说,一次Web请求的CPU计算耗时往往较少,大致在100~500毫秒,而其他耗时操作会占用500~1000毫秒,甚至更多的时间
最佳线程数目 = (线程等待时间与线程CPU时间之比 + 1) * CPU核数
等待时间所占的比例越高,需要的线程就越多;CPU耗时所占的比例越高,需要的线程就越少。