今天为大家带来的是并发编程中的强力并发工具-线程池,废话不多说让我们直接开始。
ThreadPoolExecutor
状态和数量:
状态名 | 高3位 | 接受新任务 | 处理阻塞队列任务 | 说明 |
---|---|---|---|---|
RUNNING | 111 | Y | Y | 接受新任务,并会处理阻塞队列中的任务 |
SHUTDOWN | 000 | N | Y | 不会接受新任务,但会处理阻塞队列中剩余的任务 |
STOP | 001 | N | N | 中断正在执行的任务,抛弃阻塞队列中的任务 |
TERMINATED | 010 | - | - | 任务全部执行完毕,活动线程数为0,即将进入终结 |
TERMINATED | 011 | - | - | 线程池终结 |
采用int高3位表示线程池状态,低29位表示线程数量,存储在一个原子变量ctl中,目的是将线程状态与线程个数合二为一,这样就可以用一次CAS对其赋值 private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)) | ||||
从数字上,TERMINATED>TERMINATED>STOP>SHUTDOWN>RUNNING,高三位的1表示负数 |
public ThreadPoolExecutor(
int corePoolSize,//核心线程数(最多保留的线程数)
int maximumPoolSize,//最大线程数
long keepAliveTime,//生存时间,针对救急线程
TimeUnit unit,//时间单位,针对救急线程
BlockingQueue<Runnable> workQueue,//阻塞队列
ThreadFactory threadFactory,//线程工厂,创建线程时起名字
RejectedExecutionHandler handler)//拒绝策略
ThreadPoolExecutor
的工作流程:
ThreadPoolExecutor
包含两类线程:核心线程和救急线程,采用懒加载的创建方式,存在救急线程的前提是选择有界队列corePoolSize
指核心线程数,maximumPoolSize
指核心线程数+救急线程数ThreadPoolExecutor
会先检查线程池中是否可以有救急线程keepAliveTime
后,要是没有任务继续入队,救急线程就会被销毁,下次高峰期才会再次创建救急线程maximumPoolSize
,采用拒绝策略AbortPolicy
:抛出RejectedExecutionException
异常,默认策略CallerRunsPolicy
:让调用者运行任务DiscardPolicy
:放弃本次任务DiscardOldestPolicy
:放弃队列中最早的任务,本任务取而代之Dubbo
在AbortPolicy
基础上增加日志功能,并调用jstack
抓取当前栈中的信息,方便定位问题Netty
创建新的线程来执行任务,这样实现并不好,因为就没有了限制ActiveMQ
超时等待60sPinPoint
使用了一个拒绝策略链,尝试策略链中每一个拒绝策略//创建一个固定大小的线程池:适用于任务量已知,相对耗时的任务
public static ExecutorService newFixedThreadPool(int nThreads) {//传递的线程数
return new ThreadPoolExecutor(
//核心线程数:nThreads,最大线程数:nThreads
nThreads, nThreads,//没有救急线程
0L, TimeUnit.MILLISECONDS,//存活时间:0毫秒
//阻塞队列:LinkedBlockingQueue无界队列
new LinkedBlockingQueue<Runnable>());
}
//创建一个缓冲线程池:适用于任务量不断增长,但每个任务执行时间较短的情况
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(
//核心线程数:0最大线程数:2,147,483,647
0, Integer.MAX_VALUE,//没有核心线程,全都是救急线程,且可以无限创建,存活时间为60s
60L, TimeUnit.SECONDS,//存活时间:60秒
//阻塞队列:SynchronousQueue同步队列,没有容量,一手交钱一手交货
new SynchronousQueue<Runnable>(),
);
}
//创建一个单线程线程池:适用于任务是串行执行,多出来的任务排队
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(
//核心线程数1,最大线程数1
1, 1,
0L, TimeUnit.MILLISECONDS,//存活时间0毫秒
//阻塞队列:LinkedBlockingQueue无界队列
new LinkedBlockingQueue<Runnable>()));
}
//创建一个带有任务调用的线程池
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}
//延时执行任务
public <V> ScheduledFuture<V> schedule(
//具体执行的任务对象
Runnable command,
//延时时间
long delay,
TimeUnit unit);
//定时执行任务
public ScheduledFuture<?> scheduleAtFixedRate(
//具体的执行任务对象
Runnable command,
//初始延时时间
long initialDelay,
//任务之间的执行延迟时间:从上一次任务开始执行时,延迟时间就开始
long period,
TimeUnit unit);
public ScheduledFuture<?> scheduleWithFixedDelay(
//具体的执行任务对象
Runnable command,
//初始延时时间
long initialDelay,
//任务之间的执行延迟时间:从上一次任务执行结束时,延迟时间才开始
long delay,
TimeUnit unit)
newSingleThreadExecutor()
与自己创建一个单线程串行执行任务的区别:
newSingleThreadExecutor()
遇到异常情况还会创建一个新的线程,保持始终有一个线程工作newSingleThreadExecutor()
和newFixedThreadPool(1)
的区别
newSingleThreadExecutor()
线程数始终为1,不能修改,FinalizableDelegatedExecutorService
应用的是装饰器模式,对外只暴露了ExecutorService
接口,不能调用ThreadPoolExecutor
中特有的方法newFixedThreadPool(1)
初始线程数为1,之后还可以通过对外暴露的ThreadPoolExecutor
对象来调用其setCorePoolSize()
来修改线程数//执行任务
void execute(Runnable command);
//提交任务task,用返回值Future获取任务执行的结果
<T> Future<T> submit(Callable<T> task);
//提交tasks中所有任务
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)throws InterruptedException;
//超时时间
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,long timeout, TimeUnit unit)throws InterruptedException;
//提交tasks中所有任务,哪个任务先执行完毕,返回此任务的返回结果,其他任务取消
<T> T invokeAny(Collection<? extends Callable<T>> tasks)throws InterruptedException, ExecutionException;
//超时时间
<T> T invokeAny(Collection<? extends Callable<T>> tasks,long timeout, TimeUnit unit)throws InterruptedException, ExecutionException;
@Slf4j
public class SubmitTest {
public static void main(String[] args) throws ExecutionException, InterruptedException {
ExecutorService pool = Executors.newFixedThreadPool(3);
invokeAny(pool);
}
private static void invokeAny(ExecutorService pool) throws InterruptedException, ExecutionException {
Object result = pool.invokeAny(Arrays.asList(
() -> {
log.debug("running...111");
Thread.sleep(new Random().nextInt(10000));
log.debug("end...111");
return "1";
},
() -> {
log.debug("running...222");
Thread.sleep(new Random().nextInt(10000));
log.debug("end...222");
return "2";
},
() -> {
log.debug("running...333");
Thread.sleep(new Random().nextInt(10000));
log.debug("end...333");
return "3";
}
));
log.debug("执行结果:{}",result);
}
private static void invokeAll(ExecutorService pool) throws InterruptedException {
List<Future<Object>> futures = pool.invokeAll(Arrays.asList(
() -> {
log.debug("running...111");
Thread.sleep(new Random().nextInt(10000));
return "1";
},
() -> {
log.debug("running...222");
Thread.sleep(new Random().nextInt(10000));
return "2";
},
() -> {
log.debug("running...333");
Thread.sleep(new Random().nextInt(10000));
return "3";
}
));
futures.forEach(future -> {
try {
log.debug("执行结果:{}", future.get());
} catch (InterruptedException | ExecutionException e) {
throw new RuntimeException(e);
}
});
}
private static void submit(ExecutorService pool) throws InterruptedException, ExecutionException {
Future<String> future = pool.submit(() -> {
log.debug("正在执行");
Thread.sleep(1000);
return "任务结束";
});
log.debug("执行结果:{}", future.get());
}
}
/*
1.将线程池状态变为SHUTDOWN
2.不会接受新任务,把剩余任务完成
3.不会阻塞调用线程
*/
public void shutdown() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
//修改线程池状态
advanceRunState(SHUTDOWN);
//打断空闲的线程
interruptIdleWorkers();
onShutdown(); // 扩展点 ScheduledThreadPoolExecutor
} finally {
mainLock.unlock();
}
//尝试终止线程池,检查线程池状态是否为TERMINATED
tryTerminate();
}
/*
1.将线程池状态变为STOP
2.不会接受新任务,剩余任务抛弃并返回
3.用interrupt打断正在执行任务的线程
*/
public List<Runnable> shutdownNow() {
List<Runnable> tasks;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
//修改线程池状态
advanceRunState(STOP);
//打断所有线程
interruptWorkers();
//将剩余任务返回
tasks = drainQueue();
} finally {
mainLock.unlock();
}
//尝试终止线程池,检查线程池状态是否为TERMINATED
tryTerminate();
return tasks;
}
//检查线程池状态是否处于RUNNING,是返回false,否返true
public boolean isShutdown();
//检查线程池状态是否处于TERMINATED,是返回true,否返false
public boolean isTerminated();
//调用shutdown()方法后,线程池不会等待所有线程任务执行结束
//如果想在线程池TERMINATED后做些事情,可以用此方法等待
public boolean awaitTermination(long timeout, TimeUnit unit)throws InterruptedException
扫码关注腾讯云开发者
领取腾讯云代金券
Copyright © 2013 - 2025 Tencent Cloud. All Rights Reserved. 腾讯云 版权所有
深圳市腾讯计算机系统有限公司 ICP备案/许可证号:粤B2-20090059 深公网安备号 44030502008569
腾讯云计算(北京)有限责任公司 京ICP证150476号 | 京ICP备11018762号 | 京公网安备号11010802020287
Copyright © 2013 - 2025 Tencent Cloud.
All Rights Reserved. 腾讯云 版权所有