首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >专栏 >springboot项目集成dolphinscheduler调度器 实现datax数据同步任务

springboot项目集成dolphinscheduler调度器 实现datax数据同步任务

原创
作者头像
刘大猫
发布2025-10-22 22:35:58
发布2025-10-22 22:35:58
1450
举报
文章被收录于专栏:JAVA相关JAVA相关

Datax安装及基本使用请查看上一篇文章:https://blog.csdn.net/a924382407/article/details/120952339?spm=1001.2014.3001.5501

@TOC

Datax概述

1.概述

2.功能清单

| 功能清单 |

|--|

| CRUD增删改查 、启动任务、停止任务|

3.==说明==:本项目只支持mysql及hbase之间的数据同步

代码模块

配置文件

pom.xml

代码语言:java
复制
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>com.geespace.microservices.bd-platform</groupId>
        <artifactId>all</artifactId>
        <version>1.0-SNAPSHOT</version>
    </parent>

    <artifactId>data-sync-config</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <java.version>1.8</java.version>
        <gson.version>2.8.1</gson.version>
    </properties>

    <dependencies>

        <dependency>
            <groupId>com.github.pagehelper</groupId>
            <artifactId>pagehelper-spring-boot-starter</artifactId>
        </dependency>
        <dependency>
            <groupId>com.google.code.gson</groupId>
            <artifactId>gson</artifactId>
            <version>${gson.version}</version>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <!--elasticsearch-->
        <dependency>
            <groupId>org.elasticsearch</groupId>
            <artifactId>elasticsearch</artifactId>
            <version>6.8.12</version>
        </dependency>
        <dependency>
            <groupId>org.elasticsearch.client</groupId>
            <artifactId>elasticsearch-rest-client</artifactId>
            <version>6.8.12</version>
        </dependency>
        <dependency>
            <groupId>org.elasticsearch.client</groupId>
            <artifactId>elasticsearch-rest-high-level-client</artifactId>
            <version>6.8.12</version>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-devtools</artifactId>
            <scope>runtime</scope>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <scope>runtime</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-configuration-processor</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
            <exclusions>
                <exclusion>
                    <groupId>org.junit.vintage</groupId>
                    <artifactId>junit-vintage-engine</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
        </dependency>
        <dependency>
            <groupId>org.codehaus.jackson</groupId>
            <artifactId>jackson-core-asl</artifactId>
            <version>1.9.13</version>
            <scope>compile</scope>
        </dependency>
        <dependency>
            <groupId>com.geespace.microservices.bd-platform</groupId>
            <artifactId>data-config</artifactId>
            <version>1.0-SNAPSHOT</version>
            <scope>compile</scope>
        </dependency>
        <!--httpclient-->
        <dependency>
            <groupId>commons-httpclient</groupId>
            <artifactId>commons-httpclient</artifactId>
            <version>3.1</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>2.2</version>
                <dependencies>
                    <dependency>
                        <groupId>org.springframework.boot</groupId>
                        <artifactId>spring-boot-maven-plugin</artifactId>
                        <version>2.1.4.RELEASE</version>
                    </dependency>
                </dependencies>
                <configuration>
                    <keepDependenciesWithProvidedScope>false</keepDependenciesWithProvidedScope>
                    <createDependencyReducedPom>true</createDependencyReducedPom>
                    <filters>
                        <filter>
                            <artifact>*:*</artifact>
                            <excludes>
                                <exclude>META-INF/*.SF</exclude>
                                <exclude>META-INF/*.DSA</exclude>
                                <exclude>META-INF/*.RSA</exclude>
                            </excludes>
                        </filter>
                    </filters>
                </configuration>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <transformers>
                                <transformer
                                        implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
                                    <resource>META-INF/spring.handlers</resource>
                                </transformer>
                                <transformer
                                        implementation="org.springframework.boot.maven.PropertiesMergingResourceTransformer">
                                    <resource>META-INF/spring.factories</resource>
                                </transformer>
                                <transformer
                                        implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
                                    <resource>META-INF/spring.schemas</resource>
                                </transformer>
                                <transformer
                                        implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
                                <transformer
                                        implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <mainClass>com.geespace.microservices.dispatcher.DispatchApplication</mainClass>
                                </transformer>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
    
</project>

DataxDolphinschedulerController

代码语言:java
复制
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

import javax.servlet.http.HttpServletRequest;

import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.geespace.microservices.builder.dto.ProcessDto;
import com.geespace.microservices.builder.dto.SyncConfigDto;
import com.geespace.microservices.builder.enums.DictionaryEnum;
import com.geespace.microservices.builder.request.ConfigAddForm;
import com.geespace.microservices.builder.request.ConfigSelectForm;
import com.geespace.microservices.builder.request.ConfigUpdateForm;
import com.geespace.microservices.builder.response.BizCode;
import com.geespace.microservices.builder.response.DolphinschedulerResponse;
import com.geespace.microservices.builder.response.Msg;
import com.geespace.microservices.builder.response.PageResult;
import com.geespace.microservices.builder.response.ReturnResult;
import com.geespace.microservices.builder.service.SyncConfigService;
import com.geespace.microservices.builder.tools.JsonTools;

import lombok.extern.slf4j.Slf4j;

import org.apache.commons.httpclient.HttpException;
import org.apache.commons.httpclient.NameValuePair;
import org.apache.commons.httpclient.methods.PostMethod;
import org.springframework.beans.BeanUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.http.HttpEntity;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpMethod;
import org.springframework.http.ResponseEntity;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.util.StringUtils;
import org.springframework.validation.annotation.Validated;
import org.springframework.web.bind.annotation.DeleteMapping;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.PutMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.client.RestTemplate;

/**
 * 迁移dolphinscheduler调度器
 * 
 * @author: liudz
 * @date: 2021/5/7
 */
@Slf4j
@RestController
@RequestMapping("/dolphinscheduler/v1")
public class DataxDolphinschedulerController {
    @Autowired
    private RestTemplate restTemplate;
    @Value("${dolphinscheduler.token}")
    String token;
    @Value("${dolphinscheduler.address}")
    String address;
    public static final int ZERO = 0;
    public static final int SUCCESS = 200;
    public static final String CREATE = "create";
    public static final String UPDATE = "update";
    public static final String ADD = "add";
    public static final String DELETE = "delete";
    public static final String ONLINE = "ONLINE";
    public static final String OFFLINE = "OFFLINE";
    public static final int ONE_THOUSAND_AND_FIVE_HUNDRED = 1500;
    public static final int SIX = 6;
    public static final int EIGHTY = 80;
    public static final int THREE = 3;
    @Autowired
    private SyncConfigService syncConfigService;


    /**
     * 创建任务-创建用户下唯一工作流,无则创建有则并排添加
     * @param request request
     * @param form 任务参数
     * @author liudz
     * @date 2021/5/8
     * @return 执行结果
     **/
    @PostMapping("/project/process/datax")
    @Transactional(rollbackFor = Exception.class)
    public ReturnResult operatorDataxTask(HttpServletRequest request, @RequestBody @Validated ConfigAddForm form) {
        Long userId = Long.valueOf(request.getUserPrincipal().getName());
        form.setUserId(userId);

        ReturnResult<SyncConfigDto> dataxTaskReturnResult = syncConfigService.addConfig(form);
        if (dataxTaskReturnResult.getCode() != SUCCESS) {
            return dataxTaskReturnResult;
        }
        log.info("--(1)addDataxTaskResult--success");
        form.setId(dataxTaskReturnResult.getData().getId());
        if (dataxTaskReturnResult.getCode() == SUCCESS) {
            Boolean verifyResult = verifyProcessExist(userId + "-dataxTask", form.getProjectName());
            log.info("--(2)verifyProcessExist--success:{}", verifyResult);
            if (!verifyResult) {
                ProcessDto processDto = packageProcessParam(
                        "create", userId + "-dataxTask", dataxTaskReturnResult.getData(), null);
                log.info("--(3)packageProcessParam--success");
                processDto.setProjectName(form.getProjectName());
                processDto.setProjectId(form.getProjectId());
                dataxTaskReturnResult =  createProcess(processDto);
            } else {
                //获取用户下唯一工作流ID
                DolphinschedulerResponse processInfoList = getUserProcess(form.getProjectName());
                JSONObject processJson = new JSONObject();
                log.info("--(3)getUserProcess--success:{}", processInfoList);
                List<Map<String, Object>> list = (List<Map<String, Object>>) processInfoList.getData();
                for (Map<String, Object> map : list) {
                    if (map.get("name").equals(userId + "-dataxTask")) {
                        processJson.fluentPutAll(map);
                    }
                }
                ProcessDto processDto = packageProcessParam(
                        "add", userId + "-dataxTask", dataxTaskReturnResult.getData(), processJson);
                processDto.setId(processJson.getInteger("id"));
                log.info("--(4)packageProcessParam--success");
                if (processJson.getString(DictionaryEnum.RELEASE_STATE.getFiledString()).equals(ONLINE)) {
                    releaseProcessDefinition(form.getProjectName(), userId + "-dataxTask",
                            processDto.getId(), 0);
                    log.info("--(5)releaseProcessDefinition--OFFLINE--success");
                }
                dataxTaskReturnResult =  updateProcess(form, processDto);
            }
        }
        return dataxTaskReturnResult;
    }
    /**
     * 更新任务
     * @param request request
     * @param form 任务参数
     * @author liudz
     * @date 2021/5/8
     * @return 执行结果
     **/
    @PutMapping("/project/process/datax")
    @Transactional(rollbackFor = Exception.class)
    public ReturnResult updateDataxTask(HttpServletRequest request, @RequestBody @Validated ConfigUpdateForm form) {
        Long userId = Long.valueOf(request.getUserPrincipal().getName());
        form.setUserId(userId);
        ReturnResult<SyncConfigDto> dataxTaskReturnResult = syncConfigService.updateConfig(form);
        log.info("--(1)updateDataxTaskResult--mysql--success");
        if (dataxTaskReturnResult.getCode() == SUCCESS) {
            //获取用户下唯一工作流ID
            DolphinschedulerResponse processInfoList = getUserProcess(form.getProjectName());
            JSONObject processJson = new JSONObject();
            log.info("--(2)getUserProcess--success:{}", processInfoList);
            List<Map<String, Object>> list = (List<Map<String, Object>>) processInfoList.getData();
            for (Map<String, Object> map : list) {
                if (map.get("name").equals(userId + "-dataxTask")) {
                    processJson.fluentPutAll(map);
                }
            }
            ProcessDto processDto = packageProcessParam(
                    "update", userId + "-dataxTask", dataxTaskReturnResult.getData(), processJson);
            processDto.setProjectName(form.getProjectName());
            processDto.setProjectId(form.getProjectId());
            processDto.setId(processJson.getInteger("id"));
            log.info("--(3)packageProcessParam--success");
            if (processJson.getString(DictionaryEnum.RELEASE_STATE.getFiledString()).equals(ONLINE)) {
                releaseProcessDefinition(form.getProjectName(), userId + "-dataxTask",
                        processDto.getId(), 0);
                log.info("--(4)releaseProcessDefinition--OFFLINE--success");
            }
            ConfigAddForm configAddForm = new ConfigAddForm();
            BeanUtils.copyProperties(form, configAddForm);
            return updateProcess(configAddForm, processDto);
        }
        return dataxTaskReturnResult;
    }
    /**
     * 删除任务
     * @param request request
     * @param projectName 项目名称
     * @param id 任务ID
     * @author liudz
     * @date 2021/5/8
     * @return 执行结果
     **/
    @DeleteMapping("/project/process/datax/{projectName}/{id}")
    @Transactional(rollbackFor = Exception.class)
    public ReturnResult deleteDataxTask(HttpServletRequest request, @PathVariable("projectName") String projectName,
                                        @PathVariable("id") Long id) {
        Long userId = Long.valueOf(request.getUserPrincipal().getName());
        SyncConfigDto syncConfigDto = new SyncConfigDto();
        syncConfigDto.setId(id);
        ConfigAddForm configAddForm = new ConfigAddForm();
        configAddForm.setProjectName(projectName);
        ReturnResult dataxTaskReturnResult = syncConfigService.delete(id, userId);
        log.info("--(1)deleteDataxTask--mysql--success");
        if (dataxTaskReturnResult.getCode() == SUCCESS) {
            //获取用户下唯一工作流ID
            DolphinschedulerResponse processInfoList = getUserProcess(projectName);
            JSONObject processJson = new JSONObject();
            log.info("--(2)getUserProcess--success:{}", processInfoList);
            List<Map<String, Object>> list = (List<Map<String, Object>>) processInfoList.getData();
            for (Map<String, Object> map : list) {
                if (map.get("name").equals(userId + "-dataxTask")) {
                    processJson.fluentPutAll(map);
                }
            }
            ProcessDto processDto = packageProcessParam(
                    "delete", userId + "-dataxTask", syncConfigDto, processJson);
            processDto.setId(processJson.getInteger("id"));
            log.info("--(3)packageProcessParam--success");
            if (processJson.getString(DictionaryEnum.RELEASE_STATE.getFiledString()).equals(ONLINE)) {
                releaseProcessDefinition(projectName, userId + "-dataxTask",
                        processDto.getId(), 0);
                log.info("--(4)releaseProcessDefinition--OFFLINE--success");
            }
            if (JSONObject.parseObject(processDto.getLocations()).keySet().size() == 0) {
                //删除工作流
                deleteProcess(configAddForm, processDto);
            } else {
                //更新工作流
                updateProcess(configAddForm, processDto);
            }
        }
        return dataxTaskReturnResult;
    }

    /**
     * 校验工作流是否存在
     * 
     * @param processName
     *            工作流名称
     * @param projectName 项目名称
     * @author liudz
     * @date 2021/5/8
     * @return boolean
     **/
    public Boolean verifyProcessExist(String processName, String projectName) {
        HttpHeaders headers = new HttpHeaders();
        headers.set("token", token);
        headers.set("Content-Type", "application/json");
        HttpEntity requestEntity = new HttpEntity(headers);
        ResponseEntity<DolphinschedulerResponse> returnResult =
            restTemplate.exchange(address + "/dolphinscheduler/projects/" + projectName
                            + "/process/verify-name?name=" + processName,
                HttpMethod.GET, requestEntity, DolphinschedulerResponse.class);
        if (returnResult.getBody().getCode() == ZERO) {
            return false;
        }
        return true;
    }

    /**
     * 创建工作流
     * @param processDto processDto
     * @author liudz
     * @date 2021/5/7
     * @return 执行结果
     **/
    public ReturnResult createProcess(ProcessDto processDto) {
        try {
            String postURL = address + "/dolphinscheduler/projects/"
                   + URLEncoder.encode(processDto.getProjectName(), "utf-8") + "/process/save";
            PostMethod postMethod = new PostMethod(postURL);
            postMethod.setRequestHeader("Content-Type",
                    "application/x-www-form-urlencoded;charset=utf-8");
            postMethod.setRequestHeader("token", token);
            NameValuePair[] data = {new NameValuePair("connects", processDto.getConnects()),
                new NameValuePair("name", processDto.getName()),
                new NameValuePair("locations", processDto.getLocations()),
                new NameValuePair("processDefinitionJson", processDto.getProcessDefinitionJson())};
            postMethod.setRequestBody(data);
            org.apache.commons.httpclient.HttpClient httpClient = new org.apache.commons.httpclient.HttpClient();
            httpClient.executeMethod(postMethod);
            JSONObject result = JSONObject.parseObject(postMethod.getResponseBodyAsString());
            log.info("--(5)httpCreateProcess:{}", result);
            if (!result.get(DictionaryEnum.CODE.getFiledString()).equals(ZERO)) {
                return ReturnResult.error(BizCode.DB_INSERT_ERROR, result.getString("msg"));
            }
        } catch (Exception e) {
            log.info("请求异常:{}", e);
        }
        return ReturnResult.success();
    }

    /**
     * 更新工作流
     * @param vo vo
     * @param processDto processDto
     * @author liudz
     * @date 2021/5/7
     * @return 执行结果
     **/
    public ReturnResult updateProcess(ConfigAddForm vo, ProcessDto processDto) {
        try {

            String postURL = address + "/dolphinscheduler/projects/"
                   + URLEncoder.encode(vo.getProjectName(), "utf-8") + "/process/update";
            PostMethod postMethod = new PostMethod(postURL);
            postMethod.setRequestHeader("Content-Type",
                    "application/x-www-form-urlencoded;charset=utf-8");
            postMethod.setRequestHeader("token", token);
            // 参数设置,需要注意的就是里边不能传NULL,要传空字符串
            NameValuePair[] data = {new NameValuePair("connects", processDto.getConnects()),
                new NameValuePair("name", processDto.getName()),
                new NameValuePair("locations", processDto.getLocations()),
                new NameValuePair("id", processDto.getId().toString()),
                new NameValuePair("processDefinitionJson", processDto.getProcessDefinitionJson())};
            postMethod.setRequestBody(data);
            org.apache.commons.httpclient.HttpClient httpClient = new org.apache.commons.httpclient.HttpClient();
            httpClient.executeMethod(postMethod);
            JSONObject result = JSONObject.parseObject(postMethod.getResponseBodyAsString());
            log.info("--(5)httpUpdateProcess:{}", result);
            if (!result.get(DictionaryEnum.CODE.getFiledString()).equals(ZERO)) {
                return ReturnResult.error(BizCode.DB_INSERT_ERROR, result.getString("msg"));
            }
        } catch (Exception e) {
            log.info("请求异常:{}", e);
        }
        return ReturnResult.success();
    }
    /**
     * 删除工作流
     * @param dto dto
     * @param processDto processDto
     * @author liudz
     * @date 2021/5/7
     * @return 执行结果
     **/
    public DolphinschedulerResponse deleteProcess(ConfigAddForm dto, ProcessDto processDto) {
            HttpHeaders headers = new HttpHeaders();
            headers.set("token", token);
            headers.set("Content-Type", "application/json");
            HttpEntity requestEntity = new HttpEntity(headers);
            ResponseEntity<DolphinschedulerResponse> returnResult =
                    restTemplate.exchange(address + "/dolphinscheduler/projects/" + dto.getProjectName()
                                   + "/process/delete?processDefinitionId=" + processDto.getId(),
                            HttpMethod.GET, requestEntity, DolphinschedulerResponse.class);
            log.info("--(5)httpDeleteProcess:{}", returnResult);
        return returnResult.getBody();
    }

    /**
     * 获取dolphinscheduler上的资源spark可拖拽jar的id
     * 
     * @author liudz
     * @date 2021/5/8
     * @return id
     **/
    public Integer getSparkResourceJarId() {
        Integer resourceId = null;
        HttpHeaders headers = new HttpHeaders();
        headers.set("token", token);
        headers.set("Content-Type", "application/json");
        HttpEntity requestEntity = new HttpEntity(headers);
        ResponseEntity<DolphinschedulerResponse> returnResult =
            restTemplate.exchange(address + "/dolphinscheduler/resources/authorize-resource-tree?userId=1",
                HttpMethod.GET, requestEntity, DolphinschedulerResponse.class);
        List<Map<String, Object>> list = (List<Map<String, Object>>) returnResult.getBody().getData();
        for (Map<String, Object> map : list) {
            if (map.get("name").equals("big_data02.jar")) {
                resourceId = Integer.valueOf(map.get("id").toString());
            }
        }
        return resourceId;
    }
    /**
     * 获取dolphinscheduler上的某用户下唯一工作流
     * @param projectName 项目名称
     * @author liudz
     * @date 2021/5/8
     * @return id
     **/
    public DolphinschedulerResponse getUserProcess(String projectName) {
        HttpHeaders headers = new HttpHeaders();
        headers.set("token", token);
        headers.set("Content-Type", "application/json");
        HttpEntity requestEntity = new HttpEntity(headers);
        ResponseEntity<DolphinschedulerResponse> returnResult =
            restTemplate.exchange(address + "/dolphinscheduler/projects/" + projectName + "/process/list",
                HttpMethod.GET, requestEntity, DolphinschedulerResponse.class);
        return returnResult.getBody();
    }
    /**
     *  封装参数
     * @param type 操作类型
     * @param processName 用户工作流名称
     * @param dto 任务参数
     * @param processJson 工作流json
     * @author liudz
     * @date 2021/5/13
     * @return ProcessDto
     **/
    public ProcessDto packageProcessParam(String type, String processName, SyncConfigDto dto, JSONObject processJson) {
        ProcessDto processDto = new ProcessDto();
        processDto.setConnects("[]");
        processDto.setName(processName);
        JSONObject locationsOne = new JSONObject();
        JSONObject locationsTwo = new JSONObject();
        locationsTwo.fluentPut("name", "datax-" + dto.getId()).fluentPut("targetarr", "").fluentPut("nodenumber", "0");
        locationsTwo.fluentPut("x", 0).fluentPut("y", 0);
        locationsOne.put("datax-" + dto.getId(), locationsTwo);

        // 创建工作流
        if (CREATE.equals(type)) {
            processDto = packageProcessParamOfCreate(processDto, dto, locationsOne);
         } else if (ADD.equals(type)) {
            //工作流添加节点
            processDto = packageProcessParamOfAdd(processDto, dto, processJson, locationsOne, locationsTwo);
        } else if (UPDATE.equals(type)) {
            //更新工作流-只更新参数processDefinitionJson的tasks参数
            processDto = packageProcessParamOfUpdate(processDto, processJson, dto);
        } else if (DELETE.equals(type)) {
            //更新工作流或删除工作流-更新则删除参数processDefinitionJson的tasks参数
            processDto = packageProcessParamOfDelete(processDto, processJson, dto);
        }
        return processDto;
    }
    /**
     * packageProcessParamOfCreate
     * @param processDto 工作流参数
     * @param dto 任务参数
     * @param locationsOne locationsOne
     * @author liudz
     * @date 2021/5/7
     * @return ProcessDto
     **/
    public ProcessDto packageProcessParamOfCreate(ProcessDto processDto, SyncConfigDto dto, JSONObject locationsOne) {
        processDto.setLocations(locationsOne.toString());
        JSONObject processOne = new JSONObject();
        processOne.fluentPut("globalParams", new ArrayList<>()).fluentPut("tenantId", THREE).fluentPut("timeout", 0);
        JSONObject processTwo = new JSONObject();
        processTwo.fluentPut("type", "DATAX").fluentPut("id", "datax-" + dto.getId());
        processTwo.fluentPut("name", "datax-" + dto.getId()).fluentPut("description", "");

        String taskJsonString = dto.getContent().toString();
        processTwo.put("params", JSONObject.parseObject("{\"localParams\":[],\"customConfig\":1,"
               + "\"json\":\"" + taskJsonString.replace("\"", "\\\"") + "\"}"));

        JSONObject jsonTimeout = new JSONObject();
        jsonTimeout.fluentPut("strategy", "").fluentPut("interval", null).fluentPut("enable", false);
        processTwo.fluentPut("timeout", jsonTimeout).fluentPut("runFlag", "NORMAL");
        JSONObject processTree = new JSONObject();
        processTree.fluentPut("successNode", new ArrayList<>()).fluentPut("failedNode", new ArrayList<>());
        JSONObject jsonconditionResult = new JSONObject();
        jsonconditionResult.put("successNode", new ArrayList<>());
        jsonconditionResult.put("failedNode", new ArrayList<>());
        processTwo.fluentPut("conditionResult", jsonconditionResult).fluentPut("dependence", new JSONObject());
        processTwo.fluentPut("maxRetryTimes", "0").fluentPut("retryInterval", "1");
        processTwo.fluentPut("taskInstancePriority", "MEDIUM").fluentPut("workerGroup", "default");
        processTwo.fluentPut("preTasks", new ArrayList<>());
        JSONArray processTaskArray = new JSONArray();
        processTaskArray.add(processTwo);
        processOne.put("tasks", processTaskArray);
        processDto.setProcessDefinitionJson(processOne.toString());
        return processDto;
    }
    /**
     * packageProcessParamOfAdd
     * @param processDto 工作流参数
     * @param locationsOne locationsOne
     * @param locationsTwo locationsTwo
     * @param dto 任务参数
     * @param processJson 工作流json
     * @author liudz
     * @date 2021/5/7
     * @return ProcessDto
     **/
    public ProcessDto packageProcessParamOfAdd(ProcessDto processDto, SyncConfigDto dto, JSONObject processJson,
                                               JSONObject locationsOne, JSONObject locationsTwo) {
        String maxTaskKey = JsonTools.getJsonStringMaxKey(processJson.getString("locations"));
        Integer x = processJson.getJSONObject("locations").getJSONObject(maxTaskKey).getInteger("x");
        Integer y = processJson.getJSONObject("locations").getJSONObject(maxTaskKey).getInteger("y");
        if (x < ONE_THOUSAND_AND_FIVE_HUNDRED) {
            locationsTwo.fluentPut("x", x + EIGHTY).fluentPut("y", y);
        } else if (x >= ONE_THOUSAND_AND_FIVE_HUNDRED) {
            locationsTwo.fluentPut("y", y + EIGHTY).fluentPut("x", 0);
        }
        locationsOne = processJson.getJSONObject("locations").fluentPut("datax-" + dto.getId(), locationsTwo);
        processDto.setLocations(locationsOne.toString());
        processDto.setId(processJson.getInteger("id"));
        JSONObject processTwo = new JSONObject();
        processTwo.fluentPut("type", "DATAX").fluentPut("id", "datax-" + dto.getId());
        processTwo.fluentPut("name", "datax-" + dto.getId()).fluentPut("description", "");
        String taskJsonString = dto.getContent().toString().replace("}}", "} }").replace("{{", "{ {");
        processTwo.put("params", JSONObject.parseObject("{\"localParams\":[],\"customConfig\":1,"
                + "\"json\":\"" + taskJsonString.replace("\"", "\\\"") + "\"}"));
        JSONObject jsonTimeout = new JSONObject();
        jsonTimeout.fluentPut("strategy", "").fluentPut("interval", null).fluentPut("enable", false);
        processTwo.fluentPut("timeout", jsonTimeout).fluentPut("runFlag", "NORMAL");
        JSONObject processTree = new JSONObject();
        processTree.fluentPut("successNode", new ArrayList<>()).fluentPut("failedNode", new ArrayList<>());
        JSONObject jsonconditionResult = new JSONObject();
        jsonconditionResult.put("successNode", new ArrayList<>());
        jsonconditionResult.put("failedNode", new ArrayList<>());
        processTwo.fluentPut("conditionResult", jsonconditionResult).fluentPut("dependence", new JSONObject());
        processTwo.fluentPut("maxRetryTimes", "0").fluentPut("retryInterval", "1");
        processTwo.fluentPut("taskInstancePriority", "MEDIUM").fluentPut("workerGroup", "default");
        processTwo.fluentPut("preTasks", new ArrayList<>());
        JSONObject jsonNew = processJson.getJSONObject("processDefinitionJson");
        JSONArray jsonArray = jsonNew.getJSONArray("tasks");
        jsonArray.add(processTwo);
        jsonNew.put("tasks", jsonArray);
        processDto.setProcessDefinitionJson(jsonNew.toString());
        return processDto;
    }
    /**
     * packageProcessParamOfUpdate
     * @param processDto 工作流参数
     * @param dto 任务参数
     * @param processJson 工作流json
     * @author liudz
     * @date 2021/5/7
     * @return ProcessDto
     **/
    public ProcessDto packageProcessParamOfUpdate(ProcessDto processDto, JSONObject processJson, SyncConfigDto dto) {
        processDto.setLocations(processJson.getString("locations"));
        processDto.setId(processJson.getInteger("id"));
        JSONArray jsonTasksArray = processJson.getJSONObject("processDefinitionJson").getJSONArray("tasks");
        JSONArray copyJsonTasksArray = new JSONArray();
        copyJsonTasksArray.addAll(jsonTasksArray);
        JSONObject processDefinitionJson = new JSONObject();
        String taskJsonString = dto.getContent().toString();
        for (Object object : jsonTasksArray) {
            JSONObject jsonObject = JSONObject.parseObject(object.toString());
            if (Long.valueOf(jsonObject.getString("id").substring(SIX)) == dto.getId()) {
                String json = jsonObject.getString("json");
                json = taskJsonString;
                copyJsonTasksArray.remove(jsonObject);
                jsonObject.getJSONObject("params").put("json", json);
                copyJsonTasksArray.add(jsonObject);
                processDefinitionJson = processJson.getJSONObject("processDefinitionJson");
                processDefinitionJson.put("tasks", copyJsonTasksArray);
            }
        }
        processDto.setProcessDefinitionJson(processDefinitionJson.toString());
        return processDto;
    }
    /**
     * packageProcessParamOfDelete
     * @param processDto 工作流参数
     * @param dto 任务参数
     * @param processJson 工作流json
     * @author liudz
     * @date 2021/5/7
     * @return ProcessDto
     **/
    public ProcessDto packageProcessParamOfDelete(ProcessDto processDto, JSONObject processJson, SyncConfigDto dto) {
        processDto.setId(processJson.getInteger("id"));
        JSONObject locationsJson = processJson.getJSONObject("locations");
        JSONObject processDefinitionJson = processJson.getJSONObject("processDefinitionJson");
        JSONArray processDefinitionArray = processDefinitionJson.getJSONArray("tasks");
        JSONArray copyProcessDefinitionArray = new JSONArray();
        copyProcessDefinitionArray.addAll(processDefinitionArray);
        if (locationsJson.containsKey(DictionaryEnum.DATAX.getFiledString() + dto.getId())) {
            locationsJson.remove("datax-" + dto.getId());
            for (Object object : processDefinitionArray) {
                if (JSONObject.parseObject(object.toString()).getString("id").equals("datax-" + dto.getId())) {
                    copyProcessDefinitionArray.remove(object);
                }
            }
            processDefinitionJson.put("tasks", copyProcessDefinitionArray);
        }
        processDto.setLocations(locationsJson.toString());
        processDto.setProcessDefinitionJson(processDefinitionJson.toString());
        return processDto;
    }

    /**
     * 工作流【上线或者下线】
     * @param projectName 项目名称
     * @param processName 用户工作流名称
     * @param processId 工作流ID
     * @param releaseState 上下线状态操作【0:下线,1:上线】
     * @author liudz
     * @date 2021/5/7
     * @return 执行结果
     **/
    public ReturnResult releaseProcessDefinition(String projectName, String processName, Integer processId,
                  Integer releaseState) {
        try {
            String postURL = address + "/dolphinscheduler/projects/"
                   + URLEncoder.encode(projectName, "utf-8") + "/process/release";
            PostMethod postMethod = new PostMethod(postURL);
            postMethod.setRequestHeader("Content-Type",
                    "application/x-www-form-urlencoded;charset=utf-8");
            postMethod.setRequestHeader("token", token);
            // 参数设置,需要注意的就是里边不能传NULL,要传空字符串
            NameValuePair[] data = {new NameValuePair("name", processName),
                    new NameValuePair("processId", processId.toString()),
                    new NameValuePair("releaseState", releaseState.toString())};
            postMethod.setRequestBody(data);
            org.apache.commons.httpclient.HttpClient httpClient = new org.apache.commons.httpclient.HttpClient();
            httpClient.executeMethod(postMethod);
            JSONObject result = JSONObject.parseObject(postMethod.getResponseBodyAsString());
            if (!result.get(DictionaryEnum.CODE.getFiledString()).equals(ZERO)) {
                return ReturnResult.error(BizCode.DB_INSERT_ERROR, result.getString("msg"));
            }
        } catch (Exception e) {
            log.info("请求异常:{}", e);
        }
        return ReturnResult.success();
    }
    /**
     * 运行流程实例
     * @param projectName 项目名称
     * @param request request
     * @param id 数据同步任务ID
     * @author liudz
     * @date 2021/5/7
     * @return 执行结果
     **/
    @GetMapping("/project/process/datax/start")
    public DolphinschedulerResponse startProcessDataxTask(
            @RequestParam("projectName") String projectName, @RequestParam("id") Integer id,
            HttpServletRequest request) {
        try {
            Long userId = Long.valueOf(request.getUserPrincipal().getName());
            DolphinschedulerResponse processInfoList = getUserProcess(projectName);
            if (processInfoList.getCode() != ZERO) {
                return processInfoList;
            }
            JSONObject processJson = new JSONObject();
            log.info("--(1)getUserProcess--success:{}", processInfoList);
            List<Map<String, Object>> list = (List<Map<String, Object>>) processInfoList.getData();
            for (Map<String, Object> map : list) {
                if (map.get("name").equals(userId + "-dataxTask")) {
                    processJson.fluentPutAll(map);
                }
            }
            if (processJson.getString(DictionaryEnum.RELEASE_STATE.getFiledString()).equals(OFFLINE)) {
                releaseProcessDefinition(projectName, userId + "-dataxTask",
                        processJson.getInteger("id"), 1);
                log.info("--(2)releaseProcessDefinition--ONLINE--success");
            }
            String postURL = address + "/dolphinscheduler/projects/" + URLEncoder.encode(projectName, "utf-8")
                   + "/executors/start-process-instance";
            PostMethod postMethod = new PostMethod(postURL);
            postMethod.setRequestHeader("Content-Type",
                    "application/x-www-form-urlencoded;charset=utf-8");
            postMethod.setRequestHeader("token", token);
            // 参数设置,需要注意的就是里边不能传NULL,要传空字符串
            NameValuePair[] data = packageNameValuePair(processJson, id);
            postMethod.setRequestBody(data);
            org.apache.commons.httpclient.HttpClient httpClient = new org.apache.commons.httpclient.HttpClient();
            httpClient.executeMethod(postMethod);
            JSONObject result = JSONObject.parseObject(postMethod.getResponseBodyAsString());
            log.info("--(2)startProcessInstance--result:{}", result);
            if (!result.get(DictionaryEnum.CODE.getFiledString()).equals(ZERO)) {
                return DolphinschedulerResponse.error(result.getInteger("code"), result.getString("msg"));
            }
        } catch (Exception e) {
            log.info("请求异常:{}", e);
        }
        return DolphinschedulerResponse.success();
    }
    /**
     *  packageNameValuePair封装参数
     * @param processJson 工作流json
     * @param dragSparkTaskId 任务ID
     * @author liudz
     * @date 2021/5/14
     * @return NameValuePair
     **/
    public NameValuePair[] packageNameValuePair(JSONObject processJson, Integer dragSparkTaskId) {
        NameValuePair[] data = {
                new NameValuePair("failureStrategy", "CONTINUE"),
                new NameValuePair("processDefinitionId", processJson.getString("id")),
                new NameValuePair("processInstancePriority", "MEDIUM"),
                new NameValuePair("warningGroupId", "0"),
                new NameValuePair("warningType", "NONE"),
                new NameValuePair("runMode", "RUN_MODE_SERIAL"),
                new NameValuePair("startNodeList", "datax-" + dragSparkTaskId),
                new NameValuePair("taskDependType", "TASK_POST"),
                new NameValuePair("workerGroup", "default")};
        return data;
    }


    /**
     * stopProcessDataxTask
     * @param id id
     * @param executeType executeType
     * @param projectName 项目名称
     * @return ReturnResult
     * @author: liudz
     * @author: lty update 2020/5/27
     * @date: 2020/4/28 10:31
     */
    @GetMapping(value = "/project/process/datax/execute/{projectName}/{id}/{executeType}")
    public DolphinschedulerResponse<String> stopProcessDataxTask(@PathVariable("projectName") String projectName,
                                 @PathVariable("id") Long id, @PathVariable("executeType") String executeType) {
        log.info("--(1)stopProcessDataxTask--begin--projectName:{},id:{},executeType:{}", projectName, id, executeType);
        try {
            HttpHeaders headers = new HttpHeaders();
            headers.set("token", token);
            headers.set("Content-Type", "application/json");
            HttpEntity requestEntity = new HttpEntity(headers);
            ResponseEntity<JSONObject> returnResult = restTemplate.exchange(address + "/"
   + "dolphinscheduler/projects/" + projectName + "/task-instance/list-paging?"
   + "pageNo=1&pageSize=100&taskName=datax-" + id, HttpMethod.GET, requestEntity, JSONObject.class);
            List<Map<String, Object>> list =
                    (List<Map<String, Object>>) returnResult.getBody().getJSONObject("data").get("totalList");
            Integer processInstanceId = null;
            for (Map<String, Object> map : list) {
                if (map.get("state").equals("RUNNING_EXEUTION")) {
                    processInstanceId = Integer.valueOf(map.get("processInstanceId").toString());
                }

            }
            if (StringUtils.isEmpty(processInstanceId)) {
                return DolphinschedulerResponse.error(Msg.TASK_HAS_BEEN_STOPPED);
            }
            log.info("--(2)getProcessInstanceId--success--:{}", processInstanceId);
            String postURL = address + "/dolphinscheduler/projects/"
                    + URLEncoder.encode(projectName, "utf-8") + "/executors/execute";
            PostMethod postMethod = new PostMethod(postURL);
            postMethod.setRequestHeader("Content-Type",
                    "application/x-www-form-urlencoded;charset=utf-8");
            postMethod.setRequestHeader("token", token);
            NameValuePair[] data = {new NameValuePair("executeType", executeType),
                    new NameValuePair("processInstanceId", processInstanceId.toString())};
            postMethod.setRequestBody(data);
            org.apache.commons.httpclient.HttpClient httpClient = new org.apache.commons.httpclient.HttpClient();
            httpClient.executeMethod(postMethod);
            JSONObject result = JSONObject.parseObject(postMethod.getResponseBodyAsString());
            if (!result.get(DictionaryEnum.CODE.getFiledString()).equals(ZERO)) {
                return DolphinschedulerResponse.error(result.getInteger("code"), result.getString("msg"));
            }
            log.info("--(3)stopProcessSparkTask--success--:{}", result);
        } catch (UnsupportedEncodingException e) {
            log.info("UnsupportedEncodingException:{}", e);
        } catch (HttpException e) {
            log.info("HttpException:{}", e);
        } catch (IOException e) {
            log.info("IOException:{}", e);
        }
        return DolphinschedulerResponse.success();
    }

    /**
     * 查询全部同步任务配置(分页)
     *
     * @param form
     *            name
     * @param request
     *            含有用户id
     * @return 分页结果
     */
    @RequestMapping(value = "/project/process/datax/list", method = RequestMethod.POST)
    public ReturnResult<PageResult<SyncConfigDto>> findAll(@RequestBody @Validated ConfigSelectForm form,
                                                           HttpServletRequest request) {
        Long userId = Long.valueOf(request.getUserPrincipal().getName());
        return syncConfigService.list(form, userId);
    }

    /**
     * 获取同步任务配置
     *
     * @param id
     *            配置id
     * @param request
     *            用户id
     * @return 添加结果
     */
    @RequestMapping(value = "/project/process/datax", method = RequestMethod.GET)
    public ReturnResult<SyncConfigDto> findById(@RequestParam Long id, HttpServletRequest request) {
        Long userId = Long.valueOf(request.getUserPrincipal().getName());
        return syncConfigService.findById(id, userId);
    }
}

ConfigAddForm

代码语言:java
复制
package com.geespace.microservices.builder.request;

import javax.validation.constraints.NotEmpty;
import javax.validation.constraints.NotNull;

import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;

import lombok.Data;

/**
 * @Author: zjr
 * @Date: 2020-05-06 09:42
 * @Version 1.0
 */
@Data
public class ConfigAddForm {

    /**
     * 配置名称
     */
    @NotEmpty(message = "name不能为空")
    private String name;
    /**
     * 配置描述
     */
    private String description;
    /**
     * 实时/全量/增量
     */
    @NotNull(message = "同步方式不能为空")
    private int syncType;
    /**
     * reader 选择的数据源id
     */
    @NotNull(message = "读取数据源id不能为空")
    private Long readerConfigId;
    /**
     * reader
     */
    @NotEmpty(message = "读取参数不能为空")
    private JSONObject readerParam;
    /**
     * writer 选择的数据源id
     */
    @NotNull(message = "写入数据源id不能为空")
    private Long writerConfigId;
    /**
     * writer
     */
    @NotEmpty(message = "写入参数不能为空")
    private JSONObject writerParam;
    /**
     * reader:column left,writer:column right
     */
    @NotEmpty(message = "字段对照表不能为空")
    private JSONArray columnMap;

    private Long userId;
    /**
     *  项目名称
     **/
    String projectName;
    /**
     *  项目id
     **/
    @NotNull(message = "projectId not null")
    Long projectId;
    Long id;
}

ConfigUpdateForm

代码语言:java
复制
package com.geespace.microservices.builder.request;

import javax.validation.constraints.NotEmpty;
import javax.validation.constraints.NotNull;

import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;

import lombok.Data;

/**
 * @Author: zjr
 * @Date: 2020-05-06 09:42
 * @Version 1.0
 */
@Data
public class ConfigUpdateForm {
    @NotNull(message = "同步配置id不能为空")
    private Long id;
    /**
     * 配置名称
     */
    @NotEmpty(message = "name不能为空")
    private String name;
    /**
     * 配置描述
     */
    private String description;
    /**
     * 实时/全量/增量
     */
    @NotNull(message = "同步方式不能为空")
    private int syncType;
    /**
     * reader 选择的数据源id
     */
    @NotNull(message = "读取数据源id不能为空")
    private Long readerConfigId;
    /**
     * reader
     */
    @NotEmpty(message = "读取参数不能为空")
    private JSONObject readerParam;
    /**
     * writer 选择的数据源id
     */
    @NotNull(message = "写入数据源id不能为空")
    private Long writerConfigId;
    /**
     * writer
     */
    @NotEmpty(message = "写入参数不能为空")
    private JSONObject writerParam;
    /**
     * reader:column left,writer:column right
     */
    @NotEmpty(message = "字段对照表不能为空")
    private JSONArray columnMap;

    private Long userId;
    /**
     *  项目id
     **/
    @NotNull(message = "projectId not null")
    Long projectId;
    /**
     *  项目名称
     **/
    String projectName;

}

ProcessDto

代码语言:java
复制
package com.geespace.microservices.builder.dto;

import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.ToString;

/**
 * dolphinscheduler调度器中工作流参数
 * @Author: liudz
 * @Date: 2020-03-23
 **/
@Data
@EqualsAndHashCode(callSuper = false)
@ToString(callSuper = true)
public class ProcessDto {
    /**
     * 流程定义ID
     **/
    private Integer id;
    /**
     * 流程定义节点图标连接信息(json格式)
     **/
    private String connects;
    /**
     * 流程定义节点坐标位置信息(json格式)
     **/
    private String locations;
    /**
     * 流程定义名称
     **/
    private String name;
    /**
     * 流程定义详细信息(json格式)
     **/
    private String processDefinitionJson;
    /**
     *  项目名称
     **/
    String projectName;
    /**
     *  项目id
     **/
    Long projectId;
}

SyncConfigDto

代码语言:java
复制
package com.geespace.microservices.builder.dto;

import com.alibaba.fastjson.JSONObject;

import lombok.Data;

/**
 * @Author: zjr
 * @Date: 2020-05-05 17:03
 * @Version 1.0
 */
@Data
public class SyncConfigDto {
    private Long id;

    /**
     * 配置名称
     */
    private String name;

    /**
     * 配置描述
     */
    private String description;

    /**
     * 实时/全量/增量
     */
    private int syncType;
    /**
     * json base64
     */
    private JSONObject content;
    /**
     *  项目名称
     **/
    String projectName;
    /**
     *  项目id
     **/
    Long projectId;
}

SyncConfigService

代码语言:java
复制
package com.geespace.microservices.builder.service;

import com.geespace.microservices.builder.dto.SyncConfigDto;
import com.geespace.microservices.builder.request.ConfigAddForm;
import com.geespace.microservices.builder.request.ConfigSelectForm;
import com.geespace.microservices.builder.request.ConfigUpdateForm;
import com.geespace.microservices.builder.response.PageResult;
import com.geespace.microservices.builder.response.ReturnResult;

/**
 * @Author: zjr
 * @Date: 2020-05-05 13:59
 * @Version 1.0
 */
public interface SyncConfigService {
    /**
     * 添加同步任务配置
     * 
     * @param form
     *            任务配置参数
     * @return 添加结果
     */
    ReturnResult<SyncConfigDto> addConfig(ConfigAddForm form);

    /**
     * 修改同步任务配置
     * 
     * @param form
     *            任务配置参数(含id)
     * @return 修改结果
     */
    ReturnResult<SyncConfigDto> updateConfig(ConfigUpdateForm form);

    /**
     * 查找同步任务配置
     * 
     * @param id
     *            同步任务配置id
     * @param userId
     *            用户id
     * @return 查询结果
     */
    ReturnResult<SyncConfigDto> findById(Long id, Long userId);

    /**
     * 删除同步任务配置
     * 
     * @param id
     *            任务配置id
     * @param userId
     *            用户id
     * @return 删除结果
     */
    ReturnResult delete(Long id, Long userId);

    /**
     * 查询全部同步任务配置(分页)
     *
     * @param form
     *            name
     * @param userId
     *            用户id
     * @return 分页结果
     */
    ReturnResult<PageResult<SyncConfigDto>> list(ConfigSelectForm form, Long userId);
}

5.5 删除同步任务配置

5.4 查询同步任务配置

3.3 执行数据同步任务

3.4 停止数据同步任务

三、本人相关其他文章链接

1.springboot项目集成dolphinscheduler调度器 可拖拽spark任务管理:

https://blog.csdn.net/a924382407/article/details/117119831

2.springboot项目集成dolphinscheduler调度器 实现datax数据同步任务:

https://blog.csdn.net/a924382407/article/details/120951230

3.springboot项目集成dolphinscheduler调度器 项目管理:

https://blog.csdn.net/a924382407/article/details/117118931

4.springboot项目集成大数据第三方dolphinscheduler调度器 执行/停止任务

https://blog.csdn.net/a924382407/article/details/117121181

5.springboot项目集成大数据第三方dolphinscheduler调度器

https://blog.csdn.net/a924382407/article/details/117113848

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • Datax概述
    • 1.概述
    • 2.功能清单
    • 3.==说明==:本项目只支持mysql及hbase之间的数据同步
  • 代码模块
    • 配置文件
    • pom.xml
    • DataxDolphinschedulerController
    • ConfigAddForm
    • ConfigUpdateForm
    • ProcessDto
    • SyncConfigDto
    • SyncConfigService
      • 5.5 删除同步任务配置
      • 5.4 查询同步任务配置
      • 3.3 执行数据同步任务
      • 3.4 停止数据同步任务
  • 三、本人相关其他文章链接
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档