Commit fee2a695 authored by wenmo's avatar wenmo

作业提交优化

parent d7ff5934
......@@ -91,9 +91,11 @@ DataLink 开源项目及社区正在建设,希望本项目可以帮助你更
## 部署
### 最新版本
### 版本
dlink-0.2.3
抢先体验( main 主支):dlink-0.3.0-SANPSHOT
稳定版本( 0.2.3 分支):dlink-0.2.3
### 从安装包开始
......
......@@ -2,6 +2,7 @@ package com.dlink.controller;
import com.dlink.common.result.ProTableResult;
import com.dlink.common.result.Result;
import com.dlink.job.JobResult;
import com.dlink.model.Task;
import com.dlink.result.SubmitResult;
import com.dlink.service.TaskService;
......@@ -76,11 +77,11 @@ public class TaskController {
@PostMapping(value = "/submit")
public Result submit(@RequestBody JsonNode para) throws Exception {
if (para.size()>0){
List<SubmitResult> results = new ArrayList<>();
List<JobResult> results = new ArrayList<>();
List<Integer> error = new ArrayList<>();
for (final JsonNode item : para){
Integer id = item.asInt();
SubmitResult result = taskService.submitByTaskId(id);
JobResult result = taskService.submitByTaskId(id);
if(!result.isSuccess()){
error.add(id);
}
......
......@@ -6,6 +6,7 @@ import com.baomidou.mybatisplus.annotation.TableName;
import com.dlink.db.model.SuperEntity;
import com.dlink.executor.Executor;
import com.dlink.executor.ExecutorSetting;
import com.dlink.job.JobConfig;
import lombok.Data;
import lombok.EqualsAndHashCode;
......@@ -56,5 +57,12 @@ public class Task extends SuperEntity{
return new ExecutorSetting(checkPoint,parallelism,fragment,savePointPath,alias,configMap);
}
public JobConfig getSubmitConfig(){
boolean useRemote = true;
if(clusterId==null||clusterId==0){
useRemote = false;
}
return new JobConfig(false,false,useRemote,clusterId,getId(),alias,fragment,checkPoint,parallelism,savePointPath);
}
}
......@@ -17,5 +17,7 @@ public interface ClusterService extends ISuperService<Cluster> {
String getJobManagerAddress(Cluster cluster);
String buildEnvironmentAddress(boolean useRemote,Integer id);
List<Cluster> listEnabledAll();
}
......@@ -2,6 +2,7 @@ package com.dlink.service;
import com.dlink.db.service.ISuperService;
import com.dlink.job.JobResult;
import com.dlink.model.Task;
import com.dlink.result.SubmitResult;
......@@ -13,7 +14,7 @@ import com.dlink.result.SubmitResult;
*/
public interface TaskService extends ISuperService<Task> {
SubmitResult submitByTaskId(Integer id);
JobResult submitByTaskId(Integer id);
Task getTaskInfoById(Integer id);
......
......@@ -3,12 +3,16 @@ package com.dlink.service.impl;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.dlink.assertion.Assert;
import com.dlink.cluster.FlinkCluster;
import com.dlink.constant.FlinkConstant;
import com.dlink.constant.NetConstant;
import com.dlink.db.service.impl.SuperServiceImpl;
import com.dlink.mapper.ClusterMapper;
import com.dlink.model.Cluster;
import com.dlink.service.ClusterService;
import org.springframework.stereotype.Service;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.List;
/**
......@@ -37,6 +41,24 @@ public class ClusterServiceImpl extends SuperServiceImpl<ClusterMapper, Cluster>
return host;
}
@Override
public String buildEnvironmentAddress(boolean useRemote, Integer id) {
String address = FlinkConstant.LOCAL_HOST;
if(useRemote) {
return getJobManagerAddress(getById(id));
}else{
try {
InetAddress inetAddress = InetAddress.getLocalHost();
if(inetAddress!=null) {
return inetAddress.getHostAddress()+ NetConstant.COLON+FlinkConstant.PORT;
}
} catch (UnknownHostException e) {
e.printStackTrace();
}
}
return address;
}
@Override
public List<Cluster> listEnabledAll() {
return this.list(new QueryWrapper<Cluster>().eq("enabled",1));
......
......@@ -43,34 +43,15 @@ public class StudioServiceImpl implements StudioService {
@Override
public JobResult executeSql(StudioExecuteDTO studioExecuteDTO) {
JobConfig config = studioExecuteDTO.getJobConfig();
buildEnvironmentAddress(config,studioExecuteDTO.getClusterId());
config.setAddress(clusterService.buildEnvironmentAddress(config.isUseRemote(),studioExecuteDTO.getClusterId()));
JobManager jobManager = JobManager.build(config);
return jobManager.executeSql(studioExecuteDTO.getStatement());
}
private void buildEnvironmentAddress(JobConfig config,Integer clusterId){
if(config.isUseRemote()) {
config.setAddress(clusterService.getJobManagerAddress(
clusterService.getById(clusterId)
));
}else{
try {
InetAddress address = InetAddress.getLocalHost();
if(address!=null) {
config.setAddress(address.getHostAddress());
}else{
config.setAddress(FlinkConstant.LOCAL_HOST);
}
} catch (UnknownHostException e) {
e.printStackTrace();
}
}
}
@Override
public IResult executeDDL(StudioDDLDTO studioDDLDTO) {
JobConfig config = studioDDLDTO.getJobConfig();
buildEnvironmentAddress(config,studioDDLDTO.getClusterId());
config.setAddress(clusterService.buildEnvironmentAddress(config.isUseRemote(),studioDDLDTO.getClusterId()));
JobManager jobManager = JobManager.build(config);
return jobManager.executeDDL(studioDDLDTO.getStatement());
}
......
......@@ -3,11 +3,14 @@ package com.dlink.service.impl;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.dlink.assertion.Assert;
import com.dlink.cluster.FlinkCluster;
import com.dlink.constant.FlinkConstant;
import com.dlink.db.service.impl.SuperServiceImpl;
import com.dlink.exception.BusException;
import com.dlink.executor.Executor;
import com.dlink.executor.ExecutorSetting;
import com.dlink.job.JobConfig;
import com.dlink.job.JobManager;
import com.dlink.job.JobResult;
import com.dlink.mapper.TaskMapper;
import com.dlink.model.Cluster;
import com.dlink.model.Statement;
......@@ -19,6 +22,9 @@ import com.dlink.service.TaskService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.net.InetAddress;
import java.net.UnknownHostException;
/**
* 任务 服务实现类
*
......@@ -34,8 +40,8 @@ public class TaskServiceImpl extends SuperServiceImpl<TaskMapper, Task> implemen
private ClusterService clusterService;
@Override
public SubmitResult submitByTaskId(Integer id) {
Task task = this.getById(id);
public JobResult submitByTaskId(Integer id) {
/*Task task = this.getById(id);
Assert.check(task);
Cluster cluster = clusterService.getById(task.getClusterId());
Statement statement = statementService.getById(id);
......@@ -54,7 +60,15 @@ public class TaskServiceImpl extends SuperServiceImpl<TaskMapper, Task> implemen
return jobManager.submit(statement.getStatement());
}else{
throw new BusException("该任务的集群不存在");
}
}*/
Task task = this.getById(id);
Assert.check(task);
Statement statement = statementService.getById(id);
Assert.check(statement);
JobConfig config = task.getSubmitConfig();
config.setAddress(clusterService.buildEnvironmentAddress(config.isUseRemote(),task.getClusterId()));
JobManager jobManager = JobManager.build(config);
return jobManager.executeSql(statement.getStatement());
}
@Override
......
......@@ -54,7 +54,27 @@ public class JobConfig {
this.clusterId = clusterId;
}
public JobConfig(boolean useResult, boolean useSession, boolean useRemote, Integer clusterId,
Integer taskId, String jobName, boolean useSqlFragment, Integer checkpoint,
Integer parallelism, String savePointPath) {
this.useResult = useResult;
this.useSession = useSession;
this.useRemote = useRemote;
this.clusterId = clusterId;
this.taskId = taskId;
this.jobName = jobName;
this.useSqlFragment = useSqlFragment;
this.checkpoint = checkpoint;
this.parallelism = parallelism;
this.savePointPath = savePointPath;
}
public ExecutorSetting getExecutorSetting(){
return new ExecutorSetting(checkpoint,parallelism,useSqlFragment,savePointPath,jobName);
}
public JobConfig buildSubmitConfig(Integer clusterId, Integer taskId, String jobName,boolean useSqlFragment, Integer checkpoint,
Integer parallelism, String savePointPath){
return new JobConfig(false,false,false,clusterId,taskId,jobName,useSqlFragment,checkpoint,parallelism,savePointPath);
}
}
......@@ -124,51 +124,6 @@ public class JobManager extends RunTime {
return false;
}
public RunResult execute(String statement) {
RunResult runResult = new RunResult(sessionId, statement, environmentSetting.getHost(), environmentSetting.getPort(), executorSetting, executorSetting.getJobName());
Executor executor = createExecutorWithSession();
String[] Statements = statement.split(";");
int currentIndex = 0;
try {
for (String item : Statements) {
currentIndex++;
if (item.trim().isEmpty()) {
continue;
}
SqlType operationType = Operations.getOperationType(item);
long start = System.currentTimeMillis();
CustomTableEnvironmentImpl stEnvironment = executor.getCustomTableEnvironmentImpl();
if (!FlinkInterceptor.build(stEnvironment, item)) {
TableResult tableResult = executor.executeSql(item);
if (tableResult.getJobClient().isPresent()) {
runResult.setJobId(tableResult.getJobClient().get().getJobID().toHexString());
}
IResult result = ResultBuilder.build(operationType, maxRowNum, "", false).getResult(tableResult);
runResult.setResult(result);
}
long finish = System.currentTimeMillis();
long timeElapsed = finish - start;
runResult.setTime(timeElapsed);
runResult.setFinishDate(LocalDateTime.now());
runResult.setSuccess(true);
}
} catch (Exception e) {
e.printStackTrace();
StackTraceElement[] trace = e.getStackTrace();
StringBuffer resMsg = new StringBuffer("");
for (StackTraceElement s : trace) {
resMsg.append(" \n " + s + " ");
}
runResult.setFinishDate(LocalDateTime.now());
runResult.setSuccess(false);
// runResult.setError(LocalDateTime.now().toString() + ":" + "运行第" + currentIndex + "行sql时出现异常:" + e.getMessage());
runResult.setError(LocalDateTime.now().toString() + ":" + "运行第" + currentIndex + "行sql时出现异常:" + e.getMessage() + " \n >>>堆栈信息<<<" + resMsg.toString());
// runResult.setError(LocalDateTime.now().toString() + ":" + "运行第" + currentIndex + "行sql时出现异常:" + e.getMessage() + "\n >>>异常原因<<< \n" + e.getCause().toString());
return runResult;
}
return runResult;
}
public SubmitResult submit(String statement) {
if (statement == null || "".equals(statement)) {
return SubmitResult.error("FlinkSql语句不存在");
......
......@@ -90,8 +90,8 @@ public class JobManagerTest {
sqls.add(sql2);
sqls.add(sql3);
String sql = sql1+sql2+sql3;
RunResult result = jobManager.execute(sql);
System.out.println(result.isSuccess());
JobResult jobResult = jobManager.executeSql(sql);
System.out.println(jobResult.isSuccess());
}
@Test
......
......@@ -34,8 +34,9 @@ const StudioMenu = (props: any) => {
if(selectsql==null||selectsql==''){
selectsql=current.value;
}
let useSession = current.task.useSession;
let param ={
useSession:current.task.useSession,
useSession:useSession,
session:current.task.session,
useRemote:current.task.useRemote,
clusterId:current.task.clusterId,
......@@ -84,7 +85,7 @@ const StudioMenu = (props: any) => {
type: "Studio/saveTabs",
payload: newTabs,
});
showTables(current.task,dispatch);
useSession&&showTables(current.task,dispatch);
})
};
......@@ -110,6 +111,9 @@ const StudioMenu = (props: any) => {
key:taskKey,
icon: <SmileOutlined style={{ color: '#108ee9' }} />,
});
setTimeout(()=>{
refs?.history?.current?.reload();
},2000);
const res = await postDataArray('/api/task/submit',[task.id]);
notification.close(taskKey);
if(res.datas[0].success){
......
......@@ -48,12 +48,12 @@ const getParentKey = (key, tree) => {
};
const StudioTree: React.FC<StudioTreeProps> = (props) => {
const {rightClickMenu,dispatch,tabs,refs} = props;
const [treeData, setTreeData] = useState<TreeDataNode[]>();
const [dataList, setDataList] = useState<[]>();
const [expandedKeys, setExpandedKeys] = useState<[]>();
const [rightClickNodeTreeItem,setRightClickNodeTreeItem] = useState<RightClickMenu>();
const {rightClickMenu,dispatch,tabs} = props;
const [updateCatalogueModalVisible, handleUpdateCatalogueModalVisible] = useState<boolean>(false);
const [updateTaskModalVisible, handleUpdateTaskModalVisible] = useState<boolean>(false);
const [isCreate, setIsCreate] = useState<boolean>(true);
......@@ -193,6 +193,9 @@ const StudioTree: React.FC<StudioTreeProps> = (props) => {
let task = {
id:node.taskId,
};
setTimeout(()=>{
refs?.history?.current?.reload();
},2000);
handleSubmit('/api/task/submit','作业',[task]);
}
});
......@@ -415,4 +418,5 @@ export default connect(({Studio}: { Studio: StateType }) => ({
currentPath:Studio.currentPath,
tabs: Studio.tabs,
rightClickMenu: Studio.rightClickMenu,
refs: Studio.refs,
}))(StudioTree);
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment