今天一起从一个小bug来看下, Spring定时任务是如何处理的.
一次非预期任务
预定义的任务很简单, 每隔1s执行一次.
代码如下:
@Scheduled(fixedDelay = 1000)
public void syncUser() {
String uuid = UUID.randomUUID().toString();
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
log.info("syncUser success:{}", uuid);
}
但观察日志发现, 有的任务执行间隔并不是1s, 同时可以观察到, 多个task是使用的同一线程执行的, 完全不符合预期.
2020-09-17 20:57:20.750 INFO 75127 --- [pool-1-thread-1] com.in.task.Task2 : syncUser success:1f4ad20c-541a-41c8-8fd0-c9a4a6fd612c
2020-09-17 20:57:30.755 INFO 75127 --- [pool-1-thread-1] com.in.task.Tasks : cleanUser does not get locker lockKey:1, uuid:2052fe42-b06a-4424-a028-7136b4392215
2020-09-17 20:57:32.761 INFO 75127 --- [pool-1-thread-1] com.in.task.Task2 : syncUser success:5be5d07c-991a-442d-a026-e36fcfb0a2fc
2020-09-17 20:57:35.767 INFO 75127 --- [pool-1-thread-1] com.in.task.Task2 : syncUser success:7dea98ec-df3a-4c8f-84f4-aced84f25c74
2020-09-17 20:57:38.774 INFO 75127 --- [pool-1-thread-1] com.in.task.Task2 : syncUser success:31fe1753-9467-4956-99b9-fcb134a736ab
解决方式很简单, 自定义定时任务配置, 其中包括定时任务线程池.
@Configuration
public class ScheduleConfiguration implements SchedulingConfigurer {
@Override
public void configureTasks(ScheduledTaskRegistrar scheduledTaskRegistrar) {
scheduledTaskRegistrar.setScheduler(setTaskExecutors());
}
@Bean(destroyMethod="shutdown")
public Executor setTaskExecutors(){
return Executors.newScheduledThreadPool(20);
}
}
添加配置后, 观察日志, 任务能正常运行, 并且各任务也不会相互影响.
2020-09-17 20:59:28.324 INFO 75215 --- [pool-1-thread-1] com.in.task.Task2 : syncUser success:71bd8fd4-8690-4db9-b159-10645a53a2e6
2020-09-17 20:59:31.332 INFO 75215 --- [pool-1-thread-7] com.in.task.Task2 : syncUser success:ac8dee9f-a155-4d21-a83d-5aa004862232
2020-09-17 20:59:33.316 INFO 75215 --- [pool-1-thread-5] com.in.task.Tasks : cleanUser does not get locker lockKey:1, uuid:01459057-2274-4681-80f3-e5be3c20561b
2020-09-17 20:59:34.338 INFO 75215 --- [pool-1-thread-2] com.in.task.Task2 : syncUser success:244e8157-2536-407c-9bcf-614ea0106b50
2020-09-17 20:59:37.346 INFO 75215 --- [ool-1-thread-10] com.in.task.Task2 : syncUser success:449b6e72-9d9c-4474-a469-4f3763bb9474
问题虽然解决了, 但知其然,还要知其所以然. 下面就一起看看Spring是如何管理定时任务的, 为什么加个配置就解决了.
Spring定时任务
从定时任务注解@EnableScheduling入手, 看下spring启动时都做了什么.
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Import(SchedulingConfiguration.class)
@Documented
public @interface EnableScheduling {
}
继续, 看定时任务配置类: SchedulingConfiguration
@Configuration
@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
public class SchedulingConfiguration {
@Bean(name = TaskManagementConfigUtils.SCHEDULED_ANNOTATION_PROCESSOR_BEAN_NAME)
@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
public ScheduledAnnotationBeanPostProcessor scheduledAnnotationProcessor() {
return new ScheduledAnnotationBeanPostProcessor();
}
}
再继续, 查看定时任务初始化的核心类及方法
ScheduledAnnotationBeanPostProcessor.postProcessAfterInitialization.(Object bean, String beanName)
该方法会遍历所有Bean中, 查找@Scheduled注解及对应的方法, 并处理.
public Object postProcessAfterInitialization(Object bean, String beanName) {
// ...
Map<method, </method,Set> annotatedMethods = MethodIntrospector.selectMethods(targetClass,
(MethodIntrospector.MetadataLookup<set>) method -> {</set
// 遍历bean
Set scheduledMethods = AnnotatedElementUtils.getMergedRepeatableAnnotations(
method, Scheduled.class, Schedules.class);
return (!scheduledMethods.isEmpty() ? scheduledMethods : null);
});
// ...
// Non-empty set of methods
annotatedMethods.forEach((method, scheduledMethods) ->
// 处理定时任务方法
scheduledMethods.forEach(scheduled -> processScheduled(scheduled, method, bean)));
if (logger.isDebugEnabled()) {
logger.debug(annotatedMethods.size() + " @Scheduled methods processed on bean '" + beanName +
"': " + annotatedMethods);
}
return bean;
}
再继续看processScheduled()如何处理定时任务的
根据@Scheduled注解参数, 分成不同种类的定时任务, 并登记到ScheduledTaskRegistrar类中处理.
protected void processScheduled(Scheduled scheduled, Method method, Object bean) {
try {
Assert.isTrue(method.getParameterCount() == 0, "Only no-arg methods may be annotated with @Scheduled");
Method invocableMethod = AopUtils.selectInvocableMethod(method, bean.getClass());
// 定时任务封装成线程
Runnable runnable = new ScheduledMethodRunnable(bean, invocableMethod);
// ...
String cron = scheduled.cron();
if (StringUtils.hasText(cron)) {
// 添加cronTask
tasks.add(this.registrar.scheduleCronTask(new CronTask(runnable, new CronTrigger(cron, timeZone))));
}
String fixedDelayString = scheduled.fixedDelayString();
if (StringUtils.hasText(fixedDelayString)) {
// 添加fixedDelayTask
tasks.add(this.registrar.scheduleFixedDelayTask(new FixedDelayTask(runnable, fixedDelay, initialDelay)));
}
if (StringUtils.hasText(fixedRateString)) {
// 添加fixedRateTask
tasks.add(this.registrar.scheduleFixedRateTask(new FixedRateTask(runnable, fixedRate, initialDelay)));
}
}
登记定时任务
在ScheduledTaskRegistrar中, 当前还没有初始化线程池, 只登记任务, 不执行.
public ScheduledTask scheduleFixedDelayTask(FixedDelayTask task) {
ScheduledTask scheduledTask = this.unresolvedTasks.remove(task);
boolean newTask = false;
if (scheduledTask == null) {
scheduledTask = new ScheduledTask(task);
newTask = true;
}
// 放入定时任务线程池中执行
if (this.taskScheduler != null) {
scheduledTask.future =
this.taskScheduler.scheduleWithFixedDelay(task.getRunnable(), task.getInterval());
}
else {
// 没有线程池, 只登记定时任务
addFixedDelayTask(task);
this.unresolvedTasks.put(task, scheduledTask);
}
return (newTask ? scheduledTask : null);
}
启动定时任务
所以定时任务已经登记好了, 剩下的就是启动定时任务了.
任务启动方法:
ScheduledAnnotationBeanPostProcessor.afterSingletonsInstantiated()
里面只调用一个方法, 初始化定时任务登记器(ScheduledTaskRegistrar)
finishRegistration()
定时任务的线程池, 首先会查找SchedulingConfigurer配置, 初始化ScheduledTaskRegistrar, 包括初始化定时任务线程池.
如果Spring不能从SchedulingConfigurer配置中初始化线程池, 那Spring会尝试从全局范围内查找一个线程池的Bean实例, 但很遗憾, 在我的服务中并没有预定义的线程池.
private void finishRegistration() {
// ...
if (this.beanFactory instanceof ListableBeanFactory) {
Mapbeans =((ListableBeanFactory) this.beanFactory).getBeansOfType(SchedulingConfigurer.class);
Listconfigurers = new ArrayList<>(beans.values());
AnnotationAwareOrderComparator.sort(configurers);
for (SchedulingConfigurer configurer : configurers) {
configurer.configureTasks(this.registrar);
}
}
if (this.registrar.hasTasks() && this.registrar.getScheduler() == null) {
// ...
this.registrar.setTaskScheduler(resolveSchedulerBean(this.beanFactory, TaskScheduler.class, false));
}
this.registrar.afterPropertiesSet();
}
在定时任务记录器中, 启动各任务
ScheduledTaskRegistrar.afterPropertiesSet();
执行scheduleTasks() 方法, 初始化只有一个核心线程的定时任务线程池, 并添加定时任务.
我们的问题就是Spring自己创建的线程池不能提供足够的线程, 导致多个任务不能并行执行, 各task任务互相影响.
protected void scheduleTasks() {
if (this.taskScheduler == null) {
// 默认线程池
this.localExecutor = Executors.newSingleThreadScheduledExecutor();
this.taskScheduler = new ConcurrentTaskScheduler(this.localExecutor);
}
if (this.triggerTasks != null) {
for (TriggerTask task : this.triggerTasks) {
addScheduledTask(scheduleTriggerTask(task));
}
}
if (this.cronTasks != null) {
for (CronTask task : this.cronTasks) {
addScheduledTask(scheduleCronTask(task));
}
}
if (this.fixedRateTasks != null) {
for (IntervalTask task : this.fixedRateTasks) {
addScheduledTask(scheduleFixedRateTask(task));
}
}
if (this.fixedDelayTasks != null) {
for (IntervalTask task : this.fixedDelayTasks) {
addScheduledTask(scheduleFixedDelayTask(task));
}
}
}
到此, 整个线程任务就能正常运行了.
流程图
代码涉及到多个类的反复调用, 不容易理解. 可参考下面的时序图理解
初始化:
定时任务启动:
定时任务线程池的初始化为3种, 上面代码流程中都有详解, 这里再汇总下,