Commit 981881fc authored by wenmo

Data passing

parent c0b35d84
package com.dlink.assertion;

import com.dlink.exception.BusException;
import com.dlink.model.Cluster;
import com.dlink.model.Statement;
import com.dlink.model.Task;

/**
 * Assert
 *
 * @author wenmo
 * @since 2021/5/30 11:13
 */
public interface Assert {

    static void check(Cluster cluster) {
        // guard against a null cluster as well, since getById may return null
        if (cluster == null || cluster.getId() == null) {
            throw new BusException("Flink cluster does not exist");
        }
    }

    static void check(Task task) {
        if (task == null) {
            throw new BusException("Job does not exist");
        }
    }

    static void check(Statement statement) {
        if (statement == null) {
            throw new BusException("FlinkSql statement does not exist");
        }
    }

    static void checkHost(String host) {
        if (host == null || "".equals(host)) {
            throw new BusException("Cluster address is currently unavailable");
        }
    }
}
@@ -82,4 +82,13 @@ public class CatalogueController {
         catalogue = catalogueService.getById(catalogue.getId());
         return Result.succeed(catalogue, "Fetched successfully");
     }
+
+    /**
+     * Get all catalogues
+     */
+    @PostMapping("/getCatalogueTreeData")
+    public Result getCatalogueTreeData() throws Exception {
+        List<Catalogue> catalogues = catalogueService.getAllData();
+        return Result.succeed(catalogues, "Fetched successfully");
+    }
 }
package com.dlink.controller;

import com.dlink.common.result.Result;
import com.dlink.dto.StudioExecuteDTO;
import com.dlink.model.Task;
import com.dlink.result.RunResult;
import com.dlink.service.StudioService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

/**
 * StudioController
 *
 * @author wenmo
 * @since 2021/5/30 11:05
 */
@Slf4j
@RestController
@RequestMapping("/api/studio")
public class StudioController {

    @Autowired
    private StudioService studioService;

    /**
     * Execute SQL
     */
    @PostMapping("/executeSql")
    public Result executeSql(@RequestBody StudioExecuteDTO studioExecuteDTO) throws Exception {
        RunResult runResult = studioService.executeSql(studioExecuteDTO);
        return Result.succeed(runResult, "Executed successfully");
    }
}
package com.dlink.dto;

import lombok.Getter;
import lombok.Setter;

/**
 * StudioExecuteDTO
 *
 * @author wenmo
 * @since 2021/5/30 11:09
 */
@Getter
@Setter
public class StudioExecuteDTO {
    private String statement;
    private Integer clusterId;
}
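
For illustration, a minimal TypeScript sketch of how the front end could post to the new /api/studio/executeSql endpoint. The field names mirror StudioExecuteDTO (statement, clusterId); the sample values and the runExample wrapper are assumptions, and it is assumed that StudioParam exposes the same statement/clusterId fields as the backend DTO:

import {executeSql} from "@/pages/FlinkSqlStudio/service";

// Hypothetical caller: builds the payload that Spring binds to StudioExecuteDTO.
const runExample = async () => {
  const result = await executeSql({
    statement: 'select 1',  // assumed sample FlinkSql statement
    clusterId: 1,           // assumed sample cluster id
  });
  console.log(result.msg, result.datas); // the RunResult comes back in Result.datas
};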
@@ -3,6 +3,8 @@ package com.dlink.service;
 import com.dlink.db.service.ISuperService;
 import com.dlink.model.Catalogue;
+
+import java.util.List;
 /**
  * CatalogueService
  *
@@ -10,4 +12,6 @@ import com.dlink.model.Catalogue;
  * @since 2021/5/28 14:01
  **/
 public interface CatalogueService extends ISuperService<Catalogue> {
+
+    List<Catalogue> getAllData();
 }
package com.dlink.service;

import com.dlink.dto.StudioExecuteDTO;
import com.dlink.result.RunResult;

/**
 * StudioService
 *
 * @author wenmo
 * @since 2021/5/30 11:07
 */
public interface StudioService {

    RunResult executeSql(StudioExecuteDTO studioExecuteDTO);
}
@@ -6,6 +6,8 @@ import com.dlink.model.Catalogue;
 import com.dlink.service.CatalogueService;
 import org.springframework.stereotype.Service;
+
+import java.util.List;
 /**
  * CatalogueServiceImpl
  *
@@ -14,4 +16,8 @@ import org.springframework.stereotype.Service;
  **/
 @Service
 public class CatalogueServiceImpl extends SuperServiceImpl<CatalogueMapper, Catalogue> implements CatalogueService {
+    @Override
+    public List<Catalogue> getAllData() {
+        return this.list();
+    }
 }
package com.dlink.service.impl;

import com.dlink.assertion.Assert;
import com.dlink.cluster.FlinkCluster;
import com.dlink.dto.StudioExecuteDTO;
import com.dlink.executor.Executor;
import com.dlink.executor.ExecutorSetting;
import com.dlink.job.JobManager;
import com.dlink.model.Cluster;
import com.dlink.result.RunResult;
import com.dlink.service.ClusterService;
import com.dlink.service.StudioService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

/**
 * StudioServiceImpl
 *
 * @author wenmo
 * @since 2021/5/30 11:08
 */
@Service
public class StudioServiceImpl implements StudioService {

    @Autowired
    private ClusterService clusterService;

    @Override
    public RunResult executeSql(StudioExecuteDTO studioExecuteDTO) {
        Cluster cluster = clusterService.getById(studioExecuteDTO.getClusterId());
        Assert.check(cluster);
        String host = FlinkCluster.testFlinkJobManagerIP(cluster.getHosts(), cluster.getJobManagerHost());
        Assert.checkHost(host);
        JobManager jobManager = new JobManager(host);
        return jobManager.execute(studioExecuteDTO.getStatement(), new ExecutorSetting(Executor.REMOTE));
    }
}
 package com.dlink.service.impl;
+import com.dlink.assertion.Assert;
 import com.dlink.cluster.FlinkCluster;
 import com.dlink.db.service.impl.SuperServiceImpl;
 import com.dlink.exception.BusException;
@@ -34,21 +35,13 @@ public class TaskServiceImpl extends SuperServiceImpl<TaskMapper, Task> implements TaskService {
     @Override
     public SubmitResult submitByTaskId(Integer id) {
         Task task = this.getById(id);
-        if (task == null) {
-            throw new BusException("Job does not exist");
-        }
+        Assert.check(task);
         Cluster cluster = clusterService.getById(task.getClusterId());
-        if (cluster == null) {
-            throw new BusException("Flink cluster does not exist");
-        }
+        Assert.check(cluster);
         Statement statement = statementService.getById(id);
-        if (statement == null) {
-            throw new BusException("FlinkSql statement does not exist");
-        }
+        Assert.check(statement);
         String host = FlinkCluster.testFlinkJobManagerIP(cluster.getHosts(), cluster.getJobManagerHost());
-        if (host == null || "".equals(host)) {
-            throw new BusException("Cluster address is currently unavailable");
-        }
+        Assert.checkHost(host);
         JobManager jobManager = new JobManager(host);
         return jobManager.submit(statement.getStatement(), task.getRemoteExecutorSetting());
     }
...
@@ -56,14 +56,14 @@ public class JobManager {
         this.port = port;
     }
-    public RunResult execute(String statement) {
+    public RunResult execute(String statement, ExecutorSetting executorSetting) {
         RunResult runResult = new RunResult(sessionId, statement, flinkHost);
         Executor executor = null;
         ExecutorEntity executorEntity = SessionPool.get(sessionId);
         if (executorEntity != null) {
             executor = executorEntity.getExecutor();
         } else {
-            executor = Executor.build(new EnvironmentSetting(flinkHost, FlinkConstant.PORT), new ExecutorSetting(Executor.REMOTE));
+            executor = Executor.build(new EnvironmentSetting(flinkHost, FlinkConstant.PORT), executorSetting);
             SessionPool.push(new ExecutorEntity(sessionId, executor));
         }
         String[] Statements = statement.split(";");
...
@@ -5,8 +5,8 @@ import MonacoEditor from "react-monaco-editor";
 import {BaseDataSourceField, BaseDataSourceHeader, CompletionItem} from "./data";
 import Completion from "./completion";
-import {executeSql} from "./service";
-import {StateType} from "@/pages/Studio/model";
+import {executeSql} from "@/pages/FlinkSqlStudio/service";
+import {StateType} from "@/pages/FlinkSqlStudio/model";
 import {connect} from "umi";
 let provider = {
@@ -29,8 +29,8 @@ const FlinkSqlEditor = (props:any) => {
       selectOnLineNumbers: true,
       renderSideBySide: false,
     },
-    // sql=props.data.sql,
-    sql,
+    sql=props.catalogue.sql,
+    // sql,
     dispatch,
   } = props
   ;
@@ -166,5 +166,7 @@ return (
 };
 export default connect(({ Studio }: { Studio: StateType }) => ({
+  current: Studio.current,
+  catalogue: Studio.catalogue,
   sql: Studio.sql,
 }))(FlinkSqlEditor);
@@ -6,7 +6,7 @@ import Space from "antd/es/space";
 import Divider from "antd/es/divider";
 import Button from "antd/es/button/button";
 import Breadcrumb from "antd/es/breadcrumb/Breadcrumb";
-import {StateType} from "@/pages/Studio/model";
+import {StateType} from "@/pages/FlinkSqlStudio/model";
 import {connect} from "umi";
 const { SubMenu } = Menu;
...
 import { Tabs } from 'antd';
 import React from 'react';
 import StudioEdit from "../StudioEdit";
+import {connect} from "umi";
+import {StateType} from "@/pages/FlinkSqlStudio/model";
 const { TabPane } = Tabs;
@@ -96,4 +98,8 @@ class EditorTabs extends React.Component {
   }
 }
-export default EditorTabs;
+export default connect(({ Studio }: { Studio: StateType }) => ({
+  current: Studio.current,
+  catalogue: Studio.catalogue,
+  sql: Studio.sql,
+}))(EditorTabs);
import {DataNode} from "antd/lib/tree";

export type DataType = {
  id: number;
  parentId: number;
  isDir: boolean;
  isLeaf: boolean;
  isParent?: boolean;  // read by getChildFromTree below
  children: DataType[];
};

export interface TreeDataNode extends DataNode {
  id: number;  // used as the pid for the recursive build below
  name: string;
  parentId: number;
  isDir: boolean;
}

// Builds the antd tree recursively: rows whose parentId equals pid become the
// nodes of the current level, and each node's title is taken from its name.
export function convertToTreeData(data: TreeDataNode[], pid: number) {
  const result: TreeDataNode[] = [];
  let temp: TreeDataNode[] = [];
  for (let i = 0; i < data.length; i++) {
    if (data[i].parentId === pid) {
      let obj = data[i];
      temp = convertToTreeData(data, data[i].id);
      if (temp.length > 0) {
        obj.children = temp;
      }
      obj.isLeaf = obj.isDir;
      obj.title = obj.name;
      result.push(obj);
    }
  }
  return result;
}

// Collects all leaf nodes of the tree into arr.
export function getLeafFromTree(data: DataType[], arr: DataType[]) {
  if (typeof arr == 'undefined') {
    arr = [];
  }
  for (let i = 0; i < data.length; i++) {
    let sonList = data[i].children;
    if (sonList) {
      if (sonList.length == 0) {
        arr.push(data[i]);
      } else {
        getLeafFromTree(sonList, arr);
      }
    } else {
      arr.push(data[i]);
    }
  }
  return arr;
}

// Collects all non-parent nodes of the tree into arr.
export function getChildFromTree(data: DataType[], arr: DataType[]) {
  if (typeof arr == 'undefined') {
    arr = [];
  }
  for (let i = 0; i < data.length; i++) {
    if (data[i].isParent) {
      let sonList = data[i].children;
      if (!sonList || sonList == null || sonList.length == 0) {
      } else {
        getChildFromTree(sonList, arr);
      }
    } else {
      arr.push(data[i]);
    }
  }
  return arr;
}
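
For illustration, a small TypeScript sketch of convertToTreeData in action; the flat rows below are hypothetical stand-ins for the catalogue records returned in Result.datas:

import {convertToTreeData, TreeDataNode} from "@/components/Studio/StudioTree/Function";

// Hypothetical flat catalogue rows (parentId === 0 marks a root entry).
const rows: TreeDataNode[] = [
  {id: 1, key: '1', name: 'develop', parentId: 0, isDir: true},
  {id: 2, key: '2', name: 'job_a', parentId: 1, isDir: false},
];

// Builds the nested structure consumed by antd's <Tree/>:
// 'develop' becomes the root and 'job_a' its child, with title taken from name.
const tree = convertToTreeData(rows, 0);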
import React from "react"; import React, {useEffect, useState} from "react";
import {connect} from "umi"; import {connect} from "umi";
import {StateType} from "@/pages/Demo/FormStepForm/model"; import {StateType} from "@/pages/Demo/FormStepForm/model";
import {DownOutlined, FrownFilled, FrownOutlined, MehOutlined, SmileOutlined} from "@ant-design/icons"; import {DownOutlined, FrownFilled, FrownOutlined, MehOutlined, SmileOutlined} from "@ant-design/icons";
import { Tree, Input } from 'antd'; import {Tree, Input, Dropdown, Menu} from 'antd';
import {getCatalogueTreeData} from "@/pages/FlinkSqlStudio/service";
import {convertToTreeData, DataType, TreeDataNode} from "@/components/Studio/StudioTree/Function";
const { Search } = Input; const {Search} = Input;
type StudioTreeProps = { type StudioTreeProps = {};
}; /*const treeData = [
const treeData = [
{ {
title: 'parent 1', title: 'parent 1',
key: '0-0', key: '0-0',
...@@ -28,10 +28,7 @@ const treeData = [ ...@@ -28,10 +28,7 @@ const treeData = [
}, },
], ],
}, },
]; ];*/
const StudioTree: React.FC<StudioTreeProps> = (props) => { const StudioTree: React.FC<StudioTreeProps> = (props) => {
// state = { // state = {
...@@ -39,39 +36,49 @@ const StudioTree: React.FC<StudioTreeProps> = (props) => { ...@@ -39,39 +36,49 @@ const StudioTree: React.FC<StudioTreeProps> = (props) => {
// searchValue: '', // searchValue: '',
// autoExpandParent: true, // autoExpandParent: true,
// }; // };
const [treeData, setTreeData] = useState<TreeDataNode[]>();
const getTreeData = async () => {
const result = await getCatalogueTreeData();
let data = result.datas;
data = convertToTreeData(data, 0);
setTreeData(data);
};
useEffect(() => {
getTreeData();
}, []);
const onExpand =()=>{ const onExpand = () => {
// setState({ // setState({
// expandedKeys, // expandedKeys,
// autoExpandParent: false, // autoExpandParent: false,
// }) // })
}; };
const onChange =()=>{ const onChange = () => {
}; };
return ( return (
<div> <div>
<Search style={{ marginBottom: 8 }} placeholder="Search" onChange={onChange} /> <Search style={{marginBottom: 8}} placeholder="Search" onChange={onChange}/>
<Tree <Tree
onExpand={onExpand} onExpand={onExpand}
// expandedKeys={expandedKeys} // expandedKeys={expandedKeys}
// autoExpandParent={autoExpandParent} // autoExpandParent={autoExpandParent}
showIcon showIcon
showLine showLine
defaultExpandAll //defaultExpandAll
defaultSelectedKeys={['0-0-0']} switcherIcon={<DownOutlined/>}
switcherIcon={<DownOutlined />} treeData={treeData}
treeData={treeData} // treeData={treeData()}
/> />
</div> </div>
); );
}; };
export default connect(({ studio }: { studio: StateType }) => ({ export default connect(({studio}: { studio: StateType }) => ({}))(StudioTree);
}))(StudioTree);
@@ -6,7 +6,7 @@ import StudioMenu from "./StudioMenu";
 import {Row,Col,Card} from "antd";
 import StudioTree from "./StudioTree";
 import StudioTabs from "./StudioTabs";
-import {StateType} from "@/pages/Studio/model";
+import {StateType} from "@/pages/FlinkSqlStudio/model";
 type StudioProps = {
   sql: StateType['sql'];
@@ -56,6 +56,8 @@ const Studio: React.FC<StudioProps> = ({ sql }) => {
 // export default connect(mapStateToProps)(Studio);
 export default connect(({ Studio }: { Studio: StateType }) => ({
+  current: Studio.current,
+  catalogue: Studio.catalogue,
   sql: Studio.sql,
 }))(Studio);
...
 import {Effect, Reducer} from "umi";
-import {fakeSubmitForm} from "@/pages/Demo/FormStepForm/service";
+import {executeSql} from "./service";
+
+export type CatalogueType = {
+  id?: number;
+  taskId?: number;
+  sql?: string;
+  clusterId?: number;
+}
 export type StateType = {
-  current?: string;
+  current?: number;
+  catalogue: CatalogueType[];
   sql?: string;
 };
@@ -18,16 +26,20 @@ export type ModelType = {
 };
 const Model: ModelType = {
   namespace: 'Studio',
   state: {
-    current: 'info',
+    current: 0,
+    catalogue: [{
+      sql: '',
+    }],
     sql: '',
   },
   effects: {
     *executeSql({ payload }, { call, put }) {
-      yield call(fakeSubmitForm, payload);
+      yield call(executeSql, payload);
       yield put({
         type: 'saveStepFormData',
         payload,
@@ -41,11 +53,15 @@ const Model: ModelType = {
   reducers: {
     saveSql(state, { payload }) {
+      const catalogues = state.catalogue;
+      for (let i = 0; i < catalogues.length; i++) {
+        if (catalogues[i].id == payload.id) {
+          catalogues[i].sql = payload.sql;
+        }
+      }
       return {
         ...state,
-        sql: {
-          ...payload,
-        },
+        catalogue: catalogues,
       };
     },
   },
...
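
For illustration, a TypeScript sketch of how a connected component could drive the reworked saveSql reducer. The helper name is hypothetical, but 'Studio' is the model namespace above and the id/sql payload fields are the ones the reducer matches on; the Dispatch type is assumed to be re-exported by umi alongside Effect and Reducer:

import {Dispatch} from "umi";

// Hypothetical helper called from a component that received `dispatch` via connect().
const saveEditorSql = (dispatch: Dispatch, id: number, sql: string) => {
  // The reducer replaces the sql of the catalogue entry whose id matches payload.id.
  dispatch({
    type: 'Studio/saveSql',
    payload: {id, sql},
  });
};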
 import request from 'umi-request';
-import {StudioParam} from "@/components/FlinkSqlEditor/data";
+import {StudioParam} from "@/components/Studio/StudioEdit/data";
 export async function executeSql(params: StudioParam) {
-  return request('/api/studio/executeSql', {
+  return request<API.Result>('/api/studio/executeSql', {
     method: 'POST',
     data: {
       ...params,
@@ -10,3 +10,11 @@ export async function executeSql(params: StudioParam) {
   });
 }
+export async function getCatalogueTreeData(params?: StudioParam) {
+  return request<API.Result>('/api/catalogue/getCatalogueTreeData', {
+    method: 'POST',
+    data: {
+      ...params,
+    },
+  });
+}
@@ -2,7 +2,6 @@ import { PageContainer } from '@ant-design/pro-layout';
 import React, { useState, useEffect } from 'react';
 import { Spin } from 'antd';
 import styles from './index.less';
-import FlinkSqlEditor from '@/components/FlinkSqlEditor';
 import * as monaco from 'monaco-editor';
 import Card from 'antd/es/card';
 import DropdownSubMenu from './DropdownSubMenu';
...
import {Effect, Reducer} from "umi";
import {fakeSubmitForm} from "@/pages/Demo/FormStepForm/service";

export type StateType = {
  current?: string;
  sql?: string;
};

export type ModelType = {
  namespace: string;
  state: StateType;
  effects: {
    executeSql: Effect;
  };
  reducers: {
    saveSql: Reducer<StateType>;
  };
};

const Model: ModelType = {
  namespace: 'Studio',
  state: {
    current: 'info',
    sql: '',
  },
  effects: {
    *executeSql({ payload }, { call, put }) {
      yield call(fakeSubmitForm, payload);
      yield put({
        type: 'saveStepFormData',
        payload,
      });
      yield put({
        type: 'saveCurrentStep',
        payload: 'result',
      });
    },
  },
  reducers: {
    saveSql(state, { payload }) {
      return {
        ...state,
        sql: payload,
      };
    },
  },
};

export default Model;
@@ -3,9 +3,9 @@
 declare namespace API {
   type Result = {
-    code?: number;
-    datas?: any;
-    msg?: string;
+    code: number;
+    datas: any;
+    msg: string;
   };
   type CurrentUser = {
...
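
With code, datas, and msg now required on API.Result, callers can read these fields without optional checks. A brief TypeScript sketch of the resulting call pattern; the loadTree name is hypothetical and any success-code convention is assumed, not defined in this commit:

import {getCatalogueTreeData} from "@/pages/FlinkSqlStudio/service";
import {convertToTreeData} from "@/components/Studio/StudioTree/Function";

// Hypothetical loader mirroring StudioTree's getTreeData.
const loadTree = async () => {
  const result = await getCatalogueTreeData();
  console.log(result.msg);                 // server-side message, guaranteed present
  return convertToTreeData(result.datas, 0); // datas is typed as any but always set
};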