Commit f437c34c authored by wenmo's avatar wenmo

任务历史持久化部分代码

parent a16ee5ae
package com.dlink.controller;
import com.dlink.common.result.ProTableResult;
import com.dlink.common.result.Result;
import com.dlink.model.History;
import com.dlink.service.HistoryService;
import com.fasterxml.jackson.databind.JsonNode;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;
import java.util.ArrayList;
import java.util.List;
/**
* HistoryController
*
* @author wenmo
* @since 2021/6/26 23:09
*/
@Slf4j
@RestController
@RequestMapping("/api/history")
public class HistoryController {
@Autowired
private HistoryService historyService;
/**
* 动态查询列表
*/
@PostMapping
public ProTableResult<History> listHistorys(@RequestBody JsonNode para) {
return historyService.selectForProTable(para);
}
/**
* 批量删除
*/
@DeleteMapping
public Result deleteMul(@RequestBody JsonNode para) {
if (para.size()>0){
List<Integer> error = new ArrayList<>();
for (final JsonNode item : para){
Integer id = item.asInt();
if(!historyService.removeById(id)){
error.add(id);
}
}
if(error.size()==0) {
return Result.succeed("删除成功");
}else {
return Result.succeed("删除部分成功,但"+error.toString()+"删除失败,共"+error.size()+"次失败。");
}
}else{
return Result.failed("请选择要删除的记录");
}
}
/**
* 获取指定ID的信息
*/
@PostMapping("/getOneById")
public Result getOneById(@RequestBody History history) throws Exception {
history = historyService.getById(history.getId());
return Result.succeed(history,"获取成功");
}
}
package com.dlink.job;
/**
* Job2MysqlHandler
*
* @author wenmo
* @since 2021/6/27 0:04
*/
public class Job2MysqlHandler implements JobHandler {
@Override
public void init() {
}
@Override
public void start() {
}
@Override
public void running() {
}
@Override
public void success() {
}
@Override
public void failed() {
}
@Override
public void callback() {
}
@Override
public void close() {
}
}
package com.dlink.mapper;
import com.dlink.db.mapper.SuperMapper;
import com.dlink.model.History;
import org.apache.ibatis.annotations.Mapper;
/**
* HistoryMapper
*
* @author wenmo
* @since 2021/6/26 23:00
*/
@Mapper
public interface HistoryMapper extends SuperMapper<History> {
}
package com.dlink.model;
import com.baomidou.mybatisplus.annotation.TableName;
import lombok.Data;
import lombok.EqualsAndHashCode;
import java.io.Serializable;
import java.time.LocalDate;
/**
* History
*
* @author wenmo
* @since 2021/6/26 22:48
*/
@Data
@EqualsAndHashCode(callSuper = false)
@TableName("dlink_history")
public class History implements Serializable {
private static final long serialVersionUID = 4058280957630503072L;
private Integer id;
private Integer clusterId;
private String session;
private String jobId;
private String jobName;
private String jobManagerAddress;
private Integer status;
private String statement;
private Integer type;
private String error;
private String result;
private String config;
private LocalDate startTime;
private LocalDate endTime;
private String msg;
private Integer taskId;
private String statusText;
}
package com.dlink.service;
import com.dlink.db.service.ISuperService;
import com.dlink.model.History;
/**
* HistoryService
*
* @author wenmo
* @since 2021/6/26 23:07
*/
public interface HistoryService extends ISuperService<History> {
}
package com.dlink.service.impl;
import com.dlink.db.service.impl.SuperServiceImpl;
import com.dlink.mapper.HistoryMapper;
import com.dlink.model.History;
import com.dlink.service.HistoryService;
import org.springframework.stereotype.Service;
/**
* HistoryServiceImpl
*
* @author wenmo
* @since 2021/6/26 23:08
*/
@Service
public class HistoryServiceImpl extends SuperServiceImpl<HistoryMapper, History> implements HistoryService {
}
com.dlink.job.Job2MysqlHandler
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="com.dlink.mapper.HistoryMapper">
<!-- 通用查询映射结果 -->
<resultMap id="BaseResultMap" type="com.dlink.model.History">
<id column="id" property="id"/>
<result column="cluster_id" property="clusterId"/>
<result column="session" property="session"/>
<result column="jod_id" property="jobId"/>
<result column="job_name" property="jobName"/>
<result column="job_manager_address" property="jobManagerAddress"/>
<result column="status" property="status"/>
<result column="statement" property="statement"/>
<result column="type" property="type"/>
<result column="error" property="error"/>
<result column="result" property="result"/>
<result column="config" property="config"/>
<result column="start_time" property="startTime"/>
<result column="end_time" property="endTime"/>
<result column="msg" property="msg"/>
<result column="task_id" property="taskId"/>
</resultMap>
<!-- 通用查询结果列 -->
<sql id="Base_Column_List">
id,cluster_id,session,jod_id,job_name,
job_manager_address,status,statement,type, error,
result,result,config,start_time,end_time,
msg,task_id
</sql>
<select id="selectForProTable" resultType="com.dlink.model.History">
select
a.*
from
dlink_cluster a
<where>
1=1
<if test='param.name!=null and param.name!=""'>
and a.job_name like "%${param.jobName}%"
</if>
<if test='param.alias!=null and param.alias!=""'>
and a.statement like "%${param.statement}%"
</if>
<if test='param.createTime!=null and param.createTime!=""'>
and a.start_time <![CDATA[>=]]> str_to_date( #{param.startTime},'%Y-%m-%d %H:%i:%s')
</if>
<if test='param.updateTime!=null and param.updateTime!=""'>
and a.end_time <![CDATA[>=]]> str_to_date( #{param.endTime},'%Y-%m-%d %H:%i:%s')
</if>
<if test='ew.sqlSegment!=null and ew.sqlSegment!="" and !ew.sqlSegment.startsWith(" ORDER BY")'>
and
</if>
<if test='ew.sqlSegment!=null and ew.sqlSegment!=""'>
${ew.sqlSegment}
</if>
</where>
</select>
</mapper>
package com.dlink.exception;
import org.apache.flink.annotation.PublicEvolving;
/**
* JobException
*
* @author wenmo
* @since 2021/6/27
**/
@PublicEvolving
public class JobException extends RuntimeException {
public JobException(String message, Throwable cause) {
super(message, cause);
}
public JobException(String message) {
super(message);
}
}
\ No newline at end of file
package com.dlink.executor; package com.dlink.executor;
import lombok.Getter;
import lombok.Setter;
/** /**
* ExecutorSetting * ExecutorSetting
* *
* @author wenmo * @author wenmo
* @since 2021/5/25 13:43 * @since 2021/5/25 13:43
**/ **/
@Setter
@Getter
public class ExecutorSetting { public class ExecutorSetting {
private String host;
private String type; private String type;
private Integer checkpoint; private Integer checkpoint;
private Integer parallelism; private Integer parallelism;
...@@ -51,55 +57,17 @@ public class ExecutorSetting { ...@@ -51,55 +57,17 @@ public class ExecutorSetting {
this.savePointPath = savePointPath; this.savePointPath = savePointPath;
} }
public boolean isRemote(){ public ExecutorSetting(String host, String type, Integer checkpoint, Integer parallelism, boolean useSqlFragment, String savePointPath, String jobName) {
return type.equals(Executor.REMOTE); this.host = host;
}
public String getType() {
return type;
}
public void setType(String type) {
this.type = type; this.type = type;
}
public Integer getCheckpoint() {
return checkpoint;
}
public void setCheckpoint(Integer checkpoint) {
this.checkpoint = checkpoint; this.checkpoint = checkpoint;
}
public boolean isUseSqlFragment() {
return useSqlFragment;
}
public void setUseSqlFragment(boolean useSqlFragment) {
this.useSqlFragment = useSqlFragment;
}
public Integer getParallelism() {
return parallelism;
}
public void setParallelism(Integer parallelism) {
this.parallelism = parallelism; this.parallelism = parallelism;
} this.useSqlFragment = useSqlFragment;
public String getSavePointPath() {
return savePointPath;
}
public void setSavePointPath(String savePointPath) {
this.savePointPath = savePointPath; this.savePointPath = savePointPath;
this.jobName = jobName;
} }
public String getJobName() { public boolean isRemote(){
return jobName; return type.equals(Executor.REMOTE);
}
public void setJobName(String jobName) {
this.jobName = jobName;
} }
} }
package com.dlink.job;
import com.dlink.executor.Executor;
import com.dlink.executor.ExecutorSetting;
import lombok.Getter;
import lombok.Setter;
import java.time.LocalDate;
/**
* Job
*
* @author wenmo
* @since 2021/6/26 23:39
*/
@Getter
@Setter
public class Job {
private Integer id;
private Integer clusterId;
private String session;
private String jobId;
private String jobName;
private String jobManagerAddress;
private Integer status;
private String statement;
private Integer type;
private String error;
private String result;
private ExecutorSetting config;
private LocalDate startTime;
private LocalDate endTime;
private String msg;
private Integer taskId;
private Executor executor;
}
package com.dlink.job;
import com.dlink.executor.ExecutorSetting;
import lombok.Getter;
import lombok.Setter;
/**
* JobConfig
*
* @author wenmo
* @since 2021/6/27 18:45
*/
@Getter
@Setter
public class JobConfig {
private String host;
private String session;
private String type;
private Integer checkpoint;
private Integer parallelism;
private boolean useSqlFragment;
private String savePointPath;
private String jobName;
public ExecutorSetting getExecutorSetting(){
return new ExecutorSetting(host,type,checkpoint,parallelism,useSqlFragment,savePointPath,jobName);
}
}
package com.dlink.job;
/**
* JobContextHolder
*
* @author wenmo
* @since 2021/6/26 23:29
*/
public class JobContextHolder {
private static final ThreadLocal<Job> CONTEXT = new ThreadLocal<>();
public static void setJob(Job job) {
CONTEXT.set(job);
}
public static Job getJob() {
return CONTEXT.get();
}
public static void clear() {
CONTEXT.remove();
}
}
package com.dlink.job;
import com.dlink.exception.JobException;
import java.util.ServiceLoader;
/**
* jobHandler
*
* @author wenmo
* @since 2021/6/26 23:22
*/
public interface JobHandler {
void init();
void start();
void running();
void success();
void failed();
void callback();
void close();
static JobHandler build(){
ServiceLoader<JobHandler> jobHandlers = ServiceLoader.load(JobHandler.class);
for(JobHandler jobHandler : jobHandlers){
return jobHandler;
}
throw new JobException("There is no corresponding implementation class for this interface!");
}
}
...@@ -2,6 +2,7 @@ package com.dlink.job; ...@@ -2,6 +2,7 @@ package com.dlink.job;
import com.dlink.constant.FlinkConstant; import com.dlink.constant.FlinkConstant;
import com.dlink.constant.FlinkSQLConstant; import com.dlink.constant.FlinkSQLConstant;
import com.dlink.constant.NetConstant;
import com.dlink.executor.EnvironmentSetting; import com.dlink.executor.EnvironmentSetting;
import com.dlink.executor.Executor; import com.dlink.executor.Executor;
import com.dlink.executor.ExecutorSetting; import com.dlink.executor.ExecutorSetting;
...@@ -13,12 +14,10 @@ import com.dlink.session.SessionPool; ...@@ -13,12 +14,10 @@ import com.dlink.session.SessionPool;
import com.dlink.trans.Operations; import com.dlink.trans.Operations;
import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.JobID;
import org.apache.flink.table.api.TableResult; import org.apache.flink.table.api.TableResult;
import org.junit.Assert;
import java.time.LocalDateTime; import java.time.LocalDateTime;
import java.util.Arrays; import java.util.*;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/** /**
* JobManager * JobManager
...@@ -26,38 +25,42 @@ import java.util.Map; ...@@ -26,38 +25,42 @@ import java.util.Map;
* @author wenmo * @author wenmo
* @since 2021/5/25 15:27 * @since 2021/5/25 15:27
**/ **/
public class JobManager { public class JobManager extends RunTime {
private JobHandler handler = JobHandler.build();
private String flinkHost; private String flinkHost;
private String jobManagerHost;
private Integer jobManagerPort;
private Integer port; private Integer port;
private boolean isRemote;
private boolean isSession;
private String sessionId; private String sessionId;
private Integer maxRowNum = 100; private Integer maxRowNum = 100;
private ExecutorSetting executorSetting; private ExecutorSetting executorSetting;
private JobConfig config;
private Executor executor;
public JobManager() { public JobManager() {
} }
public JobManager(ExecutorSetting executorSetting) { public JobManager(String host, ExecutorSetting executorSetting) {
this.executorSetting=executorSetting;
}
public JobManager(String host,ExecutorSetting executorSetting) {
if (host != null) { if (host != null) {
String[] strs = host.split(":"); String[] strs = host.split(NetConstant.COLON);
if (strs.length >= 2) { if (strs.length >= 2) {
this.flinkHost = strs[0]; this.flinkHost = strs[0];
this.port = Integer.parseInt(strs[1]); this.port = Integer.parseInt(strs[1]);
} else { } else {
this.flinkHost = strs[0]; this.flinkHost = strs[0];
this.port = 8081; this.port = FlinkConstant.PORT;
} }
this.executorSetting=executorSetting; this.executorSetting = executorSetting;
this.executor = createExecutor();
} }
} }
public JobManager(String host, String sessionId, Integer maxRowNum,ExecutorSetting executorSetting) { public JobManager(String host, String sessionId, Integer maxRowNum, ExecutorSetting executorSetting) {
if (host != null) { if (host != null) {
String[] strs = host.split(":"); String[] strs = host.split(NetConstant.COLON);
if (strs.length >= 2) { if (strs.length >= 2) {
this.flinkHost = strs[0]; this.flinkHost = strs[0];
this.port = Integer.parseInt(strs[1]); this.port = Integer.parseInt(strs[1]);
...@@ -68,53 +71,101 @@ public class JobManager { ...@@ -68,53 +71,101 @@ public class JobManager {
} }
this.sessionId = sessionId; this.sessionId = sessionId;
this.maxRowNum = maxRowNum; this.maxRowNum = maxRowNum;
this.executorSetting=executorSetting; this.executorSetting = executorSetting;
this.executor = createExecutorWithSession();
} }
public JobManager(String flinkHost, Integer port) { public JobManager(JobConfig config) {
this.flinkHost = flinkHost; this.config = config;
this.port = port;
} }
public JobManager(String flinkHost, Integer port, String sessionId, Integer maxRowNum) { public static JobManager build() {
this.flinkHost = flinkHost; return new JobManager();
this.sessionId = sessionId; }
this.maxRowNum = maxRowNum;
this.port = port; public static JobManager build(JobConfig config) {
return new JobManager(config);
}
private Executor createExecutor() {
if (!isSession) {
if (isRemote) {
executor = Executor.build(new EnvironmentSetting(jobManagerHost, jobManagerPort), config.getExecutorSetting());
return executor;
} else {
executor = Executor.build(null, executorSetting);
return executor;
}
} else {
createExecutorWithSession();
return executor;
}
} }
private boolean checkSession(){ private boolean checkSession() {
if(sessionId!=null&&!"".equals(sessionId)){ if (config != null) {
String[] strs = sessionId.split("_"); String session = config.getSession();
if(strs.length>1&&!"".equals(strs[1])){ if (session != null && !"".equals(session)) {
return true; String[] keys = session.split("_");
if (keys.length > 1 && !"".equals(keys[1])) {
isSession = true;
return true;
}
} }
} }
isSession = false;
return false; return false;
} }
private Executor createExecutor(){ private Executor createExecutorWithSession() {
if (executorSetting.isRemote()) { ExecutorEntity executorEntity = SessionPool.get(config.getSession());
return Executor.build(new EnvironmentSetting(flinkHost, port), executorSetting); if (executorEntity != null) {
executor = executorEntity.getExecutor();
} else { } else {
return Executor.build(null, executorSetting); createExecutor();
SessionPool.push(new ExecutorEntity(sessionId, executor));
} }
return executor;
} }
private Executor createExecutorWithSession(){ @Override
Executor executor; public void init() {
if(checkSession()){ String host = config.getHost();
ExecutorEntity executorEntity = SessionPool.get(sessionId); if (host != null && !("").equals(host)) {
if (executorEntity != null) { String[] strs = host.split(NetConstant.COLON);
executor = executorEntity.getExecutor(); if (strs.length >= 2) {
jobManagerHost = strs[0];
jobManagerPort = Integer.parseInt(strs[1]);
} else { } else {
executor = createExecutor(); jobManagerHost = strs[0];
SessionPool.push(new ExecutorEntity(sessionId, executor)); jobManagerPort = FlinkConstant.PORT;
} }
}else { isRemote = true;
executor = createExecutor(); } else {
isRemote = false;
} }
return executor; checkSession();
createExecutor();
}
@Override
public boolean ready() {
return false;
}
@Override
public boolean success() {
return false;
}
@Override
public boolean error() {
return false;
}
@Override
public void close() {
} }
public RunResult execute(String statement) { public RunResult execute(String statement) {
...@@ -223,4 +274,46 @@ public class JobManager { ...@@ -223,4 +274,46 @@ public class JobManager {
result.setMsg(LocalDateTime.now().toString() + ":任务提交成功!"); result.setMsg(LocalDateTime.now().toString() + ":任务提交成功!");
return result; return result;
} }
public void executeSql(String statement) {
RunResult runResult = new RunResult(sessionId, statement, flinkHost, port, executorSetting, executorSetting.getJobName());
String[] Statements = statement.split(";");
int currentIndex = 0;
try {
for (String item : Statements) {
if (item.trim().isEmpty()) {
continue;
}
currentIndex++;
String operationType = Operations.getOperationType(item);
long start = System.currentTimeMillis();
CustomTableEnvironmentImpl stEnvironment = executor.getCustomTableEnvironmentImpl();
if (!FlinkInterceptor.build(stEnvironment, item)) {
TableResult tableResult = executor.executeSql(item);
if (tableResult.getJobClient().isPresent()) {
runResult.setJobId(tableResult.getJobClient().get().getJobID().toHexString());
}
IResult result = ResultBuilder.build(operationType, maxRowNum, "", false).getResult(tableResult);
runResult.setResult(result);
}
long finish = System.currentTimeMillis();
long timeElapsed = finish - start;
runResult.setTime(timeElapsed);
runResult.setFinishDate(LocalDateTime.now());
runResult.setSuccess(true);
}
} catch (Exception e) {
e.printStackTrace();
StackTraceElement[] trace = e.getStackTrace();
StringBuffer resMsg = new StringBuffer("");
for (StackTraceElement s : trace) {
resMsg.append(" \n " + s + " ");
}
runResult.setFinishDate(LocalDateTime.now());
runResult.setSuccess(false);
// runResult.setError(LocalDateTime.now().toString() + ":" + "运行第" + currentIndex + "行sql时出现异常:" + e.getMessage());
runResult.setError(LocalDateTime.now().toString() + ":" + "运行第" + currentIndex + "行sql时出现异常:" + e.getMessage() + " \n >>>堆栈信息<<<" + resMsg.toString());
// runResult.setError(LocalDateTime.now().toString() + ":" + "运行第" + currentIndex + "行sql时出现异常:" + e.getMessage() + "\n >>>异常原因<<< \n" + e.getCause().toString());
}
}
} }
package com.dlink.job;
/**
* RunTime
*
* @author wenmo
* @since 2021/6/27 18:06
*/
public abstract class RunTime {
abstract void init();
abstract boolean ready();
abstract boolean success();
abstract boolean error();
abstract void close();
}
/*
Navicat Premium Data Transfer
Source Server : hetl
Source Server Type : MySQL
Source Server Version : 80015
Source Host : localhost:3306
Source Schema : dlink
Target Server Type : MySQL
Target Server Version : 80015
File Encoding : 65001
Date: 27/06/2021 23:28:39
*/
SET NAMES utf8mb4;
SET FOREIGN_KEY_CHECKS = 0;
-- ----------------------------
-- Table structure for dlink_flink_document
-- ----------------------------
DROP TABLE IF EXISTS `dlink_flink_document`;
CREATE TABLE `dlink_flink_document` (
`id` int(11) NOT NULL AUTO_INCREMENT COMMENT '主键',
`category` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL COMMENT '文档类型',
`type` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL COMMENT '类型',
`subtype` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL COMMENT '子类型',
`name` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL COMMENT '信息',
`description` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL COMMENT '描述',
`version` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL COMMENT '版本号',
`like_num` int(255) NULL DEFAULT 0 COMMENT '喜爱值',
`enabled` tinyint(1) NOT NULL DEFAULT 0 COMMENT '是否启用',
`create_time` datetime(0) NULL DEFAULT NULL COMMENT '创建时间',
`update_time` datetime(0) NULL DEFAULT NULL COMMENT '更新时间',
PRIMARY KEY (`id`) USING BTREE
) ENGINE = InnoDB AUTO_INCREMENT = 262 CHARACTER SET = utf8 COLLATE = utf8_general_ci COMMENT = '文档管理' ROW_FORMAT = Dynamic;
SET FOREIGN_KEY_CHECKS = 1;
...@@ -46,7 +46,7 @@ ...@@ -46,7 +46,7 @@
"not ie <= 10" "not ie <= 10"
], ],
"dependencies": { "dependencies": {
"@ant-design/charts": "^1.1.17", "@ant-design/charts": "^1.1.18",
"@ant-design/icons": "^4.5.0", "@ant-design/icons": "^4.5.0",
"@ant-design/pro-descriptions": "^1.6.8", "@ant-design/pro-descriptions": "^1.6.8",
"@ant-design/pro-form": "^1.18.3", "@ant-design/pro-form": "^1.18.3",
...@@ -67,7 +67,6 @@ ...@@ -67,7 +67,6 @@
"nzh": "^1.0.3", "nzh": "^1.0.3",
"omit.js": "^2.0.2", "omit.js": "^2.0.2",
"react": "^17.0.0", "react": "^17.0.0",
"react-custom-scrollbars": "^4.2.1",
"react-dev-inspector": "^1.1.1", "react-dev-inspector": "^1.1.1",
"react-dom": "^17.0.0", "react-dom": "^17.0.0",
"react-helmet-async": "^1.0.4", "react-helmet-async": "^1.0.4",
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment