线程池: 简单理解,它就是一个管理线程的池子。
线程池的优点:
图解线程池流程 先来一张通俗易懂的:
文字理解:工厂中有固定的一批工人,称为正式工人,工厂接收的订单由这些工人去完成。当订单增加,正式工人已经忙不过来了,工厂会将生产原料暂时堆积在仓库中,等有空闲的工人时再处理(因为工人空闲了也不会主动处理仓库中的生产任务,所以需要调度员实时调度)。仓库堆积满了后,订单还在增加怎么办?工厂只能临时扩招一批工人来应对生产高峰,而这批工人高峰结束后是要清退的,所以称为临时工。当时临时工也以招满后(受限于工位限制,临时工数量有上限),后面的订单只能忍痛拒绝了。 将其翻译成线程池理解:
线程池关键参数的理解(在图中可以看到):
图解参数之间的关系:
看完工作流程后再来看下线程池工作原理(图解):
创建线程池的方式:
Executors
去快捷创建线程池,而是通过 ThreadPoolExecutor
构造函数的方式,除了避免 OOM 的原因外,这样的处理方式可以让我们更加明确线程池的运行规则,规避资源耗尽的风险。
也就是说使用有界队列,控制线程创建数量。因为实际使用中需要根据自己机器的性能、业务场景来手动配置线程池的参数比如核心线程数、使用的任务队列、饱和策略等等。我们应该显示地给我们的线程池命名,有助于后期快速定位问题。 Executors 返回线程池对象的弊端:
FixedThreadPool
和 SingleThreadExecutor
:允许请求的队列长度为 Integer.MAX_VALUE
,可能堆积大量的请求,从而导致 OOM(java.lang.OutOfMemoryError:内存用完了)。
CachedThreadPool
和ScheduledThreadPool
:允许创建的线程数量为 Integer.MAX_VALUE
,可能会创建大量线程,从而导致 OOM。
示例:一个简单的for遍历线程
import java.util.Date;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
public class App
{
//corePoolSize: 定义核心线程数为 3
private static final Integer CORE_POOL_SIZE = 3;
//maximumPoolSize:定义最大线程数为 9
private static final Integer MAX_POOL_SIZE = 9;
//workQueue:定义任务队列为 ArrayBlockingQueue,并且容量为 90
private static final Integer QUEUE_CAPACITY = 90;
//keepAliveTime: 定义等待时间为 1
private static final Long KEEP_ALIVE_TIME = 1L;
public static void main(String[] args) {
//通过ThreadPoolExecutor构造函数自定义参数创建
ThreadPoolExecutor executor = new ThreadPoolExecutor(
CORE_POOL_SIZE,
MAX_POOL_SIZE,
KEEP_ALIVE_TIME,
TimeUnit.SECONDS,//unit: 等待时间的单位为 TimeUnit.SECONDS
new ArrayBlockingQueue<Runnable>(QUEUE_CAPACITY), //workQueue:定义任务队列为 ArrayBlockingQueue
new ThreadPoolExecutor.CallerRunsPolicy());//handler:定义拒绝策略为 CallerRunsPolicy
// 模拟向线程池提交(6个线程)任务
for (int i = 0; i < 6; i++) {
//执行线程;这里直接使用的Runnable匿名对象;也可以自行创建Runnable或Callable的实现类
executor.execute(new Runnable() {
@Override
public void run() {
Long start = System.currentTimeMillis(); //获取线程开始执行时间
//这里随便写了一个遍历功能的线程
for (int numValue = 0; numValue <= 2; numValue++) {
System.out.println(Thread.currentThread().getName() + ":" + numValue + " " + new Date(System.currentTimeMillis()));
try {
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println(Thread.currentThread().getName() + ":当前线程任务结束,总共花费时间:" + (System.currentTimeMillis() - start) / 1000 + "秒");
}
}
);
}
//终止线程池
executor.shutdown(); //设置线程池的状态为SHUTDOWN,然后中断所有没有正在执行任务的线程
// executor.shutdownNow(); //(慎用)设置线程池的状态为STOP,然后尝试停止所有的正在执行或暂停任务的线程,并返回等待执行任务的列表,该方法要慎用,容易造成不可控的后果
while (!executor.isTerminated()) {
//使用isTerminated()函数判断线程池中的所有线程是否执行完毕时候,不能直接使用该函数,必须在shutdown()方法关闭线程池之后才能使用,
//否则isTerminated()永远为false,线程将一直阻塞在该判断的地方,导致程序最终崩溃。
}
System.out.println("所有线程都已结束");
}
}
打印结果:
pool-1-thread-2:0 Thu Sep 30 10:42:01 CST 2021
pool-1-thread-1:0 Thu Sep 30 10:42:01 CST 2021
pool-1-thread-3:0 Thu Sep 30 10:42:01 CST 2021
pool-1-thread-2:1 Thu Sep 30 10:42:02 CST 2021
pool-1-thread-1:1 Thu Sep 30 10:42:02 CST 2021
pool-1-thread-3:1 Thu Sep 30 10:42:02 CST 2021
pool-1-thread-2:2 Thu Sep 30 10:42:02 CST 2021
pool-1-thread-1:2 Thu Sep 30 10:42:02 CST 2021
pool-1-thread-3:2 Thu Sep 30 10:42:02 CST 2021
pool-1-thread-3:当前线程任务结束,总共花费时间:1秒
pool-1-thread-3:0 Thu Sep 30 10:42:03 CST 2021
pool-1-thread-1:当前线程任务结束,总共花费时间:1秒
pool-1-thread-1:0 Thu Sep 30 10:42:03 CST 2021
pool-1-thread-2:当前线程任务结束,总共花费时间:1秒
pool-1-thread-2:0 Thu Sep 30 10:42:03 CST 2021
pool-1-thread-3:1 Thu Sep 30 10:42:03 CST 2021
pool-1-thread-2:1 Thu Sep 30 10:42:03 CST 2021
pool-1-thread-1:1 Thu Sep 30 10:42:03 CST 2021
pool-1-thread-3:2 Thu Sep 30 10:42:04 CST 2021
pool-1-thread-1:2 Thu Sep 30 10:42:04 CST 2021
pool-1-thread-2:2 Thu Sep 30 10:42:04 CST 2021
pool-1-thread-2:当前线程任务结束,总共花费时间:1秒
pool-1-thread-1:当前线程任务结束,总共花费时间:1秒
pool-1-thread-3:当前线程任务结束,总共花费时间:1秒
所有线程都已结束
打印结果分析: 该示例中总共向线程池提交了6个线程任务,但是该线程池的核心线程只有3个,所以根据示例中定义的拒绝策略和任务队列,剩下的3个线程在任务队列中,根据打印结果可以看到,当前面的3个线程分别执行完毕时,空出的核心线程会被剩下的3个线程任务拿到,然后继续执行剩下的线程任务
创建可重用固定线程数的线程池。
示例:
ExecutorService executor = Executors.newFixedThreadPool(10);
for (int i = 0; i < Integer.MAX_VALUE; i++) {
executor.execute(()->{
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
//do nothing
}
});
使用场景: 适用于处理CPU密集型的任务,确保CPU在长期被工作线程使用的情况下,尽可能的少的分配线程,即适用执行长期的任务。 缺点:
当线程池中的线程数达到 corePoolSize 后,新任务将在无界队列中等待,因此线程池中的线程数不会超过 corePoolSize;
由于使用无界队列时 maximumPoolSize 将是一个无效参数,因为不可能存在任务队列满的情况。所以,通过创建 FixedThreadPool的源码可以看出创建的 FixedThreadPool 的 corePoolSize 和 maximumPoolSize 被设置为同一个值。
由于 1 和 2,使用无界队列时 keepAliveTime 将是一个无效参数;
运行中的 FixedThreadPool(未执行 shutdown()或 shutdownNow())不会拒绝任务,在任务比较多的时候会导致 OOM(内存溢出)
提交任务
因为没有核心线程,所以任务直接加到SynchronousQueue队列。
判断是否有空闲线程,如果有,就去取出任务执行。
如果没有空闲线程,就新建一个线程执行。
执行完任务的线程,还可以存活60秒,如果在这期间,接到任务,可以继续活下去;否则被销毁。
使用场景: 因为maximumPoolSize是无界的,所以提交任务的速度 > 线程池中线程处理任务的速度就要不断创建新线程;每次提交任务,都会立即有线程去处理,因此CachedThreadPool适用于处理大量、耗时少的任务。 缺点: CachedThreadPool允许创建的线程数量为 Integer.MAX_VALUE ,可能会创建大量线程,从而导致 OOM。
只有一个线程的线程池。
核心线程数为1
最大线程数也为1
阻塞队列是LinkedBlockingQueue
keepAliveTime为0
使用场景: 适用于串行执行任务的场景,一个任务一个任务地执行。 缺点: SingleThreadExecutor 使用无界队列 LinkedBlockingQueue 作为线程池的任务队列(队列的容量为 Intger.MAX_VALUE)。SingleThreadExecutor 使用无界队列作为线程池的任务队列会对线程池带来的影响与 FixedThreadPool 相同。说简单点就是可能会导致 OOM,
execute()
方法用于提交不需要返回值的任务,所以无法判断任务是否被线程池执行成功与否;
submit()
方法用于提交需要返回值的任务。线程池会返回一个 Future 类型的对象,通过这个 Future 对象可以判断任务是否执行成功,并且可以通过 Future 的 get()
方法来获取返回值,get()方法会阻塞当前线程直到任务完成,而使用 get(long timeout,TimeUnit unit)
方法则会阻塞当前线程一段时间后立即返回,这时候有可能任务没有执行完。
以AbstractExecutorService
接口中的一个 submit 方法为例:
public Future<?> submit(Runnable task) {
if (task == null) thrownew NullPointerException();
RunnableFuture<Void> ftask = newTaskFor(task, null);
execute(ftask);
return ftask;
}
上面方法调用的 newTaskFor()
方法返回了一个 FutureTask 对象。
protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
return new FutureTask<T>(runnable, value);
}
对比execute()
方法:
public void execute(Runnable command) {
...
}
shutdown()
:关闭线程池,线程池的状态变为 SHUTDOWN
。线程池不再接受新任务了,但是队列里的任务得执行完毕。
shutdownNow()
:关闭线程池,线程的状态变为 STOP
。线程池会终止当前正在运行的任务,并停止处理排队的任务并返回正在等待执行的 List;该方法要慎用,容易造成不可控的后果
isShutDown
当调用 shutdown()
方法后,返回为 true,否则为false。
isTerminated
当调用shutdown()
方法后,并且所有提交的任务完成后返回为 true,否则为false。
RUNNING
SHUTDOWN
STOP
TIDYING
TERMINATED