前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >通过了解RejectedExecutionException来分析ThreadPoolExecutor源码

通过了解RejectedExecutionException来分析ThreadPoolExecutor源码

作者头像
用户2032165
发布2018-10-09 15:59:01
7000
发布2018-10-09 15:59:01
举报
文章被收录于专栏:cmazxiaoma的架构师之路

观看本文章之前,最好看一下这篇文章熟悉下ThreadPoolExecutor基础知识。

1.关于Java多线程的一些常考知识点

2.看ThreadPoolExecutor源码前的骚操作


讲解本篇文章从下面一个例子开始,test1()和test2()方法都会抛出RejectedExecutionException异常,ThreadPoolExecutor默认的拒绝任务策略是AbortPolicy。test1()中线程池中corePoolSize和maximumPoolSize都为2,阻塞队列的长度是10,线程池最多能处理12个任务。当超过12个任务时,就会拒绝新的任务,抛出RejectedExecutionException。而test2()中的任务没有超过线程池的阀值,但是在线程池调用shutdown()后,线程池的状态会变成shutdown,此时不接收新任务,但会处理正在运行的任务和在阻塞队列中等待处理的任务。所以我们在shutdown()之后再调用submit(),会抛出RejectedExecutionException异常。有了这个例子的基础,我们再来分析源码,会好过一点。

代码语言:javascript
复制
/**
 * @author cmazxiaoma
 * @version V1.0
 * @Description: 分析抛出RejectedExecutionException问题
 * @date 2018/8/16 14:35
 */
public class RejectedExecutionExceptionTest {

    public static void main(String[] args) {
//        test1();
        test2();
    }

    /**
     * 提交的任务数量超过其本身最大能处理的任务量
     */
    public static void test1() {
        CustomThreadPoolExecutor customThreadPoolExecutor =
                new CustomThreadPoolExecutor(2, 2,
                        0L,
                        TimeUnit.SECONDS,
                        new ArrayBlockingQueue<Runnable>(10));

        for (int i = 0; i < 13; i++) {
            CustomThreadPoolExecutor.CustomTask customTask
                    = new CustomThreadPoolExecutor.CustomTask(new Runnable() {
                @Override
                public void run() {
                    try {
                        TimeUnit.SECONDS.sleep(60 * 60);
                        System.out.println("线程" + Thread.currentThread().getName()
                                + "正在执行...");
                    } catch (InterruptedException ex) {
                        ex.printStackTrace();
                    }
                }
            }, "success");

            if (i == 12) {
                // throw RejectedExectionException
                customThreadPoolExecutor.submit(customTask);
            } else {
                customThreadPoolExecutor.submit(customTask);
            }
        }
        customThreadPoolExecutor.shutdown();
    }

    /**
     * 当线程池shutdown()后,会中断空闲线程。但是正在运行的线程和处于阻塞队列等待执行的线程不会中断。
     * shutdown(),不会接收新的线程。
     */
    public static void test2() {
        CustomThreadPoolExecutor customThreadPoolExecutor =
                new CustomThreadPoolExecutor(2, 2,
                        0L,
                        TimeUnit.SECONDS,
                        new ArrayBlockingQueue<Runnable>(10));

        for (int i = 0; i < 2; i++) {
            CustomThreadPoolExecutor.CustomTask customTask
                    = new CustomThreadPoolExecutor.CustomTask(new Runnable() {
                @Override
                public void run() {
                    try {
                        TimeUnit.SECONDS.sleep(60 * 60);
                        System.out.println("线程" + Thread.currentThread().getName()
                                + "正在执行...");
                    } catch (InterruptedException ex) {
                        ex.printStackTrace();
                    }
                }
            }, "success");
            customThreadPoolExecutor.submit(customTask);
        }
        customThreadPoolExecutor.shutdown();

        CustomThreadPoolExecutor.CustomTask customTask
                = new CustomThreadPoolExecutor.CustomTask(new Runnable() {
            @Override
            public void run() {
                try {
                    TimeUnit.SECONDS.sleep(60 * 60);
                    System.out.println("线程" + Thread.currentThread().getName()
                            + "正在执行...");
                } catch (InterruptedException ex) {
                    ex.printStackTrace();
                }
            }
        }, "success");

        customThreadPoolExecutor.submit(customTask);
    }

}

源码分析

线程池执行过程

关于线程池执行过程,我们看下面一幅图,就能明白个大概。

1.当线程池中的线程数量小于corePoolSize,就会创建新的线程来处理添加的任务直至线程数量等于corePoolSize。

2.当线程池中的线程数量大于等于corePoolSize且阻塞队列(workQueue)未满,就会把新添加的任务放到阻塞队列中。

3.当线程池中的线程数量大于等于corePoolSize且阻塞队列满了,就会创建线程来处理添加的任务直到线程数量等于maximumPoolSize

4.如果线程池的数量大于maximumPoolSize,会根据RejectedExecutionHandler策略来拒绝任务。AbortPolicy就是其中的一种拒绝任务策略。

线程池执行过程(图来自于网络).png


submit

submit()相比于execute()而言,多了RunnableFuture<Void> ftask = newTaskFor(task, null);这一步,把task包装成RunnableFuture类型的ftask。所以submit()有返回值,返回值类型是Future<?>,可以通过get()获取线程执行完毕后返回的值。还可以通过isDone()isCancelled()cancel(boolean mayInterruptIfRunning)这些方法进行某些操作。比如判断线程是否执行完毕、判断线程是否被取消,显式取消启动的线程的操作。

代码语言:javascript
复制
    public Future<?> submit(Runnable task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<Void> ftask = newTaskFor(task, null);
        execute(ftask);
        return ftask;
    }

execute

线程池去处理被提交的任务,很明显通过execute()方法提交的任务必须要实现Runnable接口。

我们来仔细看下execute()注释,发现它说到:如果任务不能被成功提交得到执行,因为线程池已经处于shutdown状态或者是任务数量已经达到容器上限,任务会被RejectedExecutionHandler处理进行拒绝操作。很明显,注释已经告诉上文抛出RejectedExecutionException异常的答案了。有时候真的要仔细看注释!!!多看注释,事半功倍。

我们来看execute()中做了什么操作。

1.获取线程池的状态,如果线程池中的线程数量小于corePoolSize,调用addWorker(command, true)创建新的线程去处理command任务。如果addWorker()返回失败,我们再次获取线程池的状态。因为addWorker()失败的原因可能有:线程池已经处于shutdown状态不接收新的任务或者是存在并发,在workerCountOf(c) < corePoolSize这块代码后,有其他的线程创建了worker线程,导致worker线程的数量大于等于corePoolSize

2.如果线程池的数量大于等于corePoolSize,且线程池的状态处于RUNNING状态,我们将任务放到阻塞队列中。当任务成功放入阻塞队列中,我们仍然需要一个双重校验的机制去判断是否应该创建新的线程去处理任务。

因为会存在这些情况:有些线程在我们上次校验后已经死掉、线程池在上次校验后突然关闭处于shutdown状态。考虑到这些原因,我们必须再次校验线程池的状态。如果线程池的状态不处于RUNNING状态,那么就行回滚操作,把刚才入队的任务移除掉,后续通过reject(command)执行拒绝任务策略。

如果线程池处于RUNNING状态且线程池中线程数量等于0或者从阻塞队列中删除任务失败(意味着:这个任务已经被其他线程处理掉了)且线程池中线程数量等于0,那么调用addWorker(null, false)新建一个worker线程,去消费workQueue中里面的任务

3.如果线程池不处于RUNNING状态或者任务无法成功入队(此时阻塞队列已经满了),此时需要创建新的线程扩容至maximumPoolSize。如果addWorker(command, false)返回false,那么通过reject(command)执行拒绝任务策略。

这里再唠叨几句,调用addWorker()有这4种传参的方式,适用于不同场景。

1.addWorker(command, true)当线程池中的线程数量少于corePoolSize,会把command包装成worker并且放入到workers集合中。如果线程池中的线程数量超过了corePoolSize,会返回false。

2.addWorker(command, false)当阻塞队列满了,同样会把command包装成worker并且放入到worker集合中。如果线程池中的线程数量超过了maximumPoolSize,会返回false。

3.addWorker(null, false)说明firstTask是个空任务,同样把它包装成worker并且放入到worker集合中。如果线程池中的数量超过了maximumPoolSize,会返回false。这样firstTask为空的worker在线程执行的时候,也可以从阻塞队列中获取任务去处理。

4.addWorker(null, true):和上面一样,只是线程池的线程数量限制在corePoolSize,超过也是返回false。使用它的有prestartAllCoreThreads()prestartCoreThread()这2个方法,其使用目的是预加载线程池中的核心线程。

代码语言:javascript
复制
    /**
     * Executes the given task sometime in the future.  The task
     * may execute in a new thread or in an existing pooled thread.
     *
     * If the task cannot be submitted for execution, either because this
     * executor has been shutdown or because its capacity has been reached,
     * the task is handled by the current {@code RejectedExecutionHandler}.
     *
     * @param command the task to execute
     * @throws RejectedExecutionException at discretion of
     *         {@code RejectedExecutionHandler}, if the task
     *         cannot be accepted for execution
     * @throws NullPointerException if {@code command} is null
     */
    public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        /*
         * Proceed in 3 steps:
         *
         * 1. If fewer than corePoolSize threads are running, try to
         * start a new thread with the given command as its first
         * task.  The call to addWorker atomically checks runState and
         * workerCount, and so prevents false alarms that would add
         * threads when it shouldn't, by returning false.
         *
         * 2. If a task can be successfully queued, then we still need
         * to double-check whether we should have added a thread
         * (because existing ones died since last checking) or that
         * the pool shut down since entry into this method. So we
         * recheck state and if necessary roll back the enqueuing if
         * stopped, or start a new thread if there are none.
         *
         * 3. If we cannot queue task, then we try to add a new
         * thread.  If it fails, we know we are shut down or saturated
         * and so reject the task.
         */
        int c = ctl.get();
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        else if (!addWorker(command, false))
            reject(command);
    }

addWorker

addWorker()主要是创建新的线程,然后执行任务。

1.首先判断线程池的状态是否满足创建worker线程的要求。

如果线程池的状态大于SHUTDOWN状态,那么此时处于STOP、TIDYING、TERMINATE状态,不能创建worker线程,返回false。

如果线程池处于shutdown状态且firstTask不等于null,此时也无法创建worker线程。因为处于shutdown状态的线程池不会去接收新的任务。

如果线程池处于shutdown状态且firstTask等于null且workQueue阻塞队列为空,此时就更没有必要创建worker线程了。因为firstTask为null,就是为了创建一个没有任务的worker线程去阻塞队列里面获取任务。而阻塞队列都已经为空,那么再创建一个firstTask为null的worker线程显然没有什么意思,返回false即可。

  1. 判断线程池中的线程数量是否超过最大值。当core为true,最大值为corePoolSize。当core为false,最大值为maximumPoolSize。如果超过最大值,也无法创建worker线程,直接返回false即可。如果没有超过最大值,通过CAS操作让当前线程数加1,然后通过标签跳转跳出循环体至retry:位置。如果CAS操作失败,说明workerCount被其他线程修改过。我们再次获取ctl,判断当前线程池状态和之前的状态是否匹配。如果不匹配,说明线程池状态发生变更,继续循环操作。

3.通过传入来的firstTask创建worker线程。Worker的构造方法中通过setState(-1)设置state(同步状态)为-1。Worker继承了AbstractQueuedSynchronizer,其本身是一把不可重入锁。getThreadFactory().newThread(this)创建新线程,因为Worker实现了Runnable接口,其本身也是一个可执行的任务。

代码语言:javascript
复制
        Worker(Runnable firstTask) {
            setState(-1); // inhibit interrupts until runWorker
            this.firstTask = firstTask;
            this.thread = getThreadFactory().newThread(this);
        }

4.我们往workers添加worker线程时,通过ReentrantLock保证线程安全。只有在当前线程池处于RUNNING状态或者是处于SHUTDOWN状态且firstTask等于null的情况下,才可以添加worker线程。如果worker线程已经处于启动且未死亡的状态,会抛出IllegalThreadStateException异常。

添加完毕后,启动worker线程。如果worker线程启动成功返回true,启动失败调用addWorkerFailed()进行回滚操作。

代码语言:javascript
复制
    private void addWorkerFailed(Worker w) {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            if (w != null)
                workers.remove(w);
            decrementWorkerCount();
            tryTerminate();
        } finally {
            mainLock.unlock();
        }
    }
代码语言:javascript
复制
    private boolean addWorker(Runnable firstTask, boolean core) {
        retry:
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);

            // Check if queue empty only if necessary.
            if (rs >= SHUTDOWN &&
                ! (rs == SHUTDOWN &&
                   firstTask == null &&
                   ! workQueue.isEmpty()))
                return false;

            for (;;) {
                int wc = workerCountOf(c);
                if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
                if (compareAndIncrementWorkerCount(c))
                    break retry;
                c = ctl.get();  // Re-read ctl
                if (runStateOf(c) != rs)
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
            }
        }

        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
            w = new Worker(firstTask);
            final Thread t = w.thread;
            if (t != null) {
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();
                try {
                    // Recheck while holding lock.
                    // Back out on ThreadFactory failure or if
                    // shut down before lock acquired.
                    int rs = runStateOf(ctl.get());

                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                        if (t.isAlive()) // precheck that t is startable
                            throw new IllegalThreadStateException();
                        workers.add(w);
                        int s = workers.size();
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
                if (workerAdded) {
                    t.start();
                    workerStarted = true;
                }
            }
        } finally {
            if (! workerStarted)
                addWorkerFailed(w);
        }
        return workerStarted;
    }

Worker

我们来看下ThreadPoolExecutor的内部类Worker,上文已经说到Worker继承了AbstractQueuedSynchronizer类且实现了Runnable接口。所以说是一个可执行的任务,也是一把不可重入锁,具有排他性。

1.我们创建Worker对象时,默认的state为-1。我们中断的时候,要获取worker对象的锁(state从0 CAS到1)。获取锁成功后,才能进行中断。这说明了在初始化worker对象阶段,不允许中断。只有调用了runWorker()之后,将state置为0,才能中断。

2.shutdown()中调用interruptIdleWorkers()中断空闲线程和shutdownNow()中调用interruptWorkers()中断所有线程。

interruptIdleWorkers()中中断空闲线程的前提是要获取worker对象的锁。

代码语言:javascript
复制
    private void interruptIdleWorkers(boolean onlyOne) {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            for (Worker w : workers) {
                Thread t = w.thread;
                if (!t.isInterrupted() && w.tryLock()) {
                    try {
                        t.interrupt();
                    } catch (SecurityException ignore) {
                    } finally {
                        w.unlock();
                    }
                }
                if (onlyOne)
                    break;
            }
        } finally {
            mainLock.unlock();
        }
    }

    private void interruptWorkers() {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            for (Worker w : workers)
                w.interruptIfStarted();
        } finally {
            mainLock.unlock();
        }
    }

interruptWorkers()中中断所有线程时,不用调用tryLock()获取worker对象的锁,最终是通过worker中的interruptIfStarted()来中断线程。在这个方法中只有state大于等于0且线程不等于null且线程没有被中断过,才能进行中断操作。说明只有经过了runworker()阶段才能进行中断操作。

这也是Worker为什么要设计成不可重入的原因,就是为了防止中断在运行中的任务,只会中断在等待从workQueue中通过getTask()获取任务的线程(因为他们没有上锁,此时state为0)。

以下这5种方法都会调用到interruptIdleWorkers()去中断空闲线程。

代码语言:javascript
复制
setCorePoolSize()
setKeepAliveTime(long time, TimeUnit unit)
setMaximumPoolSize(int maximumPoolSize)
shutdown()
allowCoreThreadTimeOut(boolean value)

还有一点必须强调。Task没有真正的被执行,执行的是Work线程。Work线程中只是调用到了Task中的run()方法。

代码语言:javascript
复制
    private final class Worker
        extends AbstractQueuedSynchronizer
        implements Runnable
    {
        /**
         * This class will never be serialized, but we provide a
         * serialVersionUID to suppress a javac warning.
         */
        private static final long serialVersionUID = 6138294804551838833L;

        /** Thread this worker is running in.  Null if factory fails. */
        final Thread thread;
        /** Initial task to run.  Possibly null. */
        Runnable firstTask;
        /** Per-thread task counter */
        volatile long completedTasks;

        /**
         * Creates with given first task and thread from ThreadFactory.
         * @param firstTask the first task (null if none)
         */
        Worker(Runnable firstTask) {
            setState(-1); // inhibit interrupts until runWorker
            this.firstTask = firstTask;
            this.thread = getThreadFactory().newThread(this);
        }

        /** Delegates main run loop to outer runWorker  */
        public void run() {
            runWorker(this);
        }

        // Lock methods
        //
        // The value 0 represents the unlocked state.
        // The value 1 represents the locked state.

        protected boolean isHeldExclusively() {
            return getState() != 0;
        }

        protected boolean tryAcquire(int unused) {
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }

        protected boolean tryRelease(int unused) {
            setExclusiveOwnerThread(null);
            setState(0);
            return true;
        }

        public void lock()        { acquire(1); }
        public boolean tryLock()  { return tryAcquire(1); }
        public void unlock()      { release(1); }
        public boolean isLocked() { return isHeldExclusively(); }

        void interruptIfStarted() {
            Thread t;
            if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
                try {
                    t.interrupt();
                } catch (SecurityException ignore) {
                }
            }
        }
    }

runWorker

1.work线程启动后,会调用其run()方法。run()方法再去调用runWorker(this)方法。

2.执行任务之前,获取work线程中的task,然后释放worker的锁。让state状态从-1 CAS到0。当state为0,说明可以去中断此线程。

3.以轮询的方式通过getTask()从阻塞队列中获取task,当task为null,跳出轮询。

4.开始执行任务的时候,通过lock()获取锁,将state从0 CAS到1。任务执行完毕时,通过unlock()释放锁。

5.如果线程池处于STOP、TIDYING、TERMINATE状态,要中断worker线程。

6.通过beforeExecute(wt, task)和afterExecute(task, thrown)对task进行前置和后置处理。

7.在task.run()、beforeExecute(wt, task)、afterExecute(task, thrown)发生异常时都会导致worker线程终止。通过调用processWorkerExit(w, completedAbruptly)来进行worker退出操作。

8.在getTask()获取阻塞队列中的任务,如果队列中没有任务或者是获取任务超时,都会调用processWorkerExit(w, completedAbruptly)来进行worker退出操作。

代码语言:javascript
复制
    final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
        w.unlock(); // allow interrupts
        boolean completedAbruptly = true;
        try {
            while (task != null || (task = getTask()) != null) {
                w.lock();
                // If pool is stopping, ensure thread is interrupted;
                // if not, ensure thread is not interrupted.  This
                // requires a recheck in second case to deal with
                // shutdownNow race while clearing interrupt
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                try {
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                        task.run();
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                        afterExecute(task, thrown);
                    }
                } finally {
                    task = null;
                    w.completedTasks++;
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
            processWorkerExit(w, completedAbruptly);
        }
    }

getTask

上文已经提起过getTask()方法,主要是从阻塞队列获取task的。那么条件下task会返回null呢?我们可以通过注释得到一些信息。

  • 超过了maximumPoolSize设置的线程数量,因为调用了setMaximumPoolSize()方法。
  • 线程池处于stop状态。
  • 线程池处于shutdown状态且workQueue为空.
  • 获取任务等待超时。

1.首先获取线程池运行状态,如果线程池的状态处于shutdown状态且workQueue为空,或者处于stop状态。然后调用decrementWorkerCount()递减workerCount,最后返回null。

代码语言:javascript
复制
     * Decrements the workerCount field of ctl. This is called only on
     * abrupt termination of a thread (see processWorkerExit). Other
     * decrements are performed within getTask.
     */
    private void decrementWorkerCount() {
        do {} while (! compareAndDecrementWorkerCount(ctl.get()));
    }

2.allowCoreThreadTimeOut默认为false。为false的时候,核心线程即时在空闲时也会保持活跃。为true的时候,核心线程在keepAliveTime时间范围内等待工作。如果线程池的数量超过maximumPoolSize或者等待任务超时或者workQueue为空,那么直接通过CAS减少workerCount数量,返回null。

3.如果timed为true,通过workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS)获取task,等待时间超过了keepAliveTime还没获取到task,直接返回null。如果timed为false,通过workQueue.take()获取task。如果没有获取到task,会一直阻塞当前线程直到获取到task(当阻塞队列中加入了新的任务,会唤醒当前线程)为止。

4.如果获取task成功,就直接返回。如果获取task超时,timedOut会置为true,会在下一次循环中以返回null告终。

再强调一点,只有当线程池中的线程数量大于corePoolSize才会进行获取任务超时检查,这也体现线程池中的一种策略:当线程池中线程数量达到maximumPoolSize大小后,如果一直没有任务进来,会逐渐减少workerCount直到线程数量等于corePoolSize。

代码语言:javascript
复制
    /**
     * Performs blocking or timed wait for a task, depending on
     * current configuration settings, or returns null if this worker
     * must exit because of any of:
     * 1. There are more than maximumPoolSize workers (due to
     *    a call to setMaximumPoolSize).
     * 2. The pool is stopped.
     * 3. The pool is shutdown and the queue is empty.
     * 4. This worker timed out waiting for a task, and timed-out
     *    workers are subject to termination (that is,
     *    {@code allowCoreThreadTimeOut || workerCount > corePoolSize})
     *    both before and after the timed wait, and if the queue is
     *    non-empty, this worker is not the last thread in the pool.
     *
     * @return task, or null if the worker must exit, in which case
     *         workerCount is decremented
     */
    private Runnable getTask() {
        boolean timedOut = false; // Did the last poll() time out?

        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);

            // Check if queue empty only if necessary.
            if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                decrementWorkerCount();
                return null;
            }

            int wc = workerCountOf(c);

            // Are workers subject to culling?
            boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

            if ((wc > maximumPoolSize || (timed && timedOut))
                && (wc > 1 || workQueue.isEmpty())) {
                if (compareAndDecrementWorkerCount(c))
                    return null;
                continue;
            }

            try {
                Runnable r = timed ?
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                    workQueue.take();
                if (r != null)
                    return r;
                timedOut = true;
            } catch (InterruptedException retry) {
                timedOut = false;
            }
        }
    }

processWorkerExit

1.completedAbruptly为true,说明worker线程时突然终止,说明执行task.run()发生了异常,所以要通过CAS减少workerCount的数量。

2.completedAbruptly为false,说明worker线程是正常终止,不需要对workerCount进行减少的操作。因为在getTask()中已经做了此操作。

3.对worker完成的任务数进行统计,并且从workers集合中移出。

4.调用tryTerminate()方法,尝试终止线程池。如果状态满足的话,线程池还存在线程,会调用interruptIdleWorkers(ONLY_ONE)进行中断处理,使其进入退出流程。如果线程池中的线程数量等于0的话,通过CAS把线程池的状态更新到TIDYING。然后通过terminated()进行一些结束的处理,最后通过CAS把线程池状态更新到TERMINATED。最后的最后,调用termination.signalAll()唤醒等待的线程,通知它们线程池已经终止。

代码语言:javascript
复制
    final void tryTerminate() {
        for (;;) {
            int c = ctl.get();
            if (isRunning(c) ||
                runStateAtLeast(c, TIDYING) ||
                (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
                return;
            if (workerCountOf(c) != 0) { // Eligible to terminate
                interruptIdleWorkers(ONLY_ONE);
                return;
            }

            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
                    try {
                        terminated();
                    } finally {
                        ctl.set(ctlOf(TERMINATED, 0));
                        termination.signalAll();
                    }
                    return;
                }
            } finally {
                mainLock.unlock();
            }
            // else retry on failed CAS
        }
    }

5.获取线程池的状态。如果线程池的状态还处于RUNNING、SHUTDOWN,说明tryTerminate()没有成功。如果worker线程是突然终止的话,通过addWorker(null, false)再创建一个没有task的worker线程去处理任务。

6.如果worker线程是正常终止的话,且当前线程池中的线程数量小于需要维护的数量,我们也会通过addWorker(null, false)再创建一个没有task的worker线程去处理任务。

7.默认情况下allowCoreThreadTimeOut为false,那么min就等于corePoolSize。那么线程池需要维护的线程数量就是corePoolSize个。如果allowCoreThreadTimeOut为true,min就等于0。在workQueue不等于空的情况,min会被赋值成1。此时线程池需要维护的线程池数量是1。

如果线程池处于shutdown状态,在workQueue不为空的情况下,线程池始终会维护corePoolSize个线程。当workQueue为空的话,线程池会逐渐销毁这corePoolSize个线程。

代码语言:javascript
复制
    private void processWorkerExit(Worker w, boolean completedAbruptly) {
        if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
            decrementWorkerCount();

        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            completedTaskCount += w.completedTasks;
            workers.remove(w);
        } finally {
            mainLock.unlock();
        }

        tryTerminate();

        int c = ctl.get();
        if (runStateLessThan(c, STOP)) {
            if (!completedAbruptly) {
                int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
                if (min == 0 && ! workQueue.isEmpty())
                    min = 1;
                if (workerCountOf(c) >= min)
                    return; // replacement not needed
            }
            addWorker(null, false);
        }
    }

尾言

大家好,我是cmazxiaoma(寓意是沉梦昂志的小马),感谢各位阅读本文章。

小弟不才。

如果您对这篇文章有什么意见或者错误需要改进的地方,欢迎与我讨论。

如果您觉得还不错的话,希望你们可以点个赞。

希望我的文章对你能有所帮助。

有什么意见、见解或疑惑,欢迎留言讨论。

最后送上:心之所向,素履以往。生如逆旅,一苇以航。

saoqi.png

本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2018.09.04 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 源码分析
    • 线程池执行过程
      • submit
        • execute
          • addWorker
          • Worker
            • runWorker
              • getTask
                • processWorkerExit
                • 尾言
                相关产品与服务
                容器服务
                腾讯云容器服务(Tencent Kubernetes Engine, TKE)基于原生 kubernetes 提供以容器为核心的、高度可扩展的高性能容器管理服务,覆盖 Serverless、边缘计算、分布式云等多种业务部署场景,业内首创单个集群兼容多种计算节点的容器资源管理模式。同时产品作为云原生 Finops 领先布道者,主导开源项目Crane,全面助力客户实现资源优化、成本控制。
                领券
                问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档