Commit fdba1fd3 authored by wenmo's avatar wenmo

0.2.2优化

parent 0f859617
......@@ -205,14 +205,14 @@ ${sf} ${tb}
select * from student
```
5. MaxRowNum 为预览的最大集合长度,默认100,最大9999,当前版本防止您作死
6. SavePointPath 当前版本暂不可用。
5. MaxRowNum 为预览的最大集合长度,默认100,最大9999。
6. SavePointPath 当前版本属于非Jar提交,暂不可用。
7. Flink集群与共享会话构成了唯一的 Catalogue ,即您可以通过自定义一个会话 key,然后将当前会话 key 告诉您的战友,那他可以用该 key 访问您在集群上的 Catalogue信息与缓存。当然会话数量有限制,最大256*0.75,未来版本会开放设置。
8. 连接器为 Catalogue 里的表信息,清除会销毁当前会话。
9. Local 模式请使用少量测试数据,真实数据请使用远程集群。
10. 执行 SQL 时,如果您选中了部分SQL,则会执行选中的内容,否则执行全部内容。
11. 小火箭的提交功能是异步提交当前任务保存的FlinkSQL及配置到集群。无法提交草稿。
12. 执行信息或者历史中那个很长很长的就是集群上的 JobId。
12. 执行信息或者历史中那个很长很长的就是集群上的 JobId,只有同步执行才会记录执行信息和历史。
13. 草稿是无法被异步远程提交的,只能同步执行。
14. 灰色按钮代表近期将实现。
......
......@@ -3,7 +3,7 @@
<parent>
<artifactId>dlink-client</artifactId>
<groupId>com.dlink</groupId>
<version>0.2.1</version>
<version>0.3.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>dlink-client-1.12</artifactId>
......
......@@ -13,8 +13,6 @@
<properties>
<java.version>1.8</java.version>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<flink.version>1.12.4</flink.version>
<scala.binary.version>2.11</scala.binary.version>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
<junit.version>4.12</junit.version>
......@@ -24,32 +22,6 @@
<groupId>cn.hutool</groupId>
<artifactId>hutool-all</artifactId>
</dependency>
<!--<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</exclusion>
</exclusions>
<version>${flink.version}</version>
</dependency>-->
<!--<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>-->
<!--<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>force-shading</artifactId>
<version>${flink.version}</version>
</dependency>-->
<!--<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-jdbc_2.11</artifactId>
<version>${flink.version}</version>
</dependency>-->
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
......
......@@ -21,6 +21,12 @@ public abstract class Executor {
public static final String LOCAL = "LOCAL";
public static final String REMOTE = "REMOTE";
protected StreamExecutionEnvironment environment;
protected CustomTableEnvironmentImpl stEnvironment;
protected EnvironmentSetting environmentSetting;
protected ExecutorSetting executorSetting;
public static Executor build(){
return new LocalStreamExecutor(new ExecutorSetting(LOCAL));
}
......@@ -35,29 +41,55 @@ public abstract class Executor {
}
}
public abstract StreamExecutionEnvironment getEnvironment();
public StreamExecutionEnvironment getEnvironment(){
return environment;
}
public abstract CustomTableEnvironmentImpl getCustomTableEnvironmentImpl();
public CustomTableEnvironmentImpl getCustomTableEnvironmentImpl(){
return stEnvironment;
}
public abstract ExecutorSetting getExecutorSetting();
public ExecutorSetting getExecutorSetting(){
return executorSetting;
}
public abstract EnvironmentSetting getEnvironmentSetting();
public EnvironmentSetting getEnvironmentSetting(){
return environmentSetting;
}
public abstract JobExecutionResult execute(String statement) throws Exception;
public JobExecutionResult execute(String statement) throws Exception{
return stEnvironment.execute(statement);
}
public abstract TableResult executeSql(String statement);
public TableResult executeSql(String statement){
return stEnvironment.executeSql(statement);
}
public abstract Table sqlQuery(String statement);
public Table sqlQuery(String statement){
return stEnvironment.sqlQuery(statement);
}
public abstract String explainSql(String statement, ExplainDetail... extraDetails);
public String explainSql(String statement, ExplainDetail... extraDetails){
return stEnvironment.explainSql(statement,extraDetails);
}
public abstract SqlExplainResult explainSqlRecord(String statement, ExplainDetail... extraDetails);
public SqlExplainResult explainSqlRecord(String statement, ExplainDetail... extraDetails){
return stEnvironment.explainSqlRecord(statement,extraDetails);
}
public abstract String getStreamGraphString(String statement);
public String getStreamGraphString(String statement){
return stEnvironment.getStreamGraphString(statement);
}
public abstract ObjectNode getStreamGraph(String statement);
public ObjectNode getStreamGraph(String statement){
return stEnvironment.getStreamGraph(statement);
}
public abstract void registerFunction(String name, ScalarFunction function);
public void registerFunction(String name, ScalarFunction function){
stEnvironment.registerFunction(name,function);
}
public abstract void createTemporarySystemFunction(String name, Class<? extends UserDefinedFunction> var2);
public void createTemporarySystemFunction(String name, Class<? extends UserDefinedFunction> var2){
stEnvironment.createTemporarySystemFunction(name,var2);
}
}
package com.dlink.executor;
import com.dlink.executor.custom.CustomTableEnvironmentImpl;
import com.dlink.result.SqlExplainResult;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.ExplainDetail;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.functions.ScalarFunction;
import org.apache.flink.table.functions.UserDefinedFunction;
/**
* LocalStreamExecutor
......@@ -19,10 +11,6 @@ import org.apache.flink.table.functions.UserDefinedFunction;
**/
public class LocalStreamExecutor extends Executor {
private StreamExecutionEnvironment environment;
private CustomTableEnvironmentImpl stEnvironment;
private ExecutorSetting executorSetting;
public LocalStreamExecutor(ExecutorSetting executorSetting) {
this.executorSetting = executorSetting;
this.environment = StreamExecutionEnvironment.createLocalEnvironment();
......@@ -42,70 +30,4 @@ public class LocalStreamExecutor extends Executor {
stEnvironment.unUseSqlFragment();
}
}
@Override
public StreamExecutionEnvironment getEnvironment() {
return this.environment;
}
@Override
public CustomTableEnvironmentImpl getCustomTableEnvironmentImpl() {
return this.stEnvironment;
}
@Override
public ExecutorSetting getExecutorSetting() {
return this.executorSetting;
}
@Override
public EnvironmentSetting getEnvironmentSetting() {
return null;
}
@Override
public JobExecutionResult execute(String statement) throws Exception {
return stEnvironment.execute(statement);
}
@Override
public TableResult executeSql(String statement) {
return stEnvironment.executeSql(statement);
}
@Override
public Table sqlQuery(String statement) {
return stEnvironment.sqlQuery(statement);
}
@Override
public String explainSql(String statement, ExplainDetail... extraDetails) {
return stEnvironment.explainSql(statement,extraDetails);
}
@Override
public SqlExplainResult explainSqlRecord(String statement, ExplainDetail... extraDetails) {
return stEnvironment.explainSqlRecord(statement,extraDetails);
}
@Override
public String getStreamGraphString(String statement) {
return stEnvironment.getStreamGraphString(statement);
}
@Override
public ObjectNode getStreamGraph(String statement) {
return stEnvironment.getStreamGraph(statement);
}
@Override
public void registerFunction(String name, ScalarFunction function) {
stEnvironment.registerFunction(name,function);
}
@Override
public void createTemporarySystemFunction(String name, Class<? extends UserDefinedFunction> var2) {
stEnvironment.createTemporarySystemFunction(name,var2);
}
}
package com.dlink.executor;
import com.dlink.executor.custom.CustomTableEnvironmentImpl;
import com.dlink.result.SqlExplainResult;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.ExplainDetail;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.functions.ScalarFunction;
import org.apache.flink.table.functions.UserDefinedFunction;
/**
* RemoteStreamExecutor
......@@ -19,12 +11,6 @@ import org.apache.flink.table.functions.UserDefinedFunction;
**/
public class RemoteStreamExecutor extends Executor {
private StreamExecutionEnvironment environment;
private CustomTableEnvironmentImpl stEnvironment;
private EnvironmentSetting environmentSetting;
private ExecutorSetting executorSetting;
public RemoteStreamExecutor(EnvironmentSetting environmentSetting,ExecutorSetting executorSetting) {
this.environmentSetting = environmentSetting;
this.executorSetting = executorSetting;
......@@ -50,68 +36,4 @@ public class RemoteStreamExecutor extends Executor {
}
}
@Override
public StreamExecutionEnvironment getEnvironment() {
return this.environment;
}
@Override
public CustomTableEnvironmentImpl getCustomTableEnvironmentImpl() {
return this.stEnvironment;
}
@Override
public ExecutorSetting getExecutorSetting() {
return this.executorSetting;
}
@Override
public EnvironmentSetting getEnvironmentSetting() {
return this.environmentSetting;
}
@Override
public JobExecutionResult execute(String statement) throws Exception {
return stEnvironment.execute(statement);
}
@Override
public TableResult executeSql(String statement){
return stEnvironment.executeSql(statement);
}
@Override
public Table sqlQuery(String statement){
return stEnvironment.sqlQuery(statement);
}
@Override
public String explainSql(String statement, ExplainDetail... extraDetails) {
return stEnvironment.explainSql(statement,extraDetails);
}
@Override
public SqlExplainResult explainSqlRecord(String statement, ExplainDetail... extraDetails) {
return stEnvironment.explainSqlRecord(statement,extraDetails);
}
@Override
public String getStreamGraphString(String statement) {
return stEnvironment.getStreamGraphString(statement);
}
@Override
public ObjectNode getStreamGraph(String statement) {
return stEnvironment.getStreamGraph(statement);
}
@Override
public void registerFunction(String name, ScalarFunction function) {
stEnvironment.registerFunction(name,function);
}
@Override
public void createTemporarySystemFunction(String name, Class<? extends UserDefinedFunction> var2) {
stEnvironment.createTemporarySystemFunction(name,var2);
}
}
package com.dlink.interceptor;
import com.dlink.executor.custom.CustomTableEnvironmentImpl;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
/**
 * FlinkInterceptor — hook point for pre-processing statements against a
 * {@link com.dlink.executor.custom.CustomTableEnvironmentImpl} before execution.
 *
 * <p>Currently a stub: {@code build} performs no work yet.
 *
 * @author wenmo
 * @since 2021/6/11 22:17
 */
public class FlinkInterceptor {

    /**
     * Builds interceptor state for the given table environment.
     * No-op in the current version.
     *
     * @param stEnvironment the table environment to intercept (currently unused)
     */
    public static void build(CustomTableEnvironmentImpl stEnvironment) {
        // intentionally empty — interception logic not implemented yet
    }
}
......@@ -38,10 +38,11 @@ public class SelectBuilder extends AbstractBuilder implements ResultBuilder {
return new String[x$0];
});
}
long numRows;
long numRows = 0L;
List<Map<String,Object>> rows = new ArrayList<>();
Iterator<Row> it = tableResult.collect();
for (numRows = 0L; it.hasNext() ; ++numRows) {
while(it.hasNext()){
// for (numRows = 0L; it.hasNext() ; ++numRows) {
if (numRows < maxRowNum) {
String[] cols = rowToString((Row) it.next());
Map<String,Object> row = new HashMap<>();
......@@ -56,8 +57,10 @@ public class SelectBuilder extends AbstractBuilder implements ResultBuilder {
}
rows.add(row);
}else {
it.next();
break;
// it.next();
}
numRows++;
totalCount++;
}
return new SelectResult(rows,totalCount,rows.size(),column);
......
package com.dlink.trans;
/**
 * CreateOperation — marker/extension interface for {@link Operation}s that
 * correspond to a CREATE-style DDL statement and can materialize the object
 * they describe.
 *
 * @author wenmo
 * @since 2021/6/13 19:34
 */
public interface CreateOperation extends Operation{
    // Performs the actual creation of the object described by the statement.
    void create();
}
package com.dlink.trans;
/**
 * Operation — contract for a handler of one kind of (extended) SQL statement.
 * Implementations declare the keyword they handle and build whatever the
 * statement requires.
 *
 * @author wenmo
 * @since 2021/6/13 19:24
 */
public interface Operation {
    // Returns the keyword/prefix this operation handles (e.g. "CREATE AGGTABLE").
    String getHandle();
    // Returns true if this operation can process the given statement key.
    boolean canHandle(String key);
    // Executes/translates the statement this operation represents.
    void build();
}
......@@ -9,12 +9,7 @@ import com.dlink.constant.FlinkSQLConstant;
* @since 2021/5/25 15:50
**/
public class Operations {
/**
* 获取操作类型
*
* @param sql
* @return
*/
public static String getOperationType(String sql) {
String sqlTrim = sql.replaceAll("[\\s\\t\\n\\r]", "").toUpperCase();
if (sqlTrim.startsWith(FlinkSQLConstant.CREATE)) {
......
package com.dlink.trans.ddl;
import com.dlink.trans.Operation;
/**
 * CreateAggTableOperation — {@link Operation} handling the custom
 * {@code CREATE AGGTABLE} DDL statement.
 *
 * @author wenmo
 * @since 2021/6/13 19:24
 */
public class CreateAggTableOperation implements Operation {

    // Statement keyword this operation handles; a shared constant, so static final.
    private static final String KEY_WORD = "CREATE AGGTABLE";

    @Override
    public String getHandle() {
        return KEY_WORD;
    }

    @Override
    public boolean canHandle(String key) {
        // equalsIgnoreCase already yields the boolean; no if/else needed.
        return KEY_WORD.equalsIgnoreCase(key);
    }

    // NOTE(review): Operation also declares build(); no override appears in this
    // view — presumably added in a later revision. Confirm before release.
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment