
在当今的软件开发领域,尤其是对于需要处理高并发、多任务的系统,多线程编程已经成为一项必备技能。多线程能够显著提升系统性能,使程序能够充分利用多核处理器,实现并行处理,进而缩短任务执行时间。然而,直接使用线程来处理任务存在诸多问题,例如:
为了克服这些问题,线程池应运而生。线程池是一种管理和复用线程的机制,它提前创建一定数量的线程,将任务分配给这些线程执行,避免了频繁创建和销毁线程的开销,提高了系统的性能和资源利用率。
线程池允许预先创建一组线程,这些线程在完成任务后不会立即销毁,而是处于等待状态,当新任务到来时可以立即复用,避免了频繁创建和销毁线程的成本,提高了系统的响应速度和性能。
通过线程池,可以对线程的数量进行管理和控制,避免因过多线程同时运行而导致的资源耗尽,同时也可以根据系统的负载情况动态调整线程数量,提高系统的稳定性和可靠性。
线程池可以根据任务的优先级和执行顺序,合理地分配线程资源,确保任务的有序执行,提高任务的执行效率。
Executors.newFixedThreadPool(int nThreads) 创建一个固定大小的线程池,该线程池包含固定数量的线程,即核心线程数等于最大线程数。
import java.time.LocalDateTime;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class FixedThreadPoolExample {
    public static void main(String[] args) {
        ExecutorService executorService = Executors.newFixedThreadPool(3);
        System.out.println("执行开始时间"+ LocalDateTime.now());
        for (int i = 0; i < 10; i++) {
            executorService.execute(() -> {
                System.out.println(Thread.currentThread().getName() + " is running");
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            });
        }
        executorService.shutdown();
        System.out.println("执行完成时间"+ LocalDateTime.now());
    }
}执行结果,由于开的线程进行执行的,所以整个调用没有耗时,每个线程都是开的独立线程进行执行的;

代码解释:
Executors.newFixedThreadPool(3) 创建了一个具有 3 个线程的线程池。execute() 方法向线程池提交 10 个任务,这些任务会被分配给线程池中的线程执行。Executors.newCachedThreadPool() 创建一个可缓存的线程池,线程池的大小会根据需要自动调整,适合执行大量短期异步任务。
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class CachedThreadPoolExample {
    public static void main(String[] args) {
        ExecutorService executorService = Executors.newCachedThreadPool();
        for (int i = 0; i < 10; i++) {
            executorService.execute(() -> {
                System.out.println(Thread.currentThread().getName() + " is running");
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            });
        }
        executorService.shutdown();
    }
}代码解释:
Executors.newSingleThreadExecutor() 创建一个单线程的线程池,保证所有任务按照提交顺序依次执行。
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class SingleThreadExecutorExample {
    public static void main(String[] args) {
        ExecutorService executorService = Executors.newSingleThreadExecutor();
        for (int i = 0; i < 10; i++) {
            executorService.execute(() -> {
                System.out.println(Thread.currentThread().getName() + " is running");
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            });
        }
        executorService.shutdown();
    }
}代码解释:
Java 自带的线程池虽然方便,但在某些场景下无法满足定制化需求,例如需要自定义线程池的核心参数,如核心线程数、最大线程数、任务队列类型、拒绝策略等。
使用 Executors 创建的线程池可能存在资源耗尽的风险,如 CachedThreadPool 可能会创建大量线程,导致系统资源耗尽。手动创建线程池可以更好地根据实际情况优化资源使用。
ctl 是一个 AtomicInteger 类型的变量,它是线程池状态和工作线程数量的组合表示,使用低 29 位表示线程数量,高 3 位表示线程池状态。
通过位运算来同时管理线程池状态和线程数量,如 private static int runStateOf(int c) { return c & ~CAPACITY; } 用于获取线程池状态,private static int workerCountOf(int c) { return c & CAPACITY; } 用于获取线程数量。
线程池处于运行状态,接受新任务并处理队列中的任务。
不再接受新任务,但会继续处理队列中的任务。
不接受新任务,也不处理队列中的任务,会中断正在执行的任务。
所有任务都已终止,即将进行清理工作。
线程池终止完成。
调用 shutdown() 方法时,线程池进入 SHUTDOWN 状态,不再接受新任务,但继续处理队列中的任务。
调用 shutdownNow() 方法,线程池进入 STOP 状态,中断正在执行的任务,不处理队列中的任务。
当队列中的任务和正在执行的任务都完成时,进入 TIDYING 状态。
当 terminated() 方法执行完毕,进入 TERMINATED 状态。
execute(Runnable command) 方法用于提交一个 Runnable 任务,如果无法执行会抛出 RejectedExecutionException。
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class ExecuteMethodExample {
    public static void main(String[] args) {
        ExecutorService executorService = Executors.newFixedThreadPool(2);
        executorService.execute(() -> {
            System.out.println("Task is running");
        });
        executorService.shutdown();
    }
}代码解释:
execute() 方法将 Runnable 任务提交给线程池执行。submit(Callable<T> task) 方法可提交 Callable 任务,并返回 Future<T> 对象,可获取任务执行结果或取消任务。
import java.util.concurrent.*;
public class SubmitMethodExample {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        ExecutorService executorService = Executors.newFixedThreadPool(2);
        Future<String> future = executorService.submit(() -> {
            return "Task completed";
        });
        System.out.println(future.get());
        executorService.shutdown();
    }
}代码解释:
submit() 方法提交 Callable 任务,通过 Future.get() 获取任务结果。核心线程数,即使空闲也不会被销毁的线程数量。
线程池允许的最大线程数。
当线程数大于核心线程数时,多余线程的空闲时间超过此值将被销毁。
keepAliveTime 的时间单位。
存储等待执行任务的队列,如 ArrayBlockingQueue、LinkedBlockingQueue 等。
创建新线程的工厂,可自定义线程的名称、优先级等属性。
任务无法执行时的拒绝策略,如 AbortPolicy、CallerRunsPolicy 等。
当使用 execute() 或 submit() 方法提交任务时:
RUNNING 则继续。corePoolSize,创建新的核心线程执行任务。corePoolSize,将任务添加到 workQueue。workQueue 已满且线程数小于 maximumPoolSize,创建新的非核心线程执行任务。workQueue 已满且线程数达到 maximumPoolSize,根据 rejectedExecutionHandler 处理任务。import java.util.concurrent.*;
public class CustomThreadPoolExample {
    public static void main(String[] args) {
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
                2,  // corePoolSize
                4,  // maximumPoolSize
                60, TimeUnit.SECONDS,  // keepAliveTime and unit
                new LinkedBlockingQueue<>(10),  // workQueue
                new ThreadFactoryBuilder().setNameFormat("custom-thread-%d").build(),  // threadFactory
                new ThreadPoolExecutor.CallerRunsPolicy()  // rejectedExecutionHandler
        );
        for (int i = 0; i < 20; i++) {
            executor.execute(() -> {
                System.out.println(Thread.currentThread().getName() + " is running");
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            });
        }
        executor.shutdown();
    }
}代码解释:
ThreadFactoryBuilder 自定义线程工厂,设置线程名称格式。CallerRunsPolicy 作为拒绝策略,当任务无法执行时,由调用者线程执行任务。import java.util.concurrent.*;
public class AsyncTaskThreadPool {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        ExecutorService executorService = Executors.newFixedThreadPool(3);
        CompletableFuture.runAsync(() -> {
            System.out.println(Thread.currentThread().getName() + " is running");
        }, executorService).thenRun(() -> {
            System.out.println("Task completed");
        });
        executorService.shutdown();
    }
}代码解释:
CompletableFuture.runAsync() 方法提交一个异步任务到线程池。thenRun() 方法在任务完成后执行后续操作。import java.util.concurrent.*;
public class DynamicThreadPoolExample {
    public static void main(String[] args) throws InterruptedException {
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
                2,  // corePoolSize
                4,  // maximumPoolSize
                60, TimeUnit.SECONDS,  // keepAliveTime and unit
                new LinkedBlockingQueue<>(10)  // workQueue
        );
        for (int i = 0; i < 20; i++) {
            executor.execute(() -> {
                System.out.println(Thread.currentThread().getName() + " is running");
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            });
        }
        // 动态调整核心线程数
        executor.setCorePoolSize(3);
        executor.shutdown();
    }
}代码解释:
setCorePoolSize() 方法动态调整核心线程数,可根据系统负载灵活调整线程池规模。Runtime.getRuntime().availableProcessors(),避免过多线程导致的上下文切换开销。setCorePoolSize() 和 setMaximumPoolSize() 动态调整线程池规模。try-catch 处理异常,防止异常导致线程池异常终止。ctl 属性和位运算管理线程池状态和线程数量,实现状态的安全转换和线程数量的准确控制。线程池是 Java 并发编程中的重要组件,通过理解其核心源码和工作原理,我们可以更好地使用和优化线程池。在实际项目中,根据具体业务需求和系统性能,合理选择线程池类型,手动创建定制化的线程池,并对线程池进行性能优化、扩展和稳定性保障。作为大数据工程师,掌握线程池的原理和使用技巧,有助于开发高性能、高并发的系统,避免因多线程引发的性能和资源管理问题。通过上述实战示例,可以将理论知识应用于实践,根据不同场景灵活调整线程池,提升系统的整体性能和稳定性。
以上文章从多个维度对线程池进行了深入剖析,包含其背景、使用、源码原理、实战示例和性能优化,帮助你更好地理解和使用线程池,提升多线程编程的能力。在实际开发中,根据具体的业务需求和系统特点,深入考虑性能、扩展和稳定性因素,确保系统的高效运行。
请注意,在多线程编程中,需要特别关注线程安全问题,例如同步问题、资源竞争和死锁等,避免因并发问题影响系统性能和稳定性。同时,合理使用线程池可以显著提高系统性能,但需要谨慎设计和测试,确保线程池在不同负载下的表现符合预期。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。