前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >专栏 >Java并发-线程池

Java并发-线程池

作者头像
lpe234
发布于 2021-03-02 07:34:06
发布于 2021-03-02 07:34:06
45900
代码可运行
举报
文章被收录于专栏:若是烟花若是烟花
运行总次数:0
代码可运行

Java中线程池是运用场景最多的并发框架,几乎所有需要异步或者并发执行任务的程序都可以使用线程池。

合理使用线程池可以带来3个好处:

  • 降低资源消耗。通过重复利用已创建的线程降低线程创建和销毁造成的消耗。
  • 提高响应速度:当任务到达时,任务可以不需要等到线程创建就能立即执行。
  • 提高线程的可管理性:使用线程池可以进行统一分配、调优和监控。

1 线程池的使用

1.1 线程池的创建

推荐使用ThreadPoolExecutor创建线程池。

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
/**
 * Creates a new {@code ThreadPoolExecutor} with the given initial
 * parameters.
 *
 * @param corePoolSize the number of threads to keep in the pool, even
 *        if they are idle, unless {@code allowCoreThreadTimeOut} is set
 * @param maximumPoolSize the maximum number of threads to allow in the
 *        pool
 * @param keepAliveTime when the number of threads is greater than
 *        the core, this is the maximum time that excess idle threads
 *        will wait for new tasks before terminating.
 * @param unit the time unit for the {@code keepAliveTime} argument
 * @param workQueue the queue to use for holding tasks before they are
 *        executed.  This queue will hold only the {@code Runnable}
 *        tasks submitted by the {@code execute} method.
 * @param threadFactory the factory to use when the executor
 *        creates a new thread
 * @param handler the handler to use when execution is blocked
 *        because the thread bounds and queue capacities are reached
 * @throws IllegalArgumentException if one of the following holds:<br>
 *         {@code corePoolSize < 0}<br>
 *         {@code keepAliveTime < 0}<br>
 *         {@code maximumPoolSize <= 0}<br>
 *         {@code maximumPoolSize < corePoolSize}
 * @throws NullPointerException if {@code workQueue}
 *         or {@code threadFactory} or {@code handler} is null
 */
public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler) {
  if (corePoolSize < 0 ||
      maximumPoolSize <= 0 ||
      maximumPoolSize < corePoolSize ||
      keepAliveTime < 0)
    throw new IllegalArgumentException();
  if (workQueue == null || threadFactory == null || handler == null)
    throw new NullPointerException();
  this.corePoolSize = corePoolSize;
  this.maximumPoolSize = maximumPoolSize;
  this.workQueue = workQueue;
  this.keepAliveTime = unit.toNanos(keepAliveTime);
  this.threadFactory = threadFactory;
  this.handler = handler;
}

线程池创建参数解释:

  1. corePoolSize(核心线程数):当线程池线程数量小于核心线程数时,即使有空闲线程也会创建线程,只有达到核心线程数时才不会创建。可以调用prestartCoreThreadprestartAllCoreThreads来预先创建一个或全部核心线程。核心线程默认不会被销毁,除非主动调用allowCoreThreadTimeOut(true)
  2. maximumPoolSize(最大线程数):线程池允许的最大线程数。核心线程数+非核心线程数=最大线程数。如果队列满了,并且已创建的线程数小于最大线程数,则会创建新线程执行新任务。如果使用了无界队列,则该参数基本无效。
  3. keepAliveTime,unit(存活时间):线程池的工作线程空闲后,保持的存活时间。默认仅对非核心线程有效,除非主动调用allowCoreThreadTimeOut(true)
  4. workQueue(工作队列):线程阻塞队列,只会存放由execute提交的Runnable任务。①ArrayBlockingQueue:基于数组的有界阻塞队列。②LinkedBlockingQueue:基于链表的阻塞队列,吞吐量通常高于ArrayBlockingQueue。③SynchronousQueue:不存储元素的阻塞队列,每个插入必须等待另一个线程调用移除操作。④PriorityBlockingQueue:具有优先级的无限阻塞队列。
  5. threadFactory(线程工厂):设置创建线程的工厂,创建线程池时要指定有意义的线程名称,方便出错时回溯。
  6. handler(拒绝策略):当队列和线程池都满,或者线程池不处于RUNNING状态时,会使用该策略,默认有4种实现:①AbortPolicy(默认):直接抛出RejectedExecutionException异常。②DiscardPolicy:静默丢弃当前提交的任务。③DiscardOldestPolicy:线程池未关闭的情况下,丢弃队列中最久未被执行的任务,并执行当前任务。④CallerRunsPolicy:线程池未关闭的情况下,使用调用者的线程去执行当前任务。
1.2 提交任务

线程池提交任务两个方法。

  • execute方法用于提交不需要返回值的任务,所以无法判断任务是否被线程池执行成功。
  • submit方法用于提交需要有返回值的任务。线程池会返回一个Future类型对象,该对象可以判断任务是否执行成功,并且可以通过Future.get()来获取返回值,get()会阻塞当前线程直到任务完成,get(long timeout, TimeUnit unit)会阻塞当前线程一段时间后立即返回。
1.3 线程池的关闭

可调用shutdown或者shutdownNow方法来关闭线程池。**原理:**遍历线程池中的线程,逐个调用线程的interrupt方法来中断线程,所以无法响应中断的任务可能永远无法终止。shutdownNow首先向线程池置为STOP状态,然后停止所有的正在执行或暂停任务的线程,并返回等待执行任务的列表;shutdown只是将线程池的状态设置成SHUTDOWN状态,然后中断所有没有正在执行任务的线程,之前提交的任务(正在执行的和队列中的)会被执行。

只要调用了shutdown或者shutdownNow任意方法,isShutdown都会返回true。当所有任务都已关闭后,才表示线程池关闭,isTerminated方法返回true。通常调用shutdown来关闭线程池,如果不一定要任务执行完,则可以调用shutdownNow方法。

1.4 线程池的配置

要想合理的配置线程池,先要对任务特性进行分析。

  • 任务性质:CPU密集、IO密集、混合型
  • 任务执行时间:长、中、短
  • 任务依赖性:是否依赖其他系统资源,如数据库连接、外部系统API调用
  • 任务优先级:高、中、低

性质不同的任务可以使用不同规模的线程池分开处理。CPU密集型任务应配置尽可能小的线程,如N或N+1。IO密集型的任务并不是一直在执行任务,则应配置尽可能多的线程。可以大概预估请求等待时间(WT)和服务时间(ST)之间的比例。线程池大小设置为N*(1+WT/ST)。

优先级不同的任务可以使用优先级队列PriorityBlockingQueue来处理,可以让高优先级的任务先执行。为了防止优先级低的任务可能永远都不能执行。可以将等待时间加入权重计算优先级。

建议使用有界队列,能增加系统的稳定性和预警能力。

1.5 线程池的监控

如果系统中存在大量使用线程池,则由必要对线程池进行监控,方便在出现问题时,可根据线程池使用情况进行快速定位。可根据线程池提供的参数进行监控,常用属性如下:

  • getPoolSize:线程池当前线程数
  • getCorePoolSize:线程池核心线程数
  • getActiveCount:正在执行任务的线程数量
  • getCompletedTaskCount:已完成任务的大致总数(任务和线程的状态可能在计算过程中有所变化)
  • getTaskCount:全部任务的大致总数
  • getQueue:当前线程池的任务队列
  • getLargestPoolSize:线程池曾经最大线程数量
  • getMaximumPoolSize:线程池允许最大线程数
  • getKeepAliveTime:线程池线程存活时间
  • isShutdown:线程池是否为关闭(SHUTDOWN状态)
  • isTerminated:线程池是否为TERMINATED状态

通过扩展线程池进行监控。可以通过继承的方式来自定义线程池,重写beforeExecute(Thread t, Runnable r)afterExecute(Runnable r, Throwable t)terminated(),可以在任务执行前、执行后和线程池关闭前执行一些代码进行监控。

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
class TPE extends ThreadPoolExecutor {
	// 记录Runnable任务起始执行时间
  private ConcurrentHashMap<Integer, Long> beginTimeMaps = new ConcurrentHashMap<>();
  
  @Override
  protected void beforeExecute(Thread t, Runnable r) {
    // 设置起始时间
    beginTimeMaps.put(r.hashCode(), new Date().getTime());
  }
  
  @Override
  protected void afterExecute(Runnable r, Throwable t) {
    // 获取起始时间  
    Long begin = beginTimeMaps.remove(r.hashCode());
    Long end = new Date().getTime();
    System.out.println(end-begin);
  }
  
  @Override
  protected void terminated() {
    // code
  }

  @Override
  public void shutdown() {
    // code
    super.shutdown();
  }

  @Override
  public List<Runnable> shutdownNow() {
    // code
    return super.shutdownNow();
  }
}

2 线程池原理

2.1 线程池状态及线程数
代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

// runState is stored in the high-order bits
private static final int RUNNING    = -1 << COUNT_BITS;
private static final int SHUTDOWN   =  0 << COUNT_BITS;
private static final int STOP       =  1 << COUNT_BITS;
private static final int TIDYING    =  2 << COUNT_BITS;
private static final int TERMINATED =  3 << COUNT_BITS;

// Packing and unpacking ctl
// 运行状态
private static int runStateOf(int c)     { return c & ~CAPACITY; }
// 工作线程数
private static int workerCountOf(int c)  { return c & CAPACITY; }
// 运行状态和工作线程数
private static int ctlOf(int rs, int wc) { return rs | wc; }

线程池控制状态,ctl是一个原子integer包含两个字段:workerCount线程池内有效线程数量,runState线程池状态。高3位存储runState,低29位存储workerCount

线程池的状态如下:

  • RUNNING:接受新任务,并且能够处理队列任务
  • SHUTDOWN:不接收新任务,但能处理队列任务
  • STOP:不接收新任务,不处理队列任务
  • TIDYING:所有任务都已终止,workCount为0,线程进入该状态后会调用terminated()钩子函数
  • TERMINATED:terminated()函数已经调用完毕
2.2 关键方法
2.2.1 execute方法代码

在将来的某个时刻执行给定的任务,该任务可能被新线程执行也可能被线程池中已存在的线程执行。如果无法提交任务执行,因为执行器已经关闭或者达到最大容量,则该任务由当前的RejectedExecutionHandler处理。

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
public void execute(Runnable command) {
  if (command == null)
    throw new NullPointerException();
  /*
   * Proceed in 3 steps:
   *
   * 1. If fewer than corePoolSize threads are running, try to
   * start a new thread with the given command as its first
   * task.  The call to addWorker atomically checks runState and
   * workerCount, and so prevents false alarms that would add
   * threads when it shouldn't, by returning false.
   *
   * 2. If a task can be successfully queued, then we still need
   * to double-check whether we should have added a thread
   * (because existing ones died since last checking) or that
   * the pool shut down since entry into this method. So we
   * recheck state and if necessary roll back the enqueuing if
   * stopped, or start a new thread if there are none.
   *
   * 3. If we cannot queue task, then we try to add a new
   * thread.  If it fails, we know we are shut down or saturated
   * and so reject the task.
   */
  // 获取线程池的runState和workerCount
  int c = ctl.get();
  // 若当前线程数小于核心线程数;则新建一个Worker并执行当前任务
  if (workerCountOf(c) < corePoolSize) {
    // 新增线程(command, core) core表示新增的是否为核心线程
    if (addWorker(command, true))
      return;
    // 添加核心线程失败,重新获取线程池状态
    c = ctl.get();
  }
  // 若当前线程数不小于核心线程数并且处于运行状态,则将任务放到阻塞队列中
  if (isRunning(c) && workQueue.offer(command)) {
    // 添加队列成功后,再次获取线程池状态
    int recheck = ctl.get();
    // 线程池不处于运行状态,并且成功移除任务。则执行拒绝策略
    if (! isRunning(recheck) && remove(command))
      // 执行拒绝策略
      reject(command);
    else if (workerCountOf(recheck) == 0)
      // 如果工作线程为0(线程池无线程),则新建一个无任务的线程
      addWorker(null, false);
  }
  // 非运行状态或放队列失败时,直接拒绝策略
  else if (!addWorker(command, false))
    // 执行拒绝策略
    reject(command);
}
2.2.2 addWorker方法

检查是否可根据当前线程池状态以及给定的边界(核心线程或最大线程)创建新的worker。firstTask为新线程需先执行的任务,如果为null的话则不执行。core如果为true则使用corePoolSize作为边界界定条件,为false则使用maximumPoolSize作为边界界定条件。

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
private boolean addWorker(Runnable firstTask, boolean core) {
  retry:
  for (;;) {
    // 线程池状态及线程池运行状态
    int c = ctl.get();
    int rs = runStateOf(c);

    // Check if queue empty only if necessary.
    // 当rs>=SHUTDOWN时 即SHUTDOWN(0)、STOP(1)、TIDYING(2)、TERMINATED(3),此状态不再接受新任务
    // 当rs=SHUTDOWN时,此时可创建线程条件如下:
    // 1. rs==SHUTDOWN,此时不再接受新任务,但可
    // 2. firstTask为空,不可再继续提交任务
    // 3. !workQueue.isEmpty(),为空的话则不再需要新线程
    if (rs >= SHUTDOWN &&
        ! (rs == SHUTDOWN &&
           firstTask == null &&
           ! workQueue.isEmpty()))
      return false;

    for (;;) {
      // 当前工作线程数
      int wc = workerCountOf(c);
      // 当线程数达到CAPACITY,
      // 或者core为true时达到核心线程数,
      // 或者core为false时达到最大线程数,
      // 不再创建新线程
      if (wc >= CAPACITY ||
          wc >= (core ? corePoolSize : maximumPoolSize))
        return false;
      // 使用CAS增加workerCount
      if (compareAndIncrementWorkerCount(c))
        // 成功修改workerCount,跳出最外层for循环
        break retry;
      
      // 重新获取线程池状态
      c = ctl.get();  // Re-read ctl
      // 线程池状态已被修改,继续外传for循环
      if (runStateOf(c) != rs)
        continue retry;
      // else CAS failed due to workerCount change; retry inner loop
    }
  }

  boolean workerStarted = false;
  boolean workerAdded = false;
  Worker w = null;
  try {
    // 根据firstTask创建新worker
    w = new Worker(firstTask);
    // 拿到当前worker的线程
    final Thread t = w.thread;
    if (t != null) {
      final ReentrantLock mainLock = this.mainLock;
      mainLock.lock();
      try {
        // Recheck while holding lock.
        // Back out on ThreadFactory failure or if
        // shut down before lock acquired.
        int rs = runStateOf(ctl.get());

        // rs<SHUTDOWN即为RUNNING,线程池处于运行状态
        // rs==SHUTDOWN时,因为可以继续执行队列中的任务,故允许添加无任务的worker
        if (rs < SHUTDOWN ||
            (rs == SHUTDOWN && firstTask == null)) {
          if (t.isAlive()) // precheck that t is startable
            throw new IllegalThreadStateException();
          workers.add(w);
          int s = workers.size();
          // 更新getLargestPoolSize,线程池中出现过最大线程数
          if (s > largestPoolSize)
            largestPoolSize = s;
          workerAdded = true;
        }
      } finally {
        mainLock.unlock();
      }
      if (workerAdded) {
        // worker添加成功,启动线程
        t.start();
        workerStarted = true;
      }
    }
  } finally {
    // worker启动失败,则roll back cleanly.
    if (! workerStarted)
      addWorkerFailed(w);
  }
  return workerStarted;
}

t.start()即为worker.thread.start()Worker类本身实现了Runnable接口,在Worker初始化时,会执行this.thread = getThreadFactory().newThread(this);。也就是t.start()-->worker.run()

2.2.3 Worker类

线程池中的线程都会被封装成Worker实例,Worker继承于AbstractQueuedSynchronizer(AQS),实现了Runnalbe接口。

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
private final class Worker
  extends AbstractQueuedSynchronizer
  implements Runnable
{
  /**
   * This class will never be serialized, but we provide a
   * serialVersionUID to suppress a javac warning.
   */
  private static final long serialVersionUID = 6138294804551838833L;

  /** Thread this worker is running in.  Null if factory fails. */
  // ThreadFactory.newThread创建的线程实例
  final Thread thread;
  /** Initial task to run.  Possibly null. */
  // 要运行的初始任务,可能为空
  Runnable firstTask;
  /** Per-thread task counter */
  // 每个线程都会记录的完成任务数
  volatile long completedTasks;

  /**
   * Creates with given first task and thread from ThreadFactory.
   * @param firstTask the first task (null if none)
   */
  Worker(Runnable firstTask) {
    // 禁止中断,直到runWorker方法执行
    setState(-1); // inhibit interrupts until runWorker
    this.firstTask = firstTask;
    // 通过ThreadFactory创建线程实例,Worker自身作为Runnable来创建线程实例
    this.thread = getThreadFactory().newThread(this);
  }

  /** Delegates main run loop to outer runWorker  */
  // t.start() 将会调用该方法
  public void run() {
    // 执行任务核心方法
    runWorker(this);
  }

  // Lock methods
  //
  // The value 0 represents the unlocked state.
  // The value 1 represents the locked state.

  protected boolean isHeldExclusively() {
    return getState() != 0;
  }

  protected boolean tryAcquire(int unused) {
    if (compareAndSetState(0, 1)) {
      setExclusiveOwnerThread(Thread.currentThread());
      return true;
    }
    return false;
  }

  protected boolean tryRelease(int unused) {
    setExclusiveOwnerThread(null);
    setState(0);
    return true;
  }

  public void lock()        { acquire(1); }
  public boolean tryLock()  { return tryAcquire(1); }
  public void unlock()      { release(1); }
  public boolean isLocked() { return isHeldExclusively(); }

  void interruptIfStarted() {
    Thread t;
    if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
      try {
        t.interrupt();
      } catch (SecurityException ignore) {
      }
    }
  }
}
2.2.4 runWorker方法

主工作线程循环,不断的从队列获取任务并且执行他们。

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
final void runWorker(Worker w) {
  Thread wt = Thread.currentThread();
  Runnable task = w.firstTask;
  w.firstTask = null;
  // unlock 将state由-1置为0。
  w.unlock(); // allow interrupts
  boolean completedAbruptly = true;
  try {
    // 初始任务不为null,或者从队列中获取到了任务
    while (task != null || (task = getTask()) != null) {
      // worker加锁
      w.lock();
      // If pool is stopping, ensure thread is interrupted;
      // if not, ensure thread is not interrupted.  This
      // requires a recheck in second case to deal with
      // shutdownNow race while clearing interrupt
      // 若线程池正在停止,要保证当前线程是中断状态
      // 若不是的话,则要保证当前线程不是中断状态
      if ((runStateAtLeast(ctl.get(), STOP) ||
           (Thread.interrupted() &&
            runStateAtLeast(ctl.get(), STOP))) &&
          !wt.isInterrupted())
        wt.interrupt();
      try {
        // 钩子函数,任务执行前执行
        beforeExecute(wt, task);
        Throwable thrown = null;
        try {
          // 执行任务 =======>
          task.run();
        } catch (RuntimeException x) {
          thrown = x; throw x;
        } catch (Error x) {
          thrown = x; throw x;
        } catch (Throwable x) {
          thrown = x; throw new Error(x);
        } finally {
          // 钩子函数,任务执行后执行
          afterExecute(task, thrown);
        }
      } finally {
        // 任务执行完,清空task任务
        task = null;
        // 已完成任务+1
        w.completedTasks++;
        // worker解锁
        w.unlock();
      }
    }
    // 线程正常退出
    completedAbruptly = false;
  } finally {
    // 线程退出 completedAbruptly标识正常退出/非正常退出
    processWorkerExit(w, completedAbruptly);
  }
}

Thread.interrupted()判断线程是否中断,同时复位中断状态。

runWorker()方法的执行流程:

  1. while循环调用getTask()方法,从任务队列中获取任务(Worker新建时可能有firstTask)
  2. 若线程池正在停止,要保证当前线程是中断状态,否则要保证当前线程不是中断状态
  3. 调用task.run()执行任务
  4. 若task为null则退出循环,执行processWorkerExit
  5. runWorker执行完,即代表Worker中run方法执行完毕,销毁线程
2.2.5 getTask方法

根据当前配置,阻塞或者超时等待任务。发生以下情况时,会返回null。task为null时,则销毁线程。

  1. 当前线程池线程数超过最大线程数(maximumPoolSize)。调用setMaximumPoolSize方法。
  2. 线程池处于STOP状态。调用了shutdownNow方法。
  3. 线程池处于SHUTDOWN状态,并且workQueue队列为空。
  4. 当前worker获取任务等待超时(有可能是allowCoreThreadTimeOutworkerCount > corePoolSize)。
代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
private Runnable getTask() {
  boolean timedOut = false; // Did the last poll() time out?

  for (;;) {
    // 获取当前线程池状态、运行状态
    int c = ctl.get();
    int rs = runStateOf(c);

    // Check if queue empty only if necessary.
    // rs>=STOP,线程池调用`shutdownNow`后,不再处理新任务
    // rs>=SHUTDOWN,workQueue消费完后,不再接受新任务
    if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
      decrementWorkerCount();
      return null;
    }

    // 当前工作线程数
    int wc = workerCountOf(c);

    // Are workers subject to culling?
    // 是否存在超时校验标识
    // `allowCoreThreadTimeOut`允许核心线程超时 或 工作线程数大于核心线程数
    boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

    // wc>maximumPoolSize,可能调用了`setMaximumPoolSize`,修改了最大线程数
    // timed&&timedOut,当前线程需要进行超时控制,并且上次发生了超时
    // 线程数大于1,或者阻塞队列为空
    if ((wc > maximumPoolSize || (timed && timedOut))
        && (wc > 1 || workQueue.isEmpty())) {
      // 尝试扣减工作线程
      if (compareAndDecrementWorkerCount(c))
        return null;
      continue;
    }

    try {
      // 是否需要进行超时判断 timed
      // 若为true,则需要进行超时判断,通过阻塞队列的poll方法来进行超时控制,超时则返回null
      // 若为false,则通过take获取,阻塞队列直到workQueue不为空
      Runnable r = timed ?
        workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
      workQueue.take();
      if (r != null)
        return r;
      // r为null,已经超时,设置标识位
      timedOut = true;
    } catch (InterruptedException retry) {
      // 发生了中断,未发生超时,设置标识位
      timedOut = false;
    }
  }
}

getTask方法返回null时,跳出循环,然后执行processWorkerExit方法进行退出。

2.2.6 processWorkerExit方法
代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
private void processWorkerExit(Worker w, boolean completedAbruptly) {
  // completedAbruptly是否出现异常,将workerCount减1
  if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
    decrementWorkerCount();

  final ReentrantLock mainLock = this.mainLock;
  mainLock.lock();
  try {
    // 已完成任务数
    completedTaskCount += w.completedTasks;
    workers.remove(w);
  } finally {
    mainLock.unlock();
  }

  tryTerminate();

  int c = ctl.get();
  // 若线程处于RUNNING或SHUTDOWN状态时,若worker异常结束,则直接添加空任务worker
  if (runStateLessThan(c, STOP)) {
    if (!completedAbruptly) {
      // 允许核心线程超时时,如果阻塞队列不为空,则至少保留一个worker
      int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
      if (min == 0 && ! workQueue.isEmpty())
        min = 1;
      // 若不允许核心线程池超时,则workerCount不少于corePoolSize
      if (workerCountOf(c) >= min)
        return; // replacement not needed
    }
    addWorker(null, false);
  }
}
2.2.7 shutdown方法
代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
public void shutdown() {
  final ReentrantLock mainLock = this.mainLock;
  mainLock.lock();
  try {
    // 安全策略校验
    checkShutdownAccess();
    // 切换状态为SHUTDOWN
    advanceRunState(SHUTDOWN);
    // 中断空闲线程
    interruptIdleWorkers();
    // 钩子
    onShutdown(); // hook for ScheduledThreadPoolExecutor
  } finally {
    mainLock.unlock();
  }
  // 尝试结束线程池
  tryTerminate();
}
2.2.8 shutdownNow方法
代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
public List<Runnable> shutdownNow() {
  List<Runnable> tasks;
  final ReentrantLock mainLock = this.mainLock;
  mainLock.lock();
  try {
    checkShutdownAccess();
    advanceRunState(STOP);
    // 中断所有线程(空闲、非空闲)
    interruptWorkers();
    // 取出队列中所有未被执行的任务
    tasks = drainQueue();
  } finally {
    mainLock.unlock();
  }
  tryTerminate();
  return tasks;
}

3 几种常见线程池

3.1 Exectors线程池

Executors提供了几个静态方法来创建线程池。

3.1.1 newFixedThreadPool
代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
public static ExecutorService newFixedThreadPool(int nThreads) {
  return new ThreadPoolExecutor(nThreads, nThreads,
                                0L, TimeUnit.MILLISECONDS,
                                new LinkedBlockingQueue<Runnable>());
}

核心线程数量和总线程数相同,都是传入的参数nThreads,所以只能创建核心线程。因为LinkedBlockingQueue的默认大小是Integer.MAX_VALUE,故核心线程空闲则由其处理,否则入队等待直到核心线程空闲。keepAliveTime设置为0L,多余的线程将会被立即停止。

适用于为满足资源管理需求,需要限制当前线程数量的应用场景,适用于负载比较高的服务器

3.1.2 newSingleThreadExecutor
代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
public static ExecutorService newSingleThreadExecutor() {
  return new FinalizableDelegatedExecutorService
    (new ThreadPoolExecutor(1, 1,
                            0L, TimeUnit.MILLISECONDS,
                            new LinkedBlockingQueue<Runnable>()));
}

核心线程数和最大线程数都为1,使用无界队列。任意时刻最多只有一个线程执行任务,多余任务会被缓冲至队列中。

适用于保证顺序的执行各任务;并且在任意时间点,不会有多线程是活动的应用场景。

3.1.3 newCachedThreadPool
代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
public static ExecutorService newCachedThreadPool() {
  return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                60L, TimeUnit.SECONDS,
                                new SynchronousQueue<Runnable>());
}

核心线程数为0,最大线程数为Integer.MAX_VALUEkeepAliveTime为60秒,空闲线程超时将会被终止。阻塞队列SynchronousQueue是一个没有容量的阻塞队列,插入数据时必须等待一个线程来获取数据、否则就会阻塞。没有了队列的缓冲,提交的任务会被尽快的分配线程执行。

适用于很多短期的异步任务的小程序,或者是负载较轻的服务器。

3.1.4 newScheduledThreadPool
代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
  return new ScheduledThreadPoolExecutor(corePoolSize);
}

public ScheduledThreadPoolExecutor(int corePoolSize) {
  super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
        new DelayedWorkQueue());
}

支持定时或周期性的任务执行。阻塞队列使用DelayedWorkQueue

适用于需要多个后台线程执行周期任务,同时为了满足资源管理的需求而需要限制后台线程数量的应用场景。

3.2 Exectors弊端
  • newFixedThreadPoolnewSingleThreadExecutor:阻塞队列使用LinkedBlockingQueue,其默认容量为Integer.MAX_VALUE,若任务处理较慢,则会引起消息堆积问题,消耗大量内存甚至触发OOM。
  • newCachedThreadPoolnewScheduledThreadPool:最大线程数为Integer.MAX_VALUE,可能会创建很多的线程,甚至导致OOM。

4 线程池注意事项

  • 自定义线程工厂ThreadFactory,指定有意义的线程名称,方便出错时回溯。
  • 使用Exectors时,避免出现任务堆积线程堆积情况。最好使用ThreadPoolExecutor显示的创建线程池。
  • 若线程池中使用到ThreadLocal,必须主动回收。
  • 最好有有效的监控、日志等记录信息。方便异常处理。
  • 线程池大小设置要根据任务类型进行设置,根据任务运行情况、系统负载、资源利用率进行调整。
本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
暂无评论
推荐阅读
基于质谱的蛋白质组学在加速药物发现中的新角色
蛋白质是大多数药物的靶点,目前,通过将新的生化方法与基于质谱的蛋白质组学相结合,能够从全新的维度对疾病表型及其生物活性分子的调节机制进行剖析。最近,德国慕尼黑马克斯普朗克生化研究所(Max Planck Institute of Biochemistry)等研究团队在Nature Reviews Drug Discovery上发表了题为The emerging role of mass spectrometry-based proteomics in drug discovery的综述文章。
智药邦
2022/11/16
6380
基于质谱的蛋白质组学在加速药物发现中的新角色
前瞻 | 溶质载体 (SLC):代谢的看门人 | Cell
◉ 图1. 研究SLCs多功能性的挑战,多功能性是指转运蛋白能够识别和运输多种底物的能力;冗余现象,当多种转运蛋白共享重叠的底物特异性时的现象;亚细胞定位的复杂性,在细胞器/质膜上的定位。◉ ,
生信菜鸟团
2025/02/27
2360
前瞻 | 溶质载体 (SLC):代谢的看门人 | Cell
干货分享 | 活性氧 ROS 检测攻略大全!| MedChemExpress (MCE)
【定义】: 活性氧 (Reactive oxygen species, ROS),细胞正常代谢的副产物,氧的部分还原代谢产物,是源自 O2 且比 O2 本身更活泼的物质的统称[1][2]。 【组成】: ROS 不仅包括超氧自由基阴离子 (O2•−) 和一些其他氧自由基,还包括一些 O2 的非自由基衍生物,如过氧化氢 (H2O2)、次氯酸 (HOCl) 和过氧亚硝酸盐/过氧亚硝酸 (ONOO−/ONOOH)。
MedChemExpress
2024/06/14
1840
干货分享 | 活性氧 ROS 检测攻略大全!| MedChemExpress (MCE)
铜死亡丨解锁细胞死亡新方式 - MedChemExpress
细胞死亡的相关研究一直是生命科学领域的热点。依据发生机制的不同 (起始的刺激、中间过程的激活和末端的效应器的不同) ,细胞的死亡可以区分为不同的方式,常见的有细胞凋亡、焦亡、坏死、铁死亡等。
MedChemExpress
2023/01/09
7670
铜死亡丨解锁细胞死亡新方式 - MedChemExpress
转录组代谢组蛋白质组解密如何延长寿命
我前面的教程:转录和蛋白水平的表达量相关性如何,解读了一个很有意思的文献, 它包含着转录组数据,蛋白质组和代谢组,而且数据是可以下载进行图表复现。恰好有学徒进行到了蛋白质组研究,还在抱怨我们《生信技能树》平台的相关教程太少了,我就趁机邀请她来做这方面知识整理,先从文献解读开始!
生信技能树
2021/11/04
8820
转录组代谢组蛋白质组解密如何延长寿命
【生信文献200篇】33 sirt6和sirt1调控昼夜节律
英文标题: Partitioning circadian transcription by SIRT6 leads to segregated control of cellular metabolism
生信菜鸟团
2021/05/24
1.7K0
【生信文献200篇】33 sirt6和sirt1调控昼夜节律
细胞死亡途径指南 | Nature Reviews Molecular Cell Biology
生信菜鸟团
2024/12/27
2070
细胞死亡途径指南 | Nature Reviews Molecular Cell Biology
面向转录组测序数据分析和机器学习方法的植物生物信息学应用新趋势
分析植物适应环境变化和胁迫反应的分子机制对植物生物技术至关重要。其中关键方法包括生物信息学方法、高通量测序和后基因组技术。测序和系统生物学方法提供了从分子到细胞、器官和种群水平的植物生长的全面视图。基因组学和生物信息学促进了植物细胞中蛋白质-蛋白质和基因调控相互作用的建模,为更好的作物生产和可持续性提供了基础。植物-病原体相互作用研究补充了这一领域的网络建模。
生信菜鸟团
2024/07/10
2250
面向转录组测序数据分析和机器学习方法的植物生物信息学应用新趋势
综述 | 肿瘤与宿主之间的代谢互作塑造了肿瘤的大环境 | Nat.Rev.Cancer
◉ 宿主因素、肿瘤、其微环境和大环境之间的交流途径:肿瘤细胞固有的遗传改变和局部环境(包括免疫细胞、组织驻留细胞和营养可用性)影响肿瘤细胞内的代谢过程(微环境)。◉ 这种微环境也受到宿主状态和行为的影响,包括与衰老相关的改变、饮食、运动、慢性药物(如二甲双胍和他汀类药物)、代谢紊乱、遗传以及宿主的微生物组组成。◉ 相反,癌症和宿主因素通过主要在肝脏、大脑和免疫系统中诱导变化来影响全身代谢状况,这些变化反过来又导致全身性的变化,最终可能导致癌症恶病质。◉ 此外,肿瘤可以通过释放分子成分、细胞因子和胞外囊泡来改变远处组织,这些有利于转移前生态位的形成。◉ 总体而言,这些过程调节了肿瘤的大环境,影响了癌症的发展进程。
生信菜鸟团
2025/04/11
1030
综述 | 肿瘤与宿主之间的代谢互作塑造了肿瘤的大环境 | Nat.Rev.Cancer
科研助攻 | 蛋白降解:AUTOTAC、第一代、第二代 AUTAC? 你分清了吗? | MedChemExpress (MCE)
在蛋白降解技术中,基于溶酶体系统开发的靶向蛋白降解技术有哪些?AUTOTAC、第一代、第二代 AUTAC? 你能分清吗?
MedChemExpress
2024/11/25
1090
科研助攻 | 蛋白降解:AUTOTAC、第一代、第二代 AUTAC? 你分清了吗? | MedChemExpress (MCE)
MCE | Nrf2 的“戏精”之路
外界刺激(如药物、紫外线和电离辐射)和内源性自由基和活性氧(ROS)会直接或者间接地损伤蛋白质、脂质和 DNA 等细胞成分,为了抵御这些不利影响,机体形成了一套复杂的氧化应激应答系统来缓解细胞所受的损害。而 Nrf2,作为调控抗氧化应激的一种关键转录因子,在诱导机体的抗氧化应答中起着重要作用,如调节氧化还原平衡、药物代谢和排泄、能量代谢、铁代谢、氨基酸代谢、生存、增殖、自噬、蛋白酶体降解、DNA 修复和线粒体生理机能。另外,Keap1-Nrf2 系统已成为癌症和神经退行性疾病以及许多自身免疫和炎性疾病的重要治疗靶点。
MedChemExpress
2023/03/22
4810
MCE | Nrf2 的“戏精”之路
单细胞时代|| 单细胞代谢组学之免疫代谢
这不是最好的时代,也不是最坏的时代,这里是单细胞时代。灵活的单细胞系统,高效的组织解离液,开源的数据分析工具,端到端的单细胞解决方案是未来发展的趋势。这里最主要的是开放灵活的单细胞系统,有了这个系统我们就可以自主地设计反应体系,来从不同纬度捕获单个细胞的信息。
生信技能树jimmy
2021/03/10
3.2K0
单细胞时代|| 单细胞代谢组学之免疫代谢
双硫死亡 | 发现的细胞 “新” 型死亡方式-MedChemExpress
SLC 家族成员 SLC7A11 转运蛋白在维持细胞内谷胱甘肽水平和保护细胞免受氧化应激诱导的细胞死亡方面具有重要作用,具有公认的促生存作用[3]。但有研究表明,胶质母细胞瘤细胞在葡萄糖剥夺条件下,通过 System Xc- (其中 SLC7A11 为催化亚基) 摄取胱氨酸会迅速诱导 NADPH 耗竭、活性氧物质积累和细胞死亡。
MedChemExpress
2023/03/29
5860
双硫死亡 | 发现的细胞 “新” 型死亡方式-MedChemExpress
徐峻|人工智能与药物设计学:新范式探索 (5)
随着信息技术的不断发展,药物设计方法学的新概念、新方法和新思路持续更新,药物发现范式也与时俱进。人工智能作为新工具,已应用于药物发现过程的多个方面,引起了制药行业的高度关注,也带来了对药物发现科学理论和方法学的新思考和新探索。
智药邦
2022/04/13
7520
徐峻|人工智能与药物设计学:新范式探索 (5)
Nat. Rev. Chem. | 检测和分析手性分子作为疾病生物标志物
今天为大家介绍的是来自Zilong Wu , Daniel W. Armstrong, Herman Wolosker & Yuebing Zheng团队的一篇论文。小型代谢分子的手性在控制生理过程和指示人类健康状况方面非常重要。在多种疾病中,包括癌症、肾脏和脑部疾病,生物体液和组织中手性分子的对映异构体比例会发生异常。因此,手性小分子是疾病诊断、预后、不良药物反应监测、药效学研究和个性化医疗的有前景的生物标志物。然而,由于这些小型手性分子种类繁多、浓度低,要在临床程序中实现成本效益高且可靠的分析仍然困难。
DrugAI
2024/03/04
8500
Nat. Rev. Chem. | 检测和分析手性分子作为疾病生物标志物
饿死癌细胞?还是先看看肿瘤中的异常代谢的特征分析和背后的遗传与环境互作吧!
本文授权转载自 “生物狗窝,很有深度的公众号”,讲述了肿瘤中异常的代谢类型,代谢重编程与基因突变和肿瘤微环境的关系,以及如何靶向这些突变来实现癌症治疗的研究。原文共三篇,读完一篇,感觉意犹未尽,一下读完三篇,很好的综述研究。故此合并一起,值得好好阅读、收藏。
生信宝典
2018/09/21
3K0
饿死癌细胞?还是先看看肿瘤中的异常代谢的特征分析和背后的遗传与环境互作吧!
Drug Discovery Today| 频繁命中化合物:高通量筛选中需警惕的假阳性结果
今天给大家介绍的是2020年1月在Drug Discovery Today上发表的综述“Frequent hitters: nuisance artifacts in high-throughput screening”。高通量筛选是药物研发的一个重要手段,然而研究中发现一些化合物在不同类型靶点筛选中均表现出阳性结果,这类化合物称为“频繁命中化合物”(Frequent hitters)。本综述针对频繁命中化合物的主要分类:胶体聚集化合物(Aggregators)、光学干扰物(Spectroscopic interference compounds)、化学易反应化合物(Chemical reactive compounds)和混乱成键化合物(Promiscuous compounds)进行简单介绍,同时对现常用的模型或规则和其未来发展方向进行讨论分析。
DrugAI
2021/01/29
1K0
铁死亡,究竟该如何检测?- MedChemExpress
铁死亡是一种铁依赖性的,区别于细胞凋亡、细胞自噬的细胞程序性死亡 (programmed cell death,PCD) 方式,依赖于铁介导的氧化损伤,铁积累的增加,自由基的产生,脂肪酸供应与脂质过氧化物增加是诱导铁死亡的关键。
MedChemExpress
2022/12/28
7210
铁死亡,究竟该如何检测?- MedChemExpress
神刊综述 | CA:代谢组学在癌症研究和临床肿瘤学中的新应用
代谢组学是对小分子代谢物的全局分析,可以提供关于癌症状态的关键信息。2021年5月神刊《CA: a cancer journal for clinicians》发表了一篇综述文章,概述了代谢组学在改进癌症诊断、监测和治疗方面当前和未来的机遇。
尐尐呅
2022/04/01
4.8K0
神刊综述 | CA:代谢组学在癌症研究和临床肿瘤学中的新应用
程序性细胞坏死?细胞“铁死亡(Ferroptosis)”的那些事。
细胞死亡是一切细胞的宿命,细胞死亡的方式也是生物医学领域的重大问题,这方面现在已经比较成熟的研究包括细胞凋亡、细胞自噬、细胞坏死,其中细胞坏死是最新研究的热点,过去曾经认为细胞坏死都是非程序性的,或者是意外发生的。现在发现细胞坏死不仅存在程序或调节,而且存在许多不同类型的细胞坏死类型。例如程序性细胞坏死,程序性铁坏死等。其中铁坏死是近年来才发现的程序性细胞坏死类型,这种坏死的特点是依赖铁离子,是一种氧化损伤类的细胞坏死。
Mark Chen
2020/11/02
6.4K0
程序性细胞坏死?细胞“铁死亡(Ferroptosis)”的那些事。
推荐阅读
相关推荐
基于质谱的蛋白质组学在加速药物发现中的新角色
更多 >
LV.1
贵州大学博士
目录
  • 1 线程池的使用
    • 1.1 线程池的创建
    • 1.2 提交任务
    • 1.3 线程池的关闭
    • 1.4 线程池的配置
    • 1.5 线程池的监控
  • 2 线程池原理
    • 2.1 线程池状态及线程数
    • 2.2 关键方法
      • 2.2.1 execute方法代码
      • 2.2.2 addWorker方法
      • 2.2.3 Worker类
      • 2.2.4 runWorker方法
      • 2.2.5 getTask方法
      • 2.2.6 processWorkerExit方法
      • 2.2.7 shutdown方法
      • 2.2.8 shutdownNow方法
  • 3 几种常见线程池
    • 3.1 Exectors线程池
      • 3.1.1 newFixedThreadPool
      • 3.1.2 newSingleThreadExecutor
      • 3.1.3 newCachedThreadPool
      • 3.1.4 newScheduledThreadPool
    • 3.2 Exectors弊端
  • 4 线程池注意事项
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档