Commit e9526e8d authored by godkaikai's avatar godkaikai

savepoint策略实现

parent 4006fdfa
......@@ -35,6 +35,7 @@ public class StudioExecuteDTO {
private Integer maxRowNum;
private Integer checkPoint;
private Integer parallelism;
private Integer savePointStrategy;
private String savePointPath;
private String configJson;
private static final ObjectMapper mapper = new ObjectMapper();
......@@ -56,6 +57,6 @@ public class StudioExecuteDTO {
return new JobConfig(
type,useResult, useSession, session, useRemote, clusterId,
clusterConfigurationId, taskId, jobName, fragment,useStatementSet,
maxRowNum, checkPoint, parallelism, savePointPath,config);
maxRowNum, checkPoint, parallelism,savePointStrategy, savePointPath,config);
}
}
......@@ -76,7 +76,7 @@ public class Job2MysqlHandler implements JobHandler {
if(job.isUseGateway()){
Cluster cluster = clusterService.registersCluster(Cluster.autoRegistersCluster(job.getJobManagerAddress(),
job.getJobId(),job.getJobConfig().getJobName()+ LocalDateTime.now(), job.getType().getLongValue(),
job.getJobConfig().getClusterConfigurationId(),job.getJobConfig().getClusterConfigurationId()));
job.getJobConfig().getClusterConfigurationId(),job.getJobConfig().getTaskId()));
if(Asserts.isNotNull(cluster)){
history.setClusterId(cluster.getId());
}
......
......@@ -12,4 +12,8 @@ import org.apache.ibatis.annotations.Mapper;
**/
@Mapper
public interface SavepointsMapper extends SuperMapper<Savepoints> {
Savepoints getLatestSavepointByTaskId(Integer id);
Savepoints getEarliestSavepointByTaskId(Integer id);
}
......@@ -38,10 +38,10 @@ public class ClusterConfiguration extends SuperEntity {
private String note;
@TableField(exist = false)
private Map<String,String> config = new HashMap<>();
private Map<String,Object> config = new HashMap<>();
public Map<String,String> parseConfig(){
public Map<String,Object> parseConfig(){
ObjectMapper objectMapper = new ObjectMapper();
try {
if(Asserts.isNotNullString(configJson)) {
......
......@@ -30,6 +30,8 @@ public class Task extends SuperEntity{
private Integer checkPoint;
private Integer savePointStrategy;
private String savePointPath;
private Integer parallelism;
......@@ -68,7 +70,7 @@ public class Task extends SuperEntity{
if(clusterId==null||clusterId==0){
useRemote = false;
}
return new JobConfig(type,false,false,useRemote,clusterId,clusterConfigurationId,getId(),alias,fragment,statementSet,checkPoint,parallelism,savePointPath);
return new JobConfig(type,false,false,useRemote,clusterId,clusterConfigurationId,getId(),alias,fragment,statementSet,checkPoint,parallelism,savePointStrategy,savePointPath);
}
}
......@@ -18,6 +18,6 @@ public interface ClusterConfigurationService extends ISuperService<ClusterConfig
List<ClusterConfiguration> listEnabledAll();
Map<String,String> getGatewayConfig(Integer id);
Map<String,Object> getGatewayConfig(Integer id);
}
......@@ -15,4 +15,9 @@ import java.util.Map;
**/
public interface SavepointsService extends ISuperService<Savepoints> {
List<Savepoints> listSavepointsByTaskId(Integer taskId);
Savepoints getLatestSavepointByTaskId(Integer taskId);
Savepoints getEarliestSavepointByTaskId(Integer taskId);
}
......@@ -22,4 +22,14 @@ public class SavepointsServiceImpl extends SuperServiceImpl<SavepointsMapper, Sa
public List<Savepoints> listSavepointsByTaskId(Integer taskId) {
return list(new QueryWrapper<Savepoints>().eq("task_id",taskId));
}
@Override
public Savepoints getLatestSavepointByTaskId(Integer taskId) {
return baseMapper.getLatestSavepointByTaskId(taskId);
}
@Override
public Savepoints getEarliestSavepointByTaskId(Integer taskId) {
return baseMapper.getEarliestSavepointByTaskId(taskId);
}
}
......@@ -157,7 +157,7 @@ public class StudioServiceImpl implements StudioService {
JobConfig jobConfig = new JobConfig();
jobConfig.setAddress(cluster.getJobManagerHost());
if(Asserts.isNotNull(cluster.getClusterConfigurationId())){
Map<String, String> gatewayConfig = clusterConfigurationService.getGatewayConfig(cluster.getClusterConfigurationId());
Map<String, Object> gatewayConfig = clusterConfigurationService.getGatewayConfig(cluster.getClusterConfigurationId());
jobConfig.buildGatewayConfig(gatewayConfig);
}
jobConfig.setUseRestAPI(SystemConfiguration.getInstances().isUseRestAPI());
......@@ -173,9 +173,10 @@ public class StudioServiceImpl implements StudioService {
jobConfig.setAddress(cluster.getJobManagerHost());
jobConfig.setType(cluster.getType());
if(Asserts.isNotNull(cluster.getClusterConfigurationId())){
Map<String, String> gatewayConfig = clusterConfigurationService.getGatewayConfig(cluster.getClusterConfigurationId());
Map<String, Object> gatewayConfig = clusterConfigurationService.getGatewayConfig(cluster.getClusterConfigurationId());
jobConfig.buildGatewayConfig(gatewayConfig);
jobConfig.getGatewayConfig().getClusterConfig().setAppId(cluster.getName());
jobConfig.setTaskId(cluster.getTaskId());
}
jobConfig.setUseRestAPI(SystemConfiguration.getInstances().isUseRestAPI());
JobManager jobManager = JobManager.build(jobConfig);
......
package com.dlink.service.impl;
import com.dlink.assertion.Assert;
import com.dlink.assertion.Asserts;
import com.dlink.db.service.impl.SuperServiceImpl;
import com.dlink.job.JobConfig;
import com.dlink.job.JobManager;
import com.dlink.job.JobResult;
import com.dlink.mapper.TaskMapper;
import com.dlink.model.Cluster;
import com.dlink.model.Savepoints;
import com.dlink.model.Statement;
import com.dlink.model.SystemConfiguration;
import com.dlink.model.Task;
......@@ -58,7 +60,7 @@ public class TaskServiceImpl extends SuperServiceImpl<TaskMapper, Task> implemen
if (!JobManager.useGateway(config.getType())) {
config.setAddress(clusterService.buildEnvironmentAddress(config.isUseRemote(), task.getClusterId()));
} else {
Map<String, String> gatewayConfig = clusterConfigurationService.getGatewayConfig(task.getClusterConfigurationId());
Map<String, Object> gatewayConfig = clusterConfigurationService.getGatewayConfig(task.getClusterConfigurationId());
if ("yarn-application".equals(config.getType()) || "ya".equals(config.getType())) {
SystemConfiguration systemConfiguration = SystemConfiguration.getInstances();
gatewayConfig.put("userJarPath", systemConfiguration.getSqlSubmitJarPath());
......@@ -67,6 +69,24 @@ public class TaskServiceImpl extends SuperServiceImpl<TaskMapper, Task> implemen
}
config.buildGatewayConfig(gatewayConfig);
}
switch (config.getSavePointStrategy()) {
case LATEST:
Savepoints latestSavepoints = savepointsService.getLatestSavepointByTaskId(id);
if (Asserts.isNotNull(latestSavepoints)) {
config.setSavePointPath(latestSavepoints.getPath());
}
break;
case EARLIEST:
Savepoints earliestSavepoints = savepointsService.getEarliestSavepointByTaskId(id);
if (Asserts.isNotNull(earliestSavepoints)) {
config.setSavePointPath(earliestSavepoints.getPath());
}
break;
case CUSTOM:
break;
default:
config.setSavePointPath(null);
}
JobManager jobManager = JobManager.build(config);
return jobManager.executeSql(statement.getStatement());
}
......
......@@ -23,4 +23,18 @@
</if>
</where>
</select>
<select id="getLatestSavepointByTaskId" resultType="com.dlink.model.Savepoints">
SELECT * FROM dlink_savepoints
WHERE task_id = #{id}
ORDER BY create_time DESC
LIMIT 1
</select>
<select id="getEarliestSavepointByTaskId" resultType="com.dlink.model.Savepoints">
SELECT * FROM dlink_savepoints
WHERE task_id = #{id}
ORDER BY create_time ASC
LIMIT 1
</select>
</mapper>
......@@ -9,6 +9,7 @@
<result column="alias" property="alias" />
<result column="type" property="type" />
<result column="check_point" property="checkPoint" />
<result column="save_point_strategy" property="savePointStrategy" />
<result column="save_point_path" property="savePointPath" />
<result column="parallelism" property="parallelism" />
<result column="fragment" property="fragment" />
......@@ -24,7 +25,7 @@
<!-- 通用查询结果列 -->
<sql id="Base_Column_List">
id, name, alias, type,check_point,save_point_path, parallelism,fragment,statement_set,cluster_id,cluster_configuration_id,config,note, enabled, create_time, update_time
id, name, alias, type,check_point,save_point_strategy,save_point_path, parallelism,fragment,statement_set,cluster_id,cluster_configuration_id,config,note, enabled, create_time, update_time
</sql>
......
......@@ -3,7 +3,9 @@ package com.dlink.job;
import com.dlink.executor.ExecutorSetting;
import com.dlink.gateway.config.AppConfig;
import com.dlink.gateway.config.ClusterConfig;
import com.dlink.gateway.config.FlinkConfig;
import com.dlink.gateway.config.GatewayConfig;
import com.dlink.gateway.config.SavePointStrategy;
import com.dlink.session.SessionConfig;
import lombok.Getter;
import lombok.Setter;
......@@ -35,6 +37,7 @@ public class JobConfig {
private Integer maxRowNum;
private Integer checkpoint;
private Integer parallelism;
private SavePointStrategy savePointStrategy;
private String savePointPath;
private GatewayConfig gatewayConfig;
private boolean useRestAPI;
......@@ -47,7 +50,7 @@ public class JobConfig {
public JobConfig(String type, boolean useResult, boolean useSession, String session, boolean useRemote, Integer clusterId,
Integer clusterConfigurationId, Integer taskId, String jobName, boolean useSqlFragment,
boolean useStatementSet, Integer maxRowNum, Integer checkpoint,
Integer parallelism, String savePointPath, Map<String,String> config) {
Integer parallelism, Integer savePointStrategyValue, String savePointPath, Map<String,String> config) {
this.type = type;
this.useResult = useResult;
this.useSession = useSession;
......@@ -62,6 +65,7 @@ public class JobConfig {
this.maxRowNum = maxRowNum;
this.checkpoint = checkpoint;
this.parallelism = parallelism;
this.savePointStrategy = SavePointStrategy.get(savePointStrategyValue);
this.savePointPath = savePointPath;
this.config = config;
}
......@@ -77,7 +81,7 @@ public class JobConfig {
public JobConfig(String type,boolean useResult, boolean useSession, boolean useRemote, Integer clusterId,
Integer clusterConfigurationId, Integer taskId, String jobName, boolean useSqlFragment,
boolean useStatementSet,Integer checkpoint, Integer parallelism, String savePointPath) {
boolean useStatementSet,Integer checkpoint, Integer parallelism, Integer savePointStrategyValue, String savePointPath) {
this.type = type;
this.useResult = useResult;
this.useSession = useSession;
......@@ -90,6 +94,7 @@ public class JobConfig {
this.useStatementSet = useStatementSet;
this.checkpoint = checkpoint;
this.parallelism = parallelism;
this.savePointStrategy = SavePointStrategy.get(savePointStrategyValue);
this.savePointPath = savePointPath;
}
......@@ -105,17 +110,20 @@ public class JobConfig {
}
}
public void buildGatewayConfig(Map<String,String> config){
public void buildGatewayConfig(Map<String,Object> config){
gatewayConfig = new GatewayConfig();
gatewayConfig.setClusterConfig(ClusterConfig.build(config.get("flinkConfigPath"),
config.get("flinkLibPath"),
config.get("hadoopConfigPath")));
gatewayConfig.setClusterConfig(ClusterConfig.build(config.get("flinkConfigPath").toString(),
config.get("flinkLibPath").toString(),
config.get("hadoopConfigPath").toString()));
if(config.containsKey("userJarPath")){
gatewayConfig.setAppConfig(AppConfig.build(
config.get("userJarPath"),
config.get("userJarParas"),
config.get("userJarMainAppClass")
config.get("userJarPath").toString(),
config.get("userJarParas").toString(),
config.get("userJarMainAppClass").toString()
));
}
if(config.containsKey("flinkConfig")){
gatewayConfig.setFlinkConfig(FlinkConfig.build((Map<String, String>)config.get("flinkConfig")));
}
}
}
......@@ -96,8 +96,8 @@ public class JobManager extends RunTime {
Asserts.checkNull(config.getGatewayConfig(), "GatewayConfig 不能为空");
config.getGatewayConfig().setType(GatewayType.get(config.getType()));
config.getGatewayConfig().setTaskId(config.getTaskId());
config.getGatewayConfig().setFlinkConfig(FlinkConfig.build(config.getJobName(),
null, null, null, config.getSavePointPath(), null));
config.getGatewayConfig().getFlinkConfig().setJobName(config.getJobName());
config.getGatewayConfig().getFlinkConfig().setSavePoint(config.getSavePointPath());
config.setUseRemote(false);
}
}
......@@ -371,7 +371,7 @@ public class JobManager extends RunTime {
public boolean cancel(String jobId) {
if (useGateway && !config.isUseRestAPI()) {
config.getGatewayConfig().setFlinkConfig(FlinkConfig.build(jobId, ActionType.CANCEL.getValue(),
null, config.getSavePointPath()));
null, null));
Gateway.build(config.getGatewayConfig()).savepointJob();
return true;
} else {
......@@ -382,7 +382,7 @@ public class JobManager extends RunTime {
public SavePointResult savepoint(String jobId,String savePointType) {
if (useGateway && !config.isUseRestAPI()) {
config.getGatewayConfig().setFlinkConfig(FlinkConfig.build(jobId, ActionType.SAVEPOINT.getValue(),
savePointType, config.getSavePointPath()));
savePointType, null));
return Gateway.build(config.getGatewayConfig()).savepointJob();
} else {
return null;
......
......@@ -65,7 +65,7 @@ public class JobManagerTest {
JobConfig config = new JobConfig("session-yarn",true, true, "s1", true, 2,
null, null, "测试", false,false, 100, 0,
1, null,new HashMap<>());
1, 0,null,new HashMap<>());
if(config.isUseRemote()) {
config.setAddress("192.168.123.157:8081");
}
......
......@@ -437,4 +437,7 @@ CREATE TABLE `dlink_savepoints` (
PRIMARY KEY (`id`)
) ENGINE = InnoDB CHARACTER SET = utf8 COLLATE = utf8_general_ci ROW_FORMAT = Dynamic;
ALTER TABLE `dlink_task`
ADD COLUMN `save_point_strategy` int(1) NULL COMMENT 'SavePoint策略' AFTER `check_point`;
SET FOREIGN_KEY_CHECKS = 1;
......@@ -9,6 +9,7 @@ import lombok.Setter;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
/**
* FlinkConfig
......@@ -32,6 +33,10 @@ public class FlinkConfig {
public FlinkConfig() {
}
public FlinkConfig(List<ConfigPara> configParas) {
this.configParas = configParas;
}
public FlinkConfig(String jobName, String jobId, ActionType action, SavePointType savePointType, String savePoint, List<ConfigPara> configParas) {
this.jobName = jobName;
this.jobId = jobId;
......@@ -41,6 +46,14 @@ public class FlinkConfig {
this.configParas = configParas;
}
public static FlinkConfig build(Map<String, String> paras){
List<ConfigPara> configParasList = new ArrayList<>();
for (Map.Entry<String, String> entry : paras.entrySet()) {
configParasList.add(new ConfigPara(entry.getKey(),entry.getValue()));
}
return new FlinkConfig(configParasList);
}
public static FlinkConfig build(String jobName, String jobId, String actionStr, String savePointTypeStr, String savePoint, String configParasStr){
List<ConfigPara> configParasList = new ArrayList<>();
JsonNode paras = null;
......
package com.dlink.gateway.config;
/**
* SavePointStrategy
*
* @author wenmo
* @since 2021/11/23 10:28
**/
public enum SavePointStrategy {
NONE(0), LATEST(1), EARLIEST(2), CUSTOM(3);
private Integer value;
SavePointStrategy(Integer value) {
this.value = value;
}
public Integer getValue() {
return value;
}
public static SavePointStrategy get(Integer value) {
for (SavePointStrategy type : SavePointStrategy.values()) {
if (type.getValue() == value) {
return type;
}
}
return SavePointStrategy.NONE;
}
}
......@@ -175,13 +175,17 @@ public abstract class YarnGateway extends AbstractGateway {
}
private void runSavePointJob(List<JobInfo> jobInfos,ClusterClient<ApplicationId> clusterClient) throws Exception{
String savePoint = FlinkConfig.DEFAULT_SAVEPOINT_PREFIX;
if(Asserts.isNotNull(config.getTaskId())){
savePoint = savePoint + config.getTaskId();
}
String savePoint = null;
/*String savePoint = FlinkConfig.DEFAULT_SAVEPOINT_PREFIX;
if(Asserts.isNotNullString(config.getFlinkConfig().getSavePoint())){
savePoint = config.getFlinkConfig().getSavePoint();
}
if(Asserts.isNotNull(config.getTaskId())){
if(savePoint.lastIndexOf("/")!=savePoint.length()){
savePoint = savePoint + "/";
}
savePoint = savePoint + config.getTaskId();
}*/
for( JobInfo jobInfo: jobInfos){
if(ActionType.CANCEL== config.getFlinkConfig().getAction()){
clusterClient.cancel(JobID.fromHexString(jobInfo.getJobId()));
......@@ -193,9 +197,6 @@ public abstract class YarnGateway extends AbstractGateway {
CompletableFuture<String> triggerFuture = clusterClient.triggerSavepoint(JobID.fromHexString(jobInfo.getJobId()), savePoint);
jobInfo.setSavePoint(triggerFuture.get());
break;
case DISPOSE:
clusterClient.disposeSavepoint(savePoint);
break;
case STOP:
CompletableFuture<String> stopFuture = clusterClient.stopWithSavepoint(JobID.fromHexString(jobInfo.getJobId()), true, savePoint);
jobInfo.setStatus(JobInfo.JobStatus.STOP);
......
......@@ -56,6 +56,7 @@ const StudioMenu = (props: any) => {
jobName: current.task.jobName,
parallelism: current.task.parallelism,
checkPoint: current.task.checkPoint,
savePointStrategy: current.task.savePointStrategy,
savePointPath: current.task.savePointPath,
};
const key = current.key;
......
......@@ -61,6 +61,8 @@ const StudioSavePoint: React.FC<{}> = (props: any) => {
dataIndex: 'createTime',
sorter: true,
valueType: 'dateTime',
hideInForm: true,
hideInSearch: true,
render: (dom, entity) => {
return <a onClick={() => setRow(entity)}>{dom}</a>;
},
......@@ -68,12 +70,13 @@ const StudioSavePoint: React.FC<{}> = (props: any) => {
];
return (
<PageContainer>
<>
<ProTable<SavePointTableListItem>
actionRef={actionRef}
rowKey="id"
request={(params, sorter, filter) => queryData(url, {taskId:current.taskId,...params, sorter, filter})}
columns={columns}
search={false}
/>
<Drawer
width={600}
......@@ -97,7 +100,7 @@ const StudioSavePoint: React.FC<{}> = (props: any) => {
/>
)}
</Drawer>
</PageContainer>
</>
);
};
......
......@@ -175,11 +175,24 @@ const StudioSetting = (props: any) => {
</Col>
</Row>
<Form.Item
label="SavePoint策略" className={styles.form_item} name="savePointStrategy"
tooltip='指定 SavePoint策略,默认为禁用'
>
<Select defaultValue="0">
<Option value="0">禁用</Option>
<Option value="1">最近一次</Option>
<Option value="2">最早一次</Option>
<Option value="3">自定义</Option>
</Select>
</Form.Item>
{current.task.savePointStrategy == 3 ?
(<Form.Item
label="SavePointPath" className={styles.form_item} name="savePointPath"
tooltip='从SavePointPath恢复Flink任务'
>
<Input placeholder="hdfs://..." />
</Form.Item>
<Input placeholder="hdfs://..."/>
</Form.Item>):''
}
<Form.Item
label="其他配置" className={styles.form_item}
tooltip={{ title: '其他配置项,将被应用于执行环境,如 pipeline.name', icon: <InfoCircleOutlined /> }}
......
......@@ -57,6 +57,7 @@ export type TaskType = {
alias?: string,
type?: string,
checkPoint?: number,
savePointStrategy?: number,
savePointPath?: string,
parallelism?: number,
fragment?: boolean,
......@@ -183,6 +184,7 @@ const Model: ModelType = {
jobName: '草稿',
// type: 'standalone',
checkPoint: 0,
savePointStrategy: 0,
savePointPath: '',
parallelism: 1,
fragment: true,
......@@ -220,6 +222,7 @@ const Model: ModelType = {
jobName: '草稿',
// type: 'standalone',
checkPoint: 0,
savePointStrategy: 0,
savePointPath: '',
parallelism: 1,
fragment: true,
......
......@@ -388,7 +388,25 @@ export default (): React.ReactNode => {
<Link>新增 yarn-application 的sql作业提交方式</Link>
</li>
<li>
<Link>新增 yarn-application 和 yarn-application 集群的自动注册</Link>
<Link>新增 yarn-perjob 和 yarn-application 集群的作业停止</Link>
</li>
<li>
<Link>新增 yarn-perjob 和 yarn-application 集群的自动注册</Link>
</li>
<li>
<Link>新增 savepoint 各种机制触发</Link>
</li>
<li>
<Link>新增 savepoint 的归档管理</Link>
</li>
<li>
<Link>新增任务启动 savepoint 多种启动策略</Link>
</li>
<li>
<Link>新增 yarn-perjob 和 yarn-application 的从 savepoint 启动</Link>
</li>
<li>
<Link>新增 yarn-perjob 和 yarn-application 的启动时多样化集群配置生效</Link>
</li>
</ul>
</Paragraph>
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment