最近公司的项目需要用到分布式任务调度,在结合多款开源框架后决定使用当当的Elastic-job。不知道大家有没有这样的需求,就是动态任务。之前比较了xxl-job和elastic-job发现,都只是支持注解或者配置以及后台添加现有的任务,不支持动态添加。比如:类似订单半小时后自动取消的场景。
xxl-job理论上来说是可以支持的,但是需要高度整合admin端的程序,然后开放对应的接口才可以给其他服务调用,这样本质直接改源码对后期的升级十分不便,最后放弃了xxl-job。elastic-job在移交Apache后的版本规划中,有提到API的开放,但是目前还没有稳定版,所以只能使用之前的2.1.5的版本来做。在Github搜了很多整合方案,最后决定选择下面的来实现。
<dependency>
<groupId>com.github.xjzrc.spring.boot</groupId>
<artifactId>elastic-job-lite-spring-boot-starter</artifactId>
<version>${lasted.release.version}</version>
</dependency>
因为要做的是动态的,所以这里没有直接使用maven坐标引入,直接将源码全部接入项目来使用,这样比较灵活,因为底层本质上还是用elastic-job的东西。下面引入elastic-job坐标
<dependency>
<groupId>com.dangdang</groupId>
<artifactId>elastic-job-lite-spring</artifactId>
<version>${elastic-job.version}</version>
</dependency>
<dependency>
<groupId>com.dangdang</groupId>
<artifactId>elastic-job-lite-lifecycle</artifactId>
<version>${elastic-job.version}</version>
<exclusions>
<exclusion>
<groupId>org.eclipse.jetty.aggregate</groupId>
<artifactId>jetty-all-server</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
</exclusion>
</exclusions>
</dependency>
在整合完上面的源码后,就可以直接支持配置式的定时任务了,只需要修改服务的config即可生效,但是要做到动态式的添加和删除就必须在实现一个动态的实现。
首先在ElasticJobAutoConfiguration新增一个Bean
/**
* 动态任务初始化
* @return
*/
@Bean(initMethod = "init")
@ConditionalOnMissingBean
public DynamicJobInitialization dynamicJobInitialization() {
return new DynamicJobInitialization(this.regCenter());
}
然后实现动态的类
/**
* 动态任务初始化(支持简单、流式任务)
* @author Zzq
* @date 2020/9/14 19:22
*/
@Slf4j
public class DynamicJobInitialization extends AbstractJobInitialization {
private JobStatisticsAPI jobStatisticsAPI;
private JobSettingsAPI jobSettingsAPI;
public DynamicJobInitialization(ZookeeperRegistryCenter zookeeperRegistryCenter) {
this.jobStatisticsAPI = new JobStatisticsAPIImpl(zookeeperRegistryCenter);
this.jobSettingsAPI = new JobSettingsAPIImpl(zookeeperRegistryCenter);
}
public void init() {
Collection<JobBriefInfo> allJob = jobStatisticsAPI.getAllJobsBriefInfo();
if (CollUtil.isNotEmpty(allJob)) {
allJob.forEach(jobInfo -> {
// 已下线的任务
if (JobBriefInfo.JobStatus.CRASHED.equals(jobInfo.getStatus())) {
try {
Date currentDate = new Date();
CronExpression cronExpression = new CronExpression(jobInfo.getCron());
Date nextValidTimeAfter = cronExpression.getNextValidTimeAfter(currentDate);
// 表达式还生效的任务
if (ObjectUtil.isNotNull(nextValidTimeAfter)) {
this.initJobHandler(jobInfo.getJobName());
}
} catch (ParseException e) {
log.error(e.getMessage(), e);
}
}
});
}
}
/**
* 初始化任务操作
* @param jobName 任务名
*/
private void initJobHandler(String jobName) {
try {
JobSettings jobSetting = jobSettingsAPI.getJobSettings(jobName);
if (ObjectUtil.isNotNull(jobSetting)) {
String jobCode = StrUtil.subBefore(jobSetting.getJobName(), StrUtil.UNDERLINE, false);
JobClassEnum jobClassEnum = JobClassEnum.convert(jobCode);
if (ObjectUtil.isNotNull(jobClassEnum)) {
ElasticJobProperties.JobConfiguration configuration = new ElasticJobProperties.JobConfiguration();
configuration.setCron(jobSetting.getCron());
configuration.setJobParameter(jobSetting.getJobParameter());
configuration.setShardingTotalCount(jobSetting.getShardingTotalCount());
configuration.setDescription(jobSetting.getDescription());
configuration.setShardingItemParameters(jobSetting.getShardingItemParameters());
configuration.setJobClass(jobClassEnum.getClazz().getCanonicalName());
super.initJob(jobName, JobType.valueOf(jobSetting.getJobType()), configuration);
}
}
} catch (Exception e) {
log.error("初始化任务操作失败: {}", e.getMessage(), e);
}
}
/**
* 保存/更新任务
* @param job
* @param jobClass
*/
public void addOrUpdateJob(Job job, Class<? extends ElasticJob> jobClass) {
ElasticJobProperties.JobConfiguration configuration = new ElasticJobProperties.JobConfiguration();
configuration.setCron(job.getCron());
configuration.setJobParameter(job.getJobParameter());
configuration.setShardingTotalCount(job.getShardingTotalCount());
configuration.setShardingItemParameters(job.getShardingItemParameters());
configuration.setJobClass(jobClass.getCanonicalName());
super.initJob(job.getJobName(), JobType.valueOf(job.getJobType()), configuration);
}
@Override
public JobTypeConfiguration getJobTypeConfiguration(String jobName, JobType jobType, JobCoreConfiguration jobCoreConfiguration) {
String jobCode = StrUtil.subBefore(jobName, StrUtil.UNDERLINE, false);
JobClassEnum jobClassEnum = JobClassEnum.convert(jobCode);
if (ObjectUtil.isNotNull(jobClassEnum)) {
if (JobType.SIMPLE.equals(jobType)) {
return new SimpleJobConfiguration(jobCoreConfiguration, jobClassEnum.getClazz().getCanonicalName());
} else if (JobType.DATAFLOW.equals(jobType)) {
return new DataflowJobConfiguration(jobCoreConfiguration, jobClassEnum.getClazz().getCanonicalName(), false);
}
}
return null;
}
}
为什么是这样的实现?我发现每次重新发布服务后,现在的未执行的任务都会变成“已下线”,这可能跟Zookeeper有关,需要重新初始化才行,对于注解和配置式的,会自动初始化,但是动态添加的不会自动初始化。所以必须自己初始化,之前有个思路是自己建张表来维护定时,每次启动时进行初始化,但是这样太麻烦,后来实现使用elastic-job现有的API来实现,即启动时,遍历Zookeeper已有的节点,然后判断Cron表达式是否过期,如果还没有过期,则重新初始化任务,初始化时配置设置了会覆盖原来的配置,所以不会有影响。然后外层可以通过MQ来新增任务,在通过服务调用去指定对应的定时逻辑即可。
(不知道大家有没有更好的实现方案,可以初始化动态任务的)
而配置式的,可以直接在配置文件指定并实现即可
spring:
elasticjob:
#注册中心配置
zookeeper:
server-lists: 127.0.0.1:6181
namespace: elastic-job-spring-boot-stater-demo
#简单作业配置
simples:
#spring简单作业示例配置
spring-simple-job:
#配置简单作业,必须实现com.dangdang.ddframe.job.api.simple.SimpleJob
job-class: com.zen.spring.boot.demo.elasticjob.job.SpringSimpleJob
cron: 0/2 * * * * ?
sharding-total-count: 3
sharding-item-parameters: 0=Beijing,1=Shanghai,2=Guangzhou
#配置监听器
listener:
#配置每台作业节点均执行的监听,必须实现com.dangdang.ddframe.job.lite.api.listener.ElasticJobListener
listener-class: com.zen.spring.boot.demo.elasticjob.listener.MyElasticJobListener
#流式作业配置
dataflows:
#spring简单作业示例配置
spring-dataflow-job:
#配置简单作业,必须实现com.dangdang.ddframe.job.api.dataflow.DataflowJob<T>
job-class: com.zen.spring.boot.demo.elasticjob.job.SpringDataflowJob
cron: 0/2 * * * * ?
sharding-total-count: 3
sharding-item-parameters: 0=Beijing,1=Shanghai,2=Guangzhou
streaming-process: true
#配置监听器
listener:
#配置分布式场景中仅单一节点执行的监听,必须实现com.dangdang.ddframe.job.lite.api.listener.AbstractDistributeOnceElasticJobListener
distributed-listener-class: com.zen.spring.boot.demo.elasticjob.listener.MyDistributeElasticJobListener
started-timeout-milliseconds: 5000
completed-timeout-milliseconds: 10000
#脚本作业配置
scripts:
#脚本作业示例配置
script-job:
cron: 0/2 * * * * ?
sharding-total-count: 3
sharding-item-parameters: 0=Beijing,1=Shanghai,2=Guangzhou
script-command-line: youPath/spring-boot-starter-demo/elastic-job-spring-boot-starter-demo/src/main/resources/script/demo.bat
以上整合基本可以满足现在的使用,比较期待移交Apache后的3的版本,这样可以有更多API的支持,而不用自己造轮子。
领取专属 10元无门槛券
私享最新 技术干货