Unverified commit d09efa36, authored by ZackYoung, committed by GitHub

Optimize cdc SQLSinkBuilder KafkaSinkBuilder, filter to process (#806)

* Optimize cdc SQLSinkBuilder KafkaSinkBuilder, filter to process

* change "_" to "."

* change "db" to config.getSchemaFieldName()

* change "tableMap key" to table.getSchemaTableName()
parent ebe5b6af
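
The bullets above describe replacing the per-table filter/map chain with a single ProcessFunction that shunts each record to a per-table side output, keyed by the configured schema field plus "." plus the table name. A minimal, illustrative sketch of that routing idea follows (simplified types; the class name `SideOutputRoutingSketch`, the method `route`, and the literal "db" key are assumptions for the example, not project API):

```java
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

import java.util.HashMap;
import java.util.Map;
import java.util.Set;

public class SideOutputRoutingSketch {

    // tableKeys holds "schema.table" identifiers, mirroring table.getSchemaTableName().
    public static Map<String, DataStream<Map>> route(DataStream<Map> records, Set<String> tableKeys) {
        // One OutputTag per logical table, keyed by "schema.table".
        Map<String, OutputTag<Map>> tags = new HashMap<>();
        tableKeys.forEach(key -> tags.put(key, new OutputTag<Map>(key) {
        }));

        // A single ProcessFunction replaces N per-table filters: each record is read once
        // and emitted to the side output of the table it belongs to.
        SingleOutputStreamOperator<Map> routed = records.process(new ProcessFunction<Map, Map>() {
            @Override
            public void processElement(Map value, Context ctx, Collector<Map> out) {
                Map source = (Map) value.get("source");
                // "db" is the Debezium field for MySQL; the real code uses config.getSchemaFieldName().
                String key = source.get("db") + "." + source.get("table");
                OutputTag<Map> tag = tags.get(key);
                if (tag != null) {
                    ctx.output(tag, value);   // shunt to the matching table's stream
                } else {
                    out.collect(value);       // unknown table: keep on the main output
                }
            }
        });

        // Per-table streams are pulled back out with getSideOutput, which is what shunt() wraps.
        Map<String, DataStream<Map>> perTable = new HashMap<>();
        tags.forEach((key, tag) -> perTable.put(key, routed.getSideOutput(tag)));
        return perTable;
    }
}
```

Each returned per-table stream then feeds its own sink (a Kafka topic per table, or a table sink), which is what the diffs below wire up.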
@@ -37,6 +37,7 @@ import org.apache.flink.table.operations.ModifyOperation;
 import org.apache.flink.table.types.logical.*;
 import org.apache.flink.types.RowKind;
 import org.apache.flink.util.Collector;
+import org.apache.flink.util.OutputTag;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -110,7 +111,13 @@ public abstract class AbstractSinkBuilder {
             }
         });
     }
 
+    protected DataStream<Map> shunt(
+        SingleOutputStreamOperator<Map> processOperator,
+        Table table,
+        OutputTag<Map> tag) {
+        return processOperator.getSideOutput(tag);
+    }
+
     protected DataStream<RowData> buildRowData(
         SingleOutputStreamOperator<Map> filterOperator,
         List<String> columnNameList,
......
@@ -20,31 +20,32 @@
 package com.dlink.cdc.kafka;
 
-import org.apache.flink.api.common.functions.FilterFunction;
-import org.apache.flink.api.common.functions.MapFunction;
+import com.dlink.assertion.Asserts;
+import com.dlink.cdc.AbstractSinkBuilder;
+import com.dlink.cdc.CDCBuilder;
+import com.dlink.cdc.SinkBuilder;
+import com.dlink.executor.CustomTableEnvironment;
+import com.dlink.model.FlinkCDCConfig;
+import com.dlink.model.Schema;
+import com.dlink.model.Table;
 import org.apache.flink.api.common.serialization.SimpleStringSchema;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.DataStreamSource;
 import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.ProcessFunction;
 import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.OutputTag;
 
+import java.util.HashMap;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 
-import com.dlink.assertion.Asserts;
-import com.dlink.cdc.AbstractSinkBuilder;
-import com.dlink.cdc.CDCBuilder;
-import com.dlink.cdc.SinkBuilder;
-import com.dlink.executor.CustomTableEnvironment;
-import com.dlink.model.FlinkCDCConfig;
-import com.dlink.model.Schema;
-import com.dlink.model.Table;
-
 /**
  * MysqlCDCBuilder
  *
@@ -89,44 +90,47 @@ public class KafkaSinkBuilder extends AbstractSinkBuilder implements SinkBuilder
         CustomTableEnvironment customTableEnvironment,
         DataStreamSource<String> dataStreamSource) {
         if (Asserts.isNotNullString(config.getSink().get("topic"))) {
-            dataStreamSource.addSink(new FlinkKafkaProducer<String>(config.getSink().get("brokers"),
+            dataStreamSource.addSink(new FlinkKafkaProducer<>(config.getSink().get("brokers"),
                 config.getSink().get("topic"),
                 new SimpleStringSchema()));
         } else {
+            Map<Table, OutputTag<String>> tagMap = new HashMap<>();
+            Map<String, Table> tableMap = new HashMap<>();
+            ObjectMapper objectMapper = new ObjectMapper();
+            SingleOutputStreamOperator<Map> mapOperator = dataStreamSource.map(x -> objectMapper.readValue(x, Map.class)).returns(Map.class);
             final List<Schema> schemaList = config.getSchemaList();
             final String schemaFieldName = config.getSchemaFieldName();
             if (Asserts.isNotNullCollection(schemaList)) {
-                SingleOutputStreamOperator<Map> mapOperator = dataStreamSource.map(new MapFunction<String, Map>() {
-                    @Override
-                    public Map map(String value) throws Exception {
-                        ObjectMapper objectMapper = new ObjectMapper();
-                        return objectMapper.readValue(value, Map.class);
-                    }
-                });
                 for (Schema schema : schemaList) {
                     for (Table table : schema.getTables()) {
-                        final String tableName = table.getName();
-                        final String schemaName = table.getSchema();
-                        SingleOutputStreamOperator<Map> filterOperator = mapOperator.filter(new FilterFunction<Map>() {
-                            @Override
-                            public boolean filter(Map value) throws Exception {
-                                LinkedHashMap source = (LinkedHashMap) value.get("source");
-                                return tableName.equals(source.get("table").toString())
-                                    && schemaName.equals(source.get(schemaFieldName).toString());
-                            }
-                        });
-                        SingleOutputStreamOperator<String> stringOperator = filterOperator.map(new MapFunction<Map, String>() {
-                            @Override
-                            public String map(Map value) throws Exception {
-                                ObjectMapper objectMapper = new ObjectMapper();
-                                return objectMapper.writeValueAsString(value);
-                            }
-                        });
-                        stringOperator.addSink(new FlinkKafkaProducer<String>(config.getSink().get("brokers"),
-                            getSinkTableName(table),
-                            new SimpleStringSchema()));
+                        String sinkTableName = getSinkTableName(table);
+                        OutputTag<String> outputTag = new OutputTag<String>(sinkTableName) {
+                        };
+                        tagMap.put(table, outputTag);
+                        tableMap.put(table.getSchemaTableName(), table);
                     }
                 }
+                SingleOutputStreamOperator<String> process = mapOperator.process(new ProcessFunction<Map, String>() {
+                    @Override
+                    public void processElement(Map map, ProcessFunction<Map, String>.Context ctx, Collector<String> out) throws Exception {
+                        LinkedHashMap source = (LinkedHashMap) map.get("source");
+                        try {
+                            String result = objectMapper.writeValueAsString(map);
+                            Table table = tableMap.get(source.get(schemaFieldName).toString() + "." + source.get("table").toString());
+                            OutputTag<String> outputTag = tagMap.get(table);
+                            ctx.output(outputTag, result);
+                        } catch (Exception e) {
+                            out.collect(objectMapper.writeValueAsString(map));
+                        }
+                    }
+                });
+                tagMap.forEach((k, v) -> {
+                    String topic = getSinkTableName(k);
+                    process.getSideOutput(v).rebalance().addSink(new FlinkKafkaProducer<>(config.getSink().get("brokers"),
+                        topic, new SimpleStringSchema())).name(topic);
+                });
             }
         }
         return dataStreamSource;
......
@@ -20,15 +20,28 @@
 package com.dlink.cdc.sql;
 
+import com.dlink.assertion.Asserts;
+import com.dlink.cdc.AbstractSinkBuilder;
+import com.dlink.cdc.CDCBuilder;
+import com.dlink.cdc.SinkBuilder;
+import com.dlink.executor.CustomTableEnvironment;
+import com.dlink.model.FlinkCDCConfig;
+import com.dlink.model.Schema;
+import com.dlink.model.Table;
+import com.dlink.utils.FlinkBaseUtil;
+import com.dlink.utils.JSONUtil;
+import com.dlink.utils.LogUtil;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.flink.api.common.functions.FlatMapFunction;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.dag.Transformation;
 import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.DataStreamSource;
 import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.ProcessFunction;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.operations.ModifyOperation;
 import org.apache.flink.table.operations.Operation;
@@ -37,27 +50,14 @@ import org.apache.flink.table.types.utils.TypeConversions;
 import org.apache.flink.types.Row;
 import org.apache.flink.types.RowKind;
 import org.apache.flink.util.Collector;
+import org.apache.flink.util.OutputTag;
 
 import javax.xml.bind.DatatypeConverter;
 
 import java.io.Serializable;
 import java.math.BigDecimal;
 import java.time.Instant;
 import java.time.ZoneId;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-
-import com.dlink.assertion.Asserts;
-import com.dlink.cdc.AbstractSinkBuilder;
-import com.dlink.cdc.CDCBuilder;
-import com.dlink.cdc.SinkBuilder;
-import com.dlink.executor.CustomTableEnvironment;
-import com.dlink.model.FlinkCDCConfig;
-import com.dlink.model.Schema;
-import com.dlink.model.Table;
-import com.dlink.utils.FlinkBaseUtil;
-import com.dlink.utils.JSONUtil;
-import com.dlink.utils.LogUtil;
+import java.util.*;
 
 /**
  * SQLSinkBuilder
@@ -83,7 +83,7 @@ public class SQLSinkBuilder extends AbstractSinkBuilder implements SinkBuilder,
     }
 
     private DataStream<Row> buildRow(
-        SingleOutputStreamOperator<Map> filterOperator,
+        DataStream<Map> filterOperator,
         List<String> columnNameList,
         List<LogicalType> columnTypeList,
         String schemaTableName) {
@@ -189,20 +189,47 @@ public class SQLSinkBuilder extends AbstractSinkBuilder implements SinkBuilder,
         CustomTableEnvironment customTableEnvironment,
         DataStreamSource<String> dataStreamSource) {
         final List<Schema> schemaList = config.getSchemaList();
-        final String schemaFieldName = config.getSchemaFieldName();
         if (Asserts.isNotNullCollection(schemaList)) {
-            SingleOutputStreamOperator<Map> mapOperator = deserialize(dataStreamSource);
             logger.info("Build deserialize successful...");
+            Map<Table, OutputTag<Map>> tagMap = new HashMap<>();
+            Map<String, Table> tableMap = new HashMap<>();
             for (Schema schema : schemaList) {
                 for (Table table : schema.getTables()) {
+                    String sinkTableName = getSinkTableName(table);
+                    OutputTag<Map> outputTag = new OutputTag<Map>(sinkTableName) {
+                    };
+                    tagMap.put(table, outputTag);
+                    tableMap.put(table.getSchemaTableName(), table);
+                }
+            }
+            final String schemaFieldName = config.getSchemaFieldName();
+            ObjectMapper objectMapper = new ObjectMapper();
+            SingleOutputStreamOperator<Map> mapOperator = dataStreamSource.map(x -> objectMapper.readValue(x, Map.class)).returns(Map.class);
+            SingleOutputStreamOperator<Map> processOperator = mapOperator.process(new ProcessFunction<Map, Map>() {
+                @Override
+                public void processElement(Map map, ProcessFunction<Map, Map>.Context ctx, Collector<Map> out) throws Exception {
+                    LinkedHashMap source = (LinkedHashMap) map.get("source");
+                    try {
+                        Table table = tableMap.get(source.get(schemaFieldName).toString() + "." + source.get("table").toString());
+                        OutputTag<Map> outputTag = tagMap.get(table);
+                        ctx.output(outputTag, map);
+                    } catch (Exception e) {
+                        out.collect(map);
+                    }
+                }
+            });
+            tagMap.forEach((table, tag) -> {
                 final String schemaTableName = table.getSchemaTableName();
                 try {
-                    SingleOutputStreamOperator<Map> filterOperator = shunt(mapOperator, table, schemaFieldName);
+                    DataStream<Map> filterOperator = shunt(processOperator, table, tag);
                     logger.info("Build " + schemaTableName + " shunt successful...");
                     List<String> columnNameList = new ArrayList<>();
                     List<LogicalType> columnTypeList = new ArrayList<>();
                     buildColumn(columnNameList, columnTypeList, table.getColumns());
-                    DataStream<Row> rowDataDataStream = buildRow(filterOperator, columnNameList, columnTypeList, schemaTableName);
+                    DataStream<Row> rowDataDataStream = buildRow(filterOperator, columnNameList, columnTypeList, schemaTableName).rebalance();
                     logger.info("Build " + schemaTableName + " flatMap successful...");
                     logger.info("Start build " + schemaTableName + " sink...");
                     addTableSink(customTableEnvironment, rowDataDataStream, table, columnNameList);
@@ -210,8 +237,8 @@ public class SQLSinkBuilder extends AbstractSinkBuilder implements SinkBuilder,
                     logger.error("Build " + schemaTableName + " cdc sync failed...");
                     logger.error(LogUtil.getError(e));
                 }
-            }
-        }
+            });
             List<Transformation<?>> trans = customTableEnvironment.getPlanner().translate(modifyOperations);
             for (Transformation<?> item : trans) {
                 env.addOperator(item);
......
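
The last three commit bullets change how the routing key is built. A tiny hedged illustration of the key contract (class and method names are assumptions for the example, not project code): the key the ProcessFunction derives from a Debezium-style record must equal the key stored under `table.getSchemaTableName()`.

```java
import java.util.Map;

public class RoutingKeySketch {

    // schemaFieldName is config.getSchemaFieldName(), e.g. "db" for MySQL or "schema" for PostgreSQL.
    // The separator is ".", not "_", matching table.getSchemaTableName() (e.g. "mydb.orders").
    static String keyOf(Map<String, Object> source, String schemaFieldName) {
        return source.get(schemaFieldName) + "." + source.get("table");
    }
}
```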
@@ -37,6 +37,7 @@ import org.apache.flink.table.operations.ModifyOperation;
 import org.apache.flink.table.types.logical.*;
 import org.apache.flink.types.RowKind;
 import org.apache.flink.util.Collector;
+import org.apache.flink.util.OutputTag;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -110,7 +111,13 @@ public abstract class AbstractSinkBuilder {
             }
         });
     }
 
+    protected DataStream<Map> shunt(
+        SingleOutputStreamOperator<Map> processOperator,
+        Table table,
+        OutputTag<Map> tag) {
+        return processOperator.getSideOutput(tag);
+    }
+
     protected DataStream<RowData> buildRowData(
         SingleOutputStreamOperator<Map> filterOperator,
         List<String> columnNameList,
......
...@@ -20,31 +20,32 @@ ...@@ -20,31 +20,32 @@
package com.dlink.cdc.kafka; package com.dlink.cdc.kafka;
import org.apache.flink.api.common.functions.FilterFunction; import com.dlink.assertion.Asserts;
import org.apache.flink.api.common.functions.MapFunction; import com.dlink.cdc.AbstractSinkBuilder;
import com.dlink.cdc.CDCBuilder;
import com.dlink.cdc.SinkBuilder;
import com.dlink.executor.CustomTableEnvironment;
import com.dlink.model.FlinkCDCConfig;
import com.dlink.model.Schema;
import com.dlink.model.Table;
import org.apache.flink.api.common.serialization.SimpleStringSchema; import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer; import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.table.data.RowData; import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.LogicalType; import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;
import java.util.HashMap;
import java.util.LinkedHashMap; import java.util.LinkedHashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import com.dlink.assertion.Asserts;
import com.dlink.cdc.AbstractSinkBuilder;
import com.dlink.cdc.CDCBuilder;
import com.dlink.cdc.SinkBuilder;
import com.dlink.executor.CustomTableEnvironment;
import com.dlink.model.FlinkCDCConfig;
import com.dlink.model.Schema;
import com.dlink.model.Table;
/** /**
* MysqlCDCBuilder * MysqlCDCBuilder
* *
...@@ -89,44 +90,48 @@ public class KafkaSinkBuilder extends AbstractSinkBuilder implements SinkBuilder ...@@ -89,44 +90,48 @@ public class KafkaSinkBuilder extends AbstractSinkBuilder implements SinkBuilder
CustomTableEnvironment customTableEnvironment, CustomTableEnvironment customTableEnvironment,
DataStreamSource<String> dataStreamSource) { DataStreamSource<String> dataStreamSource) {
if (Asserts.isNotNullString(config.getSink().get("topic"))) { if (Asserts.isNotNullString(config.getSink().get("topic"))) {
dataStreamSource.addSink(new FlinkKafkaProducer<String>(config.getSink().get("brokers"), dataStreamSource.addSink(new FlinkKafkaProducer<>(config.getSink().get("brokers"),
config.getSink().get("topic"), config.getSink().get("topic"),
new SimpleStringSchema())); new SimpleStringSchema()));
} else { } else {
Map<Table, OutputTag<String>> tagMap = new HashMap<>();
Map<String, Table> tableMap = new HashMap<>();
ObjectMapper objectMapper = new ObjectMapper();
SingleOutputStreamOperator<Map> mapOperator = dataStreamSource.map(x -> objectMapper.readValue(x,Map.class)).returns(Map.class);
final List<Schema> schemaList = config.getSchemaList(); final List<Schema> schemaList = config.getSchemaList();
final String schemaFieldName = config.getSchemaFieldName(); final String schemaFieldName = config.getSchemaFieldName();
if (Asserts.isNotNullCollection(schemaList)) { if (Asserts.isNotNullCollection(schemaList)) {
SingleOutputStreamOperator<Map> mapOperator = dataStreamSource.map(new MapFunction<String, Map>() {
@Override
public Map map(String value) throws Exception {
ObjectMapper objectMapper = new ObjectMapper();
return objectMapper.readValue(value, Map.class);
}
});
for (Schema schema : schemaList) { for (Schema schema : schemaList) {
for (Table table : schema.getTables()) { for (Table table : schema.getTables()) {
final String tableName = table.getName(); String sinkTableName = getSinkTableName(table);
final String schemaName = table.getSchema(); OutputTag<String> outputTag = new OutputTag<String>(sinkTableName) {
SingleOutputStreamOperator<Map> filterOperator = mapOperator.filter(new FilterFunction<Map>() { };
@Override tagMap.put(table, outputTag);
public boolean filter(Map value) throws Exception { tableMap.put(table.getSchemaTableName(), table);
LinkedHashMap source = (LinkedHashMap) value.get("source");
return tableName.equals(source.get("table").toString())
&& schemaName.equals(source.get(schemaFieldName).toString());
} }
});
SingleOutputStreamOperator<String> stringOperator = filterOperator.map(new MapFunction<Map, String>() {
@Override
public String map(Map value) throws Exception {
ObjectMapper objectMapper = new ObjectMapper();
return objectMapper.writeValueAsString(value);
} }
});
stringOperator.addSink(new FlinkKafkaProducer<String>(config.getSink().get("brokers"), SingleOutputStreamOperator<String> process = mapOperator.process(new ProcessFunction<Map, String>() {
getSinkTableName(table), @Override
new SimpleStringSchema())); public void processElement(Map map, ProcessFunction<Map, String>.Context ctx, Collector<String> out) throws Exception {
LinkedHashMap source = (LinkedHashMap) map.get("source");
try {
String result = objectMapper.writeValueAsString(map);
Table table = tableMap.get(source.get(schemaFieldName).toString() + "." + source.get("table").toString());
OutputTag<String> outputTag = tagMap.get(table);
ctx.output(outputTag, result);
} catch (Exception e) {
out.collect(objectMapper.writeValueAsString(map));
} }
} }
});
tagMap.forEach((k, v) -> {
String topic = getSinkTableName(k);
process.getSideOutput(v).rebalance().addSink(new FlinkKafkaProducer<>(config.getSink().get("brokers"),
topic, new SimpleStringSchema())).name(topic);
});
} }
} }
return dataStreamSource; return dataStreamSource;
......
@@ -20,15 +20,28 @@
 package com.dlink.cdc.sql;
 
+import com.dlink.assertion.Asserts;
+import com.dlink.cdc.AbstractSinkBuilder;
+import com.dlink.cdc.CDCBuilder;
+import com.dlink.cdc.SinkBuilder;
+import com.dlink.executor.CustomTableEnvironment;
+import com.dlink.model.FlinkCDCConfig;
+import com.dlink.model.Schema;
+import com.dlink.model.Table;
+import com.dlink.utils.FlinkBaseUtil;
+import com.dlink.utils.JSONUtil;
+import com.dlink.utils.LogUtil;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.flink.api.common.functions.FlatMapFunction;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.dag.Transformation;
 import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.DataStreamSource;
 import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.ProcessFunction;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.operations.ModifyOperation;
 import org.apache.flink.table.operations.Operation;
@@ -37,27 +50,14 @@ import org.apache.flink.table.types.utils.TypeConversions;
 import org.apache.flink.types.Row;
 import org.apache.flink.types.RowKind;
 import org.apache.flink.util.Collector;
+import org.apache.flink.util.OutputTag;
 
 import javax.xml.bind.DatatypeConverter;
 
 import java.io.Serializable;
 import java.math.BigDecimal;
 import java.time.Instant;
 import java.time.ZoneId;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-
-import com.dlink.assertion.Asserts;
-import com.dlink.cdc.AbstractSinkBuilder;
-import com.dlink.cdc.CDCBuilder;
-import com.dlink.cdc.SinkBuilder;
-import com.dlink.executor.CustomTableEnvironment;
-import com.dlink.model.FlinkCDCConfig;
-import com.dlink.model.Schema;
-import com.dlink.model.Table;
-import com.dlink.utils.FlinkBaseUtil;
-import com.dlink.utils.JSONUtil;
-import com.dlink.utils.LogUtil;
+import java.util.*;
 
 /**
  * SQLSinkBuilder
@@ -83,7 +83,7 @@ public class SQLSinkBuilder extends AbstractSinkBuilder implements SinkBuilder,
     }
 
     private DataStream<Row> buildRow(
-        SingleOutputStreamOperator<Map> filterOperator,
+        DataStream<Map> filterOperator,
         List<String> columnNameList,
        List<LogicalType> columnTypeList,
         String schemaTableName) {
@@ -189,20 +189,47 @@ public class SQLSinkBuilder extends AbstractSinkBuilder implements SinkBuilder,
         CustomTableEnvironment customTableEnvironment,
         DataStreamSource<String> dataStreamSource) {
         final List<Schema> schemaList = config.getSchemaList();
-        final String schemaFieldName = config.getSchemaFieldName();
         if (Asserts.isNotNullCollection(schemaList)) {
-            SingleOutputStreamOperator<Map> mapOperator = deserialize(dataStreamSource);
             logger.info("Build deserialize successful...");
+            Map<Table, OutputTag<Map>> tagMap = new HashMap<>();
+            Map<String, Table> tableMap = new HashMap<>();
             for (Schema schema : schemaList) {
                 for (Table table : schema.getTables()) {
+                    String sinkTableName = getSinkTableName(table);
+                    OutputTag<Map> outputTag = new OutputTag<Map>(sinkTableName) {
+                    };
+                    tagMap.put(table, outputTag);
+                    tableMap.put(table.getSchemaTableName(), table);
+                }
+            }
+            final String schemaFieldName = config.getSchemaFieldName();
+            ObjectMapper objectMapper = new ObjectMapper();
+            SingleOutputStreamOperator<Map> mapOperator = dataStreamSource.map(x -> objectMapper.readValue(x, Map.class)).returns(Map.class);
+            SingleOutputStreamOperator<Map> processOperator = mapOperator.process(new ProcessFunction<Map, Map>() {
+                @Override
+                public void processElement(Map map, ProcessFunction<Map, Map>.Context ctx, Collector<Map> out) throws Exception {
+                    LinkedHashMap source = (LinkedHashMap) map.get("source");
+                    try {
+                        Table table = tableMap.get(source.get(schemaFieldName).toString() + "." + source.get("table").toString());
+                        OutputTag<Map> outputTag = tagMap.get(table);
+                        ctx.output(outputTag, map);
+                    } catch (Exception e) {
+                        out.collect(map);
+                    }
+                }
+            });
+            tagMap.forEach((table, tag) -> {
                 final String schemaTableName = table.getSchemaTableName();
                 try {
-                    SingleOutputStreamOperator<Map> filterOperator = shunt(mapOperator, table, schemaFieldName);
+                    DataStream<Map> filterOperator = shunt(processOperator, table, tag);
                     logger.info("Build " + schemaTableName + " shunt successful...");
                     List<String> columnNameList = new ArrayList<>();
                     List<LogicalType> columnTypeList = new ArrayList<>();
                     buildColumn(columnNameList, columnTypeList, table.getColumns());
-                    DataStream<Row> rowDataDataStream = buildRow(filterOperator, columnNameList, columnTypeList, schemaTableName);
+                    DataStream<Row> rowDataDataStream = buildRow(filterOperator, columnNameList, columnTypeList, schemaTableName).rebalance();
                     logger.info("Build " + schemaTableName + " flatMap successful...");
                     logger.info("Start build " + schemaTableName + " sink...");
                     addTableSink(customTableEnvironment, rowDataDataStream, table, columnNameList);
@@ -210,8 +237,8 @@ public class SQLSinkBuilder extends AbstractSinkBuilder implements SinkBuilder,
                     logger.error("Build " + schemaTableName + " cdc sync failed...");
                     logger.error(LogUtil.getError(e));
                 }
-            }
-        }
+            });
             List<Transformation<?>> trans = customTableEnvironment.getPlanner().translate(modifyOperations);
             for (Transformation<?> item : trans) {
                 env.addOperator(item);
......
@@ -37,6 +37,7 @@ import org.apache.flink.table.operations.ModifyOperation;
 import org.apache.flink.table.types.logical.*;
 import org.apache.flink.types.RowKind;
 import org.apache.flink.util.Collector;
+import org.apache.flink.util.OutputTag;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -110,7 +111,13 @@ public abstract class AbstractSinkBuilder {
             }
         });
     }
 
+    protected DataStream<Map> shunt(
+        SingleOutputStreamOperator<Map> processOperator,
+        Table table,
+        OutputTag<Map> tag) {
+        return processOperator.getSideOutput(tag);
+    }
+
     protected DataStream<RowData> buildRowData(
         SingleOutputStreamOperator<Map> filterOperator,
         List<String> columnNameList,
......
...@@ -20,31 +20,32 @@ ...@@ -20,31 +20,32 @@
package com.dlink.cdc.kafka; package com.dlink.cdc.kafka;
import org.apache.flink.api.common.functions.FilterFunction; import com.dlink.assertion.Asserts;
import org.apache.flink.api.common.functions.MapFunction; import com.dlink.cdc.AbstractSinkBuilder;
import com.dlink.cdc.CDCBuilder;
import com.dlink.cdc.SinkBuilder;
import com.dlink.executor.CustomTableEnvironment;
import com.dlink.model.FlinkCDCConfig;
import com.dlink.model.Schema;
import com.dlink.model.Table;
import org.apache.flink.api.common.serialization.SimpleStringSchema; import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer; import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.table.data.RowData; import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.LogicalType; import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;
import java.util.HashMap;
import java.util.LinkedHashMap; import java.util.LinkedHashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import com.dlink.assertion.Asserts;
import com.dlink.cdc.AbstractSinkBuilder;
import com.dlink.cdc.CDCBuilder;
import com.dlink.cdc.SinkBuilder;
import com.dlink.executor.CustomTableEnvironment;
import com.dlink.model.FlinkCDCConfig;
import com.dlink.model.Schema;
import com.dlink.model.Table;
/** /**
* MysqlCDCBuilder * MysqlCDCBuilder
* *
...@@ -79,44 +80,48 @@ public class KafkaSinkBuilder extends AbstractSinkBuilder implements SinkBuilder ...@@ -79,44 +80,48 @@ public class KafkaSinkBuilder extends AbstractSinkBuilder implements SinkBuilder
CustomTableEnvironment customTableEnvironment, CustomTableEnvironment customTableEnvironment,
DataStreamSource<String> dataStreamSource) { DataStreamSource<String> dataStreamSource) {
if (Asserts.isNotNullString(config.getSink().get("topic"))) { if (Asserts.isNotNullString(config.getSink().get("topic"))) {
dataStreamSource.addSink(new FlinkKafkaProducer<String>(config.getSink().get("brokers"), dataStreamSource.addSink(new FlinkKafkaProducer<>(config.getSink().get("brokers"),
config.getSink().get("topic"), config.getSink().get("topic"),
new SimpleStringSchema())); new SimpleStringSchema()));
} else { } else {
Map<Table, OutputTag<String>> tagMap = new HashMap<>();
Map<String, Table> tableMap = new HashMap<>();
ObjectMapper objectMapper = new ObjectMapper();
SingleOutputStreamOperator<Map> mapOperator = dataStreamSource.map(x -> objectMapper.readValue(x,Map.class)).returns(Map.class);
final List<Schema> schemaList = config.getSchemaList(); final List<Schema> schemaList = config.getSchemaList();
final String schemaFieldName = config.getSchemaFieldName(); final String schemaFieldName = config.getSchemaFieldName();
if (Asserts.isNotNullCollection(schemaList)) { if (Asserts.isNotNullCollection(schemaList)) {
SingleOutputStreamOperator<Map> mapOperator = dataStreamSource.map(new MapFunction<String, Map>() {
@Override
public Map map(String value) throws Exception {
ObjectMapper objectMapper = new ObjectMapper();
return objectMapper.readValue(value, Map.class);
}
});
for (Schema schema : schemaList) { for (Schema schema : schemaList) {
for (Table table : schema.getTables()) { for (Table table : schema.getTables()) {
final String tableName = table.getName(); String sinkTableName = getSinkTableName(table);
final String schemaName = table.getSchema(); OutputTag<String> outputTag = new OutputTag<String>(sinkTableName) {
SingleOutputStreamOperator<Map> filterOperator = mapOperator.filter(new FilterFunction<Map>() { };
@Override tagMap.put(table, outputTag);
public boolean filter(Map value) throws Exception { tableMap.put(table.getSchemaTableName(), table);
LinkedHashMap source = (LinkedHashMap) value.get("source");
return tableName.equals(source.get("table").toString())
&& schemaName.equals(source.get(schemaFieldName).toString());
} }
});
SingleOutputStreamOperator<String> stringOperator = filterOperator.map(new MapFunction<Map, String>() {
@Override
public String map(Map value) throws Exception {
ObjectMapper objectMapper = new ObjectMapper();
return objectMapper.writeValueAsString(value);
} }
});
stringOperator.addSink(new FlinkKafkaProducer<String>(config.getSink().get("brokers"), SingleOutputStreamOperator<String> process = mapOperator.process(new ProcessFunction<Map, String>() {
getSinkTableName(table), @Override
new SimpleStringSchema())); public void processElement(Map map, ProcessFunction<Map, String>.Context ctx, Collector<String> out) throws Exception {
LinkedHashMap source = (LinkedHashMap) map.get("source");
try {
String result = objectMapper.writeValueAsString(map);
Table table = tableMap.get(source.get(schemaFieldName).toString() + "." + source.get("table").toString());
OutputTag<String> outputTag = tagMap.get(table);
ctx.output(outputTag, result);
} catch (Exception e) {
out.collect(objectMapper.writeValueAsString(map));
} }
} }
});
tagMap.forEach((k, v) -> {
String topic = getSinkTableName(k);
process.getSideOutput(v).rebalance().addSink(new FlinkKafkaProducer<>(config.getSink().get("brokers"),
topic, new SimpleStringSchema())).name(topic);
});
} }
} }
return dataStreamSource; return dataStreamSource;
......
@@ -20,17 +20,28 @@
 package com.dlink.cdc.sql;
 
+import com.dlink.assertion.Asserts;
+import com.dlink.cdc.AbstractSinkBuilder;
+import com.dlink.cdc.CDCBuilder;
+import com.dlink.cdc.SinkBuilder;
+import com.dlink.executor.CustomTableEnvironment;
+import com.dlink.model.FlinkCDCConfig;
+import com.dlink.model.Schema;
+import com.dlink.model.Table;
 import com.dlink.utils.FlinkBaseUtil;
+import com.dlink.utils.JSONUtil;
 import com.dlink.utils.LogUtil;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.flink.api.common.functions.FlatMapFunction;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.dag.Transformation;
 import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.DataStreamSource;
 import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.ProcessFunction;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.operations.ModifyOperation;
 import org.apache.flink.table.operations.Operation;
@@ -39,26 +50,14 @@ import org.apache.flink.table.types.utils.TypeConversions;
 import org.apache.flink.types.Row;
 import org.apache.flink.types.RowKind;
 import org.apache.flink.util.Collector;
+import org.apache.flink.util.OutputTag;
+
+import javax.xml.bind.DatatypeConverter;
 
 import java.io.Serializable;
 import java.math.BigDecimal;
 import java.time.Instant;
 import java.time.ZoneId;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-
-import com.dlink.assertion.Asserts;
-import com.dlink.cdc.AbstractSinkBuilder;
-import com.dlink.cdc.CDCBuilder;
-import com.dlink.cdc.SinkBuilder;
-import com.dlink.executor.CustomTableEnvironment;
-import javax.xml.bind.DatatypeConverter;
-import com.dlink.model.FlinkCDCConfig;
-import com.dlink.model.Schema;
-import com.dlink.model.Table;
-import com.dlink.utils.JSONUtil;
+import java.util.*;
 
 /**
  * SQLSinkBuilder
@@ -139,7 +138,62 @@ public class SQLSinkBuilder extends AbstractSinkBuilder implements SinkBuilder,
             }
         }, rowTypeInfo);
     }
 
+    private DataStream<Row> buildRow(
+        DataStream<Map> filterOperator,
+        List<String> columnNameList,
+        List<LogicalType> columnTypeList,
+        String schemaTableName) {
+        final String[] columnNames = columnNameList.toArray(new String[columnNameList.size()]);
+        final LogicalType[] columnTypes = columnTypeList.toArray(new LogicalType[columnTypeList.size()]);
+        TypeInformation<?>[] typeInformations = TypeConversions.fromDataTypeToLegacyInfo(TypeConversions.fromLogicalToDataType(columnTypes));
+        RowTypeInfo rowTypeInfo = new RowTypeInfo(typeInformations, columnNames);
+        return filterOperator
+            .flatMap(new FlatMapFunction<Map, Row>() {
+                @Override
+                public void flatMap(Map value, Collector<Row> out) throws Exception {
+                    try {
+                        switch (value.get("op").toString()) {
+                            case "r":
+                            case "c":
+                                Row irow = Row.ofKind(RowKind.INSERT);
+                                Map idata = (Map) value.get("after");
+                                for (int i = 0; i < columnNameList.size(); i++) {
+                                    irow.setField(i, convertValue(idata.get(columnNameList.get(i)), columnTypeList.get(i)));
+                                }
+                                out.collect(irow);
+                                break;
+                            case "d":
+                                Row drow = Row.ofKind(RowKind.DELETE);
+                                Map ddata = (Map) value.get("before");
+                                for (int i = 0; i < columnNameList.size(); i++) {
+                                    drow.setField(i, convertValue(ddata.get(columnNameList.get(i)), columnTypeList.get(i)));
+                                }
+                                out.collect(drow);
+                                break;
+                            case "u":
+                                Row ubrow = Row.ofKind(RowKind.UPDATE_BEFORE);
+                                Map ubdata = (Map) value.get("before");
+                                for (int i = 0; i < columnNameList.size(); i++) {
+                                    ubrow.setField(i, convertValue(ubdata.get(columnNameList.get(i)), columnTypeList.get(i)));
+                                }
+                                out.collect(ubrow);
+                                Row uarow = Row.ofKind(RowKind.UPDATE_AFTER);
+                                Map uadata = (Map) value.get("after");
+                                for (int i = 0; i < columnNameList.size(); i++) {
+                                    uarow.setField(i, convertValue(uadata.get(columnNameList.get(i)), columnTypeList.get(i)));
+                                }
+                                out.collect(uarow);
+                                break;
+                        }
+                    } catch (Exception e) {
+                        logger.error("SchameTable: {} - Row: {} - Exception: {}", schemaTableName, JSONUtil.toJsonString(value), e.getCause().getMessage());
+                        throw e;
+                    }
+                }
+            }, rowTypeInfo);
+    }
+
     private void addTableSink(
         CustomTableEnvironment customTableEnvironment,
         DataStream<Row> rowDataDataStream,
@@ -190,20 +244,48 @@ public class SQLSinkBuilder extends AbstractSinkBuilder implements SinkBuilder,
         CustomTableEnvironment customTableEnvironment,
         DataStreamSource<String> dataStreamSource) {
         final List<Schema> schemaList = config.getSchemaList();
-        final String schemaFieldName = config.getSchemaFieldName();
         if (Asserts.isNotNullCollection(schemaList)) {
-            SingleOutputStreamOperator<Map> mapOperator = deserialize(dataStreamSource);
             logger.info("Build deserialize successful...");
+            Map<Table, OutputTag<Map>> tagMap = new HashMap<>();
+            Map<String, Table> tableMap = new HashMap<>();
             for (Schema schema : schemaList) {
                 for (Table table : schema.getTables()) {
+                    String sinkTableName = getSinkTableName(table);
+                    OutputTag<Map> outputTag = new OutputTag<Map>(sinkTableName) {
+                    };
+                    tagMap.put(table, outputTag);
+                    tableMap.put(table.getSchemaTableName(), table);
+                }
+            }
+            final String schemaFieldName = config.getSchemaFieldName();
+            ObjectMapper objectMapper = new ObjectMapper();
+            SingleOutputStreamOperator<Map> mapOperator = dataStreamSource.map(x -> objectMapper.readValue(x, Map.class)).returns(Map.class);
+            SingleOutputStreamOperator<Map> processOperator = mapOperator.process(new ProcessFunction<Map, Map>() {
+                @Override
+                public void processElement(Map map, ProcessFunction<Map, Map>.Context ctx, Collector<Map> out) throws Exception {
+                    LinkedHashMap source = (LinkedHashMap) map.get("source");
+                    try {
+                        Table table = tableMap.get(source.get(schemaFieldName).toString() + "." + source.get("table").toString());
+                        OutputTag<Map> outputTag = tagMap.get(table);
+                        ctx.output(outputTag, map);
+                    } catch (Exception e) {
+                        out.collect(map);
+                    }
+                }
+            });
+            tagMap.forEach((table, tag) -> {
                 final String schemaTableName = table.getSchemaTableName();
                 try {
-                    SingleOutputStreamOperator<Map> filterOperator = shunt(mapOperator, table, schemaFieldName);
+                    DataStream<Map> filterOperator = shunt(processOperator, table, tag);
                    logger.info("Build " + schemaTableName + " shunt successful...");
                     List<String> columnNameList = new ArrayList<>();
                     List<LogicalType> columnTypeList = new ArrayList<>();
                     buildColumn(columnNameList, columnTypeList, table.getColumns());
-                    DataStream<Row> rowDataDataStream = buildRow(filterOperator, columnNameList, columnTypeList, schemaTableName);
+                    DataStream<Row> rowDataDataStream = buildRow(filterOperator, columnNameList, columnTypeList, schemaTableName).rebalance();
                     logger.info("Build " + schemaTableName + " flatMap successful...");
                     logger.info("Start build " + schemaTableName + " sink...");
                     addTableSink(customTableEnvironment, rowDataDataStream, table, columnNameList);
@@ -211,8 +293,8 @@ public class SQLSinkBuilder extends AbstractSinkBuilder implements SinkBuilder,
                     logger.error("Build " + schemaTableName + " cdc sync failed...");
                     logger.error(LogUtil.getError(e));
                 }
-            }
-        }
+            });
             List<Transformation<?>> trans = customTableEnvironment.getPlanner().translate(modifyOperations);
             for (Transformation<?> item : trans) {
                 env.addOperator(item);
......
@@ -37,6 +37,7 @@ import org.apache.flink.table.operations.ModifyOperation;
 import org.apache.flink.table.types.logical.*;
 import org.apache.flink.types.RowKind;
 import org.apache.flink.util.Collector;
+import org.apache.flink.util.OutputTag;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -110,7 +111,13 @@ public abstract class AbstractSinkBuilder {
             }
         });
     }
 
+    protected DataStream<Map> shunt(
+        SingleOutputStreamOperator<Map> processOperator,
+        Table table,
+        OutputTag<Map> tag) {
+        return processOperator.getSideOutput(tag);
+    }
+
     protected DataStream<RowData> buildRowData(
         SingleOutputStreamOperator<Map> filterOperator,
         List<String> columnNameList,
......
@@ -20,9 +20,16 @@
 package com.dlink.cdc.kafka;
 
-import org.apache.flink.api.common.functions.FilterFunction;
-import org.apache.flink.api.common.functions.MapFunction;
+import com.dlink.assertion.Asserts;
+import com.dlink.cdc.AbstractSinkBuilder;
+import com.dlink.cdc.CDCBuilder;
+import com.dlink.cdc.SinkBuilder;
+import com.dlink.executor.CustomTableEnvironment;
+import com.dlink.model.FlinkCDCConfig;
+import com.dlink.model.Schema;
+import com.dlink.model.Table;
 import org.apache.flink.api.common.serialization.SimpleStringSchema;
+import org.apache.flink.connector.base.DeliveryGuarantee;
 import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
 import org.apache.flink.connector.kafka.sink.KafkaSink;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
@@ -30,22 +37,17 @@ import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.DataStreamSource;
 import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.ProcessFunction;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.OutputTag;
 
+import java.util.HashMap;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 
-import com.dlink.assertion.Asserts;
-import com.dlink.cdc.AbstractSinkBuilder;
-import com.dlink.cdc.CDCBuilder;
-import com.dlink.cdc.SinkBuilder;
-import com.dlink.executor.CustomTableEnvironment;
-import com.dlink.model.FlinkCDCConfig;
-import com.dlink.model.Schema;
-import com.dlink.model.Table;
-
 /**
  * MysqlCDCBuilder
  *
@@ -90,54 +92,60 @@ public class KafkaSinkBuilder extends AbstractSinkBuilder implements SinkBuilder
         CustomTableEnvironment customTableEnvironment,
         DataStreamSource<String> dataStreamSource) {
         if (Asserts.isNotNullString(config.getSink().get("topic"))) {
-            dataStreamSource.sinkTo(KafkaSink.<String>builder()
-                .setBootstrapServers(config.getSink().get("brokers"))
+            KafkaSink<String> kafkaSink = KafkaSink.<String>builder().setBootstrapServers(config.getSink().get("brokers"))
                 .setRecordSerializer(KafkaRecordSerializationSchema.builder()
                     .setTopic(config.getSink().get("topic"))
                     .setValueSerializationSchema(new SimpleStringSchema())
                     .build()
                 )
-                .build());
+                .setDeliverGuarantee(DeliveryGuarantee.valueOf(env.getCheckpointingMode().name()))
+                .build();
+            dataStreamSource.sinkTo(kafkaSink);
         } else {
+            Map<Table, OutputTag<String>> tagMap = new HashMap<>();
+            Map<String, Table> tableMap = new HashMap<>();
+            ObjectMapper objectMapper = new ObjectMapper();
+            SingleOutputStreamOperator<Map> mapOperator = dataStreamSource.map(x -> objectMapper.readValue(x, Map.class)).returns(Map.class);
             final List<Schema> schemaList = config.getSchemaList();
             final String schemaFieldName = config.getSchemaFieldName();
             if (Asserts.isNotNullCollection(schemaList)) {
-                SingleOutputStreamOperator<Map> mapOperator = dataStreamSource.map(new MapFunction<String, Map>() {
-                    @Override
-                    public Map map(String value) throws Exception {
-                        ObjectMapper objectMapper = new ObjectMapper();
-                        return objectMapper.readValue(value, Map.class);
-                    }
-                });
                 for (Schema schema : schemaList) {
                     for (Table table : schema.getTables()) {
-                        final String tableName = table.getName();
-                        final String schemaName = table.getSchema();
-                        SingleOutputStreamOperator<Map> filterOperator = mapOperator.filter(new FilterFunction<Map>() {
-                            @Override
-                            public boolean filter(Map value) throws Exception {
-                                LinkedHashMap source = (LinkedHashMap) value.get("source");
-                                return tableName.equals(source.get("table").toString())
-                                    && schemaName.equals(source.get(schemaFieldName).toString());
-                            }
-                        });
-                        SingleOutputStreamOperator<String> stringOperator = filterOperator.map(new MapFunction<Map, String>() {
-                            @Override
-                            public String map(Map value) throws Exception {
-                                ObjectMapper objectMapper = new ObjectMapper();
-                                return objectMapper.writeValueAsString(value);
-                            }
-                        });
-                        stringOperator.sinkTo(KafkaSink.<String>builder()
-                            .setBootstrapServers(config.getSink().get("brokers"))
-                            .setRecordSerializer(KafkaRecordSerializationSchema.builder()
-                                .setTopic(getSinkTableName(table))
-                                .setValueSerializationSchema(new SimpleStringSchema())
-                                .build()
-                            )
-                            .build());
+                        String sinkTableName = getSinkTableName(table);
+                        OutputTag<String> outputTag = new OutputTag<String>(sinkTableName) {
+                        };
+                        tagMap.put(table, outputTag);
+                        tableMap.put(table.getSchemaTableName(), table);
                     }
                 }
+
+                SingleOutputStreamOperator<String> process = mapOperator.process(new ProcessFunction<Map, String>() {
+                    @Override
+                    public void processElement(Map map, ProcessFunction<Map, String>.Context ctx, Collector<String> out) throws Exception {
+                        LinkedHashMap source = (LinkedHashMap) map.get("source");
+                        try {
+                            String result = objectMapper.writeValueAsString(map);
+                            Table table = tableMap.get(source.get(schemaFieldName).toString() + "." + source.get("table").toString());
+                            OutputTag<String> outputTag = tagMap.get(table);
+                            ctx.output(outputTag, result);
+                        } catch (Exception e) {
+                            out.collect(objectMapper.writeValueAsString(map));
+                        }
+                    }
+                });
+                tagMap.forEach((k, v) -> {
+                    String topic = getSinkTableName(k);
+                    KafkaSink<String> kafkaSink = KafkaSink.<String>builder().setBootstrapServers(config.getSink().get("brokers"))
+                        .setRecordSerializer(KafkaRecordSerializationSchema.builder()
+                            .setTopic(topic)
+                            .setValueSerializationSchema(new SimpleStringSchema())
+                            .build()
+                        )
+                        .setDeliverGuarantee(DeliveryGuarantee.valueOf(env.getCheckpointingMode().name()))
+                        .build();
+                    process.getSideOutput(v).rebalance().sinkTo(kafkaSink).name(topic);
+                });
             }
         }
         return dataStreamSource;
......
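
The Flink 1.14 KafkaSink above derives its delivery guarantee from the job's checkpointing mode by name. A hedged sketch of that mapping (the wrapper class and method names are illustrative only): it works because the enum constant names line up, and `DeliveryGuarantee.NONE` has no `CheckpointingMode` counterpart, so it is never produced this way.

```java
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.streaming.api.CheckpointingMode;

public class GuaranteeMappingSketch {

    // EXACTLY_ONCE -> DeliveryGuarantee.EXACTLY_ONCE, AT_LEAST_ONCE -> DeliveryGuarantee.AT_LEAST_ONCE.
    public static DeliveryGuarantee fromCheckpointing(CheckpointingMode mode) {
        return DeliveryGuarantee.valueOf(mode.name());
    }
}
```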
@@ -25,10 +25,12 @@ import org.apache.flink.api.common.functions.FlatMapFunction;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.dag.Transformation;
 import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.DataStreamSource;
 import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.ProcessFunction;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.operations.ModifyOperation;
 import org.apache.flink.table.operations.Operation;
@@ -43,9 +45,7 @@ import java.io.Serializable;
 import java.math.BigDecimal;
 import java.time.Instant;
 import java.time.ZoneId;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
 import com.dlink.assertion.Asserts;
 import com.dlink.cdc.AbstractSinkBuilder;
@@ -58,6 +58,7 @@ import com.dlink.model.Table;
 import com.dlink.utils.FlinkBaseUtil;
 import com.dlink.utils.JSONUtil;
 import com.dlink.utils.LogUtil;
+import org.apache.flink.util.OutputTag;
 /**
  * SQLSinkBuilder
@@ -83,7 +84,7 @@ public class SQLSinkBuilder extends AbstractSinkBuilder implements SinkBuilder,
     }
     private DataStream<Row> buildRow(
-        SingleOutputStreamOperator<Map> filterOperator,
+        DataStream<Map> filterOperator,
         List<String> columnNameList,
         List<LogicalType> columnTypeList,
         String schemaTableName) {
@@ -101,7 +102,7 @@ public class SQLSinkBuilder extends AbstractSinkBuilder implements SinkBuilder,
                     switch (value.get("op").toString()) {
                         case "r":
                         case "c":
-                            Row irow = Row.withPositions(RowKind.INSERT, columnNameList.size());
+                            Row irow = Row.ofKind(RowKind.INSERT);
                             Map idata = (Map) value.get("after");
                             for (int i = 0; i < columnNameList.size(); i++) {
                                 irow.setField(i, convertValue(idata.get(columnNameList.get(i)), columnTypeList.get(i)));
@@ -109,7 +110,7 @@ public class SQLSinkBuilder extends AbstractSinkBuilder implements SinkBuilder,
                             out.collect(irow);
                             break;
                         case "d":
-                            Row drow = Row.withPositions(RowKind.DELETE, columnNameList.size());
+                            Row drow = Row.ofKind(RowKind.DELETE);
                             Map ddata = (Map) value.get("before");
                             for (int i = 0; i < columnNameList.size(); i++) {
                                 drow.setField(i, convertValue(ddata.get(columnNameList.get(i)), columnTypeList.get(i)));
@@ -117,13 +118,13 @@ public class SQLSinkBuilder extends AbstractSinkBuilder implements SinkBuilder,
                             out.collect(drow);
                             break;
                         case "u":
-                            Row ubrow = Row.withPositions(RowKind.UPDATE_BEFORE, columnNameList.size());
+                            Row ubrow = Row.ofKind(RowKind.UPDATE_BEFORE);
                             Map ubdata = (Map) value.get("before");
                             for (int i = 0; i < columnNameList.size(); i++) {
                                 ubrow.setField(i, convertValue(ubdata.get(columnNameList.get(i)), columnTypeList.get(i)));
                             }
                             out.collect(ubrow);
-                            Row uarow = Row.withPositions(RowKind.UPDATE_AFTER, columnNameList.size());
+                            Row uarow = Row.ofKind(RowKind.UPDATE_AFTER);
                             Map uadata = (Map) value.get("after");
                             for (int i = 0; i < columnNameList.size(); i++) {
                                 uarow.setField(i, convertValue(uadata.get(columnNameList.get(i)), columnTypeList.get(i)));
@@ -189,20 +190,47 @@ public class SQLSinkBuilder extends AbstractSinkBuilder implements SinkBuilder,
         CustomTableEnvironment customTableEnvironment,
         DataStreamSource<String> dataStreamSource) {
         final List<Schema> schemaList = config.getSchemaList();
-        final String schemaFieldName = config.getSchemaFieldName();
         if (Asserts.isNotNullCollection(schemaList)) {
-            SingleOutputStreamOperator<Map> mapOperator = deserialize(dataStreamSource);
             logger.info("Build deserialize successful...");
+            Map<Table, OutputTag<Map>> tagMap = new HashMap<>();
+            Map<String, Table> tableMap = new HashMap<>();
             for (Schema schema : schemaList) {
                 for (Table table : schema.getTables()) {
+                    String sinkTableName = getSinkTableName(table);
+                    OutputTag<Map> outputTag = new OutputTag<Map>(sinkTableName) {
+                    };
+                    tagMap.put(table, outputTag);
+                    tableMap.put(table.getSchemaTableName(), table);
+                }
+            }
+            final String schemaFieldName = config.getSchemaFieldName();
+            ObjectMapper objectMapper = new ObjectMapper();
+            SingleOutputStreamOperator<Map> mapOperator = dataStreamSource.map(x -> objectMapper.readValue(x,Map.class)).returns(Map.class);
+            SingleOutputStreamOperator<Map> processOperator = mapOperator.process(new ProcessFunction<Map, Map>() {
+                @Override
+                public void processElement(Map map, ProcessFunction<Map, Map>.Context ctx, Collector<Map> out) throws Exception {
+                    LinkedHashMap source = (LinkedHashMap) map.get("source");
+                    try {
+                        Table table = tableMap.get(source.get(schemaFieldName).toString() + "." + source.get("table").toString());
+                        OutputTag<Map> outputTag = tagMap.get(table);
+                        ctx.output(outputTag, map);
+                    } catch (Exception e) {
+                        out.collect(map);
+                    }
+                }
+            });
+            tagMap.forEach((table,tag) -> {
                     final String schemaTableName = table.getSchemaTableName();
                     try {
-                        SingleOutputStreamOperator<Map> filterOperator = shunt(mapOperator, table, schemaFieldName);
+                        DataStream<Map> filterOperator = shunt(processOperator, table, tag);
                         logger.info("Build " + schemaTableName + " shunt successful...");
                         List<String> columnNameList = new ArrayList<>();
                         List<LogicalType> columnTypeList = new ArrayList<>();
                         buildColumn(columnNameList, columnTypeList, table.getColumns());
-                        DataStream<Row> rowDataDataStream = buildRow(filterOperator, columnNameList, columnTypeList, schemaTableName);
+                        DataStream<Row> rowDataDataStream = buildRow(filterOperator, columnNameList, columnTypeList, schemaTableName).rebalance();
                         logger.info("Build " + schemaTableName + " flatMap successful...");
                         logger.info("Start build " + schemaTableName + " sink...");
                         addTableSink(customTableEnvironment, rowDataDataStream, table, columnNameList);
@@ -210,8 +238,8 @@ public class SQLSinkBuilder extends AbstractSinkBuilder implements SinkBuilder,
                         logger.error("Build " + schemaTableName + " cdc sync failed...");
                         logger.error(LogUtil.getError(e));
                     }
-                }
-            }
+            });
             List<Transformation<?>> trans = customTableEnvironment.getPlanner().translate(modifyOperations);
             for (Transformation<?> item : trans) {
                 env.addOperator(item);
......
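The SQLSinkBuilder change above routes each record by a key built from the Debezium-style "source" block: the value under config.getSchemaFieldName(), a ".", and the table name, matched against Table#getSchemaTableName(). The tiny standalone illustration below assumes the schema field name is "db" (as for MySQL-style sources) and uses invented values; it is not project code.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class RoutingKeySketch {
    public static void main(String[] args) {
        // A pared-down "source" block; config.getSchemaFieldName() decides which key is read.
        Map<String, Object> source = new LinkedHashMap<>();
        source.put("db", "inventory");   // assumed schema field name and value
        source.put("table", "orders");
        String schemaFieldName = "db";
        String key = source.get(schemaFieldName).toString() + "." + source.get("table").toString();
        System.out.println(key);         // prints "inventory.orders", the shape of getSchemaTableName()
    }
}
```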
@@ -37,6 +37,7 @@ import org.apache.flink.table.operations.ModifyOperation;
 import org.apache.flink.table.types.logical.*;
 import org.apache.flink.types.RowKind;
 import org.apache.flink.util.Collector;
+import org.apache.flink.util.OutputTag;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -110,7 +111,13 @@ public abstract class AbstractSinkBuilder {
                 }
             });
     }
+    protected DataStream<Map> shunt(
+        SingleOutputStreamOperator<Map> processOperator,
+        Table table,
+        OutputTag<Map> tag) {
+        return processOperator.getSideOutput(tag);
+    }
     protected DataStream<RowData> buildRowData(
         SingleOutputStreamOperator<Map> filterOperator,
         List<String> columnNameList,
......
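Both builders create their tags as new OutputTag<...>(name) { } — the empty braces are deliberate: the anonymous subclass lets Flink's type extraction recover the generic element type despite erasure. A minimal sketch of the two equivalent construction forms (standalone, not part of this commit):

```java
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.util.OutputTag;

public class OutputTagConstructionSketch {
    public static void main(String[] args) {
        // Anonymous subclass, as in the diff: the subclass pins down the generic parameter.
        OutputTag<String> viaSubclass = new OutputTag<String>("orders") { };

        // Equivalent alternative: state the element type explicitly instead of subclassing.
        OutputTag<String> viaTypeInfo = new OutputTag<>("orders", Types.STRING);

        System.out.println(viaSubclass + " / " + viaTypeInfo);
    }
}
```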
@@ -20,9 +20,16 @@
 package com.dlink.cdc.kafka;
-import org.apache.flink.api.common.functions.FilterFunction;
-import org.apache.flink.api.common.functions.MapFunction;
+import com.dlink.assertion.Asserts;
+import com.dlink.cdc.AbstractSinkBuilder;
+import com.dlink.cdc.CDCBuilder;
+import com.dlink.cdc.SinkBuilder;
+import com.dlink.executor.CustomTableEnvironment;
+import com.dlink.model.FlinkCDCConfig;
+import com.dlink.model.Schema;
+import com.dlink.model.Table;
 import org.apache.flink.api.common.serialization.SimpleStringSchema;
+import org.apache.flink.connector.base.DeliveryGuarantee;
 import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
 import org.apache.flink.connector.kafka.sink.KafkaSink;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
@@ -30,22 +37,17 @@ import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.DataStreamSource;
 import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.ProcessFunction;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.OutputTag;
+import java.util.HashMap;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
-import com.dlink.assertion.Asserts;
-import com.dlink.cdc.AbstractSinkBuilder;
-import com.dlink.cdc.CDCBuilder;
-import com.dlink.cdc.SinkBuilder;
-import com.dlink.executor.CustomTableEnvironment;
-import com.dlink.model.FlinkCDCConfig;
-import com.dlink.model.Schema;
-import com.dlink.model.Table;
 /**
  * MysqlCDCBuilder
  *
@@ -90,54 +92,60 @@ public class KafkaSinkBuilder extends AbstractSinkBuilder implements SinkBuilder
         CustomTableEnvironment customTableEnvironment,
         DataStreamSource<String> dataStreamSource) {
         if (Asserts.isNotNullString(config.getSink().get("topic"))) {
-            dataStreamSource.sinkTo(KafkaSink.<String>builder()
-                .setBootstrapServers(config.getSink().get("brokers"))
-                .setRecordSerializer(KafkaRecordSerializationSchema.builder()
-                    .setTopic(config.getSink().get("topic"))
-                    .setValueSerializationSchema(new SimpleStringSchema())
-                    .build()
-                )
-                .build());
+            KafkaSink<String> kafkaSink = KafkaSink.<String>builder().setBootstrapServers(config.getSink().get("brokers"))
+                .setRecordSerializer(KafkaRecordSerializationSchema.builder()
+                    .setTopic(config.getSink().get("topic"))
+                    .setValueSerializationSchema(new SimpleStringSchema())
+                    .build()
+                )
+                .setDeliverGuarantee(DeliveryGuarantee.valueOf(env.getCheckpointingMode().name()))
+                .build();
+            dataStreamSource.sinkTo(kafkaSink);
         } else {
+            Map<Table, OutputTag<String>> tagMap = new HashMap<>();
+            Map<String, Table> tableMap = new HashMap<>();
+            ObjectMapper objectMapper = new ObjectMapper();
+            SingleOutputStreamOperator<Map> mapOperator = dataStreamSource.map(x -> objectMapper.readValue(x,Map.class)).returns(Map.class);
             final List<Schema> schemaList = config.getSchemaList();
             final String schemaFieldName = config.getSchemaFieldName();
             if (Asserts.isNotNullCollection(schemaList)) {
-                SingleOutputStreamOperator<Map> mapOperator = dataStreamSource.map(new MapFunction<String, Map>() {
-                    @Override
-                    public Map map(String value) throws Exception {
-                        ObjectMapper objectMapper = new ObjectMapper();
-                        return objectMapper.readValue(value, Map.class);
-                    }
-                });
                 for (Schema schema : schemaList) {
                     for (Table table : schema.getTables()) {
-                        final String tableName = table.getName();
-                        final String schemaName = table.getSchema();
-                        SingleOutputStreamOperator<Map> filterOperator = mapOperator.filter(new FilterFunction<Map>() {
-                            @Override
-                            public boolean filter(Map value) throws Exception {
-                                LinkedHashMap source = (LinkedHashMap) value.get("source");
-                                return tableName.equals(source.get("table").toString())
-                                    && schemaName.equals(source.get(schemaFieldName).toString());
-                            }
-                        });
-                        SingleOutputStreamOperator<String> stringOperator = filterOperator.map(new MapFunction<Map, String>() {
-                            @Override
-                            public String map(Map value) throws Exception {
-                                ObjectMapper objectMapper = new ObjectMapper();
-                                return objectMapper.writeValueAsString(value);
-                            }
-                        });
-                        stringOperator.sinkTo(KafkaSink.<String>builder()
-                            .setBootstrapServers(config.getSink().get("brokers"))
-                            .setRecordSerializer(KafkaRecordSerializationSchema.builder()
-                                .setTopic(getSinkTableName(table))
-                                .setValueSerializationSchema(new SimpleStringSchema())
-                                .build()
-                            )
-                            .build());
+                        String sinkTableName = getSinkTableName(table);
+                        OutputTag<String> outputTag = new OutputTag<String>(sinkTableName) {
+                        };
+                        tagMap.put(table, outputTag);
+                        tableMap.put(table.getSchemaTableName(), table);
                     }
                 }
+                SingleOutputStreamOperator<String> process = mapOperator.process(new ProcessFunction<Map, String>() {
+                    @Override
+                    public void processElement(Map map, ProcessFunction<Map, String>.Context ctx, Collector<String> out) throws Exception {
+                        LinkedHashMap source = (LinkedHashMap) map.get("source");
+                        try {
+                            String result = objectMapper.writeValueAsString(map);
+                            Table table = tableMap.get(source.get(schemaFieldName).toString() + "." + source.get("table").toString());
+                            OutputTag<String> outputTag = tagMap.get(table);
+                            ctx.output(outputTag, result);
+                        } catch (Exception e) {
+                            out.collect(objectMapper.writeValueAsString(map));
+                        }
+                    }
+                });
+                tagMap.forEach((k, v) -> {
+                    String topic = getSinkTableName(k);
+                    KafkaSink<String> kafkaSink = KafkaSink.<String>builder().setBootstrapServers(config.getSink().get("brokers"))
+                        .setRecordSerializer(KafkaRecordSerializationSchema.builder()
+                            .setTopic(topic)
+                            .setValueSerializationSchema(new SimpleStringSchema())
+                            .build()
+                        )
+                        .setDeliverGuarantee(DeliveryGuarantee.valueOf(env.getCheckpointingMode().name()))
+                        .build();
+                    process.getSideOutput(v).rebalance().sinkTo(kafkaSink).name(topic);
+                });
             }
         }
         return dataStreamSource;
......
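The new sinks derive the Kafka delivery guarantee from the job's checkpointing mode by enum-name lookup. This works because CheckpointingMode's constants (EXACTLY_ONCE, AT_LEAST_ONCE) are also names of DeliveryGuarantee constants, while DeliveryGuarantee.NONE has no counterpart and is never chosen this way. A small standalone sketch of that mapping, not taken from the project:

```java
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.streaming.api.CheckpointingMode;

public class DeliveryGuaranteeMappingSketch {
    public static void main(String[] args) {
        // The by-name conversion used in the diff relies on the two enums sharing constant names.
        for (CheckpointingMode mode : CheckpointingMode.values()) {
            DeliveryGuarantee guarantee = DeliveryGuarantee.valueOf(mode.name());
            System.out.println(mode + " -> " + guarantee);
        }
    }
}
```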
@@ -20,15 +20,28 @@
 package com.dlink.cdc.sql;
+import com.dlink.assertion.Asserts;
+import com.dlink.cdc.AbstractSinkBuilder;
+import com.dlink.cdc.CDCBuilder;
+import com.dlink.cdc.SinkBuilder;
+import com.dlink.executor.CustomTableEnvironment;
+import com.dlink.model.FlinkCDCConfig;
+import com.dlink.model.Schema;
+import com.dlink.model.Table;
+import com.dlink.utils.FlinkBaseUtil;
+import com.dlink.utils.JSONUtil;
+import com.dlink.utils.LogUtil;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.flink.api.common.functions.FlatMapFunction;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.dag.Transformation;
 import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.DataStreamSource;
 import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.ProcessFunction;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.operations.ModifyOperation;
 import org.apache.flink.table.operations.Operation;
@@ -37,27 +50,14 @@ import org.apache.flink.table.types.utils.TypeConversions;
 import org.apache.flink.types.Row;
 import org.apache.flink.types.RowKind;
 import org.apache.flink.util.Collector;
+import org.apache.flink.util.OutputTag;
 import javax.xml.bind.DatatypeConverter;
 import java.io.Serializable;
 import java.math.BigDecimal;
 import java.time.Instant;
 import java.time.ZoneId;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
-import com.dlink.assertion.Asserts;
-import com.dlink.cdc.AbstractSinkBuilder;
-import com.dlink.cdc.CDCBuilder;
-import com.dlink.cdc.SinkBuilder;
-import com.dlink.executor.CustomTableEnvironment;
-import com.dlink.model.FlinkCDCConfig;
-import com.dlink.model.Schema;
-import com.dlink.model.Table;
-import com.dlink.utils.FlinkBaseUtil;
-import com.dlink.utils.JSONUtil;
-import com.dlink.utils.LogUtil;
 /**
  * SQLSinkBuilder
@@ -83,7 +83,7 @@ public class SQLSinkBuilder extends AbstractSinkBuilder implements SinkBuilder,
     }
     private DataStream<Row> buildRow(
-        SingleOutputStreamOperator<Map> filterOperator,
+        DataStream<Map> filterOperator,
         List<String> columnNameList,
         List<LogicalType> columnTypeList,
         String schemaTableName) {
@@ -101,7 +101,7 @@ public class SQLSinkBuilder extends AbstractSinkBuilder implements SinkBuilder,
                     switch (value.get("op").toString()) {
                         case "r":
                         case "c":
-                            Row irow = Row.withPositions(RowKind.INSERT, columnNameList.size());
+                            Row irow = Row.ofKind(RowKind.INSERT);
                             Map idata = (Map) value.get("after");
                             for (int i = 0; i < columnNameList.size(); i++) {
                                 irow.setField(i, convertValue(idata.get(columnNameList.get(i)), columnTypeList.get(i)));
@@ -109,7 +109,7 @@ public class SQLSinkBuilder extends AbstractSinkBuilder implements SinkBuilder,
                             out.collect(irow);
                             break;
                         case "d":
-                            Row drow = Row.withPositions(RowKind.DELETE, columnNameList.size());
+                            Row drow = Row.ofKind(RowKind.DELETE);
                             Map ddata = (Map) value.get("before");
                             for (int i = 0; i < columnNameList.size(); i++) {
                                 drow.setField(i, convertValue(ddata.get(columnNameList.get(i)), columnTypeList.get(i)));
@@ -117,13 +117,13 @@ public class SQLSinkBuilder extends AbstractSinkBuilder implements SinkBuilder,
                             out.collect(drow);
                             break;
                         case "u":
-                            Row ubrow = Row.withPositions(RowKind.UPDATE_BEFORE, columnNameList.size());
+                            Row ubrow = Row.ofKind(RowKind.UPDATE_BEFORE);
                             Map ubdata = (Map) value.get("before");
                             for (int i = 0; i < columnNameList.size(); i++) {
                                 ubrow.setField(i, convertValue(ubdata.get(columnNameList.get(i)), columnTypeList.get(i)));
                             }
                             out.collect(ubrow);
-                            Row uarow = Row.withPositions(RowKind.UPDATE_AFTER, columnNameList.size());
+                            Row uarow = Row.ofKind(RowKind.UPDATE_AFTER);
                             Map uadata = (Map) value.get("after");
                             for (int i = 0; i < columnNameList.size(); i++) {
                                 uarow.setField(i, convertValue(uadata.get(columnNameList.get(i)), columnTypeList.get(i)));
@@ -189,20 +189,47 @@ public class SQLSinkBuilder extends AbstractSinkBuilder implements SinkBuilder,
         CustomTableEnvironment customTableEnvironment,
         DataStreamSource<String> dataStreamSource) {
         final List<Schema> schemaList = config.getSchemaList();
-        final String schemaFieldName = config.getSchemaFieldName();
         if (Asserts.isNotNullCollection(schemaList)) {
-            SingleOutputStreamOperator<Map> mapOperator = deserialize(dataStreamSource);
             logger.info("Build deserialize successful...");
+            Map<Table, OutputTag<Map>> tagMap = new HashMap<>();
+            Map<String, Table> tableMap = new HashMap<>();
             for (Schema schema : schemaList) {
                 for (Table table : schema.getTables()) {
+                    String sinkTableName = getSinkTableName(table);
+                    OutputTag<Map> outputTag = new OutputTag<Map>(sinkTableName) {
+                    };
+                    tagMap.put(table, outputTag);
+                    tableMap.put(table.getSchemaTableName(), table);
+                }
+            }
+            final String schemaFieldName = config.getSchemaFieldName();
+            ObjectMapper objectMapper = new ObjectMapper();
+            SingleOutputStreamOperator<Map> mapOperator = dataStreamSource.map(x -> objectMapper.readValue(x,Map.class)).returns(Map.class);
+            SingleOutputStreamOperator<Map> processOperator = mapOperator.process(new ProcessFunction<Map, Map>() {
+                @Override
+                public void processElement(Map map, ProcessFunction<Map, Map>.Context ctx, Collector<Map> out) throws Exception {
+                    LinkedHashMap source = (LinkedHashMap) map.get("source");
+                    try {
+                        Table table = tableMap.get(source.get(schemaFieldName).toString() + "." + source.get("table").toString());
+                        OutputTag<Map> outputTag = tagMap.get(table);
+                        ctx.output(outputTag, map);
+                    } catch (Exception e) {
+                        out.collect(map);
+                    }
+                }
+            });
+            tagMap.forEach((table,tag) -> {
                     final String schemaTableName = table.getSchemaTableName();
                     try {
-                        SingleOutputStreamOperator<Map> filterOperator = shunt(mapOperator, table, schemaFieldName);
+                        DataStream<Map> filterOperator = shunt(processOperator, table, tag);
                         logger.info("Build " + schemaTableName + " shunt successful...");
                         List<String> columnNameList = new ArrayList<>();
                         List<LogicalType> columnTypeList = new ArrayList<>();
                         buildColumn(columnNameList, columnTypeList, table.getColumns());
-                        DataStream<Row> rowDataDataStream = buildRow(filterOperator, columnNameList, columnTypeList, schemaTableName);
+                        DataStream<Row> rowDataDataStream = buildRow(filterOperator, columnNameList, columnTypeList, schemaTableName).rebalance();
                        logger.info("Build " + schemaTableName + " flatMap successful...");
                        logger.info("Start build " + schemaTableName + " sink...");
                        addTableSink(customTableEnvironment, rowDataDataStream, table, columnNameList);
@@ -210,8 +237,8 @@ public class SQLSinkBuilder extends AbstractSinkBuilder implements SinkBuilder,
                         logger.error("Build " + schemaTableName + " cdc sync failed...");
                         logger.error(LogUtil.getError(e));
                     }
-                }
-            }
+            });
             List<Transformation<?>> trans = customTableEnvironment.getPlanner().translate(modifyOperations);
             for (Transformation<?> item : trans) {
                 env.addOperator(item);
......
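buildRow() switches on the Debezium-style "op" field: "r"/"c" emit an INSERT row built from "after", "d" emits a DELETE row from "before", and "u" fans out into an UPDATE_BEFORE row plus an UPDATE_AFTER row. A minimal sketch of that fan-out with invented field values, not part of this commit:

```java
import org.apache.flink.types.Row;
import org.apache.flink.types.RowKind;

import java.util.Arrays;
import java.util.List;

public class OpToRowKindSketch {
    public static void main(String[] args) {
        // One "u" change record becomes two rows: the pre-image and the post-image.
        List<Row> updateFanOut = Arrays.asList(
            Row.ofKind(RowKind.UPDATE_BEFORE, 1, "old"),
            Row.ofKind(RowKind.UPDATE_AFTER, 1, "new"));
        updateFanOut.forEach(row -> System.out.println(row.getKind() + " " + row));
    }
}
```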