Commit 8d691890 authored by wenmo's avatar wenmo

展示JobPlan

parent ea7550b3
...@@ -55,6 +55,14 @@ public class StudioController { ...@@ -55,6 +55,14 @@ public class StudioController {
return Result.succeed(studioService.getStreamGraph(studioExecuteDTO),"获取执行图成功"); return Result.succeed(studioService.getStreamGraph(studioExecuteDTO),"获取执行图成功");
} }
/**
* 获取sql的jobplan
*/
@PostMapping("/getJobPlan")
public Result getJobPlan(@RequestBody StudioExecuteDTO studioExecuteDTO) {
return Result.succeed(studioService.getJobPlan(studioExecuteDTO),"获取作业计划成功");
}
/** /**
* 进行DDL操作 * 进行DDL操作
*/ */
......
...@@ -31,6 +31,8 @@ public interface StudioService { ...@@ -31,6 +31,8 @@ public interface StudioService {
ObjectNode getStreamGraph(StudioExecuteDTO studioExecuteDTO); ObjectNode getStreamGraph(StudioExecuteDTO studioExecuteDTO);
ObjectNode getJobPlan(StudioExecuteDTO studioExecuteDTO);
SelectResult getJobData(String jobId); SelectResult getJobData(String jobId);
SessionInfo createSession(SessionDTO sessionDTO, String createUser); SessionInfo createSession(SessionDTO sessionDTO, String createUser);
......
...@@ -27,7 +27,9 @@ import com.dlink.service.StudioService; ...@@ -27,7 +27,9 @@ import com.dlink.service.StudioService;
import com.dlink.session.SessionConfig; import com.dlink.session.SessionConfig;
import com.dlink.session.SessionInfo; import com.dlink.session.SessionInfo;
import com.dlink.session.SessionPool; import com.dlink.session.SessionPool;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode; import com.fasterxml.jackson.databind.node.ObjectNode;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
...@@ -98,6 +100,26 @@ public class StudioServiceImpl implements StudioService { ...@@ -98,6 +100,26 @@ public class StudioServiceImpl implements StudioService {
return jobManager.getStreamGraph(studioExecuteDTO.getStatement()); return jobManager.getStreamGraph(studioExecuteDTO.getStatement());
} }
@Override
public ObjectNode getJobPlan(StudioExecuteDTO studioExecuteDTO) {
JobConfig config = studioExecuteDTO.getJobConfig();
config.setType(GatewayType.LOCAL.getLongValue());
if(!config.isUseSession()) {
config.setAddress(clusterService.buildEnvironmentAddress(config.isUseRemote(), studioExecuteDTO.getClusterId()));
}
JobManager jobManager = JobManager.build(config);
String planJson = jobManager.getJobPlanJson(studioExecuteDTO.getStatement());
ObjectMapper mapper = new ObjectMapper();
ObjectNode objectNode =mapper.createObjectNode();
try {
objectNode = (ObjectNode) mapper.readTree(planJson);
} catch (JsonProcessingException e) {
e.printStackTrace();
}finally {
return objectNode;
}
}
@Override @Override
public SelectResult getJobData(String jobId) { public SelectResult getJobData(String jobId) {
return JobManager.getJobData(jobId); return JobManager.getJobData(jobId);
......
...@@ -365,13 +365,15 @@ public class JobManager extends RunTime { ...@@ -365,13 +365,15 @@ public class JobManager extends RunTime {
} }
public List<SqlExplainResult> explainSql(String statement) { public List<SqlExplainResult> explainSql(String statement) {
Explainer explainer = Explainer.build(executor); return Explainer.build(executor).explainSqlResult(statement);
return explainer.explainSqlResult(statement);
} }
public ObjectNode getStreamGraph(String statement) { public ObjectNode getStreamGraph(String statement) {
Explainer explainer = Explainer.build(executor); return Explainer.build(executor).getStreamGraph(statement);
return explainer.getStreamGraph(statement); }
public String getJobPlanJson(String statement) {
return Explainer.build(executor).getJobPlanInfo(statement).getJsonPlan();
} }
public boolean cancel(String jobId) { public boolean cancel(String jobId) {
......
...@@ -24,4 +24,53 @@ import java.util.Map; ...@@ -24,4 +24,53 @@ import java.util.Map;
**/ **/
public class FlinkSqlPlusTest { public class FlinkSqlPlusTest {
@Test
public void getJobPlanInfo(){
String sql = "jdbcconfig:='connector' = 'jdbc',\n" +
" 'url' = 'jdbc:mysql://127.0.0.1:3306/test?useUnicode=true&characterEncoding=UTF-8&autoReconnect=true&useSSL=false&zeroDateTimeBehavior=convertToNull&serverTimezone=Asia/Shanghai&allowPublicKeyRetrieval=true',\n" +
" 'username'='dlink',\n" +
" 'password'='dlink',;\n" +
"create temporary function TOP2 as 'com.dlink.ud.udtaf.Top2';\n" +
"CREATE TABLE student (\n" +
" sid INT,\n" +
" name STRING,\n" +
" PRIMARY KEY (sid) NOT ENFORCED\n" +
") WITH (\n" +
" ${jdbcconfig}\n" +
" 'table-name' = 'student'\n" +
");\n" +
"CREATE TABLE score (\n" +
" cid INT,\n" +
" sid INT,\n" +
" cls STRING,\n" +
" score INT,\n" +
" PRIMARY KEY (cid) NOT ENFORCED\n" +
") WITH (\n" +
" ${jdbcconfig}\n" +
" 'table-name' = 'score'\n" +
");\n" +
"CREATE TABLE scoretop2 (\n" +
" cls STRING,\n" +
" score INT,\n" +
" `rank` INT,\n" +
" PRIMARY KEY (cls,`rank`) NOT ENFORCED\n" +
") WITH (\n" +
" ${jdbcconfig}\n" +
" 'table-name' = 'scoretop2'\n" +
");\n" +
"CREATE AGGTABLE aggscore AS \n" +
"SELECT cls,score,rank\n" +
"FROM score\n" +
"GROUP BY cls\n" +
"AGG BY TOP2(score) as (score,rank);\n" +
"\n" +
"insert into scoretop2\n" +
"select \n" +
"b.cls,b.score,b.`rank`\n" +
"from aggscore b";
FlinkSqlPlus plus = FlinkSqlPlus.build();
JobPlanInfo jobPlanInfo = plus.getJobPlanInfo(sql);
System.out.println(jobPlanInfo.getJsonPlan());
}
} }
...@@ -14,6 +14,7 @@ import java.util.Map; ...@@ -14,6 +14,7 @@ import java.util.Map;
* @author wenmo * @author wenmo
* @since 2021/6/14 21:19 * @since 2021/6/14 21:19
*/ */
@Deprecated
public class FunctionManager { public class FunctionManager {
private static Map<String,UDFunction> functions = new HashMap<String,UDFunction>(){ private static Map<String,UDFunction> functions = new HashMap<String,UDFunction>(){
......
...@@ -8,6 +8,7 @@ import org.apache.flink.table.functions.FunctionDefinition; ...@@ -8,6 +8,7 @@ import org.apache.flink.table.functions.FunctionDefinition;
* @author wenmo * @author wenmo
* @since 2021/6/14 22:14 * @since 2021/6/14 22:14
*/ */
@Deprecated
public class UDFunction { public class UDFunction {
public enum UDFunctionType { public enum UDFunctionType {
......
...@@ -30,7 +30,7 @@ public class FlinkInterceptor { ...@@ -30,7 +30,7 @@ public class FlinkInterceptor {
if(executor.isUseSqlFragment()) { if(executor.isUseSqlFragment()) {
statement = executor.getSqlManager().parseVariable(statement); statement = executor.getSqlManager().parseVariable(statement);
} }
initFunctions(executor.getCustomTableEnvironmentImpl(), statement); // initFunctions(executor.getCustomTableEnvironmentImpl(), statement);
return statement.trim(); return statement.trim();
} }
...@@ -47,6 +47,7 @@ public class FlinkInterceptor { ...@@ -47,6 +47,7 @@ public class FlinkInterceptor {
return false; return false;
} }
@Deprecated
private static void initFunctions(CustomTableEnvironmentImpl stEnvironment, String statement) { private static void initFunctions(CustomTableEnvironmentImpl stEnvironment, String statement) {
Map<String, UDFunction> usedFunctions = FunctionManager.getUsedFunctions(statement); Map<String, UDFunction> usedFunctions = FunctionManager.getUsedFunctions(statement);
String[] udfs = stEnvironment.listUserDefinedFunctions(); String[] udfs = stEnvironment.listUserDefinedFunctions();
......
...@@ -10,44 +10,29 @@ const StudioGraph = (props:any) => { ...@@ -10,44 +10,29 @@ const StudioGraph = (props:any) => {
const config = { const config = {
data, data,
height:350,
nodeCfg: { nodeCfg: {
size: [140, 65], size: [160, 65],
/*anchorPoints: [
[0.5, 1],
[0.5, 0],
],*/
items: { items: {
padding: [6, 0, 0], autoEllipsis: false,
padding: [10],
containerStyle: { containerStyle: {
fill: '#fff', fill: '#fff',
width:'100px', width:'100px',
display: 'inline-block',
overflow:'hidden',
textOverflow:'ellipsis',
whiteSpace:'nowrap',
}, },
style: (cfg, group, type) => { style: (cfg, group, type) => {
const styles = { const styles = {
icon: {
width: 12,
height: 12,
},
value: { value: {
fill: '#f00', fill: '#000',
}, },
text: { text: {
fill: '#aaa', fill: '#222',
width:'100px', width:'100px',
display: 'inline-block',
overflow:'hidden',
textOverflow:'ellipsis',
whiteSpace:'nowrap',
}, },
}; };
return styles[type]; return styles[type];
}, },
}, },
nodeStateStyles: { nodeStateStyles: {
hover: { hover: {
stroke: '#1890ff', stroke: '#1890ff',
...@@ -55,20 +40,21 @@ const StudioGraph = (props:any) => { ...@@ -55,20 +40,21 @@ const StudioGraph = (props:any) => {
}, },
}, },
style: { style: {
radius: [2, 2, 2, 2], fill: '#40a9ff',
stroke: '#1890ff',
}, },
}, },
edgeCfg: { edgeCfg: {
type: 'polyline', type: 'polyline',
label: { label: {
style: { style: {
fill: '#aaa', fill: '#666',
fontSize: 12, fontSize: 12,
fillOpacity: 1, fillOpacity: 1,
}, },
}, },
endArrow: { endArrow: {
fill: '#ddd', fill: '#333',
}, },
edgeStateStyles: { edgeStateStyles: {
hover: { hover: {
......
...@@ -12,7 +12,7 @@ import Breadcrumb from "antd/es/breadcrumb/Breadcrumb"; ...@@ -12,7 +12,7 @@ import Breadcrumb from "antd/es/breadcrumb/Breadcrumb";
import {StateType} from "@/pages/FlinkSqlStudio/model"; import {StateType} from "@/pages/FlinkSqlStudio/model";
import {connect} from "umi"; import {connect} from "umi";
import {handleAddOrUpdate, postDataArray} from "@/components/Common/crud"; import {handleAddOrUpdate, postDataArray} from "@/components/Common/crud";
import {executeSql, explainSql, getStreamGraph} from "@/pages/FlinkSqlStudio/service"; import {executeSql, explainSql, getJobPlan} from "@/pages/FlinkSqlStudio/service";
import StudioHelp from "./StudioHelp"; import StudioHelp from "./StudioHelp";
import StudioGraph from "./StudioGraph"; import StudioGraph from "./StudioGraph";
import {showCluster, showTables, saveTask} from "@/components/Studio/StudioEvent/DDL"; import {showCluster, showTables, saveTask} from "@/components/Studio/StudioEvent/DDL";
...@@ -134,18 +134,11 @@ const StudioMenu = (props: any) => { ...@@ -134,18 +134,11 @@ const StudioMenu = (props: any) => {
} }
let useSession = !!currentSession.session; let useSession = !!currentSession.session;
let param = { let param = {
...current.task,
useSession: useSession, useSession: useSession,
session: currentSession.session, session: currentSession.session,
useRemote: current.task.useRemote, configJson: JSON.stringify(current.task.config),
clusterId: current.task.clusterId,
useResult: current.task.useResult,
maxRowNum: current.task.maxRowNum,
statement: selectsql, statement: selectsql,
fragment: current.task.fragment,
jobName: current.task.jobName,
parallelism: current.task.parallelism,
checkPoint: current.task.checkPoint,
savePointPath: current.task.savePointPath,
}; };
const taskKey = (Math.random() * 1000) + ''; const taskKey = (Math.random() * 1000) + '';
notification.success({ notification.success({
...@@ -180,7 +173,7 @@ const StudioMenu = (props: any) => { ...@@ -180,7 +173,7 @@ const StudioMenu = (props: any) => {
configJson: JSON.stringify(current.task.config), configJson: JSON.stringify(current.task.config),
statement: selectsql, statement: selectsql,
}; };
const res = getStreamGraph(param); const res = getJobPlan(param);
handleGraphModalVisible(true); handleGraphModalVisible(true);
res.then((result)=>{ res.then((result)=>{
if(result.code==0){ if(result.code==0){
...@@ -199,16 +192,19 @@ const StudioMenu = (props: any) => { ...@@ -199,16 +192,19 @@ const StudioMenu = (props: any) => {
title:data.nodes[i].pact, title:data.nodes[i].pact,
items: [ items: [
{ {
text: data.nodes[i].contents, text: getRangeText(data.nodes[i].description),
value: data.nodes[i].parallelism, },
{
text: '\r\nParallelism: ',
value: '\r\n '+data.nodes[i].parallelism,
}, },
], ],
}; };
if(data.nodes[i].predecessors){ if(data.nodes[i].inputs){
for(let j in data.nodes[i].predecessors){ for(let j in data.nodes[i].inputs){
edges.push({source: data.nodes[i].predecessors[j].id.toString(), edges.push({source: data.nodes[i].inputs[j].id.toString(),
target: data.nodes[i].id.toString(), target: data.nodes[i].id.toString(),
value: data.nodes[i].predecessors[j].ship_strategy}) value: data.nodes[i].inputs[j].ship_strategy})
} }
} }
} }
...@@ -216,6 +212,37 @@ const StudioMenu = (props: any) => { ...@@ -216,6 +212,37 @@ const StudioMenu = (props: any) => {
return data; return data;
}; };
const getRangeText = (str:string) => {
str = escape2Html(str);
var canvas = getRangeText.canvas || (getRangeText.canvas = document.createElement("canvas"));
var context = canvas.getContext("2d");
context.font = "10px sans-serif";
let result = '';
let count = 1;
for(let i=0,len=str.length;i<len;i++){
result += str[i];
let width = context.measureText(result).width;
if(width >= 110*count) {
result += '\r\n';
count++;
}
}
return result;
}
const getTextWidth = (text:string, font:string) => {
var canvas = getTextWidth.canvas || (getTextWidth.canvas = document.createElement("canvas"));
var context = canvas.getContext("2d");
context.font = font;
var metrics = context.measureText(text);
return metrics.width;
}
const escape2Html = (str:string) => {
let arrEntities={'lt':'<','gt':'>','nbsp':' ','amp':'&','quot':'"'};
return str.replace(/&(lt|gt|nbsp|amp|quot);/ig,function(all,t){return arrEntities[t];});
}
const saveSqlAndSettingToTask = () => { const saveSqlAndSettingToTask = () => {
saveTask(current,dispatch); saveTask(current,dispatch);
}; };
...@@ -392,10 +419,10 @@ const StudioMenu = (props: any) => { ...@@ -392,10 +419,10 @@ const StudioMenu = (props: any) => {
data={explainData} data={explainData}
/> />
<Modal <Modal
width={1200} width={1000}
bodyStyle={{padding: '32px 40px 48px'}} bodyStyle={{padding: '32px 40px 48px'}}
destroyOnClose destroyOnClose
title="FlinkSQL 的 StreamGraph" title="FlinkSQL 的 JobPlan"
visible={graphModalVisible} visible={graphModalVisible}
onCancel={() => handleGraphModalVisible(false)} onCancel={() => handleGraphModalVisible(false)}
> >
......
...@@ -37,6 +37,15 @@ export async function getStreamGraph(params: StudioParam) { ...@@ -37,6 +37,15 @@ export async function getStreamGraph(params: StudioParam) {
}); });
} }
export async function getJobPlan(params: StudioParam) {
return request<API.Result>('/api/studio/getJobPlan', {
method: 'POST',
data: {
...params,
},
});
}
export async function getJobData(jobId:string) { export async function getJobData(jobId:string) {
return request<API.Result>('/api/studio/getJobData', { return request<API.Result>('/api/studio/getJobData', {
method: 'GET', method: 'GET',
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment