Commit e9d05036 authored by godkaikai's avatar godkaikai

changlog和table的查询实现

parent 09c9f5bf
package com.dlink.utils; package com.dlink.utils;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.catalog.CatalogManager; import org.apache.flink.table.catalog.CatalogManager;
import org.apache.flink.table.catalog.ObjectIdentifier; import org.apache.flink.table.catalog.ObjectIdentifier;
...@@ -26,4 +27,8 @@ public class FlinkUtil { ...@@ -26,4 +27,8 @@ public class FlinkUtil {
return new ArrayList<String>(); return new ArrayList<String>();
} }
} }
public static List<String> catchColumn(TableResult tableResult){
return Arrays.asList(tableResult.getTableSchema().getFieldNames());
}
} }
package com.dlink.utils; package com.dlink.utils;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.catalog.CatalogManager; import org.apache.flink.table.catalog.CatalogManager;
import org.apache.flink.table.catalog.ObjectIdentifier; import org.apache.flink.table.catalog.ObjectIdentifier;
...@@ -26,4 +27,8 @@ public class FlinkUtil { ...@@ -26,4 +27,8 @@ public class FlinkUtil {
return new ArrayList<String>(); return new ArrayList<String>();
} }
} }
public static List<String> catchColumn(TableResult tableResult){
return Arrays.asList(tableResult.getTableSchema().getFieldNames());
}
} }
package com.dlink.utils; package com.dlink.utils;
import org.apache.flink.configuration.Configuration; import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.catalog.CatalogManager; import org.apache.flink.table.catalog.CatalogManager;
import org.apache.flink.table.catalog.ObjectIdentifier; import org.apache.flink.table.catalog.ObjectIdentifier;
import org.apache.flink.table.operations.Operation;
import java.util.*; import java.util.*;
...@@ -22,8 +21,11 @@ public class FlinkUtil { ...@@ -22,8 +21,11 @@ public class FlinkUtil {
if (tableOpt.isPresent()) { if (tableOpt.isPresent()) {
return tableOpt.get().getResolvedSchema().getColumnNames(); return tableOpt.get().getResolvedSchema().getColumnNames();
}else{ }else{
return new ArrayList<String>(); return new ArrayList<>();
} }
} }
public static List<String> catchColumn(TableResult tableResult){
return tableResult.getResolvedSchema().getColumnNames();
}
} }
package com.dlink.utils; package com.dlink.utils;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.catalog.CatalogManager; import org.apache.flink.table.catalog.CatalogManager;
import org.apache.flink.table.catalog.ObjectIdentifier; import org.apache.flink.table.catalog.ObjectIdentifier;
...@@ -25,4 +26,9 @@ public class FlinkUtil { ...@@ -25,4 +26,9 @@ public class FlinkUtil {
return new ArrayList<String>(); return new ArrayList<String>();
} }
} }
public static List<String> catchColumn(TableResult tableResult){
return tableResult.getResolvedSchema().getColumnNames();
}
} }
...@@ -268,7 +268,7 @@ public class JobManager { ...@@ -268,7 +268,7 @@ public class JobManager {
job.setJobId(tableResult.getJobClient().get().getJobID().toHexString()); job.setJobId(tableResult.getJobClient().get().getJobID().toHexString());
} }
if (config.isUseResult()) { if (config.isUseResult()) {
IResult result = ResultBuilder.build(SqlType.INSERT, maxRowNum, "", true).getResult(tableResult); IResult result = ResultBuilder.build(SqlType.INSERT, maxRowNum, true).getResult(tableResult);
job.setResult(result); job.setResult(result);
} }
} }
...@@ -300,7 +300,7 @@ public class JobManager { ...@@ -300,7 +300,7 @@ public class JobManager {
job.setJobId(tableResult.getJobClient().get().getJobID().toHexString()); job.setJobId(tableResult.getJobClient().get().getJobID().toHexString());
} }
if (config.isUseResult()) { if (config.isUseResult()) {
IResult result = ResultBuilder.build(item.getType(), maxRowNum, "", true).getResult(tableResult); IResult result = ResultBuilder.build(item.getType(), maxRowNum, true).getResult(tableResult);
job.setResult(result); job.setResult(result);
} }
} }
...@@ -351,7 +351,7 @@ public class JobManager { ...@@ -351,7 +351,7 @@ public class JobManager {
} }
LocalDateTime startTime = LocalDateTime.now(); LocalDateTime startTime = LocalDateTime.now();
TableResult tableResult = executor.executeSql(newStatement); TableResult tableResult = executor.executeSql(newStatement);
IResult result = ResultBuilder.build(operationType, maxRowNum, "", false).getResult(tableResult); IResult result = ResultBuilder.build(operationType, maxRowNum, false).getResult(tableResult);
result.setStartTime(startTime); result.setStartTime(startTime);
return result; return result;
} }
......
...@@ -11,13 +11,13 @@ import org.apache.flink.table.api.TableResult; ...@@ -11,13 +11,13 @@ import org.apache.flink.table.api.TableResult;
**/ **/
public interface ResultBuilder { public interface ResultBuilder {
static ResultBuilder build(SqlType operationType, Integer maxRowNum, String nullColumn, boolean printRowKind){ static ResultBuilder build(SqlType operationType, Integer maxRowNum, boolean isChangeLog){
switch (operationType){ switch (operationType){
case SELECT: case SELECT:
return new SelectResultBuilder(maxRowNum,nullColumn,printRowKind); return new SelectResultBuilder(maxRowNum,isChangeLog);
case SHOW: case SHOW:
case DESCRIBE: case DESCRIBE:
return new ShowResultBuilder(nullColumn,false); return new ShowResultBuilder(false);
case INSERT: case INSERT:
return new InsertResultBuilder(); return new InsertResultBuilder();
default: default:
......
package com.dlink.result; package com.dlink.result;
import org.apache.flink.table.api.TableColumn; import com.dlink.constant.FlinkConstant;
import com.dlink.utils.FlinkUtil;
import org.apache.flink.table.api.TableResult; import org.apache.flink.table.api.TableResult;
import org.apache.flink.types.Row; import org.apache.flink.types.Row;
import org.apache.flink.types.RowKind;
import org.apache.flink.util.StringUtils; import org.apache.flink.util.StringUtils;
import java.util.*; import java.util.*;
import java.util.stream.Stream;
/** /**
* ResultRunnable * ResultRunnable
...@@ -18,81 +19,85 @@ public class ResultRunnable implements Runnable { ...@@ -18,81 +19,85 @@ public class ResultRunnable implements Runnable {
private TableResult tableResult; private TableResult tableResult;
private Integer maxRowNum; private Integer maxRowNum;
private boolean printRowKind; private boolean isChangeLog;
private String nullColumn; private String nullColumn = "";
public ResultRunnable(TableResult tableResult, Integer maxRowNum, boolean printRowKind, String nullColumn) { public ResultRunnable(TableResult tableResult, Integer maxRowNum, boolean isChangeLog) {
this.tableResult = tableResult; this.tableResult = tableResult;
this.maxRowNum = maxRowNum; this.maxRowNum = maxRowNum;
this.printRowKind = printRowKind; this.isChangeLog = isChangeLog;
this.nullColumn = nullColumn;
} }
@Override @Override
public void run() { public void run() {
if(tableResult.getJobClient().isPresent()) { if (tableResult.getJobClient().isPresent()) {
String jobId = tableResult.getJobClient().get().getJobID().toHexString(); String jobId = tableResult.getJobClient().get().getJobID().toHexString();
if (!ResultPool.containsKey(jobId)) { if (!ResultPool.containsKey(jobId)) {
ResultPool.put(new SelectResult(jobId, new ArrayList<Map<String, Object>>(), new LinkedHashSet<String>())); ResultPool.put(new SelectResult(jobId, new ArrayList<>(), new LinkedHashSet<>()));
} }
try { try {
if(isChangeLog) {
catchChangLog(ResultPool.get(jobId));
}else{
catchData(ResultPool.get(jobId)); catchData(ResultPool.get(jobId));
}catch (Exception e){ }
} catch (Exception e) {
} }
} }
} }
private void catchData(SelectResult selectResult){ private void catchChangLog(SelectResult selectResult) {
List<TableColumn> columns = tableResult.getTableSchema().getTableColumns(); List<String> columns = FlinkUtil.catchColumn(tableResult);
String[] columnNames = columns.stream().map(TableColumn::getName).map(s -> s.replace(" ", "")).toArray((x$0) -> { columns.add(0, FlinkConstant.OP);
return (new String[x$0]); Set<String> column = new LinkedHashSet(columns);
});
if (printRowKind) {
columnNames = Stream.concat(Stream.of("op"), Arrays.stream(columnNames)).toArray((x$0) -> {
return new String[x$0];
});
}
Set<String> column = new LinkedHashSet(Arrays.asList(columnNames));
selectResult.setColumns(column); selectResult.setColumns(column);
long numRows = 0L;
List<Map<String, Object>> rows = selectResult.getRowData(); List<Map<String, Object>> rows = selectResult.getRowData();
Iterator<Row> it = tableResult.collect(); Iterator<Row> it = tableResult.collect();
while (it.hasNext()) { while (it.hasNext()) {
if (numRows < maxRowNum) { if (rows.size() >= maxRowNum) {
String[] cols = rowToString(it.next()); break;
Map<String, Object> row = new HashMap<>();
for (int i = 0; i < cols.length; i++) {
if (i > columnNames.length) {
/*column.add("UKN" + i);
row.put("UKN" + i, cols[i]);*/
} else {
// column.add(columnNames[i]);
row.put(columnNames[i], cols[i]);
}
} }
rows.add(row); Map<String, Object> map = new LinkedHashMap<>();
numRows++; Row row = it.next();
map.put(columns.get(0), row.getKind().shortString());
for (int i = 0; i < row.getArity(); ++i) {
Object field = row.getField(i);
if (field == null) {
map.put(columns.get(i+1), nullColumn);
} else { } else {
break; map.put(columns.get(i+1), StringUtils.arrayAwareToString(field));
} }
} }
rows.add(map);
}
} }
private String[] rowToString(Row row) { private void catchData(SelectResult selectResult) {
int len = printRowKind ? row.getArity() + 1 : row.getArity(); List<String> columns = FlinkUtil.catchColumn(tableResult);
List<String> fields = new ArrayList(len); Set<String> column = new LinkedHashSet(columns);
if (printRowKind) { selectResult.setColumns(column);
fields.add(row.getKind().shortString()); List<Map<String, Object>> rows = selectResult.getRowData();
Iterator<Row> it = tableResult.collect();
while (it.hasNext()) {
if (rows.size() >= maxRowNum) {
break;
} }
Map<String, Object> map = new LinkedHashMap<>();
Row row = it.next();
for (int i = 0; i < row.getArity(); ++i) { for (int i = 0; i < row.getArity(); ++i) {
Object field = row.getField(i); Object field = row.getField(i);
if (field == null) { if (field == null) {
fields.add(nullColumn); map.put(columns.get(i), nullColumn);
} else { } else {
fields.add(StringUtils.arrayAwareToString(field)); map.put(columns.get(i), StringUtils.arrayAwareToString(field));
}
}
if (RowKind.UPDATE_BEFORE == row.getKind() || RowKind.DELETE == row.getKind()) {
rows.remove(map);
}else {
rows.add(map);
} }
} }
return fields.toArray(new String[0]);
} }
} }
...@@ -18,20 +18,18 @@ import java.util.stream.Stream; ...@@ -18,20 +18,18 @@ import java.util.stream.Stream;
public class SelectResultBuilder implements ResultBuilder { public class SelectResultBuilder implements ResultBuilder {
private Integer maxRowNum; private Integer maxRowNum;
private boolean printRowKind; private boolean isChangeLog;
private String nullColumn;
public SelectResultBuilder(Integer maxRowNum, String nullColumn, boolean printRowKind) { public SelectResultBuilder(Integer maxRowNum, boolean isChangeLog) {
this.maxRowNum = maxRowNum; this.maxRowNum = maxRowNum;
this.printRowKind = printRowKind; this.isChangeLog = isChangeLog;
this.nullColumn = nullColumn;
} }
@Override @Override
public IResult getResult(TableResult tableResult) { public IResult getResult(TableResult tableResult) {
if (tableResult.getJobClient().isPresent()) { if (tableResult.getJobClient().isPresent()) {
String jobId = tableResult.getJobClient().get().getJobID().toHexString(); String jobId = tableResult.getJobClient().get().getJobID().toHexString();
ResultRunnable runnable = new ResultRunnable(tableResult, maxRowNum, printRowKind, nullColumn); ResultRunnable runnable = new ResultRunnable(tableResult, maxRowNum, isChangeLog);
Thread thread = new Thread(runnable, jobId); Thread thread = new Thread(runnable, jobId);
thread.start(); thread.start();
return SelectResult.buildSuccess(jobId); return SelectResult.buildSuccess(jobId);
......
...@@ -18,11 +18,10 @@ import java.util.stream.Stream; ...@@ -18,11 +18,10 @@ import java.util.stream.Stream;
public class ShowResultBuilder implements ResultBuilder { public class ShowResultBuilder implements ResultBuilder {
private boolean printRowKind; private boolean printRowKind;
private String nullColumn; private String nullColumn = "";
public ShowResultBuilder(String nullColumn, boolean printRowKind) { public ShowResultBuilder(boolean printRowKind) {
this.printRowKind = printRowKind; this.printRowKind = printRowKind;
this.nullColumn = nullColumn;
} }
@Override @Override
......
...@@ -24,4 +24,8 @@ public interface FlinkConstant { ...@@ -24,4 +24,8 @@ public interface FlinkConstant {
* 本地模式host * 本地模式host
*/ */
String LOCAL_HOST = "localhost:8081"; String LOCAL_HOST = "localhost:8081";
/**
* changlog op
*/
String OP = "op";
} }
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment