Commit f3d7e6ed authored by liaowenwu

Optimize code

parent 2b8ea804
 package com.dsk.flink.dsc.common.function;
 import cn.hutool.core.collection.CollUtil;
-import cn.hutool.core.map.MapUtil;
-import cn.hutool.core.util.StrUtil;
+import cn.hutool.core.date.DateUtil;
 import com.alibaba.fastjson.JSONArray;
 import com.alibaba.fastjson.JSONObject;
-import com.dsk.flink.dsc.utils.EnvProperties;
 import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.functions.ProcessFunction;
-import org.apache.flink.streaming.api.functions.async.ResultFuture;
-import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;
 import org.apache.flink.util.Collector;
-import org.apache.flink.util.OutputTag;
 import java.text.SimpleDateFormat;
 import java.util.*;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.LinkedBlockingDeque;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
 public class MysqlDataTransferFunction extends ProcessFunction<JSONObject, Tuple3<String,String,Long>> {
-    //database connection info
-    private final EnvProperties dbInfoMap;
-    private OutputTag<Tuple3<String, String, Long>> toSlideTag;
-    public MysqlDataTransferFunction(EnvProperties dbInfoMap, OutputTag<Tuple3<String, String, Long>> toSlideTag) {
-        this.dbInfoMap = dbInfoMap;
-        this.toSlideTag = toSlideTag;
-    }
-    private static String logSqlFormat = "INSERT INTO dsc_cdc_log (`table`,op_type,pk_columns,pk_values,data_json,cdc_ts) values ('%s','%s','%s','%s','%s', %d)";
-    private String buildLogData(String type, String table, Set<String> pkNameSet, JSONObject dataObj, long ts, String dataJsonStr) {
-        List<String> pkValueList = new ArrayList<>();
-        for (String pk : pkNameSet) {
-            pkValueList.add(dataObj.getString(pk));
-        }
-        String pkColumns = String.join(",",pkNameSet);
-        String pkValues = String.join("-",pkValueList);
-        dataJsonStr = dataJsonStr.replace("\\","\\\\");
-        return String.format(logSqlFormat, table, type, pkColumns, pkValues, dataJsonStr, ts);
-    }
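For context: buildLogData renders a row-level audit INSERT into dsc_cdc_log from the format string above. With a hypothetical table t_user whose primary key is id, the produced statement would look roughly like:
INSERT INTO dsc_cdc_log (`table`,op_type,pk_columns,pk_values,data_json,cdc_ts) values ('t_user','UPDATE','id','1001','{...}', 1718000000000)
(the table name, values and timestamp are illustrative, not taken from the commit).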
     private static final String[] STR_SQL_TYPE = new String[]{"VARCHAR","CHAR","TINYBLOB","BLOB","MEDIUMBLOB","LONGBLOB","TINYTEXT","TEXT","MEDIUMTEXT","LONGTEXT","TIME","TIMESTAMP","JSON","json"};
-    private static final String[] KEYWORD = new String[]{"limit"};
     private static String tranferInsertSql(String table, JSONObject dataObj, JSONObject mysqlType) {
         Set<String> columnSet = mysqlType.keySet();
         StringBuilder sb = new StringBuilder();
         for (String s : columnSet) {
-            sb.append("`"+s+"`").append(",");
+            sb.append("`").append(s).append("`,");
         }
         List<String> valueList = new ArrayList<>();
         for (String col : columnSet) {
-            if(Arrays.asList(KEYWORD).contains(col)){
-                valueList.add(getValueString(dataObj,col,mysqlType.getString(col)));
-            }else {
-                valueList.add(getValueString(dataObj,col,mysqlType.getString(col)));
-            }
+            valueList.add(getValueString(dataObj,col,mysqlType.getString(col)));
         }
         sb.setLength(sb.length()-1);
         String columnString = sb.toString();
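A sketch of what this loop yields, assuming a hypothetical table with columns id and name (the remainder of tranferInsertSql lies outside the shown hunks): columnString becomes `id`,`name` after the trailing comma is trimmed, and valueList holds the matching rendered values from getValueString, ready to be joined into the VALUES clause of the INSERT.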
@@ -104,8 +65,8 @@ public class MysqlDataTransferFunction extends ProcessFunction<JSONObject, Tuple
         //handle date/time fields
         if("DATE".equalsIgnoreCase(mysqlType) || "DATETIME".equalsIgnoreCase(mysqlType)){
-            SimpleDateFormat df = "DATETIME".equalsIgnoreCase(mysqlType) ? new SimpleDateFormat("yyyy-MM-dd HH:mm:ss") : new SimpleDateFormat("yyyy-MM-dd");
-            return String.format("\"%s\"",df.format(dataObj.getDate(columnKey)));
+            String date = "DATETIME".equalsIgnoreCase(mysqlType) ? DateUtil.format(dataObj.getDate(columnKey),"yyyy-MM-dd HH:mm:ss") : DateUtil.format(dataObj.getDate(columnKey),"yyyy-MM-dd");
+            return String.format("\"%s\"",date);
         }
         return dataObj.getString(columnKey);
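This hunk presumably sits inside getValueString, the helper called from tranferInsertSql; only its date branch is visible. For a hypothetical DATETIME value the returned literal would be "2024-01-01 12:30:00" (double-quoted), a DATE column would yield "2024-01-01", and non-date types fall through to dataObj.getString(columnKey).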
@@ -124,17 +85,14 @@ public class MysqlDataTransferFunction extends ProcessFunction<JSONObject, Tuple
         if(CollUtil.isNotEmpty(pkNames)){
             pkNames.forEach(name -> pkNameSet.add(String.valueOf(name)));
         }
-        String excueteSql = "";
-        if(value.getBoolean("isDdl")){
-            return;
-        }
+        String excueteSql;
         JSONObject dataObj = dataList.getJSONObject(0);
-        Boolean logicalDelete = MapUtil.getBool(dbInfoMap, "logical_delete", false);
+        /*Boolean logicalDelete = MapUtil.getBool(dbInfoMap, "logical_delete", false);
         if(logicalDelete){
             mysqlType.put("is_del", "int");
             dataObj.put("is_del", "DELETE".equals(type) ? 1 : 0);
-        }
+        }*/
         //handle the ordering of statements
         //use this record's table name and primary key values as the unique groupKey
@@ -147,12 +105,12 @@ public class MysqlDataTransferFunction extends ProcessFunction<JSONObject, Tuple
         if("INSERT".equals(type) || "UPDATE".equals(type)){
             excueteSql = tranferInsertSql(table,dataObj,mysqlType);
         } else {
-            excueteSql = logicalDelete ? tranferInsertSql(table,dataObj,mysqlType) : transferDeleteSql(table,dataObj,mysqlType,pkNameSet);
+            excueteSql = transferDeleteSql(table,dataObj,mysqlType,pkNameSet);
         }
         out.collect(Tuple3.of(excueteSql,groupKey,ts));
-        if (MapUtil.getBool(dbInfoMap, "log_enable", false)){
+        /*if (MapUtil.getBool(dbInfoMap, "log_enable", false)){
             String logSql = buildLogData(type, table, pkNameSet, dataObj, ts, value.toJSONString());
             ctx.output(toSlideTag,Tuple3.of(logSql,"dsc_cdc_log",ts));
-        }
+        }*/
     }
 }
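Each CDC event is emitted as a Tuple3: f0 is the generated SQL statement, f1 is the groupKey (per the comment above, the table name plus primary-key values; the exact concatenation happens in code elided between the hunks), and f2 is the event timestamp ts. A hypothetical emitted element might look like ("INSERT INTO `t_user` (...) VALUES (...)", "t_user-1001", 1718000000000L); the downstream operators key by f1 and compare f2.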
@@ -66,16 +66,15 @@ public class SyncCustomerDataSource {
         System.out.println("Database connection config: ->" + String.format(envProps.getDb_url(), envProps.getDb_host(), envProps.getDb_port(), envProps.getDb_database()));
         System.out.println("Kafka consumer group: ->" + EtlUtils.getKafkaGroup(envProps));
         FlinkKafkaConsumer<String> kafkaConsumer = new FlinkKafkaConsumer<String>(envProps.getKafka_topic(), new SimpleStringSchema(), EtlUtils.getKafkaConfig(envProps.getKafka_brokers(), EtlUtils.getKafkaGroup(envProps), envProps.getKafka_username(),envProps.getKafka_password()));
+        //System.out.println(envProps.getKafka_topic());
         long defaultOffset = LocalDateTime.now().minusMinutes(10).atZone(ZoneId.systemDefault()).toInstant().toEpochMilli();
         kafkaConsumer.setStartFromTimestamp(defaultOffset);
-        //kafkaConsumer.setStartFromLatest();
         //start offset override
         if (StrUtil.isNotBlank(offsetTimestamp)) {
             kafkaConsumer.setStartFromTimestamp(Long.parseLong(offsetTimestamp));
         }
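By default the consumer starts from a timestamp 10 minutes before job launch, expressed in epoch milliseconds; a non-blank offsetTimestamp value (read earlier in the job setup, not shown here) overrides it. For example, supplying an illustrative offsetTimestamp of 1718000000000 would replay from that point instead of the 10-minute default.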
-        SingleOutputStreamOperator<Tuple3<JSONObject, String, Long>> tsGroupStream = env.addSource(kafkaConsumer)
-                .setParallelism(1)
+        SingleOutputStreamOperator<JSONObject> tsGroupStream = env.addSource(kafkaConsumer)
                 .map(JSONObject::parseObject)
                 .filter(new FilterFunction<JSONObject>() {
                     @Override
@@ -84,23 +83,17 @@ public class SyncCustomerDataSource {
                     }
                 })
                 .name("dsc-source")
-                .uid("dsc-source")
-                .map(new CanalMapToTsGroupFunction())
-                .name("dsc-groupKey")
-                .uid("dsc-groupKey");
+                .uid("dsc-source");
-        SingleOutputStreamOperator<JSONObject> process = tsGroupStream.keyBy(x -> x.f1)
-                .window(TumblingProcessingTimeWindows.of(Time.milliseconds(50)))
-                .process(new GroupTsProcessWindowFunction())
-                .uid("dsc-w1")
-                .name("dsc-w1");
-        OutputTag<Tuple3<String, String, Long>> cdcLogTag = new OutputTag<Tuple3<String, String, Long>>("dsc_cdc_log") {};
-        SingleOutputStreamOperator<Tuple3<String, String, Long>> slide = process
-                .process(new MysqlDataTransferFunction(envProps, cdcLogTag))
+        //tsGroupStream.print("source==>");
+        SingleOutputStreamOperator<Tuple3<String, String, Long>> slide = tsGroupStream
+                .process(new MysqlDataTransferFunction())
                 .name("dsc-sql")
                 .uid("dsc-sql");
+        //slide.print("dsc-sql ==>");
         SingleOutputStreamOperator<String> groupWindowSqlResultStream = slide
                 .keyBy(value -> value.f1)
                 .window(TumblingProcessingTimeWindows.of(Time.milliseconds(50)))
@@ -109,7 +102,7 @@ public class SyncCustomerDataSource {
                     public void process(String s, ProcessWindowFunction<Tuple3<String, String, Long>, String, String,
                             TimeWindow>.Context context, Iterable<Tuple3<String, String, Long>> elements,
                             Collector<String> out) throws Exception {
-                        List<Tuple3<String, String, Long>> list = CollUtil.list(false, elements);
+                        List<Tuple3<String, String, Long>> list = CollUtil.newArrayList(elements);
                         if (CollUtil.isNotEmpty(list)) {
                             Tuple3<String, String, Long> maxTsElement =
                                     list.stream().max(Comparator.comparing(x -> x.f2)).get();
@@ -120,14 +113,12 @@ public class SyncCustomerDataSource {
                 .name("dsc-max")
                 .uid("dsc-max");
+        //groupWindowSqlResultStream.print("dsc-max==>");
         groupWindowSqlResultStream.addSink(new MysqlDataTransferSink(envProps))
                 .name("dsc-sink")
                 .uid("dsc-sink");
-        slide.getSideOutput(cdcLogTag).map(x -> x.f0).addSink(new MysqlDataTransferSink(envProps))
-                .name("dsc-cdc-log")
-                .uid("dsc-cdc-log");
         env.execute("dsc-client");
     }
 }
@@ -16,16 +16,16 @@ public class EtlUtils {
         properties.setProperty("sasl.jaas.config", getSaslJaasConfig(username,password));
         properties.setProperty("security.protocol", "SASL_PLAINTEXT");
         properties.setProperty("sasl.mechanism", "SCRAM-SHA-512");
-        properties.setProperty("fetch.max.bytes", "10485760"); //10M
+        properties.setProperty("fetch.max.bytes", "2097152"); //2M
         //properties.setProperty("max.partition.fetch.bytes", "104857600");
-        properties.setProperty("flink.consumer.max.fetch.size", "5242880");//5m
+        properties.setProperty("flink.consumer.max.fetch.size", "1048576");//1m
         //properties.setProperty("frame.size", "2097152000");
         properties.setProperty("session.timeout.ms", "30000");
         properties.setProperty("heartbeat.interval.ms", "10000");
         properties.setProperty("request.timeout.ms", "60000");
         properties.setProperty("retries", "3");
         properties.setProperty("retry.backoff.ms", "5000");
-        properties.setProperty("receive.buffer.bytes", "2621440"); //2.5m
+        properties.setProperty("receive.buffer.bytes", "262144"); //256k
         properties.setProperty("max.poll.records", "50");
         return properties;
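For reference on the consumer tuning here: fetch.max.bytes caps the data returned by a single fetch request (2 MiB on the new side), receive.buffer.bytes is the socket receive buffer used when reading from the brokers (256 KiB), and max.poll.records bounds how many records one poll() hands to the client (50). These are standard Kafka consumer properties; the specific values are deployment-specific tuning choices.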