我们继续前面的《【JUC基础】12.线程池(一)》。
Executors类是Java并发工具包(java.util.concurrent)中提供的一个工具类,用于创建和管理线程池。它提供了一些静态方法,用于创建不同类型的线程池,简化了线程池的创建和配置过程。
Executor框架提供了各种类型的线程池,主要方法有:
/**
* 固定线程大小的线程池
*/
public static ExecutorService newFixedThreadPool(int nThreads)
/**
* 单线程的线程池
*/
public static ExecutorService newSingleThreadExecutor()
/**
* 可根据实际情况调整线程数量的线程池
*/
public static ExecutorService newCachedThreadPool()
/**
* 单线程的线程池,扩展了延时和周期性执行的功能
*/
public static ExecutorService newSingleThreadScheduledExecutor()
/**
* 可执行线程数量的线程池,扩展了延时和周期性执行的功能
*/
public static ExecutorService newScheduledThreadPool(int corePoolSize)
newFixedThreadPool()方法。返回一个固定线程数量的线程池。线程池中的线程数量始终不变。当有一个新的任务提交时,线程池中若有空闲数量,则立即执行。如果没有,则新的任务会被暂存在一个队列中,等到有空闲的线程时,再从任务队列中取出任务执行。
示例代码:
public class FixedThreadPoolTest {
public static void main(String[] args) {
// 固定线程数量为3
ExecutorService executorService = Executors.newFixedThreadPool(3);
for (int i = 0; i < 6; i++) {
executorService.submit(() -> {
System.out.println("Thread Id:" + Thread.currentThread().getId());
ThreadUtil.sleep(1000);
});
}
}
}
执行结果如下:我们创建了固定3个线程的线程池,然后我们依次提交6个任务,线程池就会安排这6个任务,然后执行。执行期间我们发现前3个任务和后3个任务的执行时间相差1s,且前3和后3个任务的线程ID是一致的,这就说明线程被分成了2批执行。
newCachedThreadPool()方法。返回一个可根据实际情况调整线程数量的线程池。线程池的数量不确定,但如果有空闲线程可以复用,则优先使用可复用线程。如果所有线程都在运行,又有新的任务提交,则会创建新的线程处理任务,处理结束后,线程池回收多余线程。
我们拿2.2示例代码来改造:
public static void main(String[] args) {
// 可调整大小线程池
ExecutorService executorService = Executors.newCachedThreadPool();
for (int i = 0; i < 10; i++) {
executorService.submit(() -> {
System.out.println("Thread Id:" + Thread.currentThread().getId() + " is running...");
ThreadUtil.sleep(1000);
System.out.println("Thread Id:" + Thread.currentThread().getId() + " done!");
});
// 这里多睡2秒,验证是否复用了空闲线程
if(i == 1){
ThreadUtil.sleep(2000);
}
}
}
运行结果:
当我们i==1的时候睡了2秒。2秒过后,ID为9和10的线程已经执行结束。所以当第二批开始执行的时候,我们看到线程9和10被复用执行了,而与fixedThreadPool不同的是,他自动调整了线程池的线程数量大小,而非固定。因此我们看到了11、12、13......后的线程被创建。
newSingleThreadExecutor()。返回一个只有一个线程的线程池。若多于1个任务提交到线程池,任务会被存在任务等待队列中,直到当前线程空闲后,再取出执行。
示例代码:
public static void main(String[] args) {
ExecutorService executorService = Executors.newSingleThreadExecutor();
for (int i = 0; i < 10; i++) {
executorService.submit(() -> {
System.out.println("Thread Id:" + Thread.currentThread().getId() + " is running...");
ThreadUtil.sleep(1000);
System.out.println("Thread Id:" + Thread.currentThread().getId() + " done!");
});
}
}
执行结果:
newScheduledThreadPool()。可以根据时间需要对线程进行调度的线程池。主要有两个方法:
/**
* 创建并执行在给定的初始延迟之后,随后以给定的时间段首先启用的周期性动作; 那就是执行将在initialDelay之后开始,然后是initialDelay+period
* ,然后是initialDelay + 2 * period ,等等。
*/
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit);
/**
* 创建并执行在给定的初始延迟之后首先启用的定期动作,随后在一个执行的终止和下一个执行的开始之间给定的延迟。
*/
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit);
与其他线程池不同,该线程池不一定会立即安排任务执行。更多是起到了定时计划的作用。
使用scheduleAtFixedRate()来调度一个任务。这个任务执行1秒,调度周期是2秒。那么这个任务就会每2秒执行一次。
public static void main(String[] args) {
ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(3);
scheduledExecutorService.scheduleAtFixedRate(() -> {
System.out.println(DateUtil.now() + ":Thread Id:" + Thread.currentThread().getId() + " is running...");
ThreadUtil.sleep(1000);
// 参数值0表示:立即执行,不延迟
// 参数值2表示:计划周期为2秒
}, 0, 2, TimeUnit.SECONDS);
}
执行结果:
当然,scheduleAtFixedRate是不会允许任务堆叠的情况。当一个任务执行时间大于周期时间时,那么周期计划就会等待任务结束。
举个例子:
如周期为2秒,一个任务执行了1秒。那么该计划周期为2秒;
如周期为2秒,一个任务执行了5秒。那么该计划周期会等待任务5秒执行结束,周期就变为5秒;
public static void main(String[] args) {
ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(3);
scheduledExecutorService.scheduleAtFixedRate(() -> {
System.out.println(DateUtil.now() + ":Thread Id:" + Thread.currentThread().getId() + " is running...");
// 这里任务执行改为5秒
ThreadUtil.sleep(5000);
}, 0, 2, TimeUnit.SECONDS);
}
执行结果:
使用scheduleWithFixedDelay()来调度一个任务。这个任务执行1秒,调度周期是2秒。那么这个任务就会每(2+1)秒执行一次。
public static void main(String[] args) {
ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(3);
scheduledExecutorService.scheduleWithFixedDelay(() -> {
System.out.println(DateUtil.now() + ":Thread Id:" + Thread.currentThread().getId() + " is running...");
ThreadUtil.sleep(1000);
}, 0, 2, TimeUnit.SECONDS);
}
执行结果:
同样,scheduleWithFixedDelay是不会允许任务堆叠的情况。当一个任务执行时间大于周期时间时,那么周期计划就会等待任务结束。
举个例子:
如周期为2秒,一个任务执行了1秒。那么该计划周期为(2+1)秒;
如周期为2秒,一个任务执行了5秒。那么该计划周期会等待任务5秒执行结束,周期就变为(5+2)秒;
public static void main(String[] args) {
ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(3);
scheduledExecutorService.scheduleWithFixedDelay(() -> {
System.out.println(DateUtil.now() + ":Thread Id:" + Thread.currentThread().getId() + " is running...");
ThreadUtil.sleep(5000);
}, 0, 2, TimeUnit.SECONDS);
}
执行结果:
需要注意的是,如果任务本身抛出异常,那么后续的所有任务都会被中断。
public static void main(String[] args) {
ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(3);
AtomicInteger i = new AtomicInteger(0);
scheduledExecutorService.scheduleWithFixedDelay(() -> {
System.out.println(DateUtil.now() + ":Thread Id:" + Thread.currentThread().getId() + " is running...");
if(i.get() == 3){
// 这里抛个异常
try {
int number = 10 / 0;
} catch (Exception e) {
e.printStackTrace();
throw e;
}
}
i.getAndIncrement();
ThreadUtil.sleep(1000);
}, 0, 2, TimeUnit.SECONDS);
}
可以看到后续都不会继续执行:
ThreadPoolExecutor提供了两种提交任务的方法:submit和execute。
execute:将任务提交给线程池进行执行,但无法获取任务的执行结果。适用于不关心任务执行结果的场景。例如,执行一些简单的异步操作或无需返回结果的任务。
ExecutorService executor = Executors.newFixedThreadPool(5);
executor.execute(() -> {
// 执行任务的代码
});
submit:将任务提交给线程池进行执行,并返回一个Future对象,通过该对象可以获取任务的执行状态和结果。适用于需要获取任务执行结果或对任务进行异常处理的场景。
ExecutorService executor = Executors.newFixedThreadPool(5);
Future<String> future = executor.submit(() -> {
// 执行任务的代码
return "Task Result";
});
try {
String result = future.get(); // 获取任务执行结果
System.out.println("Task Result: " + result);
} catch (InterruptedException | ExecutionException e) {
// 处理异常
}
前面我们讲了如何创建线程池,线程池类型,以及如何提交任务到线程池中执行。那么当线程池执行完任务,线程处于空闲状态,依旧会占用系统资源。此时我们就需要讲线程池进行关闭,以待垃圾回收器回收。
关闭线程池通常有两种方式:
较为优雅的方式:
此外还可以使用awaitTermination(timeout, unit)方法等待线程池中的任务执行完成。该方法会阻塞当前线程,直到线程池中的任务全部完成或超过指定的超时时间。如果等待超时,调用shutdownNow()方法中断执行中的任务,并尝试终止线程池。最后,调用isTerminated()方法判断线程池是否已经终止,确认所有任务都已完成。
ExecutorService executor = Executors.newFixedThreadPool(5);
// 关闭线程池
executor.shutdown();
try {
// 等待线程池中的任务执行完成,最多等待5秒
if (!executor.awaitTermination(5, TimeUnit.SECONDS)) {
// 等待超时,调用shutdownNow()方法终止执行中的任务
executor.shutdownNow();
// 再次等待线程池中的任务执行完成,最多等待5秒
if (!executor.awaitTermination(5, TimeUnit.SECONDS)) {
// 等待超时后仍有任务未完成,可能需要其他处理方式
}
}
} catch (InterruptedException e) {
// 捕获中断异常,可能需要其他处理方式
} finally {
// 判断线程池是否已终止
if (executor.isTerminated()) {
// 线程池已终止,进行相关资源的释放
}
}
通过以上步骤,可以保证线程池能够优雅地终止,并确保所有任务都得到执行或被中断。这样可以避免应用程序中出现线程池资源泄漏或未处理的任务。
到此为止,线程池相关的基本知识都介绍完了。当然这些只是线程池的一些基本用法以及常规使用。面对基础入门也是足够了。至于类似线程池的源码,自定义扩展线程池等,放到后面看看进阶篇再写吧~