来,随我吃透线程池!!!
线程的创建和销毁的开销是非常大的,线程创建,直接依靠操作系统。大量的线程的创建,会给操作系统和jvm虚拟机带来压力,同时,大量的销毁也会给垃圾回收器带来压力
所以,线程池的目的就是为了解决两个问题
1反复创建线程开销大
2过多的线程太多占用内存
线程池:通过少量线程的复用,成功的解决了这个问题
线程池的好处:
加快响应速度,合理利用cpu与内存,统一管理线程这些线程。
1服务器,服务器要收到大量请求,比如tomcat服务器,也是用线程池实现的
2开发中,5个以上的线程,就可用用线程池了
参数 | 说明 |
---|---|
corePoolSize | 线程池创建的核心线程数,线程池维护线程的最少数量,即使没有任务需要执行,也会一直存活 |
maximumPoolSize | 最大线程池数量,当线程数>=corePoolSize,且任务队列已满时。线程池会创建新线程来处理任务 |
keepAliveTime | 当线程空闲时间达到keepAliveTime时,线程会退出,直到线程数量=corePoolSize |
TimeUnit | 时间单位 |
workQueue | 缓存队列(阻塞队列)当核心线程数达到最大时,新任务会放在队列中排队等待执行 |
threadFactory | 线程创建的工厂,一般用默认的 Executors.defaultThreadFactory() |
handler | 当pool已经达到max size的时候,如何处理新任务 |
这里我们先说一下线程创建的参数,
线程创建,会先创建corePoolsize的线程数量,比如5,当阻塞队列满的时候,比如100,此时线程数量会变大,还处理不过来,就会到maxNumPoolSize最大线程数量,
注意点:弱corePoolSize与maxNumPoolSize保持一致,线程池的大小就是固定的
线程池期望保留线程更少一点,通过corePoolSize在阻塞队列满的时候,才会增加,可以看出来
两个特殊的场景,当阻塞队列大小设置成Integer.MaxValue,那么此时线程数量将不会大于corePoolSize
若maxNumPoolSize设置成Integer.MaxValue,那么此时策略是:允许线程池的容量无限扩大!
keepAliveTime,这里就是多余corePoolSize,大于核心线程数的数量的线程,空闲时间到达这个值 就会被回收
线程工厂,ThreadFactory这个参数直到是创建线程的就可以了,源码这里也无非是new Thread
主要再将一下
1)直接交接,SynchronousQueue
2)无界队列:LinkedBlockingQueue
当任务太多,处理不过来,就会导致OOM,内存溢出,而且会让任务丢失
这个队列maxPoolSize是无意义的,因为这个队列满不了
3)有界队列:ArrayBlockingQueue
这个队列是可以满的,maxPoolsize有意义
有人可能看到过alibaba编码规范
自动创建的话,juc提供了
线程池名称 说明 newFixedThreadPool 一个定长线程池,可控制线程最大并发数 newCachedThreadPool 一个可缓存线程池, newSingleThreadExecutor 一个单线程化的线程池,用唯一的工作线程来执行任务 newScheduledThreadPool 一个定长线程池,支持定时/周期性任务执行
看名字就能想到,线程池的数量是固定的,那么通过上面的分析,能猜到,核心线程数corePoolSize的数量和maxNumPoolSize最大线程数应该是一致的,
/**
* @Author:Joseph
* @bolg:https://li-huancheng.gitee.io/
* @Package:threadPool
* @Project:bing-fa-demo
* @name:ExecutorDemo
* @Date:2023-07-26 15:03
* @Filename:ExecutorDemo
*/
public class ExecutorDemo {
public static void main(String[] args) {
ExecutorService executorService = Executors.newFixedThreadPool(4);
for (int i = 0; i < 1000; i++) {
executorService.execute(new Task());
}
}
}
class Task implements Runnable{
@Override
public void run() {
try {
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName());
}
}
通过控制台,可以看出,就是编码时候 的4个线程数
*/
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
构造函数中,核心线程数,最大线程数,都是传入的参数nThreads
线程存货时间keepAliveTime是0L,后面的单位就不说了,这个0L,就是,根本没有非核心线程的线程,所以这个参数没什么意义
LinkedBlockQueue,这个工作队列是无限长的,所以当任务量过大的时候,都堆积再linkedBlokQueue阻塞队列中,太多了,内存扛不住,就会出现OOM
package threadPool;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
* @Author:Joseph
* @bolg:https://li-huancheng.gitee.io/
* @Package:threadPool
* @Project:bing-fa-demo
* @name:FixedThreadPoolExecutorOOM
* @Date:2023-07-26 15:34
* @Filename:FixedThreadPoolExecutorOOM
*/
public class FixedThreadPoolExecutorOOM {
public static void main(String[] args) {
ExecutorService executorService = Executors.newFixedThreadPool(1);
for (int i = 0; i < Integer.MAX_VALUE; i++) {
executorService.execute(new SubThread());
}
}
}
class SubThread implements Runnable{
@Override
public void run() {
try {
Thread.sleep(1000000000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
这是造成OOM的一个小demo,配置下jvm内存小一点就可以看到
这个就不掩饰了,就是暴漏的问题和上面一样,线程存货时间keepAliveLife是没意义的
然后核心线程数和最大线程数是1,阻塞队列采取的是无界队列LinkedBlokingQueue
这两个例子就是阿里巴巴编码规范中说的,大量请求堆积导致的OOM
再看下面两种,
先说下结果
下面两种是创建线程数量太多,大量的线程而导致的OOM
他的功能是缓存线程池,可回收多余的线程
这个通过上面参数的讲解,可以猜测到,工作队列一定不是无界队列LinkedBlockingQueue,不然线程数量是不会大于corePoolSize核心线程数的
他采用的是SynchronousQueue,直接交换队列,也就是说,这个队列存不了东西,任务直接走到线程去,然后线程不够的话,就直接创建
所以他会导致大量的线程导致oom,线程不用可回收,那么它的keepAliveTime一定配置了个值
现在看一下源码
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
核心线程为0,最大线程无限,keepAliveTime最大存活时间60s,队列采用直接交换队列,队列不缓冲任务
和上面的分析一样
支持定时,周期型的执行任务
**
* @Author:Joseph
* @bolg:https://li-huancheng.gitee.io/
* @Package:threadPool
* @Project:bing-fa-demo
* @name:ScheduledThreadPool
* @Date:2023-07-26 15:55
* @Filename:ScheduledThreadPool
*/
public class ScheduledThreadPool {
public static void main(String[] args) {
ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(10);
//4s后再执行
scheduledExecutorService.schedule(new Task(),4, TimeUnit.SECONDS);
//以一定频率重复运行,最开始等1s钟,后面每3s的执行
scheduledExecutorService.scheduleAtFixedRate(new Task1(),1,3,TimeUnit.SECONDS);
}
}
class Task1 implements Runnable{
@Override
public void run() {
System.out.println("test");
}
}
这里写了下常见 的使用方法
这个既然有线程过多的情况,那么它的maxNumPoolSize最大线程数一定是Integer.MaxValue
public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE,
DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS,
new DelayedWorkQueue());
}
这里的话,有核心线程数,但是最大线程数是MaxValue,有配置活时间,主要是采用的是延迟工作队列。
这里又多看到一个工作队列
一共有
SynchronousQueue直接交接队列,特点是长度为0
LinkedBlockingQueue无界队列,特点是无界
ArrayBlockingQueue有界队列,特点是有范围
DelayedWorkQueue()延时工作队列,特点是能延迟处理任务
看见,jdk提供的,都有bug,都会导致OOM,所以业务使用要自己配置
那么线程池的数量设置成多少何时?
cpu密集型与IO密集型
业务时加密,计算hash等,cpu密集型的话,就可以设置大小为cpu的1~2倍,8核cpu的话,就可以将核心线程数设置为8-16
数据库的读写,文件,网络id这样的,io密集型的话,因为cpu的速度比io快,所以可以将核心线程数设置的多一些,10倍也可以的,
因为80个很多都是在等待io的,所以这样的话才能更好的利用cpu
Brain Goetz大佬就给除了一个公式
线程数= cpu核心数*(1+平均等待时间/平均工作时间),当然真实使用的话,还是压测真实环境。估测的话,这个公式就可以的
通过上面的选择手动创建or自动创建
我们来总结一下jdk提供的线程池
1fixedThreadPool,这种线程池coro和max线程数一致,固定下线程
2cacheThreadPool,这种就是可缓存线程,线程会自动回收
3ScheduleThreadPool.定期执行任务,定时的执行
4singleThreadThreadExcecutor,就是一个线程,
上面我已经讲的很详细了,只需要注意,
1,4会因任务在工作队列的堆积造成OOM
这两个队列的选择就是为了满足需求嘛,线程固定了,那么任务只能通过队列来堆积,所以采用LinedBlocking’queue
2\4会因为线程数量的过大,导致OOM
cachedThreadPool选择SynchronousQueue目的就是全给线程,不需要在队列中转,提高效率
secheduledThread这个的队列就不用说了吧,延时功能
workStealingPool
特点:子任务 窃取
这里这样理解,三个线程,各个有自己的队列,他们也有公共的队列,
第一个线程自己创建了3个子任务,执行,另外两个线程会帮线程1去执行
这样使用有两个注意,适用不加锁的场景,这样才能让别的线程帮忙执行 ,另外就是执行的顺序不保证,因为会窃取嘛
场景:比较少,
线程池的关闭也是有些讲究的!
shutDown
发出线程终止的命令,但是并非是马上的关闭,会等队列中的任务全搞完,就关闭,
同时不会去接受新的任务
isShutDown
判断是否shutDown
isTerminated
这个是判断是否真的关闭了,因为执行shutDown要等待全关掉
awaitTermination
等待一段时间后来判断是否真的关闭
shutDownNow
这个命令会马上关闭,正在执行的线程会中断,在队列中为被消费的,会返回
/**
* @Author:Joseph
* @bolg:https://li-huancheng.gitee.io/
* @Package:threadPool
* @Project:bing-fa-demo
* @name:ShutDown
* @Date:2023-07-28 15:59
* @Filename:ShutDown
*/
public class ShutDown {
public static void main(String[] args) {
ExecutorService executorService = Executors.newFixedThreadPool(10);
for (int i = 0; i < 1000; i++) {
executorService.execute(new ShutDownTask());
}
executorService.shutdown();
}
}
class ShutDownTask implements Runnable{
@Override
public void run() {
try {
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
这里就比较简单了,大家自己测试,不是本文的重点
这里回顾一下重要的参数
最大线程数,核心线程数,最大存活时间,工作队列,没错,还有一个就是现在要讲的拒绝策略
1,当Executior执行shutDown命令,就不会再接受新的任务了
2、工作队列长度,或者最大线程数饱和
AbortPolicy
直接抛出异常
DiscardPolicy
丢弃新的任务,我们是不知道的!
DiscardPolicyOldestPolicy
这个是丢掉最老的任务。
CallerRunsPolicy
让提交任务的线程去执行,比如异步功能,想要让线程池去做,但是线程池饱和了,线程池说,我不做,你自己做,这个就是CallerRunsPolicy,这个就是不像被压榨的员工哈哈
我们想在每个任务执行前后做一些日志统计,等任务,比如暂停,实现一个暂停的线程池
package threadPool;
import java.util.concurrent.*;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
/**
* 延时每个任务执行的前后都可以放钩子函数
* @Author:Joseph
* @bolg:https://li-huancheng.gitee.io/
* @Package:threadPool
* @Project:bing-fa-demo
* @name:PauseableThreadPool
* @Date:2023-07-28 16:31
* @Filename:PauseableThreadPool
*/
public class PauseableThreadPool extends ThreadPoolExecutor {
private final ReentrantLock lock= new ReentrantLock();
private Condition unpaused = lock.newCondition();
private boolean isPaused;
public PauseableThreadPool(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
}
public PauseableThreadPool(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory);
}
public PauseableThreadPool(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, handler);
}
public PauseableThreadPool(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
}
@Override
protected void beforeExecute(Thread t, Runnable r) {
super.beforeExecute(t, r);
lock.lock();
try{
while(isPaused){
unpaused.await();
}
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
lock.unlock();
}
}
private void pause(){
lock.lock();
try{
isPaused = true;
}finally {
lock.unlock();
}
}
public void resume(){
lock.lock();
try{
isPaused=false;
unpaused.signalAll();
}finally {
lock.unlock();
}
}
public static void main(String[] args) throws InterruptedException {
PauseableThreadPool pauseableThreadPool = new PauseableThreadPool(10, 20, 10L, TimeUnit.SECONDS, new LinkedBlockingDeque<>());
Runnable runnable = new Runnable() {
@Override
public void run() {
System.out.println("我被执行");
try {
Thread.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
};
for (int i = 0; i < 10000; i++) {
pauseableThreadPool.execute(runnable);
}
Thread.sleep(1500);
pauseableThreadPool.pause();
System.out.println("线程池被暂停了");
Thread.sleep(1500);
pauseableThreadPool.resume();
System.out.println("线程池恢复了");
}
}
上面了解了下线程池的使用注意点,现在看下线程池怎么实现的
线程池管理器:创建、管理线程池
工作线程:就是线程池中存在的线程
任务队列:这个就是参数里重要之一的工作队列,因为并发嘛,多个线程去取线程,所以采用的阻塞队列blokingQueue
任务接口(Task):这个就是线程池要执行的一个一个的任务
ThreadPoolExecutor、ExecutorService、Executor、Executors
这是他们的继承实现关系
public interface Executor {
/**
* Executes the given command at some time in the future. The command
* may execute in a new thread, in a pooled thread, or in the calling
* thread, at the discretion of the {@code Executor} implementation.
*
* @param command the runnable task
* @throws RejectedExecutionException if this task cannot be
* accepted for execution
* @throws NullPointerException if command is null
*/
void execute(Runnable command);
}
Executor是一个顶级的接口,只有这一个execute的方法
public interface ExecutorService extends Executor {
void shutdown();
List<Runnable> shutdownNow();
boolean isShutdown();
boolean isTerminated();
boolean awaitTermination(long timeout, TimeUnit unit)
throws InterruptedException;
Future<T> submit(Callable<T> task);
Future<T> submit(Runnable task, T result);
Future<?> submit(Runnable task);
List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
throws InterruptedException;
List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException;
<T> T invokeAny(Collection<? extends Callable<T>> tasks)
throws InterruptedException, ExecutionException;
<T> T invokeAny(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
}
ExecutorService继承了Executor,并新增了一些管理线程池的方法
一个工具类这里就不多说了,上面讲解的很透彻,只是调用创建线程池的构造函数,指定一些规则而已
这里就要着重讲一下了,这里他是ExecutorService的实现,返回值是ExecutorService
也是我们开发人员调用这个构造函数自定义线程池使用的,文章末尾会带大家实操一下
public class ThreadPoolTaskExecutor extends ExecutorConfigurationSupport implements AsyncListenableTaskExecutor, SchedulingTaskExecutor {
private final Object poolSizeMonitor = new Object();
private int corePoolSize = 1;
private int maxPoolSize = 2147483647;
private int keepAliveSeconds = 60;
private int queueCapacity = 2147483647;
private boolean allowCoreThreadTimeOut = false;
这是ThreadPoolExecutor中额execute方法
org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
//线程不够,增加线程
if (workerCountOf(c) < corePoolSize) {
//我们要看的就是wordker方法
if (addWorker(command, true))
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
}
好,看一下work中的runWorker方法
java.util.concurrent.ThreadPoolExecutor#runWorker
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
//注意这里的task,Runnable的run方法就是一个一个的类
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
//主要看这里,如果task不为null,就去执行这里的逻辑,getTask就是从阻塞队列中拿出任务
//while循环代表,这个work不会停止,执行完任务就会继续拿下一个任务去执行,这就实现了线程的复用
while (task != null || (task = getTask()) != null) {
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);
try {
//这里就是调用runnable的run方法,
task.run();
afterExecute(task, null);
} catch (Throwable ex) {
afterExecute(task, ex);
throw ex;
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
状态 | 说明 |
---|---|
RUNNING | 运行状态,能接受新提交的任务,并且也能处理阻塞队列中的任务 |
SHUTDOWN | 关闭状态,不再接受新提交的任务,可以继续处理阻塞队列中已保存的任务。 |
STOP | 不能接受新任务,也不处理队列中的任务,会中断正在处理任务的线程 |
TIDYING | 所有的任务都已终止了,workerCount (有效线程数)为0 |
TERMINATED | terminated() 方法执行完后进入该状态 |
通过上面线程关闭的学习,再结合这里看一下
shutDown会使Running状态到SHUTDOWN状态,线程池会拒绝新任务,但是还是会执行新的任务
shutDOwnNow让线程池从Running状态到STOP状态,此时拒绝新任务,同时也会中断正在执行的任务,同时会返回队列中的任务
当上面两种方法执行完,队列和工作线程都空时,会进入tidying状态,执行termined会进入TERMINATED关闭状态。
在这里站在上帝视角看下源码
无非就是
execute、addworker 、runworker、getTask方法
这里要先将一下,ctl这个
变量 ctl
这个AtomicInteger包含两部分的信息,使用的是位运算的方式,相比于基本运算,速度快很多
下面是变量和比较重要的方法
public class ThreadPoolExecutor extends AbstractExecutorService {
/**
* workerCount, indicating the effective number of threads
* runState, indicating whether running, shutting down etc
* RUNNING -> SHUTDOWN
* On invocation of shutdown()
* (RUNNING or SHUTDOWN) -> STOP
* On invocation of shutdownNow()
* SHUTDOWN -> TIDYING
* When both queue and pool are empty
* STOP -> TIDYING
* When pool is empty
* TIDYING -> TERMINATED
* When the terminated() hook method has completed
*
*/
//int类型的数字,高3位表示线程池状态,低29位表示worker数量
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
//Integer.SIZE`为32,所以`COUNT_BITS`为29
private static final int COUNT_BITS = Integer.SIZE - 3;
//线程池允许的最大线程数, 1左移29位,然后减1,即为 2^29 - 1
private static final int COUNT_MASK = (1 << COUNT_BITS) - 1;
// runState is stored in the high-order bits
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;
//获取线程池状态
private static int runStateOf(int c) { return c & ~COUNT_MASK; }
//获取线程池worker数量
private static int workerCountOf(int c) { return c & COUNT_MASK; }
//根据线程池状态和线程池worker数量,生成ctl值
private static int ctlOf(int rs, int wc) { return rs | wc; }
//任务执行
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
// 32位,高3位存储线程池状态,低29位存储活跃线程数
int c = ctl.get();
//判断工作线程小于核心线程,则创建新线程,true表示是核心线程数
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
//判断线程池是否运行,把任务放到队列里面去,返回boolean状态
if (isRunning(c) && workQueue.offer(command)) {
//再次获取值
int recheck = ctl.get();
//如果线程池已经终止,则移除任务,不在响应
if (! isRunning(recheck) && remove(command))
reject(command);
//如果没有线程,则创建一个空的worker,会从队列获取任务执行
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
//队列满后,调用addWorker,创建非核心线程,参数是false,
else if (!addWorker(command, false))
//队列已满,创建非核心线程,失败则执行拒绝策略
reject(command);
}
//用于向线程池中添加一个新的工作线程。如果线程池中的线程数量已经达到maximumPoolSize,则返回false;
//如果线程池已经关闭,则返回false;否则,创建一个新的工作线程,并将其加入工作线程集合中
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (int c = ctl.get();;) {
// Check if queue empty only if necessary.
if (runStateAtLeast(c, SHUTDOWN)
&& (runStateAtLeast(c, STOP)
|| firstTask != null
|| workQueue.isEmpty()))
return false;
for (;;) {
//判断线程数,根据传进来参数判断是创建线程数最大值
if (workerCountOf(c)
>= ((core ? corePoolSize : maximumPoolSize) & COUNT_MASK))
return false;
//增加worker数量成功,返回到上面的retry
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
if (runStateAtLeast(c, SHUTDOWN))
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int c = ctl.get();
if (isRunning(c) ||
(runStateLessThan(c, STOP) && firstTask == null)) {
if (t.getState() != Thread.State.NEW)
throw new IllegalThreadStateException();
workers.add(w);
workerAdded = true;
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
}
} finally {
mainLock.unlock();
}
//新创建的worker,然后立刻启动,立刻执行任务(不是从队列中获取)
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
//用于执行指定的工作线程,首先获取当前线程,然后不断从阻塞队列中取出任务并执行,直到从阻塞队列中取出null为止。
//在每次执行任务之前,会调用beforeExecute()方法和afterExecute()方法,这两个方法可以由子类进行重写,以实现一些特定的功能。
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
//一直循环判断,当前任务是否有,没的话getTask()从队列中获取任务执行
while (task != null || (task = getTask()) != null) {
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);
try {
task.run();
afterExecute(task, null);
} catch (Throwable ex) {
afterExecute(task, ex);
throw ex;
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
//用于从阻塞队列中获取一个任务,如果当前线程数小于corePoolSize,则会调用workQueue的take方法阻塞在当前
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?
for (;;) {
int c = ctl.get();
// Check if queue empty only if necessary.
if (runStateAtLeast(c, SHUTDOWN)
&& (runStateAtLeast(c, STOP) || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
int wc = workerCountOf(c);
// timed用于超时控制,当allowCoreThreadTimeOut是true或者活跃线程数大于核心线程数,则需要进行超时控制
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
//take和poll都是从队列头部【拿出】一个元素,从头部得到并移除该元素
//poll空队列的头部元素时返回null,不抛异常;而take方法对应获得空队列的头部元素时,会阻塞在获取的位置
try {
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
关于源码的解读,大家看注释就可以
我想着重点一下,这个阻塞队列,take方法,这里就体现阻塞队列的特性了,take方法,不会返回元素,而是阻塞,
还有就是work类中的works,本质是一个HashSet集合
再者需要注意runworker中执行前后的两个钩子
前面提到,核心线程数是不会消失的,但是,这里指的是永远是数量,HashSet这里并没有区分线程是先创建的还是后创建的,
所以core核心线程数为3,假设有6个,剩下来的3个不一定是最初创建的线程!
这里讲一下工作中,如何创建线程池
先看一下,使用线程池的注意点
避免任务堆积
避免线程数过度增加
排查线程泄漏(线程回收不了的情况):一般是任务逻辑问题,导致任务结束不了,导致任务回收不了
如果有人问:工作中线程池是怎样做的?我认为可以这样回答
线程数的配置,要考虑io密集型和cpu密集型,io密集型可以设置2倍的核数,cpu密集型最好是和核数相等
阻塞队列的长度可以区分面向C端的快速响应还是面向B端的允许慢速处理的场景
C端的阻塞队列长度不能太长,另外C端,可以通过多节点,来增快消费速度,避免堆积,B端可以长一些
当然要结合业务,进行多接口压测,才能得到合理的数据。
这里列举两个场景。(以32核,64g为例)
C端:
ThreadPoolExecutor threadPool = new ThreadPoolExecutor(2, 32, 3,
TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(100000),
new ThreadPoolExecutor.DiscardPolicy());
这个C端的场景,就没有考虑上面说的,阻塞队列太长了,而核心线程数有过短,所以这里,就会出现问题,任务堆积而得不到消费
C端的场景不应该设置过长的策略,且要注意消费的速度要大一些
ThreadPoolExecutor threadPool = new ThreadPoolExecutor(32,128, 60,
TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(1000),
new ThreadPoolExecutor.DiscardPolicy());
一台机器不够,那就多一些
B端
商家管理后台统计报表
ThreadPoolExecutor threadPool = new ThreadPoolExecutor(8,1024, 60,
TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(1000),
new ThreadPoolExecutor.DiscardPolicy());
这里没有考虑B端场景,队列可以长一些,因为不要求实时性,这样配置,1024线程,导致OOM,服务器崩溃
ThreadPoolExecutor threadPool = new ThreadPoolExecutor(32,124, 60,
TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(100000),
new ThreadPoolExecutor.DiscardPolicy());
这样配置,让任务能过慢慢处理,同时队列又不是长的很过分,100万,并不会出现OOM的问题
另外
生产配置的时候,要线程池的隔离,不能让多个不同任务公用线程池,
到这里,线程池基本就完结了
这里提供之前做项目的一个场景,这里用到了http连接池,和线程池,通过这个例子,我们看一下,工作中,如何用线程池,以及考虑引发的一些问题,
涉及网络通信,且有需要提高响应速度,就可以用异步,
MQ也属于一种异步,
适用于处理log,发送邮件,短信–等场景,涉及网络io调用,并不影响业务,因为异步是直接返回正常的,不知道后面是否成功
使用方式:启动类,开启异步@EnableAsync
异步失效场景:
异步采用动态代理,不能调用类本身的方法,加@Async,
还要注意事物,@Tranctional与@Async会失效,但是,在调用异步的上游,就没事
当我们使用@Async的时候,没有自定义线程池,他会使用默认的线程池
默认8个核心线程数,核心线程处理不了,就会进入阻塞队列,阻塞队列是默认的Integer.MAX_VALUE,21亿!采用的是LinkedBlokingQueue,最大线程数也是21亿,但是根本不会大于8,因为阻塞队列满了才会到达,但是不可能满的
所以就要自定义线程池,
但是!
在spring中,自定义线程池要用ThreadPoolTaskExecutor,其实是一样的,只是框架多了层包装而已
我们只需要自定义线程池,覆盖spring默认的线程池,就能避免OOM的问题了
Configuration
@EnableAsync
public class ThreadPoolTaskConfig {
@Bean("threadPoolTaskExecutor")
public ThreadPoolTaskExecutor threadPoolTaskExecutor(){
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
//线程池创建的核心线程数,线程池维护线程的最少数量,即使没有任务需要执行,也会一直存活
//如果设置allowCoreThreadTimeout=true(默认false)时,核心线程会超时关闭
executor.setCorePoolSize(16);
// executor.setAllowCoreThreadTimeOut(true);
//缓存队列(阻塞队列)当核心线程数达到最大时,新任务会放在队列中排队等待执行
executor.setQueueCapacity(1024);
//最大线程池数量,当线程数>=corePoolSize,且任务队列已满时。线程池会创建新线程来处理任务
//当线程数=maxPoolSize,且任务队列已满时,线程池会拒绝处理任务而抛出异常
executor.setMaxPoolSize(64);
//当线程空闲时间达到keepAliveTime时,线程会退出,直到线程数量=corePoolSize
//允许线程空闲时间60秒,当maxPoolSize的线程在空闲时间到达的时候销毁
//如果allowCoreThreadTimeout=true,则会直到线程数量=0
executor.setKeepAliveSeconds(30);
executor.setThreadNamePrefix("joseph的自定义线程池");
//拒绝策略
// rejection-policy:当pool已经达到max size的时候,如何处理新任务
// CallerRunsPolicy():交由调用方线程运行,比如 main 线程;如果添加到线程池失败,那么主线程会自己去执行该任务,不会等待线程池中的线程去执行
//AbortPolicy():该策略是线程池的默认策略,如果线程池队列满了丢掉这个任务并且抛出RejectedExecutionException异常。
//DiscardPolicy():如果线程池队列满了,会直接丢掉这个任务并且不会有任何异常
//DiscardOldestPolicy():丢弃队列中最老的任务,队列满了,会将最早进入队列的任务删掉腾出空间,再尝试加入队列
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy());
executor.initialize();
return executor;
}
}
好,我们异步的时候
直接指定线程池
@Async(“threadPoolTaskExecutor”)就可以了
好,那么当请求量很多的时候,就会引发问题
导致OOM,服务器重启,是会丢失的!,或者说,阻塞队列你设置的短一些的话,任务也是会丢失的(拒绝策略的话)
意思就是,我们不能单单的把任务堆到队列就好了,是会有消息丢失的风险的!
对的,就是提高线程处理任务的速度!
当涉及网络请求时,spring中使用的是restTempate
这是spring基于httpClient提供的一个http请求工具,我们异步发短信调用第三方服务,用到restTemplate,但是restTemplate在spring中有个坑,
这就导致,每次请求都会三次握手,非常的耗时,
当请求处理不过来,客户端等待过长,主动断掉连接,就会报错
错误Caused by: java.io.IOException: Broken pipe
我们可以通过http连接池的方式,来服用建立 的连接,来加快线程处理的速度
进而加快阻塞队列的消费
对httpclient进行封装的有:Apache的Fluent、es的restHighLevelClient、spring的restTemplate等
这几个都可以进行http连接池的封装!
下面来看一下,如何封装
@Configuration
public class RestTemplateConfig {
@Bean
public RestTemplate restTemplate(ClientHttpRequestFactory requestFactory){
return new RestTemplate(requestFactory);
}
@Bean
public ClientHttpRequestFactory httpRequestFactory(){
return new HttpComponentsClientHttpRequestFactory(httpClient());
}
@Bean
public HttpClient httpClient(){
Registry<ConnectionSocketFactory> registry = RegistryBuilder.<ConnectionSocketFactory>create()
.register("http", PlainConnectionSocketFactory.getSocketFactory())
.register("https", SSLConnectionSocketFactory.getSocketFactory())
.build();
PoolingHttpClientConnectionManager connectManager = new PoolingHttpClientConnectionManager(registry);
//设置连接池最大是500个链接
connectManager.setMaxTotal(500);
//maxPerToute对maxTotal细分 每个主机最大并发是300 route是指域名
connectManager.setDefaultMaxPerRoute(300);
RequestConfig requestConfig = RequestConfig.custom()
//返回数据超时时间
.setSocketTimeout(20000)
//连胜服务器超时时间
.setConnectTimeout(10000)
//从连接池中获取连接的超时时间
.setConnectionRequestTimeout(1000)
.build();
CloseableHttpClient closeableHttpClient = HttpClientBuilder.create().setDefaultRequestConfig(requestConfig)
.setConnectionManager(connectManager)
.build();
return closeableHttpClient;
}
//优化前
// @Bean
// public ClientHttpRequestFactory simpleClientHttpRequestFactory(){
// SimpleClientHttpRequestFactory factory = new SimpleClientHttpRequestFactory();
// factory.setReadTimeout(10000);
// factory.setConnectTimeout(10000);
// return factory;
// }
}
现在,要涉及网络调用,异步提高性能的话
就没有任何问题了!
总结就是,先发现异步请求在压测的时候,会导致OOM,然后再自定义线程池,但是处理太慢,有超时报错和http管道报错,我们又更换httpClient,自定义http连接池,最终完成了qps的提高