Commit 2b8ea804 authored by liaowenwu

Optimize code

parent 01f8cec1
package com.dsk.flink.dsc.common.function;
import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.map.MapUtil;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.dsk.flink.dsc.utils.EnvProperties;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;
import java.text.SimpleDateFormat;
import java.util.*;
public class MysqlDataTransferFunction extends ProcessFunction<JSONObject, Tuple3<String,String,Long>> {
//database connection information
private final EnvProperties dbInfoMap;
//side-output tag for CDC log records
private final OutputTag<Tuple3<String, String, Long>> toSlideTag;
public MysqlDataTransferFunction(EnvProperties dbInfoMap, OutputTag<Tuple3<String, String, Long>> toSlideTag) {
this.dbInfoMap = dbInfoMap;
this.toSlideTag = toSlideTag;
}
private static final String logSqlFormat = "INSERT INTO dsc_cdc_log (`table`,op_type,pk_columns,pk_values,data_json,cdc_ts) values ('%s','%s','%s','%s','%s', %d)";
private String buildLogData(String type, String table, Set<String> pkNameSet, JSONObject dataObj, long ts, String dataJsonStr) {
List<String> pkValueList = new ArrayList<>();
for (String pk : pkNameSet) {
pkValueList.add(dataObj.getString(pk));
}
String pkColumns = String.join(",",pkNameSet);
String pkValues = String.join("-",pkValueList);
dataJsonStr = dataJsonStr.replace("\\","\\\\").replace("'","\\'");
return String.format(logSqlFormat, table, type, pkColumns, pkValues, dataJsonStr, ts);
}
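//Illustrative example only (hypothetical table and values, not from this commit): for table t_user,
//pk "id" = 123 and ts = 1700000000000, the generated audit row would look roughly like:
//INSERT INTO dsc_cdc_log (`table`,op_type,pk_columns,pk_values,data_json,cdc_ts) values ('t_user','UPDATE','id','123','{...escaped source json...}', 1700000000000)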
private static final String[] STR_SQL_TYPE = new String[]{"VARCHAR","CHAR","TINYBLOB","BLOB","MEDIUMBLOB","LONGBLOB","TINYTEXT","TEXT","MEDIUMTEXT","LONGTEXT","TIME","TIMESTAMP","JSON"};
private static final String[] KEYWORD = new String[]{"limit"};
private static String transferInsertSql(String table, JSONObject dataObj, JSONObject mysqlType) {
Set<String> columnSet = mysqlType.keySet();
StringBuilder sb = new StringBuilder();
for (String s : columnSet) {
sb.append("`").append(s).append("`").append(",");
}
List<String> valueList = new ArrayList<>();
for (String col : columnSet) {
valueList.add(getValueString(dataObj,col,mysqlType.getString(col)));
}
sb.setLength(sb.length()-1);
String columnString = sb.toString();
String valueString = String.join(",",valueList);
//return String.format("INSERT INTO %s (%s) values (%s) ON DUPLICATE KEY UPDATE %s;",table,columnString,valueString,updateString);
return String.format("REPLACE INTO %s (%s) values (%s);",table,columnString,valueString);
}
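//Illustrative example only (hypothetical table and columns): for t_user with columns id BIGINT and name VARCHAR
//and the row {id:1, name:"foo"}, this returns:
//REPLACE INTO t_user (`id`,`name`) values (1,'foo');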
private String transferDeleteSql(String table, JSONObject dataObj, JSONObject mysqlType, Set<String> pkNameSet) {
List<String> whereList = new ArrayList<>();
for (String pk : pkNameSet) {
String whereString = pk.concat(" = ").concat(getValueString(dataObj,pk,mysqlType.getString(pk)));
whereList.add(whereString);
}
String whereString = String.join(" and ",whereList);
return String.format("DELETE FROM %s WHERE %s",table,whereString);
}
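//Illustrative example only (hypothetical table and pk): for t_user with primary key id = 1 this returns:
//DELETE FROM t_user WHERE id = 1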
/**
* @author shezaixing
* @date 2023/12/7 14:23
* @description Decide how a column value should be rendered when building the SQL string (i.e. whether it needs quoting)
*
*/
private static String getValueString(JSONObject dataObj,String columnKey,String mysqlType){
if(null == dataObj.get(columnKey)){
return "null";
}
//types that must be rendered as quoted string literals
if(Arrays.asList(STR_SQL_TYPE).contains(mysqlType.toUpperCase())){
return String.format("'%s'", dataObj.getString(columnKey).replace("\\","\\\\").replace("'", "\\'") );
}
//date / datetime columns
if("DATE".equalsIgnoreCase(mysqlType) || "DATETIME".equalsIgnoreCase(mysqlType)){
SimpleDateFormat df = "DATETIME".equalsIgnoreCase(mysqlType) ? new SimpleDateFormat("yyyy-MM-dd HH:mm:ss") : new SimpleDateFormat("yyyy-MM-dd");
return String.format("'%s'",df.format(dataObj.getDate(columnKey)));
}
return dataObj.getString(columnKey);
}
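//Illustrative examples only (hypothetical values): a VARCHAR value O'Brien is rendered as 'O\'Brien',
//a DATETIME value as '2023-12-07 14:23:00', a NULL as the literal null, and numeric types pass through unquoted.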
@Override
public void processElement(JSONObject value, Context ctx, Collector<Tuple3<String, String, Long>> out) throws Exception {
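//The incoming element is expected to be a canal-json style change event; a hypothetical
//(illustrative, not taken from this commit) message looks like:
//{"type":"UPDATE","table":"t_user","isDdl":false,"ts":1700000000000,
// "pkNames":["id"],"mysqlType":{"id":"BIGINT","name":"VARCHAR"},
// "data":[{"id":123,"name":"foo"}]}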
//extract the fields we need from the change event
String type = value.getString("type");
JSONArray dataList = value.getJSONArray("data");
JSONObject mysqlType = value.getJSONObject("mysqlType");
String table = value.getString("table");
JSONArray pkNames = value.getJSONArray("pkNames");
Set<String> pkNameSet = new HashSet<>();
long ts = value.getLong("ts") == null ? System.currentTimeMillis() : value.getLong("ts");
if(CollUtil.isNotEmpty(pkNames)){
pkNames.forEach(name -> pkNameSet.add(String.valueOf(name)));
}
String executeSql = "";
if(value.getBoolean("isDdl")){
return;
}
JSONObject dataObj = dataList.getJSONObject(0);
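//When logical_delete is enabled in the job config, DELETE events are not executed as physical
//deletes: the row is upserted instead, with an extra is_del flag set to 1 (see below).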
Boolean logicalDelete = MapUtil.getBool(dbInfoMap, "logical_delete", false);
if(logicalDelete){
mysqlType.put("is_del", "int");
dataObj.put("is_del", "DELETE".equals(type) ? 1 : 0);
}
//preserve per-key ordering downstream:
//the table name plus the primary-key value(s) form the unique group key
StringBuilder groupKeyBuilder = new StringBuilder(table);
for (String pk : pkNameSet) {
String pkValue = getValueString(dataObj, pk, mysqlType.getString(pk));
groupKeyBuilder.append("-").append(pkValue);
}
String groupKey = groupKeyBuilder.toString();
if("INSERT".equals(type) || "UPDATE".equals(type)){
excueteSql = tranferInsertSql(table,dataObj,mysqlType);
} else {
excueteSql = logicalDelete ? tranferInsertSql(table,dataObj,mysqlType) : transferDeleteSql(table,dataObj,mysqlType,pkNameSet);
}
out.collect(Tuple3.of(excueteSql,groupKey,ts));
if (MapUtil.getBool(dbInfoMap, "log_enable", false)){
String logSql = buildLogData(type, table, pkNameSet, dataObj, ts, value.toJSONString());
ctx.output(toSlideTag,Tuple3.of(logSql,"dsc_cdc_log",ts));
}
}
}
package com.dsk.flink.dsc.common.sink;
import cn.hutool.core.date.DateUtil;
import cn.hutool.core.lang.Snowflake;
import cn.hutool.core.util.IdUtil;
import cn.hutool.core.util.RandomUtil;
import cn.hutool.db.DbUtil;
import cn.hutool.db.sql.SqlExecutor;
import com.alibaba.druid.pool.DruidDataSource;
import com.dsk.flink.dsc.common.dto.SqlErrorLog;
import com.dsk.flink.dsc.utils.EnvProperties;
......@@ -16,7 +16,10 @@ import org.slf4j.LoggerFactory;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.util.Date;
import java.util.concurrent.*;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
public class MysqlDataTransferSink extends RichSinkFunction<String> {
......@@ -62,11 +65,9 @@ public class MysqlDataTransferSink extends RichSinkFunction<String> {
private void executeSql(String sql){
Connection connection = null;
PreparedStatement pt = null;
try {
connection = dataSource.getConnection();
pt = connection.prepareStatement(sql);
pt.execute();
SqlExecutor.execute(connection,sql);
} catch (Exception e) {
//logger.error("------error time:{}-----, sql:{}--------exception:{}", DateUtil.now(),sql,e.getMessage());
logger.error("exception info:",e);
......@@ -77,7 +78,7 @@ public class MysqlDataTransferSink extends RichSinkFunction<String> {
logger.error("错误日志保存异常 -> {}", re.getMessage());
}
} finally {
DbUtil.close(pt, connection);
DbUtil.close(connection);
}
}
......
......@@ -3,14 +3,13 @@ package com.dsk.flink.dsc.sync;
import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.util.StrUtil;
import com.alibaba.fastjson.JSONObject;
import com.dsk.flink.dsc.common.function.AsyncMysqlDataTransferFunctionNew;
import com.dsk.flink.dsc.common.function.CanalMapToTsGroupFunction;
import com.dsk.flink.dsc.common.function.GroupTsProcessWindowFunction;
import com.dsk.flink.dsc.common.function.MysqlDataTransferFunction;
import com.dsk.flink.dsc.common.sink.MysqlDataTransferSink;
import com.dsk.flink.dsc.utils.EnvProperties;
import com.dsk.flink.dsc.utils.EnvPropertiesUtil;
import com.dsk.flink.dsc.utils.EtlUtils;
import io.tidb.bigdata.tidb.JdbcConnectionProviderFactory;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
......@@ -18,7 +17,7 @@ import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.AsyncDataStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
......@@ -28,19 +27,18 @@ import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
/**
* @author shezaixing
* @date 2023/12/5 14:44
* @description Job that syncs change data to the customer's target data source
*
* update by lww
*/
@Slf4j
public class SyncCustomerDataSource {
......@@ -48,15 +46,15 @@ public class SyncCustomerDataSource {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//env.setParallelism(3);
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(50, 30000));
env.enableCheckpointing(300000);
//env.setParallelism(1);
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(100, 60000));
env.enableCheckpointing(180000);
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(300000);
env.getCheckpointConfig().setCheckpointTimeout(7200000);
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(180000);
env.getCheckpointConfig().setCheckpointTimeout(300000);
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
env.getCheckpointConfig().setExternalizedCheckpointCleanup(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
env.getCheckpointConfig().setTolerableCheckpointFailureNumber(50);
env.getCheckpointConfig().setTolerableCheckpointFailureNumber(100);
//load the user-supplied configuration
ParameterTool parameterTool = ParameterTool.fromArgs(args);
......@@ -77,68 +75,59 @@ public class SyncCustomerDataSource {
kafkaConsumer.setStartFromTimestamp(Long.parseLong(offsetTimestamp));
}
SingleOutputStreamOperator<String> kafkaSource = env.addSource(kafkaConsumer)
.setParallelism(1)
.name("kafka-source")
.uid("kafka-source");
//kafkaSource.print("kafka stream");
SingleOutputStreamOperator<JSONObject> canalJsonStream = kafkaSource.map(JSONObject::parseObject)
SingleOutputStreamOperator<Tuple3<JSONObject, String, Long>> tsGroupStream = env.addSource(kafkaConsumer)
.map(JSONObject::parseObject)
.filter(new FilterFunction<JSONObject>() {
@Override
public boolean filter(JSONObject value) throws Exception {
return !value.getBoolean("isDdl") && !"TIDB_WATERMARK".equals(value.getString("type"));
}
})
.name("canalJsonStream")
.uid("canalJsonStream");
//canalJsonStream.print("canal stream");
SingleOutputStreamOperator<Tuple3<JSONObject, String, Long>> tsGroupStream = canalJsonStream.map(new CanalMapToTsGroupFunction());
.name("dsc-source")
.uid("dsc-source")
.map(new CanalMapToTsGroupFunction())
.name("dsc-groupKey")
.uid("dsc-groupKey");
SingleOutputStreamOperator<JSONObject> process = tsGroupStream.keyBy(x -> x.f1)
.window(TumblingProcessingTimeWindows.of(Time.milliseconds(100)))
.process(new GroupTsProcessWindowFunction());
SingleOutputStreamOperator<Tuple3<String, String, Long>> sqlResultStream1 = AsyncDataStream.orderedWait(process,
new AsyncMysqlDataTransferFunctionNew(envProps), 120L, TimeUnit.SECONDS)
.filter(new FilterFunction<Tuple3<String, String, Long>>() {
@Override
public boolean filter(Tuple3<String, String, Long> value) throws Exception {
return StrUtil.isNotBlank(value.f0) && !"err".equals(value.f0);
}
})
.name("sqlResultStream")
.uid("sqlResultStream");
//sqlResultStream1.print("async sql==>");
SingleOutputStreamOperator<String> groupWindowSqlResultStream = sqlResultStream1.keyBy(value -> value.f1)
.window(TumblingProcessingTimeWindows.of(Time.milliseconds(100)))
.window(TumblingProcessingTimeWindows.of(Time.milliseconds(50)))
.process(new GroupTsProcessWindowFunction())
.uid("dsc-w1")
.name("dsc-w1");
OutputTag<Tuple3<String, String, Long>> cdcLogTag = new OutputTag<Tuple3<String, String, Long>>("dsc_cdc_log") {};
SingleOutputStreamOperator<Tuple3<String, String, Long>> slide = process
.process(new MysqlDataTransferFunction(envProps, cdcLogTag))
.name("dsc-sql")
.uid("dsc-sql");
SingleOutputStreamOperator<String> groupWindowSqlResultStream = slide
.keyBy(value -> value.f1)
.window(TumblingProcessingTimeWindows.of(Time.milliseconds(50)))
.process(new ProcessWindowFunction<Tuple3<String, String, Long>, String, String, TimeWindow>() {
@Override
public void process(String s, ProcessWindowFunction<Tuple3<String, String, Long>, String, String,
TimeWindow>.Context context, Iterable<Tuple3<String, String, Long>> elements,
Collector<String> out) throws Exception {
List<Tuple3<String, String, Long>> list = CollUtil.list(false, elements);
if ("dsc_cdc_log".equals(list.get(0).f1)) {
list = list.stream().sorted(Comparator.comparing(x -> x.f2,Comparator.reverseOrder() )).collect(Collectors.toList());
list.forEach(x -> {out.collect(x.f0);});
return;
}
if (CollUtil.isNotEmpty(list)) {
Tuple3<String, String, Long> maxTsElement =
list.stream().max(Comparator.comparing(x -> x.f2)).get();
out.collect(maxTsElement.f0);
//list.forEach(x -> {out.collect(x.f0);});
}
}
})
.name("groupWindowSqlResultStream")
.uid("groupWindowSqlResultStream");
//groupWindowSqlResultStream.print("sql result==>");
.name("dsc-max")
.uid("dsc-max");
groupWindowSqlResultStream.addSink(new MysqlDataTransferSink(envProps))
.name("dsc-sink")
.uid("dsc-sink");
slide.getSideOutput(cdcLogTag).map(x -> x.f0).addSink(new MysqlDataTransferSink(envProps))
.name("dsc-cdc-log")
.uid("dsc-cdc-log");
groupWindowSqlResultStream.addSink(new MysqlDataTransferSink(envProps)).name("sqlSinkStream").uid("sqlSinkStream");
env.execute();
env.execute("dsc-client");
}
}