Unverified Commit 20f7991c authored by ZackYoung, committed by GitHub

[Feature][dlink-client]CDCSOURCE Add sub database and sub table support (#994)

* add CDCSOURCE split-table support

* optimize Java code style
parent 2fb2abce
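For reference, a minimal sketch of how the new options are meant to be invoked (the job name, connection values, table regex, and sink are illustrative placeholders, and the split.* values shown are the defaults applied by CDCSource#splitMapInit later in this diff):

public class CdcSourceSplitStatementExample {
    public static void main(String[] args) {
        // Hypothetical CDCSOURCE statement; the 'split.*' keys are the new part.
        String statement = "EXECUTE CDCSOURCE demo WITH (\n"
                + "  'connector' = 'mysql-cdc',\n"
                + "  'hostname' = '127.0.0.1',\n"
                + "  'port' = '3306',\n"
                + "  'username' = 'root',\n"
                + "  'password' = '123456',\n"
                + "  'scan.startup.mode' = 'initial',\n"
                + "  'table-name' = 'test\\.student_[0-9]+',\n"
                + "  'split.enable' = 'true',\n"
                + "  'split.match_number_regex' = '_[0-9]+',\n"
                + "  'split.match_way' = 'suffix',\n"
                + "  'split.max_match_value' = '100',\n"
                + "  'sink.connector' = 'print'\n"
                + ")";
        System.out.println(statement);
    }
}

With split enabled, records from physical shards such as test_1.student_1 and test_2.student_12 are merged into the single logical table test.student.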
@@ -19,6 +19,7 @@
package com.dlink.cdc;

import com.dlink.exception.SplitTableException;
import com.dlink.model.FlinkCDCConfig;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
@@ -48,4 +49,9 @@ public interface CDCBuilder {
    Map<String, Map<String, String>> parseMetaDataConfigs();

    String getSchemaFieldName();

    default Map<String, String> parseMetaDataConfig() {
        throw new SplitTableException("This data source does not implement sharded databases and tables");
    }
}
@@ -83,8 +83,8 @@ public class MysqlCDCBuilder extends AbstractCDCBuilder implements CDCBuilder {
        Properties debeziumProperties = new Properties();
        // Set default values for some type conversions
        debeziumProperties.setProperty("bigint.unsigned.handling.mode", "long");
        debeziumProperties.setProperty("decimal.handling.mode", "string");
        for (Map.Entry<String, String> entry : config.getDebezium().entrySet()) {
            if (Asserts.isNotNullString(entry.getKey()) && Asserts.isNotNullString(entry.getValue())) {
@@ -101,10 +101,10 @@ public class MysqlCDCBuilder extends AbstractCDCBuilder implements CDCBuilder {
        }
        MySqlSourceBuilder<String> sourceBuilder = MySqlSource.<String>builder()
                .hostname(config.getHostname())
                .port(config.getPort())
                .username(config.getUsername())
                .password(config.getPassword());
        if (Asserts.isNotNullString(database)) {
            String[] databases = database.split(FlinkParamConstant.SPLIT);
@@ -169,6 +169,7 @@ public class MysqlCDCBuilder extends AbstractCDCBuilder implements CDCBuilder {
        return env.fromSource(sourceBuilder.build(), WatermarkStrategy.noWatermarks(), "MySQL CDC Source");
    }

    @Override
    public List<String> getSchemaList() {
        List<String> schemaList = new ArrayList<>();
        String schema = config.getDatabase();
@@ -189,6 +190,7 @@ public class MysqlCDCBuilder extends AbstractCDCBuilder implements CDCBuilder {
        return schemaList;
    }

    @Override
    public Map<String, Map<String, String>> parseMetaDataConfigs() {
        Map<String, Map<String, String>> allConfigMap = new HashMap<>();
        List<String> schemaList = getSchemaList();
@@ -210,6 +212,24 @@ public class MysqlCDCBuilder extends AbstractCDCBuilder implements CDCBuilder {
        return allConfigMap;
    }
    @Override
    public Map<String, String> parseMetaDataConfig() {
        Map<String, String> configMap = new HashMap<>();
        configMap.put(ClientConstant.METADATA_TYPE, METADATA_TYPE);
        StringBuilder sb = new StringBuilder("jdbc:mysql://");
        sb.append(config.getHostname());
        sb.append(":");
        sb.append(config.getPort());
        sb.append("/");
        configMap.put(ClientConstant.METADATA_NAME, sb.toString());
        configMap.put(ClientConstant.METADATA_URL, sb.toString());
        configMap.put(ClientConstant.METADATA_USERNAME, config.getUsername());
        configMap.put(ClientConstant.METADATA_PASSWORD, config.getPassword());
        return configMap;
    }
    @Override
    public String getSchemaFieldName() {
        return "db";
...
@@ -30,6 +30,7 @@ import com.dlink.model.Table;
import com.dlink.utils.FlinkBaseUtil;
import com.dlink.utils.JSONUtil;
import com.dlink.utils.LogUtil;
import com.dlink.utils.SplitUtil;

import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.common.functions.FlatMapFunction;
@@ -69,6 +70,7 @@ import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicInteger;

import javax.xml.bind.DatatypeConverter;
@@ -218,6 +220,8 @@ public class SQLSinkBuilder extends AbstractSinkBuilder implements SinkBuilder,
        logger.info("Build deserialize successful...");
        Map<Table, OutputTag<Map>> tagMap = new HashMap<>();
        Map<String, Table> tableMap = new HashMap<>();
        Map<String, String> splitConfMap = config.getSplit();

        for (Schema schema : schemaList) {
            for (Table table : schema.getTables()) {
                String sinkTableName = getSinkTableName(table);
@@ -235,8 +239,10 @@ public class SQLSinkBuilder extends AbstractSinkBuilder implements SinkBuilder,
            public void processElement(Map map, ProcessFunction<Map, Map>.Context ctx, Collector<Map> out) throws Exception {
                LinkedHashMap source = (LinkedHashMap) map.get("source");
                try {
                    String tableName = SplitUtil.getReValue(source.get(schemaFieldName).toString(), splitConfMap) + "." + SplitUtil.getReValue(source.get("table").toString(), splitConfMap);
                    Table table = tableMap.get(tableName);
                    OutputTag<Map> outputTag = tagMap.get(table);
                    Optional.ofNullable(outputTag).orElseThrow(() -> new RuntimeException("data outputTag does not exist! table name is " + tableName));
                    ctx.output(outputTag, map);
                } catch (Exception e) {
                    logger.error(e.getMessage(), e);
...
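The routing above now keys the side-output lookup on the sharding-normalized schema.table name, so events from every physical shard land in the same logical sink table, and a missing tag fails fast with a descriptive message. A minimal sketch of that key normalization (assumes SplitUtil from this PR is on the classpath; the names are illustrative):

import java.util.HashMap;
import java.util.Map;

import com.dlink.utils.SplitUtil;

public class RoutingKeyExample {
    public static void main(String[] args) {
        Map<String, String> splitConf = new HashMap<>();
        splitConf.put("enable", "true");
        splitConf.put("match_number_regex", "_[0-9]+");
        splitConf.put("match_way", "suffix");
        splitConf.put("max_match_value", "100");
        // An event whose source says db = test_1, table = student_12
        // resolves to the logical table key test.student.
        String key = SplitUtil.getReValue("test_1", splitConf)
                + "." + SplitUtil.getReValue("student_12", splitConf);
        System.out.println(key); // test.student
    }
}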
@@ -42,6 +42,7 @@ public class FlinkCDCConfig {
    private String table;
    private List<String> schemaTableNameList;
    private String startupMode;
    private Map<String, String> split;
    private Map<String, String> debezium;
    private Map<String, String> source;
    private Map<String, String> jdbc;
@@ -54,7 +55,7 @@ public class FlinkCDCConfig {
    public FlinkCDCConfig(String type, String hostname, Integer port, String username, String password, Integer checkpoint, Integer parallelism, String database, String schema, String table,
                          String startupMode,
                          Map<String, String> split, Map<String, String> debezium, Map<String, String> source, Map<String, String> sink, Map<String, String> jdbc) {
        this.type = type;
        this.hostname = hostname;
        this.port = port;
@@ -66,6 +67,7 @@ public class FlinkCDCConfig {
        this.schema = schema;
        this.table = table;
        this.startupMode = startupMode;
        this.split = split;
        this.debezium = debezium;
        this.source = source;
        this.sink = sink;
@@ -250,4 +252,12 @@ public class FlinkCDCConfig {
    public void setDebezium(Map<String, String> debezium) {
        this.debezium = debezium;
    }

    public Map<String, String> getSplit() {
        return split;
    }

    public void setSplit(Map<String, String> split) {
        this.split = split;
    }
}
package com.dlink.exception;

/**
 * @author ZackYoung
 * @version 1.0
 * @since 2022/9/2
 */
public class SplitTableException extends RuntimeException {

    public SplitTableException(String message, Throwable cause) {
        super(message, cause);
    }

    public SplitTableException(String message) {
        super(message);
    }
}
@@ -30,6 +30,8 @@ import java.util.List;
import lombok.Getter;
import lombok.Setter;

/**
 * Table
 *
@@ -52,6 +54,16 @@ public class Table implements Serializable, Comparable<Table> {
    private Long rows;
    private Date createTime;
    private Date updateTime;
    /**
     * Table type (sharding classification)
     */
    private TableType tableType = TableType.SINGLE_DATABASE_AND_TABLE;
    /**
     * Physical schema.table names backing a sharded table
     */
    private List<String> schemaTableNameList;
    private List<Column> columns;

    public Table() {
@@ -98,7 +110,7 @@ public class Table implements Serializable, Comparable<Table> {
    }

    public String getFlinkTableSql(String flinkConfig) {
        return getFlinkDDL(flinkConfig, name);
    }

    public String getFlinkDDL(String flinkConfig, String tableName) {
...
package com.dlink.model;

/**
 * Types of database/table sharding
 */
public enum TableType {

    /**
     * Sharded database and sharded table
     */
    SPLIT_DATABASE_AND_TABLE,

    /**
     * Sharded database, single table
     */
    SPLIT_DATABASE_AND_SINGLE_TABLE,

    /**
     * Single database, sharded table
     */
    SINGLE_DATABASE_AND_SPLIT_TABLE,

    /**
     * Single database, single table
     */
    SINGLE_DATABASE_AND_TABLE;

    public static TableType type(boolean splitDatabase, boolean splitTable) {
        if (splitDatabase && splitTable) {
            return TableType.SPLIT_DATABASE_AND_TABLE;
        }
        if (splitTable) {
            return TableType.SINGLE_DATABASE_AND_SPLIT_TABLE;
        }
        if (!splitDatabase) {
            return TableType.SINGLE_DATABASE_AND_TABLE;
        }
        return TableType.SPLIT_DATABASE_AND_SINGLE_TABLE;
    }
}
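A quick truth table for type(...), as a sanity check (sketch; assumes the enum above is on the classpath):

import com.dlink.model.TableType;

public class TableTypeExample {
    public static void main(String[] args) {
        // (splitDatabase, splitTable) -> classification
        System.out.println(TableType.type(true, true));   // SPLIT_DATABASE_AND_TABLE
        System.out.println(TableType.type(true, false));  // SPLIT_DATABASE_AND_SINGLE_TABLE
        System.out.println(TableType.type(false, true));  // SINGLE_DATABASE_AND_SPLIT_TABLE
        System.out.println(TableType.type(false, false)); // SINGLE_DATABASE_AND_TABLE
    }
}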
package com.dlink.utils;

import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import lombok.extern.slf4j.Slf4j;

/**
 * Utility class for sharded databases and tables
 *
 * @author ZackYoung
 * @version 1.0
 * @since 2022/9/2
 */
@Slf4j
public class SplitUtil {

    public static boolean contains(String regex, String sourceData) {
        return Pattern.matches(regex, sourceData);
    }

    public static boolean isSplit(String value, Map<String, String> splitConfig) {
        String matchNumberRegex = splitConfig.get("match_number_regex");
        Pattern pattern = Pattern.compile(matchNumberRegex);
        Matcher matcher = pattern.matcher(value);
        if (matcher.find()) {
            int splitNum = Integer.parseInt(matcher.group(0).replaceFirst("_", ""));
            int maxMatchValue = Integer.parseInt(splitConfig.get("max_match_value"));
            return splitNum <= maxMatchValue;
        }
        return false;
    }

    public static String getReValue(String value, Map<String, String> splitConfig) {
        if (isEnabled(splitConfig)) {
            try {
                String matchNumberRegex = splitConfig.get("match_number_regex");
                String matchWay = splitConfig.get("match_way");
                Pattern pattern = Pattern.compile(matchNumberRegex);
                Matcher matcher = pattern.matcher(value);
                // Determine whether the shard number is a prefix or a suffix
                if ("prefix".equalsIgnoreCase(matchWay)) {
                    if (matcher.find()) {
                        String num = matcher.group(0);
                        int splitNum = Integer.parseInt(num.replaceFirst("_", ""));
                        int maxMatchValue = Integer.parseInt(splitConfig.get("max_match_value"));
                        if (splitNum <= maxMatchValue) {
                            return value.substring(0, value.lastIndexOf(num));
                        }
                    }
                } else {
                    // For suffixes, keep scanning so the last match wins
                    String num = null;
                    while (matcher.find()) {
                        num = matcher.group(0);
                    }
                    if (num == null) {
                        return value;
                    }
                    int splitNum = Integer.parseInt(num.replaceFirst("_", ""));
                    int maxMatchValue = Integer.parseInt(splitConfig.get("max_match_value"));
                    if (splitNum <= maxMatchValue) {
                        return value.substring(0, value.lastIndexOf(num));
                    }
                }
            } catch (Exception e) {
                log.warn("Unable to determine sharded database/table for value: " + value, e);
            }
        }
        return value;
    }

    public static boolean isEnabled(Map<String, String> split) {
        return Boolean.parseBoolean(split.get("enable"));
    }
}
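A minimal usage sketch of the class above, exercising both the shard-stripping and the max_match_value guard (the configuration mirrors the defaults that CDCSource#splitMapInit applies later in this diff):

import java.util.HashMap;
import java.util.Map;

import com.dlink.utils.SplitUtil;

public class SplitUtilExample {
    public static void main(String[] args) {
        Map<String, String> conf = new HashMap<>();
        conf.put("enable", "true");
        conf.put("match_number_regex", "_[0-9]+");
        conf.put("match_way", "suffix");
        conf.put("max_match_value", "100");

        // A numeric suffix within max_match_value is treated as a shard number.
        System.out.println(SplitUtil.isSplit("student_12", conf));    // true
        System.out.println(SplitUtil.getReValue("student_12", conf)); // student

        // A numeric suffix above max_match_value is kept as part of the name.
        System.out.println(SplitUtil.isSplit("order_2023", conf));    // false
        System.out.println(SplitUtil.getReValue("order_2023", conf)); // order_2023

        // With sharding disabled, values pass through unchanged.
        conf.put("enable", "false");
        System.out.println(SplitUtil.getReValue("student_12", conf)); // student_12
    }
}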
@@ -50,18 +50,19 @@ public class CDCSource {
    private String table;
    private String startupMode;
    private Map<String, String> debezium;
    private Map<String, String> split;
    private Map<String, String> jdbc;
    private Map<String, String> source;
    private Map<String, String> sink;
    private List<Map<String, String>> sinks;

    public CDCSource(String connector, String statement, String name, String hostname, Integer port, String username, String password, Integer checkpoint, Integer parallelism, String startupMode,
                     Map<String, String> split, Map<String, String> debezium, Map<String, String> source, Map<String, String> sink, Map<String, String> jdbc) {
        this(connector, statement, name, hostname, port, username, password, checkpoint, parallelism, startupMode, split, debezium, source, sink, null, jdbc);
    }

    public CDCSource(String connector, String statement, String name, String hostname, Integer port, String username, String password, Integer checkpoint, Integer parallelism, String startupMode,
                     Map<String, String> split, Map<String, String> debezium, Map<String, String> source, Map<String, String> sink, List<Map<String, String>> sinks, Map<String, String> jdbc) {
        this.connector = connector;
        this.statement = statement;
        this.name = name;
@@ -73,6 +74,7 @@ public class CDCSource {
        this.parallelism = parallelism;
        this.startupMode = startupMode;
        this.debezium = debezium;
        this.split = split;
        this.jdbc = jdbc;
        this.source = source;
        this.sink = sink;
@@ -83,6 +85,7 @@ public class CDCSource {
        Map<String, List<String>> map = SingleSqlParserFactory.generateParser(statement);
        Map<String, String> config = getKeyValue(map.get("WITH"));
        Map<String, String> debezium = new HashMap<>();
        Map<String, String> split = new HashMap<>();
        for (Map.Entry<String, String> entry : config.entrySet()) {
            if (entry.getKey().startsWith("debezium.")) {
                String key = entry.getKey();
@@ -92,6 +95,16 @@ public class CDCSource {
                }
            }
        }

        for (Map.Entry<String, String> entry : config.entrySet()) {
            if (entry.getKey().startsWith("split.")) {
                String key = entry.getKey();
                key = key.replaceFirst("split.", "");
                if (!split.containsKey(key)) {
                    split.put(key, entry.getValue());
                }
            }
        }
        splitMapInit(split);

        Map<String, String> source = new HashMap<>();
        for (Map.Entry<String, String> entry : config.entrySet()) {
            if (entry.getKey().startsWith("source.")) {
@@ -161,6 +174,7 @@ public class CDCSource {
                Integer.valueOf(config.get("checkpoint")),
                Integer.valueOf(config.get("parallelism")),
                config.get("scan.startup.mode"),
                split,
                debezium,
                source,
                sink,
@@ -179,6 +193,13 @@ public class CDCSource {
        return cdcSource;
    }

    private static void splitMapInit(Map<String, String> split) {
        split.putIfAbsent("max_match_value", "100");
        split.putIfAbsent("match_number_regex", "_[0-9]+");
        split.putIfAbsent("match_way", "suffix");
        split.putIfAbsent("enable", "false");
    }

    private static Map<String, String> getKeyValue(List<String> list) {
        Map<String, String> map = new HashMap<>();
        Pattern p = Pattern.compile("'(.*?)'\\s*=\\s*'(.*?)'");
@@ -311,6 +332,18 @@ public class CDCSource {
        this.debezium = debezium;
    }

    public Map<String, String> getSplit() {
        return split;
    }

    public void setSplit(Map<String, String> split) {
        this.split = split;
    }

    public void setSinks(List<Map<String, String>> sinks) {
        this.sinks = sinks;
    }

    public Map<String, String> getSource() {
        return source;
    }
...
@@ -31,14 +31,18 @@ import com.dlink.model.Schema;
import com.dlink.model.Table;
import com.dlink.trans.AbstractOperation;
import com.dlink.trans.Operation;
import com.dlink.utils.SplitUtil;

import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.TableResult;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

/**
 * CreateCDCSourceOperation
@@ -71,8 +75,8 @@ public class CreateCDCSourceOperation extends AbstractOperation implements Operation {
        logger.info("Start build CDCSOURCE Task...");
        CDCSource cdcSource = CDCSource.build(statement);
        FlinkCDCConfig config = new FlinkCDCConfig(cdcSource.getConnector(), cdcSource.getHostname(), cdcSource.getPort(), cdcSource.getUsername()
                , cdcSource.getPassword(), cdcSource.getCheckpoint(), cdcSource.getParallelism(), cdcSource.getDatabase(), cdcSource.getSchema()
                , cdcSource.getTable(), cdcSource.getStartupMode(), cdcSource.getSplit(), cdcSource.getDebezium(), cdcSource.getSource(), cdcSource.getSink(), cdcSource.getJdbc());
        try {
            CDCBuilder cdcBuilder = CDCBuilderFactory.buildCDCBuilder(config);
            Map<String, Map<String, String>> allConfigMap = cdcBuilder.parseMetaDataConfigs();
@@ -81,34 +85,54 @@ public class CreateCDCSourceOperation extends AbstractOperation implements Operation {
            final List<String> schemaNameList = cdcBuilder.getSchemaList();
            final List<String> tableRegList = cdcBuilder.getTableList();
            final List<String> schemaTableNameList = new ArrayList<>();
            if (SplitUtil.isEnabled(cdcSource.getSplit())) {
                DriverConfig driverConfig = DriverConfig.build(cdcBuilder.parseMetaDataConfig());
                Driver driver = Driver.build(driverConfig);
                // Pass the regex patterns straight through
                schemaTableNameList.addAll(tableRegList.stream().map(x -> x.replaceFirst("\\\\.", ".")).collect(Collectors.toList()));
                Set<Table> tables = driver.getSplitTables(tableRegList, cdcSource.getSplit());
                for (Table table : tables) {
                    String schemaName = table.getSchema();
                    Schema schema = Schema.build(schemaName);
                    schema.setTables(Collections.singletonList(table));
                    table.setColumns(driver.listColumnsSortByPK(schemaName, table.getName()));
                    schemaList.add(schema);
                }
            } else {
                for (String schemaName : schemaNameList) {
                    Schema schema = Schema.build(schemaName);
                    if (!allConfigMap.containsKey(schemaName)) {
                        continue;
                    }
                    DriverConfig driverConfig = DriverConfig.build(allConfigMap.get(schemaName));
                    Driver driver = Driver.build(driverConfig);
                    final List<Table> tables = driver.listTables(schemaName);
                    for (Table table : tables) {
                        if (!Asserts.isEquals(table.getType(), "VIEW")) {
                            if (Asserts.isNotNullCollection(tableRegList)) {
                                for (String tableReg : tableRegList) {
                                    if (table.getSchemaTableName().matches(tableReg.trim()) && !schema.getTables().contains(Table.build(table.getName()))) {
                                        table.setColumns(driver.listColumnsSortByPK(schemaName, table.getName()));
                                        schema.getTables().add(table);
                                        schemaTableNameList.add(table.getSchemaTableName());
                                        break;
                                    }
                                }
                            } else {
                                table.setColumns(driver.listColumnsSortByPK(schemaName, table.getName()));
                                schemaTableNameList.add(table.getSchemaTableName());
                                schema.getTables().add(table);
                            }
                        }
                    }
                    schemaList.add(schema);
                }
            }
            logger.info("A total of " + schemaTableNameList.size() + " tables were detected...");
            for (int i = 0; i < schemaTableNameList.size(); i++) {
                logger.info((i + 1) + ": " + schemaTableNameList.get(i));
@@ -140,4 +164,5 @@ public class CreateCDCSourceOperation extends AbstractOperation implements Operation {
        }
        return null;
    }
}
@@ -26,6 +26,7 @@ import com.dlink.model.Schema;
import com.dlink.model.Table;

import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

/**
@@ -83,4 +84,8 @@ public abstract class AbstractDriver implements Driver {
        return listTables(table.getSchema()).stream().anyMatch(tableItem -> Asserts.isEquals(tableItem.getName(), table.getName()));
    }

    @Override
    public List<Map<String, String>> getSplitSchemaList() {
        throw new RuntimeException("This data source does not yet support sharded databases and tables");
    }
}
@@ -19,6 +19,10 @@
package com.dlink.metadata.driver;

import static com.dlink.utils.SplitUtil.contains;
import static com.dlink.utils.SplitUtil.getReValue;
import static com.dlink.utils.SplitUtil.isSplit;

import com.dlink.assertion.Asserts;
import com.dlink.constant.CommonConstant;
import com.dlink.metadata.query.IDBQuery;
@@ -27,6 +31,7 @@ import com.dlink.model.Column;
import com.dlink.model.QueryData;
import com.dlink.model.Schema;
import com.dlink.model.Table;
import com.dlink.model.TableType;
import com.dlink.result.SqlExplainResult;
import com.dlink.utils.LogUtil;
import com.dlink.utils.TextUtil;
@@ -38,11 +43,19 @@ import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.sql.Statement;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.stream.Collectors;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -95,6 +108,7 @@ public abstract class AbstractJdbcDriver extends AbstractDriver {
        return dataSource;
    }

    @Override
    public Driver setDriverConfig(DriverConfig config) {
        this.config = config;
        try {
@@ -584,4 +598,94 @@ public abstract class AbstractJdbcDriver extends AbstractDriver {
    public Map<String, String> getFlinkColumnTypeConversion() {
        return new HashMap<>();
    }
    @Override
    public List<Map<String, String>> getSplitSchemaList() {
        PreparedStatement preparedStatement = null;
        ResultSet results = null;
        String sql = "select DATA_LENGTH,TABLE_NAME AS `NAME`,TABLE_SCHEMA AS `Database`,TABLE_COMMENT AS COMMENT,TABLE_CATALOG AS `CATALOG`,TABLE_TYPE"
                + " AS `TYPE`,ENGINE AS `ENGINE`,CREATE_OPTIONS AS `OPTIONS`,TABLE_ROWS AS `ROWS`,CREATE_TIME,UPDATE_TIME from information_schema.tables WHERE TABLE_TYPE='BASE TABLE'";
        List<Map<String, String>> schemas = null;
        try {
            preparedStatement = conn.get().prepareStatement(sql);
            results = preparedStatement.executeQuery();
            ResultSetMetaData metaData = results.getMetaData();
            List<String> columnList = new ArrayList<>();
            schemas = new ArrayList<>();
            for (int i = 1; i <= metaData.getColumnCount(); i++) {
                columnList.add(metaData.getColumnLabel(i));
            }
            while (results.next()) {
                Map<String, String> map = new HashMap<>();
                for (String column : columnList) {
                    map.put(column, results.getString(column));
                }
                schemas.add(map);
            }
        } catch (SQLException e) {
            logger.error("Failed to read information_schema.tables", e);
        } finally {
            close(preparedStatement, results);
        }
        return schemas;
    }

    @Override
    public Set<Table> getSplitTables(List<String> tableRegList, Map<String, String> splitConfig) {
        Set<Table> set = new HashSet<>();
        List<Map<String, String>> schemaList = getSplitSchemaList();
        IDBQuery dbQuery = getDBQuery();
        for (String table : tableRegList) {
            String[] split = table.split("\\\\.");
            String database = split[0];
            String tableName = split[1];
            // Match the corresponding tables
            List<Map<String, String>> mapList = schemaList.stream()
                    // Filter out tables that do not match
                    .filter(x -> contains(database, x.get(dbQuery.schemaName())) && contains(tableName, x.get(dbQuery.tableName()))).collect(Collectors.toList());
            List<Table> tableList = mapList.stream()
                    // Deduplicate by the sharding-normalized schema.table name
                    .collect(Collectors.collectingAndThen(Collectors.toCollection(
                            () -> new TreeSet<>(Comparator.comparing(x -> getReValue(x.get(dbQuery.schemaName()), splitConfig) + "." + getReValue(x.get(dbQuery.tableName()), splitConfig)))), ArrayList::new))
                    .stream().map(x -> {
                        Table tableInfo = new Table();
                        tableInfo.setName(getReValue(x.get(dbQuery.tableName()), splitConfig));
                        tableInfo.setComment(x.get(dbQuery.tableComment()));
                        tableInfo.setSchema(getReValue(x.get(dbQuery.schemaName()), splitConfig));
                        tableInfo.setType(x.get(dbQuery.tableType()));
                        tableInfo.setCatalog(x.get(dbQuery.catalogName()));
                        tableInfo.setEngine(x.get(dbQuery.engine()));
                        tableInfo.setOptions(x.get(dbQuery.options()));
                        tableInfo.setRows(Long.valueOf(x.get(dbQuery.rows())));
                        try {
                            tableInfo.setCreateTime(SimpleDateFormat.getDateInstance().parse(x.get(dbQuery.createTime())));
                            String updateTime = x.get(dbQuery.updateTime());
                            if (Asserts.isNotNullString(updateTime)) {
                                tableInfo.setUpdateTime(SimpleDateFormat.getDateInstance().parse(updateTime));
                            }
                        } catch (ParseException ignored) {
                            logger.warn("set date fail");
                        }
                        TableType tableType = TableType.type(isSplit(x.get(dbQuery.schemaName()), splitConfig), isSplit(x.get(dbQuery.tableName()), splitConfig));
                        tableInfo.setTableType(tableType);
                        if (tableType != TableType.SINGLE_DATABASE_AND_TABLE) {
                            String currentSchemaName = getReValue(x.get(dbQuery.schemaName()), splitConfig) + "." + getReValue(x.get(dbQuery.tableName()), splitConfig);
                            List<String> schemaTableNameList =
                                    mapList.stream().filter(y -> (getReValue(y.get(dbQuery.schemaName()), splitConfig) + "." + getReValue(y.get(dbQuery.tableName()), splitConfig)).equals(currentSchemaName))
                                            .map(y -> y.get(dbQuery.schemaName()) + "." + y.get(dbQuery.tableName())).collect(Collectors.toList());
                            tableInfo.setSchemaTableNameList(schemaTableNameList);
                        } else {
                            tableInfo.setSchemaTableNameList(Collections.singletonList(x.get(dbQuery.schemaName()) + "." + x.get(dbQuery.tableName())));
                        }
                        return tableInfo;
                    }).collect(Collectors.toList());
            set.addAll(tableList);
        }
        return set;
    }
}
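The collectingAndThen/TreeSet combination above is what collapses many physical shards into one logical Table, keyed on the getReValue-normalized schema.table string. A standalone sketch of the same deduplication idea (shard names are illustrative):

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.TreeSet;
import java.util.stream.Collectors;

public class LogicalTableDedupExample {
    public static void main(String[] args) {
        List<String> physical = Arrays.asList(
                "test_1.student_1", "test_1.student_2", "test_2.student_1");
        // A TreeSet ordered by the normalized name keeps one entry per logical table.
        List<String> logical = physical.stream()
                .collect(Collectors.collectingAndThen(
                        Collectors.toCollection(() -> new TreeSet<>(
                                Comparator.comparing((String x) -> x.replaceAll("_[0-9]+", "")))),
                        ArrayList::new));
        System.out.println(logical); // [test_1.student_1] -> the logical test.student
    }
}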
@@ -21,6 +21,7 @@ package com.dlink.metadata.driver;
import com.dlink.assertion.Asserts;
import com.dlink.exception.MetaDataException;
import com.dlink.exception.SplitTableException;
import com.dlink.metadata.result.JdbcSelectResult;
import com.dlink.model.Column;
import com.dlink.model.QueryData;
@@ -32,6 +33,7 @@ import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.ServiceLoader;
import java.util.Set;

/**
 * Driver
@@ -145,4 +147,19 @@ public interface Driver {
    List<SqlExplainResult> explain(String sql);

    Map<String, String> getFlinkColumnTypeConversion();

    /**
     * Get the split (sharded) tables.
     *
     * @param tableRegList list of table regex patterns
     * @param splitConfig  sharding configuration
     * @return {@link Set}<{@link Table}>
     */
    default Set<Table> getSplitTables(List<String> tableRegList, Map<String, String> splitConfig) {
        throw new SplitTableException("This data source does not currently support sharded databases and tables");
    }

    List<Map<String, String>> getSplitSchemaList();
}