Unverified Commit 9df3898e authored by Licho's avatar Licho Committed by GitHub

refactor: improve readability, add log, fix iterator exception after miniCluster close (#1148)

* refactor: improve readability, add log, fix iterator exception after miniCluster close.

* chore: format
parent c1c99aa5
...@@ -22,6 +22,7 @@ package com.dlink.result; ...@@ -22,6 +22,7 @@ package com.dlink.result;
import com.dlink.constant.FlinkConstant; import com.dlink.constant.FlinkConstant;
import com.dlink.utils.FlinkUtil; import com.dlink.utils.FlinkUtil;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.table.api.TableResult; import org.apache.flink.table.api.TableResult;
import org.apache.flink.types.Row; import org.apache.flink.types.Row;
import org.apache.flink.types.RowKind; import org.apache.flink.types.RowKind;
...@@ -29,12 +30,15 @@ import org.apache.flink.types.RowKind; ...@@ -29,12 +30,15 @@ import org.apache.flink.types.RowKind;
import java.time.Instant; import java.time.Instant;
import java.time.ZoneId; import java.time.ZoneId;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap; import java.util.LinkedHashMap;
import java.util.LinkedHashSet; import java.util.LinkedHashSet;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set;
import com.google.common.collect.Streams;
import lombok.extern.slf4j.Slf4j;
/** /**
* ResultRunnable * ResultRunnable
...@@ -42,14 +46,15 @@ import java.util.Set; ...@@ -42,14 +46,15 @@ import java.util.Set;
* @author wenmo * @author wenmo
* @since 2021/7/1 22:50 * @since 2021/7/1 22:50
*/ */
@Slf4j
public class ResultRunnable implements Runnable { public class ResultRunnable implements Runnable {
private TableResult tableResult; private static final String nullColumn = "";
private Integer maxRowNum; private final TableResult tableResult;
private boolean isChangeLog; private final Integer maxRowNum;
private boolean isAutoCancel; private final boolean isChangeLog;
private String timeZone = ZoneId.systemDefault().getId(); private final boolean isAutoCancel;
private String nullColumn = ""; private final String timeZone;
public ResultRunnable(TableResult tableResult, Integer maxRowNum, boolean isChangeLog, boolean isAutoCancel, String timeZone) { public ResultRunnable(TableResult tableResult, Integer maxRowNum, boolean isChangeLog, boolean isAutoCancel, String timeZone) {
this.tableResult = tableResult; this.tableResult = tableResult;
...@@ -61,11 +66,12 @@ public class ResultRunnable implements Runnable { ...@@ -61,11 +66,12 @@ public class ResultRunnable implements Runnable {
@Override @Override
public void run() { public void run() {
if (tableResult.getJobClient().isPresent()) { tableResult.getJobClient().ifPresent(jobClient -> {
String jobId = tableResult.getJobClient().get().getJobID().toHexString(); String jobId = jobClient.getJobID().toHexString();
if (!ResultPool.containsKey(jobId)) { if (!ResultPool.containsKey(jobId)) {
ResultPool.put(new SelectResult(jobId, new ArrayList<>(), new LinkedHashSet<>())); ResultPool.put(new SelectResult(jobId, new ArrayList<>(), new LinkedHashSet<>()));
} }
try { try {
if (isChangeLog) { if (isChangeLog) {
catchChangLog(ResultPool.get(jobId)); catchChangLog(ResultPool.get(jobId));
...@@ -73,73 +79,58 @@ public class ResultRunnable implements Runnable { ...@@ -73,73 +79,58 @@ public class ResultRunnable implements Runnable {
catchData(ResultPool.get(jobId)); catchData(ResultPool.get(jobId));
} }
} catch (Exception e) { } catch (Exception e) {
e.printStackTrace(); log.error(String.format(e.toString()));
} }
} });
} }
private void catchChangLog(SelectResult selectResult) { private void catchChangLog(SelectResult selectResult) {
List<String> columns = FlinkUtil.catchColumn(tableResult);
columns.add(0, FlinkConstant.OP);
Set<String> column = new LinkedHashSet(columns);
selectResult.setColumns(column);
List<Map<String, Object>> rows = selectResult.getRowData(); List<Map<String, Object>> rows = selectResult.getRowData();
Iterator<Row> it = tableResult.collect(); List<String> columns = FlinkUtil.catchColumn(tableResult);
while (it.hasNext()) {
if (rows.size() >= maxRowNum) { Streams.stream(tableResult.collect()).limit(maxRowNum).forEach(row -> {
if (isAutoCancel && tableResult.getJobClient().isPresent()) { Map<String, Object> map = getFieldMap(columns, row);
tableResult.getJobClient().get().cancel(); map.put(FlinkConstant.OP, row.getKind().shortString());
}
break;
}
Map<String, Object> map = new LinkedHashMap<>();
Row row = it.next();
map.put(columns.get(0), row.getKind().shortString());
for (int i = 0; i < row.getArity(); ++i) {
Object field = row.getField(i);
if (field == null) {
map.put(columns.get(i + 1), nullColumn);
} else {
if (field instanceof Instant) {
map.put(columns.get(i + 1), ((Instant) field).atZone(ZoneId.of(timeZone)).toLocalDateTime().toString());
} else {
map.put(columns.get(i + 1), field);
}
}
}
rows.add(map); rows.add(map);
});
columns.add(0, FlinkConstant.OP);
selectResult.setColumns(new LinkedHashSet<>(columns));
if (isAutoCancel) {
tableResult.getJobClient().ifPresent(JobClient::cancel);
} }
} }
private void catchData(SelectResult selectResult) { private void catchData(SelectResult selectResult) {
List<String> columns = FlinkUtil.catchColumn(tableResult);
Set<String> column = new LinkedHashSet(columns);
selectResult.setColumns(column);
List<Map<String, Object>> rows = selectResult.getRowData(); List<Map<String, Object>> rows = selectResult.getRowData();
Iterator<Row> it = tableResult.collect(); List<String> columns = FlinkUtil.catchColumn(tableResult);
while (it.hasNext()) {
if (rows.size() >= maxRowNum) { Streams.stream(tableResult.collect()).limit(maxRowNum).forEach(row -> {
break; Map<String, Object> map = getFieldMap(columns, row);
}
Map<String, Object> map = new LinkedHashMap<>();
Row row = it.next();
for (int i = 0; i < row.getArity(); ++i) {
Object field = row.getField(i);
if (field == null) {
map.put(columns.get(i), nullColumn);
} else {
if (field instanceof Instant) {
map.put(columns.get(i), ((Instant) field).atZone(ZoneId.of(timeZone)).toLocalDateTime().toString());
} else {
map.put(columns.get(i), field);
}
}
}
if (RowKind.UPDATE_BEFORE == row.getKind() || RowKind.DELETE == row.getKind()) { if (RowKind.UPDATE_BEFORE == row.getKind() || RowKind.DELETE == row.getKind()) {
rows.remove(map); rows.remove(map);
} else { } else {
rows.add(map); rows.add(map);
} }
});
selectResult.setColumns(new LinkedHashSet<>(columns));
}
private Map<String, Object> getFieldMap(List<String> columns, Row row) {
Map<String, Object> map = new LinkedHashMap<>();
for (int i = 0; i < row.getArity(); ++i) {
Object field = row.getField(i);
String column = columns.get(i);
if (field == null) {
map.put(column, nullColumn);
} else if (field instanceof Instant) {
map.put(column, ((Instant) field).atZone(ZoneId.of(timeZone)).toLocalDateTime().toString());
} else {
map.put(column, field);
}
} }
return map;
} }
} }
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment