Unverified Commit 0b3f8051 authored by Licho's avatar Licho Committed by GitHub

refactor: simplify code. (#1165)

parent 24bd97a1
......@@ -37,7 +37,7 @@ import org.apache.flink.types.Row;
import org.apache.flink.util.StringUtils;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
......@@ -53,8 +53,9 @@ import java.util.regex.Pattern;
**/
public final class SqlManager {
private Map<String, String> sqlFragments;
public static final String FRAGMENT = "fragment";
static final String SHOW_FRAGMENTS = "SHOW FRAGMENTS";
private final Map<String, String> sqlFragments;
public SqlManager() {
sqlFragments = new HashMap<>();
......@@ -78,14 +79,11 @@ public final class SqlManager {
* But at the moment, with CatalogException, not SqlException
*/
public void registerSqlFragment(String sqlFragmentName, String sqlFragment) {
checkArgument(
!StringUtils.isNullOrWhitespaceOnly(sqlFragmentName),
"sql fragment name cannot be null or empty.");
checkArgument(!StringUtils.isNullOrWhitespaceOnly(sqlFragmentName), "sql fragment name cannot be null or empty.");
checkNotNull(sqlFragment, "sql fragment cannot be null");
if (sqlFragments.containsKey(sqlFragmentName)) {
throw new CatalogException(
format("The fragment of sql %s already exists.", sqlFragmentName));
throw new CatalogException(format("The fragment of sql %s already exists.", sqlFragmentName));
}
sqlFragments.put(sqlFragmentName, sqlFragment);
......@@ -114,15 +112,12 @@ public final class SqlManager {
* failed. But at the moment, with CatalogException, not SqlException
*/
public void unregisterSqlFragment(String sqlFragmentName, boolean ignoreIfNotExists) {
checkArgument(
!StringUtils.isNullOrWhitespaceOnly(sqlFragmentName),
"sql fragmentName name cannot be null or empty.");
checkArgument(!StringUtils.isNullOrWhitespaceOnly(sqlFragmentName), "sql fragmentName name cannot be null or empty.");
if (sqlFragments.containsKey(sqlFragmentName)) {
sqlFragments.remove(sqlFragmentName);
} else if (!ignoreIfNotExists) {
throw new CatalogException(
format("The fragment of sql %s does not exist.", sqlFragmentName));
throw new CatalogException(format("The fragment of sql %s does not exist.", sqlFragmentName));
}
}
......@@ -134,26 +129,22 @@ public final class SqlManager {
* failed. But at the moment, with CatalogException, not SqlException
*/
public String getSqlFragment(String sqlFragmentName) {
checkArgument(
!StringUtils.isNullOrWhitespaceOnly(sqlFragmentName),
"sql fragmentName name cannot be null or empty.");
checkArgument(!StringUtils.isNullOrWhitespaceOnly(sqlFragmentName), "sql fragmentName name cannot be null or empty.");
if (sqlFragments.containsKey(sqlFragmentName)) {
return sqlFragments.get(sqlFragmentName);
} else {
throw new CatalogException(
format("The fragment of sql %s does not exist.", sqlFragmentName));
}
throw new CatalogException(format("The fragment of sql %s does not exist.", sqlFragmentName));
}
public TableResult getSqlFragmentResult(String sqlFragmentName) {
if (Asserts.isNullString(sqlFragmentName)) {
return CustomTableResultImpl.buildTableResult(new ArrayList<>(Arrays.asList(new TableSchemaField("fragment", DataTypes.STRING()))), new ArrayList<>());
return CustomTableResultImpl.buildTableResult(Collections.singletonList(new TableSchemaField(FRAGMENT, DataTypes.STRING())), new ArrayList<>());
}
String sqlFragment = getSqlFragment(sqlFragmentName);
List<Row> rows = new ArrayList<>();
rows.add(Row.of(sqlFragment));
return CustomTableResultImpl.buildTableResult(new ArrayList<>(Arrays.asList(new TableSchemaField("fragment", DataTypes.STRING()))), rows);
return CustomTableResultImpl.buildTableResult(Collections.singletonList(new TableSchemaField(FRAGMENT, DataTypes.STRING())), Collections.singletonList(Row.of(sqlFragment)));
}
/**
......@@ -171,7 +162,7 @@ public final class SqlManager {
for (String key : sqlFragments.keySet()) {
rows.add(Row.of(key));
}
return CustomTableResultImpl.buildTableResult(new ArrayList<>(Arrays.asList(new TableSchemaField("fragmentName", DataTypes.STRING()))), rows);
return CustomTableResultImpl.buildTableResult(Collections.singletonList(new TableSchemaField("fragmentName", DataTypes.STRING())), rows);
}
public Iterator getSqlFragmentsIterator() {
......@@ -179,10 +170,7 @@ public final class SqlManager {
}
public Table getSqlFragmentsTable(CustomTableEnvironmentImpl environment) {
List<String> keys = new ArrayList<>();
for (String key : sqlFragments.keySet()) {
keys.add(key);
}
List<String> keys = new ArrayList<>(sqlFragments.keySet());
return environment.fromValues(keys);
}
......@@ -200,27 +188,21 @@ public final class SqlManager {
if (Asserts.isNullString(statement)) {
return statement;
}
String[] strs = statement.split(SystemConfiguration.getInstances().getSqlSeparator());
String[] values = statement.split(SystemConfiguration.getInstances().getSqlSeparator());
StringBuilder sb = new StringBuilder();
for (int i = 0; i < strs.length; i++) {
String str = strs[i];
if (str.trim().length() == 0) {
continue;
}
str = strs[i];
if (str.contains(FlinkSQLConstant.FRAGMENTS)) {
String[] strs2 = str.split(FlinkSQLConstant.FRAGMENTS);
if (strs2.length >= 2) {
if (strs2[0].length() == 0) {
throw new ExpressionParserException("Illegal variable name.");
}
String valueString = str.substring(str.indexOf(FlinkSQLConstant.FRAGMENTS) + 2);
this.registerSqlFragment(strs2[0], replaceVariable(valueString));
} else {
throw new ExpressionParserException("Illegal variable definition.");
for (String assignment : values) {
String[] splits = assignment.split(FlinkSQLConstant.FRAGMENTS, 2);
if (splits.length == 2) {
if (splits[0].trim().isEmpty()) {
throw new ExpressionParserException("Illegal variable name.");
}
this.registerSqlFragment(splits[0], replaceVariable(splits[1]));
} else if (splits.length == 1) {
// string not contains FlinkSQLConstant.FRAGMENTS
sb.append(replaceVariable(assignment));
} else {
sb.append(replaceVariable(str));
throw new ExpressionParserException("Illegal variable definition.");
}
}
return sb.toString();
......@@ -232,15 +214,13 @@ public final class SqlManager {
* @param statement A sql will be replaced.
*/
private String replaceVariable(String statement) {
String pattern = "\\$\\{(.+?)\\}";
Pattern p = Pattern.compile(pattern);
Pattern p = Pattern.compile("\\$\\{(.+?)}");
Matcher m = p.matcher(statement);
StringBuffer sb = new StringBuffer();
while (m.find()) {
String key = m.group(1);
String value = this.getSqlFragment(key);
m.appendReplacement(sb, "");
sb.append(value == null ? "" : value);
m.appendReplacement(sb, value == null ? "" : value);
}
m.appendTail(sb);
return sb.toString();
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment