

DataX 的安装及基本使用请参见上一篇文章:https://blog.csdn.net/a924382407/article/details/120952339?spm=1001.2014.3001.5501
@TOC

| 功能清单 |
|--|
| CRUD(增删改查)、启动任务、停止任务 |

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<!-- Parent POM of the bd-platform build; dependencies declared below without a <version>
     presumably get their version from it (verify against the parent). -->
<parent>
<groupId>com.geespace.microservices.bd-platform</groupId>
<artifactId>all</artifactId>
<version>1.0-SNAPSHOT</version>
</parent>
<artifactId>data-sync-config</artifactId>
<version>1.0-SNAPSHOT</version>
<properties>
<java.version>1.8</java.version>
<gson.version>2.8.1</gson.version>
</properties>
<dependencies>
<dependency>
<groupId>com.github.pagehelper</groupId>
<artifactId>pagehelper-spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
<version>${gson.version}</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!-- Elasticsearch core plus low-level and high-level REST clients, all pinned to the same version. -->
<dependency>
<groupId>org.elasticsearch</groupId>
<artifactId>elasticsearch</artifactId>
<version>6.8.12</version>
</dependency>
<dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>elasticsearch-rest-client</artifactId>
<version>6.8.12</version>
</dependency>
<dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>elasticsearch-rest-high-level-client</artifactId>
<version>6.8.12</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-devtools</artifactId>
<scope>runtime</scope>
<optional>true</optional>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-configuration-processor</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<!-- Test starter with the JUnit vintage engine excluded. -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>org.junit.vintage</groupId>
<artifactId>junit-vintage-engine</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
</dependency>
<dependency>
<groupId>org.codehaus.jackson</groupId>
<artifactId>jackson-core-asl</artifactId>
<version>1.9.13</version>
<scope>compile</scope>
</dependency>
<!-- Sibling bd-platform module. -->
<dependency>
<groupId>com.geespace.microservices.bd-platform</groupId>
<artifactId>data-config</artifactId>
<version>1.0-SNAPSHOT</version>
<scope>compile</scope>
</dependency>
<!-- Commons HttpClient 3.x, used by the controller for form-encoded POST calls. -->
<dependency>
<groupId>commons-httpclient</groupId>
<artifactId>commons-httpclient</artifactId>
<version>3.1</version>
</dependency>
</dependencies>
<build>
<plugins>
<!-- Builds a shaded jar in the package phase; the Spring Boot plugin is added to the shade
     plugin's classpath so its PropertiesMergingResourceTransformer can be used below. -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>2.2</version>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<version>2.1.4.RELEASE</version>
</dependency>
</dependencies>
<configuration>
<keepDependenciesWithProvidedScope>false</keepDependenciesWithProvidedScope>
<createDependencyReducedPom>true</createDependencyReducedPom>
<!-- Exclude jar signature files from every artifact merged into the shaded jar. -->
<filters>
<filter>
<artifact>*:*</artifact>
<excludes>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
</filters>
</configuration>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<!-- Merge the Spring metadata and service descriptors that several jars contribute,
     and write the Main-Class entry into the manifest. -->
<transformers>
<transformer
implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
<resource>META-INF/spring.handlers</resource>
</transformer>
<transformer
implementation="org.springframework.boot.maven.PropertiesMergingResourceTransformer">
<resource>META-INF/spring.factories</resource>
</transformer>
<transformer
implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
<resource>META-INF/spring.schemas</resource>
</transformer>
<transformer
implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
<transformer
implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<!-- NOTE(review): the main class lives in the "dispatcher" package while this artifact is
     data-sync-config; confirm it is the intended entry point. -->
<mainClass>com.geespace.microservices.dispatcher.DispatchApplication</mainClass>
</transformer>
</transformers>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>

import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import javax.servlet.http.HttpServletRequest;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.geespace.microservices.builder.dto.ProcessDto;
import com.geespace.microservices.builder.dto.SyncConfigDto;
import com.geespace.microservices.builder.enums.DictionaryEnum;
import com.geespace.microservices.builder.request.ConfigAddForm;
import com.geespace.microservices.builder.request.ConfigSelectForm;
import com.geespace.microservices.builder.request.ConfigUpdateForm;
import com.geespace.microservices.builder.response.BizCode;
import com.geespace.microservices.builder.response.DolphinschedulerResponse;
import com.geespace.microservices.builder.response.Msg;
import com.geespace.microservices.builder.response.PageResult;
import com.geespace.microservices.builder.response.ReturnResult;
import com.geespace.microservices.builder.service.SyncConfigService;
import com.geespace.microservices.builder.tools.JsonTools;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.httpclient.HttpException;
import org.apache.commons.httpclient.NameValuePair;
import org.apache.commons.httpclient.methods.PostMethod;
import org.springframework.beans.BeanUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.http.HttpEntity;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpMethod;
import org.springframework.http.ResponseEntity;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.util.StringUtils;
import org.springframework.validation.annotation.Validated;
import org.springframework.web.bind.annotation.DeleteMapping;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.PutMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.client.RestTemplate;
/**
 * REST controller that schedules DataX sync tasks through DolphinScheduler.
 *
 * <p>Endpoints are served under {@code /dolphinscheduler/v1}. Each task is stored through
 * {@code SyncConfigService} and mirrored as a node named {@code datax-<taskId>} inside a workflow
 * named {@code <userId>-dataxTask}.
 *
 * @author: liudz
 * @date: 2021/5/7
 */
@Slf4j
@RestController
@RequestMapping("/dolphinscheduler/v1")
public class DataxDolphinschedulerController {
/** Client used for the GET calls to the DolphinScheduler REST API. */
@Autowired
private RestTemplate restTemplate;
/** Value sent in the {@code token} header of every DolphinScheduler request. */
@Value("${dolphinscheduler.token}")
String token;
/** Base address that every DolphinScheduler URL in this class is prefixed with. */
@Value("${dolphinscheduler.address}")
String address;
/** DolphinScheduler response code that this class treats as "no error". */
public static final int ZERO = 0;
/** {@code ReturnResult} code that this class treats as success. */
public static final int SUCCESS = 200;
/** Operation type for {@code packageProcessParam}: build a new workflow. */
public static final String CREATE = "create";
/** Operation type for {@code packageProcessParam}: replace the DataX JSON of an existing node. */
public static final String UPDATE = "update";
/** Operation type for {@code packageProcessParam}: append a node to an existing workflow. */
public static final String ADD = "add";
/** Operation type for {@code packageProcessParam}: remove a node from an existing workflow. */
public static final String DELETE = "delete";
/** Release state value of a workflow that is online. */
public static final String ONLINE = "ONLINE";
/** Release state value of a workflow that is offline. */
public static final String OFFLINE = "OFFLINE";
/** x coordinate at or beyond which the next node is placed at x = 0 with y increased. */
public static final int ONE_THOUSAND_AND_FIVE_HUNDRED = 1500;
/** Offset passed to {@code substring} to strip the {@code "datax-"} prefix from a node id. */
public static final int SIX = 6;
/** Coordinate step added to x (or y) when placing a new node after the last one. */
public static final int EIGHTY = 80;
/** Value written as {@code tenantId} into a newly created workflow definition. */
public static final int THREE = 3;
/** Service that persists the sync task configuration. */
@Autowired
private SyncConfigService syncConfigService;
/**
 * Creates a DataX sync task and registers it as a node of the caller's workflow.
 *
 * <p>The task is first persisted through {@code syncConfigService.addConfig}. Every user owns one
 * workflow named {@code <userId>-dataxTask}: if it does not exist yet it is created with the new task
 * as its only node; otherwise the node is appended, taking the workflow offline first when it is online.
 *
 * <p>NOTE(review): a failed scheduler call is returned as an error result and does not throw, so the
 * surrounding transaction is presumably still committed in that case - confirm this is intended.
 *
 * @param request request whose user principal name is parsed as the numeric user id
 * @param form task parameters
 * @author liudz
 * @date 2021/5/8
 * @return the persistence result when saving fails, otherwise the result of the workflow create/update call
 * @throws IllegalStateException if the workflow is reported as existing but is missing from the project's list
 **/
@PostMapping("/project/process/datax")
@Transactional(rollbackFor = Exception.class)
public ReturnResult operatorDataxTask(HttpServletRequest request, @RequestBody @Validated ConfigAddForm form) {
    Long userId = Long.valueOf(request.getUserPrincipal().getName());
    form.setUserId(userId);
    ReturnResult<SyncConfigDto> dataxTaskReturnResult = syncConfigService.addConfig(form);
    if (dataxTaskReturnResult.getCode() != SUCCESS) {
        return dataxTaskReturnResult;
    }
    log.info("--(1)addDataxTaskResult--success");
    SyncConfigDto savedTask = dataxTaskReturnResult.getData();
    form.setId(savedTask.getId());

    String processName = userId + "-dataxTask";
    Boolean verifyResult = verifyProcessExist(processName, form.getProjectName());
    log.info("--(2)verifyProcessExist--success:{}", verifyResult);
    if (!verifyResult) {
        // No workflow yet for this user: create one containing only the new node.
        ProcessDto processDto = packageProcessParam(CREATE, processName, savedTask, null);
        log.info("--(3)packageProcessParam--success");
        processDto.setProjectName(form.getProjectName());
        processDto.setProjectId(form.getProjectId());
        return createProcess(processDto);
    }

    // Look up the user's single workflow in the project's workflow list.
    DolphinschedulerResponse processInfoList = getUserProcess(form.getProjectName());
    log.info("--(3)getUserProcess--success:{}", processInfoList);
    JSONObject processJson = new JSONObject();
    @SuppressWarnings("unchecked")
    List<Map<String, Object>> list = (List<Map<String, Object>>) processInfoList.getData();
    if (list != null) {
        for (Map<String, Object> map : list) {
            if (processName.equals(map.get("name"))) {
                processJson.fluentPutAll(map);
            }
        }
    }
    if (processJson.getInteger("id") == null) {
        // Fail with a descriptive exception (still rolls back) instead of an opaque NullPointerException.
        throw new IllegalStateException(
            "workflow " + processName + " not found in project " + form.getProjectName());
    }

    ProcessDto processDto = packageProcessParam(ADD, processName, savedTask, processJson);
    processDto.setId(processJson.getInteger("id"));
    log.info("--(4)packageProcessParam--success");
    if (ONLINE.equals(processJson.getString(DictionaryEnum.RELEASE_STATE.getFiledString()))) {
        // The workflow is taken offline (releaseState 0) before its definition is updated.
        releaseProcessDefinition(form.getProjectName(), processName, processDto.getId(), 0);
        log.info("--(5)releaseProcessDefinition--OFFLINE--success");
    }
    return updateProcess(form, processDto);
}
/**
 * Updates a DataX sync task and rewrites the matching node of the caller's workflow.
 *
 * <p>The task is updated through {@code syncConfigService.updateConfig}; on success the workflow named
 * {@code <userId>-dataxTask} is fetched, taken offline when it is online, and updated with the new
 * DataX JSON.
 *
 * @param request request whose user principal name is parsed as the numeric user id
 * @param form task parameters
 * @author liudz
 * @date 2021/5/8
 * @return the persistence result when the update fails, otherwise the result of the workflow update call
 * @throws IllegalStateException if the user's workflow is missing from the project's workflow list
 **/
@PutMapping("/project/process/datax")
@Transactional(rollbackFor = Exception.class)
public ReturnResult updateDataxTask(HttpServletRequest request, @RequestBody @Validated ConfigUpdateForm form) {
    Long userId = Long.valueOf(request.getUserPrincipal().getName());
    form.setUserId(userId);
    ReturnResult<SyncConfigDto> dataxTaskReturnResult = syncConfigService.updateConfig(form);
    if (dataxTaskReturnResult.getCode() != SUCCESS) {
        return dataxTaskReturnResult;
    }
    // Logged only after the result has been checked, so the message is never emitted for a failure.
    log.info("--(1)updateDataxTaskResult--mysql--success");

    // Look up the user's single workflow in the project's workflow list.
    String processName = userId + "-dataxTask";
    DolphinschedulerResponse processInfoList = getUserProcess(form.getProjectName());
    log.info("--(2)getUserProcess--success:{}", processInfoList);
    JSONObject processJson = new JSONObject();
    @SuppressWarnings("unchecked")
    List<Map<String, Object>> list = (List<Map<String, Object>>) processInfoList.getData();
    if (list != null) {
        for (Map<String, Object> map : list) {
            if (processName.equals(map.get("name"))) {
                processJson.fluentPutAll(map);
            }
        }
    }
    if (processJson.getInteger("id") == null) {
        // Fail with a descriptive exception (still rolls back) instead of an opaque NullPointerException.
        throw new IllegalStateException(
            "workflow " + processName + " not found in project " + form.getProjectName());
    }

    ProcessDto processDto = packageProcessParam(UPDATE, processName, dataxTaskReturnResult.getData(), processJson);
    processDto.setProjectName(form.getProjectName());
    processDto.setProjectId(form.getProjectId());
    processDto.setId(processJson.getInteger("id"));
    log.info("--(3)packageProcessParam--success");
    if (ONLINE.equals(processJson.getString(DictionaryEnum.RELEASE_STATE.getFiledString()))) {
        // The workflow is taken offline (releaseState 0) before its definition is updated.
        releaseProcessDefinition(form.getProjectName(), processName, processDto.getId(), 0);
        log.info("--(4)releaseProcessDefinition--OFFLINE--success");
    }
    // updateProcess takes a ConfigAddForm, so the update form's properties are copied into one.
    ConfigAddForm configAddForm = new ConfigAddForm();
    BeanUtils.copyProperties(form, configAddForm);
    return updateProcess(configAddForm, processDto);
}
/**
 * Deletes a DataX sync task and removes its node from the caller's workflow.
 *
 * <p>The task is deleted through {@code syncConfigService.delete}; on success the node
 * {@code datax-<id>} is removed from the workflow named {@code <userId>-dataxTask}. When no node is
 * left the workflow itself is deleted, otherwise it is updated.
 *
 * <p>NOTE(review): the results of the workflow delete/update calls are not inspected; the method
 * always returns the persistence result.
 *
 * @param request request whose user principal name is parsed as the numeric user id
 * @param projectName project name
 * @param id task id
 * @author liudz
 * @date 2021/5/8
 * @return the result of the persistence delete
 * @throws IllegalStateException if the user's workflow is missing from the project's workflow list
 **/
@DeleteMapping("/project/process/datax/{projectName}/{id}")
@Transactional(rollbackFor = Exception.class)
public ReturnResult deleteDataxTask(HttpServletRequest request, @PathVariable("projectName") String projectName,
    @PathVariable("id") Long id) {
    Long userId = Long.valueOf(request.getUserPrincipal().getName());
    ReturnResult dataxTaskReturnResult = syncConfigService.delete(id, userId);
    if (dataxTaskReturnResult.getCode() != SUCCESS) {
        return dataxTaskReturnResult;
    }
    // Logged only after the result has been checked, so the message is never emitted for a failure.
    log.info("--(1)deleteDataxTask--mysql--success");

    // Only the id and the project name are needed by the packaging and workflow helpers.
    SyncConfigDto syncConfigDto = new SyncConfigDto();
    syncConfigDto.setId(id);
    ConfigAddForm configAddForm = new ConfigAddForm();
    configAddForm.setProjectName(projectName);

    // Look up the user's single workflow in the project's workflow list.
    String processName = userId + "-dataxTask";
    DolphinschedulerResponse processInfoList = getUserProcess(projectName);
    log.info("--(2)getUserProcess--success:{}", processInfoList);
    JSONObject processJson = new JSONObject();
    @SuppressWarnings("unchecked")
    List<Map<String, Object>> list = (List<Map<String, Object>>) processInfoList.getData();
    if (list != null) {
        for (Map<String, Object> map : list) {
            if (processName.equals(map.get("name"))) {
                processJson.fluentPutAll(map);
            }
        }
    }
    if (processJson.getInteger("id") == null) {
        // Fail with a descriptive exception (still rolls back) instead of an opaque NullPointerException.
        throw new IllegalStateException("workflow " + processName + " not found in project " + projectName);
    }

    ProcessDto processDto = packageProcessParam(DELETE, processName, syncConfigDto, processJson);
    processDto.setId(processJson.getInteger("id"));
    log.info("--(3)packageProcessParam--success");
    if (ONLINE.equals(processJson.getString(DictionaryEnum.RELEASE_STATE.getFiledString()))) {
        // The workflow is taken offline (releaseState 0) before it is changed.
        releaseProcessDefinition(projectName, processName, processDto.getId(), 0);
        log.info("--(4)releaseProcessDefinition--OFFLINE--success");
    }
    if (JSONObject.parseObject(processDto.getLocations()).isEmpty()) {
        // No node left: delete the workflow.
        deleteProcess(configAddForm, processDto);
    } else {
        // Nodes remain: update the workflow without the removed node.
        updateProcess(configAddForm, processDto);
    }
    return dataxTaskReturnResult;
}
/**
 * Asks DolphinScheduler's {@code process/verify-name} endpoint about a workflow name.
 *
 * @param processName workflow name to verify
 * @param projectName project name
 * @author liudz
 * @date 2021/5/8
 * @return {@code false} when the response code is {@code 0}, {@code true} for any other code
 *         (callers treat {@code true} as "the workflow already exists")
 **/
public Boolean verifyProcessExist(String processName, String projectName) {
    HttpHeaders httpHeaders = new HttpHeaders();
    httpHeaders.set("token", token);
    httpHeaders.set("Content-Type", "application/json");
    String url = address + "/dolphinscheduler/projects/" + projectName
        + "/process/verify-name?name=" + processName;
    ResponseEntity<DolphinschedulerResponse> response = restTemplate.exchange(
        url, HttpMethod.GET, new HttpEntity(httpHeaders), DolphinschedulerResponse.class);
    return response.getBody().getCode() != ZERO;
}
/**
 * Creates a workflow by posting a form-encoded request to DolphinScheduler's {@code process/save} endpoint.
 *
 * <p>A non-zero response code, an empty response body, or any exception raised while sending the
 * request is reported as an error result; the HTTP connection is always released.
 *
 * @param processDto workflow parameters (project name, name, connects, locations, definition JSON)
 * @author liudz
 * @date 2021/5/7
 * @return success when the response code is {@code 0}, otherwise an error result
 **/
public ReturnResult createProcess(ProcessDto processDto) {
    PostMethod postMethod = null;
    try {
        String postURL = address + "/dolphinscheduler/projects/"
            + URLEncoder.encode(processDto.getProjectName(), "utf-8") + "/process/save";
        postMethod = new PostMethod(postURL);
        postMethod.setRequestHeader("Content-Type",
            "application/x-www-form-urlencoded;charset=utf-8");
        postMethod.setRequestHeader("token", token);
        NameValuePair[] data = {new NameValuePair("connects", processDto.getConnects()),
            new NameValuePair("name", processDto.getName()),
            new NameValuePair("locations", processDto.getLocations()),
            new NameValuePair("processDefinitionJson", processDto.getProcessDefinitionJson())};
        postMethod.setRequestBody(data);
        org.apache.commons.httpclient.HttpClient httpClient = new org.apache.commons.httpclient.HttpClient();
        httpClient.executeMethod(postMethod);
        JSONObject result = JSONObject.parseObject(postMethod.getResponseBodyAsString());
        log.info("--(5)httpCreateProcess:{}", result);
        if (result == null) {
            return ReturnResult.error(BizCode.DB_INSERT_ERROR, "empty response from DolphinScheduler");
        }
        if (!Integer.valueOf(ZERO).equals(result.get(DictionaryEnum.CODE.getFiledString()))) {
            return ReturnResult.error(BizCode.DB_INSERT_ERROR, result.getString("msg"));
        }
        return ReturnResult.success();
    } catch (Exception e) {
        // A failed request must not be reported as success.
        log.error("请求异常:{}", e.toString(), e);
        return ReturnResult.error(BizCode.DB_INSERT_ERROR, e.toString());
    } finally {
        if (postMethod != null) {
            // Commons HttpClient 3.x keeps the connection checked out until it is released.
            postMethod.releaseConnection();
        }
    }
}
/**
 * Updates a workflow by posting a form-encoded request to DolphinScheduler's {@code process/update} endpoint.
 *
 * <p>A non-zero response code, an empty response body, or any exception raised while building or
 * sending the request (for example a missing workflow id) is reported as an error result; the HTTP
 * connection is always released.
 *
 * @param vo form supplying the project name
 * @param processDto workflow parameters (id, name, connects, locations, definition JSON)
 * @author liudz
 * @date 2021/5/7
 * @return success when the response code is {@code 0}, otherwise an error result
 **/
public ReturnResult updateProcess(ConfigAddForm vo, ProcessDto processDto) {
    PostMethod postMethod = null;
    try {
        String postURL = address + "/dolphinscheduler/projects/"
            + URLEncoder.encode(vo.getProjectName(), "utf-8") + "/process/update";
        postMethod = new PostMethod(postURL);
        postMethod.setRequestHeader("Content-Type",
            "application/x-www-form-urlencoded;charset=utf-8");
        postMethod.setRequestHeader("token", token);
        // Parameter values must not be null here; pass an empty string instead.
        NameValuePair[] data = {new NameValuePair("connects", processDto.getConnects()),
            new NameValuePair("name", processDto.getName()),
            new NameValuePair("locations", processDto.getLocations()),
            new NameValuePair("id", processDto.getId().toString()),
            new NameValuePair("processDefinitionJson", processDto.getProcessDefinitionJson())};
        postMethod.setRequestBody(data);
        org.apache.commons.httpclient.HttpClient httpClient = new org.apache.commons.httpclient.HttpClient();
        httpClient.executeMethod(postMethod);
        JSONObject result = JSONObject.parseObject(postMethod.getResponseBodyAsString());
        log.info("--(5)httpUpdateProcess:{}", result);
        if (result == null) {
            return ReturnResult.error(BizCode.DB_INSERT_ERROR, "empty response from DolphinScheduler");
        }
        if (!Integer.valueOf(ZERO).equals(result.get(DictionaryEnum.CODE.getFiledString()))) {
            return ReturnResult.error(BizCode.DB_INSERT_ERROR, result.getString("msg"));
        }
        return ReturnResult.success();
    } catch (Exception e) {
        // A failed request must not be reported as success.
        log.error("请求异常:{}", e.toString(), e);
        return ReturnResult.error(BizCode.DB_INSERT_ERROR, e.toString());
    } finally {
        if (postMethod != null) {
            // Commons HttpClient 3.x keeps the connection checked out until it is released.
            postMethod.releaseConnection();
        }
    }
}
/**
 * Deletes a workflow through DolphinScheduler's {@code process/delete} endpoint (issued as a GET).
 *
 * @param dto form supplying the project name
 * @param processDto workflow parameters; only the id is used
 * @author liudz
 * @date 2021/5/7
 * @return the response body returned by DolphinScheduler
 **/
public DolphinschedulerResponse deleteProcess(ConfigAddForm dto, ProcessDto processDto) {
    HttpHeaders httpHeaders = new HttpHeaders();
    httpHeaders.set("token", token);
    httpHeaders.set("Content-Type", "application/json");
    String url = address + "/dolphinscheduler/projects/" + dto.getProjectName()
        + "/process/delete?processDefinitionId=" + processDto.getId();
    ResponseEntity<DolphinschedulerResponse> response = restTemplate.exchange(
        url, HttpMethod.GET, new HttpEntity(httpHeaders), DolphinschedulerResponse.class);
    log.info("--(5)httpDeleteProcess:{}", response);
    return response.getBody();
}
/**
 * Looks up the id of the resource named {@code big_data02.jar} in the authorized resource tree
 * that DolphinScheduler returns for {@code userId=1}.
 *
 * @author liudz
 * @date 2021/5/8
 * @return the id of the last entry with that name, or {@code null} when no entry matches
 **/
public Integer getSparkResourceJarId() {
    HttpHeaders httpHeaders = new HttpHeaders();
    httpHeaders.set("token", token);
    httpHeaders.set("Content-Type", "application/json");
    ResponseEntity<DolphinschedulerResponse> response = restTemplate.exchange(
        address + "/dolphinscheduler/resources/authorize-resource-tree?userId=1",
        HttpMethod.GET, new HttpEntity(httpHeaders), DolphinschedulerResponse.class);
    List<Map<String, Object>> resources = (List<Map<String, Object>>) response.getBody().getData();
    Integer jarId = null;
    for (Map<String, Object> resource : resources) {
        if (!resource.get("name").equals("big_data02.jar")) {
            continue;
        }
        // No early exit: a later entry with the same name overrides an earlier one.
        jarId = Integer.valueOf(resource.get("id").toString());
    }
    return jarId;
}
/**
 * Fetches the workflow list of a project from DolphinScheduler's {@code process/list} endpoint.
 * Callers search the returned data for the workflow named {@code <userId>-dataxTask}.
 *
 * @param projectName project name
 * @author liudz
 * @date 2021/5/8
 * @return the response body returned by DolphinScheduler
 **/
public DolphinschedulerResponse getUserProcess(String projectName) {
    HttpHeaders httpHeaders = new HttpHeaders();
    httpHeaders.set("token", token);
    httpHeaders.set("Content-Type", "application/json");
    String url = address + "/dolphinscheduler/projects/" + projectName + "/process/list";
    ResponseEntity<DolphinschedulerResponse> response = restTemplate.exchange(
        url, HttpMethod.GET, new HttpEntity(httpHeaders), DolphinschedulerResponse.class);
    return response.getBody();
}
/**
 * Builds the workflow parameters for one of the supported operation types.
 *
 * <p>A default location entry for the node {@code datax-<id>} (at x = 0, y = 0) is prepared and the
 * work is then delegated to the helper matching {@code type}. An unrecognised type returns the DTO
 * with only {@code connects} and {@code name} set.
 *
 * @param type operation type: {@code create}, {@code add}, {@code update} or {@code delete}
 * @param processName name of the user's workflow
 * @param dto task parameters
 * @param processJson workflow JSON; passed as {@code null} by the create flow
 * @author liudz
 * @date 2021/5/13
 * @return ProcessDto
 **/
public ProcessDto packageProcessParam(String type, String processName, SyncConfigDto dto, JSONObject processJson) {
    ProcessDto result = new ProcessDto();
    result.setConnects("[]");
    result.setName(processName);

    String nodeName = "datax-" + dto.getId();
    JSONObject nodeLocation = new JSONObject();
    nodeLocation.fluentPut("name", nodeName);
    nodeLocation.fluentPut("targetarr", "");
    nodeLocation.fluentPut("nodenumber", "0");
    nodeLocation.fluentPut("x", 0);
    nodeLocation.fluentPut("y", 0);
    JSONObject locations = new JSONObject();
    locations.put(nodeName, nodeLocation);

    if (CREATE.equals(type)) {
        // New workflow containing only this node.
        return packageProcessParamOfCreate(result, dto, locations);
    }
    if (ADD.equals(type)) {
        // Append the node to the existing workflow.
        return packageProcessParamOfAdd(result, dto, processJson, locations, nodeLocation);
    }
    if (UPDATE.equals(type)) {
        // Only the tasks inside processDefinitionJson change.
        return packageProcessParamOfUpdate(result, processJson, dto);
    }
    if (DELETE.equals(type)) {
        // Remove the node from locations and from the tasks inside processDefinitionJson.
        return packageProcessParamOfDelete(result, processJson, dto);
    }
    return result;
}
/**
 * Fills the workflow parameters for a brand-new workflow whose only task is {@code datax-<id>}.
 *
 * <p>The task's {@code params} object is built directly as a JSON object rather than by concatenating
 * and re-parsing a JSON string, so DataX content containing backslashes, quotes or line breaks is
 * carried over unchanged.
 *
 * @param processDto workflow parameters to fill
 * @param dto task parameters; {@code getContent()} supplies the DataX JSON
 * @param locationsOne locations object holding the new node
 * @author liudz
 * @date 2021/5/7
 * @return the same {@code processDto}, with locations and processDefinitionJson set
 **/
public ProcessDto packageProcessParamOfCreate(ProcessDto processDto, SyncConfigDto dto, JSONObject locationsOne) {
    processDto.setLocations(locationsOne.toString());
    String taskId = "datax-" + dto.getId();

    JSONObject params = new JSONObject();
    params.fluentPut("localParams", new JSONArray()).fluentPut("customConfig", 1);
    params.put("json", dto.getContent().toString());

    JSONObject jsonTimeout = new JSONObject();
    jsonTimeout.fluentPut("strategy", "").fluentPut("interval", null).fluentPut("enable", false);

    JSONObject jsonconditionResult = new JSONObject();
    jsonconditionResult.put("successNode", new ArrayList<>());
    jsonconditionResult.put("failedNode", new ArrayList<>());

    JSONObject task = new JSONObject();
    task.fluentPut("type", "DATAX").fluentPut("id", taskId);
    task.fluentPut("name", taskId).fluentPut("description", "");
    task.put("params", params);
    task.fluentPut("timeout", jsonTimeout).fluentPut("runFlag", "NORMAL");
    task.fluentPut("conditionResult", jsonconditionResult).fluentPut("dependence", new JSONObject());
    task.fluentPut("maxRetryTimes", "0").fluentPut("retryInterval", "1");
    task.fluentPut("taskInstancePriority", "MEDIUM").fluentPut("workerGroup", "default");
    task.fluentPut("preTasks", new ArrayList<>());

    JSONArray tasks = new JSONArray();
    tasks.add(task);
    JSONObject definition = new JSONObject();
    definition.fluentPut("globalParams", new ArrayList<>()).fluentPut("tenantId", THREE).fluentPut("timeout", 0);
    definition.put("tasks", tasks);
    processDto.setProcessDefinitionJson(definition.toString());
    return processDto;
}
/**
 * Fills the workflow parameters for appending the task {@code datax-<id>} to an existing workflow.
 *
 * <p>The new node is placed relative to the node whose key {@code JsonTools.getJsonStringMaxKey}
 * returns: 80 to the right while x is below 1500, otherwise at x = 0 with y increased by 80. The
 * task's {@code params} object is built directly as a JSON object rather than by concatenating and
 * re-parsing a JSON string, so content containing backslashes, quotes or line breaks is carried over
 * unchanged.
 *
 * @param processDto workflow parameters to fill
 * @param dto task parameters; {@code getContent()} supplies the DataX JSON
 * @param processJson existing workflow JSON (id, locations, processDefinitionJson)
 * @param locationsOne not read; the locations are taken from {@code processJson}
 * @param locationsTwo location entry of the new node; its x and y are overwritten here
 * @author liudz
 * @date 2021/5/7
 * @return the same {@code processDto}, with id, locations and processDefinitionJson set
 **/
public ProcessDto packageProcessParamOfAdd(ProcessDto processDto, SyncConfigDto dto, JSONObject processJson,
    JSONObject locationsOne, JSONObject locationsTwo) {
    String taskId = "datax-" + dto.getId();

    // Position the new node after the last existing one.
    String maxTaskKey = JsonTools.getJsonStringMaxKey(processJson.getString("locations"));
    JSONObject locations = processJson.getJSONObject("locations");
    JSONObject lastNode = locations.getJSONObject(maxTaskKey);
    Integer x = lastNode.getInteger("x");
    Integer y = lastNode.getInteger("y");
    if (x < ONE_THOUSAND_AND_FIVE_HUNDRED) {
        locationsTwo.fluentPut("x", x + EIGHTY).fluentPut("y", y);
    } else {
        locationsTwo.fluentPut("y", y + EIGHTY).fluentPut("x", 0);
    }
    locations.fluentPut(taskId, locationsTwo);
    processDto.setLocations(locations.toString());
    processDto.setId(processJson.getInteger("id"));

    // Adjacent braces are separated by a space, as in the original implementation.
    String taskJsonString = dto.getContent().toString().replace("}}", "} }").replace("{{", "{ {");
    JSONObject params = new JSONObject();
    params.fluentPut("localParams", new JSONArray()).fluentPut("customConfig", 1);
    params.put("json", taskJsonString);

    JSONObject jsonTimeout = new JSONObject();
    jsonTimeout.fluentPut("strategy", "").fluentPut("interval", null).fluentPut("enable", false);

    JSONObject jsonconditionResult = new JSONObject();
    jsonconditionResult.put("successNode", new ArrayList<>());
    jsonconditionResult.put("failedNode", new ArrayList<>());

    JSONObject task = new JSONObject();
    task.fluentPut("type", "DATAX").fluentPut("id", taskId);
    task.fluentPut("name", taskId).fluentPut("description", "");
    task.put("params", params);
    task.fluentPut("timeout", jsonTimeout).fluentPut("runFlag", "NORMAL");
    task.fluentPut("conditionResult", jsonconditionResult).fluentPut("dependence", new JSONObject());
    task.fluentPut("maxRetryTimes", "0").fluentPut("retryInterval", "1");
    task.fluentPut("taskInstancePriority", "MEDIUM").fluentPut("workerGroup", "default");
    task.fluentPut("preTasks", new ArrayList<>());

    JSONObject definition = processJson.getJSONObject("processDefinitionJson");
    JSONArray tasks = definition.getJSONArray("tasks");
    tasks.add(task);
    definition.put("tasks", tasks);
    processDto.setProcessDefinitionJson(definition.toString());
    return processDto;
}
/**
 * Fills the workflow parameters for replacing the DataX JSON of the task {@code datax-<id>}.
 *
 * <p>The task is matched by comparing its id string with {@code "datax-" + dto.getId()}, the same
 * rule the delete path uses. Comparing boxed {@code Long} values with {@code ==} would compare
 * references and miss ids outside the small-value cache. When no task matches, the existing
 * definition is sent back unchanged instead of an empty object.
 *
 * @param processDto workflow parameters to fill
 * @param processJson existing workflow JSON (id, locations, processDefinitionJson)
 * @param dto task parameters; {@code getContent()} supplies the new DataX JSON
 * @author liudz
 * @date 2021/5/7
 * @return the same {@code processDto}, with id, locations and processDefinitionJson set
 **/
public ProcessDto packageProcessParamOfUpdate(ProcessDto processDto, JSONObject processJson, SyncConfigDto dto) {
    processDto.setLocations(processJson.getString("locations"));
    processDto.setId(processJson.getInteger("id"));

    JSONObject processDefinitionJson = processJson.getJSONObject("processDefinitionJson");
    JSONArray existingTasks = processDefinitionJson.getJSONArray("tasks");
    String targetTaskId = "datax-" + dto.getId();
    String taskJsonString = dto.getContent().toString();

    JSONArray updatedTasks = new JSONArray();
    for (Object element : existingTasks) {
        JSONObject task = JSONObject.parseObject(element.toString());
        if (targetTaskId.equals(task.getString("id"))) {
            // Only the DataX JSON inside params changes; every other task attribute is kept.
            task.getJSONObject("params").put("json", taskJsonString);
            updatedTasks.add(task);
        } else {
            updatedTasks.add(element);
        }
    }
    processDefinitionJson.put("tasks", updatedTasks);
    processDto.setProcessDefinitionJson(processDefinitionJson.toString());
    return processDto;
}
/**
 * Fills the workflow parameters for removing the task {@code datax-<id>} from a workflow.
 *
 * <p>When the locations contain the key built from {@code DictionaryEnum.DATAX} and the task id, the
 * node is dropped from the locations and every task whose id is {@code datax-<id>} is dropped from
 * the definition; otherwise both are passed through unchanged.
 *
 * @param processDto workflow parameters to fill
 * @param processJson existing workflow JSON (id, locations, processDefinitionJson)
 * @param dto task parameters; only the id is used
 * @author liudz
 * @date 2021/5/7
 * @return the same {@code processDto}, with id, locations and processDefinitionJson set
 **/
public ProcessDto packageProcessParamOfDelete(ProcessDto processDto, JSONObject processJson, SyncConfigDto dto) {
    processDto.setId(processJson.getInteger("id"));
    JSONObject locations = processJson.getJSONObject("locations");
    JSONObject definition = processJson.getJSONObject("processDefinitionJson");
    JSONArray tasks = definition.getJSONArray("tasks");
    JSONArray remainingTasks = new JSONArray();
    remainingTasks.addAll(tasks);

    boolean nodePresent = locations.containsKey(DictionaryEnum.DATAX.getFiledString() + dto.getId());
    if (nodePresent) {
        locations.remove("datax-" + dto.getId());
        remainingTasks.removeIf(
            element -> JSONObject.parseObject(element.toString()).getString("id").equals("datax-" + dto.getId()));
        definition.put("tasks", remainingTasks);
    }

    processDto.setLocations(locations.toString());
    processDto.setProcessDefinitionJson(definition.toString());
    return processDto;
}
/**
 * Brings a workflow online or offline through DolphinScheduler's {@code process/release} endpoint.
 *
 * <p>A non-zero response code, an empty response body, or any exception raised while building or
 * sending the request is reported as an error result; the HTTP connection is always released.
 *
 * @param projectName project name
 * @param processName name of the user's workflow
 * @param processId workflow id
 * @param releaseState target state: 0 = offline, 1 = online
 * @author liudz
 * @date 2021/5/7
 * @return success when the response code is {@code 0}, otherwise an error result
 **/
public ReturnResult releaseProcessDefinition(String projectName, String processName, Integer processId,
    Integer releaseState) {
    PostMethod postMethod = null;
    try {
        String postURL = address + "/dolphinscheduler/projects/"
            + URLEncoder.encode(projectName, "utf-8") + "/process/release";
        postMethod = new PostMethod(postURL);
        postMethod.setRequestHeader("Content-Type",
            "application/x-www-form-urlencoded;charset=utf-8");
        postMethod.setRequestHeader("token", token);
        // Parameter values must not be null here; pass an empty string instead.
        NameValuePair[] data = {new NameValuePair("name", processName),
            new NameValuePair("processId", processId.toString()),
            new NameValuePair("releaseState", releaseState.toString())};
        postMethod.setRequestBody(data);
        org.apache.commons.httpclient.HttpClient httpClient = new org.apache.commons.httpclient.HttpClient();
        httpClient.executeMethod(postMethod);
        JSONObject result = JSONObject.parseObject(postMethod.getResponseBodyAsString());
        if (result == null) {
            return ReturnResult.error(BizCode.DB_INSERT_ERROR, "empty response from DolphinScheduler");
        }
        if (!Integer.valueOf(ZERO).equals(result.get(DictionaryEnum.CODE.getFiledString()))) {
            return ReturnResult.error(BizCode.DB_INSERT_ERROR, result.getString("msg"));
        }
        return ReturnResult.success();
    } catch (Exception e) {
        // A failed request must not be reported as success.
        log.error("请求异常:{}", e.toString(), e);
        return ReturnResult.error(BizCode.DB_INSERT_ERROR, e.toString());
    } finally {
        if (postMethod != null) {
            // Commons HttpClient 3.x keeps the connection checked out until it is released.
            postMethod.releaseConnection();
        }
    }
}
/**
 * Starts a workflow instance that runs the node {@code datax-<id>} of the caller's workflow.
 *
 * <p>The workflow named {@code <userId>-dataxTask} is looked up in the project, brought online when
 * it is offline, and then started through the {@code executors/start-process-instance} endpoint.
 * A missing workflow, an empty response body, or any exception is reported as an error response
 * rather than as success; the HTTP connection is always released.
 *
 * @param projectName project name
 * @param request request whose user principal name is parsed as the numeric user id
 * @param id sync task id
 * @author liudz
 * @date 2021/5/7
 * @return success when DolphinScheduler answers with code {@code 0}, otherwise an error response
 **/
@GetMapping("/project/process/datax/start")
public DolphinschedulerResponse startProcessDataxTask(
    @RequestParam("projectName") String projectName, @RequestParam("id") Integer id,
    HttpServletRequest request) {
    // Code used for failures detected locally (no DolphinScheduler code is available for them).
    final int localFailureCode = 500;
    PostMethod postMethod = null;
    try {
        Long userId = Long.valueOf(request.getUserPrincipal().getName());
        String processName = userId + "-dataxTask";
        DolphinschedulerResponse processInfoList = getUserProcess(projectName);
        if (processInfoList.getCode() != ZERO) {
            return processInfoList;
        }
        log.info("--(1)getUserProcess--success:{}", processInfoList);
        JSONObject processJson = new JSONObject();
        @SuppressWarnings("unchecked")
        List<Map<String, Object>> list = (List<Map<String, Object>>) processInfoList.getData();
        if (list != null) {
            for (Map<String, Object> map : list) {
                if (processName.equals(map.get("name"))) {
                    processJson.fluentPutAll(map);
                }
            }
        }
        if (processJson.getInteger("id") == null) {
            return DolphinschedulerResponse.error(localFailureCode,
                "workflow " + processName + " not found in project " + projectName);
        }
        if (OFFLINE.equals(processJson.getString(DictionaryEnum.RELEASE_STATE.getFiledString()))) {
            // The workflow is brought online (releaseState 1) before it is started.
            releaseProcessDefinition(projectName, processName, processJson.getInteger("id"), 1);
            log.info("--(2)releaseProcessDefinition--ONLINE--success");
        }
        String postURL = address + "/dolphinscheduler/projects/" + URLEncoder.encode(projectName, "utf-8")
            + "/executors/start-process-instance";
        postMethod = new PostMethod(postURL);
        postMethod.setRequestHeader("Content-Type",
            "application/x-www-form-urlencoded;charset=utf-8");
        postMethod.setRequestHeader("token", token);
        // Parameter values must not be null here; pass an empty string instead.
        NameValuePair[] data = packageNameValuePair(processJson, id);
        postMethod.setRequestBody(data);
        org.apache.commons.httpclient.HttpClient httpClient = new org.apache.commons.httpclient.HttpClient();
        httpClient.executeMethod(postMethod);
        JSONObject result = JSONObject.parseObject(postMethod.getResponseBodyAsString());
        log.info("--(2)startProcessInstance--result:{}", result);
        if (result == null) {
            return DolphinschedulerResponse.error(localFailureCode, "empty response from DolphinScheduler");
        }
        if (!Integer.valueOf(ZERO).equals(result.get(DictionaryEnum.CODE.getFiledString()))) {
            return DolphinschedulerResponse.error(result.getInteger("code"), result.getString("msg"));
        }
        return DolphinschedulerResponse.success();
    } catch (Exception e) {
        // A failed request must not be reported as success.
        log.error("请求异常:{}", e.toString(), e);
        return DolphinschedulerResponse.error(localFailureCode, e.toString());
    } finally {
        if (postMethod != null) {
            // Commons HttpClient 3.x keeps the connection checked out until it is released.
            postMethod.releaseConnection();
        }
    }
}
/**
 * Builds the form parameters for the {@code start-process-instance} request.
 *
 * @param processJson workflow JSON; its {@code id} becomes {@code processDefinitionId}
 * @param dragSparkTaskId task id; the start node is {@code datax-<dragSparkTaskId>}
 * @author liudz
 * @date 2021/5/14
 * @return the name/value pairs to post
 **/
public NameValuePair[] packageNameValuePair(JSONObject processJson, Integer dragSparkTaskId) {
    String processDefinitionId = processJson.getString("id");
    String startNode = "datax-" + dragSparkTaskId;
    return new NameValuePair[] {
        new NameValuePair("failureStrategy", "CONTINUE"),
        new NameValuePair("processDefinitionId", processDefinitionId),
        new NameValuePair("processInstancePriority", "MEDIUM"),
        new NameValuePair("warningGroupId", "0"),
        new NameValuePair("warningType", "NONE"),
        new NameValuePair("runMode", "RUN_MODE_SERIAL"),
        new NameValuePair("startNodeList", startNode),
        new NameValuePair("taskDependType", "TASK_POST"),
        new NameValuePair("workerGroup", "default")
    };
}
/**
 * Forwards an execute command to the process instance that is currently running a DataX task.
 *
 * <p>The method first queries the project's task instances named {@code datax-<id>} (first page,
 * 100 entries), picks the process instance whose task is in state {@code RUNNING_EXEUTION}
 * (the last one if several match), and then posts {@code executeType} for that process instance
 * to the DolphinScheduler {@code executors/execute} endpoint.
 *
 * @param id id used to build the task name {@code datax-<id>}
 * @param executeType execute type, passed to DolphinScheduler unchanged
 * @param projectName project name
 * @return a success response when DolphinScheduler accepts the command; an error response when
 *         no matching task instance is running, when the task-instance query returns no data,
 *         when DolphinScheduler answers with a code other than {@code ZERO}, or when the HTTP
 *         call fails
 * @author liudz
 * @author lty update 2020/5/27
 * @date 2020/4/28 10:31
 */
@GetMapping(value = "/project/process/datax/execute/{projectName}/{id}/{executeType}")
public DolphinschedulerResponse<String> stopProcessDataxTask(@PathVariable("projectName") String projectName,
        @PathVariable("id") Long id, @PathVariable("executeType") String executeType) {
    log.info("--(1)stopProcessDataxTask--begin--projectName:{},id:{},executeType:{}", projectName, id, executeType);
    // Declared outside the try so the connection can be released in the finally block.
    PostMethod postMethod = null;
    try {
        HttpHeaders headers = new HttpHeaders();
        headers.set("token", token);
        headers.set("Content-Type", "application/json");
        HttpEntity<Void> requestEntity = new HttpEntity<>(headers);
        ResponseEntity<JSONObject> returnResult = restTemplate.exchange(address + "/"
            + "dolphinscheduler/projects/" + projectName + "/task-instance/list-paging?"
            + "pageNo=1&pageSize=100&taskName=datax-" + id, HttpMethod.GET, requestEntity, JSONObject.class);
        JSONObject body = returnResult.getBody();
        JSONObject page = body == null ? null : body.getJSONObject("data");
        if (page == null) {
            // Without a "data" object there is nothing to inspect; report it instead of failing with an NPE.
            log.error("--stopProcessDataxTask--task-instance query returned no data--body:{}", body);
            return DolphinschedulerResponse.error(500, "task-instance query returned no data");
        }
        List<Map<String, Object>> list = (List<Map<String, Object>>) page.get("totalList");
        Integer processInstanceId = null;
        if (list != null) {
            for (Map<String, Object> map : list) {
                // Constant-first equals: a task instance without a "state" entry must not throw.
                if ("RUNNING_EXEUTION".equals(map.get("state"))) {
                    processInstanceId = Integer.valueOf(map.get("processInstanceId").toString());
                }
            }
        }
        if (processInstanceId == null) {
            return DolphinschedulerResponse.error(Msg.TASK_HAS_BEEN_STOPPED);
        }
        log.info("--(2)getProcessInstanceId--success--:{}", processInstanceId);
        String postURL = address + "/dolphinscheduler/projects/"
            + URLEncoder.encode(projectName, "utf-8") + "/executors/execute";
        postMethod = new PostMethod(postURL);
        postMethod.setRequestHeader("Content-Type",
            "application/x-www-form-urlencoded;charset=utf-8");
        postMethod.setRequestHeader("token", token);
        NameValuePair[] data = {new NameValuePair("executeType", executeType),
            new NameValuePair("processInstanceId", processInstanceId.toString())};
        postMethod.setRequestBody(data);
        org.apache.commons.httpclient.HttpClient httpClient = new org.apache.commons.httpclient.HttpClient();
        httpClient.executeMethod(postMethod);
        JSONObject result = JSONObject.parseObject(postMethod.getResponseBodyAsString());
        if (!result.get(DictionaryEnum.CODE.getFiledString()).equals(ZERO)) {
            return DolphinschedulerResponse.error(result.getInteger("code"), result.getString("msg"));
        }
        log.info("--(3)stopProcessDataxTask--success--:{}", result);
    } catch (IOException e) {
        // Also covers UnsupportedEncodingException and the commons-httpclient HttpException,
        // which are IOException subclasses. A failed call must not be reported as success.
        log.error("--stopProcessDataxTask--request failed--projectName:{},id:{},executeType:{}",
            projectName, id, executeType, e);
        return DolphinschedulerResponse.error(500, e.getMessage());
    } finally {
        if (postMethod != null) {
            // commons-httpclient keeps the connection checked out until it is released explicitly.
            postMethod.releaseConnection();
        }
    }
    return DolphinschedulerResponse.success();
}
/**
 * Lists sync task configurations, paged.
 *
 * @param form
 *            query form, validated from the request body
 * @param request
 *            request whose user principal name is parsed as the user id
 * @return paged result
 */
@RequestMapping(value = "/project/process/datax/list", method = RequestMethod.POST)
public ReturnResult<PageResult<SyncConfigDto>> findAll(@RequestBody @Validated ConfigSelectForm form,
        HttpServletRequest request) {
    String principalName = request.getUserPrincipal().getName();
    return syncConfigService.list(form, Long.valueOf(principalName));
}
/**
 * Fetches a single sync task configuration.
 *
 * @param id
 *            configuration id
 * @param request
 *            request whose user principal name is parsed as the user id
 * @return the lookup result returned by the sync config service
 */
@RequestMapping(value = "/project/process/datax", method = RequestMethod.GET)
public ReturnResult<SyncConfigDto> findById(@RequestParam Long id, HttpServletRequest request) {
    String principalName = request.getUserPrincipal().getName();
    return syncConfigService.findById(id, Long.valueOf(principalName));
}
}package com.geespace.microservices.builder.request;
import javax.validation.constraints.NotEmpty;
import javax.validation.constraints.NotNull;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import lombok.Data;
/**
 * Request form for adding a sync task configuration.
 *
 * @author zjr
 * @date 2020-05-06 09:42
 * @version 1.0
 */
@Data
public class ConfigAddForm {
    /**
     * Configuration name.
     */
    @NotEmpty(message = "name不能为空")
    private String name;
    /**
     * Configuration description.
     */
    private String description;
    /**
     * Sync type: real-time / full / incremental.
     *
     * <p>Declared as the wrapper type so that {@code @NotNull} can actually reject a missing
     * value; a primitive {@code int} is never null and would silently default to 0.
     */
    @NotNull(message = "同步方式不能为空")
    private Integer syncType;
    /**
     * Id of the data source selected for the reader.
     */
    @NotNull(message = "读取数据源id不能为空")
    private Long readerConfigId;
    /**
     * Reader parameters.
     */
    @NotEmpty(message = "读取参数不能为空")
    private JSONObject readerParam;
    /**
     * Id of the data source selected for the writer.
     */
    @NotNull(message = "写入数据源id不能为空")
    private Long writerConfigId;
    /**
     * Writer parameters.
     */
    @NotEmpty(message = "写入参数不能为空")
    private JSONObject writerParam;
    /**
     * Column mapping: reader column on the left, writer column on the right.
     */
    @NotEmpty(message = "字段对照表不能为空")
    private JSONArray columnMap;
    /**
     * User id.
     */
    private Long userId;
    /**
     * Project name.
     **/
    private String projectName;
    /**
     * Project id.
     **/
    @NotNull(message = "projectId not null")
    private Long projectId;
    /**
     * Id.
     */
    private Long id;
}
package com.geespace.microservices.builder.request;
import javax.validation.constraints.NotEmpty;
import javax.validation.constraints.NotNull;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import lombok.Data;
/**
 * Request form for updating a sync task configuration.
 *
 * @author zjr
 * @date 2020-05-06 09:42
 * @version 1.0
 */
@Data
public class ConfigUpdateForm {
    /**
     * Id of the sync configuration to update.
     */
    @NotNull(message = "同步配置id不能为空")
    private Long id;
    /**
     * Configuration name.
     */
    @NotEmpty(message = "name不能为空")
    private String name;
    /**
     * Configuration description.
     */
    private String description;
    /**
     * Sync type: real-time / full / incremental.
     *
     * <p>Declared as the wrapper type so that {@code @NotNull} can actually reject a missing
     * value; a primitive {@code int} is never null and would silently default to 0.
     */
    @NotNull(message = "同步方式不能为空")
    private Integer syncType;
    /**
     * Id of the data source selected for the reader.
     */
    @NotNull(message = "读取数据源id不能为空")
    private Long readerConfigId;
    /**
     * Reader parameters.
     */
    @NotEmpty(message = "读取参数不能为空")
    private JSONObject readerParam;
    /**
     * Id of the data source selected for the writer.
     */
    @NotNull(message = "写入数据源id不能为空")
    private Long writerConfigId;
    /**
     * Writer parameters.
     */
    @NotEmpty(message = "写入参数不能为空")
    private JSONObject writerParam;
    /**
     * Column mapping: reader column on the left, writer column on the right.
     */
    @NotEmpty(message = "字段对照表不能为空")
    private JSONArray columnMap;
    /**
     * User id.
     */
    private Long userId;
    /**
     * Project id.
     **/
    @NotNull(message = "projectId not null")
    private Long projectId;
    /**
     * Project name.
     **/
    private String projectName;
}
package com.geespace.microservices.builder.dto;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.ToString;
/**
 * Workflow (process definition) parameters used with the DolphinScheduler scheduler.
 *
 * @author liudz
 * @date 2020-03-23
 **/
@Data
@EqualsAndHashCode(callSuper = false)
@ToString(callSuper = true)
public class ProcessDto {
    /**
     * Process definition id.
     **/
    private Integer id;
    /**
     * Connection information between the process definition's node icons (JSON format).
     **/
    private String connects;
    /**
     * Coordinate/position information of the process definition's nodes (JSON format).
     **/
    private String locations;
    /**
     * Process definition name.
     **/
    private String name;
    /**
     * Detailed process definition (JSON format).
     **/
    private String processDefinitionJson;
    /**
     * Project name.
     **/
    private String projectName;
    /**
     * Project id.
     **/
    private Long projectId;
}
package com.geespace.microservices.builder.dto;
import com.alibaba.fastjson.JSONObject;
import lombok.Data;
/**
 * Sync task configuration as returned by the sync config service.
 *
 * @author zjr
 * @date 2020-05-05 17:03
 * @version 1.0
 */
@Data
public class SyncConfigDto {
    /**
     * Configuration id.
     */
    private Long id;
    /**
     * Configuration name.
     */
    private String name;
    /**
     * Configuration description.
     */
    private String description;
    /**
     * Sync type: real-time / full / incremental.
     */
    private int syncType;
    /**
     * Configuration content as JSON.
     * NOTE(review): the original comment read "json base64" - confirm whether any part of this
     * content is Base64-encoded.
     */
    private JSONObject content;
    /**
     * Project name.
     **/
    private String projectName;
    /**
     * Project id.
     **/
    private Long projectId;
}
package com.geespace.microservices.builder.service;
import com.geespace.microservices.builder.dto.SyncConfigDto;
import com.geespace.microservices.builder.request.ConfigAddForm;
import com.geespace.microservices.builder.request.ConfigSelectForm;
import com.geespace.microservices.builder.request.ConfigUpdateForm;
import com.geespace.microservices.builder.response.PageResult;
import com.geespace.microservices.builder.response.ReturnResult;
/**
 * Service operations on sync task configurations: add, update, find by id, delete and paged list.
 *
 * @Author: zjr
 * @Date: 2020-05-05 13:59
 * @Version 1.0
 */
public interface SyncConfigService {
/**
 * Adds a sync task configuration.
 *
 * @param form
 *            task configuration parameters
 * @return result of the add operation
 */
ReturnResult<SyncConfigDto> addConfig(ConfigAddForm form);
/**
 * Updates a sync task configuration.
 *
 * @param form
 *            task configuration parameters (including the id)
 * @return result of the update operation
 */
ReturnResult<SyncConfigDto> updateConfig(ConfigUpdateForm form);
/**
 * Finds a sync task configuration.
 *
 * @param id
 *            sync task configuration id
 * @param userId
 *            user id
 * @return result of the lookup
 */
ReturnResult<SyncConfigDto> findById(Long id, Long userId);
/**
 * Deletes a sync task configuration.
 *
 * @param id
 *            task configuration id
 * @param userId
 *            user id
 * @return result of the delete operation (declared with the raw {@code ReturnResult} type)
 */
ReturnResult delete(Long id, Long userId);
/**
 * Lists all sync task configurations, paged.
 *
 * @param form
 *            query form
 * @param userId
 *            user id
 * @return paged result
 */
ReturnResult<PageResult<SyncConfigDto>> list(ConfigSelectForm form, Long userId);
}



1.springboot项目集成dolphinscheduler调度器 可拖拽spark任务管理:
https://blog.csdn.net/a924382407/article/details/117119831
2.springboot项目集成dolphinscheduler调度器 实现datax数据同步任务:
https://blog.csdn.net/a924382407/article/details/120951230
3.springboot项目集成dolphinscheduler调度器 项目管理:
https://blog.csdn.net/a924382407/article/details/117118931
4.springboot项目集成大数据第三方dolphinscheduler调度器 执行/停止任务:
https://blog.csdn.net/a924382407/article/details/117121181
5.springboot项目集成大数据第三方dolphinscheduler调度器
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。