Commit c0a20894 authored by wenmo's avatar wenmo

持久化与信息反馈

parent a6ad2058
...@@ -32,18 +32,18 @@ DataLink 开源项目及社区正在建设,希望本项目可以帮助你更 ...@@ -32,18 +32,18 @@ DataLink 开源项目及社区正在建设,希望本项目可以帮助你更
| | 同步执行 | 0.1.0 | | | 同步执行 | 0.1.0 |
| | 异步提交 | 0.1.0 | | | 异步提交 | 0.1.0 |
| | 函数自动补全 | 敬请期待 | | | 函数自动补全 | 敬请期待 |
| | 任务详情 | 敬请期待 | | | 任务详情 | 0.3.0 |
| | 任务审计 | 敬请期待 | | | 任务审计 | 敬请期待 |
| | 集群总览 | 敬请期待 | | | 集群总览 | 0.3.0 |
| | 集群任务 | 敬请期待 | | | 集群任务 | 0.3.0 |
| | 元数据查询 | 敬请期待 | | | 元数据查询 | 0.3.0 |
| | FlinkSQL 运行指标 | 敬请期待 | | | FlinkSQL 运行指标 | 敬请期待 |
| | 表级血缘分析 | 敬请期待 | | | 表级血缘分析 | 0.3.0 |
| | 字段级血缘分析 | 敬请期待 | | | 字段级血缘分析 | 0.3.0 |
| | 任务进程 | 敬请期待 | | | 任务进程 | 0.3.0 |
| | 示例与技巧文档 | 敬请期待 | | | 示例与技巧文档 | 敬请期待 |
| | FlinkSQL 执行图 | 敬请期待 | | | FlinkSQL 执行图 | 敬请期待 |
| | 远程任务停止 | 敬请期待 | | | 远程任务停止 | 0.3.0 |
| | 任务恢复 | 敬请期待 | | | 任务恢复 | 敬请期待 |
| | UDF注册 | 敬请期待 | | | UDF注册 | 敬请期待 |
| | 更改对比 | 敬请期待 | | | 更改对比 | 敬请期待 |
...@@ -54,7 +54,7 @@ DataLink 开源项目及社区正在建设,希望本项目可以帮助你更 ...@@ -54,7 +54,7 @@ DataLink 开源项目及社区正在建设,希望本项目可以帮助你更
| | 智能 Select 模式 | 敬请期待 | | | 智能 Select 模式 | 敬请期待 |
| | 自动补全元数据 | 敬请期待 | | | 自动补全元数据 | 敬请期待 |
| | 任务反压和倾斜提示 | 敬请期待 | | | 任务反压和倾斜提示 | 敬请期待 |
| | 流任务数据预览 | 敬请期待 | | | 流任务数据预览 | 0.3.0 |
| | ... | 欢迎提议 | | | ... | 欢迎提议 |
| 集群中心 | 集群注册与管理 | 0.1.0 | | 集群中心 | 集群注册与管理 | 0.1.0 |
| | 心跳检测 | 0.1.0 | | | 心跳检测 | 0.1.0 |
...@@ -68,10 +68,10 @@ DataLink 开源项目及社区正在建设,希望本项目可以帮助你更 ...@@ -68,10 +68,10 @@ DataLink 开源项目及社区正在建设,希望本项目可以帮助你更
| 用户中心 | 用户管理 | 敬请期待 | | 用户中心 | 用户管理 | 敬请期待 |
| | 角色管理 | 敬请期待 | | | 角色管理 | 敬请期待 |
| | 登录授权 | 敬请期待 | | | 登录授权 | 敬请期待 |
| 数据源中心 | 数据源注册与管理 | 敬请期待 | | 数据源中心 | 数据源注册与管理 | 0.3.0 |
| | 心跳检测 | 敬请期待 | | | 心跳检测 | 0.3.0 |
| | 元数据查询 | 敬请期待 | | | 元数据查询 | 0.3.0 |
| | 数据查询 | 敬请期待 | | | 数据查询 | 0.3.0 |
| | 质量分析 | 敬请期待 | | | 质量分析 | 敬请期待 |
| 调度中心 | 定时调度任务管理 | 敬请期待 | | 调度中心 | 定时调度任务管理 | 敬请期待 |
| | 依赖血缘调度任务管理 | 敬请期待 | | | 依赖血缘调度任务管理 | 敬请期待 |
...@@ -227,8 +227,7 @@ AGG BY TOP2(value) as (value,rank); ...@@ -227,8 +227,7 @@ AGG BY TOP2(value) as (value,rank);
#### 使用技巧 #### 使用技巧
1.[Flink AggTable 在 Dlink 的实践](https://github.com/DataLinkDC/dlink/blob/main/dlink-doc/doc/FlinkAggTable%E5%9C%A8Dlink%E7%9A%84%E5%BA%94%E7%94%A8.md) 1.[Flink AggTable 在 Dlink 的实践](https://github.com/DataLinkDC/dlink/blob/main/dlink-doc/doc/FlinkAggTable%E5%9C%A8Dlink%E7%9A%84%E5%BA%94%E7%94%A8.md)
2.[Dlink 概念原理与源码扩展介绍](https://github.com/DataLinkDC/dlink/blob/main/dlink-doc/doc/FlinkAggTable%E5%9C%A8Dlink%E7%9A%84%E5%BA%94%E7%94%A8.md) 2.[Dlink 概念原理与源码扩展介绍](https://github.com/DataLinkDC/dlink/blob/main/dlink-doc/doc/Dlink%E6%A0%B8%E5%BF%83%E6%A6%82%E5%BF%B5%E4%B8%8E%E5%AE%9E%E7%8E%B0%E5%8E%9F%E7%90%86%E8%AF%A6%E8%A7%A3.md)
#### 常见问题及解决 #### 常见问题及解决
(=。=)~ 敬请期待。 (=。=)~ 敬请期待。
......
...@@ -42,7 +42,7 @@ public class HistoryController { ...@@ -42,7 +42,7 @@ public class HistoryController {
List<Integer> error = new ArrayList<>(); List<Integer> error = new ArrayList<>();
for (final JsonNode item : para){ for (final JsonNode item : para){
Integer id = item.asInt(); Integer id = item.asInt();
if(!historyService.removeById(id)){ if(!historyService.removeHistoryById(id)){
error.add(id); error.add(id);
} }
} }
......
...@@ -21,14 +21,7 @@ public class StudioDDLDTO { ...@@ -21,14 +21,7 @@ public class StudioDDLDTO {
private String statement; private String statement;
public JobConfig getJobConfig() { public JobConfig getJobConfig() {
return new JobConfig(useResult, useSession, getSession(), useRemote, clusterId); return new JobConfig(useResult, useSession, session, useRemote, clusterId);
} }
public String getSession() {
if(useRemote) {
return clusterId + "_" + session;
}else{
return "0_" + session;
}
}
} }
...@@ -3,6 +3,7 @@ package com.dlink.job; ...@@ -3,6 +3,7 @@ package com.dlink.job;
import cn.hutool.json.JSONUtil; import cn.hutool.json.JSONUtil;
import com.dlink.context.SpringContextUtils; import com.dlink.context.SpringContextUtils;
import com.dlink.model.History; import com.dlink.model.History;
import com.dlink.parser.SqlType;
import com.dlink.service.HistoryService; import com.dlink.service.HistoryService;
import org.springframework.context.annotation.DependsOn; import org.springframework.context.annotation.DependsOn;
...@@ -24,12 +25,16 @@ public class Job2MysqlHandler implements JobHandler { ...@@ -24,12 +25,16 @@ public class Job2MysqlHandler implements JobHandler {
@Override @Override
public boolean init() { public boolean init() {
Job job = JobContextHolder.getJob(); Job job = JobContextHolder.getJob();
if(job.getType()!= SqlType.SELECT&&job.getType()!=SqlType.INSERT){
return false;
}
History history = new History(); History history = new History();
history.setClusterId(job.getJobConfig().getClusterId()); history.setClusterId(job.getJobConfig().getClusterId());
history.setJobManagerAddress(job.getJobManagerAddress()); history.setJobManagerAddress(job.getJobManagerAddress());
history.setJobName(job.getJobConfig().getJobName()); history.setJobName(job.getJobConfig().getJobName());
history.setSession(job.getJobConfig().getSession()); history.setSession(job.getJobConfig().getSession());
history.setStatus(job.getStatus().ordinal()); history.setStatus(job.getStatus().ordinal());
history.setStatement(job.getStatement());
history.setStartTime(job.getStartTime()); history.setStartTime(job.getStartTime());
history.setTaskId(job.getJobConfig().getTaskId()); history.setTaskId(job.getJobConfig().getTaskId());
history.setConfig(JSONUtil.toJsonStr(job.getJobConfig())); history.setConfig(JSONUtil.toJsonStr(job.getJobConfig()));
...@@ -51,10 +56,13 @@ public class Job2MysqlHandler implements JobHandler { ...@@ -51,10 +56,13 @@ public class Job2MysqlHandler implements JobHandler {
@Override @Override
public boolean success() { public boolean success() {
Job job = JobContextHolder.getJob(); Job job = JobContextHolder.getJob();
if(job.getType()!= SqlType.SELECT&&job.getType()!=SqlType.INSERT){
return false;
}
History history = new History(); History history = new History();
history.setId(job.getId()); history.setId(job.getId());
history.setJobId(job.getJobId()); history.setJobId(job.getJobId());
history.setStatement(job.getStatement()); history.setType(job.getType().getType());
history.setStatus(job.getStatus().ordinal()); history.setStatus(job.getStatus().ordinal());
history.setEndTime(job.getEndTime()); history.setEndTime(job.getEndTime());
// history.setResult(JSONUtil.toJsonStr(job.getResult())); // history.setResult(JSONUtil.toJsonStr(job.getResult()));
...@@ -65,10 +73,14 @@ public class Job2MysqlHandler implements JobHandler { ...@@ -65,10 +73,14 @@ public class Job2MysqlHandler implements JobHandler {
@Override @Override
public boolean failed() { public boolean failed() {
Job job = JobContextHolder.getJob(); Job job = JobContextHolder.getJob();
if(job.getType()!= SqlType.SELECT&&job.getType()!=SqlType.INSERT){
return false;
}
History history = new History(); History history = new History();
history.setId(job.getId()); history.setId(job.getId());
history.setJobId(job.getJobId()); history.setJobId(job.getJobId());
history.setStatus(job.getStatus().ordinal()); history.setStatus(job.getStatus().ordinal());
history.setType(job.getType().getType());
history.setEndTime(job.getEndTime()); history.setEndTime(job.getEndTime());
history.setError(job.getError()); history.setError(job.getError());
historyService.updateById(history); historyService.updateById(history);
......
...@@ -29,6 +29,7 @@ public class History implements Serializable { ...@@ -29,6 +29,7 @@ public class History implements Serializable {
private String jobManagerAddress; private String jobManagerAddress;
private Integer status; private Integer status;
private String statement; private String statement;
private String type;
private String error; private String error;
private String result; private String result;
private String config; private String config;
......
...@@ -10,4 +10,5 @@ import com.dlink.model.History; ...@@ -10,4 +10,5 @@ import com.dlink.model.History;
* @since 2021/6/26 23:07 * @since 2021/6/26 23:07
*/ */
public interface HistoryService extends ISuperService<History> { public interface HistoryService extends ISuperService<History> {
public boolean removeHistoryById(Integer id);
} }
...@@ -3,6 +3,7 @@ package com.dlink.service.impl; ...@@ -3,6 +3,7 @@ package com.dlink.service.impl;
import com.dlink.db.service.impl.SuperServiceImpl; import com.dlink.db.service.impl.SuperServiceImpl;
import com.dlink.mapper.HistoryMapper; import com.dlink.mapper.HistoryMapper;
import com.dlink.model.History; import com.dlink.model.History;
import com.dlink.result.ResultPool;
import com.dlink.service.HistoryService; import com.dlink.service.HistoryService;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
...@@ -14,4 +15,12 @@ import org.springframework.stereotype.Service; ...@@ -14,4 +15,12 @@ import org.springframework.stereotype.Service;
*/ */
@Service @Service
public class HistoryServiceImpl extends SuperServiceImpl<HistoryMapper, History> implements HistoryService { public class HistoryServiceImpl extends SuperServiceImpl<HistoryMapper, History> implements HistoryService {
@Override
public boolean removeHistoryById(Integer id) {
History history = getById(id);
if(history!=null){
ResultPool.remove(history.getJobId());
}
return removeById(id);
}
} }
...@@ -2,6 +2,7 @@ package com.dlink.service.impl; ...@@ -2,6 +2,7 @@ package com.dlink.service.impl;
import com.dlink.assertion.Assert; import com.dlink.assertion.Assert;
import com.dlink.cluster.FlinkCluster; import com.dlink.cluster.FlinkCluster;
import com.dlink.constant.FlinkConstant;
import com.dlink.dto.StudioDDLDTO; import com.dlink.dto.StudioDDLDTO;
import com.dlink.dto.StudioExecuteDTO; import com.dlink.dto.StudioExecuteDTO;
import com.dlink.exception.BusException; import com.dlink.exception.BusException;
...@@ -77,6 +78,8 @@ public class StudioServiceImpl implements StudioService { ...@@ -77,6 +78,8 @@ public class StudioServiceImpl implements StudioService {
config.setHost(clusterService.getJobManagerAddress( config.setHost(clusterService.getJobManagerAddress(
clusterService.getById(studioExecuteDTO.getClusterId()) clusterService.getById(studioExecuteDTO.getClusterId())
)); ));
}else{
config.setHost(FlinkConstant.LOCAL_HOST);
} }
JobManager jobManager = JobManager.build(config); JobManager jobManager = JobManager.build(config);
return jobManager.executeSql(studioExecuteDTO.getStatement()); return jobManager.executeSql(studioExecuteDTO.getStatement());
......
...@@ -12,6 +12,7 @@ ...@@ -12,6 +12,7 @@
<result column="job_manager_address" property="jobManagerAddress"/> <result column="job_manager_address" property="jobManagerAddress"/>
<result column="status" property="status"/> <result column="status" property="status"/>
<result column="statement" property="statement"/> <result column="statement" property="statement"/>
<result column="type" property="type"/>
<result column="error" property="error"/> <result column="error" property="error"/>
<result column="result" property="result"/> <result column="result" property="result"/>
<result column="config" property="config"/> <result column="config" property="config"/>
...@@ -37,17 +38,28 @@ ...@@ -37,17 +38,28 @@
dlink_history a dlink_history a
<where> <where>
1=1 1=1
<if test='param.name!=null and param.name!=""'> <if test='param.jobId!=null and param.jobId!=""'>
and a.job_id = #{param.jobId}
</if>
<if test='param.jobName!=null and param.jobName!=""'>
and a.job_name like "%${param.jobName}%" and a.job_name like "%${param.jobName}%"
</if> </if>
<if test='param.alias!=null and param.alias!=""'> <if test='param.clusterId!=null and param.clusterId!=""'>
and a.statement like "%${param.statement}%" and a.cluster_id = #{param.clusterId}
</if>
<if test='param.session!=null and param.session!=""'>
and a.session = #{param.session}
</if>
<if test='param.status!=null and param.status!=""'>
and a.status = #{param.status}
</if> </if>
<if test='param.createTime!=null and param.createTime!=""'> <if test='param.startTime!=null and param.startTime!=""'>
and a.start_time <![CDATA[>=]]> str_to_date( #{param.startTime},'%Y-%m-%d %H:%i:%s') and a.start_time <![CDATA[>=]]> str_to_date( #{param.startTime[0]},'%Y-%m-%d %H:%i:%s')
and a.start_time <![CDATA[<=]]> str_to_date( #{param.startTime[1]},'%Y-%m-%d %H:%i:%s')
</if> </if>
<if test='param.updateTime!=null and param.updateTime!=""'> <if test='param.endTime!=null and param.endTime!=""'>
and a.end_time <![CDATA[>=]]> str_to_date( #{param.endTime},'%Y-%m-%d %H:%i:%s') and a.end_time <![CDATA[>=]]> str_to_date( #{param.endTime[0]},'%Y-%m-%d %H:%i:%s')
and a.end_time <![CDATA[<=]]> str_to_date( #{param.endTime[1]},'%Y-%m-%d %H:%i:%s')
</if> </if>
<if test='ew.sqlSegment!=null and ew.sqlSegment!="" and !ew.sqlSegment.startsWith(" ORDER BY")'> <if test='ew.sqlSegment!=null and ew.sqlSegment!="" and !ew.sqlSegment.startsWith(" ORDER BY")'>
and and
......
...@@ -24,4 +24,8 @@ public interface FlinkConstant { ...@@ -24,4 +24,8 @@ public interface FlinkConstant {
* flink运行节点 * flink运行节点
*/ */
String FLINK_JOB_MANAGER_HOST = "flinkJobManagerHOST"; String FLINK_JOB_MANAGER_HOST = "flinkJobManagerHOST";
/**
* 本地模式host
*/
String LOCAL_HOST = "localhost:8081";
} }
...@@ -7,6 +7,10 @@ package com.dlink.constant; ...@@ -7,6 +7,10 @@ package com.dlink.constant;
* @since 2021/5/25 15:51 * @since 2021/5/25 15:51
**/ **/
public interface FlinkSQLConstant { public interface FlinkSQLConstant {
/**
* 查询
*/
String SELECT = "SELECT";
/** /**
* 创建 * 创建
*/ */
...@@ -15,30 +19,50 @@ public interface FlinkSQLConstant { ...@@ -15,30 +19,50 @@ public interface FlinkSQLConstant {
* 删除 * 删除
*/ */
String DROP = "DROP"; String DROP = "DROP";
/**
* 修改
*/
String ALTER = "ALTER";
/** /**
* 插入 * 插入
*/ */
String INSERT = "INSERT"; String INSERT = "INSERT";
/** /**
* 修改 * DESCRIBE
*/ */
String ALTER = "ALTER"; String DESCRIBE = "DESCRIBE";
/** /**
* 查询 * EXPLAIN
*/ */
String SELECT = "SELECT"; String EXPLAIN = "EXPLAIN";
/**
* USE
*/
String USE = "USE";
/** /**
* show操作 * SHOW
*/ */
String SHOW = "SHOW"; String SHOW = "SHOW";
/** /**
* DESCRIBE * LOAD
*/ */
String DESCRIBE = "DESCRIBE"; String LOAD = "LOAD";
/**
* UNLOAD
*/
String UNLOAD = "UNLOAD";
/**
* SET
*/
String SET = "SET";
/**
* RESET
*/
String RESET = "RESET";
/** /**
* 未知操作类型 * 未知操作类型
*/ */
String UNKNOWN_TYPE = "UNKNOWN TYPE"; String UNKNOWN = "UNKNOWN";
/** /**
* 查询时null对应的值 * 查询时null对应的值
*/ */
......
...@@ -2,6 +2,7 @@ package com.dlink.job; ...@@ -2,6 +2,7 @@ package com.dlink.job;
import com.dlink.executor.Executor; import com.dlink.executor.Executor;
import com.dlink.executor.ExecutorSetting; import com.dlink.executor.ExecutorSetting;
import com.dlink.parser.SqlType;
import com.dlink.result.IResult; import com.dlink.result.IResult;
import lombok.Getter; import lombok.Getter;
import lombok.Setter; import lombok.Setter;
...@@ -22,6 +23,7 @@ public class Job { ...@@ -22,6 +23,7 @@ public class Job {
private JobConfig jobConfig; private JobConfig jobConfig;
private String jobManagerAddress; private String jobManagerAddress;
private JobStatus status; private JobStatus status;
private SqlType type;
private String statement; private String statement;
private String jobId; private String jobId;
private String error; private String error;
......
...@@ -8,6 +8,7 @@ import com.dlink.executor.Executor; ...@@ -8,6 +8,7 @@ import com.dlink.executor.Executor;
import com.dlink.executor.ExecutorSetting; import com.dlink.executor.ExecutorSetting;
import com.dlink.executor.custom.CustomTableEnvironmentImpl; import com.dlink.executor.custom.CustomTableEnvironmentImpl;
import com.dlink.interceptor.FlinkInterceptor; import com.dlink.interceptor.FlinkInterceptor;
import com.dlink.parser.SqlType;
import com.dlink.result.*; import com.dlink.result.*;
import com.dlink.session.ExecutorEntity; import com.dlink.session.ExecutorEntity;
import com.dlink.session.SessionPool; import com.dlink.session.SessionPool;
...@@ -180,7 +181,7 @@ public class JobManager extends RunTime { ...@@ -180,7 +181,7 @@ public class JobManager extends RunTime {
if (item.trim().isEmpty()) { if (item.trim().isEmpty()) {
continue; continue;
} }
String operationType = Operations.getOperationType(item); SqlType operationType = Operations.getOperationType(item);
long start = System.currentTimeMillis(); long start = System.currentTimeMillis();
CustomTableEnvironmentImpl stEnvironment = executor.getCustomTableEnvironmentImpl(); CustomTableEnvironmentImpl stEnvironment = executor.getCustomTableEnvironmentImpl();
if (!FlinkInterceptor.build(stEnvironment, item)) { if (!FlinkInterceptor.build(stEnvironment, item)) {
...@@ -230,9 +231,9 @@ public class JobManager extends RunTime { ...@@ -230,9 +231,9 @@ public class JobManager extends RunTime {
Executor executor = createExecutor(); Executor executor = createExecutor();
for (String sqlText : sqlList) { for (String sqlText : sqlList) {
currentIndex++; currentIndex++;
String operationType = Operations.getOperationType(sqlText); SqlType operationType = Operations.getOperationType(sqlText);
CustomTableEnvironmentImpl stEnvironment = executor.getCustomTableEnvironmentImpl(); CustomTableEnvironmentImpl stEnvironment = executor.getCustomTableEnvironmentImpl();
if (operationType.equalsIgnoreCase(FlinkSQLConstant.INSERT)) { if (operationType==SqlType.INSERT) {
long start = System.currentTimeMillis(); long start = System.currentTimeMillis();
if (!FlinkInterceptor.build(stEnvironment, sqlText)) { if (!FlinkInterceptor.build(stEnvironment, sqlText)) {
TableResult tableResult = executor.executeSql(sqlText); TableResult tableResult = executor.executeSql(sqlText);
...@@ -280,6 +281,7 @@ public class JobManager extends RunTime { ...@@ -280,6 +281,7 @@ public class JobManager extends RunTime {
Job job = new Job(config,jobManagerHost+NetConstant.COLON+jobManagerPort, Job job = new Job(config,jobManagerHost+NetConstant.COLON+jobManagerPort,
Job.JobStatus.INITIALIZE,statement,executorSetting, LocalDateTime.now(),executor); Job.JobStatus.INITIALIZE,statement,executorSetting, LocalDateTime.now(),executor);
JobContextHolder.setJob(job); JobContextHolder.setJob(job);
job.setType(Operations.getSqlTypeFromStatements(statement));
ready(); ready();
String[] statements = statement.split(";"); String[] statements = statement.split(";");
int currentIndex = 0; int currentIndex = 0;
...@@ -289,14 +291,14 @@ public class JobManager extends RunTime { ...@@ -289,14 +291,14 @@ public class JobManager extends RunTime {
continue; continue;
} }
currentIndex++; currentIndex++;
String operationType = Operations.getOperationType(item); SqlType operationType = Operations.getOperationType(item);
if (!FlinkInterceptor.build(executor.getCustomTableEnvironmentImpl(), item)) { if (!FlinkInterceptor.build(executor.getCustomTableEnvironmentImpl(), item)) {
TableResult tableResult = executor.executeSql(item); TableResult tableResult = executor.executeSql(item);
if (tableResult.getJobClient().isPresent()) { if (tableResult.getJobClient().isPresent()) {
job.setJobId(tableResult.getJobClient().get().getJobID().toHexString()); job.setJobId(tableResult.getJobClient().get().getJobID().toHexString());
} }
if(config.isUseResult()) { if(config.isUseResult()) {
IResult result = ResultBuilder.build(operationType, maxRowNum, "", false).getResult(tableResult); IResult result = ResultBuilder.build(operationType, maxRowNum, "", true).getResult(tableResult);
job.setResult(result); job.setResult(result);
} }
} }
...@@ -333,8 +335,8 @@ public class JobManager extends RunTime { ...@@ -333,8 +335,8 @@ public class JobManager extends RunTime {
if (item.trim().isEmpty()) { if (item.trim().isEmpty()) {
continue; continue;
} }
String operationType = Operations.getOperationType(item); SqlType operationType = Operations.getOperationType(item);
if(FlinkSQLConstant.INSERT.equals(operationType)||FlinkSQLConstant.SELECT.equals(operationType)){ if(SqlType.INSERT==operationType||SqlType.SELECT==operationType){
continue; continue;
} }
LocalDateTime startTime = LocalDateTime.now(); LocalDateTime startTime = LocalDateTime.now();
...@@ -352,4 +354,8 @@ public class JobManager extends RunTime { ...@@ -352,4 +354,8 @@ public class JobManager extends RunTime {
public static SelectResult getJobData(String jobId){ public static SelectResult getJobData(String jobId){
return ResultPool.get(jobId); return ResultPool.get(jobId);
} }
/*public static void cancel(String jobId){
SelectResult selectResult = ResultPool.get(jobId);
}*/
} }
package com.dlink.parser;
/**
* SqlType
*
* @author wenmo
* @since 2021/7/3 11:11
*/
public enum SqlType {
SELECT("SELECT"),
CREATE("CREATE"),
DROP("DROP"),
ALTER("ALTER"),
INSERT("INSERT"),
DESCRIBE("DESCRIBE"),
EXPLAIN("EXPLAIN"),
USE("USE"),
SHOW("SHOW"),
LOAD("LOAD"),
UNLOAD("UNLOAD"),
SET("SET"),
RESET("RESET"),
UNKNOWN("UNKNOWN"),
;
private String type;
SqlType(String type) {
this.type = type;
}
public void setType(String type) {
this.type = type;
}
public String getType() {
return type;
}
}
...@@ -11,8 +11,6 @@ import org.apache.flink.table.api.TableResult; ...@@ -11,8 +11,6 @@ import org.apache.flink.table.api.TableResult;
*/ */
public class InsertResultBuilder implements ResultBuilder { public class InsertResultBuilder implements ResultBuilder {
public static final String OPERATION_TYPE = FlinkSQLConstant.INSERT;
@Override @Override
public IResult getResult(TableResult tableResult) { public IResult getResult(TableResult tableResult) {
if(tableResult.getJobClient().isPresent()){ if(tableResult.getJobClient().isPresent()){
......
package com.dlink.result; package com.dlink.result;
import com.dlink.constant.FlinkSQLConstant; import com.dlink.parser.SqlType;
import org.apache.flink.table.api.TableResult; import org.apache.flink.table.api.TableResult;
/** /**
...@@ -11,14 +11,14 @@ import org.apache.flink.table.api.TableResult; ...@@ -11,14 +11,14 @@ import org.apache.flink.table.api.TableResult;
**/ **/
public interface ResultBuilder { public interface ResultBuilder {
static ResultBuilder build(String operationType, Integer maxRowNum, String nullColumn, boolean printRowKind){ static ResultBuilder build(SqlType operationType, Integer maxRowNum, String nullColumn, boolean printRowKind){
switch (operationType.toUpperCase()){ switch (operationType){
case SelectResultBuilder.OPERATION_TYPE: case SELECT:
return new SelectResultBuilder(maxRowNum,nullColumn,printRowKind); return new SelectResultBuilder(maxRowNum,nullColumn,printRowKind);
case FlinkSQLConstant.SHOW: case SHOW:
case FlinkSQLConstant.DESCRIBE: case DESCRIBE:
return new ShowResultBuilder(nullColumn,printRowKind); return new ShowResultBuilder(nullColumn,false);
case InsertResultBuilder.OPERATION_TYPE: case INSERT:
return new InsertResultBuilder(); return new InsertResultBuilder();
default: default:
return new DDLResultBuilder(); return new DDLResultBuilder();
......
...@@ -17,8 +17,6 @@ import java.util.stream.Stream; ...@@ -17,8 +17,6 @@ import java.util.stream.Stream;
**/ **/
public class SelectResultBuilder implements ResultBuilder { public class SelectResultBuilder implements ResultBuilder {
public static final String OPERATION_TYPE = FlinkSQLConstant.SELECT;
private Integer maxRowNum; private Integer maxRowNum;
private boolean printRowKind; private boolean printRowKind;
private String nullColumn; private String nullColumn;
......
...@@ -16,7 +16,6 @@ import java.util.stream.Stream; ...@@ -16,7 +16,6 @@ import java.util.stream.Stream;
* @since 2021/7/1 23:57 * @since 2021/7/1 23:57
*/ */
public class ShowResultBuilder implements ResultBuilder { public class ShowResultBuilder implements ResultBuilder {
public static final String OPERATION_TYPE = FlinkSQLConstant.SHOW;
private boolean printRowKind; private boolean printRowKind;
private String nullColumn; private String nullColumn;
......
package com.dlink.trans; package com.dlink.trans;
import com.dlink.constant.FlinkSQLConstant; import com.dlink.constant.FlinkSQLConstant;
import com.dlink.parser.SqlType;
import com.dlink.trans.ddl.CreateAggTableOperation; import com.dlink.trans.ddl.CreateAggTableOperation;
/** /**
...@@ -15,9 +16,32 @@ public class Operations { ...@@ -15,9 +16,32 @@ public class Operations {
new CreateAggTableOperation() new CreateAggTableOperation()
}; };
public static String getOperationType(String sql) { public static SqlType getSqlTypeFromStatements(String statement){
String[] statements = statement.split(";");
SqlType sqlType = SqlType.UNKNOWN;
for (String item : statements) {
if (item.trim().isEmpty()) {
continue;
}
sqlType = Operations.getOperationType(item);
if(FlinkSQLConstant.INSERT.equals(sqlType)||FlinkSQLConstant.SELECT.equals(sqlType)){
return sqlType;
}
}
return sqlType;
}
public static SqlType getOperationType(String sql) {
String sqlTrim = sql.replaceAll("[\\s\\t\\n\\r]", "").toUpperCase(); String sqlTrim = sql.replaceAll("[\\s\\t\\n\\r]", "").toUpperCase();
if (sqlTrim.startsWith(FlinkSQLConstant.CREATE)) { SqlType type = SqlType.UNKNOWN;
for (SqlType sqlType : SqlType.values()) {
if (sqlTrim.startsWith(sqlType.getType())) {
type = sqlType;
break;
}
}
return type;
/*if (sqlTrim.startsWith(FlinkSQLConstant.CREATE)) {
return FlinkSQLConstant.CREATE; return FlinkSQLConstant.CREATE;
} }
if (sqlTrim.startsWith(FlinkSQLConstant.ALTER)) { if (sqlTrim.startsWith(FlinkSQLConstant.ALTER)) {
...@@ -27,7 +51,7 @@ public class Operations { ...@@ -27,7 +51,7 @@ public class Operations {
return FlinkSQLConstant.INSERT; return FlinkSQLConstant.INSERT;
} }
if (sqlTrim.startsWith(FlinkSQLConstant.DROP)) { if (sqlTrim.startsWith(FlinkSQLConstant.DROP)) {
return FlinkSQLConstant.INSERT; return FlinkSQLConstant.DROP;
} }
if (sqlTrim.startsWith(FlinkSQLConstant.SELECT)) { if (sqlTrim.startsWith(FlinkSQLConstant.SELECT)) {
return FlinkSQLConstant.SELECT; return FlinkSQLConstant.SELECT;
...@@ -35,7 +59,7 @@ public class Operations { ...@@ -35,7 +59,7 @@ public class Operations {
if (sqlTrim.startsWith(FlinkSQLConstant.SHOW)) { if (sqlTrim.startsWith(FlinkSQLConstant.SHOW)) {
return FlinkSQLConstant.SHOW; return FlinkSQLConstant.SHOW;
} }
return FlinkSQLConstant.UNKNOWN_TYPE; return FlinkSQLConstant.UNKNOWN;*/
} }
public static Operation buildOperation(String statement){ public static Operation buildOperation(String statement){
......
...@@ -2,8 +2,12 @@ package com.dlink.core; ...@@ -2,8 +2,12 @@ package com.dlink.core;
import com.dlink.executor.Executor; import com.dlink.executor.Executor;
import com.dlink.executor.ExecutorSetting; import com.dlink.executor.ExecutorSetting;
import com.dlink.job.JobConfig;
import com.dlink.job.JobManager; import com.dlink.job.JobManager;
import com.dlink.job.JobResult;
import com.dlink.result.ResultPool;
import com.dlink.result.RunResult; import com.dlink.result.RunResult;
import com.dlink.result.SelectResult;
import com.dlink.result.SubmitResult; import com.dlink.result.SubmitResult;
import org.junit.Test; import org.junit.Test;
...@@ -89,4 +93,29 @@ public class JobManagerTest { ...@@ -89,4 +93,29 @@ public class JobManagerTest {
RunResult result = jobManager.execute(sql); RunResult result = jobManager.execute(sql);
System.out.println(result.isSuccess()); System.out.println(result.isSuccess());
} }
@Test
public void cancelJobSelect(){
JobConfig config = new JobConfig(true, true, "s1", true, 2,
null, "测试", false, 100, 0,
1, null);
if(config.isUseRemote()) {
config.setHost("192.168.123.157:8081");
}
JobManager jobManager = JobManager.build(config);
String sql1 ="CREATE TABLE Orders (\n" +
" order_number BIGINT,\n" +
" price DECIMAL(32,2),\n" +
" order_time TIMESTAMP(3)\n" +
") WITH (\n" +
" 'connector' = 'datagen',\n" +
" 'rows-per-second' = '1'\n" +
");";
String sql3 = "select order_number,price,order_time from Orders";
String sql = sql1+sql3;
JobResult result = jobManager.executeSql(sql);
SelectResult selectResult = ResultPool.get(result.getJobId());
System.out.println(result.isSuccess());
}
} }
import {StateType} from "@/pages/FlinkSqlStudio/model"; import {StateType} from "@/pages/FlinkSqlStudio/model";
import {connect} from "umi"; import {connect} from "umi";
import {Button, Tag, Space,Typography, Divider, Badge,Drawer,} from 'antd'; import {Button, Tag, Space, Typography, Divider, Badge, Drawer, Modal,} from 'antd';
import {MessageOutlined,ClusterOutlined,FireOutlined,ReloadOutlined} from "@ant-design/icons"; import {MessageOutlined,ClusterOutlined,FireOutlined,ReloadOutlined} from "@ant-design/icons";
import { LightFilter, ProFormDatePicker } from '@ant-design/pro-form';
import ProList from '@ant-design/pro-list'; import ProList from '@ant-design/pro-list';
import request from 'umi-request';
import {handleRemove, queryData} from "@/components/Common/crud"; import {handleRemove, queryData} from "@/components/Common/crud";
import ProDescriptions from '@ant-design/pro-descriptions'; import ProDescriptions from '@ant-design/pro-descriptions';
import {useRef, useState} from "react"; import React, {useRef, useState} from "react";
import {DocumentTableListItem} from "@/pages/Document/data"; import {
import ProForm, {
ModalForm, ModalForm,
ProFormText,
ProFormDateRangePicker,
ProFormSelect,
} from '@ant-design/pro-form'; } from '@ant-design/pro-form';
import styles from "./index.less"; import styles from "./index.less";
import {ActionType} from "@ant-design/pro-table"; import {ActionType} from "@ant-design/pro-table";
import {showJobData} from "@/components/Studio/StudioEvent/DQL"; import {showJobData} from "@/components/Studio/StudioEvent/DQL";
import StudioPreview from "../StudioPreview";
const { Title, Paragraph, Text, Link } = Typography; const { Title, Paragraph, Text, Link } = Typography;
...@@ -71,12 +66,23 @@ const StudioHistory = (props: any) => { ...@@ -71,12 +66,23 @@ const StudioHistory = (props: any) => {
setModalVisit(true); setModalVisit(true);
setType(type); setType(type);
setConfig(JSON.parse(row.config)); setConfig(JSON.parse(row.config));
if(type==3){
showJobData(row.jobId,dispatch)
}
}; };
const removeHistory=async (row:HistoryItem)=>{ const removeHistory=(row:HistoryItem)=>{
await handleRemove(url,[row]); Modal.confirm({
// refs.current?.reloadAndRest?.(); title: '删除执行记录',
refs.history?.current?.reload(); content: '确定删除该执行记录吗?',
okText: '确认',
cancelText: '取消',
onOk:async () => {
await handleRemove(url,[row]);
// refs.current?.reloadAndRest?.();
refs.history?.current?.reload();
}
});
}; };
return ( return (
...@@ -172,7 +178,7 @@ const StudioHistory = (props: any) => { ...@@ -172,7 +178,7 @@ const StudioHistory = (props: any) => {
<a key="statement" onClick={()=>{showDetail(row,2)}}> <a key="statement" onClick={()=>{showDetail(row,2)}}>
FlinkSql语句 FlinkSql语句
</a>, </a>,
<a key="result" onClick={()=>{showJobData(row.jobId,dispatch)}}> <a key="result" onClick={()=>{showDetail(row,3)}}>
预览数据 预览数据
</a>, </a>,
<a key="error" onClick={()=>{showDetail(row,4)}}> <a key="error" onClick={()=>{showDetail(row,4)}}>
...@@ -190,7 +196,7 @@ const StudioHistory = (props: any) => { ...@@ -190,7 +196,7 @@ const StudioHistory = (props: any) => {
}, },
clusterId: { clusterId: {
dataIndex: 'clusterId', dataIndex: 'clusterId',
title: '执行方式', title: '运行集群',
}, },
session: { session: {
dataIndex: 'session', dataIndex: 'session',
...@@ -201,24 +207,24 @@ const StudioHistory = (props: any) => { ...@@ -201,24 +207,24 @@ const StudioHistory = (props: any) => {
title: '状态', title: '状态',
valueType: 'select', valueType: 'select',
valueEnum: { valueEnum: {
ALL: {text: '全部', status: 'ALL'}, '': {text: '全部', status: 'ALL'},
INITIALIZE: { 0: {
text: '初始化', text: '初始化',
status: 'INITIALIZE', status: 'INITIALIZE',
}, },
RUNNING: { 1: {
text: '运行中', text: '运行中',
status: 'RUNNING', status: 'RUNNING',
}, },
SUCCESS: { 2: {
text: '成功', text: '成功',
status: 'SUCCESS', status: 'SUCCESS',
}, },
FAILED: { 3: {
text: '失败', text: '失败',
status: 'FAILED', status: 'FAILED',
}, },
CANCEL: { 4: {
text: '停止', text: '停止',
status: 'CANCEL', status: 'CANCEL',
}, },
...@@ -263,7 +269,9 @@ const StudioHistory = (props: any) => { ...@@ -263,7 +269,9 @@ const StudioHistory = (props: any) => {
title='执行配置' title='执行配置'
> >
<ProDescriptions.Item span={2} label="JobId" > <ProDescriptions.Item span={2} label="JobId" >
{row.jobId} <Tag color="blue" key={row.jobId}>
<FireOutlined /> {row.jobId}
</Tag>
</ProDescriptions.Item> </ProDescriptions.Item>
<ProDescriptions.Item label="共享会话" > <ProDescriptions.Item label="共享会话" >
{config.useSession?'启用':'禁用'} {config.useSession?'启用':'禁用'}
...@@ -312,20 +320,39 @@ const StudioHistory = (props: any) => { ...@@ -312,20 +320,39 @@ const StudioHistory = (props: any) => {
title='FlinkSql 语句' title='FlinkSql 语句'
> >
<ProDescriptions.Item label="JobId" > <ProDescriptions.Item label="JobId" >
{row.jobId} <Tag color="blue" key={row.jobId}>
<FireOutlined /> {row.jobId}
</Tag>
</ProDescriptions.Item> </ProDescriptions.Item>
<ProDescriptions.Item> <ProDescriptions.Item>
<pre className={styles.code}>{row.statement}</pre> <pre className={styles.code}>{row.statement}</pre>
</ProDescriptions.Item> </ProDescriptions.Item>
</ProDescriptions> </ProDescriptions>
)} )}
{type==3 && (
<ProDescriptions
column={2}
title='数据预览'
>
<ProDescriptions.Item span={2} label="JobId" >
<Tag color="blue" key={row.jobId}>
<FireOutlined /> {row.jobId}
</Tag>
</ProDescriptions.Item>
<ProDescriptions.Item span={2} >
<StudioPreview style={{width: '100%'}}/>
</ProDescriptions.Item>
</ProDescriptions>
)}
{type==4 && ( {type==4 && (
<ProDescriptions <ProDescriptions
column={1} column={1}
title='异常信息' title='异常信息'
> >
<ProDescriptions.Item label="JobId" > <ProDescriptions.Item label="JobId" >
{row.jobId} <Tag color="blue" key={row.jobId}>
<FireOutlined /> {row.jobId}
</Tag>
</ProDescriptions.Item> </ProDescriptions.Item>
<ProDescriptions.Item > <ProDescriptions.Item >
<pre className={styles.code}>{row.error}</pre> <pre className={styles.code}>{row.error}</pre>
......
import {Typography, Divider, Badge, Empty} from "antd"; import {Typography, Divider, Badge, Empty,Tag} from "antd";
import {StateType} from "@/pages/FlinkSqlStudio/model"; import {StateType} from "@/pages/FlinkSqlStudio/model";
import {connect} from "umi"; import {connect} from "umi";
import { FireOutlined } from '@ant-design/icons';
const { Title, Paragraph, Text, Link } = Typography; const { Title, Paragraph, Text, Link } = Typography;
...@@ -10,28 +11,28 @@ const StudioMsg = (props:any) => { ...@@ -10,28 +11,28 @@ const StudioMsg = (props:any) => {
return ( return (
<Typography> <Typography>
{current.console.result.map((item,index)=> { {current.console.result.jobConfig?(<Paragraph>
if(index==0) { <blockquote><Link href={`http://${current.console.result.jobConfig.host}`} target="_blank">
return (<Paragraph> [{current.console.result.jobConfig.session}:{current.console.result.jobConfig.host}]
<blockquote><Link href={`http://${item.jobConfig.host}`} target="_blank"> </Link> <Divider type="vertical"/>{current.console.result.startTime}
[{item.jobConfig.sessionKey}:{item.jobConfig.host}] <Divider type="vertical"/>{current.console.result.endTime}
</Link> <Divider type="vertical"/>{item.startTime} <Divider type="vertical"/>
<Divider type="vertical"/>{item.endTime} {!(current.console.result.status=='SUCCESS') ? <><Badge status="error"/><Text type="danger">Error</Text></> :
<Divider type="vertical"/> <><Badge status="success"/><Text type="success">Success</Text></>}
{!(item.status=='SUCCESS') ? <><Badge status="error"/><Text type="danger">Error</Text></> : <Divider type="vertical"/>
<><Badge status="success"/><Text type="success">Success</Text></>} {current.console.result.jobConfig.jobName&&<Text code>{current.console.result.jobConfig.jobName}</Text>}
<Divider type="vertical"/> {current.console.result.jobId&&
{item.jobConfig.jobName&&<Text code>{item.jobConfig.jobName}</Text>} (<>
{item.jobId&&<Text code>{item.jobId}</Text>} <Divider type="vertical"/>
</blockquote> <Tag color="blue" key={current.console.result.jobId}>
{item.statement && (<pre style={{height: '100px'}}>{item.statement}</pre>)} <FireOutlined /> {current.console.result.jobId}
{item.error && (<pre style={{height: '100px'}}>{item.error}</pre>)} </Tag>
</Paragraph>) </>)}
}else{ </blockquote>
return ''; {current.console.result.statement && (<pre style={{height: '100px'}}>{current.console.result.statement}</pre>)}
} {current.console.result.error && (<pre style={{height: '100px'}}>{current.console.result.error}</pre>)}
})} </Paragraph>):<Empty image={Empty.PRESENTED_IMAGE_SIMPLE} />
{current.console.result.length==0?<Empty image={Empty.PRESENTED_IMAGE_SIMPLE} />:''} }
</Typography> </Typography>
); );
}; };
......
import {Input, Button, Space, Empty} from "antd";
import {StateType} from "@/pages/FlinkSqlStudio/model";
import {connect} from "umi";
import {useState} from "react";
import { SearchOutlined } from '@ant-design/icons';
import ProTable from '@ant-design/pro-table';
const StudioPreview = (props:any) => {
const {result} = props;
const [searchText,setSearchText] = useState<string>('');
const [searchedColumn,setSearchedColumn] = useState<string>('');
const getColumnSearchProps = (dIndex) => ({
filterDropdown: ({ setSelectedKeys, selectedKeys, confirm, clearFilters }) => (
<div style={{ padding: 8 }}>
<Input
placeholder={`Search ${dIndex}`}
value={selectedKeys[0]}
onChange={e => setSelectedKeys(e.target.value ? [e.target.value] : [])}
onPressEnter={() => handleSearch(selectedKeys, confirm, dIndex)}
style={{ marginBottom: 8, display: 'block' }}
/>
<Space>
<Button
type="primary"
onClick={() => handleSearch(selectedKeys, confirm, dIndex)}
icon={<SearchOutlined />}
size="small"
style={{ width: 90 }}
>
搜索
</Button>
<Button onClick={() => handleReset(clearFilters)} size="small" style={{ width: 90 }}>
重置
</Button>
<Button
type="link"
size="small"
onClick={() => {
setSearchText(selectedKeys[0]);
setSearchedColumn(dIndex);
}}
>
过滤
</Button>
</Space>
</div>
),
filterIcon: filtered => <SearchOutlined style={{ color: filtered ? '#1890ff' : undefined }} />,
onFilter: (value, record) =>
record[dIndex]
? record[dIndex].toString().toLowerCase().includes(value.toLowerCase())
: '',
});
const handleSearch = (selectedKeys, confirm, dIndex) => {
confirm();
setSearchText(selectedKeys[0]);
setSearchedColumn(dIndex);
};
const handleReset = (clearFilters) => {
clearFilters();
setSearchText('');
};
const getColumns=(columns:[])=>{
let datas:any=[];
columns.map((item)=> {
datas.push({
title: item,
dataIndex: item,
key: item,
sorter: true,
...getColumnSearchProps(item),
});
});
return datas;
};
return (
<div style={{width: '100%'}}>
{result&&result.jobId&&!result.isDestroyed&&result.rowData&&result.columns?
(<ProTable dataSource={result.rowData} columns={getColumns(result.columns)} search={false}
/>):(<Empty image={Empty.PRESENTED_IMAGE_SIMPLE} />)}
</div>
);
};
export default connect(({ Studio }: { Studio: StateType }) => ({
current: Studio.current,
result: Studio.result,
}))(StudioPreview);
...@@ -14,7 +14,6 @@ const { Title, Paragraph, Text, Link } = Typography; ...@@ -14,7 +14,6 @@ const { Title, Paragraph, Text, Link } = Typography;
const StudioTable = (props:any) => { const StudioTable = (props:any) => {
const {current,result,dispatch} = props; const {current,result,dispatch} = props;
const [dataIndex,setDataIndex] = useState<number>(0);
const [searchText,setSearchText] = useState<string>(''); const [searchText,setSearchText] = useState<string>('');
const [searchedColumn,setSearchedColumn] = useState<string>(''); const [searchedColumn,setSearchedColumn] = useState<string>('');
...@@ -97,12 +96,28 @@ const StudioTable = (props:any) => { ...@@ -97,12 +96,28 @@ const StudioTable = (props:any) => {
return datas; return datas;
}; };
const showDetail=()=>{
showJobData(current.console.result.jobId,dispatch)
};
return ( return (
<Typography> <div style={{width: '100%'}}>
{result&&result.jobId&&!result.isDestroyed? {current.console&&current.console.result.jobId?
(<ProTable dataSource={result.rowData} columns={getColumns(result.columns)} search={false} (<>
<Button type="primary" onClick={showDetail} icon={<SearchOutlined />}>
获取最新数据
</Button>
{result.rowData&&result.columns?
<ProTable dataSource={result.rowData} columns={getColumns(result.columns)} search={false}
options={{
search: false,
}}/>
:(<Empty image={Empty.PRESENTED_IMAGE_SIMPLE} />)
}
</>):
current.console&&current.console.result&&current.console.result.result&&current.console.result.result.rowData&&current.console.result.result.columns?
(<ProTable dataSource={current.console.result.result.rowData} columns={getColumns(current.console.result.result.columns)} search={false}
/>):(<Empty image={Empty.PRESENTED_IMAGE_SIMPLE} />)} />):(<Empty image={Empty.PRESENTED_IMAGE_SIMPLE} />)}
</Typography> </div>
); );
}; };
......
...@@ -30,17 +30,18 @@ export type CompletionItem = { ...@@ -30,17 +30,18 @@ export type CompletionItem = {
detail?: string, detail?: string,
} }
export type StudioParam = { export type StudioParam = {
useSession:boolean;
session: string,
useRemote?:boolean;
clusterId?: number,
useResult:boolean;
maxRowNum?: number,
statement: string, statement: string,
fragment?: boolean,
jobName?:string,
parallelism?: number,
checkPoint?: number, checkPoint?: number,
savePointPath?: string, savePointPath?: string,
parallelism?: number,
fragment?: boolean,
clusterId: number,
session: string,
maxRowNum?: number,
isResult:boolean;
isSession:boolean;
isRemote:boolean;
} }
export type CAParam = { export type CAParam = {
statement: string, statement: string,
......
import {executeDDL} from "@/pages/FlinkSqlStudio/service"; import {executeDDL} from "@/pages/FlinkSqlStudio/service";
import FlinkSQL from "./FlinkSQL"; import FlinkSQL from "./FlinkSQL";
import {TaskType} from "@/pages/FlinkSqlStudio/model"; import {TaskType} from "@/pages/FlinkSqlStudio/model";
import {Modal} from "antd";
import {handleRemove} from "@/components/Common/crud";
export function showTables(task:TaskType,dispatch:any) { export function showTables(task:TaskType,dispatch:any) {
const res = executeDDL({ const res = executeDDL({
statement:FlinkSQL.SHOW_TABLES, statement:FlinkSQL.SHOW_TABLES,
clusterId: task.clusterId, clusterId: task.clusterId,
session:task.session, session:task.session,
useRemote:task.useRemote,
useSession:task.useSession, useSession:task.useSession,
useResult:true, useResult:true,
}); });
...@@ -27,3 +28,40 @@ export function showTables(task:TaskType,dispatch:any) { ...@@ -27,3 +28,40 @@ export function showTables(task:TaskType,dispatch:any) {
}); });
}); });
} }
export function removeTable(tablename:string,task:TaskType,dispatch:any) {
Modal.confirm({
title: '确定删除表【'+tablename+'】吗?',
okText: '确认',
cancelText: '取消',
onOk:async () => {
const res = executeDDL({
statement:"drop table "+tablename,
clusterId: task.clusterId,
session:task.session,
useSession:task.useSession,
useResult:true,
});
res.then((result)=>{
showTables(task,dispatch);
});
}
});
}
export function clearSession(task:TaskType,dispatch:any) {
Modal.confirm({
title: '确认清空会话【'+task.session+'】?',
okText: '确认',
cancelText: '取消',
onOk:async () => {
let session = {
id:task.session,
};
const res = handleRemove('/api/studio/clearSession',[session]);
res.then((result)=>{
showTables(task,dispatch);
});
}
});
}
...@@ -8,7 +8,7 @@ import { SearchOutlined,DownOutlined,DeleteOutlined,CommentOutlined } from '@ant ...@@ -8,7 +8,7 @@ import { SearchOutlined,DownOutlined,DeleteOutlined,CommentOutlined } from '@ant
import React from "react"; import React from "react";
import {executeDDL} from "@/pages/FlinkSqlStudio/service"; import {executeDDL} from "@/pages/FlinkSqlStudio/service";
import {handleRemove} from "@/components/Common/crud"; import {handleRemove} from "@/components/Common/crud";
import {showTables} from "@/components/Studio/StudioEvent/DDL"; import {removeTable, showTables,clearSession} from "@/components/Studio/StudioEvent/DDL";
const StudioConnector = (props:any) => { const StudioConnector = (props:any) => {
...@@ -18,8 +18,8 @@ const StudioConnector = (props:any) => { ...@@ -18,8 +18,8 @@ const StudioConnector = (props:any) => {
const [loadings,setLoadings] = useState<boolean[]>([]); const [loadings,setLoadings] = useState<boolean[]>([]);
const [searchText,setSearchText] = useState<string>(''); const [searchText,setSearchText] = useState<string>('');
const [searchedColumn,setSearchedColumn] = useState<string>(''); const [searchedColumn,setSearchedColumn] = useState<string>('');
const [clusterName,setClusterName] = useState<string>(''); const [modalVisit, setModalVisit] = useState(false);
const [session,setSession] = useState<string>(''); const [row, setRow] = useState<{}>();
const getColumnSearchProps = (dIndex) => ({ const getColumnSearchProps = (dIndex) => ({
filterDropdown: ({ setSelectedKeys, selectedKeys, confirm, clearFilters }) => ( filterDropdown: ({ setSelectedKeys, selectedKeys, confirm, clearFilters }) => (
...@@ -105,31 +105,7 @@ const StudioConnector = (props:any) => { ...@@ -105,31 +105,7 @@ const StudioConnector = (props:any) => {
const keyEvent=(key, item)=>{ const keyEvent=(key, item)=>{
if(key=='delete'){ if(key=='delete'){
let newLoadings = [...loadings]; removeTable(item.tablename,current.task,dispatch);
newLoadings[1] = true;
setLoadings(newLoadings);
const res = executeDDL({
statement:"drop table "+item.tablename,
clusterId: current.task.clusterId,
session:current.task.session,
});
res.then((result)=>{
if(result.datas.success){
let newTableData = tableData;
for (let i=0; i<newTableData.length; i++) {
if (newTableData[i].tablename == item.tablename) {
newTableData.splice(i, 1);
// delete newTableData[i];
// setTableData(newTableData);
getTables();
break;
}
}
}
let newLoadings = [...loadings];
newLoadings[1] = false;
setLoadings(newLoadings);
});
}else{ }else{
message.warn("敬请期待"); message.warn("敬请期待");
} }
...@@ -139,8 +115,8 @@ const StudioConnector = (props:any) => { ...@@ -139,8 +115,8 @@ const StudioConnector = (props:any) => {
showTables(current.task,dispatch); showTables(current.task,dispatch);
}; };
const clearSession = () => { const onClearSession = () => {
let newLoadings = [...loadings]; /*let newLoadings = [...loadings];
newLoadings[2] = true; newLoadings[2] = true;
setLoadings(newLoadings); setLoadings(newLoadings);
let session = { let session = {
...@@ -152,7 +128,8 @@ const StudioConnector = (props:any) => { ...@@ -152,7 +128,8 @@ const StudioConnector = (props:any) => {
let newLoadings = [...loadings]; let newLoadings = [...loadings];
newLoadings[2] = false; newLoadings[2] = false;
setLoadings(newLoadings); setLoadings(newLoadings);
}); });*/
clearSession(current.task,dispatch);
}; };
const getColumns=()=>{ const getColumns=()=>{
...@@ -199,14 +176,13 @@ const StudioConnector = (props:any) => { ...@@ -199,14 +176,13 @@ const StudioConnector = (props:any) => {
<Button <Button
type="text" type="text"
icon={<DeleteOutlined />} icon={<DeleteOutlined />}
onClick={clearSession} onClick={onClearSession}
/> />
</Tooltip> </Tooltip>
</div> </div>
<Breadcrumb className={styles["session-path"]}> <Breadcrumb className={styles["session-path"]}>
<CommentOutlined /> <CommentOutlined />
<Divider type="vertical" /> <Divider type="vertical" />
<Breadcrumb.Item>{currentSessionCluster.clusterName}</Breadcrumb.Item>
<Breadcrumb.Item>{currentSessionCluster.session}</Breadcrumb.Item> <Breadcrumb.Item>{currentSessionCluster.session}</Breadcrumb.Item>
</Breadcrumb> </Breadcrumb>
{currentSessionCluster.connectors&&currentSessionCluster.connectors.length>0?(<Table dataSource={currentSessionCluster.connectors} columns={getColumns()} size="small" />):(<Empty image={Empty.PRESENTED_IMAGE_SIMPLE} />)} {currentSessionCluster.connectors&&currentSessionCluster.connectors.length>0?(<Table dataSource={currentSessionCluster.connectors} columns={getColumns()} size="small" />):(<Empty image={Empty.PRESENTED_IMAGE_SIMPLE} />)}
......
...@@ -13,7 +13,6 @@ import { postDataArray} from "@/components/Common/crud"; ...@@ -13,7 +13,6 @@ import { postDataArray} from "@/components/Common/crud";
import {executeSql} from "@/pages/FlinkSqlStudio/service"; import {executeSql} from "@/pages/FlinkSqlStudio/service";
import StudioHelp from "./StudioHelp"; import StudioHelp from "./StudioHelp";
import {showTables} from "@/components/Studio/StudioEvent/DDL"; import {showTables} from "@/components/Studio/StudioEvent/DDL";
import {timeout} from "d3-timer";
const menu = ( const menu = (
<Menu> <Menu>
...@@ -36,24 +35,24 @@ const StudioMenu = (props: any) => { ...@@ -36,24 +35,24 @@ const StudioMenu = (props: any) => {
selectsql=current.value; selectsql=current.value;
} }
let param ={ let param ={
useSession:current.task.useSession,
session:current.task.session, session:current.task.session,
statement:selectsql, useRemote:current.task.useRemote,
clusterId:current.task.clusterId, clusterId:current.task.clusterId,
checkPoint:current.task.checkPoint, useResult:current.task.useResult,
parallelism:current.task.parallelism,
maxRowNum:current.task.maxRowNum, maxRowNum:current.task.maxRowNum,
statement:selectsql,
fragment:current.task.fragment, fragment:current.task.fragment,
savePointPath:current.task.savePointPath,
jobName:current.task.jobName, jobName:current.task.jobName,
useResult:current.task.useResult, parallelism:current.task.parallelism,
useSession:current.task.useSession, checkPoint:current.task.checkPoint,
useRemote:current.task.useRemote, savePointPath:current.task.savePointPath,
}; };
const key = current.key; const key = current.key;
const taskKey = (Math.random()*1000)+''; const taskKey = (Math.random()*1000)+'';
notification.success({ notification.success({
message: `${param.clusterId+"_"+param.session} 新任务正在执行`, message: `新任务【${param.jobName}正在执行`,
description: param.statement, description: param.statement.substring(0,40)+'...',
duration:null, duration:null,
key:taskKey, key:taskKey,
icon: <SmileOutlined style={{ color: '#108ee9' }} />, icon: <SmileOutlined style={{ color: '#108ee9' }} />,
...@@ -72,11 +71,12 @@ const StudioMenu = (props: any) => { ...@@ -72,11 +71,12 @@ const StudioMenu = (props: any) => {
let newTabs = tabs; let newTabs = tabs;
for(let i=0;i<newTabs.panes.length;i++){ for(let i=0;i<newTabs.panes.length;i++){
if(newTabs.panes[i].key==key){ if(newTabs.panes[i].key==key){
let newResult = newTabs.panes[i].console.result; /*let newResult = newTabs.panes[i].console.result;
newResult.unshift(res.datas); newResult.unshift(res.datas);
newTabs.panes[i].console={ newTabs.panes[i].console={
result:newResult, result:newResult,
}; };*/
newTabs.panes[i].console.result=res.datas;
break; break;
} }
} }
......
...@@ -65,12 +65,6 @@ const StudioConfig = (props: any) => { ...@@ -65,12 +65,6 @@ const StudioConfig = (props: any) => {
className={styles.form_setting} className={styles.form_setting}
onValuesChange={onValuesChange} onValuesChange={onValuesChange}
> >
<Form.Item
label="作业名" className={styles.form_item} name="jobName"
tooltip='设置任务名称,默认为作业名'
>
<Input placeholder="自定义作业名" />
</Form.Item>
<Row> <Row>
<Col span={10}> <Col span={10}>
<Form.Item <Form.Item
......
...@@ -73,6 +73,12 @@ const StudioSetting = (props: any) => { ...@@ -73,6 +73,12 @@ const StudioSetting = (props: any) => {
className={styles.form_setting} className={styles.form_setting}
onValuesChange={onValuesChange} onValuesChange={onValuesChange}
> >
<Form.Item
label="作业名" className={styles.form_item} name="jobName"
tooltip='设置任务名称,默认为作业名'
>
<Input placeholder="自定义作业名" />
</Form.Item>
<Row> <Row>
<Col span={12}> <Col span={12}>
<Form.Item label="CheckPoint" tooltip="设置Flink任务的检查点步长,0 代表不启用" name="checkPoint" <Form.Item label="CheckPoint" tooltip="设置Flink任务的检查点步长,0 代表不启用" name="checkPoint"
......
...@@ -47,7 +47,7 @@ export type TaskType = { ...@@ -47,7 +47,7 @@ export type TaskType = {
}; };
export type ConsoleType = { export type ConsoleType = {
result: []; result: {};
} }
export type TabsItemType = { export type TabsItemType = {
...@@ -161,7 +161,7 @@ const Model: ModelType = { ...@@ -161,7 +161,7 @@ const Model: ModelType = {
useRemote:false, useRemote:false,
}, },
console: { console: {
result: [], result: {},
}, },
monaco: {}, monaco: {},
}, },
...@@ -192,7 +192,7 @@ const Model: ModelType = { ...@@ -192,7 +192,7 @@ const Model: ModelType = {
useRemote:false, useRemote:false,
}, },
console: { console: {
result: [], result: {},
}, },
monaco: {}, monaco: {},
}], }],
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment