Commit 0e91d5e4 authored by wenmo's avatar wenmo

扩展Flink集群功能

parent 5a320e99
......@@ -163,11 +163,20 @@ dlink -- 父项目
#### 编译打包
以下环境版本实测编译成功:
| 环境 | 版本 |
| :-------------------: | :------: |
| npm | 7.19.0 |
| node.js | 14.17.0 |
| jdk | 1.8.0_201|
| maven | 3.6.0 |
```shell
mvn clean install -Dmaven.test.skip=true
```
前端编译umi报错时:npm install -g umi
如果前端编译 umi 报错时:npm install -g umi
#### 扩展Connector及UDF
......
package com.dlink.controller;
import com.dlink.api.FlinkAPI;
import com.dlink.common.result.ProTableResult;
import com.dlink.common.result.Result;
import com.dlink.model.Cluster;
......@@ -32,6 +33,7 @@ public class ClusterController {
@PutMapping
public Result saveOrUpdate(@RequestBody Cluster cluster) throws Exception {
checkHealth(cluster);
if(clusterService.saveOrUpdate(cluster)){
return Result.succeed("新增成功");
}else {
......@@ -110,6 +112,7 @@ public class ClusterController {
}else{
cluster.setJobManagerHost(jobManagerHost);
cluster.setStatus(1);
cluster.setVersion(FlinkAPI.build(jobManagerHost).getVersion());
}
}
}
......@@ -26,6 +26,8 @@ public class Cluster extends SuperEntity {
private String jobManagerHost;
private String version;
private Integer status;
private String note;
......
......@@ -43,7 +43,7 @@ public class ClusterServiceImpl extends SuperServiceImpl<ClusterMapper, Cluster>
@Override
public String buildEnvironmentAddress(boolean useRemote, Integer id) {
if(useRemote) {
if(useRemote&&id!=0) {
return buildRemoteEnvironmentAddress(id);
}else{
return buildLocalEnvironmentAddress();
......
......@@ -10,6 +10,7 @@
<result column="type" property="type" />
<result column="hosts" property="hosts" />
<result column="job_manager_host" property="jobManagerHost" />
<result column="version" property="version" />
<result column="status" property="status" />
<result column="note" property="note" />
<result column="enabled" property="enabled" />
......@@ -19,7 +20,7 @@
<!-- 通用查询结果列 -->
<sql id="Base_Column_List">
id, name, alias, type,hosts,job_manager_host, status,note, enabled, create_time, update_time
id, name, alias, type,hosts,job_manager_host,version, status,note, enabled, create_time, update_time
</sql>
......
......@@ -74,4 +74,9 @@ public class FlinkAPI {
get(FlinkRestAPIConstant.JOBS+jobId+FlinkRestAPIConstant.CANCEL);
return true;
}
public String getVersion(){
JsonNode result = get(FlinkRestAPIConstant.CONFIG);
return result.get("flink-version").asText();
}
}
package com.dlink.cluster;
import cn.hutool.http.HttpUtil;
import com.dlink.api.FlinkAPI;
import com.dlink.assertion.Asserts;
import com.dlink.constant.FlinkConstant;
import com.dlink.constant.FlinkHistoryConstant;
import com.dlink.constant.NetConstant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.HashMap;
import java.util.Map;
......@@ -16,72 +20,26 @@ import java.util.Map;
**/
public class FlinkCluster {
private static String flinkJobMangerHost;
public static String getFlinkJobMangerHost() {
return flinkJobMangerHost;
}
public static void setFlinkJobMangerHost(String flinkJobMangerHost) {
FlinkCluster.flinkJobMangerHost = flinkJobMangerHost;
}
public static String getFlinkJobManagerIP(String hosts) {
try {
String res = HttpUtil.get(NetConstant.HTTP + getFlinkJobMangerHost() + NetConstant.SLASH + FlinkHistoryConstant.JOBS, NetConstant.SERVER_TIME_OUT_ACTIVE);
if (!res.isEmpty()) {
return getFlinkJobMangerHost();
}
} catch (Exception e) {
}
String[] servers = hosts.split(",");
for (String server : servers) {
try {
String url = NetConstant.HTTP + server + NetConstant.SLASH + FlinkHistoryConstant.JOBS;
String res = HttpUtil.get(url, NetConstant.SERVER_TIME_OUT_ACTIVE);
if (!res.isEmpty()) {
setFlinkJobMangerHost(server);
return server;
}
} catch (Exception e) {
}
}
return "";
}
public static String getFlinkJobManagerHost(String hosts) {
String[] servers = hosts.split(",");
for (String server : servers) {
try {
String res = HttpUtil.get(NetConstant.HTTP + server + NetConstant.SLASH + FlinkHistoryConstant.JOBS, NetConstant.SERVER_TIME_OUT_ACTIVE);
if (!res.isEmpty()) {
setFlinkJobMangerHost(server);
return server;
}
} catch (Exception e) {
}
}
return "";
}
private static Logger logger = LoggerFactory.getLogger(FlinkCluster.class);
public static String testFlinkJobManagerIP(String hosts,String host) {
try {
String res = HttpUtil.get(NetConstant.HTTP + host + NetConstant.SLASH + FlinkHistoryConstant.JOBS, NetConstant.SERVER_TIME_OUT_ACTIVE);
if (!res.isEmpty()) {
String res = FlinkAPI.build(host).getVersion();
if (Asserts.isNotNullString(res)) {
return host;
}
} catch (Exception e) {
logger.error(e.getMessage(),e);
}
String[] servers = hosts.split(",");
for (String server : servers) {
try {
String url = NetConstant.HTTP + server + NetConstant.SLASH + FlinkHistoryConstant.JOBS;
String res = HttpUtil.get(url, NetConstant.SERVER_TIME_OUT_ACTIVE);
if (!res.isEmpty()) {
setFlinkJobMangerHost(server);
String res = FlinkAPI.build(server).getVersion();
if (Asserts.isNotNullString(res)) {
return server;
}
} catch (Exception e) {
logger.error(e.getMessage(),e);
}
}
return null;
......
......@@ -7,6 +7,10 @@ package com.dlink.constant;
* @since 2021/6/24 14:04
**/
public interface FlinkRestAPIConstant {
/**
* config
*/
String CONFIG = "config";
/**
* jobs
*/
......
......@@ -143,7 +143,7 @@ const StudioCA = (props:any) => {
<TabPane
tab={
<span>
表级血缘
任务表级血缘
</span>
}
key="OneTableCA"
......@@ -164,7 +164,7 @@ const StudioCA = (props:any) => {
<TabPane
tab={
<span>
字段级血缘
任务字段级血缘
</span>
}
key="OneColumnCA"
......
......@@ -46,6 +46,12 @@
width: 100%;
}
/* --- table 宽度 --- start */
/* --- 右键菜单选项 高度 --- start */
.ant-menu-vertical > .ant-menu-item, .ant-menu-vertical-left > .ant-menu-item, .ant-menu-vertical-right > .ant-menu-item, .ant-menu-inline > .ant-menu-item, .ant-menu-vertical > .ant-menu-submenu > .ant-menu-submenu-title, .ant-menu-vertical-left > .ant-menu-submenu > .ant-menu-submenu-title, .ant-menu-vertical-right > .ant-menu-submenu > .ant-menu-submenu-title, .ant-menu-inline > .ant-menu-submenu > .ant-menu-submenu-title {
height: 30px;
line-height: 30px;
}
/* --- 右键菜单选项 高度 --- start */
}
/* --- tabs 垂直样式 --- start */
......
import React, {useEffect, useState} from 'react';
import {Form, Button, Input, Modal, Select,Switch} from 'antd';
import {Form, Button, Input, Modal, Select} from 'antd';
import {ClusterTableListItem} from "@/pages/Cluster/data";
......@@ -39,6 +39,7 @@ const ClusterForm: React.FC<ClusterFormProps> = (props) => {
rules={[{required: true, message: '请输入名称!'}]}>
<Input placeholder="请输入唯一英文标识"/>
</Form.Item>
<Form.Item
name="alias"
label="名称"
......@@ -57,10 +58,12 @@ const ClusterForm: React.FC<ClusterFormProps> = (props) => {
</Form.Item>
<Form.Item
name="hosts"
label="Hosts"
label="JobManager HA 地址"
>
<Input.TextArea placeholder="添加 Flink Hosts...例如:127.0.0.1:8081,127.0.0.1:8091" allowClear
autoSize={{minRows: 3, maxRows: 10}}/>
<Input.TextArea
placeholder="添加 Flink 集群的 JobManager 的 RestApi 地址。当 HA 模式时,地址间用英文逗号分隔,例如:192.168.123.101:8081,192.168.123.102:8081,192.168.123.103:8081"
allowClear
autoSize={{minRows: 3, maxRows: 10}}/>
</Form.Item>
<Form.Item
name="note"
......
import React, {useEffect, useState} from 'react';
import { Form, Button, Input, Modal,Select } from 'antd';
import {Form, Button, Input, Modal, Select} from 'antd';
import Switch from "antd/es/switch";
import TextArea from "antd/es/input/TextArea";
import {ClusterTableListItem} from "@/pages/Cluster/data";
export type UpdateFormProps = {
onCancel: (flag?: boolean, formVals?: Partial<ClusterTableListItem>) => void;
onSubmit: (values: Partial<ClusterTableListItem>) => void;
updateModalVisible: boolean;
values: Partial<ClusterTableListItem>;
onCancel: (flag?: boolean, formVals?: Partial<ClusterTableListItem>) => void;
onSubmit: (values: Partial<ClusterTableListItem>) => void;
updateModalVisible: boolean;
values: Partial<ClusterTableListItem>;
};
const FormItem = Form.Item;
const Option = Select.Option;
const formLayout = {
labelCol: { span: 7 },
wrapperCol: { span: 13 },
labelCol: {span: 7},
wrapperCol: {span: 13},
};
const UpdateForm: React.FC<UpdateFormProps> = (props) => {
const [formVals, setFormVals] = useState<Partial<ClusterTableListItem>>({
id: props.values.id,
name: props.values.name,
alias: props.values.alias,
type: props.values.type,
hosts: props.values.hosts,
note: props.values.note,
enabled: props.values.enabled,
});
const [formVals, setFormVals] = useState<Partial<ClusterTableListItem>>({
id: props.values.id,
name: props.values.name,
alias: props.values.alias,
type: props.values.type,
hosts: props.values.hosts,
note: props.values.note,
enabled: props.values.enabled,
});
const [form] = Form.useForm();
const [form] = Form.useForm();
const {
onSubmit: handleUpdate,
onCancel: handleUpdateModalVisible,
updateModalVisible,
values,
} = props;
const {
onSubmit: handleUpdate,
onCancel: handleUpdateModalVisible,
updateModalVisible,
values,
} = props;
const submitForm = async () => {
const fieldsValue = await form.validateFields();
setFormVals({ ...formVals, ...fieldsValue });
handleUpdate({ ...formVals, ...fieldsValue });
};
const renderContent = (formVals) => {
return (
<>
<FormItem
name="name"
label="名称"
rules={[{ required: true, message: '请输入名称!' }]} >
<Input placeholder="请输入" />
</FormItem>
<FormItem
name="alias"
label="别名"
>
<Input placeholder="请输入" />
</FormItem>
<FormItem
name="type"
label="类型"
>
<Select defaultValue="Yarn" allowClear>
<Option value="Standalone">Standalone</Option>
<Option value="Yarn">Yarn</Option>
<Option value="Others">Others</Option>
</Select>
</FormItem>
<FormItem
name="hosts"
label="Hosts"
>
<TextArea placeholder="添加 Flink Hosts...例如:127.0.0.1:8081,127.0.0.1:8091" allowClear autoSize={{ minRows: 3, maxRows: 10 }}/>
</FormItem>
<FormItem
name="note"
label="注释"
>
<Input placeholder="请输入" />
</FormItem>
<FormItem
name="enabled"
label="是否启用"
rules={[{ required: true, message: '请输入是否启用!' }]} >
<Switch checkedChildren="启用" unCheckedChildren="禁用"
defaultChecked={formVals.enabled}/>
</FormItem>
</>
);
};
const renderFooter = () => {
return (
<>
<Button onClick={() => handleUpdateModalVisible(false, values)}>取消</Button>
<Button type="primary" onClick={() => submitForm()}>
完成
</Button>
</>
);
};
const submitForm = async () => {
const fieldsValue = await form.validateFields();
setFormVals({...formVals, ...fieldsValue});
handleUpdate({...formVals, ...fieldsValue});
};
const renderContent = (formVals) => {
return (
<Modal
width={640}
bodyStyle={{ padding: '32px 40px 48px' }}
destroyOnClose
title="编辑集群"
visible={updateModalVisible}
footer={renderFooter()}
onCancel={() => handleUpdateModalVisible()}
<>
<FormItem
name="name"
label="名称"
rules={[{required: true, message: '请输入名称!'}]}>
<Input placeholder="请输入"/>
</FormItem>
<FormItem
name="alias"
label="别名"
>
<Input placeholder="请输入"/>
</FormItem>
<FormItem
name="type"
label="类型"
>
<Select defaultValue="Yarn" allowClear>
<Option value="Standalone">Standalone</Option>
<Option value="Yarn">Yarn</Option>
<Option value="Others">Others</Option>
</Select>
</FormItem>
<FormItem
name="hosts"
label="JobManager HA 地址"
>
<TextArea
placeholder="添加 Flink 集群的 JobManager 的 RestApi 地址。当 HA 模式时,地址间用英文逗号分隔,例如:192.168.123.101:8081,192.168.123.102:8081,192.168.123.103:8081"
allowClear
autoSize={{minRows: 3, maxRows: 10}}/>
</FormItem>
<FormItem
name="note"
label="注释"
>
<Form
{...formLayout}
form={form}
initialValues={{
id: formVals.id,
name: formVals.name,
alias: formVals.alias,
type: formVals.type,
hosts: formVals.hosts,
note: formVals.note,
enabled: formVals.enabled,
}}
>
{renderContent(formVals)}
</Form>
</Modal>
<TextArea
placeholder="请输入"
allowClear
autoSize={{minRows: 3, maxRows: 10}}/>
</FormItem>
<FormItem
name="enabled"
label="是否启用"
rules={[{required: true, message: '请输入是否启用!'}]}>
<Switch checkedChildren="启用" unCheckedChildren="禁用"
defaultChecked={formVals.enabled}/>
</FormItem>
</>
);
};
const renderFooter = () => {
return (
<>
<Button onClick={() => handleUpdateModalVisible(false, values)}>取消</Button>
<Button type="primary" onClick={() => submitForm()}>
完成
</Button>
</>
);
};
return (
<Modal
width={640}
bodyStyle={{padding: '32px 40px 48px'}}
destroyOnClose
title="编辑集群"
visible={updateModalVisible}
footer={renderFooter()}
onCancel={() => handleUpdateModalVisible()}
>
<Form
{...formLayout}
form={form}
initialValues={{
id: formVals.id,
name: formVals.name,
alias: formVals.alias,
type: formVals.type,
hosts: formVals.hosts,
note: formVals.note,
enabled: formVals.enabled,
}}
>
{renderContent(formVals)}
</Form>
</Modal>
);
};
export default UpdateForm;
......@@ -5,6 +5,7 @@ export type ClusterTableListItem = {
type: string,
hosts: string,
jobManagerHost: string,
version: string,
status: number,
note: string,
enabled: boolean,
......
......@@ -131,7 +131,7 @@ const ClusterTableList: React.FC<{}> = (props: any) => {
},
},
{
title: 'Hosts',
title: 'JobManager HA 地址',
sorter: true,
dataIndex: 'hosts',
valueType: 'textarea',
......@@ -139,16 +139,23 @@ const ClusterTableList: React.FC<{}> = (props: any) => {
hideInSearch: true,
hideInTable: true,
renderFormItem: (item, {defaultRender, ...rest}, form) => {
return <TextArea placeholder="添加 Flink Hosts...例如:127.0.0.1:8081,127.0.0.1:8091" allowClear autoSize={{ minRows: 3, maxRows: 10 }}/>;
return <TextArea placeholder="添加 Flink 集群的 JobManager 的 RestApi 地址。当 HA 模式时,地址间用英文逗号分隔,例如:192.168.123.101:8081,192.168.123.102:8081,192.168.123.103:8081" allowClear autoSize={{ minRows: 3, maxRows: 10 }}/>;
},
},
{
title: 'JMHost',
title: '当前 JobManager 地址',
sorter: true,
dataIndex: 'jobManagerHost',
hideInForm: true,
hideInSearch: true,
hideInTable: true,
hideInTable: false,
},{
title: '版本',
sorter: true,
dataIndex: 'version',
hideInForm: true,
hideInSearch: true,
hideInTable: false,
},
{
title: '状态',
......
......@@ -286,6 +286,15 @@ export default (): React.ReactNode => {
<li>
<Link>解决数据源注册可能失败的bug</Link>
</li>
<li>
<Link>扩展了对Flink 1.11的支持,并更新了其他的最新版本</Link>
</li>
<li>
<Link>Flink集群添加了版本号的自动获取及展示</Link>
</li>
<li>
<Link>修复了本地环境+远程执行导致集群查询未果的bug</Link>
</li>
</ul>
</Paragraph>
</Timeline.Item>
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment