Unverified Commit db71177f authored by hxp0618's avatar hxp0618 Committed by GitHub

[feature][admin,web] add task version history (#659)

Co-authored-by: 's avatarhuangxp <18670141050@163.com>
parent 6ae871f2
package com.dlink.controller;
import java.util.ArrayList;
import java.util.List;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.DeleteMapping;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.PutMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import com.dlink.common.result.ProTableResult;
import com.dlink.common.result.Result;
import com.dlink.dto.TaskRollbackVersionDTO;
import com.dlink.job.JobResult;
import com.dlink.model.Task;
import com.dlink.service.TaskService;
import com.fasterxml.jackson.databind.JsonNode;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;
import java.util.ArrayList;
import java.util.List;
/**
* 任务 Controller
......@@ -138,6 +130,13 @@ public class TaskController {
return taskService.releaseTask(id);
}
@PostMapping("/rollbackTask")
public Result rollbackTask(@RequestBody TaskRollbackVersionDTO dto) throws Exception {
return taskService.rollbackTask(dto);
}
/**
* 维护任务
*/
......
package com.dlink.controller;
import cn.hutool.core.bean.BeanUtil;
import com.dlink.common.result.ProTableResult;
import com.dlink.dto.TaskVersionHistoryDTO;
import com.dlink.model.TaskVersion;
import com.dlink.service.TaskVersionService;
import com.fasterxml.jackson.databind.JsonNode;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.util.List;
import java.util.stream.Collectors;
/**
* 任务版本 Controller
*
* @author wenmo
* @since 2022-06-28
*/
@Slf4j
@RestController
@RequestMapping("/api/task/version")
public class TaskVersionController {
@Autowired
private TaskVersionService versionService;
/**
* 动态查询列表
*/
@PostMapping
public ProTableResult<TaskVersionHistoryDTO> listTasks(@RequestBody JsonNode para) {
ProTableResult<TaskVersionHistoryDTO> versionHistoryDTOProTableResult = new ProTableResult<>();
ProTableResult<TaskVersion> versionProTableResult = versionService.selectForProTable(para);
BeanUtil.copyProperties(versionProTableResult, versionHistoryDTOProTableResult);
List<TaskVersionHistoryDTO> collect = versionProTableResult.getData().stream().map(t -> {
TaskVersionHistoryDTO versionHistoryDTO = new TaskVersionHistoryDTO();
versionHistoryDTO.setVersionId(t.getVersionId());
versionHistoryDTO.setId(t.getId());
versionHistoryDTO.setCreateTime(t.getCreateTime());
return versionHistoryDTO;
}).collect(Collectors.toList());
versionHistoryDTOProTableResult.setData(collect);
return versionHistoryDTOProTableResult;
}
}
package com.dlink.dto;
import lombok.Data;
import java.io.Serializable;
/**
* @author huang
* @description: 任务回滚DTO
* @date 2022/6/23 10:25
*/
@Data
public class TaskRollbackVersionDTO implements Serializable {
private Integer id;
private Integer versionId;
}
package com.dlink.dto;
import lombok.Data;
import java.io.Serializable;
/**
* @author huang
* @description: 版本信息配置
* @date 2022/6/23 10:25
*/
@Data
public class TaskVersionConfigureDTO implements Serializable {
/**
* CheckPoint
*/
private Integer checkPoint;
/**
* SavePoint策略
*/
private Integer savePointStrategy;
/**
* SavePointPath
*/
private String savePointPath;
/**
* parallelism
*/
private Integer parallelism;
/**
* fragment
*/
private Boolean fragment;
/**
* 启用语句集
*/
private Boolean statementSet;
/**
* 使用批模式
*/
private Boolean batchModel;
/**
* Flink集群ID
*/
private Integer clusterId;
/**
* 集群配置ID
*/
private Integer clusterConfigurationId;
/**
* 数据源ID
*/
private Integer databaseId;
/**
* jarID
*/
private Integer jarId;
/**
* 环境ID
*/
private Integer envId;
/**
* 报警组ID
*/
private Integer alertGroupId;
/**
* 配置JSON
*/
private String configJson;
/**
* 注释
*/
private String note;
/**
* 作业生命周期
*/
private Integer step;
/**
* 作业实例ID
*/
private Integer jobInstanceId;
/**
* 是否启用
*/
private Boolean enabled;
}
package com.dlink.dto;
import lombok.Data;
import java.io.Serializable;
import java.util.Date;
/**
* @author huang
* @description: 任务版本记录
* @date 2022/6/27 18:17
*/
@Data
public class TaskVersionHistoryDTO implements Serializable {
private Integer id;
private Integer versionId;
private Date createTime;
}
package com.dlink.mapper;
import com.dlink.db.mapper.SuperMapper;
import com.dlink.model.TaskVersion;
import org.apache.ibatis.annotations.Mapper;
/**
* @author huang
*/
@Mapper
public interface TaskVersionMapper extends SuperMapper<TaskVersion> {
}
......@@ -8,7 +8,6 @@ import com.dlink.db.model.SuperEntity;
import com.dlink.job.JobConfig;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.Data;
import lombok.EqualsAndHashCode;
......@@ -16,7 +15,6 @@ import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
/**
* 任务
......@@ -72,6 +70,8 @@ public class Task extends SuperEntity {
private Integer jobInstanceId;
private Integer versionId;
@TableField(exist = false)
private String statement;
......
package com.dlink.model;
import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableField;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
import com.baomidou.mybatisplus.extension.handlers.JacksonTypeHandler;
import com.dlink.dto.TaskVersionConfigureDTO;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.ToString;
import java.io.Serializable;
import java.util.Date;
/**
* 作业
*/
@Data
@ToString
@AllArgsConstructor
@NoArgsConstructor
@TableName(value = "dlink_task_version", autoResultMap = true)
public class TaskVersion implements Serializable {
private static final long serialVersionUID = 1L;
/**
* ID
*/
@TableId(value = "id", type = IdType.AUTO)
private Integer id;
/**
* 作业ID
*/
@TableField(value = "task_id")
private Integer taskId;
/**
* 版本ID
*/
@TableField(value = "version_id")
private Integer versionId;
/**
* flink sql 内容
*/
@TableField(value = "`statement`")
private String statement;
/**
* 名称
*/
@TableField(value = "`name`")
private String name;
/**
* 别名
*/
@TableField(value = "`alias`")
private String alias;
/**
* 方言
*/
@TableField(value = "dialect")
private String dialect;
/**
* 类型
*/
@TableField(value = "`type`")
private String type;
@TableField(value = "task_configure",typeHandler = JacksonTypeHandler.class)
private TaskVersionConfigureDTO taskConfigure;
/**
* 创建时间
*/
@TableField(value = "create_time")
private Date createTime;
}
......@@ -5,10 +5,12 @@ import java.util.List;
import com.dlink.common.result.Result;
import com.dlink.db.service.ISuperService;
import com.dlink.dto.TaskRollbackVersionDTO;
import com.dlink.job.JobResult;
import com.dlink.model.JobInfoDetail;
import com.dlink.model.JobInstance;
import com.dlink.model.Task;
import com.dlink.model.TaskVersion;
import com.dlink.result.SqlExplainResult;
/**
......@@ -58,4 +60,7 @@ public interface TaskService extends ISuperService<Task> {
JobInfoDetail refreshJobInfoDetail(Integer id);
String getTaskAPIAddress();
Result rollbackTask(TaskRollbackVersionDTO dto);
}
package com.dlink.service;
import com.dlink.db.service.ISuperService;
import com.dlink.model.TaskVersion;
import java.util.List;
/**
* @author huang
*/
public interface TaskVersionService extends ISuperService<TaskVersion> {
/**
* @description 通过作业Id查询版本数据
* @param taskId
* @return java.util.List<com.dlink.model.TaskVersion>
* @author huang
* @date 2022/6/22 17:17
*/
List<TaskVersion> getTaskVersionByTaskId(Integer taskId);
}
package com.dlink.service.impl;
import cn.hutool.core.bean.BeanUtil;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.dlink.alert.*;
import com.dlink.api.FlinkAPI;
......@@ -14,6 +16,8 @@ import com.dlink.daemon.task.DaemonFactory;
import com.dlink.daemon.task.DaemonTaskConfig;
import com.dlink.db.service.impl.SuperServiceImpl;
import com.dlink.dto.SqlDTO;
import com.dlink.dto.TaskRollbackVersionDTO;
import com.dlink.dto.TaskVersionConfigureDTO;
import com.dlink.exception.BusException;
import com.dlink.gateway.GatewayType;
import com.dlink.gateway.config.SavePointStrategy;
......@@ -35,6 +39,7 @@ import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.text.SimpleDateFormat;
......@@ -43,6 +48,7 @@ import java.time.Instant;
import java.time.LocalDateTime;
import java.time.temporal.ChronoUnit;
import java.util.*;
import java.util.stream.Collectors;
/**
* 任务 服务实现类
......@@ -75,6 +81,8 @@ public class TaskServiceImpl extends SuperServiceImpl<TaskMapper, Task> implemen
private AlertHistoryService alertHistoryService;
@Autowired
private HistoryService historyService;
@Resource
private TaskVersionService taskVersionService;
@Value("${spring.datasource.driver-class-name}")
private String driver;
......@@ -329,7 +337,8 @@ public class TaskServiceImpl extends SuperServiceImpl<TaskMapper, Task> implemen
}
}
task.setStep(JobLifeCycle.RELEASE.getValue());
if (updateById(task)) {
Task newTask = createTaskVersionSnapshot(task);
if (updateById(newTask)) {
return Result.succeed("发布成功");
} else {
return Result.failed("由于未知原因,发布失败");
......@@ -338,6 +347,76 @@ public class TaskServiceImpl extends SuperServiceImpl<TaskMapper, Task> implemen
return Result.succeed("发布成功");
}
public Task createTaskVersionSnapshot(Task task) {
List<TaskVersion> taskVersions = taskVersionService.getTaskVersionByTaskId(task.getId());
List<Integer> versionIds = taskVersions.stream().map(TaskVersion::getVersionId).collect(Collectors.toList());
Map<Integer, TaskVersion> versionMap = taskVersions.stream().collect(Collectors.toMap(TaskVersion::getVersionId, t -> t));
TaskVersion taskVersion = new TaskVersion();
BeanUtil.copyProperties(task, taskVersion);
TaskVersionConfigureDTO taskVersionConfigureDTO = new TaskVersionConfigureDTO();
BeanUtil.copyProperties(task, taskVersionConfigureDTO);
taskVersion.setTaskConfigure(taskVersionConfigureDTO);
taskVersion.setTaskId(taskVersion.getId());
taskVersion.setId(null);
if (Asserts.isNull(task.getVersionId())) {
//首次发布,新增版本
taskVersion.setVersionId(1);
task.setVersionId(1);
taskVersionService.save(taskVersion);
} else {
//说明存在版本,需要判断是否 是回退后的老版本
//1、版本号存在
//2、md5值与上一个版本一致
TaskVersion version = versionMap.get(task.getVersionId());
version.setId(null);
if (versionIds.contains(task.getVersionId()) && !taskVersion.equals(version)
//|| !versionIds.contains(task.getVersionId()) && !taskVersion.equals(version)
) {
taskVersion.setVersionId(Collections.max(versionIds) + 1);
task.setVersionId(Collections.max(versionIds) + 1);
taskVersionService.save(taskVersion);
}
}
return task;
}
@Override
public Result rollbackTask(TaskRollbackVersionDTO dto) {
if (Asserts.isNull(dto.getVersionId()) || Asserts.isNull(dto.getId())) {
return Result.failed("版本指定失败");
}
Task taskInfo = getTaskInfoById(dto.getId());
if (JobLifeCycle.RELEASE.equalsValue(taskInfo.getStep()) ||
JobLifeCycle.ONLINE.equalsValue(taskInfo.getStep()) ||
JobLifeCycle.CANCEL.equalsValue(taskInfo.getStep())) {
//throw new BusException("该作业已" + JobLifeCycle.get(taskInfo.getStep()).getLabel() + ",禁止回滚!");
return Result.failed("该作业已" + JobLifeCycle.get(taskInfo.getStep()).getLabel() + ",禁止回滚!");
}
LambdaQueryWrapper<TaskVersion> queryWrapper = new LambdaQueryWrapper<TaskVersion>().
eq(TaskVersion::getTaskId, dto.getId()).
eq(TaskVersion::getVersionId, dto.getVersionId());
TaskVersion taskVersion = taskVersionService.getOne(queryWrapper);
Task updateTask = new Task();
BeanUtil.copyProperties(taskVersion, updateTask);
BeanUtil.copyProperties(taskVersion.getTaskConfigure(), updateTask);
updateTask.setId(taskVersion.getTaskId());
updateTask.setStep(JobLifeCycle.DEVELOP.getValue());
baseMapper.updateById(updateTask);
Statement statement = new Statement();
statement.setStatement(taskVersion.getStatement());
statement.setId(taskVersion.getTaskId());
statementService.updateById(statement);
return Result.succeed("回滚版本成功!");
}
@Override
public boolean developTask(Integer id) {
Task task = getTaskInfoById(id);
......
package com.dlink.service.impl;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.dlink.db.service.impl.SuperServiceImpl;
import com.dlink.mapper.TaskVersionMapper;
import com.dlink.model.TaskVersion;
import com.dlink.service.TaskVersionService;
import org.springframework.stereotype.Service;
import java.util.List;
/**
* @author huang
*/
@Service
public class TaskVersionServiceImpl extends SuperServiceImpl<TaskVersionMapper, TaskVersion> implements TaskVersionService {
@Override
public List<TaskVersion> getTaskVersionByTaskId(Integer taskId) {
return baseMapper.selectList(new LambdaQueryWrapper<TaskVersion>().eq(TaskVersion::getTaskId, taskId));
}
}
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="com.dlink.mapper.TaskVersionMapper">
<select id="selectForProTable" resultType="com.dlink.model.TaskVersion">
select
a.*
from
dlink_task_version a
<where>
1=1
<if test='param.versionId!=null and param.versionId!=""'>
and a.version_id = "${param.versionId}"
</if>
<if test='param.taskId!=null and param.taskId!=""'>
and a.task_id = "${param.taskId}"
</if>
<if test='param.createTime!=null and param.createTime!=""'>
and a.create_time <![CDATA[>=]]> str_to_date( #{param.createTime},'%Y-%m-%d %H:%i:%s')
</if>
<if test='ew.sqlSegment!=null and ew.sqlSegment!="" and !ew.sqlSegment.startsWith(" ORDER BY")'>
and
</if>
<if test='ew.sqlSegment!=null and ew.sqlSegment!=""'>
${ew.sqlSegment}
</if>
</where>
</select>
</mapper>
......@@ -470,10 +470,30 @@ CREATE TABLE `dlink_task` (
`enabled` tinyint(1) NOT NULL DEFAULT 1 COMMENT '是否启用',
`create_time` datetime(0) NULL DEFAULT NULL COMMENT '创建时间',
`update_time` datetime(0) NULL DEFAULT NULL COMMENT '更新时间',
`version_id` int(11) DEFAULT NULL COMMENT '版本Id',
PRIMARY KEY (`id`) USING BTREE,
UNIQUE INDEX `idx_name`(`name`) USING BTREE
) ENGINE = InnoDB AUTO_INCREMENT = 33 CHARACTER SET = utf8 COLLATE = utf8_general_ci COMMENT = '作业' ROW_FORMAT = Dynamic;
-- ----------------------------
-- Table structure for dlink_task_version
-- ----------------------------
DROP TABLE IF EXISTS `dlink_task_version`;
CREATE TABLE `dlink_task_version` (
`id` int(11) NOT NULL AUTO_INCREMENT COMMENT 'ID',
`task_id` int(11) NOT NULL COMMENT '作业ID ',
`version_id` int(11) NOT NULL COMMENT '版本ID ',
`statement` text COMMENT 'flink sql 内容',
`name` varchar(255) NOT NULL COMMENT '名称',
`alias` varchar(255) DEFAULT NULL COMMENT '别名',
`dialect` varchar(50) DEFAULT NULL COMMENT '方言',
`type` varchar(50) DEFAULT NULL COMMENT '类型',
`task_configure` text NOT NULL COMMENT '作业配置',
`create_time` datetime DEFAULT NULL COMMENT '创建时间',
PRIMARY KEY (`id`) USING BTREE
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8 ROW_FORMAT=DYNAMIC COMMENT='作业历史版本';
-- ----------------------------
-- Table structure for dlink_task_statement
-- ----------------------------
......
......@@ -668,3 +668,25 @@ SET FOREIGN_KEY_CHECKS = 1;
alter table dlink_task alter column fragment set default 0;
alter table dlink_task alter column statement_set set default 0;
alter table dlink_cluster_configuration modify column is_available tinyint(1) NOT NULL DEFAULT 0 COMMENT '是否可用';
-- 0.6.5-SNAPSHOT 2022-06-28
-- ----------------------------
alter table dlink_task
ADD COLUMN `version_id` INT NULL COMMENT '版本号ID' ;
CREATE TABLE `dlink_task_version` (
`id` int(11) NOT NULL AUTO_INCREMENT COMMENT 'ID',
`task_id` int(11) NOT NULL COMMENT '作业ID ',
`version_id` int(11) NOT NULL COMMENT '版本ID ',
`statement` text COMMENT 'flink sql 内容',
`name` varchar(255) NOT NULL COMMENT '名称',
`alias` varchar(255) DEFAULT NULL COMMENT '别名',
`dialect` varchar(50) DEFAULT NULL COMMENT '方言',
`type` varchar(50) DEFAULT NULL COMMENT '类型',
`task_configure` text NOT NULL COMMENT '作业配置',
`create_time` datetime DEFAULT NULL COMMENT '创建时间',
PRIMARY KEY (`id`) USING BTREE
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8 ROW_FORMAT=DYNAMIC COMMENT='作业历史版本';
export type TaskHistoryTableListItem = {
id: number,
versionId: number,
createTime: Date,
};
export type TaskHistoryRollbackItem = {
id: number,
versionId: number,
};
import {useRef, useState} from "react";
import {MinusSquareOutlined} from '@ant-design/icons';
import ProTable, {ActionType, ProColumns} from "@ant-design/pro-table";
import {Button, Col, Drawer, Modal, Row, Space, Tooltip} from 'antd';
import ProDescriptions from '@ant-design/pro-descriptions';
import {queryData, handleOption} from "@/components/Common/crud";
import {
TaskHistoryTableListItem
} from "@/components/Studio/StudioRightTool/StudioHistory/data";
import {StateType} from "@/pages/DataStudio/model";
import {connect} from "umi";
import {Scrollbars} from 'react-custom-scrollbars';
const url = '/api/task/version';
const StudioHistory = (props: any) => {
const {current, toolHeight, dispatch} = props;
const [row, setRow] = useState<TaskHistoryTableListItem>();
const actionRef = useRef<ActionType>();
if (current.key) {
actionRef.current?.reloadAndRest?.();
}
const columns: ProColumns<TaskHistoryTableListItem>[] = [
// {
// title: 'id',
// dataIndex: 'id',
// hideInForm: false,
// hideInSearch: false,
// },
{
title: '版本ID',
dataIndex: 'versionId',
sorter: true,
hideInForm: true,
hideInSearch: true,
render: (dom, entity) => {
return <a onClick={() => setRow(entity)}>{dom}</a>;
},
},
{
title: '创建时间',
dataIndex: 'createTime',
sorter: true,
valueType: 'dateTime',
hideInForm: true,
hideInSearch: true,
},
{
title: '操作',
valueType: 'option',
align: "center",
render: (text, record, index) => (
<Space size="middle">
<Button type="link" onClick={() => onRollBackVersion(record)}>回滚</Button>
</Space>
)
},
];
const onRollBackVersion = (row: TaskHistoryTableListItem) => {
Modal.confirm({
title: '回滚Flink SQL版本',
content: `确定回滚Flink SQL版本至【${row.versionId === "" ? row.versionId : row.versionId}】吗?`,
okText: '确认',
cancelText: '取消',
onOk: async () => {
const TaskHistoryRollbackItem = {
id: current.key, versionId: row.versionId
}
await handleOption('api/task/rollbackTask', "回滚Flink SQL版本", TaskHistoryRollbackItem);
actionRef.current?.reloadAndRest?.();
}
});
};
return (
<>
<Row>
<Col span={24}>
<div style={{float: "right"}}>
<Tooltip title="最小化">
<Button
type="text"
icon={<MinusSquareOutlined/>}
/>
</Tooltip>
</div>
</Col>
</Row>
<Scrollbars style={{height: (toolHeight - 32)}}>
<ProTable<TaskHistoryTableListItem>
actionRef={actionRef}
rowKey="id"
request={(params, sorter, filter) => queryData(url, {taskId: current.key, ...params, sorter, filter})}
columns={columns}
search={false}
/>
<Drawer
width={600}
visible={!!row}
onClose={() => {
setRow(undefined);
}}
closable={false}
>
{row?.versionId && (
<ProDescriptions<TaskHistoryTableListItem>
column={2}
title={row?.versionId}
request={async () => ({
data: row || {},
})}
params={{
id: row?.versionId,
}}
columns={columns}
/>
)}
</Drawer>
</Scrollbars>
</>
);
};
export default connect(({Studio}: { Studio: StateType }) => ({
current: Studio.current,
toolHeight: Studio.toolHeight,
}))(StudioHistory);
......@@ -5,6 +5,7 @@ import {connect} from "umi";
import StudioConfig from "./StudioConfig";
import StudioSetting from "./StudioSetting";
import StudioSavePoint from "./StudioSavePoint";
import StudioHistory from "./StudioHistory";
import StudioEnvSetting from "./StudioEnvSetting";
import StudioSqlConfig from "./StudioSqlConfig";
import StudioUDFInfo from "./StudioUDFInfo";
......@@ -85,6 +86,9 @@ const StudioRightTool = (props: any) => {
<TabPane tab={<span><ScheduleOutlined/> 保存点</span>} key="StudioSavePoint">
<StudioSavePoint/>
</TabPane>
<TabPane tab={<span><ScheduleOutlined /> 版本历史</span>} key="StudioHistory" >
<StudioHistory />
</TabPane>
</>)
};
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment