本文主要介绍分布式定时任务框架xxl-job,本文首先会对xxl-job做一个基本的介绍,接着将xxl-job与quartz做一个比较,最后就是介绍xxl-job调度的详细过程。xxl-job官方文档
xxl-job是一个开源的分布式定时任务框架,其调度中心和执行器是相互分离,分开部署的,两者通过HTTP协议进行通信。其架构如下图所示:
调度中心:负责管理调度信息,按照调度配置发出调度请求,自身不承担业务代码。调度系统与任务解耦,提高了系统可用性和稳定性,同时调度系统性能不再受限于任务模块;支持可视化、简单且动态的管理调度信息,包括任务新建,更新,删除,GLUE开发和任务报警等,所有上述操作都会实时生效,同时支持监控调度结果以及执行日志,支持执行器Failover,支持创建执行器等功能。执行模块(执行器):负责接收调度请求并执行任务逻辑。任务模块专注于任务的执行等操作,开发和维护更加简单和高效;接收“调度中心”的执行请求、终止请求和日志请求等。
xxl-job的特性有很多,官网上有详细的介绍,这里我会介绍几个重要的特性:
xxl-job将任务信息以及日志信息持久化到数据表中,这个就保证了可以动态的添加删除任务。
这一部分主要是将quartz和xxl-job做一个比较,quartz是一款开源的使用非常广泛的定时任务框架。其可以说是定时任务的鼻祖,很多理念都与xxl-job类似。
整体来说,xxl-job就是quartz的一个增强版,其弥补了quartz不支持并行调度,不支持失败处理策略和动态分片的策略等诸多不足,同时其有管理界面,上手比较容易,支持分布式,适用于分布式场景下的使用。两者相同的是都是通过数据库锁来控制任务不能重复执行。
quartz的核心类如下图所示:
类名 | 作用 |
---|---|
QuartzSchedulerThread | 负责执行向QuartzScheduler注册的触发Trigger的工作的线程 |
ThreadPool | Scheduler使用一个线程池作为任务运行的基础设施,任务通过共享线程池中的线程提供运行效率 |
QuartzSchedulerResources | 包含创建QuartzScheduler实例所需的所有资源(JobStore,ThreadPool等) |
SchedulerFactory | 生成Scheduler实例 |
JobStore | 通过类实现的接口,这些类要为org.quartz.core.QuartzScheduler的使用提供一个org.quartz.Job和org.quartz.Trigger存储机制。作业和触发器的存储应该以其名称和组的组合为唯一性。 |
QuartzScheduler | 这是Quartz的核心,它是org.quartz.Scheduler接口的间接实现,包含调度org.quartz.Jobs,注册org.quartz.JobListener实例等的方法。 |
Scheduler | 代表一个调度容器,一个调度容器中可以注册多个JobDetail和Trigger。当Trigger与JobDetail组合,就可以被Scheduler容器调度了。 |
Trigger | 具有所有触发器通用属性的基本接口,描述了job执行的时间出发规则,使用TriggerBuilder实例化实际触发器,即表示什么时候去调用任务 |
JobDetail | 表示一个具体的可执行的调度程序,Job是这个可执行的调度程序所要执行的内容,另外JobDetail还包含了这个任务调度的方案和策略 |
Job | 表示一个工作,即要执行的具体内容 |
quartz中的类有很多,我们关注并掌握好Schedule(调度容器),Trigger(触发器),JobDetail&Job(定义具体的执行任务)这几个类就掌握了quartz的核心了。因为其余的类都是围绕这几个类转的,下图展示了各个核心类的调用关系:
public class RAMQuartz {
public static void main(String[] args) throws SchedulerException {
//1.创建Scheduler的工厂
SchedulerFactory sf = new StdSchedulerFactory();
//2.从工厂中获取调度器实例
Scheduler scheduler = sf.getScheduler();
//3.创建JobDetail
JobDetail jobDetail = JobBuilder.newJob(RAMJob.class).withDescription("this is a ram job")
.withIdentity("ramJob", "ramGroup").build(); //job的name和group
// 4.任务运行的时间,SimpleScheduler类型触发器有效,3秒后启动
long time = System.currentTimeMillis() + 3 * 1000L;
Date startTime = new Date(time);
// 5.创建Trigger
CronTrigger cronTrigger = TriggerBuilder.newTrigger().withDescription("")
.withIdentity("ramTrigger", "ramTriggerGroup")
.startAt(startTime).withSchedule(CronScheduleBuilder.cronSchedule("0/10 * * * * ?")) //每10秒跑一次
.build();
// 6.注册任务和定时器
scheduler.scheduleJob(jobDetail, cronTrigger);
// 7.启动调度器
scheduler.start();
System.out.println("启动时间: " + new Date());
}
其中RAMJob实现了Job接口,并重写了execute方法。
public class RAMJob implements Job {
public void execute(JobExecutionContext context) throws JobExecutionException {
System.out.println("Say hello to Quartz " + System.currentTimeMillis());
}
}
xxl-job的核心类如下图所示:
类名 | 作用 |
---|---|
XxlJobAdminConfig | 调度中心的总配置类,负责创建XxlJobScheduler实例 |
XxlJobScheduler | 负责创建各种线程,包括任务注册主线程,调度容器的主线程,以及调度参数的配置线程池JobTriggerPoolHelper |
JobScheduleHelper | 调度容器,创建一个守护线程查询所有下次执行时间在当前时间5秒内的定时任务,并按条件执行 |
JobTriggerPoolHelper | 创建操作XxlJobTrigger的线程池,并添加trigger |
XxlJobTrigger | 表示一个调度参数的配置,会查询具体的定时任务信息XxlJobInfo |
XxlJob | 定义执行器的注解 |
JobThread | 调用IJobHandler的executer执行任务,并回调调度中心 |
IJobHandler | 抽象的执行器接口,定义了要执行的具体内容,同样的也是一个execute方法 |
EmbedServer | 内嵌的Server,默认端口是9999 |
ExecutorBiz | 其中的run方法用于调用执行器,有两个是实现类ExecutorBizImpl以及ExecutorBizClient 。 |
核心类的调用关系如下图所示:
从核心类我们可以看出xxl-job和quartz还是有很多相同点的,都有Scheduler,Trigger以及Job等几个核心的组件。不同之处是xxl-job把任务信息直接存储在了数据表中,而quartz是可以不存的。而且xxl-job调度和执行是分开的,而quartz调度和执行是在一块的。
下图展示了调度中心调度执行器执行任务的时序图:
@Override
public void afterPropertiesSet() throws Exception {
adminConfig = this;
xxlJobScheduler = new XxlJobScheduler();
xxlJobScheduler.init();
}
public void init() throws Exception {
//省略部分代码
// admin registry monitor run
JobRegistryMonitorHelper.getInstance().start();
// admin trigger pool start
JobTriggerPoolHelper.toStart();
// start-schedule
JobScheduleHelper.getInstance().start();
}
//上锁
try{
conn = XxlJobAdminConfig.getAdminConfig().getDataSource().getConnection();
connAutoCommit = conn.getAutoCommit();
//取消事务自动提交
conn.setAutoCommit(false);
preparedStatement = conn.prepareStatement( "select * from xxl_job_lock where lock_name = 'schedule_lock' for update" );
preparedStatement.execute();
finally {
// commit
if (conn != null) {
//提交事务,释放锁
conn.commit();
conn.setAutoCommit(connAutoCommit);
}
}
下面代码展示了定时任务的调用:
public static final long PRE_READ_MS = 5000; // pre read
long nowTime = System.currentTimeMillis();
//查询任务下一次执行时间<当前时间+5秒的任务
List<XxlJobInfo> scheduleList = XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().scheduleJobQuery(nowTime + PRE_READ_MS, preReadCount);
for (XxlJobInfo jobInfo: scheduleList) {
if (nowTime > jobInfo.getTriggerNextTime() + PRE_READ_MS) {
// 2.1、trigger-expire > 5s:pass && make next-trigger-time,任务过期超过5秒,不在执行该任务,重新设置下一次执行时间
// fresh next
refreshNextValidTime(jobInfo, new Date());
} else if (nowTime > jobInfo.getTriggerNextTime()) {
// 2.2、trigger-expire < 5s:direct-trigger && make next-trigger-time,任务过期<5秒,立即执行任务
// 1、trigger
JobTriggerPoolHelper.trigger(jobInfo.getId(), TriggerTypeEnum.CRON, -1, null, null, null);
// 2、fresh next
refreshNextValidTime(jobInfo, new Date());
//省略部分代码
}
ThreadPoolExecutor triggerPool_ = fastTriggerPool;
triggerPool_.execute(new Runnable() {
// do trigger
XxlJobTrigger.trigger(jobId, triggerType, failRetryCount, executorShardingParam, executorParam, addressList);
}
// load data,加载任务信息
XxlJobInfo jobInfo = XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().loadById(jobId);
if (executorParam != null) {
jobInfo.setExecutorParam(executorParam);
}
processTrigger(group, jobInfo, finalFailRetryCount, triggerType, shardingParam[0], shardingParam[1]);
核心逻辑在processTrigger中。
//初始化trigger-param
TriggerParam triggerParam = new TriggerParam();
triggerParam.setJobId(jobInfo.getId());
//初始化地址
routeAddressResult = executorRouteStrategyEnum.getRouter().route(triggerParam, group.getRegistryList());
//执行任务
triggerResult = runExecutor(triggerParam, address);
//日志处理,代码省略
private Object process(HttpMethod httpMethod, String uri, String requestData, String accessTokenReq){
else if ("/run".equals(uri)) {
TriggerParam triggerParam = GsonTool.fromJson(requestData, TriggerParam.class);
return executorBiz.run(triggerParam);
}
}
public ReturnT<String> run(TriggerParam triggerParam) {
JobThread jobThread = XxlJobExecutor.loadJobThread(triggerParam.getJobId());
IJobHandler jobHandler = jobThread!=null?jobThread.getHandler():null;
// push data to queue
ReturnT<String> pushResult = jobThread.pushTriggerQueue(triggerParam);
return pushResult;
}
public void run() {
//通过反射的方式获取执行器的方法
handler.init();
//从队列中取出任务
triggerParam = triggerQueue.poll(3L, TimeUnit.SECONDS);
FutureTask<ReturnT<String>> futureTask = new FutureTask<ReturnT<String>>(new Callable<ReturnT<String>>() {
@Override
public ReturnT<String> call() throws Exception {
return
//执行任务
handler.execute(triggerParamTmp.getExecutorParams());
}
});
}
,调度中心和执行器分开部署,减少了系统的耦合以及调度中心的调度效率。最重要的是xxl-job对任务的过期处理以及阻塞处理策略设计的比较好。
XXL-JOB官方文档定时任务框架:quartz、elastic-job和xxl-job的分析对比。Quartz任务调度框架--简介与示例(一)