Commit d9dff4a9 authored by wenmo's avatar wenmo

作业执行持久化Mysql初始化

parent 0c9c3442
package com.dlink.job; package com.dlink.job;
import cn.hutool.extra.spring.SpringUtil;
import com.dlink.model.History;
import com.dlink.service.HistoryService;
/** /**
* Job2MysqlHandler * Job2MysqlHandler
* *
...@@ -7,38 +11,51 @@ package com.dlink.job; ...@@ -7,38 +11,51 @@ package com.dlink.job;
* @since 2021/6/27 0:04 * @since 2021/6/27 0:04
*/ */
public class Job2MysqlHandler implements JobHandler { public class Job2MysqlHandler implements JobHandler {
@Override
public void init() {
@Override
public boolean init() {
Job job = JobContextHolder.getJob();
History history = new History();
history.setClusterId(job.getJobConfig().getClusterId());
history.setJobManagerAddress(job.getJobManagerAddress());
history.setJobName(job.getJobConfig().getJobName());
history.setSession(job.getJobConfig().getSession());
history.setStatus(job.getStatus().ordinal());
history.setStartTime(job.getStartTime());
history.setType(job.getType().ordinal());
history.setTaskId(job.getJobConfig().getTaskId());
HistoryService historyService = SpringUtil.getBean(HistoryService.class);
historyService.save(history);
return true;
} }
@Override @Override
public void start() { public boolean ready() {
return true;
} }
@Override @Override
public void running() { public boolean running() {
return true;
} }
@Override @Override
public void success() { public boolean success() {
return true;
} }
@Override @Override
public void failed() { public boolean failed() {
return true;
} }
@Override @Override
public void callback() { public boolean callback() {
return true;
} }
@Override @Override
public void close() { public boolean close() {
return true;
} }
} }
...@@ -43,4 +43,17 @@ public class Job { ...@@ -43,4 +43,17 @@ public class Job {
FAILED, FAILED,
CANCEL CANCEL
} }
public Job(JobConfig jobConfig, String jobManagerAddress, boolean isRemote, boolean isSession, JobStatus status, String statement, JobType type, ExecutorSetting executorSetting, LocalDate startTime, Executor executor) {
this.jobConfig = jobConfig;
this.jobManagerAddress = jobManagerAddress;
this.isRemote = isRemote;
this.isSession = isSession;
this.status = status;
this.statement = statement;
this.type = type;
this.executorSetting = executorSetting;
this.startTime = startTime;
this.executor = executor;
}
} }
...@@ -17,6 +17,7 @@ public class JobConfig { ...@@ -17,6 +17,7 @@ public class JobConfig {
private String session; private String session;
private String type; private String type;
private Integer taskId; private Integer taskId;
private Integer clusterId;
private Integer checkpoint; private Integer checkpoint;
private Integer parallelism; private Integer parallelism;
private boolean useSqlFragment; private boolean useSqlFragment;
......
...@@ -10,14 +10,14 @@ import java.util.ServiceLoader; ...@@ -10,14 +10,14 @@ import java.util.ServiceLoader;
* @author wenmo * @author wenmo
* @since 2021/6/26 23:22 * @since 2021/6/26 23:22
*/ */
public interface JobHandler { public interface JobHandler {
void init(); boolean init();
void start(); boolean ready();
void running(); boolean running();
void success(); boolean success();
void failed(); boolean failed();
void callback(); boolean callback();
void close(); boolean close();
static JobHandler build(){ static JobHandler build(){
ServiceLoader<JobHandler> jobHandlers = ServiceLoader.load(JobHandler.class); ServiceLoader<JobHandler> jobHandlers = ServiceLoader.load(JobHandler.class);
for(JobHandler jobHandler : jobHandlers){ for(JobHandler jobHandler : jobHandlers){
......
...@@ -16,6 +16,7 @@ import org.apache.flink.api.common.JobID; ...@@ -16,6 +16,7 @@ import org.apache.flink.api.common.JobID;
import org.apache.flink.table.api.TableResult; import org.apache.flink.table.api.TableResult;
import org.junit.Assert; import org.junit.Assert;
import java.time.LocalDate;
import java.time.LocalDateTime; import java.time.LocalDateTime;
import java.util.*; import java.util.*;
...@@ -133,7 +134,7 @@ public class JobManager extends RunTime { ...@@ -133,7 +134,7 @@ public class JobManager extends RunTime {
} }
@Override @Override
public void init() { public boolean init() {
String host = config.getHost(); String host = config.getHost();
if (host != null && !("").equals(host)) { if (host != null && !("").equals(host)) {
String[] strs = host.split(NetConstant.COLON); String[] strs = host.split(NetConstant.COLON);
...@@ -150,11 +151,12 @@ public class JobManager extends RunTime { ...@@ -150,11 +151,12 @@ public class JobManager extends RunTime {
} }
checkSession(); checkSession();
createExecutor(); createExecutor();
return false;
} }
@Override @Override
public boolean ready() { public boolean ready() {
return false; return handler.init();
} }
@Override @Override
...@@ -168,8 +170,8 @@ public class JobManager extends RunTime { ...@@ -168,8 +170,8 @@ public class JobManager extends RunTime {
} }
@Override @Override
public void close() { public boolean close() {
return false;
} }
public RunResult execute(String statement) { public RunResult execute(String statement) {
...@@ -281,7 +283,10 @@ public class JobManager extends RunTime { ...@@ -281,7 +283,10 @@ public class JobManager extends RunTime {
public void executeSql(String statement) { public void executeSql(String statement) {
RunResult runResult = new RunResult(sessionId, statement, flinkHost, port, executorSetting, executorSetting.getJobName()); RunResult runResult = new RunResult(sessionId, statement, flinkHost, port, executorSetting, executorSetting.getJobName());
Job job = new Job(); Job job = new Job(config,jobManagerHost+NetConstant.COLON+jobManagerPort,isRemote, isSession,
Job.JobStatus.INITIALIZE,statement,Job.JobType.EXECUTE,executorSetting, LocalDate.now(),executor);
JobContextHolder.setJob(job);
ready();
String[] Statements = statement.split(";"); String[] Statements = statement.split(";");
int currentIndex = 0; int currentIndex = 0;
try { try {
......
...@@ -8,7 +8,7 @@ package com.dlink.job; ...@@ -8,7 +8,7 @@ package com.dlink.job;
*/ */
public abstract class RunTime { public abstract class RunTime {
abstract void init(); abstract boolean init();
abstract boolean ready(); abstract boolean ready();
...@@ -16,5 +16,5 @@ public abstract class RunTime { ...@@ -16,5 +16,5 @@ public abstract class RunTime {
abstract boolean error(); abstract boolean error();
abstract void close(); abstract boolean close();
} }
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment