之前写 Java 定时任务 用到了 ScheduledExecutorService,自己没有思路来实现定时任务的功能,所以十分好奇其底层代码的实现,于是乎就去翻看源码,在这过程中还发现了无处不在的 Doug Lea
ScheduledExecutorService 是 jdk 提供的计划执行服务接口(实现类是个线程池)。这里举例 newScheduledThreadPool 来分析,其是指定核心线程数的计划线程池
public class ExecutorSchedule {
public static void main(String[] args) {
// 需要计划执行的任务
Runnable runnable = () -> {
System.out.println("执行定时任务");
};
// 计划的时间参数
long delay = 0;
long period = 1000 * 5;
// 计划线程池执行
ScheduledExecutorService executorService = Executors.newScheduledThreadPool(10);
service.scheduleAtFixedRate(runnable, delay, period, TimeUnit.MILLISECONDS);
}
通过构造函数知道计划执行的关键是参数中的阻塞队列:new DelayedWorkQueue()
// 创建一个计划线程池执行器
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}
// super 最终指向 ThreadPoolExecutor
// 计划线程池调用 ThreadPoolExecutor 方法来构造线程池(阿里规范要手动传参建立线程池)
public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, new DelayedWorkQueue());
}
// 默认的构造线程池方法
public ThreadPoolExecutor(
int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
// xxx
}
DelayedWorkQueue 顾名思义是一个延迟队列,用来存放线程池待执行的任务
在 DelayedWorkQueue 的类声明中知道,它是一个阻塞队列(线程池并发时保持数据一致性,这部分内容以后会介绍)
// AbstractQueue:有队列的基本方法
// BlockingQueue:有阻塞队列的基本方法
class DelayedWorkQueue extends AbstractQueue<Runnable> implements BlockingQueue<Runnable> {}
在 DelayedWorkQueue 的具体代码中发现 DelayedWorkQueue 的内部数据结构是堆,里面存放了 RunnableScheduledFuture 对象。然后我们大胆猜测: RunnableScheduledFuture 就是延迟任务的核心,里面存放了延迟时间,DelayedWorkQueue 按照这个延迟时间来进行内部排序
// 堆特有的方法,上浮下潜
private void siftUp(int k, RunnableScheduledFuture<?> key) {}
private void siftDown(int k, RunnableScheduledFuture<?> key) {}
RunnableScheduledFuture 只是一个接口,ScheduledThreadPoolExecutor 线程池的内部有该接口的实现类(ScheduledFutureTask),我们先来看 RunnableScheduledFuture,然后再分析 ScheduledFutureTask 的接口实现
// RunnableFuture 是异步获取结果的内容
public interface RunnableScheduledFuture<V> extends RunnableFuture<V>, ScheduledFuture<V> {}
// Future 是异步获取结果,和 Runnable 对比
public interface ScheduledFuture<V> extends Delayed, Future<V> {}
// 这里就是我们要找核心实现了
// 继承了 Comparable,在堆里面可按定延迟时间来排序对比
public interface Delayed extends Comparable<Delayed> {
/**
* 返回与此对象关联的剩余给定时间单位
*
* @param 时间单位
* @return 剩余延迟时间,0或负值表示时间已经过去
*/
long getDelay(TimeUnit unit);
}
ScheduledFutureTask 的构造函数入参有 Runnable、result、ns:分别是 延迟任务、Future 返回、延迟时间
// FutureTask 是异步任务
class ScheduledFutureTask<V> extends FutureTask<V> implements RunnableScheduledFuture<V> {
private final long sequenceNumber;
private long time;
private final long period;
// 构造方法
ScheduledFutureTask(Runnable r, V result, long ns) {
super(r, result);
this.time = ns;
this.period = 0;
this.sequenceNumber = sequencer.getAndIncrement();
}
// 获取延迟时间(延迟任务的关键)
public long getDelay(TimeUnit unit) {
return unit.convert(time - now(), NANOSECONDS);
}
// 入队的对比方法
public int compareTo(Delayed other) {
if (other == this) // compare zero if same object
return 0;
if (other instanceof ScheduledFutureTask) {
ScheduledFutureTask<?> x = (ScheduledFutureTask<?>)other;
long diff = time - x.time;
if (diff < 0)
return -1;
else if (diff > 0)
return 1;
else if (sequenceNumber < x.sequenceNumber)
return -1;
else
return 1;
}
long diff = getDelay(NANOSECONDS) - other.getDelay(NANOSECONDS);
return (diff < 0) ? -1 : (diff > 0) ? 1 : 0;
}
}
1.1 基本使用:service.scheduleAtFixedRate(runnable, delay, period, TimeUnit.MILLISECONDS) 就将提交的任务与延迟时间封装成 ScheduledFutureTask,
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) {
// xxx 省略无关代码
// 在执行延时任务的时候,传入 Runnable,延迟参数创建 ScheduledFutureTask 对象
ScheduledFutureTask<Void> sft = new ScheduledFutureTask<Void>(command, null, triggerTime(initialDelay, unit), unit.toNanos(period));
// 装饰器
RunnableScheduledFuture<Void> t = decorateTask(command, sft);
sft.outerTask = t;
// 延迟执行
delayedExecute(t);
return t;
}
// 延迟执行具体实现
private void delayedExecute(RunnableScheduledFuture<?> task) {
// 判断线程池是否关闭
if (isShutdown())
// 拒绝队列执行
reject(task);
else {
// 往 workQueue(即 DelayedWorkQueue)里添加 上一步被封装的 ScheduledFutureTask
// 至此,该延时任务已经提交到线程池等待执行了
super.getQueue().add(task);
if (isShutdown() &&
!canRunInCurrentRunState(task.isPeriodic()) &&
remove(task))
task.cancel(false);
else
ensurePrestart();
}
}
参考 ScheduledExecutorService 的设计,我们也能写一个简易的延迟队列出来
public class DelayedTask implements Delayed, Runnable {
// 任务名
private String name;
// 开始时间
private long start = System.currentTimeMillis();
// 要延迟的时间
private long delayTime;
DelayedTask(String name, long delayTime) {
this.name = name;
this.delayTime = delayTime;
}
// 返回剩余时间
@Override
public long getDelay(TimeUnit unit) {
return unit.convert((start + delayTime) - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
}
// 排序方法
@Override
public int compareTo(Delayed o) {
return (int) (this.getDelay(TimeUnit.MILLISECONDS) - o.getDelay(TimeUnit.MILLISECONDS));
}
@Override
public void run() {
System.out.println(this.name + " " + "延迟任务执行了");
}
}
public class DelayedThread extends Thread {
private BlockingQueue<DelayedTask> queue;
public DelayedThread(BlockingQueue<DelayedTask> queue) {
this.queue = queue;
}
@Override
public void run() {
DelayedTask target = null;
while (!queue.isEmpty()) {
try {
target = queue.take();
} catch (InterruptedException e) {
e.printStackTrace();
}
target.run();
}
}
}
public class DelayedTest {
public static void main(String[] args) {
DelayedTask task1 = new DelayedTask("1", 1000);
DelayedTask task2 = new DelayedTask("2", 5000);
DelayQueue<DelayedTask> delayQueue = new DelayQueue();
delayQueue.add(task1);
delayQueue.add(task2);
DelayedThread delayedThread = new DelayedThread(delayQueue);
delayedThread.start();
}
}
// 1 延迟任务执行了
// 2 延迟任务执行了
参考
https://blog.csdn.net/dkfajsldfsdfsd/article/details/88966814