Unverified Commit c80d284d authored by mengyejiang's avatar mengyejiang Committed by GitHub

flinkSQL supports the posgresql CDC function and the full database synchronization function. (#925)

* 修改点:
1、dlink-flink1.14增加postgres cdc connector,支持flinksql开发。
2、整库同步增加支持postgres数据库。

* 修改点:
1、METADATA_TYPE与dlink-meta中保持一致,避免Driver找不到lib的情况;
2、url中/后改为databasename,原来代码中填schema名是不正确的。

* postgresCDCBuilder改为首字母大写。
parent dc1bf052
...@@ -22,6 +22,7 @@ package com.dlink.cdc; ...@@ -22,6 +22,7 @@ package com.dlink.cdc;
import com.dlink.assertion.Asserts; import com.dlink.assertion.Asserts;
import com.dlink.cdc.mysql.MysqlCDCBuilder; import com.dlink.cdc.mysql.MysqlCDCBuilder;
import com.dlink.cdc.oracle.OracleCDCBuilder; import com.dlink.cdc.oracle.OracleCDCBuilder;
import com.dlink.cdc.postgres.PostgresCDCBuilder;
import com.dlink.exception.FlinkClientException; import com.dlink.exception.FlinkClientException;
import com.dlink.model.FlinkCDCConfig; import com.dlink.model.FlinkCDCConfig;
...@@ -35,7 +36,8 @@ public class CDCBuilderFactory { ...@@ -35,7 +36,8 @@ public class CDCBuilderFactory {
private static CDCBuilder[] cdcBuilders = { private static CDCBuilder[] cdcBuilders = {
new MysqlCDCBuilder(), new MysqlCDCBuilder(),
new OracleCDCBuilder() new OracleCDCBuilder(),
new PostgresCDCBuilder()
}; };
public static CDCBuilder buildCDCBuilder(FlinkCDCConfig config) { public static CDCBuilder buildCDCBuilder(FlinkCDCConfig config) {
......
/*
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package com.dlink.cdc.postgres;
import com.dlink.assertion.Asserts;
import com.dlink.cdc.AbstractCDCBuilder;
import com.dlink.cdc.CDCBuilder;
import com.dlink.constant.ClientConstant;
import com.dlink.constant.FlinkParamConstant;
import com.dlink.model.FlinkCDCConfig;
import com.ververica.cdc.connectors.postgres.PostgreSQLSource;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
/**
* postgresCDCBuilder
*
* @author mengyejiang
* @since 2022/8/21 10:00
**/
public class PostgresCDCBuilder extends AbstractCDCBuilder implements CDCBuilder {
private final static String KEY_WORD = "postgres-cdc";
private final static String METADATA_TYPE = "PostgreSql";
public PostgresCDCBuilder() {
}
public PostgresCDCBuilder(FlinkCDCConfig config) {
super(config);
}
@Override
public String getHandle() {
return KEY_WORD;
}
@Override
public CDCBuilder create(FlinkCDCConfig config) {
return new PostgresCDCBuilder(config);
}
@Override
public DataStreamSource<String> build(StreamExecutionEnvironment env) {
String decodingPluginName = config.getSource().get("decoding.plugin.name");
String slotName = config.getSource().get("slot.name");
Properties debeziumProperties = new Properties();
for (Map.Entry<String, String> entry : config.getDebezium().entrySet()) {
if (Asserts.isNotNullString(entry.getKey()) && Asserts.isNotNullString(entry.getValue())) {
debeziumProperties.setProperty(entry.getKey(), entry.getValue());
}
}
PostgreSQLSource.Builder<String> sourceBuilder = PostgreSQLSource.<String>builder()
.hostname(config.getHostname())
.port(config.getPort())
.database(config.getDatabase())
.username(config.getUsername())
.password(config.getPassword());
String schema = config.getSchema();
if (Asserts.isNotNullString(schema)) {
String[] schemas = schema.split(FlinkParamConstant.SPLIT);
sourceBuilder.schemaList(schemas);
} else {
sourceBuilder.schemaList(new String[0]);
}
List<String> schemaTableNameList = config.getSchemaTableNameList();
if (Asserts.isNotNullCollection(schemaTableNameList)) {
sourceBuilder.tableList(schemaTableNameList.toArray(new String[schemaTableNameList.size()]));
} else {
sourceBuilder.tableList(new String[0]);
}
sourceBuilder.deserializer(new JsonDebeziumDeserializationSchema());
sourceBuilder.debeziumProperties(debeziumProperties);
if (Asserts.isNotNullString(decodingPluginName)) {
sourceBuilder.decodingPluginName(decodingPluginName);
}
if (Asserts.isNotNullString(slotName)) {
sourceBuilder.slotName(slotName);
}
return env.addSource(sourceBuilder.build(),"Postgres CDC Source");
}
public Map<String, Map<String, String>> parseMetaDataConfigs() {
Map<String, Map<String, String>> allConfigMap = new HashMap<>();
List<String> schemaList = getSchemaList();
for (String schema : schemaList) {
Map<String, String> configMap = new HashMap<>();
configMap.put(ClientConstant.METADATA_TYPE, METADATA_TYPE);
StringBuilder sb = new StringBuilder("jdbc:postgresql://");
sb.append(config.getHostname());
sb.append(":");
sb.append(config.getPort());
sb.append("/");
sb.append(config.getDatabase());
configMap.put(ClientConstant.METADATA_NAME, sb.toString());
configMap.put(ClientConstant.METADATA_URL, sb.toString());
configMap.put(ClientConstant.METADATA_USERNAME, config.getUsername());
configMap.put(ClientConstant.METADATA_PASSWORD, config.getPassword());
allConfigMap.put(schema, configMap);
}
return allConfigMap;
}
}
...@@ -148,6 +148,11 @@ ...@@ -148,6 +148,11 @@
<artifactId>flink-sql-connector-oracle-cdc</artifactId> <artifactId>flink-sql-connector-oracle-cdc</artifactId>
<version>${flinkcdc.version}</version> <version>${flinkcdc.version}</version>
</dependency> </dependency>
<dependency>
<groupId>com.ververica</groupId>
<artifactId>flink-sql-connector-postgres-cdc</artifactId>
<version>${flinkcdc.version}</version>
</dependency>
<dependency> <dependency>
<groupId>org.apache.flink</groupId> <groupId>org.apache.flink</groupId>
<artifactId>flink-connector-jdbc_${scala.binary.version}</artifactId> <artifactId>flink-connector-jdbc_${scala.binary.version}</artifactId>
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment