Commit c5470ee1 authored by shezaixing

Configurable logical (soft) delete

Configurable CDC log record writing
parent 0153cd84
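Both features are switched on through the job's EnvProperties. A minimal sketch, assuming the flags are set in application.properties under the property names the code below reads (logical_delete, log_enable):

# hypothetical application.properties entries
logical_delete=true   # DELETE events are rewritten into upserts that set is_del = 1
log_enable=true       # every processed event is additionally recorded in the dsc_cdc_log table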
package com.dsk.flink.dsc.common.function;
import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.map.MapUtil;
import cn.hutool.core.util.StrUtil;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
@@ -80,6 +81,12 @@ public class AsyncMysqlDataTransferFunction extends RichAsyncFunction<JSONObject
}
JSONObject dataObj = dataList.getJSONObject(0);
Boolean logicalDelete = MapUtil.getBool(dbInfoMap, "logical_delete", false);
if(logicalDelete){
mysqlType.put("is_del", "int");
dataObj.put("is_del", "DELETE".equals(type) ? 1 : 0);
}
if("INSERT".equals(type)){ if("INSERT".equals(type)){
excueteSql = tranferInsertSql(table,dataObj,mysqlType); excueteSql = tranferInsertSql(table,dataObj,mysqlType);
} }
...@@ -90,8 +97,17 @@ public class AsyncMysqlDataTransferFunction extends RichAsyncFunction<JSONObject ...@@ -90,8 +97,17 @@ public class AsyncMysqlDataTransferFunction extends RichAsyncFunction<JSONObject
} }
if("DELETE".equals(type)){ if("DELETE".equals(type)){
excueteSql = transferDeleteSql(table,dataObj,mysqlType,pkNameSet); excueteSql = logicalDelete ? tranferInsertSql(table,dataObj,mysqlType) : transferDeleteSql(table,dataObj,mysqlType,pkNameSet);
}
//Handle statement ordering
//Use this record's table name and primary key value(s) as its unique groupKey
String groupKey = table;
for (String pk : pkNameSet) {
String pkValue = getValueString(dataObj, pk, mysqlType.getString(pk));
groupKey = groupKey.concat("-").concat(pkValue);
}
Long ts = value.getLong("ts");
resultFuture.complete(Collections.singleton(excueteSql));
}catch (Exception e){
......
package com.dsk.flink.dsc.common.function;
import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.map.MapUtil;
import cn.hutool.core.util.StrUtil;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.dsk.flink.dsc.utils.EnvProperties;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.text.SimpleDateFormat;
import java.util.*;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
public class AsyncMysqlDataTransferFunctionNew extends RichAsyncFunction<JSONObject, Tuple3<String,String,Long>> {
static Logger logger = LoggerFactory.getLogger(AsyncMysqlDataTransferFunctionNew.class);
//Database connection info
EnvProperties dbInfoMap;
//Worker thread pool
private transient ExecutorService executorService;
public AsyncMysqlDataTransferFunctionNew(EnvProperties dbInfoMap) {
this.dbInfoMap = dbInfoMap;
}
@Override
public void open(Configuration parameters) throws Exception {
//Initialize the worker thread pool
executorService = new ThreadPoolExecutor(4 , 4, 20, TimeUnit.MINUTES, new LinkedBlockingDeque<>());
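// Fixed pool of 4 worker threads with an unbounded queue (LinkedBlockingDeque without a capacity bound);
// each incoming CDC message is converted to SQL on one of these threads so the async operator is not blocked.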
}
@Override
public void close() throws Exception {
executorService.shutdown();
}
@Override
public void timeout(JSONObject input, ResultFuture<Tuple3<String,String,Long>> resultFuture) throws Exception {
resultFuture.complete(Collections.singleton(Tuple3.of("err","",0L)));
}
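// On timeout a sentinel ("err", "", 0L) tuple is emitted; the downstream filter in
// SyncCustomerDataSource discards tuples whose SQL is blank or equals "err".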
@Override
public void asyncInvoke(JSONObject value, ResultFuture<Tuple3<String,String,Long>> resultFuture) throws Exception {
executorService.submit(() -> {
try {
//Result tuples to emit
List<Tuple3<String,String,Long>> resultList = new ArrayList<>();
String type = value.getString("type");
JSONArray dataList = value.getJSONArray("data");
JSONObject mysqlType = value.getJSONObject("mysqlType");
JSONArray oldDataList = value.getJSONArray("old");
String table = value.getString("table");
Boolean isDdl = value.getBoolean("isDdl");
JSONArray pkNames = value.getJSONArray("pkNames");
Set<String> pkNameSet = new HashSet<>();
long ts = value.getLong("ts");
if(CollUtil.isNotEmpty(pkNames)){
pkNames.forEach(name -> pkNameSet.add(String.valueOf(name)));
}
String excueteSql = "";
if(isDdl){
excueteSql = value.getString("sql");
if(StrUtil.isNotBlank(excueteSql)){
excueteSql = StrUtil.subBefore(excueteSql,"AFTER",true);
resultList.add(Tuple3.of(excueteSql,table,ts));
}
resultFuture.complete(resultList);
return;
}
JSONObject dataObj = dataList.getJSONObject(0);
Boolean logicalDelete = MapUtil.getBool(dbInfoMap, "logical_delete", false);
if(logicalDelete){
mysqlType.put("is_del", "int");
dataObj.put("is_del", "DELETE".equals(type) ? 1 : 0);
}
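// With logical_delete enabled the target table is assumed to carry an is_del INT column:
// it is forced to 1 for DELETE events and 0 for INSERT/UPDATE, and the DELETE itself is
// rewritten into an upsert further below instead of a real DELETE statement.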
//Handle statement ordering
//Use this record's table name and primary key value(s) as its unique groupKey
String groupKey = table;
for (String pk : pkNameSet) {
String pkValue = getValueString(dataObj, pk, mysqlType.getString(pk));
groupKey = groupKey.concat("-").concat(pkValue);
}
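// Example (hypothetical event): table "t_order" with pkNames ["id"] and id = 42 yields
// groupKey "t_order-42"; the downstream keyBy + window then keeps only the newest
// statement per row within each window.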
if("INSERT".equals(type)){
excueteSql = tranferInsertSql(table,dataObj,mysqlType);
}
if("UPDATE".equals(type)){
JSONObject oldDataObj = oldDataList.getJSONObject(0);
// excueteSql = tranferUpdateSql(table,dataObj,oldDataObj,mysqlType,pkNameSet);
excueteSql = tranferInsertSql(table,dataObj,mysqlType);
}
if("DELETE".equals(type)){
excueteSql = logicalDelete ? tranferInsertSql(table,dataObj,mysqlType) : transferDeleteSql(table,dataObj,mysqlType,pkNameSet);
}
resultList.add(Tuple3.of(excueteSql,groupKey,ts));
Boolean logEnable = MapUtil.getBool(dbInfoMap, "log_enable", false);
if (logEnable){
String logSql = buildLogData(type, table, pkNameSet, dataObj, ts);
resultList.add(Tuple3.of(logSql,"dsc_cdc_log",ts));
}
resultFuture.complete(resultList);
}catch (Exception e){
logger.error("asyncInvoke failed for message: {}", value, e);
resultFuture.complete(Collections.singleton(Tuple3.of("err","",0L)));
}
});
}
private static String logSqlFormat = "INSERT INTO dsc_cdc_log (`table`,op_type,pk_columns,pk_values,cdc_ts) values ('%s','%s','%s','%s', %d)";
private String buildLogData(String type, String table, Set<String> pkNameSet, JSONObject dataObj, long ts) {
List<String> pkValueList = new ArrayList<>();
for (String pk : pkNameSet) {
pkValueList.add(dataObj.getString(pk));
}
String pkColumns = String.join(",",pkNameSet);
String pkValues = String.join("-",pkValueList);
return String.format(logSqlFormat, table, type, pkColumns, pkValues, ts);
}
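// Example (hypothetical event): type = "UPDATE", table = "t_user", pkNameSet = {"id"}, id = 7, ts = 1700000000000 produces:
// INSERT INTO dsc_cdc_log (`table`,op_type,pk_columns,pk_values,cdc_ts) values ('t_user','UPDATE','id','7', 1700000000000)
// This assumes a dsc_cdc_log table with the columns `table`, op_type, pk_columns, pk_values and cdc_ts exists in the target database.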
private static final String[] STR_SQL_TYPE = new String[]{"VARCHAR","CHAR","TINYBLOB","BLOB","MEDIUMBLOB","LONGBLOB","TINYTEXT","TEXT","MEDIUMTEXT","LONGTEXT","TIME","TIMESTAMP","JSON","json"};
private static final String[] KEYWORD = new String[]{"limit"};
private static String tranferInsertSql(String table, JSONObject dataObj, JSONObject mysqlType) {
Set<String> columnSet = mysqlType.keySet();
StringBuilder sb = new StringBuilder();
for (String s : columnSet) {
sb.append("`"+s+"`").append(",");
}
List<String> valueList = new ArrayList<>();
List<String> updateList = new ArrayList<>();
for (String col : columnSet) {
String formatCol = "`"+col+"`";
if(Arrays.asList(KEYWORD).contains(col)){
valueList.add(getValueString(dataObj,col,mysqlType.getString(col)));
updateList.add(formatCol.concat(" = VALUES(").concat(formatCol).concat(")"));
}else {
valueList.add(getValueString(dataObj,col,mysqlType.getString(col)));
updateList.add(col.concat(" = VALUES(").concat(col).concat(")"));
}
}
//String columnString = String.join(",",columnSet);
sb.setLength(sb.length()-1);
String columnString = sb.toString();
String valueString = String.join(",",valueList);
String updateString = String.join(",",updateList);
return String.format("INSERT INTO %s (%s) values (%s) ON DUPLICATE KEY UPDATE %s;",table,columnString,valueString,updateString);
}
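// Example (hypothetical row): table = "test" with columns id = 1 and name = "Nana" produces:
// INSERT INTO test (`id`,`name`) values (1,'Nana') ON DUPLICATE KEY UPDATE id = VALUES(id),name = VALUES(name);
// With logical delete enabled, a DELETE event reuses this upsert with is_del = 1, so the row is kept but flagged.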
private String tranferUpdateSql(String table, JSONObject dataObj, JSONObject oldDataObj, JSONObject mysqlType,Set<String> pkNameSet) {
Set<String> columnSet = mysqlType.keySet();
List<String> setList = new ArrayList<>();
List<String> whereList = new ArrayList<>();
for (String col : columnSet) {
String setString = col.concat(" = ").concat(getValueString(dataObj,col,mysqlType.getString(col)));
setList.add(setString);
}
for (String pk : pkNameSet) {
String whereString = pk.concat(" = ").concat(getValueString(oldDataObj,pk,mysqlType.getString(pk)));
whereList.add(whereString);
}
String setString = String.join(",",setList);
String whereString = String.join(" and ",whereList);
return String.format("UPDATE %s SET %s WHERE %s",table,setString,whereString);
}
private String transferDeleteSql(String table, JSONObject dataObj, JSONObject mysqlType, Set<String> pkNameSet) {
List<String> whereList = new ArrayList<>();
for (String pk : pkNameSet) {
String whereString = pk.concat(" = ").concat(getValueString(dataObj,pk,mysqlType.getString(pk)));
whereList.add(whereString);
}
String whereString = String.join(" and ",whereList);
return String.format("DELETE FROM %s WHERE %s",table,whereString);
}
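// Example (hypothetical row): table = "test", pkNameSet = {"id"}, id = 1 produces:
// DELETE FROM test WHERE id = 1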
/**
* @author shezaixing
* @date 2023/12/7 14:23
* @description Decide from the column's MySQL type whether the value must be quoted when the SQL string is assembled
*
*/
private static String getValueString(JSONObject dataObj,String columnKey,String mysqlType){
if(null == dataObj.get(columnKey)){
return "null";
}
//Types whose values must be emitted as quoted strings
if(Arrays.asList(STR_SQL_TYPE).contains(mysqlType.toUpperCase())){
return String.format("'%s'", dataObj.getString(columnKey).replace("\\","\\\\").replace("'", "\\'") );
}
//Date / datetime column handling
if("DATE".equalsIgnoreCase(mysqlType) || "DATETIME".equalsIgnoreCase(mysqlType)){
SimpleDateFormat df = "DATETIME".equalsIgnoreCase(mysqlType) ? new SimpleDateFormat("yyyy-MM-dd HH:mm:ss") : new SimpleDateFormat("yyyy-MM-dd");
return String.format("\"%s\"",df.format(dataObj.getDate(columnKey)));
}
return dataObj.getString(columnKey);
}
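// Examples: a varchar value O'Brien becomes 'O\'Brien' (backslashes and single quotes escaped),
// a datetime column is rendered as "2023-10-02 11:11:00", and numeric types pass through unquoted.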
public static void main(String[] args) {
JSONObject jsonObject = new JSONObject();
jsonObject.put("id",1);
jsonObject.put("name","Nana");
jsonObject.put("age",26);
jsonObject.put("salary",20000);
jsonObject.put("date1","2023-10-01");
jsonObject.put("date2","2023-10-02 11:11:00");
JSONObject mysqlType = new JSONObject();
mysqlType.put("id","int");
mysqlType.put("name","varchar");
mysqlType.put("age","bigint");
mysqlType.put("salary","double");
mysqlType.put("date1","date");
mysqlType.put("date2","datetime");
mysqlType.put("relation",null);
String table = "test";
String s= "ff8940af-c080-40cc-9d83-8c7dc8b86ed4";
System.out.println(s.length());
String s1 = "hello string sss";
String s2 = "'kaskljsl'";
System.out.println(StrUtil.subBefore(s1,"string",true));
System.out.println(tranferInsertSql(table,jsonObject,mysqlType));
System.out.println(s2.replaceAll("'","\\\\'"));
String[] ss = new String[]{"1","2","3" };
StringBuilder sb = new StringBuilder();
for (String s3 : ss)
{
sb.append("`"+s3+"?").append(",");
}
for (String s3 : ss) {
System.out.println(s3);
}
sb.setLength(sb.length()-1);
System.out.println(sb.toString());
String s5 = "交货地址:安徽霍邱供应站及指定地点\\\",\\\"bjsm\\\":null,\\\"jhType\\\":null,\\";
System.out.println(s5);
System.out.println(s5.replace("\\","\\\\").replace("'", "\\'"));
}
}
package com.dsk.flink.dsc.sync;
import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.util.StrUtil;
import com.alibaba.fastjson.JSONObject;
import com.dsk.flink.dsc.common.function.AsyncMysqlDataTransferFunctionNew;
import com.dsk.flink.dsc.common.sink.MysqlDataTransferSink;
import com.dsk.flink.dsc.utils.EnvProperties;
import com.dsk.flink.dsc.utils.EnvPropertiesUtil;
@@ -12,15 +13,26 @@ import lombok.extern.slf4j.Slf4j;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.AsyncDataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.util.Collector;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
/**
 * @author shezaixing
@@ -55,7 +67,9 @@ public class SyncCustomerDataSource {
//TODO Change this later to the official consumer group
FlinkKafkaConsumer<String> kafkaConsumer = new FlinkKafkaConsumer<String>(envProps.getKafka_topic(), new SimpleStringSchema(), EtlUtils.getKafkaConfig(envProps.getKafka_brokers(), EtlUtils.getKafkaGroup(envProps), envProps.getKafka_username(),envProps.getKafka_password()));
//System.out.println(envProps.getKafka_topic());
long defaultOffset = LocalDateTime.now().minusMinutes(5).atZone(ZoneId.systemDefault()).toInstant().toEpochMilli();
kafkaConsumer.setStartFromTimestamp(defaultOffset);
//kafkaConsumer.setStartFromLatest();
//Optional explicit start offset
if (StrUtil.isNotBlank(offsetTimestamp)) {
kafkaConsumer.setStartFromTimestamp(Long.parseLong(offsetTimestamp));
@@ -72,7 +86,8 @@ public class SyncCustomerDataSource {
.filter(new FilterFunction<JSONObject>() {
@Override
public boolean filter(JSONObject value) throws Exception {
return !value.getBoolean("isDdl") && !"TIDB_WATERMARK".equals(value.getString("type"));
}
})
.name("canalJsonStream")
@@ -80,19 +95,54 @@ public class SyncCustomerDataSource {
//canalJsonStream.print("canal stream");
// SingleOutputStreamOperator<String> sqlResultStream = AsyncDataStream.orderedWait(canalJsonStream, new AsyncMysqlDataTransferFunction(envProps), 1200L, TimeUnit.SECONDS, 20)
// .filter(new FilterFunction<String>() {
// @Override
// public boolean filter(String value) throws Exception {
// return StrUtil.isNotBlank(value) && !"err".equals(value);
// }
// })
// .name("sqlResultStream")
// .uid("sqlResultStream");
//
// //sqlResultStream.print("sql result");
//
// sqlResultStream.addSink(new MysqlDataTransferSink(envProps)).name("sqlSinkStream").uid("sqlSinkStream");
SingleOutputStreamOperator<Tuple3<String, String, Long>> sqlResultStream1 = AsyncDataStream.orderedWait(canalJsonStream,
new AsyncMysqlDataTransferFunctionNew(envProps), 1200L, TimeUnit.SECONDS, 20)
.filter(new FilterFunction<Tuple3<String, String, Long>>() {
@Override
public boolean filter(Tuple3<String, String, Long> value) throws Exception {
return StrUtil.isNotBlank(value.f0) && !"err".equals(value.f0);
}
})
.name("sqlResultStream")
.uid("sqlResultStream");
SingleOutputStreamOperator<String> groupWindowSqlResultStream = sqlResultStream1.keyBy(value -> value.f1)
.window(TumblingProcessingTimeWindows.of(Time.seconds(3)))
.process(new ProcessWindowFunction<Tuple3<String, String, Long>, String, String, TimeWindow>() {
@Override
public void process(String s, ProcessWindowFunction<Tuple3<String, String, Long>, String, String,
TimeWindow>.Context context, Iterable<Tuple3<String, String, Long>> elements,
Collector<String> out) throws Exception {
List<Tuple3<String, String, Long>> list = CollUtil.list(false, elements);
if ("dsc_cdc_log".equals(list.get(0).f1)) {
list = list.stream().sorted(Comparator.comparing(x -> x.f2,Comparator.reverseOrder() )).collect(Collectors.toList());
list.forEach(x -> {out.collect(x.f0);});
return;
}
Tuple3<String, String, Long> maxTsElement =
list.stream().max(Comparator.comparing(x -> x.f2)).get();
out.collect(maxTsElement.f0);
}
})
.name("groupWindowSqlResultStream")
.uid("groupWindowSqlResultStream");
groupWindowSqlResultStream.print("sql result");
groupWindowSqlResultStream.addSink(new MysqlDataTransferSink(envProps)).name("sqlSinkStream").uid("sqlSinkStream");
env.execute();
}
}
@@ -113,6 +113,26 @@ public class EnvProperties extends Properties {
String solr_urls;
String solr_zk_hosts;
String logical_delete;
String log_enable;
public String getLog_enable() {
return log_enable == null ? this.getProperty("log_enable") : log_enable;
}
public void setLog_enable(String log_enable) {
this.log_enable = log_enable;
}
public String getLogical_delete() {
return logical_delete == null ? this.getProperty("logical_delete") : logical_delete;
}
public void setLogical_delete(String logical_delete) {
this.logical_delete = logical_delete;
}
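// Both getters fall back to the raw property, so setting logical_delete=true / log_enable=true
// in application.properties is enough; the setters are only needed when overriding programmatically.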
public String getEnv() {
return env == null ? this.getProperty("env") : env;
}
......
@@ -145,7 +145,7 @@ public class EnvPropertiesUtil {
EnvProperties envProperties = new EnvProperties();
if(StrUtil.isBlank(filePath)){
filePath = System.getProperties().getProperty("os.name").contains("Windows") ? "D:\\Env\\application.properties" : "/home/module/flink-job/application.properties";
}
File file = new File(filePath);
if (!file.exists()) {
......