今天是10.24号,天气晴,你正在摸鱼,突然间接到一个需求,由于系统升级,说要同步数据,方案就是把老系统需要同步的数据(订单)发送到MQ中,新系统再去拉取这个MQ,很简单的方案,你要做的就是,把老系统的数据封装成MQ消息,调用发送MQ接口,发送到MQ中。
你现在有了一个发MQ消息的接口,调用一次接口发送一条MQ消息,那么什么时候调用这个接口呢?同步一条数据就调用一次接口,现在老系统有一万条数据,我就要for循环分别调用一万次这个接口,调用一次接口花费600ms,调用一万次就要花费 600ms * 10000 = 6000s = 600min = 10h
心想,真好,这样的话我就可以摸鱼10个小时了,今天的行云面板就先安排上了!
老板,真不是我不干活,而是程序它不中用啊,你看,我代码都写好了,是他自己非要跑10个小时的!冤枉啊老大~
被领导谈话过后,回到工位开始思考,究竟是代码的问题还是你的问题,现在有1万条数据,而你是串行调用的,能不能让它并行调用呢,听说有个叫线程池的东西很厉害的样子,听说隔壁组的小杰最近一直研究这个,我去问(逼)问(迫)他吧,
你来到了小杰的工位,发现他也正在用for循环摸鱼......
"用线程池?那当然是首选Executors工具类啊,简单粗暴,就连名字都这么短小精悍方便记忆,我们直接指定创建多少个线程就好啦,代码大概长这样"
”这简直太方便啦“
"不过但是这样的话,会有几个问题"
你看,我们到现在,就指定了一个参数,可是线程池其实一共有七个参数呢,其他的我们都控制不了,这点就很不方便,同时也不有助于我们理解线程池的原理,出问题不好排查
其实线程池里面有一个任务队列,用来存储待执行的任务,我们如果用图中的这个线程池的话,他的队列(LinkedBlockingQueue)大小是接近无限大的
这就意味着理论情况下,我们可以无限的往里放任务,一直堆在这个队列里,但是我们内存是有限的,有可能在堆任务的时候就OOM了
”那我们怎么能够自定义参数呢?怎么避免OOM呢?“
"用带参数的构造函数就好啦,给阻塞队列设置规定大小“
如果我们要是想自定义这几个参数的话,就要用 ThreadPoolExecutor 原生的7个参数的构造函数,他们大概长这样,根据我的观察,一般我们项目中的业务用这样的就够了 ,自己定义参数
我们先来回顾下这七个参数是干嘛的
发没发现,这样确实可以跑出来对应的结果,不过我总觉得有点怪怪的,站在一个旁观者的角度来看,这里有一堆任务,我把他们丢进去了线程池里面,它会有一堆线程来处理,可是我也不知道具体的线程处理情况,也不知道每个任务对应的线程的运行时间是多少,那在每个任务运行前和运行后我能不能打印点自定义的日志?这些我都不能做呀,束缚住了我的手脚
这么多任务,要是其中有一个跑着跑着有异常怎么处理?线程池的异常梳理机制是什么?我想记录异常日志怎么记录?
只要思想不滑坡,方法总比困难多!
我们想到的问题,前辈们肯定是都想到了呀,要不然这个线程池能被广泛的应用在各种业务场景下你说是不?这一切的一切,都要继承ThreadPoolExecutor这个类,重写父类的几个方法
protected void beforeExecute(Thread t, Runnable r) { }
protected void afterExecute(Runnable r, Throwable t) { }
protected void terminated() { }
public class MyThreadPoolExecutor extends ThreadPoolExecutor {
public static Logger log = LoggerFactory.getLogger(MyThreadPoolExecutor .class);
private AtomicLong numTasks = new AtomicLong();
private AtomicLong totalTime = new AtomicLong();
private ThreadLocal<Long> startTime = new ThreadLocal<Long>();
public MyThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
}
@Override
protected void beforeExecute(Thread t, Runnable r) {
log.info("Thread {},Runnable {}",r.toString(),r.toString());
startTime.set(System.currentTimeMillis());
}
@Override
protected void afterExecute(Runnable r, Throwable t) {
numTasks.incrementAndGet();
long taskTime = System.currentTimeMillis() - startTime.get();
totalTime.addAndGet(taskTime);
log.info("Runnable {},这次任务执行时间是{},截至目前为止总耗时是{},异常是{}", r.toString(),taskTime,totalTime.get(),t.getMessage());
}
@Override
protected void terminated() {
log.info("terminated 线程关闭 总耗时是{},总任务数是 {},平均耗时是{}",totalTime.get(),numTasks.get(),totalTime.get()/numTasks.get());
}
@Override
public void execute(Runnable command) {
super.execute(command);
}
}
我们可以这样就可以在任务执行前后输出自己想输出的业务日志了/记录时间等相关信息
@Configuration
public class MyThreadPoolExecutorConfig {
public static Logger log = LoggerFactory.getLogger(MyThreadPoolExecutorConfig.class);
@Value("${threadPoolExecutorConfig.corePoolSize}")
private int corePoolSize;
@Value("${threadPoolExecutorConfig.maximumPoolSize}")
private int maximumPoolSize;
@Value("${threadPoolExecutorConfig.keepAliveTime}")
private int keepAliveTime;
@Value("${threadPoolExecutorConfig.capacity}")
private int capacity;
@Value("${threadPoolExecutorConfig.namePrefix}")
private String namePrefix;
class MyThreadPoolFactory implements ThreadFactory{
private final AtomicInteger poolNumber = new AtomicInteger(1);
private final ThreadGroup group;
private final AtomicInteger threadNumber = new AtomicInteger(1);
private final String namePrefix;
MyThreadPoolFactory(String name) {
SecurityManager s = System.getSecurityManager();
group = (s != null) ? s.getThreadGroup() : Thread.currentThread().getThreadGroup();
if (null == name || name.isEmpty()) {
name = "pool";
}
namePrefix = name + "-thread-" + poolNumber.getAndIncrement();
}
@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(group, r, namePrefix + threadNumber.getAndIncrement(), 0);
if (t.isDaemon()){
t.setDaemon(false);
}
if (t.getPriority() != Thread.NORM_PRIORITY){
t.setPriority(Thread.NORM_PRIORITY);
}
return t;
}
}
class MyRejectedExecutionHandler implements RejectedExecutionHandler{
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
//打印被拒绝的任务
log.info("任务被拒绝 {},CompletedTaskCount已完成执行的大致任务总数 {},ActiveCount正在主动执行任务的线程的大致数量{}," +
"CorePoolSize核心线程数{},LargestPoolSize曾经同时进入池中的最大线程数 {},MaximumPoolSize允许的最大线程数{}," +
"QueueSize{},TaskCount已安排执行的大致任务总数{}",r.toString(),executor.getCompletedTaskCount(),
executor.getActiveCount(),executor.getCorePoolSize(),executor.getLargestPoolSize(),executor.getMaximumPoolSize(),
executor.getQueue().size(),executor.getTaskCount());
}
}
@Bean
public ExecutorService getThreadPool(){
log.info("初始化线程池 corePoolSize {},maximumPoolSize{},keepAliveTime {},capacity{},namePrefix{}",corePoolSize,maximumPoolSize,keepAliveTime,capacity,namePrefix);
return new MyThreadPoolExecutor(corePoolSize,maximumPoolSize,keepAliveTime,TimeUnit.SECONDS,
new ArrayBlockingQueue<>(capacity),new MyThreadPoolFactory(namePrefix),new MyRejectedExecutionHandler());
}
}
前面我们讲过,线程池工厂/拒绝策略,我们都可以自定义的,图中就是自定义之后的样子,除此之外,我们一般喜欢把重要的参数抽取出来写在配置文件里面,便于查看与修改
为了测试自定义处理异常,改了下测试代码,让处理orderNo为”order_1“的订单号的线程抛异常
”太棒了!这样我就可以改造一个属于我自己的线程池了!不过我还有一个问题,这个线程池的原理是啥啊?为什么可以做到线程复用?为什么重写那三个方法就可以达到这样的效果?线程池是不是还有什么秘密瞒着我?小杰你说!“
摊牌了我不装了,其实我也不知道为啥
不过 源码面前,了无秘密
这就是大佬的设计思想,非常巧妙,里面也有位运算的相关知识
数据结构预与算法在这里体现的淋漓尽致,要是基础不扎实的话,理解起来就相当费劲啦,可见其重要性
我们来分别看一下,这都是啥啥啥,平常CRUD的时候怎么没见过呢
如图所示,线程池一共五种状态
根据观察我们可以知道,线程池的五种状态其高三位是不一致的,分别是
还记得一开始的 COUNT_BITS = Integer.SIZE - 3 吗?
话说回来,这个左移,具体是怎么移动的呢?就拿 1<<29举个例子吧
这是正数的左移,那么负数的左移又要怎么办呢?
首先,我们要知道,负数在计算机中是以其正值的补码形式表示
回到我们刚才说的:-1 << 29
其中的左移,就是右边补0
位运算的左移我们看完了,我们再来看下这个
private static int runStateOf(int c) { return c & ~CAPACITY; }
private static int workerCountOf(int c) { return c & CAPACITY; }
private static int ctlOf(int rs, int wc) { return rs | wc; }
这些又是什么意思呢?
public void execute(Runnable command) {
//先判断提交的任务是不是空的
if (command == null)
throw new NullPointerException();
//获得线程池的ctl(根据这个可以获取线程池状态或者线程池数量)
int c = ctl.get();
//判断线程池数量是否小于核心线程数
if (workerCountOf(c) < corePoolSize) {
//如果小于核心线程数
//添加worker
if (addWorker(command, true))
//添加成功,返回
return;
//添加失败,重新获取ctl
c = ctl.get();
}
// 到这个if时,有两个原因
// 1. 线程池的线程数量已经大于等于核心线程数
// 2.添加worker失败
//当线程池处于Running状态,就要把任务添加至阻塞队列,如果添加成功的话
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
// 再次检查线程池的状态是否为running,因为添加任务需要时间,这个时间内线程池的状态可能会变化
// 如果不为running的话,就要从阻塞队列中删除这个任务,然后执行拒绝策略
if (! isRunning(recheck) && remove(command))
reject(command);
// 到这个if时,有两种情况
// 1. 线程池状态为runnning
//2. 线程池状态不为runnign,但是删除失败
// 判断 当前线程池是否有线程,如果没有的话
else if (workerCountOf(recheck) == 0)
// 添加一个线程,来确保线程池有线程可以执行任务
addWorker(null, false);
}
// 到这个if时,有两个原因
// 1.走到这里说明添加阻塞队列失败,
// 2. 线程池不是runninng状态
else if (!addWorker(command, false))
//创建非核心线程也失败的话,执行拒绝策略
reject(command);
}
流程图大概如下图所示
private boolean addWorker(Runnable firstTask, boolean core) {
//标志位,一会儿会跳过来
retry:
for (;;) {
int c = ctl.get();
// 获取线程池状态
int rs = runStateOf(c);
// 1. 如果线程池状态是非running状态
// 且
// 2.非( 线程池是SHUTDOWN状态并且firstTask为空并且阻塞队列为不为空 )
if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty()))
return false;
for (;;) {
//判断线程池线程总数量
int wc = workerCountOf(c);
//如果wc大于CAPACITY 或者
// 当 core 为true时,wc大于 corePoolSize
// 当 core 为false时,wc大于 maximumPoolSize
if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize))
// wc数量超的话,则添加失败
return false;
//cas 线程池数量加1
if (compareAndIncrementWorkerCount(c))
//如果成功,则跳到开始 retry处且往下执行,不在进入大循环
break retry;
// 到这里就说明,cas失败了
// 失败有两个原因,1.线程数量变了 2.线程池状态变了,不能再添加任务了
// 重新获取ctl
c = ctl.get(); // Re-read ctl
// 判断当前线程池状态是否和刚才的状态一致,如果不一致
if (runStateOf(c) != rs)
//继续大循环
continue retry;
}
}
//线程启动成功的标志
boolean workerStarted = false;
//线程添加成功的标志
boolean workerAdded = false;
Worker w = null;
try {
//将提交的任务封装进worker
w = new Worker(firstTask);
//得到worker中的线程
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
// 加锁
mainLock.lock();
try {
//得到线程池状态
int rs = runStateOf(ctl.get());
// 1.如果线程池状态是running状态
// 2. 或者线程池状态是SHUTDOWN并且firstTask为空
// 这样的话代表可以添加线程
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive())//预先检查 t 是否可启动
throw new IllegalThreadStateException();
//将worker添加至workers中 (这是个set集合,真正的线程池)
workers.add(w);
//判断线程数量
int s = workers.size();
// 更新最大的线程数
if (s > largestPoolSize)
largestPoolSize = s;
//添加worker成功
workerAdded = true;
}
} finally {
// 释放锁
mainLock.unlock();
}
if (workerAdded) {
//启动worker中的线程
t.start();
workerStarted = true;
}
}
} finally {
//如果启动失败
if (! workerStarted)
addWorkerFailed(w);
}
// 返回启动是否成功
return workerStarted;
}
private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable
{
private static final long serialVersionUID = 6138294804551838833L;
//线程
final Thread thread;
//要执行的任务
Runnable firstTask;
//完成的任务
volatile long completedTasks;
Worker(Runnable firstTask) {
//禁止中断直到 runWorker
setState(-1); // inhibit interrupts until runWorker
//用户提交的任务
this.firstTask = firstTask;
//通过创建一个线程,传入的this是woker自身 worker继承了Runnable 那么这个线程在t.start就是调用重写的run()方法了
this.thread = getThreadFactory().newThread(this);
}
/** Delegates main run loop to outer runWorker */
//真正运行任务的方法
public void run() {
runWorker(this);
}
// 0表示解锁状态
// 1 表示锁定状态
protected boolean isHeldExclusively() {
return getState() != 0;
}
protected boolean tryAcquire(int unused) {
// 如果获取锁成功,cas将 state设置为从0到1
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
// 释放锁,将state设置为0
protected boolean tryRelease(int unused) {
setExclusiveOwnerThread(null);
setState(0);
return true;
}
public void lock() { acquire(1); }
public boolean tryLock() { return tryAcquire(1); }
public void unlock() { release(1); }
public boolean isLocked() { return isHeldExclusively(); }
void interruptIfStarted() {
Thread t;
// state是1且 当前线程t不等于空且t没有被中断
if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
try {
// 中断线程
t.interrupt();
} catch (SecurityException ignore) {
}
}
}
}
final void runWorker(Worker w) {
//得到当前线程
Thread wt = Thread.currentThread();
//这是我们提交的任务
Runnable task = w.firstTask;
w.firstTask = null;
// 这里设置sate为0,为了响应中断
w.unlock(); // allow interrupts
//线程退出的原因
boolean completedAbruptly = true;
try {
//线程复用的密码就在这里,是一个while循环,判断如果提交的任务不为空或者队列里有任务的话
while (task != null || (task = getTask()) != null) {
// 上锁
w.lock();
// 判断是否需要设置中断标识
// 1. 当线程池状态大于等于STOP
// 2. 当线程池状态小于STOP 但是线程已经被打断,并且wt没有被中断,则设置中断流程
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
//执行前的函数,用户可以自己拓展
beforeExecute(wt, task);
Throwable thrown = null;
try {
//任务自己的run方法
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
//执行后的函数,用户可以自己拓展
afterExecute(task, thrown);
}
} finally {
task = null;
//完成任务数+1
w.completedTasks++;
//释放锁
w.unlock();
}
}
// 线程正常退出
completedAbruptly = false;
} finally {
//销毁线程
processWorkerExit(w, completedAbruptly);
}
}
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?
for (;;) {
// 获取ctl
int c = ctl.get();
// 得到线程池状态
int rs = runStateOf(c);
// 状态大于等于SHUTDOWN 且 (状态大于stop或者阻塞队列为空)
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
// 线程数-1
decrementWorkerCount();
return null;
}
//得到线程池线程数量
int wc = workerCountOf(c);
// 是否设置超时时间 allowCoreThreadTimeOut默认是false 是否允许核心线程超时回收
//判断线程池数量是否大于核心线程数,如果大于的话 timed为true
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
// 1. wc > 最大线程数 或者 阻塞队列拉取时间超时 都为true
// 2. 线程数大于1 或者 阻塞队列为空
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
// cas 使线程数减一
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
//这里 timed为ture的时候,采用带超时时间的获取元素的方法, 否则采取一直阻塞的方法
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
//获取到任务就返回
if (r != null)
return r;
// 超时获取不到任务 timedOut设置为true
timedOut = true;
} catch (InterruptedException retry) {
// 获取任务事线程被中断,继续重试
timedOut = false;
}
}
}
private void processWorkerExit(Worker w, boolean completedAbruptly) {
//completedAbruptly = true 代表线程执行出了异常
// completedAbruptly = tfalse 线程正常结束
if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
// 线程数量减一
decrementWorkerCount();
final ReentrantLock mainLock = this.mainLock;
//上锁
mainLock.lock();
try {
//任务完成数
completedTaskCount += w.completedTasks;
// 线程集合里删掉该woker
workers.remove(w);
} finally {
// 释放锁
mainLock.unlock();
}
// 判断是否需要结束线程池
tryTerminate();
// 获得ctl
int c = ctl.get();
// 线程池状态是 running 时
if (runStateLessThan(c, STOP)) {
// 如果线程不是异常结束的话
if (!completedAbruptly) {
// 如果允许核心线程超时取消的话 min = 0
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
// 如果 min = 0 且阻塞队列里还有任务 则保留一个线程来处理
if (min == 0 && ! workQueue.isEmpty())
min = 1;/**/
// 确保当前线程数量至少要为min
if (workerCountOf(c) >= min)
return; // replacement not needed
}
// 添加worker
addWorker(null, false);
}
}
其上就是我们用线程池execute方法提交任务时的总体流程,里面东西其实非常之多,不得不佩服当时设计者的精妙思想,但是,这还不是线程池的全部,还有两个关闭线程池的方法,还有异常梳理没有详细展开说,为什么线程池有时会吞异常?线程池处理异常到底有哪几种方法?submit方法和execute方法有什么区别等等
大家是喜欢直接讲源码的呢?还是喜欢有开头这种场景带入的类型的呢?大家要是对后半段源码感兴趣的话