Commit e2e7c5a3 authored by wenmo's avatar wenmo

flink执行图

parent 3df7d3bb
......@@ -170,7 +170,9 @@ dlink -- 父项目
| npm | 7.19.0 |
| node.js | 14.17.0 |
| jdk | 1.8.0_201|
| maven | 3.6.0 |
| maven | 3.6.0 |
| lombok | 1.18.16 |
| mysql | 5.7+ |
```shell
mvn clean install -Dmaven.test.skip=true
......@@ -191,7 +193,7 @@ mvn clean install -Dmaven.test.skip=true
Flink 的版本取决于 lib 下的 dlink-client-1.12.jar。
当前版本默认为 Flink 1.12.4 API。
向其他版本的集群提交任务可能存在问题,未来将实现 1.13、1.11、1.10.
向其他版本的集群提交任务可能存在问题,已实现 1.11、1.12、1.13,切换版本时只需要将对应依赖在lib下进行替换,然后重启即可。
## 使用手册
......@@ -203,11 +205,9 @@ Flink 的版本取决于 lib 下的 dlink-client-1.12.jar。
#### 集群中心
注册Flink集群地址,格式为 host:port ,用英文逗号分隔。
新增和修改的等待时间较长,是因为需要重新计算最新的 JM 地址。
心跳检测为手动触发,会更新集群状态与 JM 地址。
注册 Flink 集群地址时,格式为 host:port ,用英文逗号分隔。即添加 Flink 集群的 JobManager 的 RestApi 地址。当 HA 模式时,地址间用英文逗号分隔,例如:192.168.123.101:8081,192.168.123.102:8081,192.168.123.103:8081。
新增和修改的等待时间较长,是因为需要检测最新的 JobManager 地址。
心跳检测为手动触发,会更新集群状态与 JobManager 地址。
#### Studio
......@@ -215,7 +215,6 @@ Flink 的版本取决于 lib 下的 dlink-client-1.12.jar。
2. 在中间编辑区编写 FlinkSQL 。
3. 在右侧配置执行参数。
4. Fragment 开启后,可以使用增强的 sql 片段语法:
```sql
sf:=select * from;tb:=student;
${sf} ${tb}
......@@ -223,7 +222,6 @@ ${sf} ${tb}
select * from student
```
5. 内置 sql 增强语法-表值聚合:
```sql
CREATE AGGTABLE aggdemo AS
SELECT myField,value,rank
......@@ -233,11 +231,11 @@ AGG BY TOP2(value) as (value,rank);
```
6. MaxRowNum 为批流执行Select时预览查询结果的最大集合长度,默认 100,最大 9999。
7. SavePointPath 当前版本属于非 Jar 提交,暂不可用。
8. Flink 共享会话共享 Catalogue
9. 连接器为 Catalogue 里的表信息,清空按钮会销毁当前会话。
8. Flink 共享会话共享 Catalog 。
9. 连接器为 Catalog 里的表信息,清空按钮会销毁当前会话。
10. Local 模式请使用少量测试数据,真实数据请使用远程集群。
11. 执行 SQL 时,如果您选中了部分 SQL,则会执行选中的内容,否则执行全部内容。
12. 小火箭的提交功能是异步提交当前任务保存的 FlinkSQL 及配置到集群。无法提交草稿。
12. 小火箭的提交功能是异步提交当前任务保存的 FlinkSQL 及配置到集群。无法提交草稿。
13. 执行信息或者历史中那个很长很长的就是集群上的 JobId。
14. 草稿是无法被异步远程提交的,只能同步执行。
15. 灰色按钮代表近期将实现。
......@@ -256,7 +254,7 @@ AGG BY TOP2(value) as (value,rank);
[Mybatis Plus](https://github.com/baomidou/mybatis-plus)
[ant-design-pro](https://github.com/aiwenmo/ant-design-pro)
[ant-design-pro](https://github.com/ant-design/ant-design-pro)
[Monaco Editor](https://github.com/Microsoft/monaco-editor)
......
......@@ -47,6 +47,14 @@ public class StudioController {
return Result.succeed(studioService.explainSql(studioExecuteDTO),"解释成功");
}
/**
 * Gets the StreamGraph (execution graph) of the submitted FlinkSQL.
 * (Previous javadoc said "解释Sql" — a copy-paste from explainSql above.)
 */
@PostMapping("/getStreamGraph")
public Result getStreamGraph(@RequestBody StudioExecuteDTO studioExecuteDTO) {
return Result.succeed(studioService.getStreamGraph(studioExecuteDTO),"获取执行图成功");
}
/**
* 进行DDL操作
*/
......
......@@ -11,6 +11,7 @@ import com.dlink.result.SelectResult;
import com.dlink.result.SqlExplainResult;
import com.dlink.session.SessionInfo;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import java.util.List;
......@@ -28,6 +29,8 @@ public interface StudioService {
List<SqlExplainResult> explainSql(StudioExecuteDTO studioExecuteDTO);
ObjectNode getStreamGraph(StudioExecuteDTO studioExecuteDTO);
SelectResult getJobData(String jobId);
SessionInfo createSession(SessionDTO sessionDTO, String createUser);
......
......@@ -23,6 +23,7 @@ import com.dlink.session.SessionInfo;
import com.dlink.session.SessionPool;
import com.dlink.trans.Operations;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
......@@ -71,6 +72,16 @@ public class StudioServiceImpl implements StudioService {
return jobManager.explainSql(studioExecuteDTO.getStatement());
}
/**
 * Builds the StreamGraph JSON for the statement carried by the DTO.
 * For non-session jobs the target cluster address is resolved first so the
 * JobManager is built against the correct environment.
 */
@Override
public ObjectNode getStreamGraph(StudioExecuteDTO studioExecuteDTO) {
    JobConfig jobConfig = studioExecuteDTO.getJobConfig();
    if (!jobConfig.isUseSession()) {
        // Resolve the JobManager address (local or remote) for this cluster.
        String address = clusterService.buildEnvironmentAddress(jobConfig.isUseRemote(), studioExecuteDTO.getClusterId());
        jobConfig.setAddress(address);
    }
    return JobManager.build(jobConfig).getStreamGraph(studioExecuteDTO.getStatement());
}
@Override
public SelectResult getJobData(String jobId) {
return JobManager.getJobData(jobId);
......
......@@ -3,9 +3,9 @@ package com.dlink.executor.custom;
import com.dlink.result.SqlExplainResult;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.dag.Transformation;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.graph.JSONGenerator;
import org.apache.flink.streaming.api.graph.StreamGraph;
......
package com.dlink.executor.custom;
import com.dlink.result.SqlExplainResult;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.dag.Transformation;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.graph.JSONGenerator;
import org.apache.flink.streaming.api.graph.StreamGraph;
......
......@@ -3,9 +3,9 @@ package com.dlink.executor.custom;
import com.dlink.result.SqlExplainResult;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.dag.Transformation;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.graph.JSONGenerator;
import org.apache.flink.streaming.api.graph.StreamGraph;
......
......@@ -15,6 +15,14 @@
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-annotations</artifactId>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</dependency>
</dependencies>
</project>
\ No newline at end of file
......@@ -2,13 +2,12 @@ package com.dlink.executor;
import com.dlink.executor.custom.CustomTableEnvironmentImpl;
import com.dlink.result.SqlExplainResult;
import com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.ExplainDetail;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.catalog.Catalog;
import org.apache.flink.table.catalog.CatalogManager;
import org.apache.flink.table.functions.ScalarFunction;
import org.apache.flink.table.functions.UserDefinedFunction;
......
......@@ -13,7 +13,8 @@ import com.dlink.explainer.trans.TransGenerator;
import com.dlink.interceptor.FlinkInterceptor;
import com.dlink.result.SqlExplainResult;
import com.dlink.utils.SqlUtil;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.flink.table.api.ExplainDetail;
import org.apache.flink.table.catalog.CatalogManager;
import org.apache.flink.table.catalog.ObjectIdentifier;
......@@ -33,6 +34,7 @@ import java.util.Optional;
public class Explainer {
private Executor executor;
private ObjectMapper mapper = new ObjectMapper();
public Explainer(Executor executor) {
this.executor = executor;
......@@ -67,6 +69,22 @@ public class Explainer {
return sqlExplainRecords;
}
/**
 * Builds the StreamGraph JSON for the first DML statement contained in the
 * given FlinkSQL script.
 *
 * @param statement one or more FlinkSQL statements
 * @return the StreamGraph as JSON, or an empty ObjectNode when the script
 *         contains no DML statement
 */
public ObjectNode getStreamGraph(String statement){
    List<SqlExplainResult> sqlExplainRecords = explainSqlResult(statement);
    for (SqlExplainResult item : sqlExplainRecords) {
        // Only DML statements produce a stream graph; the first one found
        // wins, matching the previous behavior of translating index 0 only.
        if (Asserts.isNotNull(item.getType())
                && item.getType().contains(FlinkSQLConstant.DML)) {
            return translateObjectNode(item.getSql());
        }
    }
    // No DML present: return an empty JSON object instead of null so callers
    // can serialize the result unconditionally.
    return mapper.createObjectNode();
}
private List<TableCAResult> generateTableCA(String statement, boolean onlyTable) {
List<SqlExplainResult> sqlExplainRecords = explainSqlResult(statement);
List<String> strPlans = new ArrayList<>();
......@@ -134,8 +152,8 @@ public class Explainer {
return results;
}
private ObjectNode translateObjectNode(String strPlans) {
return executor.getStreamGraph(strPlans);
private ObjectNode translateObjectNode(String statement) {
return executor.getStreamGraph(statement);
}
private List<Trans> translateTrans(ObjectNode plan) {
......
package com.dlink.explainer.trans;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.JsonNode;
import java.util.ArrayList;
import java.util.List;
......
package com.dlink.explainer.trans;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.JsonNode;
import java.util.List;
......
package com.dlink.explainer.trans;
import com.dlink.assertion.Asserts;
import com.dlink.exception.SqlException;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import java.util.ArrayList;
import java.util.HashMap;
......
......@@ -15,6 +15,7 @@ import com.dlink.session.SessionConfig;
import com.dlink.session.SessionInfo;
import com.dlink.session.SessionPool;
import com.dlink.trans.Operations;
import com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.flink.api.common.JobID;
import org.apache.flink.table.api.TableResult;
......@@ -302,4 +303,9 @@ public class JobManager extends RunTime {
Explainer explainer = Explainer.build(executor);
return explainer.explainSqlResult(statement);
}
/**
 * Returns the StreamGraph JSON of the given statement.
 */
public ObjectNode getStreamGraph(String statement){
    // Delegate graph construction to an Explainer bound to this executor.
    return Explainer.build(executor).getStreamGraph(statement);
}
}
......@@ -5,7 +5,7 @@ import com.dlink.explainer.Explainer;
import com.dlink.explainer.ca.ColumnCAResult;
import com.dlink.explainer.ca.TableCAResult;
import com.dlink.result.SqlExplainResult;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.flink.table.api.ExplainDetail;
import java.util.ArrayList;
......
......@@ -46,7 +46,7 @@
"not ie <= 10"
],
"dependencies": {
"@ant-design/charts": "^1.1.18",
"@ant-design/charts": "^1.2.10",
"@ant-design/icons": "^4.5.0",
"@ant-design/pro-descriptions": "^1.6.8",
"@ant-design/pro-form": "^1.18.3",
......
......@@ -106,7 +106,7 @@ const StudioCA = (props:any) => {
});
res.then((result)=>{
if(result.code==0){
setOneTableCAData(convertTreeData(result.datas[0]));
setOneTableCAData(fullTreeData(result.datas[0]));
}else{
setOneTableCAData(null);
}
......@@ -120,18 +120,18 @@ const StudioCA = (props:any) => {
});
res.then((result)=>{
if(result.code==0){
setOneColumnCAData(convertTreeData(result.datas[0]));
setOneColumnCAData(fullTreeData(result.datas[0]));
}else{
setOneColumnCAData(null);
}
})
};
const convertTreeData=(node)=>{
const fullTreeData=(node)=>{
if(node){
node.body=node.columns.toString();
for(let i in node.children){
node.children[i] = convertTreeData(node.children[i])
node.children[i] = fullTreeData(node.children[i])
}
return node;
}
......
import { Empty } from "antd";
import {FlowAnalysisGraph} from '@ant-design/charts';
import {StateType} from "@/pages/FlinkSqlStudio/model";
import {connect} from "umi";
import styles from "./index.less";
import React, {useState} from "react";
/**
 * StudioGraph renders the FlinkSQL StreamGraph handed in through the
 * `graphData` prop using Ant Design Charts' FlowAnalysisGraph. When no graph
 * data is present an antd <Empty> placeholder is rendered instead.
 *
 * Assumes graphData has the shape { nodes: [...], edges: [{source, target,
 * value}, ...] } — TODO confirm against the caller's buildGraphData.
 */
const StudioGraph = (props:any) => {
  // current / currentSession are injected by connect() below but are not
  // referenced in this component yet.
  const {graphData,current,currentSession} = props;
  // FlowAnalysisGraph configuration: node and edge appearance plus canvas
  // interaction behaviors.
  const config = {
    data:graphData,
    nodeCfg: {
      size: [140, 25],
      items: {
        padding: 6,
        containerStyle: {
          fill: '#fff',
        },
        // Per-item styling keyed by item part ('icon' | 'value' | 'text').
        style: (cfg, group, type) => {
          const styles = {
            icon: {
              width: 12,
              height: 12,
            },
            value: {
              fill: '#f00',
            },
            text: {
              fill: '#aaa',
            },
          };
          return styles[type];
        },
      },
      nodeStateStyles: {
        hover: {
          stroke: '#1890ff',
          lineWidth: 2,
        },
      },
      title: {
        containerStyle: {
          fill: 'transparent',
        },
        style: {
          fill: '#000',
          fontSize: 12,
        },
      },
      style: {
        fill: '#E6EAF1',
        stroke: '#B2BED5',
        radius: [2, 2, 2, 2],
      },
    },
    edgeCfg: {
      label: {
        style: {
          fill: '#aaa',
          fontSize: 12,
          fillOpacity: 1,
        },
      },
      // Edges whose target is node '0' get a distinct stroke color.
      style: (edge) => {
        const stroke = edge.target === '0' ? '#c86bdd' : '#5ae859';
        return {
          stroke,
          lineWidth: 1,
          strokeOpacity: 0.5,
        };
      },
      edgeStateStyles: {
        hover: {
          lineWidth: 2,
          strokeOpacity: 1,
        },
      },
    },
    // Shows a collapse marker on nodes that have outgoing edges. Only invoked
    // while the chart is rendered, i.e. graphData is defined.
    markerCfg: (cfg) => {
      const {edges} = graphData;
      return {
        position: 'right',
        show: edges.find((item) => item.source == cfg.id),
        collapsed: !edges.find((item) => item.source == cfg.id),
      };
    },
    behaviors: ['drag-canvas', 'zoom-canvas', 'drag-node'],
  };
  // NOTE(review): dead code kept from the original commit; edges are now built
  // by the caller before the data reaches this component.
  /*const buildGraphEdges=(nodes)=>{
  let edges = [];
  for(let i in nodes){
  if(nodes[i].predecessors){
  for(let j in nodes[i].predecessors){
  edges.push({source: nodes[i].predecessors[j].id.toString(),
  target: nodes[i].id.toString(),
  value: nodes[i].predecessors[j].ship_strategy})
  }
  }
  }
  return edges;
  };*/
  return (
    <>{graphData? <FlowAnalysisGraph {...config} /> :<Empty image={Empty.PRESENTED_IMAGE_SIMPLE} />}
    </>
  );
};

// Map the Studio model's state into props; only graphData is consumed today.
export default connect(({ Studio }: { Studio: StateType }) => ({
  current: Studio.current,
  currentSession: Studio.currentSession,
}))(StudioGraph);
......@@ -12,8 +12,9 @@ import Breadcrumb from "antd/es/breadcrumb/Breadcrumb";
import {StateType} from "@/pages/FlinkSqlStudio/model";
import {connect} from "umi";
import {handleAddOrUpdate, postDataArray} from "@/components/Common/crud";
import {executeSql, explainSql} from "@/pages/FlinkSqlStudio/service";
import {executeSql, explainSql, getStreamGraph} from "@/pages/FlinkSqlStudio/service";
import StudioHelp from "./StudioHelp";
import StudioGraph from "./StudioGraph";
import {showCluster, showTables} from "@/components/Studio/StudioEvent/DDL";
import {useState} from "react";
import StudioExplain from "../StudioConsole/StudioExplain";
......@@ -29,7 +30,9 @@ const StudioMenu = (props: any) => {
const {tabs, current, currentPath, form, refs, dispatch, currentSession} = props;
const [modalVisible, handleModalVisible] = useState<boolean>(false);
const [graphModalVisible, handleGraphModalVisible] = useState<boolean>(false);
const [explainData, setExplainData] = useState([]);
const [graphData, setGraphData] = useState();
const execute = () => {
let selectsql = null;
......@@ -131,7 +134,7 @@ const StudioMenu = (props: any) => {
});
};
const onCheckSql = ()=>{
const onCheckSql = () => {
let selectsql = null;
if (current.monaco.current) {
let selection = current.monaco.current.editor.getSelection();
......@@ -171,6 +174,65 @@ const StudioMenu = (props: any) => {
})
};
/**
 * Fetches the StreamGraph of the current FlinkSQL — the Monaco editor
 * selection when one exists, otherwise the whole editor content — and opens
 * the graph modal, filling it once the request resolves.
 */
const onGetStreamGraph = () => {
  // Prefer the text currently selected in the Monaco editor.
  let statement = null;
  const monaco = current.monaco.current;
  if (monaco) {
    const range = monaco.editor.getSelection();
    statement = monaco.editor.getModel().getValueInRange(range);
  }
  // Fall back to the full editor content when nothing is selected.
  if (statement == null || statement == '') {
    statement = current.value;
  }
  const param = {
    useSession: !!currentSession.session,
    session: currentSession.session,
    useRemote: current.task.useRemote,
    clusterId: current.task.clusterId,
    useResult: current.task.useResult,
    maxRowNum: current.task.maxRowNum,
    statement: statement,
    fragment: current.task.fragment,
    jobName: current.task.jobName,
    parallelism: current.task.parallelism,
    checkPoint: current.task.checkPoint,
    savePointPath: current.task.savePointPath,
  };
  const res = getStreamGraph(param);
  // Open the modal right away; the graph appears when the data arrives.
  handleGraphModalVisible(true);
  res.then((result) => {
    if (result.code == 0) {
      setGraphData(buildGraphData(result.datas));
    } else {
      setGraphData(undefined);
    }
  });
};
/**
 * Converts the StreamGraph JSON returned by the backend into the node/edge
 * shape expected by FlowAnalysisGraph. Node ids are stringified, each node
 * gets a {title, items} value, and one edge is emitted per predecessor link.
 * Mutates and returns the same `data` object.
 *
 * Fixes: iterate with for..of instead of for..in (for..in walks enumerable
 * string keys, including any non-index properties), use const, and tolerate
 * a missing `nodes` array like for..in did.
 */
const buildGraphData = (data) => {
  const edges = [];
  // `?? []` keeps the original for..in tolerance of an absent nodes array.
  for (const node of data.nodes ?? []) {
    node.id = node.id.toString();
    node.value = {
      title: node.pact,
      items: [
        {
          text: node.contents,
        },
      ],
    };
    if (node.predecessors) {
      for (const pred of node.predecessors) {
        edges.push({
          source: pred.id.toString(),
          target: node.id,
          value: pred.ship_strategy,
        });
      }
    }
  }
  data.edges = edges;
  return data;
};
const saveSqlAndSettingToTask = async () => {
const fieldsValue = await form.validateFields();
if (current.task) {
......@@ -288,14 +350,17 @@ const StudioMenu = (props: any) => {
<Tooltip title="检查当前的 FlinkSql">
<Button
type="text"
icon={<SafetyCertificateTwoTone />}
icon={<SafetyCertificateTwoTone/>}
onClick={onCheckSql}
/>
</Tooltip>
<Button
type="text"
icon={<FlagTwoTone twoToneColor="#ddd"/>}
/>
<Tooltip title="获取当前的 FlinkSql 的执行图">
<Button
type="text"
icon={<FlagTwoTone/>}
onClick={onGetStreamGraph}
/>
</Tooltip>
<Tooltip title="执行当前的 FlinkSql">
<Button
type="text"
......@@ -358,6 +423,16 @@ const StudioMenu = (props: any) => {
modalVisible={modalVisible}
data={explainData}
/>
<Modal
width={1200}
bodyStyle={{padding: '32px 40px 48px'}}
destroyOnClose
title="FlinkSQL 的 StreamGraph"
visible={graphModalVisible}
onCancel={() => handleGraphModalVisible(false)}
>
<StudioGraph graphData={graphData} />
</Modal>
</Row>
);
};
......
......@@ -28,6 +28,15 @@ export async function explainSql(params: StudioParam) {
});
}
/**
 * POSTs the studio parameters to the backend and resolves with the
 * StreamGraph JSON of the submitted FlinkSQL, wrapped in the standard
 * Result envelope.
 */
export async function getStreamGraph(params: StudioParam) {
  const body = {
    ...params,
  };
  return request<API.Result>('/api/studio/getStreamGraph', {
    method: 'POST',
    data: body,
  });
}
export async function getJobData(jobId:string) {
return request<API.Result>('/api/studio/getJobData', {
method: 'GET',
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment