当提交一个新任务到线程池时,线程池的处理流程如下。
线程池创建线程时,会将线程封装成工作线程 Worker , Worker 在执行完任务后,还会循环获取工作队列里的任务来执行.我们可以从 Worker 类的 run()方法里看到这点。
ThreadPoolExecutor 执行 execute 方法分下面 4 种情况。
ThreadPoolExecutor 采取上述步骤的总体设计思路,是为了在执行 execute()方法时,尽可能地避免获取全局锁(那将会是一个严重的可伸缩瓶颈).在 ThreadPoolExecutor 完成预热之后(当前运行的线程数大于等于 corePoolSize),几乎所有的 execute()方法调用都是执行步骤 2,而步骤 2 不需要获取全局锁。
public ThreadPoolExecutor(int corePoolSize,//核心线程数
int maximumPoolSize,//最大线程数
long keepAliveTime,//等待时间
TimeUnit unit,//时间单位
BlockingQueue<Runnable> workQueue,//等待队列
ThreadFactory threadFactory,//线程工厂
RejectedExecutionHandler handler) {} //拒绝策略
细说参数:
超过核心线程数时
,多余的空闲线程等待新任务的最长时间。如果超过这个时间仍然没有新任务到来,超过核心线程数的空闲线程将被终止。PriorityQueue
是优先级队列,通过自然排序或者用 java
的比较器实现自定义排序,无界队列,但是可以在创建时指定大小,不允许有空值,默认是最小堆,当排序相同时,随机返回一个,PriorityQueue
是非线程安全的,PriorityBlockingQueue
是线程安全的,用于多线程环境.PriorityBlockingQueue
实现原理是使用了可重入锁
private final ReentrantLock lock;
PriorityQueue
通过二叉小顶堆实现,任意一个非叶子节点的权值,都不大于其左右子节点的权值
方法 | 作用 | 失败处理方式 |
---|---|---|
add() | 插入元素 | 抛出异常 |
offer() | 插入元素 | 返回 false |
element() | 获取队首元素不删除 | 抛出异常 |
peek() | 获取队首元素不删除 | null |
remove() | 取出队首元素删除 | 抛出异常 |
poll() | 取出队首元素删除 | null |
经典方法源码:从k
指定的位置开始,将x
逐层与当前点的parent
进行比较并交换,直到满足x >= queue[parent]
为止
//siftUp()
private void siftUp(int k, E x){
while (k > 0){
int parent = (k -1)>>> 1;//parentNo = (nodeNo-1)/2
Object e = queue[parent];
if (comparator.compare(x,(E) e)>= 0)//调用比较器的比较方法
break;
queue[k]= e;
k = parent;
}
queue[k]= x;
}
该方法的作用是从k
指定的位置开始,将x
逐层向下与当前点的左右孩子中较小的那个交换,直到x
小于或等于左右孩子中的任何一个为止。
//siftDown()
private void siftDown(int k, E x){
int half = size >>> 1;
while (k < half){
//首先找到左右孩子中较小的那个,记录到c里,并用child记录其下标
int child = (k << 1)+1;//leftNo = parentNo*2+1
Object c = queue[child];
int right = child +1;
if (right < size &&
comparator.compare((E) c,(E) queue[right])> 0)
c = queue[child = right];
if (comparator.compare(x,(E) c)<= 0)
break;
queue[k]= c;//然后用c取代原来的值
k = child;
}
queue[k]= x;
}
public interface Executor {
void execute(Runnable command);
}
public interface ExecutorService extends Executor {
Future<?> submit(Runnable task);
<T> Future<T> submit(Callable<T> task);
}
execute
和submit
都是用于向线程池提交任务的方法,但它们在使用方式和返回结果上有一些区别:
execute
: execute
方法是Executor
接口中定义的方法,它用于提交不需要返回结果的任务。该方法只接受Runnable
类型的任务,即没有返回值的任务。submit
: submit
方法是ExecutorService
接口中定义的方法,它用于提交既可以有返回结果也可以没有返回结果的任务。submit
方法可以接受Runnable
和Callable
类型的任务。Callable
是一个带有泛型返回值的任务类型,通过它可以获得任务执行的结果。execute
: execute
方法没有返回结果,因为它只用于提交没有返回值的任务,所以无法获得任务的执行结果。submit
: submit
方法可以获得任务执行的结果。当使用submit
提交Callable
任务时,会返回一个Future
对象,通过这个对象可以异步获取任务执行的结果。execute
: execute
方法不会抛出任务执行时的异常,因为没有返回结果,所以任务执行的异常只能由任务本身处理。submit
: submit
方法可以通过Future
对象来处理任务执行时的异常。调用Future
对象的get()
方法获取任务的执行结果时,如果任务抛出异常,get()
方法会将异常封装在ExecutionException
中并抛出。总结:
execute
方法。submit
方法,并将任务封装为Callable
类型。shutdown()
和shutdownNow()
都是用于关闭线程池的方法,但它们有一些区别:
shutdown()
方法:shutdown()
方法是ExecutorService
接口中定义的方法。shutdown()
方法后,线程池会拒绝接受新的任务提交,但会继续执行已经提交的任务和队列中的任务。shutdown()
方法会平缓地关闭线程池,它会等待所有已提交的任务执行完成,并且不会中断正在执行的任务。shutdownNow()
方法:shutdownNow()
方法也是ExecutorService
接口中定义的方法。shutdownNow()
方法后,线程池会立即停止接受新的任务提交,并且尝试中断正在执行的任务。shutdownNow()
方法会尝试停止所有任务的执行,包括已经提交但未执行的任务,它会返回一个 List 集合,包含那些未执行的任务。总结:
shutdown()
方法是平缓关闭线程池的方式,它会等待所有任务执行完成后关闭。shutdownNow()
方法是立即关闭线程池的方式,它会尝试中断正在执行的任务,并返回未执行的任务列表。Executor 框架主要由 3 大部分组成如下。
任务:
包括被执行任务需要实现的接口核心类和接口:
主要通过各个线程池的特点和工作队列来进行说明.
ThreadPoolExecutor:
通常使用工厂类 Executors
来创建。 Executors
可以创建 3 种类型的 ThreadPoolExecutor
:
FixedThreadPool:
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
SingleThreadExecutor:
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
CachedThreadPool:
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
ScheduledThreadPoolExecuton:
ScheduledThreadPoolExecutor 通常使用工厂类 Executors 来创建。Executors 可以创建 2 种类型的 ScheduledThreadPoolExecutor,如下。
ScheduledFutureTask 主要包含 3 个成员变量,如下。
ScheduledThreadPoolExecutor:
public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue());
}
SingleThreadScheduledExecutor:
public static ScheduledExecutorService newSingleThreadScheduledExecutor(ThreadFactory threadFactory) {
return new DelegatedScheduledExecutorService
(new ScheduledThreadPoolExecutor(1, threadFactory));
}
public ScheduledThreadPoolExecutor(int corePoolSize,
ThreadFactory threadFactory) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue(), threadFactory);
}
推荐使用 ThreadPoolExecutor 方式创建线程,在阿里的 Java 开发手册时有一条是不推荐使用 Executors 去创建,而是推荐去使用 ThreadPoolExecutor 来创建线程池。
这样做的目的主要原因是:使用 Executors 创建线程池不会传入核心参数,而是采用的默认值,这样的话我们往往会忽略掉里面参数的含义,如果业务场景要求比较苛刻的话,存在资源耗尽的风险;另外采 ThreadPoolExecutor 的方式可以让我们更加清楚地了解线程池的运行规则,不管是面试还是对技术成长都有莫大的好处。
线程池不允许使用 Executors 去创建,而是通过 ThreadPoolExecutor 的方式,这样的处理方式让写的同学更加明确线程池的运行规则,规避资源耗尽的风险。
说明:Executors 各个方法的弊端:
ctl 变量是整个线程池的核心控制状态,它是一个 AtomicInteger 类型的原子对象,它记录了线程池中生效线程数和线程池的运行状态。
ctl 总共 32 位,其中低 29 位代表 workerCount,所以最大线程数为 2^29^-1。高 3 位代表 runState。
runState 有 5 个值:
状态转换过程:
ThreadPoolExecutor 回收工作线程,一条线程 getTask()返回 null,就会被回收。
分两种场景:
未调用 shutdown() :
RUNNING 状态下全部任务执行完成的场景,线程数量大于 corePoolSize,线程超时阻塞,超时唤醒后 CAS 减少工作线程数,如果 CAS 成功,返回 null,线程回收。否则进入下一次循环。当工作者线程数量小于等于 corePoolSize,就可以一直阻塞了。调用 shutdown():
,全部任务执行完成的场景,shutdown() 会向所有线程发出中断信号,这时有两种可能。所有线程都在阻塞:
中断唤醒,进入循环,都符合第一个 if 判断条件,都返回 null,所有线程回收。
任务还没有完全执行完:
至少会有一条线程被回收。在 processWorkerExit(Worker w, boolean completedAbruptly)方法里会调用 tryTerminate(),向任意空闲线程发出中断信号。所有被阻塞的线程,最终都会被一个个唤醒,回收。
我们从线程的应用场景来分析,由于 IO 操作比 Cpu 计算耗时要久的多的,如果我们一段程序有 IO 操作和 Cpu 计算,我们可以调用 IO 密集型计算。程序中没有 IO 操作只有 Cpu 的话称为 Cpu 密集型程序。
Cpu密集型:
Cpu 的核数=线程数就行,一般我们会设置 Cpu 核数+ 1,防止由于其他因素导致阻塞。
IO密集型:
确定在 IO 密集型计算中创建多少线程合适是一个复杂的问题,因为它涉及到多个因素,例如计算机的硬件配置、任务的性质和操作系统的特性。IO 密集型任务通常涉及大量的输入/输出操作,例如读写文件、网络通信等,而不是 CPU 密集型任务,这些任务主要涉及大量的计算。
在 IO 密集型任务中,线程通常会在等待 IO 操作完成时被阻塞,而不是在 CPU 上执行计算。因此,创建过多的线程可能会导致线程切换开销增加,从而导致性能下降。同时,创建过少的线程可能导致 CPU 资源得不到充分利用,从而造成性能浪费。
一般来说,建议的线程数量取决于以下几个因素:
一种常见的做法是,首先根据 CPU 核心数来确定线程池的大小,然后根据实际的性能测试进行调优。可以逐渐增加线程数量,并监测系统性能的变化,找到最佳的线程数量。
值得注意的是,如果任务中有长时间的阻塞 IO 操作,也可以考虑使用异步 IO 或者事件驱动的编程模型,以减少线程数量并提高系统的吞吐量。
综上所述,IO 密集型任务的合适线程数量没有固定的标准,需要根据具体情况进行评估和调优。在实际应用中,可以进行性能测试和监测,找到最佳的线程数量来提高系统性能。
private ExecutorService service = new ThreadPoolExecutor(5,
5,
6001,
TimeUnit.SECONDS,
new LinkedBlockingQueue<>(1_000_000),
Executors.defaultThreadFactory(),
new RejectedExecutionHandler() {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
// 补偿机制
}
});
拒绝策略有4种:
ThreadPoolExecutor.AbortPolicy()
是默认的拒绝策略。当线程池无法接受新任务时,会抛出RejectedExecutionException
异常。ThreadPoolExecutor.DiscardPolicy()
是另一种简单的拒绝策略。当线程池无法接受新任务时,新任务会被丢弃,不会抛出异常。ThreadPoolExecutor.DiscardOldestPolicy()
是一种稍微高级一点的策略。当线程池无法接受新任务时,会丢弃队列中最老的任务,然后尝试重新提交新任务。ThreadPoolExecutor.CallerRunsPolicy()
。当线程池无法接受新任务时,它会将任务交给调用线程来执行。原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。