Commit 0c9c3442 authored by wenmo's avatar wenmo

作业提交持久化

parent f437c34c
......@@ -50,7 +50,7 @@ public class TaskServiceImpl extends SuperServiceImpl<TaskMapper, Task> implemen
JobManager jobManager = new JobManager(host,task.getRemoteExecutorSetting());
return jobManager.submit(statement.getStatement());
}else if(task.getClusterId()==0){
JobManager jobManager = new JobManager(task.getLocalExecutorSetting());
JobManager jobManager = new JobManager();
return jobManager.submit(statement.getStatement());
}else{
throw new BusException("该任务的集群不存在");
......
......@@ -16,21 +16,31 @@ import java.time.LocalDate;
@Getter
@Setter
public class Job {
private Integer id;
private Integer clusterId;
private String session;
private String jobId;
private String jobName;
private JobConfig jobConfig;
private String jobManagerAddress;
private Integer status;
private boolean isRemote;
private boolean isSession;
private JobStatus status;
private String statement;
private Integer type;
private JobType type;
private String error;
private String result;
private ExecutorSetting config;
private ExecutorSetting executorSetting;
private LocalDate startTime;
private LocalDate endTime;
private String msg;
private Integer taskId;
private Executor executor;
enum JobType{
EXECUTE,
SUBMIT
}
enum JobStatus{
INITIALIZE,
RUNNING,
SUCCESS,
FAILED,
CANCEL
}
}
......@@ -16,6 +16,7 @@ public class JobConfig {
private String host;
private String session;
private String type;
private Integer taskId;
private Integer checkpoint;
private Integer parallelism;
private boolean useSqlFragment;
......
......@@ -80,11 +80,15 @@ public class JobManager extends RunTime {
}
public static JobManager build() {
return new JobManager();
JobManager manager = new JobManager();
manager.init();
return manager;
}
public static JobManager build(JobConfig config) {
return new JobManager(config);
JobManager manager = new JobManager(config);
manager.init();
return manager;
}
private Executor createExecutor() {
......@@ -277,6 +281,7 @@ public class JobManager extends RunTime {
public void executeSql(String statement) {
RunResult runResult = new RunResult(sessionId, statement, flinkHost, port, executorSetting, executorSetting.getJobName());
Job job = new Job();
String[] Statements = statement.split(";");
int currentIndex = 0;
try {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment