前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Java 多线程定时器分析

Java 多线程定时器分析

作者头像
晚上没宵夜
发布2022-05-09 21:17:31
6730
发布2022-05-09 21:17:31
举报
文章被收录于专栏:Howl同学的学习笔记

之前写 Java 定时任务 用到了 ScheduledExecutorService,自己没有思路来实现定时任务的功能,所以十分好奇其底层代码的实现,于是乎就去翻看源码,在这过程中还发现了无处不在的 Doug Lea

1. ScheduledExecutorService

ScheduledExecutorService 是 jdk 提供的计划执行服务接口(实现类是个线程池)。这里举例 newScheduledThreadPool 来分析,其是指定核心线程数的计划线程池

1.1 基本使用

代码语言:javascript
复制
public class ExecutorSchedule {
    public static void main(String[] args) {
        
        // 需要计划执行的任务
        Runnable runnable = () -> {
            System.out.println("执行定时任务");
        };

        // 计划的时间参数
        long delay = 0;
        long period = 1000 * 5;
        
        // 计划线程池执行
        ScheduledExecutorService executorService = Executors.newScheduledThreadPool(10);
        service.scheduleAtFixedRate(runnable, delay, period, TimeUnit.MILLISECONDS);
}

1.2 构造函数

通过构造函数知道计划执行的关键是参数中的阻塞队列:new DelayedWorkQueue()

代码语言:javascript
复制
//  创建一个计划线程池执行器
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
    return new ScheduledThreadPoolExecutor(corePoolSize);
}


// super 最终指向 ThreadPoolExecutor
// 计划线程池调用 ThreadPoolExecutor 方法来构造线程池(阿里规范要手动传参建立线程池)
public ScheduledThreadPoolExecutor(int corePoolSize) {
    super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, new DelayedWorkQueue());
}

// 默认的构造线程池方法
public ThreadPoolExecutor(
    int corePoolSize,
    int maximumPoolSize,
    long keepAliveTime,
    TimeUnit unit,
    BlockingQueue<Runnable> workQueue,
    ThreadFactory threadFactory,
    RejectedExecutionHandler handler) {
    
    // xxx
}

2. DelayedWorkQueue

DelayedWorkQueue 顾名思义是一个延迟队列,用来存放线程池待执行的任务

2.1 继承关系

在 DelayedWorkQueue 的类声明中知道,它是一个阻塞队列(线程池并发时保持数据一致性,这部分内容以后会介绍)

代码语言:javascript
复制
// AbstractQueue:有队列的基本方法
// BlockingQueue:有阻塞队列的基本方法
class DelayedWorkQueue extends AbstractQueue<Runnable> implements BlockingQueue<Runnable> {}

2.2 代码实现

在 DelayedWorkQueue 的具体代码中发现 DelayedWorkQueue 的内部数据结构是堆,里面存放了 RunnableScheduledFuture 对象。然后我们大胆猜测: RunnableScheduledFuture 就是延迟任务的核心,里面存放了延迟时间,DelayedWorkQueue 按照这个延迟时间来进行内部排序

代码语言:javascript
复制
// 堆特有的方法,上浮下潜
private void siftUp(int k, RunnableScheduledFuture<?> key) {}

private void siftDown(int k, RunnableScheduledFuture<?> key) {}

3. RunnableScheduledFuture

RunnableScheduledFuture 只是一个接口,ScheduledThreadPoolExecutor 线程池的内部有该接口的实现类(ScheduledFutureTask),我们先来看 RunnableScheduledFuture,然后再分析 ScheduledFutureTask 的接口实现

3.1 接口分析

代码语言:javascript
复制
// RunnableFuture 是异步获取结果的内容
public interface RunnableScheduledFuture<V> extends RunnableFuture<V>, ScheduledFuture<V> {}

// Future 是异步获取结果,和 Runnable 对比
public interface ScheduledFuture<V> extends Delayed, Future<V> {}

// 这里就是我们要找核心实现了
// 继承了 Comparable,在堆里面可按定延迟时间来排序对比
public interface Delayed extends Comparable<Delayed> {

    /**
     * 返回与此对象关联的剩余给定时间单位
     * 
     * @param 时间单位
     * @return 剩余延迟时间,0或负值表示时间已经过去
     */

    long getDelay(TimeUnit unit);
}

3.2 ScheduledFutureTask

ScheduledFutureTask 的构造函数入参有 Runnable、result、ns:分别是 延迟任务、Future 返回、延迟时间

代码语言:javascript
复制
// FutureTask 是异步任务
class ScheduledFutureTask<V> extends FutureTask<V> implements RunnableScheduledFuture<V> {
    
    private final long sequenceNumber;

    private long time;

    private final long period;
    
    // 构造方法
    ScheduledFutureTask(Runnable r, V result, long ns) {
        super(r, result);
        this.time = ns;
        this.period = 0;
        this.sequenceNumber = sequencer.getAndIncrement();
    }
    
    // 获取延迟时间(延迟任务的关键)
    public long getDelay(TimeUnit unit) {
        return unit.convert(time - now(), NANOSECONDS);
    }

    // 入队的对比方法
    public int compareTo(Delayed other) {
        if (other == this) // compare zero if same object
            return 0;
        if (other instanceof ScheduledFutureTask) {
            ScheduledFutureTask<?> x = (ScheduledFutureTask<?>)other;
            long diff = time - x.time;
            if (diff < 0)
                return -1;
            else if (diff > 0)
                return 1;
            else if (sequenceNumber < x.sequenceNumber)
                return -1;
            else
                return 1;
        }
        long diff = getDelay(NANOSECONDS) - other.getDelay(NANOSECONDS);
        return (diff < 0) ? -1 : (diff > 0) ? 1 : 0;
    }
}

3.3 实现分析

1.1 基本使用:service.scheduleAtFixedRate(runnable, delay, period, TimeUnit.MILLISECONDS) 就将提交的任务与延迟时间封装成 ScheduledFutureTask,

代码语言:javascript
复制
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) {
    
    // xxx 省略无关代码
    
    // 在执行延时任务的时候,传入 Runnable,延迟参数创建 ScheduledFutureTask 对象
    ScheduledFutureTask<Void> sft = new ScheduledFutureTask<Void>(command, null, triggerTime(initialDelay, unit), unit.toNanos(period));
    
    // 装饰器
    RunnableScheduledFuture<Void> t = decorateTask(command, sft);
    sft.outerTask = t;
    
    // 延迟执行
    delayedExecute(t);
    return t;
}


// 延迟执行具体实现
private void delayedExecute(RunnableScheduledFuture<?> task) {
    
    // 判断线程池是否关闭
    if (isShutdown())
        
        // 拒绝队列执行
        reject(task);
    else {
        
        // 往 workQueue(即 DelayedWorkQueue)里添加 上一步被封装的 ScheduledFutureTask
        // 至此,该延时任务已经提交到线程池等待执行了
        super.getQueue().add(task);
        if (isShutdown() &&
            !canRunInCurrentRunState(task.isPeriodic()) &&
            remove(task))
            task.cancel(false);
        else
            ensurePrestart();
    }
}

4. 总结过程

  • 我们需要延迟执行的任务被封装成 ScheduledFutureTask
  • 然后被添加到 DelayedWorkQueue 中,队列头部是达到延迟时间的任务(内部堆是按延迟时间排序)
  • 线程池循环执行任务时从 DelayedWorkQueue 中获取,即可实现延迟任务的功能

5. 拓展

参考 ScheduledExecutorService 的设计,我们也能写一个简易的延迟队列出来

5.1 Delayed 对象

代码语言:javascript
复制
public class DelayedTask implements Delayed, Runnable {

    // 任务名
    private String name;

    // 开始时间
    private long start = System.currentTimeMillis();

    // 要延迟的时间
    private long delayTime;

    DelayedTask(String name, long delayTime) {
        this.name = name;
        this.delayTime = delayTime;
    }

    // 返回剩余时间
    @Override
    public long getDelay(TimeUnit unit) {
        return unit.convert((start + delayTime) - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
    }

    // 排序方法
    @Override
    public int compareTo(Delayed o) {
        return (int) (this.getDelay(TimeUnit.MILLISECONDS) - o.getDelay(TimeUnit.MILLISECONDS));
    }

    @Override
    public void run() {
        System.out.println(this.name + " " + "延迟任务执行了");
    }
}

5.2 延迟线程池

代码语言:javascript
复制
public class DelayedThread extends Thread {

    private BlockingQueue<DelayedTask> queue;

    public DelayedThread(BlockingQueue<DelayedTask> queue) {
        this.queue = queue;
    }

    @Override
    public void run() {
        DelayedTask target = null;

        while (!queue.isEmpty()) {
            try {
                target = queue.take();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            target.run();
        }
    }
}

5.3 执行事例

代码语言:javascript
复制
public class DelayedTest {
    public static void main(String[] args) {

        DelayedTask task1 = new DelayedTask("1", 1000);
        DelayedTask task2 = new DelayedTask("2", 5000);

        DelayQueue<DelayedTask> delayQueue = new DelayQueue();
        delayQueue.add(task1);
        delayQueue.add(task2);

        DelayedThread delayedThread = new DelayedThread(delayQueue);
        delayedThread.start();
    }
}

// 1 延迟任务执行了
// 2 延迟任务执行了

参考

https://blog.csdn.net/dkfajsldfsdfsd/article/details/88966814

本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2022-04-24,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 1. ScheduledExecutorService
    • 1.1 基本使用
      • 1.2 构造函数
      • 2. DelayedWorkQueue
        • 2.1 继承关系
          • 2.2 代码实现
          • 3. RunnableScheduledFuture
            • 3.1 接口分析
              • 3.2 ScheduledFutureTask
                • 3.3 实现分析
                • 4. 总结过程
                • 5. 拓展
                  • 5.1 Delayed 对象
                    • 5.2 延迟线程池
                      • 5.3 执行事例
                      领券
                      问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档