Java多线程程序通常把应用分解为若干个任务,然后使用用户级的调度器(Executor框架)将这些任务映射为固定数量的线程。在底层,操作系统内核将这些线程映射到硬件处理器上。
下面从以下几个方面学习一下线程池
1)Executors的创建
2)ThreadPoolExecutor的使用
3)FixedThreadPool固定线程数的线程池
4)SingleThreadExecutor单个worker线程池
5)CachedThreadPool
6)线程优雅地停止
整个线程池框架的类继承图如下
Executors是线程池框架提供给我们的创建线程池的工具类,它里面提供了以下创建几类线程池的方法。
// 创建固定线程数量的线程池
public static ExecutorService newFixedThreadPool();
// 创建单个线程的线程池(本质上就是容量为1的FixedThreadPool)
public static ExecutorService newSingleThreadExecutor();
// 创建无数量限制可自动增减线程的线程池
public static ExecutorService newCachedThreadPool();
// 创建(可计划的)任务延时执行线程池
public static ScheduledExecutorService newScheduledThreadPool();
// 单线程版的任务计划执行的线程池
public static ScheduledExecutorService newSingleThreadScheduledExecutor();
通过查看这几个方法的源码发现:前三个方法new了ThreadPoolExecutor对象,而后面两个方法new了ScheduledThreadPoolExecutor对象。ScheduledThreadPoolExecutor下次再讲。
Executor框架最核心的类就是ThreadPoolExecutor,它是线程池的实现类。
public class ThreadPoolExecutor extends AbstractExecutorService {
public ThreadPoolExecutor(int corePoolSize,//核心池的大小。
int maximumPoolSize,//线程池允许的最大线程数。
long keepAliveTime,//表示线程没有任务执行时最多保持多久时间会终止。
TimeUnit unit,//参数keepAliveTime的时间单位
BlockingQueue<Runnable> workQueue,//一个阻塞队列(BlockingQueue接口的实现类)
ThreadFactory threadFactory,//线程工厂,主要用来创建线程;
RejectedExecutionHandler handler//表示当拒绝处理任务时的策略) {
//代码省略
}
...
}
构造器中各个参数的含义:
1、corePoolSize:核心池的大小。
核心池中的线程会一致保存在线程池中(即使线程空闲),除非调用allowCoreThreadTimeOut方法允许核心线程在空闲后一定时间内销毁,该时间由构造方法中的keepAliveTime和unit参数指定;
在创建了线程池后,默认情况下,线程池中并没有任何线程,而是等待有任务到来才创建线程去执行任务,除非调用了prestartAllCoreThreads()或者prestartCoreThread()方法,从这两个方法的名字就可以看出,是“预创建线程”的意思,即在没有任务到来之前就创建corePoolSize个线程(prestartAllCoreThreads)或者一个线程(prestartCoreThread);
2、maximumPoolSize:线程池允许的最大线程数。
这个参数也是一个非常重要的参数,它表示在线程池中最多能创建多少个线程。
默认情况下,在创建了线程池后,线程池中的线程数为0,当有任务来之后,就会创建一个线程去执行任务,当线程池中的线程数目达到corePoolSize后,就会把新加入的任务放到缓存队列当中,缓存队列由构造方法中的workQueue参数指定,如果入队失败(队列已满)则尝试创建临时线程,但临时线程和核心线程的总数不能超过maximumPoolSize,当线程总数达到maximumPoolSize后会拒绝新任务;所以有两种方式可以让任务绝不被拒绝:
① 将maximumPoolSize设置为Integer.MAX_VALUE(线程数不可能达到这个值),CachedThreadPool就是这么做的;
② 使用无限容量的阻塞队列(比如LinkedBlockingQueue),所有处理不过来的任务全部排队去,FixedThreadPool就是这么做的。
3、keepAliveTime:表示线程没有任务执行时最多保持多久时间会终止。
默认情况下,只有当线程池中的线程数大于corePoolSize时,keepAliveTime才会起作用——当线程池中的线程数大于corePoolSize时,如果一个线程空闲的时间达到keepAliveTime,则会被销毁,直到线程池中的线程数不超过corePoolSize。但是如果调用了allowCoreThreadTimeOut(true)方法,在线程池中的线程数不大于corePoolSize时,keepAliveTime参数也会起作用,直到线程池中的线程数为0;
4、unit:参数keepAliveTime的时间单位,有7种取值,在TimeUnit类中有7种静态属性:
TimeUnit.DAYS; //天
TimeUnit.HOURS; //小时
TimeUnit.MINUTES; //分钟
TimeUnit.SECONDS; //秒
TimeUnit.MILLISECONDS; //毫秒
TimeUnit.MICROSECONDS; //微妙
TimeUnit.NANOSECONDS; //纳秒
并发库中所有时间表示方法都是以TimeUnit枚举类作为单位
5、workQueue:一个阻塞队列(BlockingQueue接口的实现类),用来存储等待执行的任务,一般来说,这里的阻塞队列有以下几种选择:
ArrayBlockingQueue // 数组实现的阻塞队列,数组不支持自动扩容。所以当阻塞队列已满
// 线程池会根据handler参数中指定的拒绝任务的策略决定如何处理后面加入的任务
LinkedBlockingQueue // 链表实现的阻塞队列,默认容量Integer.MAX_VALUE(不限容),
// 当然也可以通过构造方法限制容量
SynchronousQueue // 零容量的同步阻塞队列,添加任务直到有线程接受该任务才返回
// 用于实现生产者与消费者的同步,所以被叫做同步队列
PriorityBlockingQueue // 二叉堆实现的优先级阻塞队列
DelayQueue // 延时阻塞队列,该队列中的元素需要实现Delayed接口
// 底层使用PriorityQueue的二叉堆对Delayed元素排序
// ScheduledThreadPoolExecutor底层就用了DelayQueue的变体"DelayWorkQueue"
// 队列中所有的任务都会封装成ScheduledFutureTask对象(该类已实现Delayed接口)
6、threadFactory:线程工厂,主要用来创建线程;默认情况都会使用Executors工具类中定义的默认工厂类DefaultThreadFactory。可以实现ThreadFactory接口来自己控制创建线程池的过程(比如设置创建线程的名字、优先级或者是否为Deamon守护线程)
7、handler:表示当拒绝处理任务时的策略,有以下四种取值(默认为AbortPolicy):
ThreadPoolExecutor.AbortPolicy: 丢弃任务并抛出RejectedExecutionException异常。ThreadPoolExecutor.DiscardPolicy:也是丢弃任务,但是不抛出异常。ThreadPoolExecutor.DiscardOldestPolicy:丢弃队列最前面的任务,然后重新尝试执行任务(重复此过程) ThreadPoolExecutor.CallerRunsPolicy:由调用线程处理该任务
可通过实现RejectedExecutionHandler接口来自定义任务拒绝后的处理策略。
以上参数解析,除此之外Java提供了3种常用的ThreadPoolExecutor:
1、FixedThreadPool
2、SingleThreadExecutor
3、CacheThreadPool
FixedThreadPool的corePoolSize和maximumPoolSize都被设置为创建FixedThreadPool时指定的参数nThreads。
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
当线程池中的线程数大于corePoolSize时,keepAliveTime为多余的空闲线程等待新任务的最长时间,超过这个时间后多余的线程将被终止。这里把keepAliveTime设置为0L,意味着多余的空闲线程会被立即终止。
FixedThreadPool的execute()方法运行如下图:
说明:1)如果当前运行的线程数小于corePoolSize,则创建新线程来执行任务。2)在线程池完成预热之后(当前运行的线程等于corePoolSize),将任务加入LinkedBlockingQueue。3)线程执行完1)中的任务后,会循环中反复从LinkedBlockingQueue获取任务来执行。
SingleThreadExecutor的corePoolSize和maximumPoolSize被设置为1,其他参数与FixedThreadPool相同。
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
SingleThreadExecutor使用无界队列LinkedBlockingQueue作为线程池的工作队列(队列的容量为Integer.MAX_VALUE)。SingleThreadExecutor使用无界队列作为工作队列对线程池带来的影响与FixedThreadPool相同。
说明:1)如果当前运行的线程数少于corePoolSize(即线程池中无运行的线程),则创建一个新线程来执行任务。2)在线程池完成预热之后(当前线程池中有一个运行的线程),将任务加入LinkedBlockingQueue。3)线程执行完1中的任务后,会在一个无限循环中反复从LinkedBlockingQueue获取任务来执行。
CachedThreadPool是一个会根据需要创建新线程的线程池。
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
CachedThreadPool的corePoolSize被设置为0,即corePool为空;maximumPoolSize被设置为Integer.MAX_VALUE,即maximumPool是无界的。这里把keepAliveTime设置为60L,意味着CachedThreadPool中空闲线程等待新任务的最长时间为60秒,空闲线程超过60秒后将会被终止。
FixedThreadPool和SingleThreadExecutor使用无界队列LinkBlockingQueue作为线程池的工作队列。CachedThreadPool使用没有容量的SynchronousQueue作为线程池的工作队列,但CachedThreadPool的maximumPool是无界的。这就意味着如果主线程提交任务的速度高于maximumPool中线程处理任务的速度时,CachedThreadPool会不断创建新线程。极端情况下,CachedThreadPool会因为创建过多线程而耗尽CPU和内存资源。
Java虚拟机会先将该线程的中断标识位清除,然后抛出InterruptedException,因为在发生InterruptedException异常的时候,会清除中断标记。如果不加处理,那么下一次循环开始的时候,就无法捕获这个异常。故在异常处理中,再次设置中断标记位
public class ThreadStopSafeInterrupted {
public static void main(String[] args) throws InterruptedException {
Thread thread = new Thread() {
@Override
public void run() {
while (true) {
// 使用中断机制,来终止线程
if (Thread.currentThread().isInterrupted()) {
System.out.println("Interrupted ...");
break;
}
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
System.out.println("Interrupted When Sleep ...");
// Thread.sleep()方法由于中断抛出异常。
// Java虚拟机会先将该线程的中断标识位清除,然后抛出InterruptedException,
// 因为在发生InterruptedException异常的时候,会清除中断标记
// 如果不加处理,那么下一次循环开始的时候,就无法捕获这个异常。
// 故在异常处理中,再次设置中断标记位
Thread.currentThread().interrupt();
}
}
}
};
// 开启线程
thread.start();
Thread.sleep(2000);
thread.interrupt();//主线程发起中断
}
}
/**
* 停止线程池中的所有线程
*/
private void stopDownloadThreadTask() {
try {
this.fixedThreadPool.shutdown();//尝试停止所有线程
if(!this.fixedThreadPool.awaitTermination(5 * 1000, TimeUnit.MILLISECONDS)){
this.fixedThreadPool.shutdownNow();//规定时间内还未停止,再次请求停止
}
} catch (InterruptedException e) {
logger.error("awaitTermination interrupted: " + e);
this.fixedThreadPool.shutdownNow();//停不了就再停止一次。
}
}
《Java并发编程艺术》