前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >专栏 >集成elastic-job分布式调度定时任务

集成elastic-job分布式调度定时任务

作者头像
余生大大
发布于 2022-11-02 08:43:02
发布于 2022-11-02 08:43:02
74900
代码可运行
举报
文章被收录于专栏:余生大大余生大大
运行总次数:0
代码可运行

前言

定时任务这一组件在工作过程中经常使用到,在单机节点上可以直接选择使用Spring自带的定时任务组件hubble-task,而这种定时任务一旦确定固化了定时触发策略,也无法动态开启关闭,所以后来有了Quartz

Quartz是定时任务领域的一个开源项目,由JAVA开发,可以通过API调度定时任务的启停及策略,还有对JTA事务跟集群的支持等等强大功能。

但是Quartz又有它的一些缺点:

  • Quartz调整定时任务需要通过API的方式进行调度,本质上还是没有脱离业务系统。
  • Quartz需要持久化数据到底层数据表,对业务系统的数据侵入较高。
  • Quartz也并没有支持分布式的调度执行,只能做到单个任务单个执行

elastic-job就是当当在Quartz的基础上进行了二次封装,elastic-job有两种版本:

  • Elastic-Job-Cloud:针对微服务的部署方式
  • Elastic-Job-Lite:基于zookeeper作为注册中心的部署方式

这两个版本除了部署方式不一样在api上是一样的,elastic-job相对于Quartz增加了很多新特性:

  1. 支持UI页面,可以在web页面上动态调整定时策略跟启停
  2. 基于Zookeeper作为分布式的调度,调度跟任务解耦
  3. 支持了分布式调度分片,同一个任务可以分成多片执行
  4. 作业类型多种,支持Simple、DataFLow数据流、Script脚本
  5. 失效转移,下线的机器的任务会重新分片执行
  6. 作业分片的一致性,任务分片后不会重复执行
  7. 错过执行的作业补偿

安装

安装elastic-job-lite方式,需要提前安装zookeeper,如果需要安装教程可以看这篇文章:Linux在线安装Zookeeper

elastic-job在apache的地址:elasticjob

然后就需要运行包含Elastic-Job-Lite和业务代码的jar文件。不限于jar或war的启动方式。

源码地址:elastic-job-lite

启动elastic-job-lite-console

下载2.1.4版本的源码:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
https://codeload.github.com/apache/shardingsphere-elasticjob/zip/refs/tags/2.1.4

下载完成解压后有如下目录:

进入elastic-job-lite文件下的elastic-job-lite-console。在此目录下进行打包,打包命令

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
mvn clean install -Dmaven.test.skip=true  

打包好了启动jar包即可,也可以直接启动源码,找到console模块下的ConsoleBootstrap类进行启动

启动完成后访问ip:8899,账密为:root/root

进入系统后进入注册中心配置,填写需要注册的zookeeper地址进行连接。

下面是在linux中安装配置。也可以直接将打好的包放到linux中执行。

安装控制台

elastic-job 3.0后没有了console模块,有了更加优美的ui控制台

这里直接下载控制台包:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
https://archive.apache.org/dist/shardingsphere/elasticjob-ui-3.0.0-RC1/apache-shardingsphere-elasticjob-3.0.0-RC1-lite-ui-bin.tar.gz

下载后上传到服务器进行解压

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
 tar -zxvf apache-shardingsphere-elasticjob-3.0.0-RC1-lite-ui-bin.tar.gz

到bin文件里进行启动

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
./start.sh

启动成功后访问ip地址:8088,默认账号密码为root/root

进入后需要在全局配置-注册中心配置添加注册中心

新增完成后建立连接,elastic-job初步就搭建好了,如果想要引入数据源需要修改conf文件下的application.properties配置文件

我想要修改为mysql作为数据库,需要再lib文件中加入连接包,这个手动上传即可。

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
vim application.properties

修改为mysql的驱动跟连接方式

保存文件然后重新启动elastic-job,在事件追踪数据源配置中添加数据源,如下图:

点击建立连接,后面定时任务的配置及日志会记录在表里

集成

简单集成

引入pom依赖

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
<dependency>
	<groupId>com.cxytiandi</groupId>
	<artifactId>elastic-job-spring-boot-starter</artifactId>
	<version>1.0.0</version>
</dependency>
<dependency>
	<groupId>org.apache.curator</groupId>
	<artifactId>curator-framework</artifactId>
	<version>2.10.0</version>
</dependency>
<dependency>
	<groupId>org.apache.curator</groupId>
	<artifactId>curator-recipes</artifactId>
	<version>2.10.0</version>
</dependency>

在配置文件application.properties添加配置:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
elasticJob.zk.serverLists=localhost:2181
elasticJob.zk.namespace=user-sync

然后就可以直接定义一个job类来验证了,代码如下:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
@Component
@ElasticJobConf(name = "TestJob",cron="0 0 0 * * ?",shardingTotalCount=5) // 每天零点执行
public class TestJob extends SimpleJob{
  @Override
  public void execute(ShardingContext shardingContext) {
  	// 要执行的逻辑
  }
}

这种方式的定时策略是依赖在ElasticJobConf注解上,调整注解的配置即可。

常规集成

常规集成有三个类

  • ElasticJobConfig:elastic-job组件的配置,举例zookeeper配置中心
  • ElasticJobHandler:job任务的具体执行类可以配置在这里
  • ElasticJobListener:job任务的监听,开始跟结束
  • ElasticJobProperties:从配置文件读取zookeeper配置

引入依赖

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
<dependency>
    <groupId>com.dangdang</groupId>
    <artifactId>elastic-job-common-core</artifactId>
    <version>${elasticjob.version}</version>
    <exclusions>
        <exclusion>
            <artifactId>guava</artifactId>
            <groupId>com.google.guava</groupId>
        </exclusion>
        <exclusion>
            <artifactId>curator-framework</artifactId>
            <groupId>org.apache.curator</groupId>
        </exclusion>
    </exclusions>
</dependency>
<dependency>
    <groupId>com.dangdang</groupId>
    <artifactId>elastic-job-lite-core</artifactId>
    <version>${elasticjob.version}</version>
</dependency>

<dependency>
    <groupId>com.dangdang</groupId>
    <artifactId>elastic-job-lite-spring</artifactId>
    <version>${elasticjob.version}</version>
</dependency>
<!--解决冲突 elasticjob-->
<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-recipes</artifactId>
    <version>2.10.0</version>
</dependency>

ElasticJobConfig

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
import com.dangdang.ddframe.job.reg.zookeeper.ZookeeperConfiguration;
import com.dangdang.ddframe.job.reg.zookeeper.ZookeeperRegistryCenter;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;

@Component
@ConditionalOnProperty(name = "elasticjob.enabled", havingValue = "true")
public class ElasticJobConfig {
    private final ElasticJobProperties jobProperties;

    public ElasticJobConfig(ElasticJobProperties jobProperties) {
        this.jobProperties = jobProperties;
    }

    @Bean(initMethod = "init")
    public ZookeeperRegistryCenter regCenter() {
        return new ZookeeperRegistryCenter(new ZookeeperConfiguration(jobProperties.getServerLists(),
                jobProperties.getNamespace()));
    }

    @Bean
    public ElasticJobListener elasticJobListener() {
        return new ElasticJobListener(100, 100);
    }
}

ElasticJobHandler

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
import com.dangdang.ddframe.job.api.simple.SimpleJob;
import com.dangdang.ddframe.job.config.JobCoreConfiguration;
import com.dangdang.ddframe.job.config.simple.SimpleJobConfiguration;
import com.dangdang.ddframe.job.lite.api.JobScheduler;
import com.dangdang.ddframe.job.lite.config.LiteJobConfiguration;
import com.dangdang.ddframe.job.lite.spring.api.SpringJobScheduler;
import com.dangdang.ddframe.job.reg.zookeeper.ZookeeperRegistryCenter;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.AutoConfigureAfter;
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Slf4j
@Configuration
@AutoConfigureAfter(com.kyf.pe.trade.dataprocess.assist.config.ElasticJobConfig.class)
@ConditionalOnBean(com.kyf.pe.trade.dataprocess.assist.config.ElasticJobConfig.class)
public class ElasticJobHandlerConfig {
    private final ZookeeperRegistryCenter zookeeperRegistryCenter;

    public ElasticJobHandlerConfig(ZookeeperRegistryCenter zookeeperRegistryCenter) {
        this.zookeeperRegistryCenter = zookeeperRegistryCenter;
    }

    /**
     * 配置任务详细信息
     *
     * @param jobClass               定时任务实现类
     * @param cron                   表达式
     * @param shardingTotalCount     分片数
     * @param shardingItemParameters 分片参数
     * @return
     */
    private LiteJobConfiguration getLiteJobConfiguration(final Class<? extends SimpleJob> jobClass,
                                                         final String cron,
                                                         final int shardingTotalCount,
                                                         final String shardingItemParameters,
                                                         final String jobParameters,
                                                         final String description) {
        // 定义作业核心配置
        JobCoreConfiguration simpleCoreConfig = JobCoreConfiguration.newBuilder(jobClass.getSimpleName(), cron, shardingTotalCount).
                shardingItemParameters(shardingItemParameters).jobParameter(jobParameters).description(description).build();
        // 定义SIMPLE类型配置
        SimpleJobConfiguration simpleJobConfig = new SimpleJobConfiguration(simpleCoreConfig, jobClass.getCanonicalName());
        // 定义Lite作业根配置
        return LiteJobConfiguration.newBuilder(simpleJobConfig).build();
    }


    /**
     * 具体任务
     */
    @Bean(initMethod = "init")
    public JobScheduler pushHrhbJobScheduler(final TestJob testjob,
                                             @Value("${job.test.cron}") final String cron,
                                             @Value("${job.test.shardingTotalCount}") final int shardingTotalCount,
                                             @Value("${job.test.description}") final String description) {

        return new SpringJobScheduler(testjob, zookeeperRegistryCenter, getLiteJobConfiguration(testjob.getClass(),
                cron, shardingTotalCount, "", "", description));
    }


}

ElasticJobListener

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
import com.dangdang.ddframe.job.executor.ShardingContexts;
import com.dangdang.ddframe.job.lite.api.listener.AbstractDistributeOnceElasticJobListener;
import lombok.extern.slf4j.Slf4j;

@Slf4j
public class ElasticJobListener extends AbstractDistributeOnceElasticJobListener {

    /**
     * 设置间隔时间
     *
     * @param startedTimeoutMilliseconds
     * @param completedTimeoutMilliseconds
     */
    public ElasticJobListener(long startedTimeoutMilliseconds, long completedTimeoutMilliseconds) {
        super(startedTimeoutMilliseconds, completedTimeoutMilliseconds);
    }

    @Override
    public void doBeforeJobExecutedAtLastStarted(ShardingContexts shardingContexts) {
        log.info("任务名:{}开始", shardingContexts.getJobParameter());
    }

    @Override
    public void doAfterJobExecutedAtLastCompleted(ShardingContexts shardingContexts) {
        log.info("任务名:{}结束", shardingContexts.getJobParameter());
    }
}

ElasticJobProperties

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration;

@Configuration
@ConfigurationProperties(prefix = "elasticjob")
public class ElasticJobProperties {
    private boolean enabled = true;

    private String serverLists;

    private String namespace;

    public boolean isEnabled() {
        return enabled;
    }

    public void setEnabled(boolean enabled) {
        this.enabled = enabled;
    }

    public String getServerLists() {
        return serverLists;
    }

    public void setServerLists(String serverLists) {
        this.serverLists = serverLists;
    }

    public String getNamespace() {
        return namespace;
    }

    public void setNamespace(String namespace) {
        this.namespace = namespace;
    }
}

我的博客即将同步至腾讯云开发者社区,邀请大家一同入驻:https://cloud.tencent.com/developer/support-plan?invite_code=1qq6a7mdoq048

本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2022-10-24,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
暂无评论
推荐阅读
logback的日志配置文件
部署运行后发现,在Linux下面 tomcat的日志文件catalina.out增长速度很快,造成愈来愈大;
JQ实验室
2022/02/09
3830
IDEA中使用tomcat9时出现乱码解决方法
1、修改IDEA中setting的设置Editor——File Encodings
cn華少
2021/07/21
9740
Linux下Tomcat 8 常用命令和优化
可能shutdown.sh 无法停止 tomcat,可以修改其配置 修改shutdown.sh的最后这一行
程裕强
2022/05/06
8400
配置catalina.out的日志格式[通俗易懂]
启动 Tomcat 时, catalina.out 中输出的日志格式可能不太理想。
全栈程序员站长
2022/08/25
1.6K0
Tomcat日志管理(一)[通俗易懂]
由于 JDK 自带的  java.util.logging实现提供的日志管理能力极为有限,不支持应用级别日志管理。因此,Tomcat 默认的日志库对java.util.logging API 进行了重新实现,这些实现被称为  “JULI” ,里面包含了一些特有的定制类,其中最重要的是一个自定义的LogManager类,它能够区别出运行在 Tomcat 容器中的多个不同的Web应用以及它们的类加载器,从而可以支持不同的应用使用各自独立的日志配置。 你可以从 Tomcat 全局和 Web 应用两个层面对 Tomcat 默认的JULI进行日志配置:
全栈程序员站长
2022/08/31
1.1K0
tomcat下的日志配置详细说明
#可配置项(5类日志):catalina、localhost、manager、admin、host-manager handlers = 1catalina.org.apache.juli.FileHandler, 2localhost.org.apache.juli.FileHandler, 3manager.org.apache.juli.FileHandler, 4host-manager.org.apache.juli.FileHandler, java.util.logging.Conso
业余草
2019/01/21
1.7K0
解决Tomcat中文乱码问题——windows平台
这是因为tomcat默认编码是UTF-8,但是windows默认的编码格式是GBK,不匹配,所以我们改一下就行了。
红目香薰
2022/11/30
7.7K0
解决Tomcat中文乱码问题——windows平台
tomcat日志详解[通俗易懂]
tomcat有五类日志:catalina、localhost、manager、admin、host-manager
全栈程序员站长
2022/08/31
3.9K0
Tomcat日志系统详解
综合:Tomcat下相关的日志文件 Cataline引擎的日志文件,文件名catalina.日期.log Tomcat下内部代码丢出的日志,文件名localhost.日期.log(jsp页面内部错误的异常,org.apache.jasper.runtime.HttpJspBase.service类丢出的,日志信息就在该文件!) Tomcat下默认manager应用日志,文件名manager.日期.log 控制台输出的日志,Linux下默认重定向到catalina.out Access日志(Servlet.xml配置) 应用程序通过log4j.properties:${catalina.base}/logs/probe.log重定向过来的日志 JULI:org.apache.juli.FileHandler对应的日志文件名:{prefix}.{date}.{suffix} 默认juli.日期.log Tomcat下Web应用程序可以使用如下3种日志: 使用JDK提供的日志java.util.logging. 使用Java Servlets规范中定义的日志javax.servlet.ServletContext.log(...) 使用其他日志框架,如log4j 不同Web应用程序下使用的Servlet日志(或者日志框架提供的日志)是相互独立的(这与Tomcat的class loader有关,参考Class Loader HOW-TO )。如果Web应用程序使用的是java.util.logging日志,那么它们并不相互独立,这是因为java.util.logging是由JAVA系统中的Bootstrap ClassLoader来加载的,因此它在各Web应用程序之间是共享的! Tomcat使用的日志配置文件:$CATALINA_BASE/conf/logging.properties Tomcat日志管理类默认使用的是JULI:LOGGING_MANAGER="-Djava.util.logging.manager=org.apache.juli.ClassLoaderLogManager" Java的stdout and stderr会被重定向到$CATALINA_BASE/logs/catalina.out,同时:下面2种类型的错误信息,也会被记录在这里 Uncaught exceptions printed by java.lang.ThreadGroup.uncaughtException(.. Thread dumps, if you requested them via a system signal Access访问日志:它与一般的日志有关系但不太一样,它在Servlet.xml中的Context或者 Host或者Engine中配置。在上述的配置节中增加下述的Value就行,具体参考:The Valve Component Xml代码 <Valve className="org.apache.catalina.valves.AccessLogValve" directory="logs" prefix="localhost_access_log." suffix=".logs" pattern="common" resolveHosts="false"/> Tomcat默认使用JULI日志系统(可以参考官网文档修改成使用log4j),它是对默认的JDK日志java.util.logging进行一定的封装,和标准JDK日志支持相同的配置。最大的不同是针对不同的classloader,可以使用不同的配置文件,使得tomcat下不同的Web应用程序可以使用各自独立的日志文件。也就是说,Tomcat下的默认日志有如下2个层次: 全局配置文件. That is usually done in the ${catalina.base}/conf/logging.properties file. The file is specified by the java.util.logging.config.file System property which is set by the startup scripts. If it is not readable or is not configured, the default is to use the ${java.home}/lib/logging.properties file in the JRE. Web应用程序中使用WEB-INF/classes/logging.properties 默认的JRE中的logging.properties会
菲宇
2019/06/13
3.1K0
Tomcat日志系统详解
如何使用Tomcat自带的日志实现tomcat-juli.jar
Tomcat自带的日志实现是tomcat-juli.jar,它是对默认的JDK日志java.util.logging进行一定的封装,和标准JDK日志支持相同的配置,但是和log4j等常用的日志框架比起来功能要较为简陋。但是tomcat-juli可以针对不同的classloader来使用不同的配置文件,使得tomcat下不同的Web应用程序可以使用各自独立的日志文件。
雨临Lewis
2022/01/11
1.1K0
解决idea的控制台输出Tomcat日志乱码问题「建议收藏」
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。
全栈程序员站长
2022/11/09
8.7K0
解决idea的控制台输出Tomcat日志乱码问题「建议收藏」
tomcat 8.5.9.0 解决catalina.out过大的问题
先吐嘈一下tomcat这个项目,日志切割这么常见的功能,tomcat这种知名开源项目默认居然不开启,生产环境跑不了几天,磁盘就满了,而且很多网上流传的方法,比如修改conf/logging.properties文件,将级别设置成OFF,我试了好象并不管用(tomcat 8.5.9.0版本 + centos 6.5 + jdk1.8环境) 正确姿势: 1、下载 https://mirrors.tuna.tsinghua.edu.cn/apache/logging/log4j/1.2.17/log4j-1.2.
菩提树下的杨过
2018/01/18
2K0
Tomcat常见问题合集记录
解决方法:在Tomcat里的bin中的setclasspath.bat或者setclasspath.sh开头添加设置环境变量;
全栈工程师修炼指南
2022/09/29
1K0
spingboot tomcat 报错 One or more listeners failed to start. Full details will be found in the appr...
在src/main/reousrces/ 下添加文件 logging.properties,内容如下:
飞奔去旅行
2019/10/12
2.7K0
tomcat报错One or more listeners failed to start.
今天真是找报错的一天,就是因为使用maven搭建springmvc工程,实在是太累了,之前报错tomcat找不到servlet是因为没有勾选源文件,这里勾选后,继续报错:
废江_小江
2022/09/05
2.6K0
tomcat报错One or more listeners failed to start.
Error filterStart的问题
可以在项目的 WEB-INF/classes目录下新建一个文件叫logging.properties
全栈程序员站长
2022/09/07
2350
kubernetes集群之微服务tomcat服务
Tomcat是Apache 软件基金会(Apache Software Foundation)的Jakarta 项目中的一个核心项目,由Apache、Sun 和其他一些公司及个人共同开发而成。由于有了Sun 的参与和支持,最新的Servlet 和JSP 规范总是能在Tomcat 中得到体现,Tomcat 5支持最新的Servlet 2.4 和JSP 2.0 规范。因为Tomcat 技术先进、性能稳定,而且免费,因而深受Java 爱好者的喜爱并得到了部分软件开发商的认可,成为目前比较流行的Web 应用服务器。
王先森sec
2023/04/24
3590
kubernetes集群之微服务tomcat服务
Tomcat日志切割总结[通俗易懂]
我们都知道将一个项目部署到Tomcat之后,Tomcat服务启动后的标准输出(stdout)和标准出错(stderr)都会默认重定向到${TOMCAT_HOME}/logs/catalina.out这个文件中,有时候短短一会儿这个文件就能达到几十兆甚至上百兆,日积月累这个文件如果不及时清理将会占用服务器磁盘大量空间从而影响到整个项目的正常运行; 再者这样大日志文件对于我们进行错误排查以及日志分析都不是很方便,一次打开也花上好几分钟,直接cat命令查看也要滚掉好多屏,并且那时候想要来切割的话又异常麻烦。 所以,现在我们提前做好用日期来分割日志的配置,即Tomcat运行的每天都按照日期命名新建一个日志文件。
全栈程序员站长
2022/08/31
2.4K0
深入拆解Tomcat和Jetty之通用组件
每一个系统都有一些通用的模块,比如日志模块、异常处理模块、工具类等,对于 Tomcat 来说,比较重要的通用模块有日志、Session 管理和集群管理。从今天开始我会分三期来介绍通用模块,今天这一期先来讲日志模块。
yuanshuai
2022/08/22
5400
深入拆解Tomcat和Jetty之通用组件
spring整合各种中间件(RocketMQ、kafka、RabbitMQ、TubeMQ、NSQ)-腾讯开源【TubeMQ】
上文:spring整合各种中间件(RocketMQ、kafka、RabbitMQ、ActiveMQ、ZeroMQ)-ZeroMQ
逍遥壮士
2021/05/24
8450
spring整合各种中间件(RocketMQ、kafka、RabbitMQ、TubeMQ、NSQ)-腾讯开源【TubeMQ】
推荐阅读
相关推荐
logback的日志配置文件
更多 >
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档