Unverified commit d5ff1efc authored by 金鑫, committed by GitHub

CDCSOURCE: add support for syncing to multiple sink databases (#969)

Use 'sink.*' for a single sink target.
Use 'sink[N].*' for multiple targets, where N is a zero-based index.
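
For illustration, the two styles might look like this in an EXECUTE CDCSOURCE statement. A minimal sketch: the job names, connectors, and option values below are hypothetical; only the 'sink.*' vs 'sink[N].*' key convention comes from this change.

    public class CdcSourceSyntaxSketch {
        public static void main(String[] args) {
            // Hedged sketch: connector names, URLs and job names are made up;
            // only the 'sink.*' vs 'sink[N].*' key convention is defined here.
            String singleTarget = "EXECUTE CDCSOURCE job1 WITH ("
                + "'connector' = 'mysql-cdc', "
                + "'sink.connector' = 'jdbc', "
                + "'sink.url' = 'jdbc:mysql://host-a:3306/db')";
            String multiTarget = "EXECUTE CDCSOURCE job2 WITH ("
                + "'connector' = 'mysql-cdc', "
                + "'sink[0].connector' = 'jdbc', "  // zero-based index: first target
                + "'sink[1].connector' = 'doris')"; // second target
            System.out.println(singleTarget);
            System.out.println(multiTarget);
        }
    }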
Co-authored-by: 金鑫 <jinyanhui@huansi.net>
parent 40211153
SQLSinkBuilder.java
@@ -42,6 +42,7 @@ import org.apache.flink.streaming.api.datastream.DataStreamSource;
 import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.ProcessFunction;
+import org.apache.flink.table.api.ValidationException;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.operations.ModifyOperation;
 import org.apache.flink.table.operations.Operation;
@@ -66,6 +67,7 @@ import java.util.HashMap;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
 import javax.xml.bind.DatatypeConverter;
@@ -76,9 +78,9 @@ import javax.xml.bind.DatatypeConverter;
  * @since 2022/4/25 23:02
  */
 public class SQLSinkBuilder extends AbstractSinkBuilder implements SinkBuilder, Serializable {
     private static final String KEY_WORD = "sql";
     private static final long serialVersionUID = -3699685106324048226L;
+    private static AtomicInteger atomicInteger = new AtomicInteger(0);
     private ZoneId sinkTimeZone = ZoneId.of("UTC");

     public SQLSinkBuilder() {
@@ -90,7 +92,6 @@ public class SQLSinkBuilder extends AbstractSinkBuilder implements SinkBuilder,
     @Override
     public void addSink(StreamExecutionEnvironment env, DataStream<RowData> rowDataDataStream, Table table, List<String> columnNameList, List<LogicalType> columnTypeList) {
     }

     private DataStream<Row> buildRow(
@@ -100,10 +101,8 @@ public class SQLSinkBuilder extends AbstractSinkBuilder implements SinkBuilder,
             String schemaTableName) {
         final String[] columnNames = columnNameList.toArray(new String[columnNameList.size()]);
         final LogicalType[] columnTypes = columnTypeList.toArray(new LogicalType[columnTypeList.size()]);
         TypeInformation<?>[] typeInformations = TypeConversions.fromDataTypeToLegacyInfo(TypeConversions.fromLogicalToDataType(columnTypes));
         RowTypeInfo rowTypeInfo = new RowTypeInfo(typeInformations, columnNames);
         return filterOperator
             .flatMap(new FlatMapFunction<Map, Row>() {
                 @Override
@@ -152,18 +151,25 @@ public class SQLSinkBuilder extends AbstractSinkBuilder implements SinkBuilder,
     }

     private void addTableSink(
-            CustomTableEnvironment customTableEnvironment,
-            DataStream<Row> rowDataDataStream,
-            Table table,
-            List<String> columnNameList) {
+            int indexSink,
+            CustomTableEnvironment customTableEnvironment,
+            DataStream<Row> rowDataDataStream,
+            Table table,
+            List<String> columnNameList) {
         String sinkSchemaName = getSinkSchemaName(table);
-        String sinkTableName = getSinkTableName(table);
+        String tableName = getSinkTableName(table);
+        String sinkTableName = tableName + "_" + indexSink;
         String pkList = StringUtils.join(getPKList(table), ".");
         String viewName = "VIEW_" + table.getSchemaTableNameWithUnderline();
-        customTableEnvironment.createTemporaryView(viewName, rowDataDataStream, StringUtils.join(columnNameList, ","));
-        logger.info("Create " + viewName + " temporaryView successful...");
-        String flinkDDL = FlinkBaseUtil.getFlinkDDL(table, sinkTableName, config, sinkSchemaName, sinkTableName, pkList);
+        try {
+            customTableEnvironment.createTemporaryView(viewName, rowDataDataStream, StringUtils.join(columnNameList, ","));
+            logger.info("Create " + viewName + " temporaryView successful...");
+        } catch (ValidationException exception) {
+            if (!exception.getMessage().contains("already exists")) {
+                logger.error(exception.getMessage(), exception);
+            }
+        }
+        String flinkDDL = FlinkBaseUtil.getFlinkDDL(table, "" + sinkTableName, config, sinkSchemaName, tableName, pkList);
         logger.info(flinkDDL);
         customTableEnvironment.executeSql(flinkDDL);
         logger.info("Create " + sinkTableName + " FlinkSQL DDL successful...");
@@ -196,10 +202,10 @@ public class SQLSinkBuilder extends AbstractSinkBuilder implements SinkBuilder,
     @Override
     public DataStreamSource build(
             CDCBuilder cdcBuilder,
             StreamExecutionEnvironment env,
             CustomTableEnvironment customTableEnvironment,
             DataStreamSource<String> dataStreamSource) {
         final String timeZone = config.getSink().get("timezone");
         config.getSink().remove("timezone");
         if (Asserts.isNotNullString(timeZone)) {
@@ -207,7 +213,6 @@ public class SQLSinkBuilder extends AbstractSinkBuilder implements SinkBuilder,
         }
         final List<Schema> schemaList = config.getSchemaList();
         if (Asserts.isNotNullCollection(schemaList)) {
             logger.info("Build deserialize successful...");
             Map<Table, OutputTag<Map>> tagMap = new HashMap<>();
             Map<String, Table> tableMap = new HashMap<>();
@@ -218,13 +223,11 @@ public class SQLSinkBuilder extends AbstractSinkBuilder implements SinkBuilder,
                 };
                 tagMap.put(table, outputTag);
                 tableMap.put(table.getSchemaTableName(), table);
             }
         }
         final String schemaFieldName = config.getSchemaFieldName();
         ObjectMapper objectMapper = new ObjectMapper();
-        SingleOutputStreamOperator<Map> mapOperator = dataStreamSource.map(x -> objectMapper.readValue(x,Map.class)).returns(Map.class);
+        SingleOutputStreamOperator<Map> mapOperator = dataStreamSource.map(x -> objectMapper.readValue(x, Map.class)).returns(Map.class);
         SingleOutputStreamOperator<Map> processOperator = mapOperator.process(new ProcessFunction<Map, Map>() {
             @Override
             public void processElement(Map map, ProcessFunction<Map, Map>.Context ctx, Collector<Map> out) throws Exception {
@@ -239,7 +242,8 @@ public class SQLSinkBuilder extends AbstractSinkBuilder implements SinkBuilder,
                 }
             }
         });
-        tagMap.forEach((table,tag) -> {
+        final int indexSink = atomicInteger.getAndAdd(1);
+        tagMap.forEach((table, tag) -> {
             final String schemaTableName = table.getSchemaTableName();
             try {
                 DataStream<Map> filterOperator = shunt(processOperator, table, tag);
@@ -250,13 +254,12 @@ public class SQLSinkBuilder extends AbstractSinkBuilder implements SinkBuilder,
                 DataStream<Row> rowDataDataStream = buildRow(filterOperator, columnNameList, columnTypeList, schemaTableName).rebalance();
                 logger.info("Build " + schemaTableName + " flatMap successful...");
                 logger.info("Start build " + schemaTableName + " sink...");
-                addTableSink(customTableEnvironment, rowDataDataStream, table, columnNameList);
+                addTableSink(indexSink, customTableEnvironment, rowDataDataStream, table, columnNameList);
             } catch (Exception e) {
                 logger.error("Build " + schemaTableName + " cdc sync failed...");
                 logger.error(LogUtil.getError(e));
             }
         });
         List<Transformation<?>> trans = customTableEnvironment.getPlanner().translate(modifyOperations);
         for (Transformation<?> item : trans) {
             env.addOperator(item);
...
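
Why the new static AtomicInteger: build() can now be invoked once per configured sink, and each invocation reserves its own index via atomicInteger.getAndAdd(1); addTableSink then suffixes the sink table name with that index so repeated registrations of the same logical table do not collide in the Flink catalog. A minimal standalone sketch of the naming scheme (the table name is illustrative):

    import java.util.concurrent.atomic.AtomicInteger;

    public class SinkNamingSketch {
        // Mirrors SQLSinkBuilder's class-level counter: one index per build() pass.
        private static final AtomicInteger COUNTER = new AtomicInteger(0);

        // Mirrors addTableSink's naming: "<sink table>_<indexSink>".
        static String sinkTableName(String tableName, int indexSink) {
            return tableName + "_" + indexSink;
        }

        public static void main(String[] args) {
            int firstPass = COUNTER.getAndAdd(1);   // 0 -> first sink config
            int secondPass = COUNTER.getAndAdd(1);  // 1 -> second sink config
            System.out.println(sinkTableName("orders", firstPass));   // orders_0
            System.out.println(sinkTableName("orders", secondPass));  // orders_1
        }
    }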
CDCSource.java
@@ -22,6 +22,7 @@ package com.dlink.trans.ddl;
 import com.dlink.assertion.Asserts;
 import com.dlink.parser.SingleSqlParserFactory;
+import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -35,7 +36,6 @@ import java.util.regex.Pattern;
  * @since 2022/1/29 23:30
  */
 public class CDCSource {
     private String connector;
     private String statement;
     private String name;
@@ -53,9 +53,15 @@ public class CDCSource {
     private Map<String, String> jdbc;
     private Map<String, String> source;
     private Map<String, String> sink;
+    private List<Map<String, String>> sinks;

     public CDCSource(String connector, String statement, String name, String hostname, Integer port, String username, String password, Integer checkpoint, Integer parallelism, String startupMode,
                      Map<String, String> debezium, Map<String, String> source, Map<String, String> sink, Map<String, String> jdbc) {
+        this(connector, statement, name, hostname, port, username, password, checkpoint, parallelism, startupMode, debezium, source, sink, null, jdbc);
+    }
+
+    public CDCSource(String connector, String statement, String name, String hostname, Integer port, String username, String password, Integer checkpoint, Integer parallelism, String startupMode,
+                     Map<String, String> debezium, Map<String, String> source, Map<String, String> sink, List<Map<String, String>> sinks, Map<String, String> jdbc) {
         this.connector = connector;
         this.statement = statement;
         this.name = name;
@@ -70,6 +76,7 @@ public class CDCSource {
         this.jdbc = jdbc;
         this.source = source;
         this.sink = sink;
+        this.sinks = sinks;
     }

     public static CDCSource build(String statement) {
@@ -106,7 +113,6 @@ public class CDCSource {
                 }
             }
         }
         Map<String, String> sink = new HashMap<>();
         for (Map.Entry<String, String> entry : config.entrySet()) {
             if (entry.getKey().startsWith("sink.")) {
@@ -117,6 +123,33 @@ public class CDCSource {
                 }
             }
         }
+        /**
+         * Support writing to multiple sink targets; configs are applied in order, indexed from 0.
+         */
+        Map<String, Map<String, String>> sinks = new HashMap<>();
+        final Pattern p = Pattern.compile("sink\\[(?<index>.*)\\]");
+        for (Map.Entry<String, String> entry : config.entrySet()) {
+            if (entry.getKey().startsWith("sink[")) {
+                String key = entry.getKey();
+                Matcher matcher = p.matcher(key);
+                if (matcher.find()) {
+                    final String index = matcher.group("index");
+                    Map<String, String> sinkMap = sinks.get(index);
+                    if (sinkMap == null) {
+                        sinkMap = new HashMap<>();
+                        sinks.put(index, sinkMap);
+                    }
+                    key = key.replaceFirst("sink\\[" + index + "\\].", "");
+                    if (!sinkMap.containsKey(key)) {
+                        sinkMap.put(key, entry.getValue());
+                    }
+                }
+            }
+        }
+        final ArrayList<Map<String, String>> sinkList = new ArrayList<>(sinks.values());
+        if (sink.isEmpty() && sinkList.size() > 0) {
+            sink = sinkList.get(0);
+        }
         CDCSource cdcSource = new CDCSource(
             config.get("connector"),
             statement,
@@ -131,6 +164,7 @@ public class CDCSource {
             debezium,
             source,
             sink,
+            sinkList,
             jdbc
         );
         if (Asserts.isNotNullString(config.get("database-name"))) {
@@ -292,4 +326,8 @@ public class CDCSource {
     public void setJdbc(Map<String, String> jdbc) {
         this.jdbc = jdbc;
     }
+
+    public List<Map<String, String>> getSinks() {
+        return sinks;
+    }
 }
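
To see what the new parsing block in build() produces, here is a self-contained sketch that runs the same regex and prefix-stripping logic over a couple of made-up 'sink[N].*' entries (the option keys and values are hypothetical):

    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.LinkedHashMap;
    import java.util.List;
    import java.util.Map;
    import java.util.regex.Matcher;
    import java.util.regex.Pattern;

    public class SinkIndexParsingSketch {
        public static void main(String[] args) {
            Map<String, String> config = new LinkedHashMap<>();
            config.put("sink[0].connector", "jdbc");
            config.put("sink[0].url", "jdbc:mysql://host-a:3306/db");
            config.put("sink[1].connector", "doris");

            // Same pattern and "sink[N]." stripping as CDCSource.build().
            Map<String, Map<String, String>> sinks = new HashMap<>();
            Pattern p = Pattern.compile("sink\\[(?<index>.*)\\]");
            for (Map.Entry<String, String> entry : config.entrySet()) {
                String key = entry.getKey();
                Matcher matcher = p.matcher(key);
                if (matcher.find()) {
                    String index = matcher.group("index");
                    String option = key.replaceFirst("sink\\[" + index + "\\].", "");
                    sinks.computeIfAbsent(index, i -> new HashMap<>()).put(option, entry.getValue());
                }
            }
            // Note: like the commit, this relies on HashMap iteration order for the
            // index sequence; a TreeMap keyed on the numeric index would guarantee
            // ascending order.
            List<Map<String, String>> sinkList = new ArrayList<>(sinks.values());
            System.out.println(sinkList);
            // e.g. [{url=jdbc:mysql://host-a:3306/db, connector=jdbc}, {connector=doris}]
        }
    }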
...@@ -47,7 +47,6 @@ import java.util.Map; ...@@ -47,7 +47,6 @@ import java.util.Map;
* @since 2022/1/29 23:25 * @since 2022/1/29 23:25
*/ */
public class CreateCDCSourceOperation extends AbstractOperation implements Operation { public class CreateCDCSourceOperation extends AbstractOperation implements Operation {
private static final String KEY_WORD = "EXECUTE CDCSOURCE"; private static final String KEY_WORD = "EXECUTE CDCSOURCE";
public CreateCDCSourceOperation() { public CreateCDCSourceOperation() {
...@@ -72,8 +71,8 @@ public class CreateCDCSourceOperation extends AbstractOperation implements Opera ...@@ -72,8 +71,8 @@ public class CreateCDCSourceOperation extends AbstractOperation implements Opera
logger.info("Start build CDCSOURCE Task..."); logger.info("Start build CDCSOURCE Task...");
CDCSource cdcSource = CDCSource.build(statement); CDCSource cdcSource = CDCSource.build(statement);
FlinkCDCConfig config = new FlinkCDCConfig(cdcSource.getConnector(), cdcSource.getHostname(), cdcSource.getPort(), cdcSource.getUsername() FlinkCDCConfig config = new FlinkCDCConfig(cdcSource.getConnector(), cdcSource.getHostname(), cdcSource.getPort(), cdcSource.getUsername()
, cdcSource.getPassword(), cdcSource.getCheckpoint(), cdcSource.getParallelism(), cdcSource.getDatabase(), cdcSource.getSchema() , cdcSource.getPassword(), cdcSource.getCheckpoint(), cdcSource.getParallelism(), cdcSource.getDatabase(), cdcSource.getSchema()
, cdcSource.getTable(), cdcSource.getStartupMode(), cdcSource.getDebezium(), cdcSource.getSource(), cdcSource.getSink(),cdcSource.getJdbc()); , cdcSource.getTable(), cdcSource.getStartupMode(), cdcSource.getDebezium(), cdcSource.getSource(), cdcSource.getSink(), cdcSource.getJdbc());
try { try {
CDCBuilder cdcBuilder = CDCBuilderFactory.buildCDCBuilder(config); CDCBuilder cdcBuilder = CDCBuilderFactory.buildCDCBuilder(config);
Map<String, Map<String, String>> allConfigMap = cdcBuilder.parseMetaDataConfigs(); Map<String, Map<String, String>> allConfigMap = cdcBuilder.parseMetaDataConfigs();
...@@ -127,10 +126,17 @@ public class CreateCDCSourceOperation extends AbstractOperation implements Opera ...@@ -127,10 +126,17 @@ public class CreateCDCSourceOperation extends AbstractOperation implements Opera
} }
DataStreamSource<String> streamSource = cdcBuilder.build(streamExecutionEnvironment); DataStreamSource<String> streamSource = cdcBuilder.build(streamExecutionEnvironment);
logger.info("Build " + config.getType() + " successful..."); logger.info("Build " + config.getType() + " successful...");
SinkBuilderFactory.buildSinkBuilder(config).build(cdcBuilder, streamExecutionEnvironment, executor.getCustomTableEnvironment(), streamSource); if (cdcSource.getSinks() == null || cdcSource.getSinks().size() == 0) {
SinkBuilderFactory.buildSinkBuilder(config).build(cdcBuilder, streamExecutionEnvironment, executor.getCustomTableEnvironment(), streamSource);
} else {
for (Map<String, String> sink : cdcSource.getSinks()) {
config.setSink(sink);
SinkBuilderFactory.buildSinkBuilder(config).build(cdcBuilder, streamExecutionEnvironment, executor.getCustomTableEnvironment(), streamSource);
}
}
logger.info("Build CDCSOURCE Task successful!"); logger.info("Build CDCSOURCE Task successful!");
} catch (Exception e) { } catch (Exception e) {
e.printStackTrace(); logger.error(e.getMessage(), e);
} }
return null; return null;
} }
......
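
Taken together, the three files implement the fan-out: CDCSource.build() parses the 'sink[N].*' keys into a list of sink configs; CreateCDCSourceOperation runs one sink-builder pass per entry over the same streamSource, with config.setSink(sink) swapping in each target's options; and SQLSinkBuilder's static counter gives each pass a distinct index, so the generated Flink sink tables get unique names while the shared temporary view is guarded by the new "already exists" catch.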