Commit a3f5b303 authored by wenmo's avatar wenmo

批流异步SELECT

parent ea76fcad
......@@ -5,9 +5,7 @@ import com.dlink.dto.StudioCADTO;
import com.dlink.dto.StudioDDLDTO;
import com.dlink.dto.StudioExecuteDTO;
import com.dlink.job.JobResult;
import com.dlink.model.Task;
import com.dlink.result.IResult;
import com.dlink.result.RunResult;
import com.dlink.service.StudioService;
import com.fasterxml.jackson.databind.JsonNode;
import lombok.extern.slf4j.Slf4j;
......@@ -49,6 +47,14 @@ public class StudioController {
return Result.succeed(result,"执行成功");
}
/**
* 根据jobId获取数据
*/
@GetMapping("/getJobData")
public Result getJobData(@RequestParam String jobId) {
return Result.succeed(studioService.getJobData(jobId),"获取成功");
}
/**
* 获取单表的血缘分析
*/
......
......@@ -5,8 +5,7 @@ import com.dlink.dto.StudioExecuteDTO;
import com.dlink.explainer.ca.TableCANode;
import com.dlink.job.JobResult;
import com.dlink.result.IResult;
import com.dlink.result.RunResult;
import org.apache.flink.table.planner.expressions.In;
import com.dlink.result.SelectResult;
import java.util.List;
......@@ -22,6 +21,8 @@ public interface StudioService {
IResult executeDDL(StudioDDLDTO studioDDLDTO);
SelectResult getJobData(String jobId);
boolean clearSession(String session);
List<TableCANode> getOneTableCAByStatement(String statement);
......
......@@ -16,6 +16,7 @@ import com.dlink.job.JobResult;
import com.dlink.model.Cluster;
import com.dlink.result.IResult;
import com.dlink.result.RunResult;
import com.dlink.result.SelectResult;
import com.dlink.service.ClusterService;
import com.dlink.service.StudioService;
import com.dlink.session.SessionPool;
......@@ -93,6 +94,11 @@ public class StudioServiceImpl implements StudioService {
return jobManager.executeDDL(studioDDLDTO.getStatement());
}
@Override
public SelectResult getJobData(String jobId) {
// Delegates to the static cache in JobManager (backed by ResultPool), keyed by Flink job id.
return JobManager.getJobData(jobId);
}
@Override
public boolean clearSession(String session) {
if(SessionPool.remove(session)>0){
......
......@@ -37,17 +37,17 @@
<dependency>
<groupId>com.dlink</groupId>
<artifactId>dlink-client-1.12</artifactId>
<scope>provided</scope>
<!--<scope>provided</scope>-->
</dependency>
<dependency>
<groupId>com.dlink</groupId>
<artifactId>dlink-connector-jdbc</artifactId>
<scope>provided</scope>
<!--<scope>provided</scope>-->
</dependency>
<dependency>
<groupId>com.dlink</groupId>
<artifactId>dlink-function</artifactId>
<scope>provided</scope>
<!--<scope>provided</scope>-->
</dependency>
</dependencies>
</project>
\ No newline at end of file
......@@ -348,4 +348,8 @@ public class JobManager extends RunTime {
}
return new ErrorResult();
}
/**
 * Returns the cached SELECT result for the given Flink job id.
 * When nothing is cached, ResultPool returns a "destroyed" placeholder instead of null.
 */
public static SelectResult getJobData(String jobId){
return ResultPool.get(jobId);
}
}
......@@ -4,6 +4,9 @@ import lombok.Getter;
import lombok.Setter;
import java.time.LocalDateTime;
import java.util.List;
import java.util.Map;
import java.util.Set;
/**
* DDLResult
......@@ -15,8 +18,25 @@ import java.time.LocalDateTime;
@Getter
public class DDLResult extends AbstractResult implements IResult {
// Result rows, one map per row keyed by column name (populated by ShowResultBuilder).
private List<Map<String,Object>> rowData;
// Number of rows captured in rowData.
private Integer total;
// Ordered set of column names for the result table.
private Set<String> columns;
// Status-only constructor: records the success flag and completion time, no rows.
public DDLResult(boolean success) {
this.success = success;
this.endTime = LocalDateTime.now();
}
// Success constructor carrying the captured rows and column metadata.
public DDLResult(List<Map<String, Object>> rowData, Integer total, Set<String> columns) {
this.rowData = rowData;
this.total = total;
this.columns = columns;
this.success = true;
this.endTime = LocalDateTime.now();
}
@Override
public String getJobId() {
// DDL/SHOW results are not tied to a running Flink job, so there is no job id.
return null;
}
}
......@@ -14,4 +14,9 @@ public class ErrorResult extends AbstractResult implements IResult {
this.success = false;
this.endTime = LocalDateTime.now();
}
@Override
public String getJobId() {
return null;
}
}
......@@ -11,4 +11,6 @@ import java.time.LocalDateTime;
public interface IResult {
// Records the execution start time for this result.
void setStartTime(LocalDateTime startTime);
// The Flink job id backing this result, or null when the result is not tied to a job.
String getJobId();
}
......@@ -22,4 +22,9 @@ public class InsertResult extends AbstractResult implements IResult {
this.success = success;
this.endTime = LocalDateTime.now();
}
@Override
public String getJobId() {
return jobID;
}
}
......@@ -14,9 +14,10 @@ public interface ResultBuilder {
static ResultBuilder build(String operationType, Integer maxRowNum, String nullColumn, boolean printRowKind){
switch (operationType.toUpperCase()){
case SelectResultBuilder.OPERATION_TYPE:
return new SelectResultBuilder(maxRowNum,nullColumn,printRowKind);
case FlinkSQLConstant.SHOW:
case FlinkSQLConstant.DESCRIBE:
return new SelectResultBuilder(maxRowNum,nullColumn,printRowKind);
return new ShowResultBuilder(nullColumn,printRowKind);
case InsertResultBuilder.OPERATION_TYPE:
return new InsertResultBuilder();
default:
......
package com.dlink.result;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
/**
* ResultPool
*
* @author wenmo
* @since 2021/7/1 22:20
*/
public class ResultPool {

    // Shared between the web layer (getJobData requests) and the per-job collector
    // threads started by SelectResultBuilder, so a thread-safe map is required:
    // a plain HashMap — even declared volatile — is unsafe under concurrent put/get.
    private static final Map<String, SelectResult> results = new ConcurrentHashMap<>();

    /** Returns true when a result is cached for the given job id. */
    public static boolean containsKey(String key) {
        return results.containsKey(key);
    }

    /** Caches a result under its own job id. */
    public static void put(SelectResult result) {
        results.put(result.getJobId(), result);
    }

    /**
     * Returns the cached result for the job id, or a "destroyed" placeholder when
     * nothing is cached. Single lookup avoids the containsKey/get check-then-act race.
     */
    public static SelectResult get(String key) {
        SelectResult result = results.get(key);
        return result != null ? result : SelectResult.buildDestruction(key);
    }

    /**
     * Removes the cached result for the job id.
     *
     * @return true when a result was actually removed
     */
    public static boolean remove(String key) {
        return results.remove(key) != null;
    }

    /** Drops every cached result. */
    public static void clear() {
        results.clear();
    }
}
package com.dlink.result;
import org.apache.flink.table.api.TableColumn;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.types.Row;
import org.apache.flink.util.StringUtils;
import java.util.*;
import java.util.stream.Stream;
/**
* ResultRunnable
*
* @author wenmo
* @since 2021/7/1 22:50
*/
public class ResultRunnable implements Runnable {

    private TableResult tableResult;   // source of the streaming SELECT rows
    private Integer maxRowNum;         // cap on the number of rows to cache
    private boolean printRowKind;      // whether to prepend the change-kind ("op") column
    private String nullColumn;         // placeholder rendered for NULL fields

    public ResultRunnable(TableResult tableResult, Integer maxRowNum, boolean printRowKind, String nullColumn) {
        this.tableResult = tableResult;
        this.maxRowNum = maxRowNum;
        this.printRowKind = printRowKind;
        this.nullColumn = nullColumn;
    }

    /**
     * Registers an empty SelectResult in the pool (if absent) and then drains the
     * job's rows into it, so callers can poll partial results while the job runs.
     */
    @Override
    public void run() {
        // Only jobs with a JobClient have a job id to key the cached result on.
        if (tableResult.getJobClient().isPresent()) {
            String jobId = tableResult.getJobClient().get().getJobID().toHexString();
            if (!ResultPool.containsKey(jobId)) {
                ResultPool.put(new SelectResult(jobId, new ArrayList<Map<String, Object>>(), new LinkedHashSet<String>()));
            }
            try {
                catchData(ResultPool.get(jobId));
            } catch (Exception ignored) {
                // Best-effort collection: a failure while draining the stream must not
                // kill this thread; whatever rows were captured stay cached.
            }
        }
    }

    /**
     * Drains rows from the TableResult into the cached SelectResult, stopping once
     * maxRowNum rows have been captured.
     */
    private void catchData(SelectResult selectResult) {
        List<TableColumn> columns = tableResult.getTableSchema().getTableColumns();
        String[] columnNames = columns.stream().map(TableColumn::getName).map(s -> s.replace(" ", "")).toArray((x$0) -> {
            return (new String[x$0]);
        });
        if (printRowKind) {
            // Prepend the row-kind column when requested.
            columnNames = Stream.concat(Stream.of("op"), Arrays.stream(columnNames)).toArray((x$0) -> {
                return new String[x$0];
            });
        }
        Set<String> column = new LinkedHashSet(Arrays.asList(columnNames));
        selectResult.setColumns(column);
        long numRows = 0L;
        List<Map<String, Object>> rows = selectResult.getRowData();
        Iterator<Row> it = tableResult.collect();
        while (it.hasNext()) {
            if (numRows < maxRowNum) {
                String[] cols = rowToString(it.next());
                Map<String, Object> row = new HashMap<>();
                for (int i = 0; i < cols.length; i++) {
                    // ">=" (was ">"): i == columnNames.length previously fell through to the
                    // else branch and threw ArrayIndexOutOfBoundsException on columnNames[i].
                    if (i >= columnNames.length) {
                        // Values beyond the known schema are skipped.
                    } else {
                        row.put(columnNames[i], cols[i]);
                    }
                }
                rows.add(row);
                numRows++;
            } else {
                break;
            }
        }
    }

    /** Renders one Row to strings, substituting nullColumn for NULL fields. */
    private String[] rowToString(Row row) {
        int len = printRowKind ? row.getArity() + 1 : row.getArity();
        List<String> fields = new ArrayList(len);
        if (printRowKind) {
            fields.add(row.getKind().shortString());
        }
        for (int i = 0; i < row.getArity(); ++i) {
            Object field = row.getField(i);
            if (field == null) {
                fields.add(nullColumn);
            } else {
                fields.add(StringUtils.arrayAwareToString(field));
            }
        }
        return fields.toArray(new String[0]);
    }
}
......@@ -23,6 +23,7 @@ public class SelectResult extends AbstractResult implements IResult{
private Integer total;
private Integer currentCount;
private Set<String> columns;
private boolean isDestroyed;
public SelectResult(List<Map<String, Object>> rowData, Integer total, Integer currentCount, Set<String> columns,
String jobID,boolean success) {
......@@ -32,7 +33,40 @@ public class SelectResult extends AbstractResult implements IResult{
this.columns = columns;
this.jobID = jobID;
this.success = success;
// this.endTime = LocalDateTime.now();
this.isDestroyed = false;
}
// Constructor used by the async collector thread: wraps the row buffer being
// filled for a running job.
public SelectResult(String jobID,List<Map<String, Object>> rowData, Set<String> columns) {
this.jobID = jobID;
this.rowData = rowData;
this.total = rowData.size();
this.columns = columns;
this.success = true;
this.isDestroyed = false;
}
// Status-only constructor backing the static factories below; carries no rows.
public SelectResult(String jobID, boolean isDestroyed, boolean success) {
this.jobID = jobID;
this.isDestroyed = isDestroyed;
this.success = success;
this.endTime = LocalDateTime.now();
}
@Override
public String getJobId() {
return jobID;
}
// Placeholder for a job whose result is not (or no longer) in the ResultPool.
public static SelectResult buildDestruction(String jobID){
return new SelectResult(jobID,true,false);
}
// Marker returned immediately after submitting an async SELECT.
public static SelectResult buildSuccess(String jobID){
return new SelectResult(jobID,false,true);
}
// Failure marker when no JobClient / job id is available.
public static SelectResult buildFailed(){
return new SelectResult(null,false,false);
}
}
......@@ -31,8 +31,17 @@ public class SelectResultBuilder implements ResultBuilder {
@Override
public IResult getResult(TableResult tableResult) {
String jobId = null;
if(tableResult.getJobClient().isPresent()) {
if (tableResult.getJobClient().isPresent()) {
String jobId = tableResult.getJobClient().get().getJobID().toHexString();
ResultRunnable runnable = new ResultRunnable(tableResult, maxRowNum, printRowKind, nullColumn);
Thread thread = new Thread(runnable, jobId);
thread.start();
return SelectResult.buildSuccess(jobId);
}else{
return SelectResult.buildFailed();
}
/*String jobId = null;
if (tableResult.getJobClient().isPresent()) {
jobId = tableResult.getJobClient().get().getJobID().toHexString();
}
List<TableColumn> columns = tableResult.getTableSchema().getTableColumns();
......@@ -69,24 +78,7 @@ public class SelectResultBuilder implements ResultBuilder {
numRows++;
totalCount++;
}
return new SelectResult(rows, totalCount, rows.size(), column, jobId, true);
}
public String[] rowToString(Row row) {
int len = printRowKind ? row.getArity() + 1 : row.getArity();
List<String> fields = new ArrayList(len);
if (printRowKind) {
fields.add(row.getKind().shortString());
}
for (int i = 0; i < row.getArity(); ++i) {
Object field = row.getField(i);
if (field == null) {
fields.add(nullColumn);
} else {
fields.add(StringUtils.arrayAwareToString(field));
}
}
return (String[]) fields.toArray(new String[0]);
return new SelectResult(rows, totalCount, rows.size(), column, jobId, true);*/
}
}
package com.dlink.result;
import com.dlink.constant.FlinkSQLConstant;
import org.apache.flink.table.api.TableColumn;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.types.Row;
import org.apache.flink.util.StringUtils;
import java.util.*;
import java.util.stream.Stream;
/**
* ShowResultBuilder
*
* @author wenmo
* @since 2021/7/1 23:57
*/
public class ShowResultBuilder implements ResultBuilder {

    public static final String OPERATION_TYPE = FlinkSQLConstant.SHOW;

    private boolean printRowKind;   // whether to prepend the change-kind ("op") column
    private String nullColumn;      // placeholder rendered for NULL fields

    public ShowResultBuilder(String nullColumn, boolean printRowKind) {
        this.printRowKind = printRowKind;
        this.nullColumn = nullColumn;
    }

    /**
     * Eagerly collects every row of a SHOW/DESCRIBE statement and wraps the
     * rows and column names in a DDLResult.
     */
    @Override
    public IResult getResult(TableResult tableResult) {
        List<TableColumn> columns = tableResult.getTableSchema().getTableColumns();
        Set<String> column = new LinkedHashSet();
        String[] columnNames = columns.stream().map(TableColumn::getName).map(s -> s.replace(" ", "")).toArray((x$0) -> {
            return (new String[x$0]);
        });
        if (printRowKind) {
            columnNames = Stream.concat(Stream.of("op"), Arrays.stream(columnNames)).toArray((x$0) -> {
                return new String[x$0];
            });
        }
        List<Map<String, Object>> rows = new ArrayList<>();
        Iterator<Row> it = tableResult.collect();
        while (it.hasNext()) {
            String[] cols = rowToString(it.next());
            Map<String, Object> row = new HashMap<>();
            for (int i = 0; i < cols.length; i++) {
                // ">=" (was ">"): i == columnNames.length previously hit the else branch
                // and threw ArrayIndexOutOfBoundsException on columnNames[i].
                if (i >= columnNames.length) {
                    // Values beyond the known schema get synthetic "UKN<i>" column names.
                    column.add("UKN" + i);
                    row.put("UKN" + i, cols[i]);
                } else {
                    column.add(columnNames[i]);
                    row.put(columnNames[i], cols[i]);
                }
            }
            rows.add(row);
        }
        return new DDLResult(rows, rows.size(), column);
    }

    /** Renders one Row to strings, substituting nullColumn for NULL fields. */
    private String[] rowToString(Row row) {
        int len = printRowKind ? row.getArity() + 1 : row.getArity();
        List<String> fields = new ArrayList(len);
        if (printRowKind) {
            fields.add(row.getKind().shortString());
        }
        for (int i = 0; i < row.getArity(); ++i) {
            Object field = row.getField(i);
            if (field == null) {
                fields.add(nullColumn);
            } else {
                fields.add(StringUtils.arrayAwareToString(field));
            }
        }
        return fields.toArray(new String[0]);
    }
}
......@@ -4,6 +4,7 @@ import {connect} from "umi";
import {useState} from "react";
// import Highlighter from 'react-highlight-words';
import { SearchOutlined } from '@ant-design/icons';
import {showJobData} from "@/components/Studio/StudioEvent/DQL";
const { Option } = Select;
const { Title, Paragraph, Text, Link } = Typography;
......@@ -11,7 +12,7 @@ const { Title, Paragraph, Text, Link } = Typography;
const StudioTable = (props:any) => {
const {current} = props;
const {current,result,dispatch} = props;
const [dataIndex,setDataIndex] = useState<number>(0);
const [searchText,setSearchText] = useState<string>('');
const [searchedColumn,setSearchedColumn] = useState<string>('');
......@@ -95,8 +96,8 @@ const StudioTable = (props:any) => {
return datas;
};
const onChange=(val:number)=>{
setDataIndex(val);
const onChange=(val:string)=>{
showJobData(val,dispatch);
};
return (
......@@ -109,25 +110,27 @@ const StudioTable = (props:any) => {
onChange={onChange}
>
{current.console.result.map((item,index)=> {
if(item.status=='SUCCESS') {
let tag = (<> <Tooltip placement="topLeft" title={item.statement}><Tag color="processing">{item.finishDate}</Tag>
<Text underline>[{item.sessionId}:{item.flinkHost}:{item.flinkPort}]</Text>
{item.jobName&&<Text code>{item.jobName}</Text>}
if(item.status=='SUCCESS'&&item.jobId) {
let tag = (<> <Tooltip placement="topLeft" title={item.statement}>
<Tag color="processing">{item.startTime}</Tag>
<Tag color="processing">{item.endTime}</Tag>
<Text underline>[{item.jobConfig.sessionKey}:{item.jobConfig.host}]</Text>
{item.jobConfig.jobName&&<Text code>{item.jobConfig.jobName}</Text>}
{item.jobId&&<Text code>{item.jobId}</Text>}
<Text keyboard>{item.time}ms</Text>
{item.statement}</Tooltip></>);
return (<Option value={index} label={tag}>
return (<Option value={item.jobId} label={tag}>
{tag}
</Option>)
}
})}
</Select>
</Form.Item>
{current.console.result[dataIndex]&&current.console.result[dataIndex].result?(<Table dataSource={current.console.result[dataIndex].result.rowData} columns={getColumns(current.console.result[dataIndex].result.columns)} />):(<Empty image={Empty.PRESENTED_IMAGE_SIMPLE} />)}
{result&&result.jobId&&!result.isDestroyed?(<Table dataSource={result.rowData} columns={getColumns(result.columns)} />):(<Empty image={Empty.PRESENTED_IMAGE_SIMPLE} />)}
</Typography>
);
};
export default connect(({ Studio }: { Studio: StateType }) => ({
current: Studio.current,
result: Studio.result,
}))(StudioTable);
......@@ -7,9 +7,9 @@ export function showTables(task:TaskType,dispatch:any) {
statement:FlinkSQL.SHOW_TABLES,
clusterId: task.clusterId,
session:task.session,
isRemote:task.isRemote,
isSession:task.isSession,
isResult:true,
useRemote:task.useRemote,
useSession:task.useSession,
useResult:true,
});
res.then((result)=>{
let tableData = [];
......
import {getJobData} from "@/pages/FlinkSqlStudio/service";
/** Fetches the cached result rows for a Flink job and saves them into the Studio model. */
export function showJobData(jobId: string, dispatch: any) {
  getJobData(jobId).then((result) => {
    if (dispatch) {
      dispatch({
        type: "Studio/saveResult",
        payload: result.datas,
      });
    }
  });
}
......@@ -313,7 +313,7 @@ const StudioTree: React.FC<StudioTreeProps> = (props) => {
};
const onSelect = (selectedKeys:[], e:any) => {
if(e.node.isLeaf) {
if(e.node&&e.node.isLeaf) {
dispatch({
type: "Studio/saveCurrentPath",
payload: e.node.path,
......
......@@ -91,6 +91,7 @@ export type StateType = {
currentPath?: string[];
tabs: TabsType;
session: string[];
result:{};
rightClickMenu?: boolean;
};
......@@ -110,6 +111,7 @@ export type ModelType = {
saveSession: Reducer<StateType>;
showRightClickMenu: Reducer<StateType>;
refreshCurrentSessionCluster: Reducer<StateType>;
saveResult: Reducer<StateType>;
};
};
......@@ -193,6 +195,7 @@ const Model: ModelType = {
}],
},
session: [],
result:{},
rightClickMenu: false
},
......@@ -341,6 +344,14 @@ const Model: ModelType = {
},
};
},
saveResult(state, {payload}) {
return {
...state,
result: {
...payload
},
};
},
},
};
......
......@@ -19,6 +19,15 @@ export async function executeDDL(params: StudioParam) {
});
}
export async function getJobData(jobId:string) {
return request<API.Result>('/api/studio/getJobData', {
method: 'GET',
params: {
jobId,
},
});
}
export async function getCatalogueTreeData(params?: StudioParam) {
return request<API.Result>('/api/catalogue/getCatalogueTreeData', {
method: 'POST',
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment