之前写过关于线程池的源码分析文章:一文读懂JDK源码:ThreadPoolExecutor,但实际上还有很多地方值得思考的。
对ThreadPoolExecutors的思考
业务定制化ThreadPoolExecutors,而不直接复用Executor的5个现成方法去构建线程池,因为原来的API方式有弊端:
工具类示例
public class ThreadPoolUtils {
public static ThreadPoolExecutor newThreadPool(
int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler
){
if (handler == null) {
handler = new ThreadPoolExecutor.AbortPolicy();
}
if (threadFactory == null) {
threadFactory = Executors.defaultThreadFactory();
}
return new ThreadPoolExecutor(
corePoolSize,
maximumPoolSize,
keepAliveTime,
unit,
workQueue,
threadFactory,
handler
);
}
}
代码分析:7个参数如下
参数 | 定义 | 作用 | 备注 |
---|---|---|---|
corePoolSize | 池子的基本容量 | 长期驻留线程池的工作线程数量 | allowCoreThreadTimeOut为true,该值为true,则线程池数量最后销毁到0个。 |
maximumPoolSize | 池子的最大容量 | 定义池子最大容量 | allowCoreThreadTimeOut为false,会对超出基本容量的线程进行销毁,销毁机制:超过核心线程数时,而且(超过最大值或者timeout超时),就会销毁。 |
keepAliveTime | 当线程池线程数量大于corePoolSize时候,多出来的空闲线程,多长时间会被销毁。 | 必须大于0,默认是。0 | |
unit | 生存时间的单位时间 | 参考枚举类:java.util.concurrent.TimeUnit | |
workQueue | 工作任务队列 | 用于存放提交但是尚未被执行的任务 | |
threadFactory | 线程工厂 | 用于创建线程 | |
handler | 拒绝策略 | 指将任务添加到线程池中时,线程池拒绝该任务所采取的相应策略。 |
观察了一下JDK内置应用案例,大约有以下四种可选方案:
先介绍两个几乎不咋用得到的方案:LinkedTransferQueue和SynchronousQueue。
LinkedBlockingQueue 和 ArrayBlockingQueue 都是 Java 并发包 java.util.concurrent 中的阻塞队列实现,它们分别基于链表和数组实现。
队列 | 特点 | 优点 | 缺点 |
---|---|---|---|
LinkedTransferQueue | 无界的阻塞队列 | 它支持一种特殊的操作,即“传输”(transfer),允许生产者线程等待直到消费者线程取走元素。提供了高效的并发性能,适用于高吞吐量的场景。 | 内存开销不可控 |
SynchronousQueue | 没有存储空间的队列 | 生产者线程直接将元素传递给消费者线程,而不是先将元素放入队列。高并发情况下,可以减少内存的使用,并且可以提高系统的吞吐量。 | 不是一个真正的队列,因为它不会在内部存储任何元素。 |
在队列选型时,应根据具体的应用需求和场景来决定。对于他们的源码分析,先挖个坑,后续再补。
可以参考文章:多线程反思(上):ThreadLocal行不行?TTL也有对线程池进行简单的增强封装。
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;
下面我们可以总结:
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
//【1】
if (workerCountOf(c) < corePoolSize) {
//【1.1】
if (addWorker(command, true))
return;
c = ctl.get();
}
//【2】
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (!isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
//【3】
else if (!addWorker(command, false))
reject(command);
}
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (int c = ctl.get();;) {
// Check if queue empty only if necessary.
//【1.2】
if (runStateAtLeast(c, SHUTDOWN)
&& (runStateAtLeast(c, STOP)
|| firstTask != null
|| workQueue.isEmpty()))
return false;
//【1.3】
for (;;) {
if (workerCountOf(c)
>= ((core ? corePoolSize : maximumPoolSize) & COUNT_MASK))
return false;
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
if (runStateAtLeast(c, SHUTDOWN))
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
//【1.4】
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
//【1.5】
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int c = ctl.get();
//【1.6】
if (isRunning(c) ||
(runStateLessThan(c, STOP) && firstTask == null)) {
if (t.getState() != Thread.State.NEW)
throw new IllegalThreadStateException();
//【1.7】
workers.add(w);
workerAdded = true;
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
}
} finally {
//【1.8】
mainLock.unlock();
}
if (workerAdded) {
//【1.9】
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
//【1.10】
addWorkerFailed(w);
}
return workerStarted;
}
private static boolean runStateAtLeast(int c, int s) {
return c >= s;
}
代码分析:
梳理得到下面的流程图:
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。