Commit 266bbd10 authored by shezaixing

Synchronously write to the Dameng (DM) client

parent 6085ef5a
......@@ -351,6 +351,11 @@
            <version>2.1.3</version>
        </dependency>
        <dependency>
            <groupId>com.dameng</groupId>
            <artifactId>DmJdbcDriver18</artifactId>
            <version>8.1.1.193</version>
        </dependency>
    </dependencies>
    <repositories>
......
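The DmJdbcDriver18 artifact added above supplies the dm.jdbc.driver.DmDriver class that the new DamengDataTransferSink registers with Druid. As a quick sanity check, a minimal standalone connectivity sketch using that driver is shown below; the jdbc:dm://host:port URL form, the port 5236, and the credentials are illustrative assumptions, while the driver class name and the "select 1" probe come from this commit.

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

public class DmConnectivityCheck {
    public static void main(String[] args) throws Exception {
        // Same driver class the sink configures on its DruidDataSource.
        Class.forName("dm.jdbc.driver.DmDriver");
        // Host, port and credentials are placeholders, not values from this repository.
        String url = "jdbc:dm://127.0.0.1:5236";
        try (Connection conn = DriverManager.getConnection(url, "DB_USER", "DB_PASSWORD");
             Statement st = conn.createStatement();
             ResultSet rs = st.executeQuery("select 1")) { // mirrors the sink's validation query
            if (rs.next()) {
                System.out.println("Dameng connection OK: " + rs.getInt(1));
            }
        }
    }
}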
package com.dsk.flink.dsc.common.function;

import cn.hutool.core.collection.CollUtil;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple3;

import java.util.HashSet;
import java.util.Set;

/**
 * Maps a canal-JSON message to a Tuple3 of (original message, group key, ts),
 * where the group key is the table name combined with the primary-key values.
 */
public class CanalMapToTsGroupFunction implements MapFunction<JSONObject, Tuple3<JSONObject, String, Long>> {

    @Override
    public Tuple3<JSONObject, String, Long> map(JSONObject value) throws Exception {
        JSONArray dataList = value.getJSONArray("data");
        JSONObject mysqlType = value.getJSONObject("mysqlType");
        String table = value.getString("table");
        JSONArray pkNames = value.getJSONArray("pkNames");
        Set<String> pkNameSet = new HashSet<>();
        long ts = value.getLong("ts");
        if (CollUtil.isNotEmpty(pkNames)) {
            pkNames.forEach(name -> pkNameSet.add(String.valueOf(name)));
        }
        JSONObject dataObj = dataList.getJSONObject(0);
        String groupKey = table;
        for (String pk : pkNameSet) {
            String pkValue = AsyncDamengDataTransferFunction.getValueString(dataObj, pk, mysqlType.getString(pk));
            // Append each primary-key value so composite keys keep every part of the key.
            groupKey = groupKey.concat("-").concat(pkValue);
        }
        return Tuple3.of(value, groupKey, ts);
    }
}
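For illustration, the sketch below feeds this mapper a hand-built canal-style INSERT event. The table, column, and key values are made up, and the exact rendering of the primary-key value depends on AsyncDamengDataTransferFunction.getValueString, which is not part of this diff.

import com.alibaba.fastjson.JSONObject;
import com.dsk.flink.dsc.common.function.CanalMapToTsGroupFunction;
import org.apache.flink.api.java.tuple.Tuple3;

public class CanalMapToTsGroupFunctionExample {
    public static void main(String[] args) throws Exception {
        // Minimal canal-style message; all field values are illustrative.
        JSONObject msg = JSONObject.parseObject("{"
                + "\"table\":\"t_user\","
                + "\"type\":\"INSERT\","
                + "\"ts\":1700000000000,"
                + "\"pkNames\":[\"id\"],"
                + "\"mysqlType\":{\"id\":\"bigint\",\"name\":\"varchar(64)\"},"
                + "\"data\":[{\"id\":\"1001\",\"name\":\"foo\"}]"
                + "}");
        Tuple3<JSONObject, String, Long> out = new CanalMapToTsGroupFunction().map(msg);
        // Expected shape: (original message, "t_user-<pk value>", 1700000000000)
        System.out.println(out.f1 + " @ " + out.f2);
    }
}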
package com.dsk.flink.dsc.common.function;

import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.util.StrUtil;
import com.alibaba.fastjson.JSONObject;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

/**
 * Window function keyed by the canal event timestamp. Within each window it keeps the
 * elements with the largest ts; if that group is exactly an INSERT plus a DELETE, the
 * pair is folded into a single UPDATE whose "old" field carries the deleted row.
 */
public class GroupTsProcessWindowFunction extends ProcessWindowFunction<Tuple3<JSONObject, String, Long>, JSONObject, Long, TimeWindow> {

    @Override
    public void process(Long aLong, ProcessWindowFunction<Tuple3<JSONObject, String, Long>, JSONObject, Long,
            TimeWindow>.Context context, Iterable<Tuple3<JSONObject, String, Long>> elements, Collector<JSONObject> out) throws Exception {
        List<Tuple3<JSONObject, String, Long>> list = CollUtil.list(false, elements);
        Map<Long, List<Tuple3<JSONObject, String, Long>>> tsGroupMap = list.stream().collect(Collectors.groupingBy(x -> x.f2));
        Long max = CollUtil.max(tsGroupMap.keySet());
        List<Tuple3<JSONObject, String, Long>> resList = tsGroupMap.get(max);
        if (resList.size() == 2) {
            JSONObject insertJson = new JSONObject();
            JSONObject deleteJson = new JSONObject();
            for (Tuple3<JSONObject, String, Long> rs : resList) {
                if ("INSERT".equals(rs.f0.getString("type"))) {
                    insertJson = rs.f0;
                }
                if ("DELETE".equals(rs.f0.getString("type"))) {
                    deleteJson = rs.f0;
                }
            }
            // An INSERT and a DELETE sharing the same ts are treated as one UPDATE.
            if (StrUtil.isNotBlank(insertJson.getString("type")) && StrUtil.isNotBlank(deleteJson.getString("type"))) {
                insertJson.put("type", "UPDATE");
                insertJson.put("old", deleteJson.getJSONArray("data"));
                out.collect(insertJson);
                return;
            }
        }
        out.collect(resList.get(0).f0);
    }
}
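A small sketch of the fold this window function performs: an INSERT and a DELETE carrying the same ts become one UPDATE whose "old" field holds the deleted row. The two events below are hand-written for illustration; the merge lines mirror the put calls in the class above.

import com.alibaba.fastjson.JSONObject;

public class GroupTsMergeExample {
    public static void main(String[] args) {
        // Two canal events with the same ts: the DELETE of the old row and the INSERT of the new one.
        JSONObject deleteEvt = JSONObject.parseObject(
                "{\"type\":\"DELETE\",\"ts\":1700000000000,\"data\":[{\"id\":\"1001\",\"name\":\"old\"}]}");
        JSONObject insertEvt = JSONObject.parseObject(
                "{\"type\":\"INSERT\",\"ts\":1700000000000,\"data\":[{\"id\":\"1001\",\"name\":\"new\"}]}");

        // Same folding GroupTsProcessWindowFunction applies when both events land in one window.
        insertEvt.put("type", "UPDATE");
        insertEvt.put("old", deleteEvt.getJSONArray("data"));

        // Result: one UPDATE whose "data" is the new row and whose "old" is the deleted row.
        System.out.println(insertEvt.toJSONString());
    }
}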
package com.dsk.flink.dsc.common.sink;

import cn.hutool.core.date.DateUtil;
import cn.hutool.core.lang.Snowflake;
import cn.hutool.core.util.IdUtil;
import cn.hutool.core.util.RandomUtil;
import cn.hutool.db.DbUtil;
import com.alibaba.druid.pool.DruidDataSource;
import com.dsk.flink.dsc.common.dto.SqlErrorLog;
import com.dsk.flink.dsc.utils.EnvProperties;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
 * Sink that executes the generated SQL statements against a Dameng (DM) database
 * through a Druid connection pool, using a small thread pool for asynchronous writes.
 */
public class DamengDataTransferSink extends RichSinkFunction<String> {

    static Logger logger = LoggerFactory.getLogger(DamengDataTransferSink.class);

    EnvProperties envProps;
    ExecutorService executorService;
    DruidDataSource dataSource;

    public DamengDataTransferSink(EnvProperties envProps) {
        this.envProps = envProps;
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        executorService = new ThreadPoolExecutor(4, 4, 20, TimeUnit.MINUTES, new LinkedBlockingDeque<>());
        // Initialize the connection pool from the job configuration.
        String configDmUrl = String.format(envProps.getDb_url(), envProps.getDb_host(), envProps.getDb_port(), envProps.getDb_database());
        //System.out.println(configDmUrl);
        dataSource = new DruidDataSource();
        dataSource.setDriverClassName("dm.jdbc.driver.DmDriver");
        dataSource.setUsername(envProps.getDb_username());
        dataSource.setPassword(envProps.getDb_password());
        dataSource.setUrl(configDmUrl);
        dataSource.setMaxActive(30);
        dataSource.setInitialSize(10);
        dataSource.setTestWhileIdle(true);
        dataSource.setMaxWait(20000);
        dataSource.setValidationQuery("select 1");
    }

    @Override
    public void close() throws Exception {
        executorService.shutdown();
        dataSource.close();
    }

    @Override
    public void invoke(String value, Context context) throws Exception {
        executorService.execute(() -> {
            try {
                executeSql(value);
            } catch (Exception e) {
                e.printStackTrace();
            }
        });
    }

    private void executeSql(String sql) throws Exception {
        Connection connection = null;
        PreparedStatement pt = null;
        try {
            connection = dataSource.getConnection();
            pt = connection.prepareStatement(sql);
            pt.execute();
        } catch (Exception e) {
            System.out.println("SQL execution failed ----->" + sql);
            e.printStackTrace();
            logger.error("------ error time: {} -----, sql: {} -------- exception: {}", DateUtil.now(), sql, e.getMessage());
            // SqlErrorLog errorLog = new SqlErrorLog(new Date(), sql, e.getMessage());
            // try {
            //     writeErrLog(errorLog);
            // } catch (Exception re) {
            //     logger.error("failed to persist error log -> {}", re.getMessage());
            // }
        } finally {
            DbUtil.close(connection, pt);
        }
    }

    private void writeErrLog(SqlErrorLog errorLog) {
        writeErrLogDb(errorLog);
    }

    private void writeErrLogDb(SqlErrorLog errorLog) {
        Snowflake snowflake = IdUtil.getSnowflake(RandomUtil.randomInt(31), RandomUtil.randomInt(31));
        String sql = "insert into dsc_err_log (id, error_time, error_sql, error_msg) values (?, ?, ?, ?)";
        Connection conn = null;
        PreparedStatement pt = null;
        try {
            conn = dataSource.getConnection();
            pt = conn.prepareStatement(sql);
            pt.setLong(1, snowflake.nextId() + RandomUtil.randomInt(10, 99));
            pt.setObject(2, errorLog.getErrorTime());
            pt.setString(3, errorLog.getSql());
            pt.setString(4, errorLog.getError());
            pt.execute();
        } catch (Exception e) {
            logger.error("failed to persist error log -> {}", e.getMessage());
        } finally {
            DbUtil.close(conn, pt);
        }
    }
}
......@@ -3,8 +3,10 @@ package com.dsk.flink.dsc.sync;
import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.util.StrUtil;
import com.alibaba.fastjson.JSONObject;
import com.dsk.flink.dsc.common.function.AsyncMysqlDataTransferFunctionNew;
import com.dsk.flink.dsc.common.sink.MysqlDataTransferSink;
import com.dsk.flink.dsc.common.function.AsyncDamengDataTransferFunction;
import com.dsk.flink.dsc.common.function.CanalMapToTsGroupFunction;
import com.dsk.flink.dsc.common.function.GroupTsProcessWindowFunction;
import com.dsk.flink.dsc.common.sink.DamengDataTransferSink;
import com.dsk.flink.dsc.utils.EnvProperties;
import com.dsk.flink.dsc.utils.EnvPropertiesUtil;
import com.dsk.flink.dsc.utils.EtlUtils;
......@@ -38,7 +40,6 @@ import java.util.stream.Collectors;
* @author shezaixing
* @date 2023/12/5 14:44
* @description Job that syncs changes to the customer's target data source
*
*/
@Slf4j
public class SyncCustomerDataSource {
......@@ -60,14 +61,19 @@ public class SyncCustomerDataSource {
String offsetTimestamp = parameterTool.get("offsetTimestamp");
String propertiesPath = parameterTool.get("propertiesPath");
EnvProperties envProps = EnvPropertiesUtil.getPropertiesFromArgsPath(propertiesPath);
envProps.put("providerImpl", JdbcConnectionProviderFactory.HikariDataSourceJdbcConnectionProvider.class.getName());
envProps.put("providerImpl",
JdbcConnectionProviderFactory.HikariDataSourceJdbcConnectionProvider.class.getName());
System.out.println("读取到的配置文件:-> " + envProps.toString());
System.out.println("读取到的数据连接配置:->" + String.format(envProps.getDb_url(), envProps.getDb_host(), envProps.getDb_port(), envProps.getDb_database()));
System.out.println("读取到的数据连接配置:->" + String.format(envProps.getDb_url(), envProps.getDb_host(),
envProps.getDb_port(), envProps.getDb_database()));
System.out.println("获取到的kafka消费组:->" + EtlUtils.getKafkaGroup(envProps));
//TODO 到时需要改这里,改成正式的消费组
FlinkKafkaConsumer<String> kafkaConsumer = new FlinkKafkaConsumer<String>(envProps.getKafka_topic(), new SimpleStringSchema(), EtlUtils.getKafkaConfig(envProps.getKafka_brokers(), EtlUtils.getKafkaGroup(envProps), envProps.getKafka_username(),envProps.getKafka_password()));
FlinkKafkaConsumer<String> kafkaConsumer = new FlinkKafkaConsumer<String>(envProps.getKafka_topic(),
new SimpleStringSchema(), EtlUtils.getKafkaConfig(envProps.getKafka_brokers(),
EtlUtils.getKafkaGroup(envProps), envProps.getKafka_username(), envProps.getKafka_password()));
//System.out.println(envProps.getKafka_topic());
long defaultOffset = LocalDateTime.now().minusMinutes(10).atZone(ZoneId.systemDefault()).toInstant().toEpochMilli();
long defaultOffset =
LocalDateTime.now().minusMinutes(10).atZone(ZoneId.systemDefault()).toInstant().toEpochMilli();
kafkaConsumer.setStartFromTimestamp(defaultOffset);
//kafkaConsumer.setStartFromLatest();
//start offset
......@@ -95,7 +101,14 @@ public class SyncCustomerDataSource {
//canalJsonStream.print("canal stream");
// SingleOutputStreamOperator<String> sqlResultStream = AsyncDataStream.orderedWait(canalJsonStream, new AsyncMysqlDataTransferFunction(envProps), 1200L, TimeUnit.SECONDS, 20)
SingleOutputStreamOperator<Tuple3<JSONObject, String, Long>> tsGroupStream = canalJsonStream.map(new CanalMapToTsGroupFunction());
SingleOutputStreamOperator<JSONObject> process = tsGroupStream.keyBy(x -> x.f2)
.window(TumblingProcessingTimeWindows.of(Time.seconds(3)))
.process(new GroupTsProcessWindowFunction());
// SingleOutputStreamOperator<String> sqlResultStream = AsyncDataStream.orderedWait(canalJsonStream, new
// AsyncMysqlDataTransferFunction(envProps), 1200L, TimeUnit.SECONDS, 20)
// .filter(new FilterFunction<String>() {
// @Override
// public boolean filter(String value) throws Exception {
......@@ -109,8 +122,8 @@ public class SyncCustomerDataSource {
//
// sqlResultStream.addSink(new MysqlDataTransferSink(envProps)).name("sqlSinkStream").uid("sqlSinkStream");
SingleOutputStreamOperator<Tuple3<String, String, Long>> sqlResultStream1 = AsyncDataStream.orderedWait(canalJsonStream,
new AsyncMysqlDataTransferFunctionNew(envProps), 1200L, TimeUnit.SECONDS, 20)
SingleOutputStreamOperator<Tuple3<String, String, Long>> sqlResultStream1 =
AsyncDataStream.orderedWait(process, new AsyncDamengDataTransferFunction(envProps), 1200L, TimeUnit.SECONDS, 20)
.filter(new FilterFunction<Tuple3<String, String, Long>>() {
@Override
public boolean filter(Tuple3<String, String, Long> value) throws Exception {
......@@ -129,8 +142,10 @@ public class SyncCustomerDataSource {
Collector<String> out) throws Exception {
List<Tuple3<String, String, Long>> list = CollUtil.list(false, elements);
if ("dsc_cdc_log".equals(list.get(0).f1)) {
list = list.stream().sorted(Comparator.comparing(x -> x.f2,Comparator.reverseOrder() )).collect(Collectors.toList());
list.forEach(x -> {out.collect(x.f0);});
list = list.stream().sorted(Comparator.comparing(x -> x.f2, Comparator.reverseOrder())).collect(Collectors.toList());
list.forEach(x -> {
out.collect(x.f0);
});
return;
}
Tuple3<String, String, Long> maxTsElement =
......@@ -140,9 +155,11 @@ public class SyncCustomerDataSource {
})
.name("groupWindowSqlResultStream")
.uid("groupWindowSqlResultStream");
groupWindowSqlResultStream.print("sql result");
groupWindowSqlResultStream.addSink(new MysqlDataTransferSink(envProps)).name("sqlSinkStream").uid("sqlSinkStream");
//groupWindowSqlResultStream.print("sql result");
groupWindowSqlResultStream.addSink(new DamengDataTransferSink(envProps)).name("sqlSinkStream").uid("sqlSinkStream");
env.execute();
}
}
......@@ -22,9 +22,9 @@ public class EtlUtils {
/*properties.setProperty("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"dsk\" password=\"LU1SRhTzoxssRoCp\";");
properties.setProperty("security.protocol", "SASL_PLAINTEXT");
properties.setProperty("sasl.mechanism", "PLAIN");*/
properties.setProperty("sasl.jaas.config", getSaslJaasConfig(username,password));
properties.setProperty("security.protocol", "SASL_PLAINTEXT");
properties.setProperty("sasl.mechanism", "SCRAM-SHA-512");
// properties.setProperty("sasl.jaas.config", getSaslJaasConfig(username,password));
// properties.setProperty("security.protocol", "SASL_PLAINTEXT");
// properties.setProperty("sasl.mechanism", "SCRAM-SHA-512");
return properties;
}
......