首页
学习
活动
专区
工具
TVP
发布
精选内容/技术社群/优惠产品,尽在小程序
立即前往

SpringBoot 实现静态、动态定时任务,本地动态定时任务调度

前言

在日常的业务需求中,很多时候会遇到需要写定时任务的场景,如:每天刷新缓存、定时刷新业务数据状态等等。

基本上遇到的都是编写固定执行时间的任务,最近认识Spring中自带的轻量级定时任务调度包,想着正好来使用一下,便写了一个基于spring framework scheduling的本地任务调度的demo,以此作为契机带来介绍静态、动态任务的文章。

demo仓库点此跳转:https://github.com/IsNott/springboot-scheduler

依赖版本

demo环境搭建选择我常用的老三样:mysql-connectorj、mybatis-plus、springboot3

固定任务

实现固定的定时任务很简单,关注org.springframework.scheduling包下的其中两个注解。

@EnableScheduling

启用Spring的计划任务执行功能,类似于Spring的XML命名空间中的功能。顾名思义,就是开启spring项目的定时任务功能,通常跟随@Configuration注解使用。

在Spring文档中底部这样备注:Note: @EnableScheduling applies to its local application context only, allowing for selective scheduling of beans at different levels.

意思是它仅应用于其本地应用程序上下文,允许在不同级别对bean进行选择性调度。

@Scheduled

用于标记需要定时执行的方法,对于这些周期执行方法@Scheduled注解必须接收cron()、fixedDelay()、fixedRate()其中之一作为参数。

使用

@Component

@EnableScheduling

@Slf4j

public class TaskComponent {

// 接收cron表达式作为@Scheduled参数

// 下列表达式标识每五秒执行一次

@Scheduled(cron = "0/5 * * * * ? ")

public void originTask(){

log.info("原始定时任务执行");

}

// 接收fixedDelay作为@Scheduled参数

// 下列方法每延时五秒执行一次

// 根据上一个方法结束开始计时

@Scheduled(fixedDelay = 5000)

public void fixedDelay(){

log.info("原始定时任务执行");

}

// 接收fixedRate作为@Scheduled参数

// 下列方法每间隔五秒执行一次

// 根据上次任务开始时计时,假如任务中间花费了2.5秒,即+2.5秒开始执行

// 假如间隔5秒,在单线程执行的情况下,A1任务执行7秒,A1还没执行完,A2会开始执行,此时A2会出现阻塞

@Scheduled(fixedRate = 5000)

public void fixedRate(){

log.info("原始定时任务执行");

}

}

以上是Spring自带的定时任务调度,有很多好用的第三方框架,例如:QuartZ、xxl-job等。

动态任务

假设你是第一次接触springboot中的定时任务,你会发现前文解释的任务,都是开发者提前知道每个任务需要在什么时候执行。

新问题出现了:假设我的项目中,有任务可能要定时执行,但我目前还没有认识到需要执行什么东西,如何实现?

下面有两种方法,加载数据库中的记录并注册到Spring定时任务上下文中。

建表

CREATE TABLE `table_task_info` (

`id` varchar(255) NOT NULL,

`class_name` varchar(1024) DEFAULT NULL,

`bean_name` varchar(255) DEFAULT NULL,

`period_unit` varchar(255) DEFAULT NULL,

`period` bigint(20) DEFAULT NULL,

`cron` varchar(255) DEFAULT NULL,

`execute_method` varchar(255) DEFAULT NULL,

`execute_mode` tinyint(4) DEFAULT NULL,

`param` text,

`execute_time` datetime DEFAULT NULL,

`create_time` datetime DEFAULT NULL,

`update_time` datetime DEFAULT NULL,

PRIMARY KEY (`id`)

) ENGINE=InnoDB DEFAULT CHARSET=utf8

CREATE TABLE `table_task_execute_record` (

`id` varchar(255) NOT NULL,

`task_id` varchar(255) DEFAULT NULL,

`execute_time` datetime DEFAULT NULL,

`execute_status` tinyint(4) DEFAULT NULL,

`execute_msg` text,

`error_msg` text,

PRIMARY KEY (`id`)

) ENGINE=InnoDB DEFAULT CHARSET=utf8

实现SchedulingConfigurer接口

这是org.springframework.scheduling包下的一个接口,通常用于注册定时任务,相当于用编程方式注册原本被@Scheduled注解注册的方法。接口只提供了一个方法:void configureTasks(ScheduledTaskRegistrar taskRegistrar);

ScheduledTaskRegistrar

作为configureTasks方法接收的唯一参数,单看名称能简单了解它是作为任务注册的对象。下图是它所提供的方法,看方法名称再联系上文的@cron注解,容易看出它是担任了@cron注解的任务。

使用

以下案例,可在项目启动时搜索数据库中需要执行的任务类并注册到Spring上下文中。

@Slf4j

@Configuration

public class ScheduledConfig implements SchedulingConfigurer {

@Resource

private TaskService taskService;

@Bean(name = "myExecutor")

public Executor taskExecutor() {

return Executors.newScheduledThreadPool(10);

}

@Override

public void configureTasks(ScheduledTaskRegistrar taskRegistrar) {

taskRegistrar.setScheduler(taskExecutor());

List<TaskInfo> taskInfos = taskService.queryStableCronTask();

for (TaskInfo info : taskInfos) {

// addCronTask(Runnable task, String expression)

// 接收一个Runnable对象,和执行的表达式

taskRegistrar.addCronTask(taskService.getRunnableTask(info),

info.getCron());

log.info("添加新定时任务id{},Cron-》{}",info.getId(),info.getCron());

}

}

}

使用ThreadPoolTaskScheduler

它是org.springframework.scheduling.concurrent下的一个类,实现了TaskScheduler接口,封装了一个线程池对象ScheduledThreadPoolExecutor,默认线程数为1,所以使用时需要配置线程数量。

需要关注下图的几个方法,第一个参数都是接收Runnable对象,可理解为需要执行的实际任务。后面的参数姑且理解为需要执行的时间。

注册Bean

@Configuration

public class TaskConfiguration {

@Bean(name = "myTaskScheduler")

public ThreadPoolTaskScheduler setThreadPoolTaskScheduler(){

ThreadPoolTaskScheduler taskScheduler = new ThreadPoolTaskScheduler();

taskScheduler.setPoolSize(5);

taskScheduler.setThreadNamePrefix("task-pool-");

// 执行shutdown时,等待前一个任务执行完

taskScheduler.setWaitForTasksToCompleteOnShutdown(true);

// 执行shutdown时,等待的超时时间

taskScheduler.setAwaitTerminationSeconds(30);

// 自定义错误处理器,当某个线程执行时出现错误会跳过线程内部的Try/catch,进入ErrorHandler

// 需要实现 ErrorHandler接口

//        taskScheduler.setErrorHandler(new MyTaskErrorHandler());

return taskScheduler;

}

}

使用

demo中的TaskController,编写RestController层用于接收需要调度的定时任务并记录入库,用于后续的执行状态查看、修改或取消。

@RestController

@RequestMapping("/task/")

public class TaskController {

@Description("任务列表")

@GetMapping("list")

public Collection<TaskInfo> taskList(){

return taskService.getTaskList();

}

@Description("取消任务调度")

@RequestMapping("cancel/{id}")

public boolean cancelTask(@PathVariable String id){

return taskService.cancelTask(id);

}

@Description("新增任务记录")

@PostMapping("add")

public void addTask(@RequestBody TaskParam param){

taskService.addTask(param);

}

}

主要关注taskService.addTask(TaskParam param)方法内部。

@Transactional(rollbackFor = Exception.class)

public void addTask(TaskParam param) {

// ..任务入库省略

this.putTaskMap(info, null);

if(param.isScheduleNow()){

ScheduledFuture<?> future = this.scheduleTaskByMode(info);

if(future == null){

throw new RuntimeException("传入的任务参数调度失败");

}

}

}

private ScheduledFuture<?> scheduleTaskByMode(TaskInfo info) {

Integer mode = info.getExecuteMode();

Timestamp executeTime = info.getExecuteTime();

Long period = info.getPeriod();

String periodUnit = info.getPeriodUnit();

// ScheduledFuture继承Future,带有cancel方法,遇到方法需要修改时可cancel后重新调度

ScheduledFuture<?> future;

switch (mode) {

default -> future = null;

case 0 -> {

// 接收Runnable+Trigger对象(这里使用CronTrigger)

// 根据cron表达式定时执行

future = threadPoolTaskScheduler.schedule(getRunnableTask(info), new CronTrigger(info.getCron()));

break;

}

// 接收Runnable+时间戳类对象

// 根据时间戳定时执行

case 1 -> {

future = threadPoolTaskScheduler.schedule(getRunnableTask(info), executeTime.toInstant());

break;

}

// 接收Runnable+Duration对象

// 根据时间间隔执行(上次开始时计算)

case 2 -> {

future = threadPoolTaskScheduler.scheduleAtFixedRate(getRunnableTask(info), Duration.of(period, PeriodUnit.getByName(periodUnit)));

break;

}

// 接收Runnable+Duration对象

// 根据时间延迟执行(上次结束时计算)

case 3 -> {

future = threadPoolTaskScheduler.scheduleWithFixedDelay(getRunnableTask(info), executeTime.toInstant(), Duration.of(period, PeriodUnit.getByName(periodUnit)));

break;

}

}

log.info("调度一个任务:\n{}",info.toString());

this.putTaskMap(info, future);

return future;

}

public Runnable getRunnableTask(TaskInfo info) {

return () -> {

long s = System.currentTimeMillis();

log.info("----Task Execute----");

// ..具体的执行内容,并在finally写入当前Task状态

log.info("----Task End [{}ms]----", System.currentTimeMillis() - s);

};

}

结束

上文简单介绍了spring framework框架自带的定时任务包下的冰山一角,还有很多内容在遇到某些具体问题才可能挖掘出来,再加上定时任务还有其他的场景,例如:分布式调度、任务告警等需要考虑。

作者:IsNott

  • 发表于:
  • 原文链接https://page.om.qq.com/page/OVnyNZYv27AQL5nUM7HlHIKQ0
  • 腾讯「腾讯云开发者社区」是腾讯内容开放平台帐号(企鹅号)传播渠道之一,根据《腾讯内容开放平台服务协议》转载发布内容。
  • 如有侵权,请联系 cloudcommunity@tencent.com 删除。

扫码

添加站长 进交流群

领取专属 10元无门槛券

私享最新 技术干货

扫码加入开发者社群
领券