Commit 6ae896f5 authored by godkaikai's avatar godkaikai

血缘分析核心代码

parent 88e1526e
package com.dlink.controller;
import com.dlink.common.result.Result;
import com.dlink.dto.StudioCADTO;
import com.dlink.dto.StudioDDLDTO;
import com.dlink.dto.StudioExecuteDTO;
import com.dlink.model.Task;
......@@ -46,6 +47,17 @@ public class StudioController {
return Result.succeed(runResult,"执行成功");
}
/**
* 获取单表的血缘分析
*/
@PostMapping("/getCAByStatement")
public Result getCAByStatement(@RequestBody StudioCADTO studioCADTO) {
switch (studioCADTO.getType()){
case 1:return Result.succeed(studioService.getOneTableColumnCAByStatement(studioCADTO.getStatement()),"执行成功");
default:return Result.failed("敬请期待");
}
}
/**
* 清除指定session
*/
......
package com.dlink.dto;
import lombok.Getter;
import lombok.Setter;
/**
* StudioCADTO
*
* @author qiwenkai
* @since 2021/6/23 14:00
**/
@Getter
@Setter
public class StudioCADTO {
private String statement;
/* 1:单表表级血缘
* 2:单表字段血缘
* 3.全局表级血缘
* 4.全局字段血缘
* */
private Integer type;
}
......@@ -2,8 +2,11 @@ package com.dlink.service;
import com.dlink.dto.StudioDDLDTO;
import com.dlink.dto.StudioExecuteDTO;
import com.dlink.explainer.ca.TableCANode;
import com.dlink.result.RunResult;
import java.util.List;
/**
* StudioService
*
......@@ -16,4 +19,8 @@ public interface StudioService {
RunResult executeDDL(StudioDDLDTO studioDDLDTO);
boolean clearSession(String session);
List<TableCANode> getOneTableCAByStatement(String statement);
List<TableCANode> getOneTableColumnCAByStatement(String statement);
}
......@@ -7,6 +7,8 @@ import com.dlink.dto.StudioExecuteDTO;
import com.dlink.exception.BusException;
import com.dlink.executor.Executor;
import com.dlink.executor.ExecutorSetting;
import com.dlink.explainer.ca.CABuilder;
import com.dlink.explainer.ca.TableCANode;
import com.dlink.job.JobManager;
import com.dlink.model.Cluster;
import com.dlink.result.RunResult;
......@@ -16,6 +18,8 @@ import com.dlink.session.SessionPool;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.util.List;
/**
* StudioServiceImpl
*
......@@ -94,4 +98,14 @@ public class StudioServiceImpl implements StudioService {
return false;
}
}
@Override
public List<TableCANode> getOneTableCAByStatement(String statement) {
return CABuilder.getOneTableCAByStatement(statement);
}
@Override
public List<TableCANode> getOneTableColumnCAByStatement(String statement) {
return CABuilder.getOneTableColumnCAByStatement(statement);
}
}
......@@ -3,6 +3,9 @@ package com.dlink.executor.custom;
import com.dlink.result.SqlExplainResult;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.dag.Transformation;
import org.apache.flink.calcite.shaded.com.fasterxml.jackson.core.JsonProcessingException;
import org.apache.flink.calcite.shaded.com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.graph.JSONGenerator;
......@@ -199,13 +202,14 @@ public class CustomTableEnvironmentImpl extends TableEnvironmentImpl {
} else if (operation instanceof QueryOperation) {
record.setType("Query DML");
} else {
operationlist.remove(i);
record.setExplain(operation.asSummaryString());
record.setExplainTrue(true);
record.setType("DDL");
operationlist.remove(i);
i=i-1;
}
}
if(operationlist.size()==0){
//record.setExplain("DDL语句不进行解释。");
return record;
}
record.setExplain(planner.explain(operationlist, extraDetails));
......
......@@ -22,6 +22,10 @@
<groupId>cn.hutool</groupId>
<artifactId>hutool-all</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
......@@ -31,17 +35,17 @@
<dependency>
<groupId>com.dlink</groupId>
<artifactId>dlink-client-1.12</artifactId>
<scope>provided</scope>
<!--<scope>provided</scope>-->
</dependency>
<dependency>
<groupId>com.dlink</groupId>
<artifactId>dlink-connector-jdbc</artifactId>
<scope>provided</scope>
<!--<scope>provided</scope>-->
</dependency>
<dependency>
<groupId>com.dlink</groupId>
<artifactId>dlink-function</artifactId>
<scope>provided</scope>
<!--<scope>provided</scope>-->
</dependency>
</dependencies>
</project>
\ No newline at end of file
package com.dlink.exception;
import org.apache.flink.annotation.PublicEvolving;
/**
* SqlException
*
* @author wenmo
* @since 2021/6/22
**/
@PublicEvolving
public class SqlException extends RuntimeException {
public SqlException(String message, Throwable cause) {
super(message, cause);
}
public SqlException(String message) {
super(message);
}
}
\ No newline at end of file
......@@ -8,6 +8,7 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.ExplainDetail;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.catalog.CatalogManager;
import org.apache.flink.table.functions.ScalarFunction;
import org.apache.flink.table.functions.UserDefinedFunction;
......@@ -28,7 +29,7 @@ public abstract class Executor {
public static Executor build(){
return new LocalStreamExecutor(new ExecutorSetting(LOCAL));
return new LocalStreamExecutor(new ExecutorSetting(LOCAL,true));
}
public static Executor build(EnvironmentSetting environmentSetting,ExecutorSetting executorSetting){
......@@ -92,4 +93,8 @@ public abstract class Executor {
public void createTemporarySystemFunction(String name, Class<? extends UserDefinedFunction> var2){
stEnvironment.createTemporarySystemFunction(name,var2);
}
public CatalogManager getCatalogManager(){
return stEnvironment.getCatalogManager();
}
}
......@@ -18,6 +18,11 @@ public class ExecutorSetting {
this.type = type;
}
public ExecutorSetting(String type, boolean useSqlFragment) {
this.type = type;
this.useSqlFragment = useSqlFragment;
}
public ExecutorSetting(String type, Integer checkpoint) {
this.type = type;
this.checkpoint = checkpoint;
......
package com.dlink.explainer;
import com.dlink.executor.Executor;
import com.dlink.explainer.ca.ColumnCAGenerator;
import com.dlink.explainer.ca.ColumnCAResult;
import com.dlink.explainer.ca.TableCA;
import com.dlink.explainer.ca.TableCAGenerator;
import com.dlink.explainer.ca.TableCAResult;
import com.dlink.explainer.trans.Trans;
import com.dlink.explainer.trans.TransGenerator;
import com.dlink.interceptor.FlinkInterceptor;
import com.dlink.result.SqlExplainResult;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.flink.table.api.ExplainDetail;
import org.apache.flink.table.catalog.CatalogManager;
import org.apache.flink.table.catalog.ObjectIdentifier;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Date;
import java.util.List;
import java.util.Optional;
/**
* Explainer
*
* @author wenmo
* @since 2021/6/22
**/
public class Explainer {
private Executor executor;
public Explainer(Executor executor) {
this.executor = executor;
}
public List<SqlExplainResult> explainSqlResult(String statement, ExplainDetail... extraDetails) {
String[] sqls = statement.split(";");
List<SqlExplainResult> sqlExplainRecords = new ArrayList<>();
for (int i = 0; i < sqls.length; i++) {
SqlExplainResult record = new SqlExplainResult();
try {
if (!FlinkInterceptor.build(executor.getCustomTableEnvironmentImpl(), sqls[i])) {
record = executor.explainSqlRecord(sqls[i], extraDetails);
if ("DDL".equals(record.getType())) {
executor.executeSql(sqls[i]);
}
}
}catch (Exception e){
e.printStackTrace();
record.setError(e.getMessage());
}finally {
record.setExplainTime(new Date());
record.setIndex(i+1);
record.setSql(sqls[i]);
sqlExplainRecords.add(record);
}
}
return sqlExplainRecords;
}
private List<TableCAResult> explainSqlTableCA(String statement,boolean onlyTable) {
List<SqlExplainResult> sqlExplainRecords = explainSqlResult(statement);
List<String> strPlans = new ArrayList<>();
for (int i = 0; i < sqlExplainRecords.size(); i++) {
if(sqlExplainRecords.get(i).getType()!=null&&sqlExplainRecords.get(i).getType().contains("DML")){
strPlans.add(sqlExplainRecords.get(i).getSql());
}
}
List<TableCAResult> results = new ArrayList<>();
for (int i = 0; i < strPlans.size(); i++) {
List<Trans> trans = translateTrans(translateObjectNode(strPlans.get(i)));
TableCAGenerator generator = new TableCAGenerator(trans);
if(onlyTable) {
generator.translateOnlyTable();
}else{
generator.translate();
}
results.add(new TableCAResult(generator));
}
if(results.size()>0){
CatalogManager catalogManager = executor.getCatalogManager();
for (int i = 0; i < results.size(); i++) {
TableCA sinkTableCA = (TableCA)results.get(i).getSinkTableCA();
if(sinkTableCA!=null){
ObjectIdentifier objectIdentifier = ObjectIdentifier.of(sinkTableCA.getCatalog(), sinkTableCA.getDatabase(), sinkTableCA.getTable());
Optional<CatalogManager.TableLookupResult> tableOpt = catalogManager.getTable(objectIdentifier);
if(tableOpt.isPresent()){
String[] fieldNames = tableOpt.get().getResolvedSchema().getFieldNames();
sinkTableCA.setFields(Arrays.asList(fieldNames));
}
}
}
}
return results;
}
public List<TableCAResult> explainSqlTableCA(String statement) {
return explainSqlTableCA(statement,true);
}
public List<TableCAResult> explainSqlTableColumnCA(String statement) {
return explainSqlTableCA(statement,false);
}
public List<ColumnCAResult> explainSqlColumnCA(String statement) {
List<SqlExplainResult> sqlExplainRecords = explainSqlResult(statement);
List<String> strPlans = new ArrayList<>();
for (int i = 0; i < sqlExplainRecords.size(); i++) {
if(sqlExplainRecords.get(i).getType().contains("DML")){
strPlans.add(sqlExplainRecords.get(i).getSql());
}
}
List<ColumnCAResult> results = new ArrayList<>();
for (int i = 0; i < strPlans.size(); i++) {
List<Trans> trans = translateTrans(translateObjectNode(strPlans.get(i)));
ColumnCAGenerator generator = new ColumnCAGenerator(trans);
TableCAGenerator tableGenerator = new TableCAGenerator(trans);
tableGenerator.translate();
generator.setSourceTableCAS(tableGenerator.getSourceTableCAS());
generator.translate();
results.add(new ColumnCAResult(generator));
}
return results;
}
private ObjectNode translateObjectNode(String strPlans){
return executor.getStreamGraph(strPlans);
}
private List<Trans> translateTrans(ObjectNode plan){
return new TransGenerator(plan).translateTrans();
}
}
package com.dlink.explainer.ca;
import com.dlink.plus.FlinkSqlPlus;
import java.util.ArrayList;
import java.util.List;
/**
* CABuilder
*
* @author qiwenkai
* @since 2021/6/23 11:03
**/
public class CABuilder {
public static List<TableCANode> getOneTableCAByStatement(String statement){
List<TableCANode> tableCANodes = new ArrayList<>();
FlinkSqlPlus plus = FlinkSqlPlus.build();
List<TableCAResult> results = plus.explainSqlTableCA(statement);
for (int j = 0; j < results.size(); j++) {
TableCAResult result = results.get(j);
TableCANode node = new TableCANode();
TableCA sinkTableCA = (TableCA)result.getSinkTableCA();
node.setName(sinkTableCA.getTableName());
List<TableCANode> children = new ArrayList<>();
for (int k = 0; k < result.getSourceTableCAS().size(); k++) {
children.add(new TableCANode(result.getSourceTableCAS().get(k).getTableName()));
}
node.setChildren(children);
tableCANodes.add(node);
}
return tableCANodes;
}
public static List<TableCANode> getOneTableColumnCAByStatement(String statement){
List<TableCANode> tableCANodes = new ArrayList<>();
FlinkSqlPlus plus = FlinkSqlPlus.build();
int id=1;
List<TableCAResult> results = plus.explainSqlTableColumnCA(statement);
for (int j = 0; j < results.size(); j++) {
TableCAResult result = results.get(j);
TableCA sinkTableCA = (TableCA)result.getSinkTableCA();
TableCANode node = new TableCANode(id++,sinkTableCA.getTableName(),sinkTableCA.getFields());
List<TableCANode> children = new ArrayList<>();
for (int k = 0; k < result.getSourceTableCAS().size(); k++) {
TableCA tableCA = (TableCA) result.getSourceTableCAS().get(k);
children.add(new TableCANode(id++,tableCA.getTableName(),tableCA.getFields()));
}
node.setChildren(children);
tableCANodes.add(node);
}
return tableCANodes;
}
}
package com.dlink.explainer.ca;
public interface CAGenerator {
void translate();
}
package com.dlink.explainer.ca;
import com.dlink.explainer.trans.Trans;
import lombok.Getter;
import lombok.Setter;
import java.util.List;
/**
* ColumnCA
*
* @author wenmo
* @since 2021/6/22
**/
@Getter
@Setter
public class ColumnCA implements ICA{
private Integer id;
private Integer tableId;
private List<Integer> parentId;
private String name;
private String alias;
private String operation;
private String columnName;
private String familyName;
private String type;
private String columnType;
private Trans trans;
private TableCA tableCA;
private String tableName;
public ColumnCA(Integer id, String name, String alias, String columnName, String familyName,String operation, TableCA tableCA,Trans trans) {
this.id = id;
this.name = name;
this.alias = alias;
this.columnName = columnName;
this.familyName = familyName;
this.operation = operation;
this.tableCA = tableCA;
this.tableId = tableCA.getId();
this.tableName = tableCA.getName();
this.trans = trans;
this.type = trans.getPact();
}
public ColumnCA(Integer id, List<Integer> parentId, String name, String alias, String columnName, String familyName, String type, TableCA tableCA) {
this.id = id;
this.parentId = parentId;
this.name = name;
this.alias = alias;
this.columnName = columnName;
this.familyName = familyName;
this.type = type;
this.tableCA = tableCA;
}
public Integer getId() {
return id;
}
public void setId(Integer id) {
this.id = id;
}
public List<Integer> getParentId() {
return parentId;
}
public void setParentId(List<Integer> parentId) {
this.parentId = parentId;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public String getAlias() {
return alias;
}
public void setAlias(String alias) {
this.alias = alias;
}
public String getColumnName() {
return columnName;
}
public void setColumnName(String columnName) {
this.columnName = columnName;
}
public String getFamilyName() {
return familyName;
}
public void setFamilyName(String familyName) {
this.familyName = familyName;
}
public String getType() {
return type;
}
public void setType(String type) {
this.type = type;
}
public TableCA getTableCA() {
return tableCA;
}
public void setTableCA(TableCA tableCA) {
this.tableCA = tableCA;
}
public Integer getTableId() {
return tableId;
}
public void setTableId(Integer tableId) {
this.tableId = tableId;
}
public String getColumnType() {
return columnType;
}
public void setColumnType(String columnType) {
this.columnType = columnType;
}
public Trans getTrans() {
return trans;
}
public void setTrans(Trans trans) {
this.trans = trans;
}
public String getTableName() {
return tableName;
}
public void setTableName(String tableName) {
this.tableName = tableName;
}
public String getOperation() {
return operation;
}
public void setOperation(String operation) {
this.operation = operation;
}
@Override
public String toString() {
return "ColumnCA{" +
"id=" + id +
", parentId=" + parentId +
", name='" + name + '\'' +
", columnName='" + columnName + '\'' +
", familyName='" + familyName + '\'' +
", type='" + type + '\'' +
", tableCA=" + tableCA +
'}';
}
}
package com.dlink.explainer.ca;
import com.dlink.explainer.trans.Field;
import com.dlink.explainer.trans.OperatorTrans;
import com.dlink.explainer.trans.SinkTrans;
import com.dlink.explainer.trans.SourceTrans;
import com.dlink.explainer.trans.Trans;
import org.apache.commons.collections.CollectionUtils;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
/**
* ColumnCAGenerator
*
* @author wenmo
* @since 2021/6/22
**/
public class ColumnCAGenerator implements CAGenerator {
private List<Trans> transList;
private Map<Integer, Trans> transMaps;
private Set<Integer> parentIdSet;
private List<ICA> sourceTableCAS = new ArrayList<>();
private List<ICA> columnCAS = new ArrayList<>();
private Map<Integer, ICA> columnCASMaps;
private Set<NodeRel> columnCASRel;
private ICA sinkTableCA = null;
private String sinkTableName;
private Integer index = 0;
private List<Integer> sinkColumns;
private List<Integer> sourceColumns;
public ColumnCAGenerator(List<Trans> transList) {
this.transList = transList;
this.transMaps = new HashMap<>();
this.parentIdSet = new HashSet<>();
this.columnCASMaps = new HashMap<>();
this.columnCASRel = new HashSet<>();
for (int i = 0; i < transList.size(); i++) {
this.transMaps.put(transList.get(i).getId(), transList.get(i));
if (transList.get(i).getParentId() != null) {
parentIdSet.add(transList.get(i).getParentId());
}
}
}
@Override
public void translate() {
for (int i = 0; i < transList.size(); i++) {
if (transList.get(i) instanceof SourceTrans) {
TableCA tableCA = new TableCA((SourceTrans) transList.get(i));
paddingFields(tableCA);
List<String> sourceFields = new ArrayList<>();
CollectionUtils.addAll(sourceFields, new Object[tableCA.getFields().size()]);
Collections.copy(sourceFields, tableCA.getFields());
for (int j = 0; j < sourceFields.size(); j++) {
String fieldName = sourceFields.get(j);
Integer id = index++;
ColumnCA columnCA = new ColumnCA(id, fieldName, fieldName, fieldName, fieldName,fieldName, tableCA,transList.get(i));
this.columnCASMaps.put(id, columnCA);
this.columnCAS.add(columnCA);
}
for (int j = 0; j < this.columnCAS.size(); j++) {
ColumnCA columnCA = (ColumnCA) this.columnCAS.get(j);
if (columnCA.getTableCA().getId() == tableCA.getId()) {
buildColumnCAFields(tableCA, tableCA.getParentId(), columnCA);
}
}
} else if (transList.get(i) instanceof SinkTrans) {
TableCA tableCA = new TableCA((SinkTrans) transList.get(i));
searchColumnCAId(tableCA);
this.sinkTableCA = tableCA;
this.sinkTableName = tableCA.getName();
}
}
}
private void searchColumnCAId(TableCA tableCA){
List<Integer> sufOnly = new ArrayList<>();
for (NodeRel nodeRel : this.columnCASRel) {
if(!sufOnly.contains(nodeRel.getSufId())) {
sufOnly.add(nodeRel.getSufId());
}
}
for (NodeRel nodeRel : this.columnCASRel) {
if(sufOnly.contains(nodeRel.getPreId())) {
sufOnly.remove(nodeRel.getPreId());
}
}
List<Integer> preOnly = new ArrayList<>();
for (NodeRel nodeRel : this.columnCASRel) {
if(!preOnly.contains(nodeRel.getPreId())) {
preOnly.add(nodeRel.getPreId());
}
}
for (NodeRel nodeRel : this.columnCASRel) {
if(preOnly.contains(nodeRel.getSufId())) {
preOnly.remove(nodeRel.getSufId());
}
}
for (int i = 0; i < sufOnly.size(); i++) {
ColumnCA columnCA = (ColumnCA)this.columnCASMaps.get(sufOnly.get(i));
List<String> fields = tableCA.getFields();
for (int j = 0; j < fields.size(); j++) {
if(columnCA.getAlias().equals(fields.get(j))){
tableCA.getColumnCAIds().add(sufOnly.get(i));
break;
}
}
}
this.sinkColumns = sufOnly;
this.sourceColumns = preOnly;
}
private void paddingFields(TableCA tableCA) {
for (int i = 0; i < this.sourceTableCAS.size(); i++) {
if (this.sourceTableCAS.get(i) instanceof TableCA) {
TableCA sourceTableCA = (TableCA) this.sourceTableCAS.get(i);
if (sourceTableCA.getId() == tableCA.getId()) {
tableCA.setFields(sourceTableCA.getFields());
}
}
}
}
private void buildColumnCAFields(TableCA tableCA, Integer id, ColumnCA columnCA) {
if (transMaps.get(id) instanceof OperatorTrans) {
OperatorTrans trans = (OperatorTrans) transMaps.get(id);
List<Field> selects = trans.getSelect();
if (selects != null && selects.size() > 0) {
for (int i = 0; i < selects.size(); i++) {
String operation = selects.get(i).getFragment();
String alias = selects.get(i).getAlias();
searchSelect(tableCA, columnCA, trans, operation, alias);
}
}
if (trans.getParentId() != null) {
buildColumnCAFields(tableCA, trans.getParentId(), columnCA);
}
}
}
private void searchSelect(TableCA tableCA, ColumnCA columnCA, OperatorTrans trans, String operation, String alias) {
if (operation.contains(" " + columnCA.getAlias() + " ") ||
operation.contains("(" + columnCA.getAlias() + " ") ||
operation.contains(" " + columnCA.getAlias() + ")")) {
boolean isHad = false;
Integer cid = null;
for (int j = 0; j < this.columnCAS.size(); j++) {
ColumnCA columnCA1 = (ColumnCA) this.columnCAS.get(j);
if (columnCA1.getTableCA().getId() == tableCA.getId() &&
columnCA1.getName().equals(operation)) {
isHad = true;
cid = columnCA1.getId();
break;
}
}
if (!isHad) {
cid = index++;
String columnOperation = operation.replaceAll(" " + columnCA.getAlias() + " "," " + columnCA.getOperation() + " ")
.replaceAll("\\(" + columnCA.getAlias() + " "," " + columnCA.getOperation() + " ")
.replaceAll(" " + columnCA.getAlias() + "\\)"," " + columnCA.getOperation() + " ");
ColumnCA columnCA2 = new ColumnCA(cid, operation, alias, operation, operation,columnOperation, tableCA,trans);
this.columnCASMaps.put(cid, columnCA2);
this.columnCAS.add(columnCA2);
buildColumnCAFields(tableCA, trans.getParentId(), columnCA2);
}
this.columnCASRel.add(new NodeRel(columnCA.getId(), cid));
}
}
public List<Trans> getTransList() {
return transList;
}
public void setTransList(List<Trans> transList) {
this.transList = transList;
}
public Map<Integer, Trans> getTransMaps() {
return transMaps;
}
public void setTransMaps(Map<Integer, Trans> transMaps) {
this.transMaps = transMaps;
}
public Set<Integer> getParentIdSet() {
return parentIdSet;
}
public void setParentIdSet(Set<Integer> parentIdSet) {
this.parentIdSet = parentIdSet;
}
public List<ICA> getSourceTableCAS() {
return sourceTableCAS;
}
public void setSourceTableCAS(List<ICA> sourceTableCAS) {
this.sourceTableCAS = sourceTableCAS;
}
public List<ICA> getColumnCAS() {
return columnCAS;
}
public void setColumnCAS(List<ICA> columnCAS) {
this.columnCAS = columnCAS;
}
public ICA getSinkTableCA() {
return sinkTableCA;
}
public void setSinkTableCA(ICA sinkTableCA) {
this.sinkTableCA = sinkTableCA;
}
public String getSinkTableName() {
return sinkTableName;
}
public void setSinkTableName(String sinkTableName) {
this.sinkTableName = sinkTableName;
}
public Map<Integer, ICA> getColumnCASMaps() {
return columnCASMaps;
}
public void setColumnCASMaps(Map<Integer, ICA> columnCASMaps) {
this.columnCASMaps = columnCASMaps;
}
public Set<NodeRel> getColumnCASRel() {
return columnCASRel;
}
public void setColumnCASRel(Set<NodeRel> columnCASRel) {
this.columnCASRel = columnCASRel;
}
public List<Integer> getSinkColumns() {
return sinkColumns;
}
public void setSinkColumns(List<Integer> sinkColumns) {
this.sinkColumns = sinkColumns;
}
public List<Integer> getSourceColumns() {
return sourceColumns;
}
public void setSourceColumns(List<Integer> sourceColumns) {
this.sourceColumns = sourceColumns;
}
}
package com.dlink.explainer.ca;
import lombok.Getter;
import lombok.Setter;
import java.io.Serializable;
import java.util.List;
/**
* ColumnCANode
*
* @author qiwenkai
* @since 2021/6/23 11:03
**/
@Getter
@Setter
public class ColumnCANode implements Serializable {
private static final long serialVersionUID = 122624200268430762L;
private Integer id;
private Integer tableId;
private String name;
private String title;
private String value;
private String type;
private String operation;
// private Tables tables;
// private Columns columns;
private List<ColumnCANode> children;
public ColumnCANode() {
}
}
package com.dlink.explainer.ca;
import lombok.Getter;
import lombok.Setter;
import java.util.List;
import java.util.Map;
import java.util.Set;
/**
* ColumnCAResult
*
* @author wenmo
* @since 2021/6/22
**/
@Getter
@Setter
public class ColumnCAResult {
private String sinkName;
private ICA sinkTableCA;
private List<ICA> columnCAS;
private Map<Integer, ICA> columnCASMaps;
private Set<NodeRel> columnCASRel;
private List<Integer> sinkColumns;
private List<Integer> sourColumns;
public ColumnCAResult(ColumnCAGenerator generator) {
this.columnCAS = generator.getColumnCAS();
this.sinkTableCA = generator.getSinkTableCA();
this.sinkName = generator.getSinkTableName();
this.columnCASMaps = generator.getColumnCASMaps();
this.columnCASRel = generator.getColumnCASRel();
this.sinkColumns = generator.getSinkColumns();
this.sourColumns = generator.getSourceColumns();
}
}
package com.dlink.explainer.ca;
public interface ICA {
Integer getId() ;
String getTableName();
}
package com.dlink.explainer.ca;
import java.util.Objects;
/**
* NodeRel
*
* @author wenmo
* @since 2021/6/22
**/
public class NodeRel {
private Integer preId;
private Integer sufId;
public NodeRel(Integer preId, Integer sufId) {
this.preId = preId;
this.sufId = sufId;
}
public Integer getPreId() {
return preId;
}
public void setPreId(Integer preId) {
this.preId = preId;
}
public Integer getSufId() {
return sufId;
}
public void setSufId(Integer sufId) {
this.sufId = sufId;
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
NodeRel nodeRel = (NodeRel) o;
return Objects.equals(preId, nodeRel.preId) &&
Objects.equals(sufId, nodeRel.sufId);
}
@Override
public int hashCode() {
return Objects.hash(preId, sufId);
}
}
package com.dlink.explainer.ca;
import com.dlink.explainer.trans.SinkTrans;
import com.dlink.explainer.trans.SourceTrans;
import lombok.Getter;
import lombok.Setter;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
/**
* TableCA
*
* @author wenmo
* @since 2021/6/22
**/
@Getter
@Setter
public class TableCA implements ICA{
private Integer id;
private Integer parentId;
private String name;
private String catalog;
private String database;
private String table;
private String type;
private List<String> fields;
private List<String> useFields;
private List<String> alias;
private Set<Integer> columnCAIds = new HashSet<>();
private Integer parallelism;
public TableCA(SourceTrans trans) {
this.id = trans.getId();
this.parentId = trans.getParentId();
this.name = trans.getName();
this.catalog = trans.getCatalog();
this.database = trans.getDatabase();
this.table = trans.getTable();
this.fields = trans.getFields();
this.useFields = trans.getFields();
this.parallelism = trans.getParallelism();
this.type = trans.getPact();
}
public TableCA(SinkTrans trans) {
this.id = trans.getId();
this.parentId = trans.getParentId();
this.name = trans.getName();
this.catalog = trans.getCatalog();
this.database = trans.getDatabase();
this.table = trans.getTable();
this.fields = trans.getFields();
this.useFields = trans.getFields();
this.parallelism = trans.getParallelism();
this.type = trans.getPact();
}
@Override
public String toString() {
return "TableCA{" +
"id=" + id +
", parentId=" + parentId +
", name='" + name + '\'' +
", catalog='" + catalog + '\'' +
", database='" + database + '\'' +
", table='" + table + '\'' +
", fields=" + fields +
", useFields=" + useFields +
", parallelism=" + parallelism +
'}';
}
@Override
public String getTableName() {
return this.table;
}
}
package com.dlink.explainer.ca;
import com.dlink.explainer.trans.Field;
import com.dlink.explainer.trans.OperatorTrans;
import com.dlink.explainer.trans.SinkTrans;
import com.dlink.explainer.trans.SourceTrans;
import com.dlink.explainer.trans.Trans;
import org.apache.commons.collections.CollectionUtils;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
/**
* TableCAGenerator
*
* @author wenmo
* @since 2021/6/22
**/
public class TableCAGenerator implements CAGenerator {
private List<Trans> transList;
private Map<Integer, Trans> transMaps;
private Set<Integer> parentIdSet;
private List<ICA> sourceTableCAS = new ArrayList<>();
private ICA sinkTableCA = null;
private String sinkTableName;
public TableCAGenerator(List<Trans> transList) {
this.transList = transList;
this.transMaps = new HashMap<>();
this.parentIdSet = new HashSet<>();
for (int i = 0; i < transList.size(); i++) {
this.transMaps.put(transList.get(i).getId(),transList.get(i));
if(transList.get(i).getParentId()!=null) {
parentIdSet.add(transList.get(i).getParentId());
}
}
}
@Override
public void translate() {
for (int i = 0; i < transList.size(); i++) {
if(transList.get(i) instanceof SourceTrans) {
TableCA tableCA = new TableCA((SourceTrans) transList.get(i));
List<String> sourceFields = new ArrayList<>();
CollectionUtils.addAll(sourceFields, new Object[tableCA.getFields().size()]);
Collections.copy(sourceFields, tableCA.getFields());
for (int j = 0; j < sourceFields.size(); j++) {
buildTableCAFields(tableCA,tableCA.getParentId(),sourceFields.get(j));
}
this.sourceTableCAS.add(tableCA);
}else if(transList.get(i) instanceof SinkTrans) {
TableCA tableCA = new TableCA((SinkTrans) transList.get(i));
this.sinkTableCA = tableCA;
this.sinkTableName = tableCA.getName();
}
}
}
public void translateOnlyTable() {
for (int i = 0; i < transList.size(); i++) {
if(transList.get(i) instanceof SourceTrans) {
this.sourceTableCAS.add(new TableCA((SourceTrans) transList.get(i)));
}else if(transList.get(i) instanceof SinkTrans) {
TableCA tableCA = new TableCA((SinkTrans) transList.get(i));
this.sinkTableCA = tableCA;
this.sinkTableName = tableCA.getName();
}
}
}
private void buildTableCAFields(TableCA tableCA,Integer id,String field){
if(transMaps.get(id) instanceof OperatorTrans){
OperatorTrans trans = (OperatorTrans) transMaps.get(id);
searchSelectFields(tableCA, trans.getSelect(),field);
searchWhereFields(tableCA, trans.getWhere(),field);
if(trans.getParentId()!=null){
buildTableCAFields(tableCA,trans.getParentId(),field);
}
}
}
private void searchSelectFields(TableCA tableCA, List<Field> selects, String field){
if(selects!=null&&selects.size()>0){
for (int i = 0; i < selects.size(); i++){
List<String> fields = matchFields( selects.get(i).getFragment(),field);
/*if(tableCA.getFields().contains(field)){
tableCA.getFields().remove(field);
}*/
for (int j = 0; j < fields.size(); j++) {
if(!tableCA.getUseFields().contains(fields.get(j))){
tableCA.getUseFields().add(fields.get(j));
}
}
}
}
}
private void searchWhereFields(TableCA tableCA,String wheres,String field){
if(wheres!=null&&!"[]".equals(wheres)){
List<String> fields = matchFields( wheres,field);
/*if(tableCA.getFields().contains(field)){
tableCA.getFields().remove(field);
}*/
for (int j = 0; j < fields.size(); j++) {
if(!tableCA.getUseFields().contains(fields.get(j))){
tableCA.getUseFields().add(fields.get(j));
}
}
}
}
private List<String> matchFields(String fragement,String field){
List<String> fields = new ArrayList<>();
Pattern p = Pattern.compile(field+"\\.(.*?) ");
Matcher m = p.matcher(fragement+" ");
while(m.find()){
fields.add(m.group(0).replaceFirst("\\)","").trim());
}
if(fragement.equals(field)){
fields.add(fragement.trim());
}
return fields;
}
public List<Trans> getTransList() {
return transList;
}
public Map<Integer, Trans> getTransMaps() {
return transMaps;
}
public Set<Integer> getParentIdSet() {
return parentIdSet;
}
public List<ICA> getSourceTableCAS() {
return sourceTableCAS;
}
public ICA getSinkTableCA() {
return sinkTableCA;
}
public String getSinkTableName() {
return sinkTableName;
}
}
package com.dlink.explainer.ca;
import lombok.Getter;
import lombok.Setter;
import java.io.Serializable;
import java.util.List;
/**
* TableCANode
*
* @author qiwenkai
* @since 2021/6/23 11:03
**/
@Getter
@Setter
public class TableCANode implements Serializable {
private static final long serialVersionUID = 356665302973181483L;
private String id;
private Integer tableId;
private String name;
private String title;
private String value;
private String type;
private Integer columnSize;
// private Tables tables;
private List<String> columns;
private List<TableCANode> children;
public TableCANode() {
}
public TableCANode(String name) {
this.name = name;
}
public TableCANode(String name, String value) {
this.name = name;
this.value = value;
}
public TableCANode(Integer id,String name, List<String> columns) {
this.id = id.toString();
this.name = name;
this.title = name;
this.columnSize = columns.size();
this.columns = columns;
}
}
package com.dlink.explainer.ca;
import java.util.List;
/**
* TableCAResult
*
* @author wenmo
* @since 2021/6/22
**/
public class TableCAResult {
private String sinkName;
private List<ICA> sourceTableCAS ;
private ICA sinkTableCA;
public TableCAResult(TableCAGenerator generator) {
this.sourceTableCAS = generator.getSourceTableCAS();
this.sinkTableCA = generator.getSinkTableCA();
this.sinkName = generator.getSinkTableName();
}
public String getSinkName() {
return sinkName;
}
public void setSinkName(String sinkName) {
this.sinkName = sinkName;
}
public List<ICA> getSourceTableCAS() {
return sourceTableCAS;
}
public void setSourceTableCAS(List<ICA> sourceTableCAS) {
this.sourceTableCAS = sourceTableCAS;
}
public ICA getSinkTableCA() {
return sinkTableCA;
}
public void setSinkTableCA(ICA sinkTableCA) {
this.sinkTableCA = sinkTableCA;
}
}
package com.dlink.explainer.trans;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
/**
* AbstractTrans
*
* @author wenmo
* @since 2021/6/22
**/
public abstract class AbstractTrans {
protected Integer id;
protected Integer parentId;
protected String name;
protected String type;
protected String text;
protected Integer stage;
protected String contents;
protected String pact;
protected Integer parallelism;
protected List<Predecessor> predecessors;
public void build(JsonNode node) {
id = node.get("id").asInt();
text = node.toPrettyString();
stage = id;
type = matchType(node.get("type").asText());
name = matchType(node.get("type").asText());
pact = node.get("pact").asText();
contents = matchContents(node.get("contents").asText());
translate();
parallelism = node.get("parallelism").asInt();
predecessors = new ArrayList<>();
if (node.has("predecessors")) {
JsonNode predecessornodes = node.get("predecessors");
for (JsonNode predecessor : predecessornodes) {
predecessors.add(new Predecessor(predecessor.get("id").asInt(), predecessor.get("ship_strategy").asText(), predecessor.get("side").asText()));
}
}
}
abstract void translate();
public static String matchType(String str){
Pattern p = Pattern.compile("(.*?)\\(");
Matcher m = p.matcher(str);
String type = null;
if(m.find()){
type = m.group(0).replaceAll("\\(", "").trim();
}else{
type = str;
}
return type;
}
public static String matchPact(String str){
Pattern p = Pattern.compile(": (.*?)$");
Matcher m = p.matcher(str);
String pact = null;
if(m.find()){
pact = m.group(0).replaceAll(": ", "").trim();
}else{
pact = str;
}
return pact;
}
public static String matchContents(String str){
Pattern p = Pattern.compile("\\((.*?)$");
Matcher m = p.matcher(str);
String contents = null;
if(m.find()){
contents = m.group(0).replaceFirst("\\(", "").trim();
contents = contents.substring(0, contents.lastIndexOf(")"));
}else{
contents = str;
}
return contents;
}
public static String matchStage(String str){
Pattern p = Pattern.compile("Stage (.*?) :");
Matcher m = p.matcher(str);
String type = null;
if(m.find()){
type = m.group(0).replaceFirst("Stage ", "").replaceFirst(" :", "").trim();
}
return type;
}
public Integer getId() {
return id;
}
public void setId(Integer id) {
this.id = id;
}
public Integer getParentId() {
return parentId;
}
public void setParentId(Integer parentId) {
this.parentId = parentId;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public String getType() {
return type;
}
public void setType(String type) {
this.type = type;
}
public String getText() {
return text;
}
public void setText(String text) {
this.text = text;
}
public Integer getStage() {
return stage;
}
public void setStage(Integer stage) {
this.stage = stage;
}
public String getContents() {
return contents;
}
public void setContents(String contents) {
this.contents = contents;
}
public String getPact() {
return pact;
}
public void setPact(String pact) {
this.pact = pact;
}
public Integer getParallelism() {
return parallelism;
}
public void setParallelism(Integer parallelism) {
this.parallelism = parallelism;
}
public List<Predecessor> getPredecessors() {
return predecessors;
}
public void setPredecessors(List<Predecessor> predecessors) {
this.predecessors = predecessors;
}
}
package com.dlink.explainer.trans;
/**
* Field
*
* @author wenmo
* @since 2021/6/22
**/
public class Field {
private String fragment;
private String alias;
public Field(String fragment) {
this.fragment = fragment;
this.alias = fragment;
}
public Field(String fragment, String alias) {
this.fragment = fragment;
this.alias = alias;
}
public String getFragment() {
return fragment;
}
public void setFragment(String fragment) {
this.fragment = fragment;
}
public String getAlias() {
return alias;
}
public void setAlias(String alias) {
this.alias = alias;
}
}
package com.dlink.explainer.trans;
import com.dlink.utils.MapParseUtils;
import org.apache.commons.lang3.StringUtils;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
/**
* OperatorTrans
*
* @author wenmo
* @since 2021/6/22
**/
public class OperatorTrans extends AbstractTrans implements Trans {
private List<Field> select;
private List<String> joinType;
private String where;
private List<String> leftInputSpec;
private List<String> rightInputSpec;
public final static String TRANS_TYPE = "Operator";
private final static String FIELD_SEPARATOR = " AS ";
public List<Field> getSelect() {
return select;
}
public List<String> getJoinType() {
return joinType;
}
public String getWhere() {
return where;
}
public List<String> getLeftInputSpec() {
return leftInputSpec;
}
public List<String> getRightInputSpec() {
return rightInputSpec;
}
@Override
public String getHandle() {
return TRANS_TYPE;
}
@Override
public boolean canHandle(String pact) {
return TRANS_TYPE.equals(pact);
}
@Override
public void translate() {
name = pact;
Map map = MapParseUtils.parse(contents,"where");
translateSelect((ArrayList<String>) map.get("select"));
joinType = (ArrayList<String>) map.get("joinType");
where = map.containsKey("where")?map.get("where").toString():null;
leftInputSpec = (ArrayList<String>) map.get("leftInputSpec");
rightInputSpec = (ArrayList<String>) map.get("rightInputSpec");
}
private void translateSelect(ArrayList<String> fieldStrs){
if(fieldStrs!=null&&fieldStrs.size()>0) {
select = new ArrayList<>();
for (int i = 0; i < fieldStrs.size(); i++) {
String fieldStr = fieldStrs.get(i);
if(fieldStr.toUpperCase().contains(FIELD_SEPARATOR)){
String [] fieldNames = fieldStr.split(FIELD_SEPARATOR);
if(fieldNames.length==2) {
select.add(new Field(fieldNames[0], fieldNames[1]));
}else if(fieldNames.length==1) {
select.add(new Field(fieldNames[0]));
}else{
List<String> fieldNameList = new ArrayList<>();
for (int j = 0; j < fieldNames.length-1; j++) {
fieldNameList.add(fieldNames[j]);
}
select.add(new Field(StringUtils.join(fieldNameList,FIELD_SEPARATOR),fieldNames[fieldNames.length-1]));
}
}else{
select.add(new Field(fieldStr));
}
}
}
}
@Override
public String asSummaryString() {
return null;
}
}
package com.dlink.explainer.trans;
import lombok.Getter;
import lombok.Setter;
/**
* Predecessor
*
* @author wenmo
* @since 2021/6/22
**/
@Getter
@Setter
public class Predecessor {
private Integer id;
private String shipStrategy;
private String side;
public Predecessor(Integer id, String shipStrategy, String side) {
this.id = id;
this.shipStrategy = shipStrategy;
this.side = side;
}
@Override
public String toString() {
return "Predecessor{" +
"id=" + id +
", shipStrategy='" + shipStrategy + '\'' +
", side='" + side + '\'' +
'}';
}
}
package com.dlink.explainer.trans;
import com.dlink.utils.MapParseUtils;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
/**
* SinkTrans
*
* @author wenmo
* @since 2021/6/22
**/
public class SinkTrans extends AbstractTrans implements Trans {
private String catalog;
private String database;
private String table;
private List<String> fields;
public final static String TRANS_TYPE = "Data Sink";
public SinkTrans() {
}
public String getCatalog() {
return catalog;
}
public void setCatalog(String catalog) {
this.catalog = catalog;
}
public String getDatabase() {
return database;
}
public void setDatabase(String database) {
this.database = database;
}
public String getTable() {
return table;
}
public void setTable(String table) {
this.table = table;
}
public List<String> getFields() {
return fields;
}
public void setFields(List<String> fields) {
this.fields = fields;
}
@Override
public String getHandle() {
return TRANS_TYPE;
}
@Override
public boolean canHandle(String pact) {
return TRANS_TYPE.equals(pact);
}
@Override
public void translate() {
Map map = MapParseUtils.parse(contents);
ArrayList<String> tables = (ArrayList<String>) map.get("table");
if(tables!=null&&tables.size()>0) {
name = tables.get(0);
String [] names = tables.get(0).split("\\.");
if (names.length >= 3) {
catalog = names[0];
database = names[1];
table = names[2];
} else {
table = name;
}
}
fields = (ArrayList<String>) map.get("fields");
}
@Override
public String asSummaryString() {
return "";
}
}
package com.dlink.explainer.trans;
import com.dlink.utils.MapParseUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.table.operations.Operation;
import org.apache.flink.table.operations.OperationUtils;
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
/**
* SourceTrans
*
* @author wenmo
* @since 2021/6/22
**/
public class SourceTrans extends AbstractTrans implements Trans {
private String catalog;
private String database;
private String table;
private List<String> project;
private List<String> fields;
public final static String TRANS_TYPE = "Data Source";
public SourceTrans() {
}
public String getCatalog() {
return catalog;
}
public String getDatabase() {
return database;
}
public String getTable() {
return table;
}
public List<String> getProject() {
return project;
}
public List<String> getFields() {
return fields;
}
@Override
public String getHandle() {
return TRANS_TYPE;
}
@Override
public boolean canHandle(String pact) {
return TRANS_TYPE.equals(pact);
}
@Override
public String asSummaryString() {
Map<String, Object> params = new LinkedHashMap<>();
/*params.put("originalQuery", catalogView.getOriginalQuery());
params.put("expandedQuery", catalogView.getExpandedQuery());
params.put("identifier", viewIdentifier);
params.put("ignoreIfExists", ignoreIfExists);
params.put("isTemporary", isTemporary);*/
return OperationUtils.formatWithChildren(
"CREATE VIEW", params, Collections.emptyList(), Operation::asSummaryString);
}
@Override
public void translate() {
Map map = MapParseUtils.parse(contents);
ArrayList<ArrayList<Object>> tables = (ArrayList<ArrayList<Object>>) map.get("table");
ArrayList<Object> names = tables.get(0);
if (names.size() == 4) {
project = (ArrayList<String>)((Map) names.get(3)).get("project");
names.remove(3);
}
name = StringUtils.join(names, ".");
if (names.size() >= 3) {
catalog = names.get(0).toString();
database = names.get(1).toString();
table = names.get(2).toString();
} else {
table = name;
}
fields = (ArrayList<String>) map.get("fields");
}
}
package com.dlink.explainer.trans;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
import java.util.List;
/**
* Trans
*
* @author wenmo
* @since 2021/6/22
**/
public interface Trans {
String getHandle();
boolean canHandle(String pact);
void build(JsonNode node);
void translate();
String asSummaryString();
Integer getId();
List<Predecessor> getPredecessors();
void setParentId(Integer parentId);
Integer getParentId();
String getPact();
}
package com.dlink.explainer.trans;
import com.dlink.exception.SqlException;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
* TransGenerator
*
* @author wenmo
* @since 2021/6/22
**/
public class TransGenerator {
private ObjectNode plan;
public TransGenerator(ObjectNode plan) {
this.plan = plan;
}
public static Trans get(String pact) {
switch (pact) {
case OperatorTrans.TRANS_TYPE:
return new OperatorTrans();
case SourceTrans.TRANS_TYPE:
return new SourceTrans();
case SinkTrans.TRANS_TYPE:
return new SinkTrans();
default:
return null;
}
}
public List<Trans> translateTrans() {
JsonNode nodes = plan.get("nodes");
List<Trans> nodeList = new ArrayList<>();
Map<Integer, Trans> nodemap = new HashMap<>();
for (JsonNode node : nodes) {
String pact = node.get("pact").asText();
Trans trans = get(pact);
if (trans==null) {
throw new SqlException("该转换无法被解析,原文如下:" + pact);
}
trans.build(node);
nodemap.put(trans.getId(), trans);
}
setParentId(nodemap);
for (Map.Entry<Integer, Trans> entry : nodemap.entrySet()) {
nodeList.add(entry.getValue());
}
return nodeList;
}
private void setParentId(Map<Integer, Trans> nodemap) {
for (Map.Entry<Integer, Trans> entry : nodemap.entrySet()) {
Trans trans = entry.getValue();
List<Predecessor> predecessors = trans.getPredecessors();
if (predecessors == null || predecessors.size() == 0) {
continue;
}
for (int i = 0; i < predecessors.size(); i++) {
nodemap.get(predecessors.get(i).getId()).setParentId(trans.getId());
}
}
}
}
package com.dlink.plus;
import com.dlink.executor.Executor;
import com.dlink.explainer.Explainer;
import com.dlink.explainer.ca.ColumnCAResult;
import com.dlink.explainer.ca.TableCAResult;
import com.dlink.result.SqlExplainResult;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.flink.table.api.ExplainDetail;
import java.util.ArrayList;
import java.util.List;
/**
* FlinkSqlPlus
*
* @author wenmo
* @since 2021/6/22
**/
public class FlinkSqlPlus {
private Executor executor;
private Explainer explainer;
public FlinkSqlPlus(Executor executor) {
this.executor = executor;
this.explainer = new Explainer(executor);
}
public static FlinkSqlPlus build(){
return new FlinkSqlPlus(Executor.build());
}
public List<SqlResult> executeSql(String sql) {
if (sql == null || "".equals(sql)) {
return new ArrayList<>();
}
String[] sqls = sql.split(";");
List<SqlResult> sqlResults = new ArrayList<>();
try {
for (int i = 0; i < sqls.length; i++) {
sqlResults.add(new SqlResult(executor.executeSql(sqls[i])));
}
} catch (Exception e) {
e.printStackTrace();
sqlResults.add(new SqlResult(false, e.getMessage()));
return sqlResults;
}
return sqlResults;
}
public SqlResult execute(String sql) {
if (sql == null || "".equals(sql)) {
return SqlResult.NULL;
}
try {
return new SqlResult(executor.executeSql(sql));
} catch (Exception e) {
return new SqlResult(false,e.getMessage());
}
}
public List<SqlExplainResult> explainSqlRecord(String statement, ExplainDetail... extraDetails) {
return explainer.explainSqlResult(statement,extraDetails);
}
public List<TableCAResult> explainSqlTableColumnCA(String statement) {
return explainer.explainSqlTableColumnCA(statement);
}
public List<TableCAResult> explainSqlTableCA(String statement) {
return explainer.explainSqlTableCA(statement);
}
public List<ColumnCAResult> explainSqlColumnCA(String statement) {
return explainer.explainSqlColumnCA(statement);
}
public String getStreamGraphString(String statement) {
return executor.getStreamGraphString(statement);
}
public ObjectNode getStreamGraph(String statement) {
return executor.getStreamGraph(statement);
}
}
package com.dlink.plus;
import org.apache.flink.table.api.TableResult;
/**
* SqlResult
*
* @author wenmo
* @since 2021/6/22
**/
public class SqlResult {
private TableResult tableResult;
private boolean isSuccess = true;
private String errorMsg;
public static SqlResult NULL = new SqlResult(false,"未检测到有效的Sql");
public SqlResult(TableResult tableResult) {
this.tableResult = tableResult;
}
public SqlResult(boolean isSuccess, String errorMsg) {
this.isSuccess = isSuccess;
this.errorMsg = errorMsg;
}
public TableResult getTableResult() {
return tableResult;
}
public void setTableResult(TableResult tableResult) {
this.tableResult = tableResult;
}
public boolean isSuccess() {
return isSuccess;
}
public void setSuccess(boolean success) {
isSuccess = success;
}
public String getErrorMsg() {
return errorMsg;
}
public void setErrorMsg(String errorMsg) {
this.errorMsg = errorMsg;
}
}
package com.dlink.utils;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Stack;
import java.util.stream.Collectors;
/**
* MapParseUtils
*
* @author wenmo
* @since 2021/6/22
**/
public class MapParseUtils {
/**
* 数组是否嵌套
*
* @param inStr
* @return
*/
public static Boolean getStrIsNest(String inStr) {
if (inStr == null || inStr.isEmpty()) {
return false;
}
Deque<Integer> stack = new LinkedList<>();
for (int i = 0; i < inStr.length(); i++) {
if (inStr.charAt(i) == '[') {
stack.push(i);
}
if (inStr.charAt(i) == ']') {
stack.pop();
if (stack.size() != 0) {
return true;
}
}
}
return false;
}
/**
* 获取嵌套最外层的下标对 table=[[default_catalog, default_database, score, project=[sid, cls, score]]], fields=[sid, cls, score]
* ^(下标x) ^(下标y) ^(下标z) ^(下标n)
* List<Integer> [x, y, z, n]
*
* @param inStr
* @return
*/
public static List<Integer> getNestList(String inStr) {
Stack nestIndexList = new Stack();
if (inStr == null || inStr.isEmpty()) {
return nestIndexList;
}
Deque<Integer> stack = new LinkedList<>();
for (int i = 0; i < inStr.length(); i++) {
if (inStr.charAt(i) == '[') {
if (stack.isEmpty()) {
nestIndexList.add(i);
}
stack.push(i);
}
if (inStr.charAt(i) == ']') {
stack.pop();
if (stack.size() == 0) {
nestIndexList.add(i);
}
}
}
return nestIndexList;
}
/**
* 转换map
*
* @param inStr
* @return
*/
public static Map parse(String inStr,String... blackKeys) {
if (getStrIsNest(inStr)) {
return parseForNest(inStr,blackKeys);
} else {
return parseForNotNest(inStr);
}
}
/**
* 嵌套解析
*
* @param inStr
* @return
*/
public static Map parseForNest(String inStr,String... blackKeys) {
Map map = new HashMap();
List<Integer> nestList = getNestList(inStr);
int num = nestList.size() / 2;
for (int i = 0; i < num; i++) {
if (i == 0) {
String substring = inStr.substring(0, nestList.get(i + 1) + 1);
String key = getMapKey(substring);
boolean isNext = true;
for (int j = 0; j < blackKeys.length; j++) {
if(key.equals(blackKeys[j])){
isNext = false;
}
}
if(isNext) {
if (getStrIsNest(substring)) {
map.put(key, getMapListNest(substring));
} else {
map.put(key, getMapList(substring));
}
}else{
map.put(key, getTextValue(substring));
}
} else {
String substring = inStr.substring(nestList.get(2 * i - 1) + 2, nestList.get(2 * i + 1) + 1);
String key = getMapKey(substring);
boolean isNext = true;
for (int j = 0; j < blackKeys.length; j++) {
if(key.equals(blackKeys[j])){
isNext = false;
}
}
if(isNext) {
if (getStrIsNest(substring)) {
map.put(key, getMapListNest(substring));
} else {
map.put(key, getMapList(substring));
}
}else{
map.put(key, getTextValue(substring));
}
}
}
return map;
}
/**
* 非嵌套解析
*
* @param inStr
* @return
*/
public static Map parseForNotNest(String inStr) {
String[] split = inStr.split("], ");
Map map = new HashMap();
for (int i = 0; i < split.length; i++) {
if(i == split.length -1){
map.put(getMapKey( split[i]), getMapList(split[i]));
}else {
map.put(getMapKey( split[i]+ "]"), getMapList(split[i] + "]"));
}
}
return map;
}
/**
* 获取主键 例子where=[(sid = sid0)] =[ 前即key
*
* @param splitStr
* @return
*/
public static String getMapKey(String splitStr) {
if (splitStr == null || splitStr.indexOf("=[") == -1) {
return "";
}
return splitStr.substring(0, splitStr.indexOf("=[")).replace(" ", "");
}
/**
* 获取主键对应的集合值 例子where=[(sid = sid0)] []中内容为集合内容
*
* @param splitStr
* @return
*/
public static List getMapList(String splitStr) {
if (splitStr == null || splitStr.indexOf("[") == -1 || splitStr.indexOf("]") == -1) {
return new ArrayList();
}
return Arrays.stream(splitStr.substring(splitStr.indexOf("[") + 1, splitStr.lastIndexOf("]")).split(", ")).collect(Collectors.toList());
}
/**
* 获取嵌套主键对应的集合值 例子table=[[default_catalog, default_database, score, project=[sid, cls, score]]] []中内容为集合内容
*
* @param splitStr
* @return
*/
public static List getMapListNest(String splitStr) {
List list = new ArrayList();
if (splitStr == null || splitStr.indexOf("[") == -1 || splitStr.indexOf("]") == -1) {
return new ArrayList();
}
String substring = splitStr.substring(splitStr.indexOf("[") + 1, splitStr.lastIndexOf("]")).trim();
//样例 [default_catalog, default_database, score, project=[sid, cls, score]]
if (substring.startsWith("[")) {
//还是一个集合
list.add(getMapListNest(substring));
} else {
//不是一个集合 而是元素时 default_catalog, default_database, score, project=[sid, cls, score], course=[en, ds, as]
//嵌套所以 还会有[]
List<Integer> nestList = getNestList(substring);
int num = nestList.size() / 2;
String[] str = new String[num];
for (int i = 0; i < num; i++) {
str[i] = substring.substring(nestList.get(2 * i), nestList.get(2 * i + 1) + 1);
}
//倒叙替换 去除集合内容干扰
for (int i = num - 1; i >= 0; i--) {
substring = substring.substring(0, nestList.get(2 * i)) + "_str" + i + "_" + substring.substring(nestList.get(2 * i + 1) + 1);
}
//去除干扰后 default_catalog, default_database, score, project=_str0_, course=_str1_
// _str0_ = [sid, cls, score]
// _str1_ = [en, ds, as]
String[] split = substring.split(", ");
int index = 0;
for (String s : split) {
if (s.startsWith("[")) {
list.add(getMapListNest(splitStr));
} else if (s.indexOf("_str") != -1) {
// project=_str0_ 还原集合干扰 project=[sid, cls, score]
list.add(parseForNest(s.replace("_str" + index + "_", str[index])));
index++;
} else {
list.add(s);
}
}
}
return list;
}
private static String getTextValue(String splitStr){
return splitStr.substring(splitStr.indexOf("[") + 1, splitStr.lastIndexOf("]"));
}
}
package com.dlink.core;
import com.dlink.executor.Executor;
import com.dlink.executor.ExecutorSetting;
import com.dlink.explainer.ca.CABuilder;
import com.dlink.explainer.ca.TableCANode;
import com.dlink.explainer.ca.TableCAResult;
import com.dlink.job.JobManager;
import com.dlink.parser.SingleSqlParserFactory;
import com.dlink.plus.FlinkSqlPlus;
import com.dlink.result.SubmitResult;
import org.junit.Test;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
/**
* FlinkSqlPlusTest
*
* @author wenmo
* @since 2021/6/23 10:37
**/
public class FlinkSqlPlusTest {
@Test
public void tableCATest(){
String sql1 ="CREATE TABLE student (\n" +
" sid INT,\n" +
" name STRING,\n" +
" PRIMARY KEY (sid) NOT ENFORCED\n" +
") WITH (\n" +
" 'connector' = 'jdbc',\n" +
" 'url' = 'jdbc:mysql://10.1.51.25:3306/dataxweb?useUnicode=true&characterEncoding=UTF-8&autoReconnect=true&useSSL=false&zeroDateTimeBehavior=convertToNull&serverTimezone=Asia/Shanghai&allowPublicKeyRetrieval=true',\n" +
" 'username'='dfly',\n" +
" 'password'='Dareway',\n" +
" 'table-name' = 'student'\n" +
")";
String sql2 ="CREATE TABLE man (\n" +
" pid INT,\n" +
" name STRING,\n" +
" PRIMARY KEY (pid) NOT ENFORCED\n" +
") WITH (\n" +
" 'connector' = 'jdbc',\n" +
" 'url' = 'jdbc:mysql://10.1.51.25:3306/dataxweb?useUnicode=true&characterEncoding=UTF-8&autoReconnect=true&useSSL=false&zeroDateTimeBehavior=convertToNull&serverTimezone=Asia/Shanghai&allowPublicKeyRetrieval=true',\n" +
" 'username'='dfly',\n" +
" 'password'='Dareway',\n" +
" 'table-name' = 'man'\n" +
")";
String sql3 = "INSERT INTO man SELECT sid as pid,name from student";
List<String> sqls = new ArrayList<>();
sqls.add(sql1);
sqls.add(sql2);
sqls.add(sql3);
FlinkSqlPlus plus = FlinkSqlPlus.build();
// List<TableCAResult> tableCAResults = plus.explainSqlTableColumnCA(String.join(";", sqls));
// List<TableCANode> tableCANodes = CABuilder.getOneTableCASByStatement(String.join(";", sqls));
List<TableCANode> tableCANodes = CABuilder.getOneTableColumnCAByStatement(String.join(";", sqls));
System.out.println(tableCANodes.toString());
}
}
......@@ -46,6 +46,7 @@
"not ie <= 10"
],
"dependencies": {
"@ant-design/charts": "^1.1.17",
"@ant-design/icons": "^4.5.0",
"@ant-design/pro-descriptions": "^1.6.8",
"@ant-design/pro-form": "^1.18.3",
......
import { Tabs,Empty,Tooltip,Button } from "antd";
import { IndentedTreeGraph } from '@ant-design/charts';
import {SearchOutlined} from "@ant-design/icons";
import {StateType} from "@/pages/FlinkSqlStudio/model";
import {connect} from "umi";
import styles from "./index.less";
import { getCAByStatement} from "@/pages/FlinkSqlStudio/service";
import {useState} from "react";
import React from 'react';
const { TabPane } = Tabs;
const data = {
id: '青年X高收入',
title: '青年X高收入',
body: '1,323,945,835',
children: [
{
id: '青年',
title: {
content: '青年',
style: {
fill: 'yellow',
},
},
body: {
content: '89,133,24',
style: {
fill: 'red',
},
},
footer: {
content: '占比',
value: '30%',
style: {
fill: '#aaa',
},
valueStyle: {
fill: '#000',
},
},
children: [
{ id: 'A11', title: '15~17', body: '89,133,24' },
{
id: 'A12',
title: '17~19',
footer: {
content: '占比',
value: '30%',
},
},
{ id: 'A13', title: '19~21', body: '89,133,24' },
{ id: 'A14', title: '21~24', body: '89,133,24' },
],
},
{
id: 'A2',
title: '高收入',
body: '761,871,877',
},
],
};
const StudioCA = (props:any) => {
const {current} = props;
const [oneTableCAData,setOneTableCAData] = useState<any>(null);
const nodeStateStyles = {
hover: {
stroke: '#1890ff',
lineWidth: 2,
},
selected: {
stroke: '#f00',
lineWidth: 3,
},
};
const config = {
data:oneTableCAData,
behaviors: ['drag-canvas', 'zoom-canvas', 'drag-node'],
bodyStyle: {
fill: '#aaa',
},
nodeStateStyles,
onReady: (graph) => {
graph.on('node:mouseenter', (evt) => {
const item = evt.item;
graph.setItemState(item, 'hover', true);
});
graph.on('node:mouseleave', (evt) => {
const item = evt.item;
graph.setItemState(item, 'hover', false);
});
},
edgeStyle: (item, graph) => {
/**
* graph.findById(item.target).getModel()
* item.source: 获取 source 数据
* item.target: 获取 target 数据
*/
// console.log(graph.findById(item.target).getModel());
return {
stroke: '#40a9ff',
lineWidth: graph.findById(item.target).getModel().columnSize,
strokeOpacity: 0.5,
};
},
nodeStyle: () => {
return {
stroke: '#40a9ff',
};
},
};
const getOneTableCA=()=>{
const res = getCAByStatement({
statement:current.value,
type: 1,
});
res.then((result)=>{
if(result.code==0){
setOneTableCAData(convertTreeData(result.datas[0]));
}else{
setOneTableCAData(null);
}
})
};
const convertTreeData=(node)=>{
if(node){
node.body=node.columns.toString();
for(let i in node.children){
node.children[i] = convertTreeData(node.children[i])
}
return node;
}
return null;
};
return (
<Tabs defaultActiveKey="OneTableCA" size="small" tabPosition="left" >
<TabPane
tab={
<span>
单表表级血缘
</span>
}
key="OneTableCA"
>
<div>
<div style={{float: "left"}}>
<Tooltip title="重新计算血缘">
<Button
type="text"
icon={<SearchOutlined />}
onClick={getOneTableCA}
/>
</Tooltip>
</div>
{oneTableCAData!=null?<IndentedTreeGraph {...config} />:<Empty image={Empty.PRESENTED_IMAGE_SIMPLE} />}
</div>
</TabPane>
<TabPane
tab={
<span>
单表字段级血缘
</span>
}
key="OneColumnCA"
>
<Empty image={Empty.PRESENTED_IMAGE_SIMPLE} />
</TabPane>
<TabPane
tab={
<span>
全局表级血缘
</span>
}
key="AllTableCA"
>
<Empty image={Empty.PRESENTED_IMAGE_SIMPLE} />
</TabPane>
<TabPane
tab={
<span>
全局字段级血缘
</span>
}
key="AllColumnCA"
>
<Empty image={Empty.PRESENTED_IMAGE_SIMPLE} />
</TabPane>
</Tabs>
);
};
export default connect(({ Studio }: { Studio: StateType }) => ({
current: Studio.current,
}))(StudioCA);
......@@ -8,6 +8,7 @@ import StudioMsg from "./StudioMsg";
import StudioTable from "./StudioTable";
import StudioHistory from "./StudioHistory";
import StudioFX from "./StudioFX";
import StudioCA from "./StudioCA";
const { TabPane } = Tabs;
......@@ -59,7 +60,7 @@ const StudioConsole = (props:any) => {
}
key="StudioConsanguinity"
>
<Empty image={Empty.PRESENTED_IMAGE_SIMPLE} />
<StudioCA />
</TabPane>
<TabPane
tab={
......
export type BaseDataSourceField ={
fields:[{
label?:string,
displayName?:string,
aliasName?:string,
kind?:any,
insertText?:string,
insertTextRules?:any,
detail?:string,
export type BaseDataSourceField = {
fields: [{
label?: string,
displayName?: string,
aliasName?: string,
kind?: any,
insertText?: string,
insertTextRules?: any,
detail?: string,
}]
}
export type BaseDataSourceHeader ={
fields:[{
label?:string,
displayName?:string,
aliasName?:string,
kind?:any,
insertText?:string,
insertTextRules?:any,
detail?:string,
export type BaseDataSourceHeader = {
fields: [{
label?: string,
displayName?: string,
aliasName?: string,
kind?: any,
insertText?: string,
insertTextRules?: any,
detail?: string,
}]
}
export type CompletionItem ={
label?:string,
kind?:any,
insertText?:string,
insertTextRules?:any,
detail?:string,
export type CompletionItem = {
label?: string,
kind?: any,
insertText?: string,
insertTextRules?: any,
detail?: string,
}
export type StudioParam = {
statement:string,
export type StudioParam = {
statement: string,
checkPoint?: number,
savePointPath?: string,
parallelism?: number,
fragment?: boolean,
clusterId: number,
session:string,
session: string,
maxRowNum?: number,
}
}
export type CAParam = {
statement: string,
type: number,
}
......@@ -73,7 +73,7 @@ const Studio: React.FC<StudioProps> = (props) => {
</Col>
<Col span={16}>
<StudioTabs/>
{/*<StudioEdit/>*/}
{/*<StudioConsole/>*/}
</Col>
<Col span={4} className={styles["vertical-tabs"]}>
<Tabs defaultActiveKey="1" size="small" tabPosition="right" style={{ height: "100%",border: "1px solid #f0f0f0"}}>
......
import request from 'umi-request';
import {StudioParam} from "@/components/Studio/StudioEdit/data";
import {CAParam, StudioParam} from "@/components/Studio/StudioEdit/data";
export async function executeSql(params: StudioParam) {
return request<API.Result>('/api/studio/executeSql', {
......@@ -27,3 +27,12 @@ export async function getCatalogueTreeData(params?: StudioParam) {
},
});
}
export async function getCAByStatement(params: CAParam) {
return request<API.Result>('/api/studio/getCAByStatement', {
method: 'POST',
data: {
...params,
},
});
}
......@@ -260,6 +260,16 @@ export default (): React.ReactNode => {
</ul>
</Paragraph>
</Timeline.Item>
<Timeline.Item><Text code>0.3.0</Text> <Text type="secondary">2021-07-??</Text>
<p> </p>
<Paragraph>
<ul>
<li>
<Link>实现了单任务的表级血缘分析</Link>
</li>
</ul>
</Paragraph>
</Timeline.Item>
</Timeline>
</Card>
</PageContainer>
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment