Commit f9d7cdd8 authored by wenmo's avatar wenmo

savepoint管理

parent 2cbc96e0
package com.dlink.controller;
import com.dlink.common.result.ProTableResult;
import com.dlink.common.result.Result;
import com.dlink.model.Savepoints;
import com.dlink.service.SavepointsService;
import com.fasterxml.jackson.databind.JsonNode;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;
import java.util.ArrayList;
import java.util.List;
/**
* SavepointsController
*
* @author wenmo
* @since 2021/11/21
**/
@Slf4j
@RestController
@RequestMapping("/api/savepoints")
public class SavepointsController {
@Autowired
private SavepointsService savepointsService;
/**
* 新增或者更新
*/
@PutMapping
public Result saveOrUpdate(@RequestBody Savepoints savepoints) throws Exception {
if(savepointsService.saveOrUpdate(savepoints)){
return Result.succeed("新增成功");
}else {
return Result.failed("新增失败");
}
}
/**
* 动态查询列表
*/
@PostMapping
public ProTableResult<Savepoints> listSavepointss(@RequestBody JsonNode para) {
return savepointsService.selectForProTable(para);
}
/**
* 批量删除
*/
@DeleteMapping
public Result deleteMul(@RequestBody JsonNode para) {
if (para.size()>0){
List<Integer> error = new ArrayList<>();
for (final JsonNode item : para){
Integer id = item.asInt();
if(!savepointsService.removeById(id)){
error.add(id);
}
}
if(error.size()==0) {
return Result.succeed("删除成功");
}else {
return Result.succeed("删除部分成功,但"+error.toString()+"删除失败,共"+error.size()+"次失败。");
}
}else{
return Result.failed("请选择要删除的记录");
}
}
/**
* 获取指定ID的信息
*/
@PostMapping("/getOneById")
public Result getOneById(@RequestBody Savepoints savepoints) throws Exception {
savepoints = savepointsService.getById(savepoints.getId());
return Result.succeed(savepoints,"获取成功");
}
/**
* 获取指定作业ID的所有savepoint
*/
@GetMapping("/listSavepointsByTaskId")
public Result listSavepointsByTaskId(@RequestParam Integer taskID) throws Exception {
return Result.succeed(savepointsService.listSavepointsByTaskId(taskID),"获取成功");
}
}
...@@ -133,10 +133,19 @@ public class StudioController { ...@@ -133,10 +133,19 @@ public class StudioController {
} }
/** /**
* 获取session列表 * 停止任务
*/ */
@GetMapping("/cancel") @GetMapping("/cancel")
public Result cancel(@RequestParam Integer clusterId,@RequestParam String jobId) { public Result cancel(@RequestParam Integer clusterId,@RequestParam String jobId) {
return Result.succeed(studioService.cancel(clusterId,jobId),"停止成功"); return Result.succeed(studioService.cancel(clusterId,jobId),"停止成功");
} }
/**
* savepoint
*/
@GetMapping("/savepoint")
public Result savepoint(@RequestParam Integer clusterId,@RequestParam String jobId,
@RequestParam String savePointType,@RequestParam String name) {
return Result.succeed(studioService.savepoint(clusterId,jobId,savePointType,name),"savepoint 成功");
}
} }
...@@ -9,6 +9,8 @@ import com.dlink.service.ClusterService; ...@@ -9,6 +9,8 @@ import com.dlink.service.ClusterService;
import com.dlink.service.HistoryService; import com.dlink.service.HistoryService;
import org.springframework.context.annotation.DependsOn; import org.springframework.context.annotation.DependsOn;
import java.time.LocalDateTime;
/** /**
* Job2MysqlHandler * Job2MysqlHandler
* *
...@@ -73,7 +75,7 @@ public class Job2MysqlHandler implements JobHandler { ...@@ -73,7 +75,7 @@ public class Job2MysqlHandler implements JobHandler {
// history.setResult(JSONUtil.toJsonStr(job.getResult())); // history.setResult(JSONUtil.toJsonStr(job.getResult()));
if(job.isUseGateway()){ if(job.isUseGateway()){
Cluster cluster = clusterService.registersCluster(Cluster.autoRegistersCluster(job.getJobManagerAddress(), Cluster cluster = clusterService.registersCluster(Cluster.autoRegistersCluster(job.getJobManagerAddress(),
job.getJobConfig().getJobName(), job.getType().getLongValue(), job.getJobConfig().getJobName()+ LocalDateTime.now(), job.getType().getLongValue(),
job.getJobConfig().getClusterConfigurationId(),job.getJobConfig().getClusterConfigurationId())); job.getJobConfig().getClusterConfigurationId(),job.getJobConfig().getClusterConfigurationId()));
if(Asserts.isNotNull(cluster)){ if(Asserts.isNotNull(cluster)){
history.setClusterId(cluster.getId()); history.setClusterId(cluster.getId());
......
package com.dlink.mapper;
import com.dlink.db.mapper.SuperMapper;
import com.dlink.model.Savepoints;
import org.apache.ibatis.annotations.Mapper;
/**
* Savepoints
*
* @author wenmo
* @since 2021/11/21
**/
@Mapper
public interface SavepointsMapper extends SuperMapper<Savepoints> {
}
package com.dlink.model;
import com.baomidou.mybatisplus.annotation.*;
import com.dlink.db.annotation.Save;
import lombok.Data;
import lombok.EqualsAndHashCode;
import javax.validation.constraints.NotNull;
import java.io.Serializable;
import java.time.LocalDateTime;
/**
* Savepoints
*
* @author wenmo
* @since 2021/11/21 16:10
*/
@Data
@EqualsAndHashCode(callSuper = false)
@TableName("dlink_savepoints")
public class Savepoints implements Serializable {
private static final long serialVersionUID = 115345627846554078L;
@TableId(value = "id", type = IdType.AUTO)
private Integer id;
@NotNull(message = "作业ID不能为空", groups = {Save.class})
private Integer taskId;
private String name;
private String type;
private String path;
@TableField(fill = FieldFill.INSERT)
private LocalDateTime createTime;
protected Serializable pkVal() {
return this.id;
}
}
...@@ -8,6 +8,8 @@ import com.dlink.job.JobConfig; ...@@ -8,6 +8,8 @@ import com.dlink.job.JobConfig;
import lombok.Data; import lombok.Data;
import lombok.EqualsAndHashCode; import lombok.EqualsAndHashCode;
import java.util.List;
/** /**
* 任务 * 任务
* *
...@@ -50,6 +52,9 @@ public class Task extends SuperEntity{ ...@@ -50,6 +52,9 @@ public class Task extends SuperEntity{
@TableField(exist = false) @TableField(exist = false)
private String clusterName; private String clusterName;
@TableField(exist = false)
private List<Savepoints> savepoints;
/*public ExecutorSetting buildExecutorSetting(){ /*public ExecutorSetting buildExecutorSetting(){
HashMap configMap = new HashMap(); HashMap configMap = new HashMap();
if(config!=null&&!"".equals(clusterName)) { if(config!=null&&!"".equals(clusterName)) {
......
package com.dlink.service;
import com.dlink.db.service.ISuperService;
import com.dlink.model.Savepoints;
import com.fasterxml.jackson.databind.JsonNode;
import java.util.List;
import java.util.Map;
/**
* Savepoints
*
* @author wenmo
* @since 2021/11/21
**/
public interface SavepointsService extends ISuperService<Savepoints> {
List<Savepoints> listSavepointsByTaskId(Integer taskId);
}
...@@ -48,4 +48,6 @@ public interface StudioService { ...@@ -48,4 +48,6 @@ public interface StudioService {
List<JsonNode> listJobs(Integer clusterId); List<JsonNode> listJobs(Integer clusterId);
boolean cancel(Integer clusterId,String jobId); boolean cancel(Integer clusterId,String jobId);
boolean savepoint(Integer clusterId,String jobId,String savePointType,String name);
} }
...@@ -14,7 +14,7 @@ import java.util.Map; ...@@ -14,7 +14,7 @@ import java.util.Map;
**/ **/
public interface SysConfigService extends ISuperService<SysConfig> { public interface SysConfigService extends ISuperService<SysConfig> {
Map<String,String> getAll(); Map<String,Object> getAll();
void initSysConfig(); void initSysConfig();
......
package com.dlink.service.impl;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.dlink.db.service.impl.SuperServiceImpl;
import com.dlink.mapper.SavepointsMapper;
import com.dlink.model.Savepoints;
import com.dlink.service.SavepointsService;
import org.springframework.stereotype.Service;
import java.util.List;
/**
* SavepointsServiceImpl
*
* @author wenmo
* @since 2021/11/21
**/
@Service
public class SavepointsServiceImpl extends SuperServiceImpl<SavepointsMapper, Savepoints> implements SavepointsService {
@Override
public List<Savepoints> listSavepointsByTaskId(Integer taskId) {
return list(new QueryWrapper<Savepoints>().eq("task_id",taskId));
}
}
...@@ -8,14 +8,20 @@ import com.dlink.dto.StudioExecuteDTO; ...@@ -8,14 +8,20 @@ import com.dlink.dto.StudioExecuteDTO;
import com.dlink.explainer.ca.CABuilder; import com.dlink.explainer.ca.CABuilder;
import com.dlink.explainer.ca.ColumnCANode; import com.dlink.explainer.ca.ColumnCANode;
import com.dlink.explainer.ca.TableCANode; import com.dlink.explainer.ca.TableCANode;
import com.dlink.gateway.model.JobInfo;
import com.dlink.gateway.result.SavePointResult;
import com.dlink.job.JobConfig; import com.dlink.job.JobConfig;
import com.dlink.job.JobManager; import com.dlink.job.JobManager;
import com.dlink.job.JobResult; import com.dlink.job.JobResult;
import com.dlink.model.Cluster; import com.dlink.model.Cluster;
import com.dlink.model.Savepoints;
import com.dlink.model.SystemConfiguration;
import com.dlink.result.IResult; import com.dlink.result.IResult;
import com.dlink.result.SelectResult; import com.dlink.result.SelectResult;
import com.dlink.result.SqlExplainResult; import com.dlink.result.SqlExplainResult;
import com.dlink.service.ClusterConfigurationService;
import com.dlink.service.ClusterService; import com.dlink.service.ClusterService;
import com.dlink.service.SavepointsService;
import com.dlink.service.StudioService; import com.dlink.service.StudioService;
import com.dlink.session.SessionConfig; import com.dlink.session.SessionConfig;
import com.dlink.session.SessionInfo; import com.dlink.session.SessionInfo;
...@@ -25,7 +31,9 @@ import com.fasterxml.jackson.databind.node.ObjectNode; ...@@ -25,7 +31,9 @@ import com.fasterxml.jackson.databind.node.ObjectNode;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import java.time.LocalDateTime;
import java.util.List; import java.util.List;
import java.util.Map;
/** /**
* StudioServiceImpl * StudioServiceImpl
...@@ -38,6 +46,10 @@ public class StudioServiceImpl implements StudioService { ...@@ -38,6 +46,10 @@ public class StudioServiceImpl implements StudioService {
@Autowired @Autowired
private ClusterService clusterService; private ClusterService clusterService;
@Autowired
private ClusterConfigurationService clusterConfigurationService;
@Autowired
private SavepointsService savepointsService;
@Override @Override
public JobResult executeSql(StudioExecuteDTO studioExecuteDTO) { public JobResult executeSql(StudioExecuteDTO studioExecuteDTO) {
...@@ -142,6 +154,45 @@ public class StudioServiceImpl implements StudioService { ...@@ -142,6 +154,45 @@ public class StudioServiceImpl implements StudioService {
public boolean cancel(Integer clusterId,String jobId) { public boolean cancel(Integer clusterId,String jobId) {
Cluster cluster = clusterService.getById(clusterId); Cluster cluster = clusterService.getById(clusterId);
Asserts.checkNotNull(cluster,"该集群不存在"); Asserts.checkNotNull(cluster,"该集群不存在");
return FlinkAPI.build(cluster.getJobManagerHost()).stop(jobId); JobConfig jobConfig = new JobConfig();
jobConfig.setAddress(cluster.getJobManagerHost());
if(Asserts.isNotNull(cluster.getClusterConfigurationId())){
Map<String, String> gatewayConfig = clusterConfigurationService.getGatewayConfig(cluster.getClusterConfigurationId());
jobConfig.buildGatewayConfig(gatewayConfig);
}
jobConfig.setUseRestAPI(SystemConfiguration.getInstances().isUseRestAPI());
JobManager jobManager = JobManager.build(jobConfig);
return jobManager.cancel(jobId);
}
@Override
public boolean savepoint(Integer clusterId, String jobId, String savePointType,String name) {
Cluster cluster = clusterService.getById(clusterId);
Asserts.checkNotNull(cluster,"该集群不存在");
JobConfig jobConfig = new JobConfig();
jobConfig.setAddress(cluster.getJobManagerHost());
jobConfig.setType(cluster.getType());
if(Asserts.isNotNull(cluster.getClusterConfigurationId())){
Map<String, String> gatewayConfig = clusterConfigurationService.getGatewayConfig(cluster.getClusterConfigurationId());
jobConfig.buildGatewayConfig(gatewayConfig);
}
jobConfig.setUseRestAPI(SystemConfiguration.getInstances().isUseRestAPI());
JobManager jobManager = JobManager.build(jobConfig);
jobManager.setUseGateway(true);
SavePointResult savePointResult = jobManager.savepoint(jobId, savePointType);
if(Asserts.isNotNull(savePointResult)){
for(JobInfo item : savePointResult.getJobInfos()){
if(Asserts.isEqualsIgnoreCase(jobId,item.getJobId())){
Savepoints savepoints = new Savepoints();
savepoints.setName(name);
savepoints.setType(savePointType);
savepoints.setPath(item.getSavePoint());
savepoints.setTaskId(cluster.getTaskId());
savepointsService.save(savepoints);
}
}
return true;
}
return false;
} }
} }
...@@ -29,8 +29,8 @@ public class SysConfigServiceImpl extends SuperServiceImpl<SysConfigMapper, SysC ...@@ -29,8 +29,8 @@ public class SysConfigServiceImpl extends SuperServiceImpl<SysConfigMapper, SysC
private static final ObjectMapper mapper = new ObjectMapper(); private static final ObjectMapper mapper = new ObjectMapper();
@Override @Override
public Map<String, String> getAll() { public Map<String, Object> getAll() {
Map<String,String> map = new HashMap<>(); Map<String,Object> map = new HashMap<>();
List<SysConfig> sysConfigs = list(); List<SysConfig> sysConfigs = list();
for(SysConfig item : sysConfigs){ for(SysConfig item : sysConfigs){
map.put(item.getName(),item.getValue()); map.put(item.getName(),item.getValue());
......
...@@ -10,10 +10,7 @@ import com.dlink.model.Cluster; ...@@ -10,10 +10,7 @@ import com.dlink.model.Cluster;
import com.dlink.model.Statement; import com.dlink.model.Statement;
import com.dlink.model.SystemConfiguration; import com.dlink.model.SystemConfiguration;
import com.dlink.model.Task; import com.dlink.model.Task;
import com.dlink.service.ClusterConfigurationService; import com.dlink.service.*;
import com.dlink.service.ClusterService;
import com.dlink.service.StatementService;
import com.dlink.service.TaskService;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value; import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
...@@ -35,6 +32,8 @@ public class TaskServiceImpl extends SuperServiceImpl<TaskMapper, Task> implemen ...@@ -35,6 +32,8 @@ public class TaskServiceImpl extends SuperServiceImpl<TaskMapper, Task> implemen
private ClusterService clusterService; private ClusterService clusterService;
@Autowired @Autowired
private ClusterConfigurationService clusterConfigurationService; private ClusterConfigurationService clusterConfigurationService;
@Autowired
private SavepointsService savepointsService;
@Value("${spring.datasource.driver-class-name}") @Value("${spring.datasource.driver-class-name}")
private String driver; private String driver;
...@@ -87,6 +86,7 @@ public class TaskServiceImpl extends SuperServiceImpl<TaskMapper, Task> implemen ...@@ -87,6 +86,7 @@ public class TaskServiceImpl extends SuperServiceImpl<TaskMapper, Task> implemen
task.setStatement(statement.getStatement()); task.setStatement(statement.getStatement());
} }
} }
return task; return task;
} }
......
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="com.dlink.mapper.SavepointsMapper">
<select id="selectForProTable" resultType="com.dlink.model.Savepoints">
select
a.*
from
dlink_savepoints a
<where>
1=1
<if test='param.name!=null and param.name!=""'>
and a.name like "%${param.name}%"
</if>
<if test='param.createTime!=null and param.createTime!=""'>
and a.create_time <![CDATA[>=]]> str_to_date( #{param.createTime},'%Y-%m-%d %H:%i:%s')
</if>
<if test='ew.sqlSegment!=null and ew.sqlSegment!="" and !ew.sqlSegment.startsWith(" ORDER BY")'>
and
</if>
<if test='ew.sqlSegment!=null and ew.sqlSegment!=""'>
${ew.sqlSegment}
</if>
</where>
</select>
</mapper>
package com.dlink.model; package com.dlink.model;
import com.dlink.assertion.Asserts;
import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.JsonNode;
import java.util.ArrayList;
import java.util.List;
import java.util.Map; import java.util.Map;
/** /**
...@@ -16,6 +19,12 @@ public class SystemConfiguration { ...@@ -16,6 +19,12 @@ public class SystemConfiguration {
public static SystemConfiguration getInstances() { public static SystemConfiguration getInstances() {
return systemConfiguration; return systemConfiguration;
} }
private static final List<Configuration> CONFIGURATION_LIST = new ArrayList<Configuration>(){{
add(systemConfiguration.sqlSubmitJarPath);
add(systemConfiguration.sqlSubmitJarParas);
add(systemConfiguration.sqlSubmitJarMainAppClass);
add(systemConfiguration.useRestAPI);
}};
private Configuration sqlSubmitJarPath = new Configuration( private Configuration sqlSubmitJarPath = new Configuration(
"sqlSubmitJarPath", "sqlSubmitJarPath",
...@@ -38,28 +47,40 @@ public class SystemConfiguration { ...@@ -38,28 +47,40 @@ public class SystemConfiguration {
"com.dlink.app.MainApp", "com.dlink.app.MainApp",
"用于指定Applcation模式提交FlinkSQL的Jar的主类" "用于指定Applcation模式提交FlinkSQL的Jar的主类"
); );
private Configuration useRestAPI = new Configuration(
"useRestAPI",
"使用 RestAPI",
ValueType.BOOLEAN,
true,
"在运维 Flink 任务时是否使用 RestAPI"
);
public void setConfiguration(JsonNode jsonNode){ public void setConfiguration(JsonNode jsonNode){
if(jsonNode.has("sqlSubmitJarPath")){ for(Configuration item : CONFIGURATION_LIST){
setSqlSubmitJarPath(jsonNode.get("sqlSubmitJarPath").asText()); if(!jsonNode.has(item.getName())){
} continue;
if(jsonNode.has("sqlSubmitJarParas")){ }
setSqlSubmitJarParas(jsonNode.get("sqlSubmitJarParas").asText()); switch (item.getType()){
} case BOOLEAN:
if(jsonNode.has("sqlSubmitJarMainAppClass")){ item.setValue(jsonNode.get(item.getName()).asBoolean());
setSqlSubmitJarMainAppClass(jsonNode.get("sqlSubmitJarMainAppClass").asText()); break;
case INT:
item.setValue(jsonNode.get(item.getName()).asInt());
break;
default:
item.setValue(jsonNode.get(item.getName()).asText());
}
} }
} }
public void addConfiguration(Map<String,String> map){ public void addConfiguration(Map<String,Object> map){
if(!map.containsKey("sqlSubmitJarPath")){ for(Configuration item : CONFIGURATION_LIST){
map.put("sqlSubmitJarPath",sqlSubmitJarPath.getValue().toString()); if(map.containsKey(item.getName())&&item.getType().equals(ValueType.BOOLEAN)){
} map.put(item.getName(), Asserts.isEqualsIgnoreCase("true",map.get(item.getName()).toString()));
if(!map.containsKey("sqlSubmitJarParas")){ }
map.put("sqlSubmitJarParas",sqlSubmitJarParas.getValue().toString()); if(!map.containsKey(item.getName())) {
} map.put(item.getName(), item.getValue());
if(!map.containsKey("sqlSubmitJarMainAppClass")){ }
map.put("sqlSubmitJarMainAppClass",sqlSubmitJarMainAppClass.getValue().toString());
} }
} }
...@@ -87,6 +108,14 @@ public class SystemConfiguration { ...@@ -87,6 +108,14 @@ public class SystemConfiguration {
this.sqlSubmitJarMainAppClass.setValue(sqlSubmitJarMainAppClass); this.sqlSubmitJarMainAppClass.setValue(sqlSubmitJarMainAppClass);
} }
public boolean isUseRestAPI() {
return (boolean) useRestAPI.getValue();
}
public void setUseRestAPI(boolean useRestAPI) {
this.useRestAPI.setValue(useRestAPI);
}
enum ValueType{ enum ValueType{
STRING,INT,DOUBLE,FLOAT,BOOLEAN,DATE STRING,INT,DOUBLE,FLOAT,BOOLEAN,DATE
} }
...@@ -115,5 +144,13 @@ public class SystemConfiguration { ...@@ -115,5 +144,13 @@ public class SystemConfiguration {
public Object getValue() { public Object getValue() {
return value; return value;
} }
public ValueType getType() {
return type;
}
public String getName() {
return name;
}
} }
} }
...@@ -37,13 +37,17 @@ public class JobConfig { ...@@ -37,13 +37,17 @@ public class JobConfig {
private Integer parallelism; private Integer parallelism;
private String savePointPath; private String savePointPath;
private GatewayConfig gatewayConfig; private GatewayConfig gatewayConfig;
private boolean useRestAPI;
private Map<String,String> config; private Map<String,String> config;
public JobConfig(String type,boolean useResult, boolean useSession, String session, boolean useRemote, Integer clusterId, public JobConfig() {
Integer clusterConfigurationId,Integer taskId, String jobName, boolean useSqlFragment, }
boolean useStatementSet,Integer maxRowNum, Integer checkpoint,
Integer parallelism, String savePointPath,Map<String,String> config) { public JobConfig(String type, boolean useResult, boolean useSession, String session, boolean useRemote, Integer clusterId,
Integer clusterConfigurationId, Integer taskId, String jobName, boolean useSqlFragment,
boolean useStatementSet, Integer maxRowNum, Integer checkpoint,
Integer parallelism, String savePointPath, Map<String,String> config) {
this.type = type; this.type = type;
this.useResult = useResult; this.useResult = useResult;
this.useSession = useSession; this.useSession = useSession;
......
package com.dlink.job; package com.dlink.job;
import com.dlink.api.FlinkAPI;
import com.dlink.assertion.Asserts; import com.dlink.assertion.Asserts;
import com.dlink.constant.FlinkSQLConstant; import com.dlink.constant.FlinkSQLConstant;
import com.dlink.executor.EnvironmentSetting; import com.dlink.executor.EnvironmentSetting;
...@@ -9,9 +10,12 @@ import com.dlink.executor.custom.CustomTableEnvironmentImpl; ...@@ -9,9 +10,12 @@ import com.dlink.executor.custom.CustomTableEnvironmentImpl;
import com.dlink.explainer.Explainer; import com.dlink.explainer.Explainer;
import com.dlink.gateway.Gateway; import com.dlink.gateway.Gateway;
import com.dlink.gateway.GatewayType; import com.dlink.gateway.GatewayType;
import com.dlink.gateway.config.ActionType;
import com.dlink.gateway.config.AppConfig; import com.dlink.gateway.config.AppConfig;
import com.dlink.gateway.config.FlinkConfig; import com.dlink.gateway.config.FlinkConfig;
import com.dlink.gateway.config.SavePointType;
import com.dlink.gateway.result.GatewayResult; import com.dlink.gateway.result.GatewayResult;
import com.dlink.gateway.result.SavePointResult;
import com.dlink.interceptor.FlinkInterceptor; import com.dlink.interceptor.FlinkInterceptor;
import com.dlink.parser.SqlType; import com.dlink.parser.SqlType;
import com.dlink.result.*; import com.dlink.result.*;
...@@ -50,6 +54,10 @@ public class JobManager extends RunTime { ...@@ -50,6 +54,10 @@ public class JobManager extends RunTime {
public JobManager() { public JobManager() {
} }
public void setUseGateway(boolean useGateway) {
this.useGateway = useGateway;
}
public JobManager(String address, ExecutorSetting executorSetting) { public JobManager(String address, ExecutorSetting executorSetting) {
if (address != null) { if (address != null) {
this.environmentSetting = EnvironmentSetting.build(address); this.environmentSetting = EnvironmentSetting.build(address);
...@@ -169,7 +177,7 @@ public class JobManager extends RunTime { ...@@ -169,7 +177,7 @@ public class JobManager extends RunTime {
} }
public JobResult executeSql(String statement) { public JobResult executeSql(String statement) {
Job job = Job.init(GatewayType.get(config.getType()), config, executorSetting, executor, statement,useGateway); Job job = Job.init(GatewayType.get(config.getType()), config, executorSetting, executor, statement, useGateway);
JobContextHolder.setJob(job); JobContextHolder.setJob(job);
if (!useGateway) { if (!useGateway) {
job.setJobManagerAddress(environmentSetting.getAddress()); job.setJobManagerAddress(environmentSetting.getAddress());
...@@ -183,7 +191,7 @@ public class JobManager extends RunTime { ...@@ -183,7 +191,7 @@ public class JobManager extends RunTime {
currentSql = item.getValue(); currentSql = item.getValue();
executor.executeSql(item.getValue()); executor.executeSql(item.getValue());
} }
if(jobParam.getTrans().size()>0) { if (jobParam.getTrans().size() > 0) {
if (config.isUseStatementSet() && useGateway) { if (config.isUseStatementSet() && useGateway) {
List<String> inserts = new ArrayList<>(); List<String> inserts = new ArrayList<>();
for (StatementParam item : jobParam.getTrans()) { for (StatementParam item : jobParam.getTrans()) {
...@@ -275,13 +283,14 @@ public class JobManager extends RunTime { ...@@ -275,13 +283,14 @@ public class JobManager extends RunTime {
return job.getJobResult(); return job.getJobResult();
} }
private String formatAddress(String webURL){ private String formatAddress(String webURL) {
if(Asserts.isNotNullString(webURL)) { if (Asserts.isNotNullString(webURL)) {
return webURL.replaceAll("http://",""); return webURL.replaceAll("http://", "");
}else { } else {
return ""; return "";
} }
} }
private JobParam pretreatStatements(String[] statements) { private JobParam pretreatStatements(String[] statements) {
List<StatementParam> ddl = new ArrayList<>(); List<StatementParam> ddl = new ArrayList<>();
List<StatementParam> trans = new ArrayList<>(); List<StatementParam> trans = new ArrayList<>();
...@@ -358,4 +367,26 @@ public class JobManager extends RunTime { ...@@ -358,4 +367,26 @@ public class JobManager extends RunTime {
Explainer explainer = Explainer.build(executor); Explainer explainer = Explainer.build(executor);
return explainer.getStreamGraph(statement); return explainer.getStreamGraph(statement);
} }
public boolean cancel(String jobId) {
if (useGateway && !config.isUseRestAPI()) {
config.getGatewayConfig().setFlinkConfig(FlinkConfig.build(jobId, ActionType.CANCEL.getValue(),
null, config.getSavePointPath()));
Gateway.build(config.getGatewayConfig()).savepointJob();
return true;
} else {
return FlinkAPI.build(config.getAddress()).stop(jobId);
}
}
public SavePointResult savepoint(String jobId,String savePointType) {
if (useGateway && !config.isUseRestAPI()) {
config.getGatewayConfig().setFlinkConfig(FlinkConfig.build(jobId, ActionType.SAVEPOINT.getValue(),
savePointType, config.getSavePointPath()));
return Gateway.build(config.getGatewayConfig()).savepointJob();
} else {
return null;
}
}
} }
...@@ -427,4 +427,14 @@ ALTER TABLE `dlink_cluster` ...@@ -427,4 +427,14 @@ ALTER TABLE `dlink_cluster`
ALTER TABLE `dlink_cluster` ALTER TABLE `dlink_cluster`
ADD COLUMN `task_id` int(11) NULL COMMENT '任务ID' AFTER `cluster_configuration_id`; ADD COLUMN `task_id` int(11) NULL COMMENT '任务ID' AFTER `cluster_configuration_id`;
CREATE TABLE `dlink_savepoints` (
`id` int(11) NOT NULL AUTO_INCREMENT COMMENT 'ID',
`task_id` int(11) NOT NULL COMMENT '任务ID',
`name` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL COMMENT '名称',
`type` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL COMMENT '类型',
`path` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL COMMENT '路径',
`create_time` datetime DEFAULT NULL COMMENT '创建时间',
PRIMARY KEY (`id`)
) ENGINE = InnoDB CHARACTER SET = utf8 COLLATE = utf8_general_ci ROW_FORMAT = Dynamic;
SET FOREIGN_KEY_CHECKS = 1; SET FOREIGN_KEY_CHECKS = 1;
...@@ -4,6 +4,7 @@ import com.dlink.assertion.Asserts; ...@@ -4,6 +4,7 @@ import com.dlink.assertion.Asserts;
import com.dlink.gateway.config.GatewayConfig; import com.dlink.gateway.config.GatewayConfig;
import com.dlink.gateway.exception.GatewayException; import com.dlink.gateway.exception.GatewayException;
import com.dlink.gateway.result.GatewayResult; import com.dlink.gateway.result.GatewayResult;
import com.dlink.gateway.result.SavePointResult;
import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.jobgraph.JobGraph;
import java.util.Iterator; import java.util.Iterator;
...@@ -50,8 +51,8 @@ public interface Gateway { ...@@ -50,8 +51,8 @@ public interface Gateway {
GatewayResult submitJar(); GatewayResult submitJar();
GatewayResult savepointCluster(); SavePointResult savepointCluster();
GatewayResult savepointJob(); SavePointResult savepointJob();
} }
...@@ -57,4 +57,9 @@ public class FlinkConfig { ...@@ -57,4 +57,9 @@ public class FlinkConfig {
} }
return new FlinkConfig(jobName,jobId,ActionType.get(actionStr),SavePointType.get(savePointTypeStr),savePoint,configParasList); return new FlinkConfig(jobName,jobId,ActionType.get(actionStr),SavePointType.get(savePointTypeStr),savePoint,configParasList);
} }
public static FlinkConfig build(String jobId, String actionStr, String savePointTypeStr, String savePoint){
return new FlinkConfig(null,jobId,ActionType.get(actionStr),SavePointType.get(savePointTypeStr),savePoint,null);
}
} }
...@@ -85,7 +85,7 @@ public abstract class YarnGateway extends AbstractGateway { ...@@ -85,7 +85,7 @@ public abstract class YarnGateway extends AbstractGateway {
} }
} }
public GatewayResult savepointCluster(){ public SavePointResult savepointCluster(){
if(Asserts.isNull(yarnClient)){ if(Asserts.isNull(yarnClient)){
init(); init();
} }
...@@ -119,7 +119,7 @@ public abstract class YarnGateway extends AbstractGateway { ...@@ -119,7 +119,7 @@ public abstract class YarnGateway extends AbstractGateway {
return result; return result;
} }
public GatewayResult savepointJob(){ public SavePointResult savepointJob(){
if(Asserts.isNull(yarnClient)){ if(Asserts.isNull(yarnClient)){
init(); init();
} }
......
import {Empty, Tag, Divider, Tooltip, message, Select, Button, Space, Modal} from "antd"; import {Empty, Tag, Divider, Tooltip, message, Select, Button, Space, Modal,Dropdown,Menu} from "antd";
import {StateType} from "@/pages/FlinkSqlStudio/model"; import {StateType} from "@/pages/FlinkSqlStudio/model";
import {connect} from "umi"; import {connect} from "umi";
import {useState} from "react"; import {useState} from "react";
import {SearchOutlined,CheckCircleOutlined,SyncOutlined,CloseCircleOutlined,ClockCircleOutlined,MinusCircleOutlined} from '@ant-design/icons'; import {SearchOutlined,CheckCircleOutlined,SyncOutlined,CloseCircleOutlined,ClockCircleOutlined,MinusCircleOutlined,DownOutlined} from '@ant-design/icons';
import ProTable from '@ant-design/pro-table'; import ProTable from '@ant-design/pro-table';
import {cancelJob, showFlinkJobs} from "../../StudioEvent/DDL"; import {cancelJob, savepointJob, showFlinkJobs} from "../../StudioEvent/DDL";
import {ClusterTableListItem} from "@/pages/Cluster/data";
import React from "react";
const {Option} = Select; const {Option} = Select;
...@@ -14,6 +16,46 @@ const StudioProcess = (props: any) => { ...@@ -14,6 +16,46 @@ const StudioProcess = (props: any) => {
const [jobsData, setJobsData] = useState<any>({}); const [jobsData, setJobsData] = useState<any>({});
const [clusterId, setClusterId] = useState<number>(); const [clusterId, setClusterId] = useState<number>();
const savepoint = (key: string | number, currentItem: {}) => {
Modal.confirm({
title: key+'任务',
content: `确定${key}该作业吗?`,
okText: '确认',
cancelText: '取消',
onOk: async () => {
if (!clusterId) return;
let res = savepointJob(clusterId, currentItem.jid,key,key);
res.then((result) => {
if (result.datas == true) {
message.success(key+"成功");
onRefreshJobs();
} else {
message.error(key+"失败");
}
});
}
});
};
const SavePointBtn: React.FC<{
item: {};
}> = ({item}) => (
<Dropdown
overlay={
<Menu onClick={({key}) => savepoint(key, item)}>
<Menu.Item key="trigger">Trigger</Menu.Item>
<Menu.Item key="dispose">Dispose</Menu.Item>
<Menu.Item key="stop">Stop</Menu.Item>
<Menu.Item key="cancel">Cancel</Menu.Item>
</Menu>
}
>
<a>
SavePoint <DownOutlined/>
</a>
</Dropdown>
);
const getColumns = () => { const getColumns = () => {
let columns: any = [{ let columns: any = [{
title: "作业ID", title: "作业ID",
...@@ -121,6 +163,7 @@ const StudioProcess = (props: any) => { ...@@ -121,6 +163,7 @@ const StudioProcess = (props: any) => {
停止 停止
</a>); </a>);
} }
option.push(<SavePointBtn key="savepoint" item={record}/>,)
return option; return option;
}, },
},]; },];
......
...@@ -141,6 +141,10 @@ export function showFlinkJobs(clusterId:number) { ...@@ -141,6 +141,10 @@ export function showFlinkJobs(clusterId:number) {
export function cancelJob(clusterId:number,jobId:string) { export function cancelJob(clusterId:number,jobId:string) {
return getData('api/studio/cancel',{clusterId:clusterId,jobId:jobId}); return getData('api/studio/cancel',{clusterId:clusterId,jobId:jobId});
} }
/*--- 停止 SavePoint Jobs ---*/
export function savepointJob(clusterId:number,jobId:string,savePointType:string,name:string) {
return getData('api/studio/savepoint',{clusterId,jobId,savePointType,name});
}
/*--- 根据版本号获取所有自动补全的文档 ---*/ /*--- 根据版本号获取所有自动补全的文档 ---*/
export function getFillAllByVersion(version:string,dispatch: any) { export function getFillAllByVersion(version:string,dispatch: any) {
const res = getData('api/document/getFillAllByVersion',{version:version}); const res = getData('api/document/getFillAllByVersion',{version:version});
......
...@@ -130,6 +130,7 @@ const StudioMenu = (props: any) => { ...@@ -130,6 +130,7 @@ const StudioMenu = (props: any) => {
} else { } else {
message.success('异步提交失败'); message.success('异步提交失败');
} }
showCluster(dispatch);
} }
}); });
}; };
......
export type SavePointTableListItem = {
id: number,
taskId: number,
name: string,
type: string,
path: string,
createTime: Date,
};
import React, {useRef, useState} from "react";
import {DownOutlined, HeartOutlined, PlusOutlined, UserOutlined} from '@ant-design/icons';
import {ActionType, ProColumns} from "@ant-design/pro-table";
import {Drawer} from 'antd';
import {PageContainer} from '@ant-design/pro-layout';
import ProTable from '@ant-design/pro-table';
import ProDescriptions from '@ant-design/pro-descriptions';
import {queryData} from "@/components/Common/crud";
import {SavePointTableListItem} from "@/components/Studio/StudioRightTool/StudioSavePoint/data";
import {StateType} from "@/pages/FlinkSqlStudio/model";
import {connect} from "umi";
const url = '/api/clusterConfiguration';
const StudioSavePoint: React.FC<{}> = (props: any) => {
const {current,dispatch} = props;
const [row, setRow] = useState<SavePointTableListItem>();
const actionRef = useRef<ActionType>();
const columns: ProColumns<SavePointTableListItem>[] = [
{
title: '名称',
dataIndex: 'name',
sorter: true,
render: (dom, entity) => {
return <a onClick={() => setRow(entity)}>{dom}</a>;
},
},
{
title: 'id',
dataIndex: 'id',
hideInTable: true,
hideInForm: true,
hideInSearch: true,
},
{
title: '作业ID',
dataIndex: 'taskId',
hideInTable: true,
hideInForm: true,
hideInSearch: true,
},
{
title: '类型',
dataIndex: 'type',
hideInTable: true,
hideInForm: true,
hideInSearch: true,
},
{
title: '路径',
dataIndex: 'path',
hideInTable: true,
hideInForm: true,
hideInSearch: true,
},
{
title: '创建时间',
dataIndex: 'createTime',
sorter: true,
valueType: 'dateTime',
hideInTable: true,
},
];
return (
<PageContainer>
<ProTable<SavePointTableListItem>
headerTitle="savepoints"
actionRef={actionRef}
rowKey="id"
search={{
labelWidth: 120,
}}
request={(params, sorter, filter) => queryData(url, {taskId:current.taskId,...params, sorter, filter})}
columns={columns}
/>
<Drawer
width={600}
visible={!!row}
onClose={() => {
setRow(undefined);
}}
closable={false}
>
{row?.name && (
<ProDescriptions<SavePointTableListItem>
column={2}
title={row?.name}
request={async () => ({
data: row || {},
})}
params={{
id: row?.name,
}}
columns={columns}
/>
)}
</Drawer>
</PageContainer>
);
};
export default connect(({ Studio }: { Studio: StateType }) => ({
current: Studio.current,
}))(StudioSavePoint);
...@@ -38,7 +38,10 @@ const StudioSetting = (props: any) => { ...@@ -38,7 +38,10 @@ const StudioSetting = (props: any) => {
return itemList; return itemList;
}; };
form.setFieldsValue(current.task); useEffect(()=>{
form.setFieldsValue(current.task);
},[current.task]);
const onValuesChange = (change:any,all:any)=>{ const onValuesChange = (change:any,all:any)=>{
let newTabs = tabs; let newTabs = tabs;
......
...@@ -5,6 +5,7 @@ import {connect} from "umi"; ...@@ -5,6 +5,7 @@ import {connect} from "umi";
import styles from "./index.less"; import styles from "./index.less";
import StudioConfig from "./StudioConfig"; import StudioConfig from "./StudioConfig";
import StudioSetting from "./StudioSetting"; import StudioSetting from "./StudioSetting";
import StudioSavePoint from "./StudioSavePoint";
const { TabPane } = Tabs; const { TabPane } = Tabs;
...@@ -22,8 +23,8 @@ const StudioRightTool = (props:any) => { ...@@ -22,8 +23,8 @@ const StudioRightTool = (props:any) => {
<TabPane tab={<span><ScheduleOutlined /> 执行配置</span>} key="StudioConfig" > <TabPane tab={<span><ScheduleOutlined /> 执行配置</span>} key="StudioConfig" >
<StudioConfig form={form}/> <StudioConfig form={form}/>
</TabPane> </TabPane>
<TabPane tab={<span><ScheduleOutlined /> 详情</span>} key="3" > <TabPane tab={<span><ScheduleOutlined /> 保存点</span>} key="3" >
<Empty image={Empty.PRESENTED_IMAGE_SIMPLE} /> <StudioSavePoint />
</TabPane> </TabPane>
<TabPane tab={<span><AuditOutlined /> 审计</span>} key="4" > <TabPane tab={<span><AuditOutlined /> 审计</span>} key="4" >
<Empty image={Empty.PRESENTED_IMAGE_SIMPLE} /> <Empty image={Empty.PRESENTED_IMAGE_SIMPLE} />
......
import React, {useState} from 'react'; import React, {useEffect, useState} from 'react';
import {List,Input} from 'antd'; import {List,Input,Switch,Form} from 'antd';
import {connect} from "umi"; import {connect} from "umi";
import {SettingsStateType} from "@/pages/Settings/model"; import {SettingsStateType} from "@/pages/Settings/model";
import {saveSettings} from "@/pages/Settings/function"; import {saveSettings} from "@/pages/Settings/function";
...@@ -8,20 +8,28 @@ type FlinkConfigProps = { ...@@ -8,20 +8,28 @@ type FlinkConfigProps = {
sqlSubmitJarPath: SettingsStateType['sqlSubmitJarPath']; sqlSubmitJarPath: SettingsStateType['sqlSubmitJarPath'];
sqlSubmitJarParas: SettingsStateType['sqlSubmitJarParas']; sqlSubmitJarParas: SettingsStateType['sqlSubmitJarParas'];
sqlSubmitJarMainAppClass: SettingsStateType['sqlSubmitJarMainAppClass']; sqlSubmitJarMainAppClass: SettingsStateType['sqlSubmitJarMainAppClass'];
useRestAPI: SettingsStateType['useRestAPI'];
dispatch: any; dispatch: any;
}; };
const FlinkConfigView: React.FC<FlinkConfigProps> = (props) => { const FlinkConfigView: React.FC<FlinkConfigProps> = (props) => {
const {sqlSubmitJarPath, sqlSubmitJarParas, sqlSubmitJarMainAppClass, dispatch} = props; const {sqlSubmitJarPath, sqlSubmitJarParas, sqlSubmitJarMainAppClass,useRestAPI, dispatch} = props;
const [editName, setEditName] = useState<string>(''); const [editName, setEditName] = useState<string>('');
const [formValues, setFormValues] = useState(props); const [formValues, setFormValues] = useState(props);
const [form] = Form.useForm();
useEffect(()=>{
form.setFieldsValue(props);
},[props]);
const getData = () => [ const getData = () => [
{ {
title: '提交FlinkSQL的Jar文件路径', title: '提交FlinkSQL的Jar文件路径',
description: ( description: (
editName!='sqlSubmitJarPath'? editName!='sqlSubmitJarPath'?
(sqlSubmitJarPath?sqlSubmitJarPath:'未设置'):(<Input (sqlSubmitJarPath?sqlSubmitJarPath:'未设置'):(
<Input
id='sqlSubmitJarPath' id='sqlSubmitJarPath'
defaultValue={sqlSubmitJarPath} defaultValue={sqlSubmitJarPath}
onChange={onChange} onChange={onChange}
...@@ -55,6 +63,16 @@ const FlinkConfigView: React.FC<FlinkConfigProps> = (props) => { ...@@ -55,6 +63,16 @@ const FlinkConfigView: React.FC<FlinkConfigProps> = (props) => {
actions: editName!='sqlSubmitJarMainAppClass'?[<a onClick={({}) => handleEditClick('sqlSubmitJarMainAppClass')}>修改</a>]: actions: editName!='sqlSubmitJarMainAppClass'?[<a onClick={({}) => handleEditClick('sqlSubmitJarMainAppClass')}>修改</a>]:
[<a onClick={({}) => handleSaveClick('sqlSubmitJarMainAppClass')}>保存</a>, [<a onClick={({}) => handleSaveClick('sqlSubmitJarMainAppClass')}>保存</a>,
<a onClick={({}) => handleCancelClick()}>取消</a>], <a onClick={({}) => handleCancelClick()}>取消</a>],
},{
title: '使用 RestAPI',
description: '启用后,Flink 任务的 savepoint、停止等操作将通过 JobManager 的 RestAPI 进行',
actions: [
<Form.Item
name="useRestAPI" valuePropName="checked"
>
<Switch checkedChildren="启用" unCheckedChildren="禁用"
checked={useRestAPI}
/></Form.Item>],
}, },
]; ];
...@@ -64,6 +82,14 @@ const FlinkConfigView: React.FC<FlinkConfigProps> = (props) => { ...@@ -64,6 +82,14 @@ const FlinkConfigView: React.FC<FlinkConfigProps> = (props) => {
setFormValues({...formValues,...values}); setFormValues({...formValues,...values});
}; };
const onValuesChange = (change:any,all:any) => {
let values = {};
for(let key in change){
values[key]=all[key];
}
saveSettings(values, dispatch);
};
const handleEditClick = (name:string)=>{ const handleEditClick = (name:string)=>{
setEditName(name); setEditName(name);
}; };
...@@ -85,6 +111,11 @@ const FlinkConfigView: React.FC<FlinkConfigProps> = (props) => { ...@@ -85,6 +111,11 @@ const FlinkConfigView: React.FC<FlinkConfigProps> = (props) => {
const data = getData(); const data = getData();
return ( return (
<> <>
<Form
form={form}
layout="vertical"
onValuesChange={onValuesChange}
>
<List <List
itemLayout="horizontal" itemLayout="horizontal"
dataSource={data} dataSource={data}
...@@ -94,6 +125,7 @@ const FlinkConfigView: React.FC<FlinkConfigProps> = (props) => { ...@@ -94,6 +125,7 @@ const FlinkConfigView: React.FC<FlinkConfigProps> = (props) => {
</List.Item> </List.Item>
)} )}
/> />
</Form>
</> </>
); );
}; };
...@@ -101,4 +133,5 @@ export default connect(({Settings}: { Settings: SettingsStateType }) => ({ ...@@ -101,4 +133,5 @@ export default connect(({Settings}: { Settings: SettingsStateType }) => ({
sqlSubmitJarPath: Settings.sqlSubmitJarPath, sqlSubmitJarPath: Settings.sqlSubmitJarPath,
sqlSubmitJarParas: Settings.sqlSubmitJarParas, sqlSubmitJarParas: Settings.sqlSubmitJarParas,
sqlSubmitJarMainAppClass: Settings.sqlSubmitJarMainAppClass, sqlSubmitJarMainAppClass: Settings.sqlSubmitJarMainAppClass,
useRestAPI: Settings.useRestAPI,
}))(FlinkConfigView); }))(FlinkConfigView);
...@@ -4,6 +4,7 @@ export type SettingsStateType = { ...@@ -4,6 +4,7 @@ export type SettingsStateType = {
sqlSubmitJarPath:string, sqlSubmitJarPath:string,
sqlSubmitJarParas:string, sqlSubmitJarParas:string,
sqlSubmitJarMainAppClass:string, sqlSubmitJarMainAppClass:string,
useRestAPI:boolean,
}; };
export type ModelType = { export type ModelType = {
...@@ -22,6 +23,7 @@ const SettingsModel: ModelType = { ...@@ -22,6 +23,7 @@ const SettingsModel: ModelType = {
sqlSubmitJarPath:'', sqlSubmitJarPath:'',
sqlSubmitJarParas:'', sqlSubmitJarParas:'',
sqlSubmitJarMainAppClass:'', sqlSubmitJarMainAppClass:'',
useRestAPI:true,
}, },
effects: { effects: {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment