在当今的软件开发领域,尤其是对于需要处理高并发、多任务的系统,多线程编程已经成为一项必备技能。多线程能够显著提升系统性能,使程序能够充分利用多核处理器,实现并行处理,进而缩短任务执行时间。然而,直接使用线程来处理任务存在诸多问题,例如:
为了克服这些问题,线程池应运而生。线程池是一种管理和复用线程的机制,它提前创建一定数量的线程,将任务分配给这些线程执行,避免了频繁创建和销毁线程的开销,提高了系统的性能和资源利用率。
线程池允许预先创建一组线程,这些线程在完成任务后不会立即销毁,而是处于等待状态,当新任务到来时可以立即复用,避免了频繁创建和销毁线程的成本,提高了系统的响应速度和性能。
通过线程池,可以对线程的数量进行管理和控制,避免因过多线程同时运行而导致的资源耗尽,同时也可以根据系统的负载情况动态调整线程数量,提高系统的稳定性和可靠性。
线程池可以根据任务的优先级和执行顺序,合理地分配线程资源,确保任务的有序执行,提高任务的执行效率。
Executors.newFixedThreadPool(int nThreads)
创建一个固定大小的线程池,该线程池包含固定数量的线程,即核心线程数等于最大线程数。
import java.time.LocalDateTime;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class FixedThreadPoolExample {
public static void main(String[] args) {
ExecutorService executorService = Executors.newFixedThreadPool(3);
System.out.println("执行开始时间"+ LocalDateTime.now());
for (int i = 0; i < 10; i++) {
executorService.execute(() -> {
System.out.println(Thread.currentThread().getName() + " is running");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
}
executorService.shutdown();
System.out.println("执行完成时间"+ LocalDateTime.now());
}
}
执行结果,由于开的线程进行执行的,所以整个调用没有耗时,每个线程都是开的独立线程进行执行的;
代码解释:
Executors.newFixedThreadPool(3)
创建了一个具有 3 个线程的线程池。execute()
方法向线程池提交 10 个任务,这些任务会被分配给线程池中的线程执行。Executors.newCachedThreadPool()
创建一个可缓存的线程池,线程池的大小会根据需要自动调整,适合执行大量短期异步任务。
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class CachedThreadPoolExample {
public static void main(String[] args) {
ExecutorService executorService = Executors.newCachedThreadPool();
for (int i = 0; i < 10; i++) {
executorService.execute(() -> {
System.out.println(Thread.currentThread().getName() + " is running");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
}
executorService.shutdown();
}
}
代码解释:
Executors.newSingleThreadExecutor()
创建一个单线程的线程池,保证所有任务按照提交顺序依次执行。
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class SingleThreadExecutorExample {
public static void main(String[] args) {
ExecutorService executorService = Executors.newSingleThreadExecutor();
for (int i = 0; i < 10; i++) {
executorService.execute(() -> {
System.out.println(Thread.currentThread().getName() + " is running");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
}
executorService.shutdown();
}
}
代码解释:
Java 自带的线程池虽然方便,但在某些场景下无法满足定制化需求,例如需要自定义线程池的核心参数,如核心线程数、最大线程数、任务队列类型、拒绝策略等。
使用 Executors
创建的线程池可能存在资源耗尽的风险,如 CachedThreadPool
可能会创建大量线程,导致系统资源耗尽。手动创建线程池可以更好地根据实际情况优化资源使用。
ctl
是一个 AtomicInteger
类型的变量,它是线程池状态和工作线程数量的组合表示,使用低 29 位表示线程数量,高 3 位表示线程池状态。
通过位运算来同时管理线程池状态和线程数量,如 private static int runStateOf(int c) { return c & ~CAPACITY; }
用于获取线程池状态,private static int workerCountOf(int c) { return c & CAPACITY; }
用于获取线程数量。
线程池处于运行状态,接受新任务并处理队列中的任务。
不再接受新任务,但会继续处理队列中的任务。
不接受新任务,也不处理队列中的任务,会中断正在执行的任务。
所有任务都已终止,即将进行清理工作。
线程池终止完成。
调用 shutdown()
方法时,线程池进入 SHUTDOWN
状态,不再接受新任务,但继续处理队列中的任务。
调用 shutdownNow()
方法,线程池进入 STOP
状态,中断正在执行的任务,不处理队列中的任务。
当队列中的任务和正在执行的任务都完成时,进入 TIDYING
状态。
当 terminated()
方法执行完毕,进入 TERMINATED
状态。
execute(Runnable command)
方法用于提交一个 Runnable
任务,如果无法执行会抛出 RejectedExecutionException
。
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class ExecuteMethodExample {
public static void main(String[] args) {
ExecutorService executorService = Executors.newFixedThreadPool(2);
executorService.execute(() -> {
System.out.println("Task is running");
});
executorService.shutdown();
}
}
代码解释:
execute()
方法将 Runnable
任务提交给线程池执行。submit(Callable<T> task)
方法可提交 Callable
任务,并返回 Future<T>
对象,可获取任务执行结果或取消任务。
import java.util.concurrent.*;
public class SubmitMethodExample {
public static void main(String[] args) throws ExecutionException, InterruptedException {
ExecutorService executorService = Executors.newFixedThreadPool(2);
Future<String> future = executorService.submit(() -> {
return "Task completed";
});
System.out.println(future.get());
executorService.shutdown();
}
}
代码解释:
submit()
方法提交 Callable
任务,通过 Future.get()
获取任务结果。核心线程数,即使空闲也不会被销毁的线程数量。
线程池允许的最大线程数。
当线程数大于核心线程数时,多余线程的空闲时间超过此值将被销毁。
keepAliveTime
的时间单位。
存储等待执行任务的队列,如 ArrayBlockingQueue
、LinkedBlockingQueue
等。
创建新线程的工厂,可自定义线程的名称、优先级等属性。
任务无法执行时的拒绝策略,如 AbortPolicy
、CallerRunsPolicy
等。
当使用 execute()
或 submit()
方法提交任务时:
RUNNING
则继续。corePoolSize
,创建新的核心线程执行任务。corePoolSize
,将任务添加到 workQueue
。workQueue
已满且线程数小于 maximumPoolSize
,创建新的非核心线程执行任务。workQueue
已满且线程数达到 maximumPoolSize
,根据 rejectedExecutionHandler
处理任务。import java.util.concurrent.*;
public class CustomThreadPoolExample {
public static void main(String[] args) {
ThreadPoolExecutor executor = new ThreadPoolExecutor(
2, // corePoolSize
4, // maximumPoolSize
60, TimeUnit.SECONDS, // keepAliveTime and unit
new LinkedBlockingQueue<>(10), // workQueue
new ThreadFactoryBuilder().setNameFormat("custom-thread-%d").build(), // threadFactory
new ThreadPoolExecutor.CallerRunsPolicy() // rejectedExecutionHandler
);
for (int i = 0; i < 20; i++) {
executor.execute(() -> {
System.out.println(Thread.currentThread().getName() + " is running");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
}
executor.shutdown();
}
}
代码解释:
ThreadFactoryBuilder
自定义线程工厂,设置线程名称格式。CallerRunsPolicy
作为拒绝策略,当任务无法执行时,由调用者线程执行任务。import java.util.concurrent.*;
public class AsyncTaskThreadPool {
public static void main(String[] args) throws ExecutionException, InterruptedException {
ExecutorService executorService = Executors.newFixedThreadPool(3);
CompletableFuture.runAsync(() -> {
System.out.println(Thread.currentThread().getName() + " is running");
}, executorService).thenRun(() -> {
System.out.println("Task completed");
});
executorService.shutdown();
}
}
代码解释:
CompletableFuture.runAsync()
方法提交一个异步任务到线程池。thenRun()
方法在任务完成后执行后续操作。import java.util.concurrent.*;
public class DynamicThreadPoolExample {
public static void main(String[] args) throws InterruptedException {
ThreadPoolExecutor executor = new ThreadPoolExecutor(
2, // corePoolSize
4, // maximumPoolSize
60, TimeUnit.SECONDS, // keepAliveTime and unit
new LinkedBlockingQueue<>(10) // workQueue
);
for (int i = 0; i < 20; i++) {
executor.execute(() -> {
System.out.println(Thread.currentThread().getName() + " is running");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
}
// 动态调整核心线程数
executor.setCorePoolSize(3);
executor.shutdown();
}
}
代码解释:
setCorePoolSize()
方法动态调整核心线程数,可根据系统负载灵活调整线程池规模。Runtime.getRuntime().availableProcessors()
,避免过多线程导致的上下文切换开销。setCorePoolSize()
和 setMaximumPoolSize()
动态调整线程池规模。try-catch
处理异常,防止异常导致线程池异常终止。ctl
属性和位运算管理线程池状态和线程数量,实现状态的安全转换和线程数量的准确控制。线程池是 Java 并发编程中的重要组件,通过理解其核心源码和工作原理,我们可以更好地使用和优化线程池。在实际项目中,根据具体业务需求和系统性能,合理选择线程池类型,手动创建定制化的线程池,并对线程池进行性能优化、扩展和稳定性保障。作为大数据工程师,掌握线程池的原理和使用技巧,有助于开发高性能、高并发的系统,避免因多线程引发的性能和资源管理问题。通过上述实战示例,可以将理论知识应用于实践,根据不同场景灵活调整线程池,提升系统的整体性能和稳定性。
以上文章从多个维度对线程池进行了深入剖析,包含其背景、使用、源码原理、实战示例和性能优化,帮助你更好地理解和使用线程池,提升多线程编程的能力。在实际开发中,根据具体的业务需求和系统特点,深入考虑性能、扩展和稳定性因素,确保系统的高效运行。
请注意,在多线程编程中,需要特别关注线程安全问题,例如同步问题、资源竞争和死锁等,避免因并发问题影响系统性能和稳定性。同时,合理使用线程池可以显著提高系统性能,但需要谨慎设计和测试,确保线程池在不同负载下的表现符合预期。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
扫码关注腾讯云开发者
领取腾讯云代金券
Copyright © 2013 - 2025 Tencent Cloud. All Rights Reserved. 腾讯云 版权所有
深圳市腾讯计算机系统有限公司 ICP备案/许可证号:粤B2-20090059 深公网安备号 44030502008569
腾讯云计算(北京)有限责任公司 京ICP证150476号 | 京ICP备11018762号 | 京公网安备号11010802020287
Copyright © 2013 - 2025 Tencent Cloud.
All Rights Reserved. 腾讯云 版权所有