Commit ce59d998 authored by shezaixing's avatar shezaixing

处理唯一键update拆分cdc消息

parent 6085ef5a
package com.dsk.flink.dsc.common.function;
import cn.hutool.core.collection.CollUtil;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple3;
import java.text.SimpleDateFormat;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;
public class CanalMapToTsGroupFunction implements MapFunction<JSONObject, Tuple3<JSONObject, String, Long>> {
@Override
public Tuple3<JSONObject, String, Long> map(JSONObject value) throws Exception {
JSONArray dataList = value.getJSONArray("data");
JSONObject mysqlType = value.getJSONObject("mysqlType");
String table = value.getString("table");
JSONArray pkNames = value.getJSONArray("pkNames");
Set<String> pkNameSet = new HashSet<>();
long ts = value.getLong("ts");
if (CollUtil.isNotEmpty(pkNames)) {
pkNames.forEach(name -> pkNameSet.add(String.valueOf(name)));
}
JSONObject dataObj = dataList.getJSONObject(0);
String groupKey = table;
for (String pk : pkNameSet) {
String pkValue = getValueString(dataObj, pk,
mysqlType.getString(pk));
groupKey = table.concat("-").concat(pkValue);
}
return Tuple3.of(value, groupKey, ts);
}
private static final String[] STR_SQL_TYPE = new String[]{"VARCHAR","CHAR","TINYBLOB","BLOB","MEDIUMBLOB","LONGBLOB","TINYTEXT","TEXT","MEDIUMTEXT","LONGTEXT","TIME","TIMESTAMP","JSON","json"};
/**
* @author shezaixing
* @date 2023/12/7 14:23
* @description 判断拼接字符串时类型(是否需要加上引号)
*
*/
private static String getValueString(JSONObject dataObj,String columnKey,String mysqlType){
if(null == dataObj.get(columnKey)){
return "null";
}
//需要处理成字符串加引号的类型
if(Arrays.asList(STR_SQL_TYPE).contains(mysqlType.toUpperCase())){
return String.format("'%s'", dataObj.getString(columnKey).replace("\\","\\\\").replace("'", "\\'") );
}
//时间字段处理
if("DATE".equalsIgnoreCase(mysqlType) || "DATETIME".equalsIgnoreCase(mysqlType)){
SimpleDateFormat df = "DATETIME".equalsIgnoreCase(mysqlType) ? new SimpleDateFormat("yyyy-MM-dd HH:mm:ss") : new SimpleDateFormat("yyyy-MM-dd");
return String.format("\"%s\"",df.format(dataObj.getDate(columnKey)));
}
return dataObj.getString(columnKey);
}
}
package com.dsk.flink.dsc.common.function;
import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.util.StrUtil;
import com.alibaba.fastjson.JSONObject;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
public class GroupTsProcessWindowFunction extends ProcessWindowFunction<Tuple3<JSONObject, String, Long>, JSONObject, Long, TimeWindow> {
@Override
public void process(Long aLong, ProcessWindowFunction<Tuple3<JSONObject, String, Long>, JSONObject, Long,
TimeWindow>.Context context, Iterable<Tuple3<JSONObject, String, Long>> elements, Collector<JSONObject> out) throws Exception {
List<Tuple3<JSONObject, String, Long>> list = CollUtil.list(false, elements);
Map<Long, List<Tuple3<JSONObject, String, Long>>> tsGroupMap = list.stream().collect(Collectors.groupingBy(x -> x.f2));
Long max = CollUtil.max(tsGroupMap.keySet());
List<Tuple3<JSONObject, String, Long>> resList = tsGroupMap.get(max);
if(resList.size() == 2){
JSONObject insertJson = new JSONObject();
JSONObject deleteJson = new JSONObject();
for (Tuple3<JSONObject, String, Long> rs : resList) {
if("INSERT".equals(rs.f0.getString("type"))){
insertJson = rs.f0;
}
if("DELETE".equals(rs.f0.getString("type"))){
deleteJson = rs.f0;
}
}
if(StrUtil.isNotBlank(insertJson.getString("type")) && StrUtil.isNotBlank(deleteJson.getString("type"))){
insertJson.put("type","UPDATE");
insertJson.put("old", deleteJson.getJSONArray("data"));
out.collect(insertJson);
return;
}
}
out.collect(resList.get(0).f0);
}
}
......@@ -4,6 +4,8 @@ import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.util.StrUtil;
import com.alibaba.fastjson.JSONObject;
import com.dsk.flink.dsc.common.function.AsyncMysqlDataTransferFunctionNew;
import com.dsk.flink.dsc.common.function.CanalMapToTsGroupFunction;
import com.dsk.flink.dsc.common.function.GroupTsProcessWindowFunction;
import com.dsk.flink.dsc.common.sink.MysqlDataTransferSink;
import com.dsk.flink.dsc.utils.EnvProperties;
import com.dsk.flink.dsc.utils.EnvPropertiesUtil;
......@@ -95,6 +97,12 @@ public class SyncCustomerDataSource {
//canalJsonStream.print("canal stream");
SingleOutputStreamOperator<Tuple3<JSONObject, String, Long>> tsGroupStream = canalJsonStream.map(new CanalMapToTsGroupFunction());
SingleOutputStreamOperator<JSONObject> process = tsGroupStream.keyBy(x -> x.f2)
.window(TumblingProcessingTimeWindows.of(Time.seconds(3)))
.process(new GroupTsProcessWindowFunction());
// SingleOutputStreamOperator<String> sqlResultStream = AsyncDataStream.orderedWait(canalJsonStream, new AsyncMysqlDataTransferFunction(envProps), 1200L, TimeUnit.SECONDS, 20)
// .filter(new FilterFunction<String>() {
// @Override
......@@ -109,7 +117,7 @@ public class SyncCustomerDataSource {
//
// sqlResultStream.addSink(new MysqlDataTransferSink(envProps)).name("sqlSinkStream").uid("sqlSinkStream");
SingleOutputStreamOperator<Tuple3<String, String, Long>> sqlResultStream1 = AsyncDataStream.orderedWait(canalJsonStream,
SingleOutputStreamOperator<Tuple3<String, String, Long>> sqlResultStream1 = AsyncDataStream.orderedWait(process,
new AsyncMysqlDataTransferFunctionNew(envProps), 1200L, TimeUnit.SECONDS, 20)
.filter(new FilterFunction<Tuple3<String, String, Long>>() {
@Override
......@@ -140,7 +148,7 @@ public class SyncCustomerDataSource {
})
.name("groupWindowSqlResultStream")
.uid("groupWindowSqlResultStream");
groupWindowSqlResultStream.print("sql result");
//groupWindowSqlResultStream.print("sql result");
groupWindowSqlResultStream.addSink(new MysqlDataTransferSink(envProps)).name("sqlSinkStream").uid("sqlSinkStream");
env.execute();
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment