Commit 9cb959f9 authored by wenmo's avatar wenmo

console

parent 65f8ba05
......@@ -12,6 +12,12 @@ import lombok.Setter;
@Getter
@Setter
public class StudioExecuteDTO {
private String session;
private String statement;
private Integer clusterId;
private Integer clusterId=0;
private Integer checkPoint=0;
private Integer parallelism=1;
private Integer maxRowNum=100;
private boolean fragment=false;
private String savePointPath;
}
......@@ -31,7 +31,9 @@ public class StudioServiceImpl implements StudioService {
Assert.check(cluster);
String host = FlinkCluster.testFlinkJobManagerIP(cluster.getHosts(), cluster.getJobManagerHost());
Assert.checkHost(host);
JobManager jobManager = new JobManager(host);
return jobManager.execute(studioExecuteDTO.getStatement(), new ExecutorSetting(Executor.REMOTE));
JobManager jobManager = new JobManager(host,studioExecuteDTO.getSession(),studioExecuteDTO.getMaxRowNum());
return jobManager.execute(studioExecuteDTO.getStatement(), new ExecutorSetting(
Executor.REMOTE,studioExecuteDTO.getCheckPoint(),studioExecuteDTO.getParallelism(),
studioExecuteDTO.isFragment(),studioExecuteDTO.getSavePointPath()));
}
}
spring:
datasource:
url: jdbc:mysql://10.1.51.25:3306/dlink?useUnicode=true&characterEncoding=UTF-8&autoReconnect=true&useSSL=false&zeroDateTimeBehavior=convertToNull&serverTimezone=Asia/Shanghai&allowPublicKeyRetrieval=true
username: dfly
password: Dareway@2020
url: jdbc:mysql://127.0.0.1:3306/dlink?useUnicode=true&characterEncoding=UTF-8&autoReconnect=true&useSSL=false&zeroDateTimeBehavior=convertToNull&serverTimezone=Asia/Shanghai&allowPublicKeyRetrieval=true
username: dlink
password: dlink
driver-class-name: com.mysql.cj.jdbc.Driver
application:
name: dlink
......
......@@ -12,7 +12,7 @@ import com.dlink.trans.Operations;
import org.apache.flink.api.common.JobID;
import org.apache.flink.table.api.TableResult;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
......@@ -44,6 +44,21 @@ public class JobManager {
}
}
public JobManager(String host,String sessionId, Integer maxRowNum) {
if(host!=null) {
String[] strs = host.split(":");
if(strs.length>=2) {
this.flinkHost = strs[0];
this.port = Integer.parseInt(strs[1]);
}else{
this.flinkHost = strs[0];
this.port = 8081;
}
}
this.sessionId = sessionId;
this.maxRowNum = maxRowNum;
}
public JobManager(String flinkHost, Integer port) {
this.flinkHost = flinkHost;
this.port = port;
......@@ -57,7 +72,7 @@ public class JobManager {
}
public RunResult execute(String statement,ExecutorSetting executorSetting) {
RunResult runResult = new RunResult(sessionId, statement, flinkHost);
RunResult runResult = new RunResult(sessionId, statement, flinkHost,port,executorSetting);
Executor executor = null;
ExecutorEntity executorEntity = SessionPool.get(sessionId);
if (executorEntity != null) {
......@@ -83,7 +98,8 @@ public class JobManager {
IResult result = ResultBuilder.build(operationType, maxRowNum, "", false).getResult(tableResult);
runResult.setResult(result);
runResult.setTime(timeElapsed);
runResult.setFinishDate(LocalDate.now());
runResult.setFinishDate(LocalDateTime.now());
runResult.setSuccess(true);
}
} catch (Exception e) {
e.printStackTrace();
......@@ -92,7 +108,9 @@ public class JobManager {
for (StackTraceElement s : trace) {
resMsg.append(" </br> " + s + " ");
}
runResult.setError(LocalDate.now().toString() + ":" + "运行第" + currentIndex + "行sql时出现异常:" + e.getMessage() + "</br> >>>堆栈信息<<<" + resMsg.toString());
runResult.setFinishDate(LocalDateTime.now());
runResult.setSuccess(false);
runResult.setError(LocalDateTime.now().toString() + ":" + "运行第" + currentIndex + "行sql时出现异常:" + e.getMessage() + "</br> >>>堆栈信息<<<" + resMsg.toString());
return runResult;
}
return runResult;
......@@ -125,8 +143,8 @@ public class JobManager {
JobID jobID = tableResult.getJobClient().get().getJobID();
result.setSuccess(true);
result.setTime(timeElapsed);
result.setFinishDate(LocalDate.now());
InsertResult insertResult = new InsertResult(sqlText,(jobID == null ? "" : jobID.toHexString()),true,timeElapsed,LocalDate.now());
result.setFinishDate(LocalDateTime.now());
InsertResult insertResult = new InsertResult(sqlText,(jobID == null ? "" : jobID.toHexString()),true,timeElapsed,LocalDateTime.now());
result.setResult(insertResult);
} else {
executor.executeSql(sqlText);
......@@ -134,7 +152,7 @@ public class JobManager {
}
} else {
result.setSuccess(false);
result.setMsg(LocalDate.now().toString()+":执行sql语句为空。");
result.setMsg(LocalDateTime.now().toString()+":执行sql语句为空。");
return result;
}
} catch (Exception e) {
......@@ -144,12 +162,12 @@ public class JobManager {
for (StackTraceElement s : trace) {
resMsg.append(" </br> " + s + " ");
}
result.setError(LocalDate.now().toString() + ":" + "运行第" + currentIndex + "行sql时出现异常:" + e.getMessage() + "</br> >>>堆栈信息<<<" + resMsg.toString());
result.setError(LocalDateTime.now().toString() + ":" + "运行第" + currentIndex + "行sql时出现异常:" + e.getMessage() + "</br> >>>堆栈信息<<<" + resMsg.toString());
return result;
}
result.setSuccess(true);
result.setMsg(LocalDate.now().toString() + ":任务提交成功!");
result.setMsg(LocalDateTime.now().toString() + ":任务提交成功!");
return result;
}
}
package com.dlink.result;
import java.time.LocalDate;
import java.time.LocalDateTime;
/**
* InsertResult
......@@ -13,9 +13,9 @@ public class InsertResult implements IResult {
private String jobID;
private boolean success;
private long time;
private LocalDate finishDate;
private LocalDateTime finishDate;
public InsertResult(String statement, String jobID, boolean success, long time, LocalDate finishDate) {
public InsertResult(String statement, String jobID, boolean success, long time, LocalDateTime finishDate) {
this.statement = statement;
this.jobID = jobID;
this.success = success;
......@@ -55,11 +55,11 @@ public class InsertResult implements IResult {
this.time = time;
}
public LocalDate getFinishDate() {
public LocalDateTime getFinishDate() {
return finishDate;
}
public void setFinishDate(LocalDate finishDate) {
public void setFinishDate(LocalDateTime finishDate) {
this.finishDate = finishDate;
}
}
package com.dlink.result;
import java.time.LocalDate;
import com.dlink.executor.ExecutorSetting;
import java.time.LocalDateTime;
/**
* RunResult
......@@ -12,20 +14,32 @@ public class RunResult {
private String sessionId;
private String statement;
private String flinkHost;
private Integer flinkPort;
private boolean success;
private long time;
private LocalDate finishDate;
private LocalDateTime finishDate;
private String msg;
private String error;
private IResult result;
private ExecutorSetting setting;
public RunResult() {
}
public RunResult(String sessionId, String statement, String flinkHost) {
public RunResult(String sessionId, String statement, String flinkHost, Integer flinkPort,ExecutorSetting setting) {
this.sessionId = sessionId;
this.statement = statement;
this.flinkHost = flinkHost;
this.flinkPort = flinkPort;
this.setting = setting;
}
public ExecutorSetting getSetting() {
return setting;
}
public void setSetting(ExecutorSetting setting) {
this.setting = setting;
}
public String getSessionId() {
......@@ -84,11 +98,11 @@ public class RunResult {
this.time = time;
}
public LocalDate getFinishDate() {
public LocalDateTime getFinishDate() {
return finishDate;
}
public void setFinishDate(LocalDate finishDate) {
public void setFinishDate(LocalDateTime finishDate) {
this.finishDate = finishDate;
}
......@@ -99,4 +113,12 @@ public class RunResult {
public void setMsg(String msg) {
this.msg = msg;
}
public Integer getFlinkPort() {
return flinkPort;
}
public void setFlinkPort(Integer flinkPort) {
this.flinkPort = flinkPort;
}
}
......@@ -11,6 +11,7 @@ import java.util.Set;
* @since 2021/5/25 16:01
**/
public class SelectResult implements IResult{
private List<Map<String,Object>> rowData;
private Integer total;
private Integer currentCount;
......@@ -22,4 +23,36 @@ public class SelectResult implements IResult{
this.currentCount = currentCount;
this.columns = columns;
}
public List<Map<String, Object>> getRowData() {
return rowData;
}
public void setRowData(List<Map<String, Object>> rowData) {
this.rowData = rowData;
}
public Integer getTotal() {
return total;
}
public void setTotal(Integer total) {
this.total = total;
}
public Integer getCurrentCount() {
return currentCount;
}
public void setCurrentCount(Integer currentCount) {
this.currentCount = currentCount;
}
public Set<String> getColumns() {
return columns;
}
public void setColumns(Set<String> columns) {
this.columns = columns;
}
}
package com.dlink.result;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.util.List;
/**
......@@ -15,7 +15,7 @@ public class SubmitResult {
private String flinkHost;
private boolean success;
private long time;
private LocalDate finishDate;
private LocalDateTime finishDate;
private String msg;
private String error;
private IResult result;
......@@ -78,11 +78,11 @@ public class SubmitResult {
this.time = time;
}
public LocalDate getFinishDate() {
public LocalDateTime getFinishDate() {
return finishDate;
}
public void setFinishDate(LocalDate finishDate) {
public void setFinishDate(LocalDateTime finishDate) {
this.finishDate = finishDate;
}
......
import { Typography,Divider,Badge } from "antd";
import {StateType} from "@/pages/FlinkSqlStudio/model";
import {connect} from "umi";
const { Title, Paragraph, Text, Link } = Typography;
const StudioMsg = (props:any) => {
const {current} = props;
return (
<Typography>
{current.console.result.map((item)=> {
return (<Paragraph>
<blockquote><Link href={`http://${item.flinkHost}:${item.flinkPort}`} target="_blank">
[{item.sessionId}:{item.flinkHost}:{item.flinkPort}]
</Link> <Divider type="vertical" />{item.finishDate}
<Divider type="vertical" />
{!item.success ? <><Badge status="error"/><Text type="danger">Error</Text></> :
<><Badge status="success"/><Text type="success">Success</Text></>}
<Divider type="vertical" />
<Text keyboard>{item.time}ms</Text></blockquote>
{item.statement && (<pre style={{height:'40px'}}>{item.statement}</pre>)}
{item.msg ? item.msg : ''}
{item.error && (<pre style={{height:'100px'}}>{item.error}</pre>)}
</Paragraph>)
})}
</Typography>
);
};
export default connect(({ Studio }: { Studio: StateType }) => ({
current: Studio.current,
}))(StudioMsg);
import { Typography,Divider,Badge,Select,Tag,Form} from "antd";
import {StateType} from "@/pages/FlinkSqlStudio/model";
import {connect} from "umi";
const { Option } = Select;
const { Title, Paragraph, Text, Link } = Typography;
const StudioTable = (props:any) => {
const {current} = props;
return (
<Typography>
<Form.Item label="当前执行记录" tooltip="选择最近的执行记录,仅包含成功的记录">
<Select
//mode="multiple"
style={{ width: '100%' }}
placeholder="选择最近的执行记录"
defaultValue={[0]}
optionLabelProp="label"
>
{current.console.result.map((item,index)=> {
if(item.success) {
let tag = (<><Tag color="processing">{item.finishDate}</Tag>
<Text underline>[{item.sessionId}:{item.flinkHost}:{item.flinkPort}]</Text>
<Text keyboard>{item.time}ms</Text>
{item.statement.substring(0,20)}</>);
return (<Option value={index} label={tag}>
{tag}
</Option>)
}
})}
</Select>
</Form.Item>
</Typography>
);
};
export default connect(({ Studio }: { Studio: StateType }) => ({
current: Studio.current,
}))(StudioTable);
......@@ -4,6 +4,8 @@ import {CodeOutlined, TableOutlined,RadarChartOutlined,CalendarOutlined,FileSear
import {StateType} from "@/pages/FlinkSqlStudio/model";
import {connect} from "umi";
import styles from "./index.less";
import StudioMsg from "./StudioMsg";
import StudioTable from "./StudioTable";
const { TabPane } = Tabs;
......@@ -11,13 +13,6 @@ const { TabPane } = Tabs;
const StudioConsole = (props:any) => {
const {sql} = props;
const executeSql = () =>{
console.log('获取'+sql);
};
return (
<Tabs defaultActiveKey="1" size="small">
<TabPane
......@@ -29,7 +24,7 @@ const StudioConsole = (props:any) => {
}
key="1"
>
<Empty image={Empty.PRESENTED_IMAGE_SIMPLE} />
<StudioMsg />
</TabPane>
<TabPane
tab={
......@@ -40,7 +35,7 @@ const StudioConsole = (props:any) => {
}
key="2"
>
<Empty image={Empty.PRESENTED_IMAGE_SIMPLE} />
<StudioTable />
</TabPane>
<TabPane
tab={
......
......@@ -9,7 +9,7 @@ import Breadcrumb from "antd/es/breadcrumb/Breadcrumb";
import {StateType} from "@/pages/FlinkSqlStudio/model";
import {connect} from "umi";
import {useEffect, useState} from "react";
import {handleAddOrUpdate} from "@/components/Common/crud";
import {handleAddOrUpdate, postAll} from "@/components/Common/crud";
const {SubMenu} = Menu;
//<Button shape="circle" icon={<CaretRightOutlined />} />
......@@ -35,7 +35,44 @@ const StudioMenu = (props: any) => {
const [pathItem, setPathItem] = useState<[]>();
const executeSql = () => {
console.log('获取' + current.value);
let param ={
session:current.task.session,
statement:current.value,
clusterId:current.task.clusterId,
checkPoint:current.task.checkPoint,
parallelism:current.task.parallelism,
maxRowNum:current.task.maxRowNum,
fragment:current.task.fragemnt,
savePointPath:current.task.savePointPath,
};
const key = current.key;
const result = postAll('api/studio/executeSql',param);
result.then(res=>{
let newTabs = tabs;
for(let i=0;i<newTabs.panes.length;i++){
if(newTabs.panes[i].key==key){
let newResult = newTabs.panes[i].console.result;
newResult.unshift(res.datas);
newTabs.panes[i].console={
result:newResult,
};
break;
}
}
console.log(newTabs);
dispatch&&dispatch({
type: "Studio/saveTabs",
payload: newTabs,
});
})
};
const buildMsg=(res)=>{
const result = res.datas;
let msg=`[${result.sessionId}:${result.flinkHost}:${result.flinkPort}] ${result.finishDate} ${result.success?'Success':'Error'}
[${result.time}ms] ${result.msg?result.msg:''} ${result.error?result.error:''} \r\n
Statement: ${result.statement}`;
return msg;
};
const saveSqlAndSettingToTask = async() => {
......
......@@ -83,6 +83,8 @@ const StudioSetting = (props: any) => {
</Form.Item>
</Col>
</Row>
<Row>
<Col span={12}>
<Form.Item
label="Fragment" className={styles.form_item} name="fragment"
tooltip={{ title: '【增强特性】 开启FlinkSql片段机制,使用“:=”进行定义(以“;”结束),“${}”进行调用', icon: <InfoCircleOutlined /> }}
......@@ -91,6 +93,16 @@ const StudioSetting = (props: any) => {
// defaultChecked={formVals.enabled}
/>
</Form.Item>
</Col>
<Col span={12}>
<Form.Item
label="MaxRowNum" className={styles.form_item} name="maxRowNum"
tooltip='预览数据的最大行数'
>
<InputNumber min={1} max={9999} defaultValue={100} />
</Form.Item>
</Col>
</Row>
<Form.Item
label="SavePointPath" className={styles.form_item} name="savePointPath"
tooltip='从SavePointPath恢复Flink任务'
......
......@@ -107,7 +107,10 @@ const StudioTree: React.FC<StudioTreeProps> = (props) => {
key: node.taskId,
value:(result.datas.statement?result.datas.statement:''),
closable: true,
task:result.datas
task:result.datas,
console:{
result:[],
}
};
newTabs.activeKey = node.taskId;
newTabs.panes.push(newPane);
......
import React, {useEffect, useState} from "react";
import {connect} from "umi";
import styles from './index.less';
import {BarsOutlined,SettingOutlined,AuditOutlined,ScheduleOutlined,AppstoreOutlined,GoldOutlined,DashboardOutlined,
import {BarsOutlined,SettingOutlined,AuditOutlined,ScheduleOutlined,AppstoreOutlined,ApiOutlined,DashboardOutlined,
FireOutlined} from "@ant-design/icons";
import StudioMenu from "./StudioMenu";
......@@ -52,7 +52,6 @@ const Studio: React.FC<StudioProps> = ({sql}) => {
<Tabs defaultActiveKey="1" size="small">
<TabPane tab={<span><SettingOutlined />配置</span>} key="1" >
<StudioSetting form={form} />
</TabPane>
<TabPane tab={<span><ScheduleOutlined />详情</span>} key="2" >
<Empty image={Empty.PRESENTED_IMAGE_SIMPLE} />
......@@ -62,13 +61,13 @@ const Studio: React.FC<StudioProps> = ({sql}) => {
</TabPane>
</Tabs>
<Tabs defaultActiveKey="1" size="small">
<TabPane tab={<span>&nbsp;<GoldOutlined />Catalogue</span>} key="1" >
<TabPane tab={<span>&nbsp;<ApiOutlined />连接器</span>} key="1" >
<Empty image={Empty.PRESENTED_IMAGE_SIMPLE} />
</TabPane>
<TabPane tab={<span>&nbsp;<DashboardOutlined />Overview</span>} key="2" >
<TabPane tab={<span>&nbsp;<DashboardOutlined />总览</span>} key="2" >
<Empty image={Empty.PRESENTED_IMAGE_SIMPLE} />
</TabPane>
<TabPane tab={<span>&nbsp;<FireOutlined />Jobs</span>} key="3" >
<TabPane tab={<span>&nbsp;<FireOutlined />任务</span>} key="3" >
<Empty image={Empty.PRESENTED_IMAGE_SIMPLE} />
</TabPane>
</Tabs>
......
......@@ -35,14 +35,20 @@ export type TaskType = {
updateTime?: Date,
statement?: string,
session:string;
maxRowNum:number;
};
export type ConsoleType = {
result: [];
}
export type TabsItemType = {
title: string;
key: number ,
value:string;
closable: boolean;
task?:TaskType;
console:ConsoleType;
}
export type TabsType = {
......@@ -101,8 +107,12 @@ const Model: ModelType = {
parallelism: 1,
fragment: true,
clusterId: '0',
maxRowNum: 100,
session:'admin',
},
console:{
result:[],
}
},
sql: '',
currentPath: [],
......@@ -120,7 +130,11 @@ const Model: ModelType = {
fragment: true,
clusterId: '0',
session:'admin',
maxRowNum: 100,
},
console:{
result:[],
}
}],
},
session:['admin'],
......@@ -170,6 +184,7 @@ const Model: ModelType = {
newCurrent=payload.panes[i];
}
}
console.log(newCurrent);
return {
...state,
current:{
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment