Commit 2d9ab50f authored by 沈毫厘's avatar 沈毫厘

分库分表修改

parent e552acf5
package com.dsk.flink.dsc.common.function; package com.dsk.flink.dsc.common.function;
import cn.hutool.core.collection.CollUtil; import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.map.MapUtil;
import cn.hutool.core.util.StrUtil; import cn.hutool.core.util.StrUtil;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray; import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject; import com.alibaba.fastjson.JSONObject;
import com.dsk.flink.dsc.utils.EnvProperties; import com.dsk.flink.dsc.utils.EnvProperties;
import org.apache.commons.lang3.ObjectUtils;
import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.async.ResultFuture; import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction; import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;
...@@ -66,23 +69,45 @@ public class AsyncMysqlDataTransferFunction extends RichAsyncFunction<JSONObject ...@@ -66,23 +69,45 @@ public class AsyncMysqlDataTransferFunction extends RichAsyncFunction<JSONObject
pkNames.forEach(name -> pkNameSet.add(String.valueOf(name))); pkNames.forEach(name -> pkNameSet.add(String.valueOf(name)));
} }
String shardedTable = dbInfoMap.getSharded_table(); String shardingRule = dataObj.getString("shardingRule");
List<Map<String,Object>> list = JSONArray.parseObject(shardedTable, List.class);
List<Map<String, Object>> table1 = list.stream().filter(l -> {
return l.get("table").toString().equals(table);
}).collect(Collectors.toList());
String concat = ""; String concat = "";
//分表数 if (StrUtil.isNotBlank(shardingRule)){
String tbNum = table1.get(0).get("tb_num").toString(); Map<String,Object> map = JSON.parseObject(shardingRule, Map.class);
//分表字段 String strategy = map.get("strategy").toString();
Integer majorKey = dataObj.getInteger(table1.get(0).get("major_key").toString());
if (!tbNum.equals("1")){ Map<String,Object> strategyMap = JSON.parseObject(strategy, Map.class);
int i1 = majorKey.intValue() % Integer.parseInt(tbNum); String str = MapUtil.getStr(strategyMap, "sharding-column");
concat = table.concat("_").concat(String.valueOf(i1)); int i = MapUtil.getInt(strategyMap, "sharding-count").intValue();
if (i > 1){
int tableNum = dataObj.getInteger(str).intValue() % i;
concat = table.concat("_").concat(String.valueOf(tableNum));
}else {
concat = table;
}
}else { }else {
concat = table; String shardedTable = dbInfoMap.getSharded_table();
List<Map<String,Object>> list = JSONArray.parseObject(shardedTable, List.class);
List<Map<String, Object>> table1 = list.stream().filter(l -> {
return l.get("table").toString().equals(table);
}).collect(Collectors.toList());
if (ObjectUtils.isEmpty(table1)){
concat = table;
}else {
//分表数
String tbNum = table1.get(0).get("tb_num").toString();
//分表字段
Integer majorKey = dataObj.getInteger(table1.get(0).get("major_key").toString());
if (!tbNum.equals("1")){
int i1 = majorKey.intValue() % Integer.parseInt(tbNum);
concat = table.concat("_").concat(String.valueOf(i1));
}else {
concat = table;
}
}
} }
String excueteSql = ""; String excueteSql = "";
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment