java.util.concurrent.Executors提供了简单易用的Java线程池创建方法,但是在面对复杂的多线程使用场景时,依然无法满足我们所有的需求。我们需要更加灵活的线程池,以应繁杂的使用需求。使用自定义Java线程池有以下几点好处:
(1)控制线程数量。不仅仅是固定线程数量也不是无限制创建新的线程数量,而且根据使用场景配置,既能够保障任务处理速率又能避免创建大量线程。
(2)线程生命周期。自定义线程池可以设置空闲回收策略,能够提升活跃线程的复用率也能释放多余线程降低资源消耗。
(3)提高伸缩性。线程池中的线程数不仅可以通过配置控制,还可以通过API进行动态调整,极大提升了单个线程池对于不同场景的适配能力,根据负载的情况及时调整线程池的配置。
那么我们该如何创建自定义线程池呢?
相信聪明的读者已经从固定线程线程池和缓存线程池两者的创建方法体中窥见端倪。两种常用线程池的创建方法共同使用到了同一个API:java.util.concurrent.ThreadPoolExecutor#ThreadPoolExecutor(int, int, long, java.util.concurrent.TimeUnit, java.util.concurrent.BlockingQueue<java.lang.Runnable>)
,但这还不是基础的构造方法,我们逐本溯源,就能发现下面这个java.util.concurrent.ThreadPoolExecutor
的构造方法:
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.acc = System.getSecurityManager() == null ?
null :
AccessController.getContext();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
这个方法就是本节的核心,接下来我们依据该方法讲解在性能测试中如何使用自定义线程池。对于初学者来讲,需要掌握以下几点内容:
下面是ThreadPoolExecutor构造方法详解,首先我们看到最基础的构造方法有哪些构造参数:
对于性能测试的初学来讲,第1~4的参数是比较重要的,5~7的参数可以理解即可,使用参数可以使用默认值。这里每个参数如果深究的话,又会牵扯出来其他知识点,笔者会根据实际工作使用去情况进行适当简化,避免增加大家学习负担。
下面单独讲解这些参数的含义。
corePoolSize 是Java线程池核心线程数。在使用当中有两个含义:
maximumPoolSize是Java线程池最大线程数。含义为线程池能够容纳最大线程数量。若一个线程被创建后因为异常终止,则不会被统计到。
keepAliveTime和unit这两个参数合起来是Java线程池中线程的最大空闲时间。当一个线程因需要被创建,执行完任务之后变成空闲状态。线程池中线程空闲保持空闲状态超过设定值即会被回收,释放资源。
workQueue是Java线程池保存还未被执行任务的队列。类型为BlockingQueue<Runnable>
,这里有几点含义:
这里有一个隐藏的参数,队列的大小。当任务队列满了,再有任务试图提交的话,会尝试创建新的线程执行任务。若无法新建,则会执行拒绝策略。
threadFactory是Java线程池的线程工厂。在创建新的线程时,执行特殊的逻辑。若要创建对象,必须实现这个方法:Thread newThread(Runnable r),将java.lang.Runnable任务组装成一个java.lang.Thread,最终提交到线程池中。
handler是Java线程池拒绝策略。当一个任务无法被提交到线程池中时,就会执行拒绝策略。java.util.concurrent包提供了4中常用的拒绝策略实现:
经过参数分析,相信你已经对线程池有了更加深入的理解,但对于各个参数如何在线程创建流程发挥作用可能还有点模糊,通过下面这张图帮助大家理解这个重点的知识点。
线程创建逻辑图
这张图已经将性能测试中常遇到的情况考虑到了,其中有几点关键需要掌握:
(1)创建新线程的时机。总计2个时机:当有新任务又没有达到核心线程数时;当任务队列已满,又未达到最大线程数时。
(2)执行拒绝策略的时机。当队列满叠加线程池达到最大线程数时,执行拒绝策略。不同的拒绝策略后续的处理方式各有不同。
(3)慎重选择任务队列容量大小。如果选择过大的任务队列,首先会产生大量任务堆积,其次线程池无法创建新线程去加快任务执行速度。实际的效果等同于创建一个固定线程数(线程数等于corePoolSize)的线程池,失去了自定义线程池的意义。
(4)选择合适的拒绝策略。合适的拒绝策略可以在任务队列大小和并发执行效率之间做到更好的平衡。如果Java提供4种拒绝策略均无法满足需求,可以自定义拒绝策略。
这里有个常见的误解,很多初学者会认为,当线程池数量未达到最大值时,只要有未执行的任务(此处包含新提交和等待队列种),就会创建新的线程池,尽可能消耗未完成任务。
下面我们来写一段演示代码:
package org.funtester.performance.books.chapter01.section3;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
* 线程池创建线程示例
*/
public class CreateThreadDemo {
public static void main(String[] args) throws InterruptedException {
ThreadPoolExecutor executor = new ThreadPoolExecutor(0, 2, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<>(2));// 创建线程池
for (int i = 0; i < 4; i++) {
int index = i;// 任务索引,用于标识任务,由于lambda表达式中的变量必须是final或者等效的,所以这里使用局部变量
Thread.sleep(200);
executor.execute(() -> {// 提交任务
try {
Thread.sleep(1000);// 模拟任务执行,睡眠1秒,避免任务过快执行完毕
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
System.out.println(Thread.currentThread().getName() + " " + System.currentTimeMillis() + " " + index + " 执行任务");// 打印任务执行信息
});
System.out.println(Thread.currentThread().getName() + " " + System.currentTimeMillis() + " " + index + " 提交任务");// 打印任务提交信息
}
executor.shutdown();// 关闭线程池,不再接受新任务,但会执行完队列中的任务,并不会立即关闭
}
}
这里我们创建了一个核心线程数为0,最大线程数2,等待队列容量为10的线程池。然后间隔200毫秒循环向线程池提交4个异步任务。相信会有小伙伴认为线程池会很快创建2个线程执行这4个任务,事实是这样的么?下面我们看看控制台打印的内容:
main 1712996368590 0 提交任务
main 1712996368795 1 提交任务
main 1712996369000 2 提交任务
main 1712996369201 3 提交任务
pool-1-thread-1 1712996369591 0 执行任务
pool-1-thread-1 1712996370592 1 执行任务
pool-1-thread-1 1712996371593 2 执行任务
pool-1-thread-1 1712996372594 3 执行任务
可以看出,虽然4个异步都在1秒内提交成功了,但只有一个线程在执行,因为线程池当时只有一个线程可用使用。
现在我们把例子中等待队列的容量大小改成2,再运行看看控制台打印的内容:
main 1712996440747 0 提交任务
main 1712996440950 1 提交任务
main 1712996441155 2 提交任务
main 1712996441359 3 提交任务
pool-1-thread-1 1712996441748 0 执行任务
pool-1-thread-2 1712996442361 3 执行任务
pool-1-thread-1 1712996442749 1 执行任务
pool-1-thread-2 1712996443362 2 执行任务
从日志信息我们大致可以推断出当时情况是这样的:
(1)第一个任务被提交,线程池创建了1个线程执行。
(2)第二个任务被提交,因为第一个任务还在执行,所以暂时放在了等待队列中。
(3)第三个任务被提交,因为线程池中没有空闲线程,而且第二个任务还没被执行,所以也只能在等待队列中待着。
(4)第四个任务被提交,这时第一个任务依旧在执行中,而且等待线程池已经满了,然后线程池创建了第二个线程,然后开始优先执行第四个任务。
(5)第一个任务执行完毕,从等待队列中取出第二个任务,交给第一个线程来执行。
(6)第四个任务执行完毕,从等待队列中把第三个任务取出来,交给了第二个线程执行。
从中可以看出线程池创建新线程的时机关键在于等待队列是否已经满了,这是第一个反直觉的点。当新任务无处安放时,线程池开始创建新的线程,先把无处安放的新任务执行了,而不是去等待队列中捞那些等待许久的任务,这是第二个反直觉的点。这部分源码路径java.util.concurrent.ThreadPoolExecutor#execute,内容如下:
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
当向队列提交失败之后,则会调用java.util.concurrent.ThreadPoolExecutor#addWorker,第一个参数类型是java.lang.Runnable,也就是我们提交的异步任务,第二个参数代表是否核心线程,这里用的是false。
当我们把核心线程数改成2,最大线程数改成3,等待队列容量改为10,再来执行。控制台打印如下:
main 1712997072632 0 提交任务
main 1712997072832 1 提交任务
main 1712997073038 2 提交任务
main 1712997073242 3 提交任务
pool-1-thread-1 1712997073643 0 执行任务
pool-1-thread-2 1712997073834 1 执行任务
pool-1-thread-1 1712997074644 2 执行任务
pool-1-thread-2 1712997074834 3 执行任务
线程池主动创建了2个线程来执行异步任务,而且是在等待队列还没有满的情况下。这是因为提交异步任务的时候,首先判断了当先线程数是否小于核心线程数,源码路径java.util.concurrent.ThreadPoolExecutor#execute,内容如下:
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
书的名字:从 Java 开始做性能测试 。
如果本书内容对你有所帮助,希望各位多多赞赏,让我可以贴补家用。赞赏两位数可以提前阅读未公开章节。我也会尝试制作本书的视频教程,包括必要的答疑。