Commit 181f2961 authored by godkaikai's avatar godkaikai

0.2.0 优化

parent bde28e87
......@@ -93,7 +93,7 @@ DataLink 开源项目及社区正在建设,希望本项目可以帮助你更
### 最新版本
dlink-0.1.0
dlink-0.2.0
### 从安装包开始
......@@ -108,10 +108,10 @@ lib/ -- 外部依赖及Connector
|- flink-json-1.12.4.jar
|- mysql-connector-java-8.0.21.jar
|- ojdbc6-11.2.0.3.jar
|- slf4j-api-1.7.30.jar -- 必需
sql/ --Mysql初始化脚本
|- auto.sh --启动停止脚本
|- dlink-admin.jar --程序包
sql/
|- dlink.sql --Mysql初始化脚本
auto.sh --启动停止脚本
dlink-admin.jar --程序包
```
解压后结构如上所示,修改配置文件内容。
......@@ -140,6 +140,9 @@ dlink -- 父项目
| |-dlink-connector-jdbc -- Jdbc 扩展
|-dlink-core -- 执行中心
|-dlink-doc -- 文档
| |-bin -- 启动脚本
| |-config -- 配置文件
| |-sql -- sql脚本
|-dlink-web -- React 前端
```
......@@ -153,9 +156,14 @@ npm run build
#### 后台编译
打包所有模块
```shell
maven clean install -Dmaven.test.skip=true
```
打包Client模块,dlink-client目录下
```shell
maven assembly:assembly
```
#### 扩展Connector
......
......@@ -92,19 +92,6 @@
<groupId>org.codehaus.mojo</groupId>
<artifactId>versions-maven-plugin</artifactId>
</plugin>
<!--<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<configuration>
<archive>
<manifest>
<mainClass>com.dlink.Dlink</mainClass>
</manifest>
</archive>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
</plugin>-->
</plugins>
<finalName>${project.artifactId}</finalName>
</build>
......
......@@ -20,4 +20,5 @@ public class StudioExecuteDTO {
private Integer maxRowNum=100;
private boolean fragment=false;
private String savePointPath;
private String jobName;
}
......@@ -41,11 +41,11 @@ public class Task extends SuperEntity{
private String statement;
public ExecutorSetting getLocalExecutorSetting(){
return new ExecutorSetting(Executor.LOCAL,checkPoint,parallelism,fragment,savePointPath);
return new ExecutorSetting(Executor.LOCAL,checkPoint,parallelism,fragment,savePointPath,alias);
}
public ExecutorSetting getRemoteExecutorSetting(){
return new ExecutorSetting(Executor.REMOTE,checkPoint,parallelism,fragment,savePointPath);
return new ExecutorSetting(Executor.REMOTE,checkPoint,parallelism,fragment,savePointPath,alias);
}
......
......@@ -50,7 +50,7 @@ public class StudioServiceImpl implements StudioService {
JobManager jobManager = new JobManager(host,studioExecuteDTO.getSession(),studioExecuteDTO.getMaxRowNum());
return jobManager.execute(studioExecuteDTO.getStatement(), new ExecutorSetting(
ExecuteType,studioExecuteDTO.getCheckPoint(),studioExecuteDTO.getParallelism(),
studioExecuteDTO.isFragment(),studioExecuteDTO.getSavePointPath()));
studioExecuteDTO.isFragment(),studioExecuteDTO.getSavePointPath(),studioExecuteDTO.getJobName()));
}
@Override
......
......@@ -38,17 +38,23 @@ public class TaskServiceImpl extends SuperServiceImpl<TaskMapper, Task> implemen
Task task = this.getById(id);
Assert.check(task);
Cluster cluster = clusterService.getById(task.getClusterId());
Assert.check(cluster);
Statement statement = statementService.getById(id);
Assert.check(statement);
if(cluster!=null) {
String host = FlinkCluster.testFlinkJobManagerIP(cluster.getHosts(), cluster.getJobManagerHost());
Assert.checkHost(host);
if(!host.equals(cluster.getJobManagerHost())){
if (!host.equals(cluster.getJobManagerHost())) {
cluster.setJobManagerHost(host);
clusterService.updateById(cluster);
}
JobManager jobManager = new JobManager(host);
return jobManager.submit(statement.getStatement(), task.getRemoteExecutorSetting());
}else if(task.getClusterId()==0){
JobManager jobManager = new JobManager();
return jobManager.submit(statement.getStatement(), task.getLocalExecutorSetting());
}else{
throw new BusException("该任务的集群不存在");
}
}
@Override
......
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<parent>
<artifactId>dlink-client</artifactId>
<groupId>com.dlink</groupId>
<version>0.2.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>dlink-client-1.12</artifactId>
<build>
<plugins>
<plugin>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.8.0</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
</plugin>
<plugin>
<artifactId>maven-shade-plugin</artifactId>
<version>3.2.1</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<transformers>
<transformer>
<resource>reference.conf</resource>
</transformer>
</transformers>
</configuration>
</execution>
</executions>
<configuration>
<filters>
<filter>
<artifact>*:*</artifact>
<excludes>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
</filters>
</configuration>
</plugin>
</plugins>
</build>
<properties>
<java.version>1.8</java.version>
<scala.binary.version>2.11</scala.binary.version>
<flink.version>1.12.4</flink.version>
<maven.compiler.source>1.8</maven.compiler.source>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<maven.compiler.target>1.8</maven.compiler.target>
<junit.version>4.12</junit.version>
</properties>
</project>
......@@ -42,8 +42,12 @@
</exclusions>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</dependency>
</dependencies>
<build>
<!--<build>
<plugins>
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
......@@ -55,6 +59,62 @@
</plugin>
</plugins>
<finalName>${project.artifactId}</finalName>
</build>-->
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.8.0</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
<!--打jar包-->
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>3.2.1</version>
<configuration>
<filters>
<filter>
<artifact>*:*</artifact>
<excludes>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
</filters>
</configuration>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<transformers>
<transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
<resource>reference.conf</resource>
</transformer>
</transformers>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
\ No newline at end of file
......@@ -59,12 +59,12 @@
<dependency>
<groupId>com.dlink</groupId>
<artifactId>dlink-client-1.12</artifactId>
<!--<scope>provided</scope>-->
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.dlink</groupId>
<artifactId>dlink-connector-jdbc</artifactId>
<!--<scope>provided</scope>-->
<scope>provided</scope>
</dependency>
</dependencies>
</project>
\ No newline at end of file
......@@ -12,6 +12,7 @@ public class ExecutorSetting {
private Integer parallelism;
private boolean useSqlFragment;
private String savePointPath;
private String jobName;
public ExecutorSetting(String type) {
this.type = type;
......@@ -28,6 +29,15 @@ public class ExecutorSetting {
this.useSqlFragment = useSqlFragment;
}
public ExecutorSetting(String type, Integer checkpoint, Integer parallelism, boolean useSqlFragment, String savePointPath,String jobName) {
this.type = type;
this.checkpoint = checkpoint;
this.parallelism = parallelism;
this.useSqlFragment = useSqlFragment;
this.savePointPath = savePointPath;
this.jobName = jobName;
}
public ExecutorSetting(String type, Integer checkpoint, Integer parallelism, boolean useSqlFragment, String savePointPath) {
this.type = type;
this.checkpoint = checkpoint;
......@@ -79,4 +89,12 @@ public class ExecutorSetting {
public void setSavePointPath(String savePointPath) {
this.savePointPath = savePointPath;
}
public String getJobName() {
return jobName;
}
public void setJobName(String jobName) {
this.jobName = jobName;
}
}
......@@ -33,6 +33,9 @@ public class LocalStreamExecutor extends Executor {
environment.setParallelism(executorSetting.getParallelism());
}
stEnvironment = CustomTableEnvironmentImpl.create(environment);
if(executorSetting.getJobName()!=null&&!"".equals(executorSetting.getJobName())){
stEnvironment.getConfig().getConfiguration().setString("pipeline.name", executorSetting.getJobName());
}
if(executorSetting.isUseSqlFragment()){
stEnvironment.useSqlFragment();
}else{
......
......@@ -39,6 +39,9 @@ public class RemoteStreamExecutor extends Executor {
if(stEnvironment == null){
stEnvironment = CustomTableEnvironmentImpl.create(environment);
}
if(executorSetting.getJobName()!=null&&!"".equals(executorSetting.getJobName())){
stEnvironment.getConfig().getConfiguration().setString("pipeline.name", executorSetting.getJobName());
}
if(executorSetting.isUseSqlFragment()){
stEnvironment.useSqlFragment();
}else{
......
......@@ -31,6 +31,9 @@ public class JobManager {
private String sessionId;
private Integer maxRowNum = 100;
public JobManager() {
}
public JobManager(String host) {
if(host!=null) {
String[] strs = host.split(":");
......@@ -72,7 +75,7 @@ public class JobManager {
}
public RunResult execute(String statement,ExecutorSetting executorSetting) {
RunResult runResult = new RunResult(sessionId, statement, flinkHost,port,executorSetting);
RunResult runResult = new RunResult(sessionId, statement, flinkHost,port,executorSetting,executorSetting.getJobName());
Executor executor = null;
ExecutorEntity executorEntity = SessionPool.get(sessionId);
if (executorEntity != null) {
......@@ -125,21 +128,24 @@ public class JobManager {
return runResult;
}
public SubmitResult submit(String statement, ExecutorSetting executerSetting) {
public SubmitResult submit(String statement, ExecutorSetting executorSetting) {
if(statement==null||"".equals(statement)){
return SubmitResult.error("FlinkSql语句不存在");
}
String [] statements = statement.split(FlinkSQLConstant.SEPARATOR);
return submit(Arrays.asList(statements),executerSetting);
return submit(Arrays.asList(statements),executorSetting);
}
public SubmitResult submit(List<String> sqlList, ExecutorSetting executerSetting) {
SubmitResult result = new SubmitResult(sessionId,sqlList,flinkHost);
Map<String, String> map = new HashMap<>();
public SubmitResult submit(List<String> sqlList, ExecutorSetting executorSetting) {
SubmitResult result = new SubmitResult(sessionId,sqlList,flinkHost,executorSetting.getJobName());
int currentIndex = 0;
try {
if (sqlList != null && sqlList.size() > 0) {
Executor executor = Executor.build(new EnvironmentSetting(flinkHost, port), executerSetting);
EnvironmentSetting environmentSetting = null;
if(executorSetting.isRemote()) {
environmentSetting = new EnvironmentSetting(flinkHost, port);
}
Executor executor = Executor.build(environmentSetting, executorSetting);
for (String sqlText : sqlList) {
currentIndex++;
String operationType = Operations.getOperationType(sqlText);
......@@ -154,6 +160,7 @@ public class JobManager {
result.setFinishDate(LocalDateTime.now());
InsertResult insertResult = new InsertResult(sqlText,(jobID == null ? "" : jobID.toHexString()),true,timeElapsed,LocalDateTime.now());
result.setResult(insertResult);
result.setJobId((jobID == null ? "" : jobID.toHexString()));
} else {
executor.executeSql(sqlText);
}
......@@ -173,7 +180,7 @@ public class JobManager {
result.setSuccess(false);
// result.setError(LocalDateTime.now().toString() + ":" + "运行第" + currentIndex + "行sql时出现异常:" + e.getMessage());
// result.setError(LocalDateTime.now().toString() + ":" + "运行第" + currentIndex + "行sql时出现异常:" + e.getMessage() + "\n >>>堆栈信息<<<" + resMsg.toString());
result.setError(LocalDateTime.now().toString() + ":" + "运行第" + currentIndex + "行sql时出现异常:" + e.getMessage() + "\n >>>异常原因<<< \n" + e.getCause().toString());
result.setError(LocalDateTime.now().toString() + ":" + "运行第" + currentIndex + "行sql时出现异常:" + e.getMessage() + "\n >>>异常原因<<< \n" + e.toString());
return result;
}
......
......@@ -13,6 +13,7 @@ import java.time.LocalDateTime;
public class RunResult {
private String sessionId;
private String jobId;
private String jobName;
private String statement;
private String flinkHost;
private Integer flinkPort;
......@@ -27,12 +28,21 @@ public class RunResult {
public RunResult() {
}
public RunResult(String sessionId, String statement, String flinkHost, Integer flinkPort,ExecutorSetting setting) {
public RunResult(String sessionId, String statement, String flinkHost, Integer flinkPort,ExecutorSetting setting,String jobName) {
this.sessionId = sessionId;
this.statement = statement;
this.flinkHost = flinkHost;
this.flinkPort = flinkPort;
this.setting = setting;
this.jobName = jobName;
}
public String getJobName() {
return jobName;
}
public void setJobName(String jobName) {
this.jobName = jobName;
}
public String getJobId() {
......
......@@ -13,6 +13,8 @@ public class SubmitResult {
private String sessionId;
private List<String> statements;
private String flinkHost;
private String jobId;
private String jobName;
private boolean success;
private long time;
private LocalDateTime finishDate;
......@@ -32,10 +34,11 @@ public class SubmitResult {
this.error = error;
}
public SubmitResult(String sessionId, List<String> statements, String flinkHost) {
public SubmitResult(String sessionId, List<String> statements, String flinkHost,String jobName) {
this.sessionId = sessionId;
this.statements = statements;
this.flinkHost = flinkHost;
this.jobName = jobName;
}
public String getSessionId() {
......@@ -109,4 +112,20 @@ public class SubmitResult {
public void setResult(IResult result) {
this.result = result;
}
public String getJobId() {
return jobId;
}
public void setJobId(String jobId) {
this.jobId = jobId;
}
public String getJobName() {
return jobName;
}
public void setJobName(String jobName) {
this.jobName = jobName;
}
}
......@@ -19,7 +19,7 @@ start() {
pid=`ps -ef | grep $JAR_NAME | grep -v grep | awk '{print $2}'`
# -z 表示如果$pid为空时执行
if [ -z $pid ]; then
nohup java $SETTING -jar -Xms512M -Xmx2048M -XX:PermSize=512M -XX:MaxPermSize=1024M $JAR_NAME > /dev/null 2>&1 &
nohup java $SETTING -jar -Xms512M -Xmx2048M -XX:PermSize=512M -XX:MaxPermSize=1024M $JAR_NAME > dlink.log 2>&1 &
pid=`ps -ef | grep $JAR_NAME | grep -v grep | awk '{print $2}'`
echo ""
echo "Service ${JAR_NAME} is starting!pid=${pid}"
......
......@@ -19,6 +19,7 @@ const StudioHistory = (props:any) => {
{!item.success ? <><Badge status="error"/><Text type="danger">Error</Text></> :
<><Badge status="success"/><Text type="success">Success</Text></>}
<Divider type="vertical" />
{item.jobName&&<Text code>{item.jobName}</Text>}
{item.jobId&&<Text code>{item.jobId}</Text>}
<Text keyboard>{item.time}ms</Text></blockquote>
{item.statement && (<pre style={{height:'40px'}}>{item.statement}</pre>)}
......
......@@ -20,6 +20,7 @@ const StudioMsg = (props:any) => {
{!item.success ? <><Badge status="error"/><Text type="danger">Error</Text></> :
<><Badge status="success"/><Text type="success">Success</Text></>}
<Divider type="vertical"/>
{item.jobName&&<Text code>{item.jobName}</Text>}
{item.jobId&&<Text code>{item.jobId}</Text>}
<Text keyboard>{item.time}ms</Text></blockquote>
{item.statement && (<pre style={{height: '100px'}}>{item.statement}</pre>)}
......
......@@ -112,6 +112,8 @@ const StudioTable = (props:any) => {
if(item.success) {
let tag = (<><Tag color="processing">{item.finishDate}</Tag>
<Text underline>[{item.sessionId}:{item.flinkHost}:{item.flinkPort}]</Text>
{item.jobName&&<Text code>{item.jobName}</Text>}
{item.jobId&&<Text code>{item.jobId}</Text>}
<Text keyboard>{item.time}ms</Text>
{item.statement.substring(0,20)}</>);
return (<Option value={index} label={tag}>
......
......@@ -260,12 +260,14 @@ const StudioTree: React.FC<StudioTreeProps> = (props) => {
};
const onSelect = (selectedKeys:[], e:any) => {
if(e.node.isLeaf) {
dispatch({
type: "Studio/saveCurrentPath",
payload: e.node.path,
});
setRightClickNodeTreeItem(null);
toOpen(e.node);
}
setRightClickNodeTreeItem(null);
};
return (
......
......@@ -29,7 +29,7 @@ export default {
'pages.welcome.link': '欢迎加入',
'pages.welcome.star': '欢迎 Star ',
'pages.welcome.advancedLayout': 'Github',
'pages.welcome.alertMessage': '实时计算平台 Dlink & Apache Flink 即将发布,目前为体验版,版本号为 0.1.0。',
'pages.welcome.alertMessage': '实时计算平台 Dlink & Apache Flink 即将发布,目前为体验版,版本号为 0.2.0。',
'pages.admin.subPage.title': ' 这个页面只有 admin 权限才能查看',
'pages.admin.subPage.alertMessage': 'umi ui 现已发布,欢迎使用 npm run ui 启动体验。',
'pages.searchTable.createForm.newRule': '新建规则',
......
......@@ -111,6 +111,7 @@ const Model: ModelType = {
clusterId: '0',
maxRowNum: 100,
session:'admin',
alias:'草稿',
},
console:{
result:[],
......@@ -134,6 +135,7 @@ const Model: ModelType = {
clusterId: '0',
session:'admin',
maxRowNum: 100,
alias:'草稿',
},
console:{
result:[],
......
......@@ -20,7 +20,7 @@ export default (): React.ReactNode => {
<Alert
message={intl.formatMessage({
id: 'pages.welcome.alertMessage',
defaultMessage: '更快更强的重型组件,已经发布。',
defaultMessage: '实时计算平台 Dlink & Apache Flink 即将发布,目前为体验版,版本号为 0.2.0。',
})}
type="success"
showIcon
......@@ -62,7 +62,7 @@ export default (): React.ReactNode => {
</Typography.Text>
</Paragraph>
<p> </p>
<Timeline pending={<><Text code>0.2.0</Text>
<Timeline pending={<><Text code>0.3.0</Text>
<Text type="secondary">敬请期待</Text>
<p> </p>
<Paragraph>
......@@ -112,12 +112,12 @@ export default (): React.ReactNode => {
</ul>
</Paragraph>
</Timeline.Item>
<Timeline.Item><Text code>0.1.1</Text> <Text type="secondary">2021-06-08</Text>
<Timeline.Item><Text code>0.2.0</Text> <Text type="secondary">2021-06-08</Text>
<p> </p>
<Paragraph>
<ul>
<li>
<Link href="">FlinkSql Studio 代码底层架构优化</Link>
<Link href="">FlinkSql Studio 代码底层架构进行大优化</Link>
</li>
<li>
<Link href="">支持以 SPI 的方式扩展任意 Connector,同 Flink 官网</Link>
......@@ -128,6 +128,15 @@ export default (): React.ReactNode => {
<li>
<Link href="">提供了 dlink-client-1.12,支持 Flink 1.12.0+ 多集群的远程使用与本地隔离使用,1.10、1.11 和 1.13 集群可能存在问题</Link>
</li>
<li>
<Link href="">优化了 FlinkSQL 执行与提交到远程集群的任务名,默认为作业的中文别名</Link>
</li>
<li>
<Link href="">优化了目录的操作,点击节点即可打开作业,无须右键打开</Link>
</li>
<li>
<Link href="">优化了执行结果信息,添加了任务名的展示</Link>
</li>
<li>
<Link href="">对 Studio 界面进行了一定的提示优化</Link>
</li>
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment