前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Java中关于线程池的几道面试题

Java中关于线程池的几道面试题

作者头像
半月无霜
发布2023-10-18 16:30:51
3240
发布2023-10-18 16:30:51
举报
文章被收录于专栏:半月无霜

Java中关于线程池的几道面试题

一、介绍

以前就讲过线程池的使用,本文中介绍深挖线程池中的几道面试题

Java线程池 | 半月无霜 (banmoon.top)

在上面可以找到

  • 线程池的核心参数都有什么,代表什么含义?
  • 线程池的拒绝策略有哪些?

二、线程池

1)任务添加流程

当一个线程池在添加一个任务时,它是怎么分配线程去执行这个任务的

代码语言:javascript
复制
public class ThreadPoolExecutor extends AbstractExecutorService {
    
    public void execute(Runnable command) {
        // 判断是否为空
        if (command == null)
            throw new NullPointerException();
        
        // 判断当前正在运行的线程数是否小于核心线程数
        int c = ctl.get();
        if (workerCountOf(c) < corePoolSize) {
            // 添加任务至线程执行,成功添加则结束
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        // 如果核心线程都有在运行,将任务放至队列中
        if (isRunning(c) && workQueue.offer(command)) {
            // 如果成功推入队列,将再次检查线程状态,有线程死亡则将当前任务添加至线程执行
            int recheck = ctl.get();
           	// 检查线程状态是不是RUNNING,如果不是将会拒绝此任务
            if (!isRunning(recheck) && remove(command))
                reject(command);
            // 检查当前的工作线程数是否为0
            else if (workerCountOf(recheck) == 0)
                // 添加一个null的任务
                addWorker(null, false);
        }
        // 如果队列推入任务失败了,那将直接添加至线程执行
        else if (!addWorker(command, false))
            // 如果任务添加至线程失败,则将进行拒绝策略
            reject(command);
    }
    
    /**
     * 会从线程工厂获取线程,并添加执行任务
     * @param firstTask 执行的任务
     * @param core 是否可以添加至核心线程
     * @return true:成功添加至线程执行
     */
    private boolean addWorker(Runnable firstTask, boolean core) {
        // ...
    }
}

2)线程池的状态有哪些

线程池的状态有哪些,状态是如何进行转换的?

注意是在提问线程池的状态,而不是线程的状态


这是ThreadPoolExecutor.java中的源码

代码语言:javascript
复制
   /**
    * 主池控制状态ctl是一个原子整数,包含两个概念性字段:
    * workerCount表示实际线程数,runState表示是运行中、正在关闭等状态。
    * 为了将它们打包成一个整数,我们将workerCount限制为(2^29)-1(大约5亿)个线程,而不是(2^31)-1(可表示20亿)。
    * 如果将来出现了问题,该变量可以改为AtomicLong,并且下面的移位/掩码常量需要调整。但在需要之前,使用int类型会更快,更简单。
    * 
    * workerCount是已被允许启动且未被允许停止的工作线程数。
    * 该值暂时可能与实际的活动线程数不同,例如当ThreadFactory无法按要求创建线程时,或者退出线程在终止之前仍在执行簿记操作。用户可见的池大小报告为工作线程集合的当前大小。
    * 
    * runState提供了主要的生命周期控制,接受以下值:
    * RUNNING:接受新任务并处理队列中的任务;
    * SHUTDOWN:不接受新任务,但处理队列中的任务;
    * STOP:不接受新任务,不处理队列中的任务,并中断正在处理的任务;
    * TIDYING:所有任务都已终止,workerCount为零,转换到TIDYING状态的线程将运行terminated()钩子方法;
    * TERMINATED:terminated()已完成。这些值之间的数值顺序很重要,才能允许有序比较。
    * 
    * runState随时间单调递增,但不一定达到每个状态。转换如下:
    * RUNNING->SHUTDOWN:调用shutdown()时,可能是隐式的(RUNNING或SHUTDOWN状态);
    * RUNNING或SHUTDOWN->STOP:调用shutdownNow()时;
    * SHUTDOWN->TIDYING:当队列和池都为空时;
    * STOP->TIDYING:当池为空时;
    * TIDYING -> TERMINATED:当terminated()钩子方法完成时。等待在awaitTermination()中的线程将在状态到达TERMINATED时返回。
    * 由于在SHUTDOWN状态下队列可能在非空和空之间变化,因此检测从SHUTDOWN到TIDYING的转换不是很直观,但我们只有在看到它为空时,才能看到workerCount为0而终止(有时需要重新检查-见下文)。
    */
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
   private static final int COUNT_BITS = Integer.SIZE - 3;
   private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

// runState的状态,RUNNING < SHUTDOWN < STOP < TIDYING < TERMINATED
   private static final int RUNNING    = -1 << COUNT_BITS;
   private static final int SHUTDOWN   =  0 << COUNT_BITS;
   private static final int STOP       =  1 << COUNT_BITS;
   private static final int TIDYING    =  2 << COUNT_BITS;
   private static final int TERMINATED =  3 << COUNT_BITS;

// 运行状态
private static int runStateOf(int c) { 
       return c & ~CAPACITY;
   }
// 实际线程数
   private static int workerCountOf(int c) { 
   	return c & CAPACITY;
   }
// ctl控制数
private static int ctlOf(int rs, int wc) { 
       return rs | wc; 
   }

由上面源码可知,线程的状态一共有5

  1. RUNNING:运行状态,线程池正在接受、处理任务
  2. SHUTDOWN:当RUNNING状态调用shutdown()方法时,进入此状态;
代码语言:txt
复制
1. 不再接受新的任务
2. 正在运行中的任务和队列中的任务会等待其执行完毕
代码语言:txt
复制
1. 不再接受新的任务
2. 中断运行中的任务,销毁队列中的任务
  1. TERMINATED:由TIDYING状态进入,terminated()方法执行完毕

如果是画图的话,是下面这个样子的


terminated()方法默认什么都不做,线程池提供这个方法,交给子类来进行扩展

代码语言:javascript
复制
protected void terminated() { }

3)线程池如何去执行任务的

添加任务的流程我们已经讲述完毕,那么线程池是如何分配线程去执行任务的呢?

在第一节中,有一段...addWorker()方法,这里面就是执行任务的逻辑

代码语言:javascript
复制
  private boolean addWorker(Runnable firstTask, boolean core) {
      // 外部循环标识retry
      retry:
      for (;;) {
          // 当前的状态值ctl及runState,第二节有讲过
          int c = ctl.get();
          int rs = runStateOf(c);

          // 检查运行状态是不是大于等于SHUTDOWN
          if (rs >= SHUTDOWN &&
              // 且 (运行状态不等于SHUTDOWN 或 任务不为空 或 队列中是否有值)
              !(rs == SHUTDOWN && firstTask == null && !workQueue.isEmpty()))
              return false;
          // 上面这段,主要就是为了检测线程池的状态,队列中是否有任务
          // 如果线程处于RUNNING,就会跳过此处的return false
          // 如果处于SHUTDOWN,还要额外判断当前任务是否为有值,有值也会return false
          // 如果处于SHUTDOWN,且当前任务为null,还要判断当前队列是否有值;队列中没有值的话,也会return false

          // 内部循环
          for (;;) {
              // 当前运行的线程数
              int wc = workerCountOf(c);
              // 判断运行的线程数是否大于(容量最大),根据是否核心,判断是否大于核心线程数 或者 最大线程数
              if (wc >= CAPACITY ||
                  wc >= (core ? corePoolSize : maximumPoolSize))
                  // 返回false
                  return false;
              // 使用CAS自旋锁,添加ctl的运行线程数
              // 成功添加则会,跳出外层的循环
              if (compareAndIncrementWorkerCount(c))
                  break retry;
              // 如果添加没有成功,重新获取ctl
              c = ctl.get();
              // 得到当前线程池状态,与外部循环的线程池做一个对比;如果不一致,则退回到外部循环,重新进行loop
              if (runStateOf(c) != rs)
                  continue retry;
		// 如果状态是相等的,则在内部循环进行loop即可
          }
      }
// 当上面的自旋锁添加运行线程数成功后,才会进入此处
      
      boolean workerStarted = false;
      boolean workerAdded = false;
      Worker w = null;
      try {
          // 构建一个Worker对象,里面就默认分配了一个线程
          w = new Worker(firstTask);
          final Thread t = w.thread;
          if (t != null) {
              // 加锁
              final ReentrantLock mainLock = this.mainLock;
              mainLock.lock();
              try {
                  // 再次获取线程池的运行状态
                  int rs = runStateOf(ctl.get());
			
                  // 运行状态是RUNNING 或者 (运行状态是SHUTDOWN 且 当前任务为null)
                  if (rs < SHUTDOWN ||
                      (rs == SHUTDOWN && firstTask == null)) {
                      // 预先检查线程是否为启动状态
                      if (t.isAlive())
                          // 如果是启动状态,就有问题了,要抛出异常
                          throw new IllegalThreadStateException();
                      // 将任务添加至workers容器中,这个容器包括了运行线程的状态
                      // largestPoolSize 最大线程池数量
                      workers.add(w);
                      int s = workers.size();
                      if (s > largestPoolSize)
                          largestPoolSize = s;
                      // 标志位改为true,代表worker已经添加至workers
                      workerAdded = true;
                  }
              } finally {
                  // 解锁
                  mainLock.unlock();
              }
              // 判断标志位
              if (workerAdded) {
                  // 启动线程,执行任务
                  t.start();
                  // 启动线程标志位,设置为true
                  workerStarted = true;
              }
          }
      } finally {
          // 是否启动线程的标志位
          if (! workerStarted)
              // 添加一个worker失败的处理
              addWorkerFailed(w);
      }
      // 返回是否启动线程的标志位
      return workerStarted;
  }

4)为什么线程池中会把null作为任务添加

看下图,很多调用addWorker()方法都传递了一个null,这是为什么呢,有什么用?

首先说结论,这是为了更快的启动队列中的任务

大家通过上面第一节的任务添加流程就会发现,有一些任务在添加进入任务阻塞队列后就没有声音了。

那么就要看看,如果添加一个为null的任务会出现什么情况把。

代码语言:javascript
复制
// 这中间代码省略了,第一部分是判断状态和添加任务数,第二部分是判断状态和启动任务
// 如果线程池的状态是RUNNING,那么一个任务是大概率都是可以添加成功的
private boolean addWorker(Runnable firstTask, boolean core) {
	// ... 省略了,关心下面t.start();做了什么即可
       if (workerAdded) {
           t.start();
           workerStarted = true;
       }
   }

// 主要还是要看启动Worker做了什么
   private final class Worker extends AbstractQueuedSynchronizer implements Runnable {
       
       // 首先构造方法
       Worker(Runnable firstTask) {
           setState(-1);
           // 传入一个任务后,作为自己的属性
           this.firstTask = firstTask;
           // 将自己作为任务构建了一个线程作为自己的属性。他自己也实现了Runnable接口
           this.thread = getThreadFactory().newThread(this);
       }
       
       // 当上面t.start();启动的是Worker的run方法
       public void run() {
           runWorker(this);
       }

       // 上面run();方法调用过来的
       final void runWorker(Worker w) {
           // 线程池里面的,当前的线程
           Thread wt = Thread.currentThread();
           // 当前真正要执行的任务,可能为null,本小节直接定义null
           Runnable task = w.firstTask;
           // 将属性变为null
           w.firstTask = null;
           w.unlock();
           boolean completedAbruptly = true;
           try {
               // 重点在这里,当task==null时,它会去getTask();方法中去获取task进行判断
               // 如果getTask();方法返回的是null,那么说明本次循环结束,任务运行完成
               // 如果getTask();方法返回的是队列中的任务,那么进入循环体,执行任务
               while (task != null || (task = getTask()) != null) {
                   w.lock();
				// 下面就是一些判断状态和执行任务的代码了
                   if ((runStateAtLeast(ctl.get(), STOP) ||
                        (Thread.interrupted() &&
                         runStateAtLeast(ctl.get(), STOP))) &&
                       !wt.isInterrupted())
                       wt.interrupt();
                   try {
                       beforeExecute(wt, task);
                       Throwable thrown = null;
                       try {
                           // 这里才是真正执行我们任务的地方
                           task.run();
                       } catch (RuntimeException x) {
                           thrown = x; throw x;
                       } catch (Error x) {
                           thrown = x; throw x;
                       } catch (Throwable x) {
                           thrown = x; throw new Error(x);
                       } finally {
                           afterExecute(task, thrown);
                       }
                   } finally {
                       task = null;
                       w.completedTasks++;
                       w.unlock();
                   }
               }
               completedAbruptly = false;
           } finally {
               processWorkerExit(w, completedAbruptly);
           }
       }

       // 这是从上面runWorker();方法调用而来
       private Runnable getTask() {
           // 最后poll()方法是否超时
           boolean timedOut = false;

           for (;;) {
               // 获取线程池状态
               int c = ctl.get();
               int rs = runStateOf(c);

               // 检查阻塞队列是否为空
               // 当状态是RUNNING时,false
               // 当状态是SHUTDOWN时,判断队列是否为空,如果有值,false
               // 如果状态是后面几种状态时,无论队列是否有值,true
               if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                   decrementWorkerCount();
                   // 这里返回null,就代表task=null了
                   return null;
               }

               // 当前线程池运行的线程数
               int wc = workerCountOf(c);

               // allowCoreThreadTimeOut,这个讲一个,这个布尔值代表,核心线程数是否也可以被回收
               // 如果为true,空闲时会保证keepAliveTime的时候,过期销毁
               // 如果为false(默认),那么在空闲时也会保持活动
               // 这里主要判断是否允许超时保留核心线程,用来确定下面阻塞队列的阻塞时间
               boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

               // 加这个判断,主要是想留一个线程在这循环阻塞,加快从队列中取任务的流程步骤
               if ((wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || workQueue.isEmpty())) {
                   if (compareAndDecrementWorkerCount(c))
                       return null;
                   continue;
               }

               try {
                   // 获取任务
                   Runnable r = timed ?
                       workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                       workQueue.take();
                   // 返回
                   if (r != null)
                       return r;
                   // 没有就一直处在循环之中,并配合上面的107行判断使用
                   timedOut = true;
               } catch (InterruptedException retry) {
                   timedOut = false;
               }
           }
       }

   }

三、最后

当前线程池的解读就到这里了,如果不是为了造火箭,谁会看这么底层的东西。

有一说一,这线程池写起来真的很优雅!!!

我是半月,你我一同共勉!

本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2023-05-31,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • Java中关于线程池的几道面试题
    • 一、介绍
      • 二、线程池
        • 1)任务添加流程
        • 2)线程池的状态有哪些
        • 3)线程池如何去执行任务的
        • 4)为什么线程池中会把null作为任务添加
      • 三、最后
      相关产品与服务
      容器服务
      腾讯云容器服务(Tencent Kubernetes Engine, TKE)基于原生 kubernetes 提供以容器为核心的、高度可扩展的高性能容器管理服务,覆盖 Serverless、边缘计算、分布式云等多种业务部署场景,业内首创单个集群兼容多种计算节点的容器资源管理模式。同时产品作为云原生 Finops 领先布道者,主导开源项目Crane,全面助力客户实现资源优化、成本控制。
      领券
      问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档