
先说结论
线程池使用FutureTask时如果把拒绝策略设置为 DiscardPolicy和 DiscardOldestPolicy,并且在被拒绝的任务的Future对象上调用了无参get方法,那么调用线程会一直被阻塞。
import java.util.concurrent.*;
/**
* @author 小工匠
* @version 1.0
* @description: TODO
* @date 2021/11/21 0:11
* @mark: show me the code , change the world
*/
public class FutureTest {
// 1 线程池单个线程,队列大小为1 - 初始化线程池
private final static ThreadPoolExecutor tpe = new ThreadPoolExecutor(1, 1, 1, TimeUnit.MINUTES,
new ArrayBlockingQueue<Runnable>(1),
new ThreadPoolExecutor.DiscardPolicy());
public static void main(String[] args) throws ExecutionException, InterruptedException {
// 2 添加你任务1
Future futureOne = tpe.submit(() -> {
System.out.println("开始处理业务1");
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("业务1执行结束");
return "Result1";
});
// 3 添加你任务2
Future futureTwo = tpe.submit(() -> {
System.out.println("开始处理业务2");
System.out.println("业务2执行结束");
return "Result2";
});
// 4 添加任务3
Future futureThree = null;
try {
futureThree = tpe.submit(() -> System.out.println("开始处理业务3"));
} catch (Exception e) {
System.out.print(e.getLocalizedMessage());
}
// 5 等待任务1执行完毕
System.out.println("任务1返回结果: " + futureOne.get());
// 6 等待任务2执行完毕
System.out.println("任务2返回结果: " + futureTwo.get());
// 7 等待任务3执行完毕
System.out.println("任务3返回结果: " + futureThree==null?null:futureThree.get());
//关闭线程池,阻塞知道所有任务执行完毕
tpe.shutdown();
}
}输出

让我们来分析下
开始处理业务1
Task java.util.concurrent.FutureTask@27bc2616 rejected from java.util.concurrent.ThreadPoolExecutor@3941a79c[Running, pool size = 1, active threads = 1, queued tasks = 1, completed tasks = 0]业务1执行结束
任务1返回结果: Result1
开始处理业务2
业务2执行结束
任务2返回结果: Result2
Exception in thread "main" java.lang.NullPointerException
at com.artisan.bfzm.chapter11.FutureTest.main(FutureTest.java:58)要分析这个问题,需要看线程池的submit方法都做了什么,submit方法的代码如下
/**
* @throws RejectedExecutionException {@inheritDoc}
* @throws NullPointerException {@inheritDoc}
*/
public <T> Future<T> submit(Callable<T> task) {
if (task == null) throw new NullPointerException();
// 1 装饰Runnable对象为Future对象
RunnableFuture<T> ftask = newTaskFor(task);
execute(ftask);
// 6 返回Future对象
return ftask;
}看下 execute方法
/**
* Executes the given task sometime in the future. The task
* may execute in a new thread or in an existing pooled thread.
*
* If the task cannot be submitted for execution, either because this
* executor has been shutdown or because its capacity has been reached,
* the task is handled by the current {@code RejectedExecutionHandler}.
*
* @param command the task to execute
* @throws RejectedExecutionException at discretion of
* {@code RejectedExecutionHandler}, if the task
* cannot be accepted for execution
* @throws NullPointerException if {@code command} is null
*/
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
/*
* Proceed in 3 steps:
*
* 1. If fewer than corePoolSize threads are running, try to
* start a new thread with the given command as its first
* task. The call to addWorker atomically checks runState and
* workerCount, and so prevents false alarms that would add
* threads when it shouldn't, by returning false.
*
* 2. If a task can be successfully queued, then we still need
* to double-check whether we should have added a thread
* (because existing ones died since last checking) or that
* the pool shut down since entry into this method. So we
* recheck state and if necessary roll back the enqueuing if
* stopped, or start a new thread if there are none.
*
* 3. If we cannot queue task, then we try to add a new
* thread. If it fails, we know we are shut down or saturated
* and so reject the task.
*/
// 2 如果线程个数小于核心线程数量,则新增线程处理
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
// 3. 如果线程都在工作且当前线程个数已经达到核心线程数,就把任务放入队列
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
// 4 尝试新增处理线程
else if (!addWorker(command, false))
// 5 新增失败则触发拒绝策略
reject(command);
}所以要找到上面例子中问题所在,只需要看代码(5)对被拒绝任务的影响,这里先看下拒绝策略DiscardPolicy的代码。
/**
* A handler for rejected tasks that silently discards the
* rejected task.
*/
public static class DiscardPolicy implements RejectedExecutionHandler {
/**
* Creates a {@code DiscardPolicy}.
*/
public DiscardPolicy() { }
/**
* Does nothing, which has the effect of discarding task r.
*
* @param r the runnable task requested to be executed
* @param e the executor attempting to execute this task
*/
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
}
}拒绝策略的rejectedExecution方法什么都没做,代码(4)调用submit后会返回一个Future对象。

Future是有状态的,Future的状态枚举值如下

在代码(1)中使用newTaskFor方法将Runnable任务转换为FutureTask,
protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
return new FutureTask<T>(callable);
}继续
/**
* Creates a {@code FutureTask} that will, upon running, execute the
* given {@code Callable}.
*
* @param callable the callable task
* @throws NullPointerException if the callable is null
*/
public FutureTask(Callable<V> callable) {
if (callable == null)
throw new NullPointerException();
this.callable = callable;
this.state = NEW; // ensure visibility of callable
}而在FutureTask的构造函数里面设置的状态就是NEW。
所以使用DiscardPolicy策略提交后返回了一个状态为NEW的Future对象。
那么我们下面就需要看下当调用Future的无参get方法时Future变为什么状态才会返回,那就要看下FutureTask的get()方法代码。

public V get() throws InterruptedException, ExecutionException {
int s = state;
//当前状态值 <= COMPLETING需要等待,否则调用report返回
if (s <= COMPLETING)
s = awaitDone(false, 0L);
return report(s);
} /**
* Returns result or throws exception for completed task.
*
* @param s completed state value
*/
@SuppressWarnings("unchecked")
private V report(int s) throws ExecutionException {
Object x = outcome;
// 状态值为NORMAL的时候正常返回
if (s == NORMAL)
return (V)x;
// 状态值大于等于CANCELLED的时候抛出异常
if (s >= CANCELLED)
throw new CancellationException();
throw new ExecutionException((Throwable)x);
}
/**
* Invokes the rejected execution handler for the given command.
* Package-protected for use by ScheduledThreadPoolExecutor.
*/
final void reject(Runnable command) {
handler.rejectedExecution(command, this);
}import java.util.concurrent.FutureTask;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
/**
* @author 小工匠
* @version 1.0
* @description: TODO
* @date 2021/11/21 1:40
* @mark: show me the code , change the world
*/
public class MyRejectedExecutionHandler implements RejectedExecutionHandler {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
if (!executor.isShutdown()){
if (null != r && r instanceof FutureTask) {
((FutureTask) r).cancel(true);
}
}
}
}使用这个策略时,由于在cancel的任务上调用get()方法会抛出异常,所以代码(7)需要使用try-catch块捕获异常,因此将代码(7)修改为如下所示。

执行结果为

当然这相比正常情况多了一个异常捕获操作。最好的情况是,重写拒绝策略时设置FutureTask的状态为NORMAL,但是这需要重写FutureTask方法,因为FutureTask并没有提供接口让我们设置。
通过案例介绍了在线程池中使用FutureTask时,当拒绝策略为DiscardPolicy和DiscardOldestPolicy时,在被拒绝的任务的FutureTask对象上调用get()方法会导致调用线程一直阻塞,所以在日常开发中尽量使用带超时参数的get方法以避免线程一直阻塞。