Commit ba62a7c4 authored by liaowenwu's avatar liaowenwu

优化代码

parent e2cad0bc
......@@ -3,6 +3,8 @@ package com.dsk.flink.dsc.common.function;
import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.date.DateUtil;
import cn.hutool.core.map.MapUtil;
import cn.hutool.core.util.StrUtil;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.dsk.flink.dsc.utils.EnvProperties;
......@@ -12,6 +14,7 @@ import org.apache.flink.util.Collector;
import java.util.*;
/**
* 重构代码
* @author lww
* @date 2025-01-14
*/
......@@ -41,8 +44,6 @@ public class MysqlDataTransferFunction extends ProcessFunction<JSONObject, Tuple
this.dbInfoMap = envProps;
}
private static String tranferInsertSql(String table, JSONObject dataObj, JSONObject mysqlType) {
Set<String> columnSet = mysqlType.keySet();
StringBuilder sb = new StringBuilder("REPLACE INTO ").append(table).append(" (");
......@@ -58,7 +59,6 @@ public class MysqlDataTransferFunction extends ProcessFunction<JSONObject, Tuple
return sb.toString();
}
private static String transferDeleteSql(String table, JSONObject dataObj, JSONObject mysqlType, Set<String> pkNameSet) {
StringBuilder whereClauseBuilder = new StringBuilder();
for (String pk : pkNameSet) {
......@@ -70,14 +70,7 @@ public class MysqlDataTransferFunction extends ProcessFunction<JSONObject, Tuple
return String.format("DELETE FROM %s WHERE %s",table, whereClauseBuilder);
}
/**
* @author shezaixing
* @date 2023/12/7 14:23
* @description 判断拼接字符串时类型(是否需要加上引号)
*
*/
private static String getValueString(JSONObject dataObj,String columnKey,String mysqlType){
if(null == dataObj.get(columnKey)){
return "null";
}
......@@ -100,7 +93,6 @@ public class MysqlDataTransferFunction extends ProcessFunction<JSONObject, Tuple
public void processElement(JSONObject value, Context ctx, Collector<Tuple3<String, String, Long>> out) throws Exception {
//返回数据集合
String type = value.getString("type");
//JSONArray dataList = value.getJSONArray("data");
JSONObject mysqlType = value.getJSONObject("mysqlType");
String table = value.getString("table");
JSONArray pkNames = value.getJSONArray("pkNames");
......@@ -117,9 +109,6 @@ public class MysqlDataTransferFunction extends ProcessFunction<JSONObject, Tuple
mysqlType.put("is_del", "int");
dataObj.put("is_del", "DELETE".equals(type) ? 1 : 0);
}
//处理先后顺序
//获取该条数据的表名和主键作为唯一的groupKey
StringBuilder groupKeyBuilder = new StringBuilder(table);
for (String pk : pkNameSet) {
String pkValue = getValueString(dataObj, pk, mysqlType.getString(pk));
......@@ -128,9 +117,19 @@ public class MysqlDataTransferFunction extends ProcessFunction<JSONObject, Tuple
String groupKey = groupKeyBuilder.toString();
//添加分表参数
//String shardingRule = dataObj.getString("shardingRule");
//String shardedTable = dbInfoMap.getSharded_table();
String shardingRule = dataObj.getString("shardingRule");
if (StrUtil.isNotBlank(shardingRule)) {
Map<String,Object> map = JSON.parseObject(shardingRule, Map.class);
String strategy = MapUtil.getStr(map, "strategy");
Map<String,Object> strategyMap = JSON.parseObject(strategy, Map.class);
int i = MapUtil.getInt(strategyMap, "sharding-count");
if (i > 1){
String str = MapUtil.getStr(strategyMap, "sharding-column");
Integer val = dataObj.getInteger(str);
val = val == null ? 1 : val;
table = table.concat("_").concat(String.valueOf(val % i));
}
}
if("INSERT".equals(type) || "UPDATE".equals(type)){
excueteSql = tranferInsertSql(table,dataObj,mysqlType);
} else {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment