Commit 557e449f authored by liaowenwu's avatar liaowenwu

Optimize code

parent f3d7e6ed
......@@ -137,11 +137,11 @@
<artifactId>hutool-all</artifactId>
<version>${hutool-version}</version>
</dependency>
<dependency>
<!--<dependency>
<groupId>com.dsk.io.tidb</groupId>
<artifactId>flink-tidb-connector-1.13</artifactId>
<version>0.0.5.1</version>
</dependency>
</dependency>-->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-scala_2.12</artifactId>
......
......@@ -2,6 +2,7 @@ package com.dsk.flink.dsc.common.function;
import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.date.DateUtil;
import cn.hutool.core.map.MapUtil;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import org.apache.flink.api.java.tuple.Tuple3;
......@@ -10,12 +11,32 @@ import org.apache.flink.util.Collector;
import java.text.SimpleDateFormat;
import java.util.*;
/**
* @author lww
* @date 2025-01-14
*/
public class MysqlDataTransferFunction extends ProcessFunction<JSONObject, Tuple3<String,String,Long>> {
private static final String[] STR_SQL_TYPE = new String[]{"VARCHAR","CHAR","TINYBLOB","BLOB","MEDIUMBLOB","LONGBLOB","TINYTEXT","TEXT","MEDIUMTEXT","LONGTEXT","TIME","TIMESTAMP","JSON","json"};
private static final Map<String,Integer> STR_SQL_TYPE;
static {
STR_SQL_TYPE = MapUtil.newHashMap();
STR_SQL_TYPE.put("VARCHAR",1);
STR_SQL_TYPE.put("CHAR",1);
STR_SQL_TYPE.put("TINYBLOB",1);
STR_SQL_TYPE.put("BLOB",1);
STR_SQL_TYPE.put("MEDIUMBLOB",1);
STR_SQL_TYPE.put("LONGBLOB",1);
STR_SQL_TYPE.put("TINYTEXT",1);
STR_SQL_TYPE.put("TEXT",1);
STR_SQL_TYPE.put("MEDIUMTEXT",1);
STR_SQL_TYPE.put("LONGTEXT",1);
STR_SQL_TYPE.put("TIME",1);
STR_SQL_TYPE.put("TIMESTAMP",1);
STR_SQL_TYPE.put("JSON",1);
}
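Replacing the string array with a HashMap turns the per-column type check into a constant-time containsKey lookup; a minimal standalone sketch of the same idea (class name and type list here are illustrative only):
import java.util.HashMap;
import java.util.Map;
public class TypeLookupSketch {
    private static final Map<String, Integer> STR_SQL_TYPE = new HashMap<>();
    static {
        // Same pattern as above: the value is unused, only key membership matters.
        for (String t : new String[]{"VARCHAR", "CHAR", "TEXT", "JSON", "TIMESTAMP"}) {
            STR_SQL_TYPE.put(t, 1);
        }
    }
    // containsKey is O(1), unlike Arrays.asList(...).contains(...), which scans the whole array.
    static boolean needsQuoting(String mysqlType) {
        return STR_SQL_TYPE.containsKey(mysqlType.toUpperCase());
    }
    public static void main(String[] args) {
        System.out.println(needsQuoting("varchar")); // true
        System.out.println(needsQuoting("BIGINT"));  // false
    }
}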
private static String tranferInsertSql(String table, JSONObject dataObj, JSONObject mysqlType) {
/*private static String tranferInsertSql(String table, JSONObject dataObj, JSONObject mysqlType) {
Set<String> columnSet = mysqlType.keySet();
StringBuilder sb = new StringBuilder();
......@@ -32,10 +53,24 @@ public class MysqlDataTransferFunction extends ProcessFunction<JSONObject, Tuple
//return String.format("INSERT INTO %s (%s) values (%s) ON DUPLICATE KEY UPDATE %s;",table,columnString,valueString,updateString);
return String.format("REPLACE INTO %s (%s) values (%s);",table,columnString,valueString);
}*/
private static String tranferInsertSql(String table, JSONObject dataObj, JSONObject mysqlType) {
Set<String> columnSet = mysqlType.keySet();
StringBuilder sb = new StringBuilder("REPLACE INTO ").append(table).append(" (");
List<String> valueList = new ArrayList<>();
for (String s : columnSet) {
sb.append("`").append(s).append("`,");
valueList.add(getValueString(dataObj, s, mysqlType.getString(s)));
}
sb.setLength(sb.length() - 1);
sb.append(") values (");
sb.append(String.join(",", valueList));
sb.append(")");
return sb.toString();
}
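For a concrete picture of what the builder above emits, a small sketch with fastjson (table and column names are made up; quoting is simplified to the string/non-string split used by getValueString):
import com.alibaba.fastjson.JSONObject;
public class ReplaceSqlSketch {
    public static void main(String[] args) {
        JSONObject mysqlType = new JSONObject(true); // ordered, so the output columns are predictable
        mysqlType.put("id", "BIGINT");
        mysqlType.put("name", "VARCHAR");
        JSONObject dataObj = new JSONObject();
        dataObj.put("id", 1L);
        dataObj.put("name", "dsk");
        StringBuilder sb = new StringBuilder("REPLACE INTO t_demo (");
        StringBuilder values = new StringBuilder();
        for (String col : mysqlType.keySet()) {
            sb.append("`").append(col).append("`,");
            String v = dataObj.getString(col);
            // String-typed columns are quoted, everything else is emitted as-is.
            values.append("VARCHAR".equals(mysqlType.getString(col)) ? "'" + v + "'" : v).append(",");
        }
        sb.setLength(sb.length() - 1);
        values.setLength(values.length() - 1);
        sb.append(") values (").append(values).append(")");
        System.out.println(sb); // REPLACE INTO t_demo (`id`,`name`) values (1,'dsk')
    }
}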
private String transferDeleteSql(String table, JSONObject dataObj, JSONObject mysqlType, Set<String> pkNameSet) {
/*private static String transferDeleteSql(String table, JSONObject dataObj, JSONObject mysqlType, Set<String> pkNameSet) {
List<String> whereList = new ArrayList<>();
for (String pk : pkNameSet) {
......@@ -44,6 +79,17 @@ public class MysqlDataTransferFunction extends ProcessFunction<JSONObject, Tuple
}
String whereString = String.join(" and ",whereList);
return String.format("DELETE FROM %s WHERE %s",table,whereString);
}*/
private static String transferDeleteSql(String table, JSONObject dataObj, JSONObject mysqlType, Set<String> pkNameSet) {
StringBuilder whereClauseBuilder = new StringBuilder();
for (String pk : pkNameSet) {
if (whereClauseBuilder.length() > 0) {
whereClauseBuilder.append(" and ");
}
whereClauseBuilder.append(pk).append(" = ").append(getValueString(dataObj, pk, mysqlType.getString(pk)));
}
return String.format("DELETE FROM %s WHERE %s",table, whereClauseBuilder);
}
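The DELETE builder joins each pk = value pair with " and ", skipping the separator before the first pair; a short sketch of the same pattern with a hypothetical composite key:
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.Set;
public class WhereJoinSketch {
    public static void main(String[] args) {
        // Hypothetical composite primary key; values stand in for getValueString(...) results.
        Set<String> pkNameSet = new LinkedHashSet<>(Arrays.asList("id", "tenant_id"));
        StringBuilder where = new StringBuilder();
        for (String pk : pkNameSet) {
            if (where.length() > 0) {
                where.append(" and "); // same separator handling as transferDeleteSql above
            }
            where.append(pk).append(" = ").append("id".equals(pk) ? "1" : "'a'");
        }
        System.out.println("DELETE FROM t_demo WHERE " + where);
        // DELETE FROM t_demo WHERE id = 1 and tenant_id = 'a'
    }
}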
/**
......@@ -57,15 +103,15 @@ public class MysqlDataTransferFunction extends ProcessFunction<JSONObject, Tuple
if(null == dataObj.get(columnKey)){
return "null";
}
String upperCase = mysqlType.toUpperCase();
//types whose values must be rendered as quoted strings
if(Arrays.asList(STR_SQL_TYPE).contains(mysqlType.toUpperCase())){
if(STR_SQL_TYPE.containsKey(upperCase)){
return String.format("'%s'", dataObj.getString(columnKey).replace("\\","\\\\").replace("'", "\\'") );
}
//date/time column handling
if("DATE".equalsIgnoreCase(mysqlType) || "DATETIME".equalsIgnoreCase(mysqlType)){
String date = "DATETIME".equalsIgnoreCase(mysqlType) ? DateUtil.format(dataObj.getDate(columnKey),"yyyy-MM-dd HH:mm:ss") : DateUtil.format(dataObj.getDate(columnKey),"yyyy-MM-dd");
if("DATE".equals(upperCase) || "DATETIME".equals(upperCase)){
String date = "DATETIME".equals(upperCase) ? DateUtil.format(dataObj.getDate(columnKey),"yyyy-MM-dd HH:mm:ss") : DateUtil.format(dataObj.getDate(columnKey),"yyyy-MM-dd");
return String.format("\"%s\"",date);
}
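The quoting branch escapes backslashes before single quotes, so a literal backslash in the data cannot neutralise the quote escaping; a tiny illustration (the value is arbitrary):
public class EscapeSketch {
    public static void main(String[] args) {
        String raw = "O'Brien \\ path";             // O'Brien \ path
        String escaped = raw.replace("\\", "\\\\")  // escape backslashes first
                            .replace("'", "\\'");   // then single quotes
        System.out.println("'" + escaped + "'");    // 'O\'Brien \\ path'
    }
}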
......@@ -76,7 +122,7 @@ public class MysqlDataTransferFunction extends ProcessFunction<JSONObject, Tuple
public void processElement(JSONObject value, Context ctx, Collector<Tuple3<String, String, Long>> out) throws Exception {
//result data collection
String type = value.getString("type");
JSONArray dataList = value.getJSONArray("data");
//JSONArray dataList = value.getJSONArray("data");
JSONObject mysqlType = value.getJSONObject("mysqlType");
String table = value.getString("table");
JSONArray pkNames = value.getJSONArray("pkNames");
......@@ -86,7 +132,7 @@ public class MysqlDataTransferFunction extends ProcessFunction<JSONObject, Tuple
pkNames.forEach(name -> pkNameSet.add(String.valueOf(name)));
}
String excueteSql;
JSONObject dataObj = dataList.getJSONObject(0);
JSONObject dataObj = value.getJSONArray("data").getJSONObject(0);
/*Boolean logicalDelete = MapUtil.getBool(dbInfoMap, "logical_delete", false);
if(logicalDelete){
......@@ -96,11 +142,12 @@ public class MysqlDataTransferFunction extends ProcessFunction<JSONObject, Tuple
//preserve processing order
//use this record's table name and primary key(s) as the unique groupKey
String groupKey = table;
StringBuilder groupKeyBuilder = new StringBuilder(table);
for (String pk : pkNameSet) {
String pkValue = getValueString(dataObj, pk, mysqlType.getString(pk));
groupKey = table.concat("-").concat(pkValue);
groupKeyBuilder.append("-").append(pkValue);
}
String groupKey = groupKeyBuilder.toString();
if("INSERT".equals(type) || "UPDATE".equals(type)){
excueteSql = tranferInsertSql(table,dataObj,mysqlType);
......
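Building the groupKey with a StringBuilder also lets every primary-key column contribute to the key, instead of only the last one surviving the concat inside the loop; a small sketch of the resulting key (table and values are hypothetical):
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.Set;
public class GroupKeySketch {
    public static void main(String[] args) {
        String table = "t_order_item"; // hypothetical table
        Set<String> pkNameSet = new LinkedHashSet<>(Arrays.asList("order_id", "item_id"));
        StringBuilder groupKeyBuilder = new StringBuilder(table);
        for (String pk : pkNameSet) {
            String pkValue = "order_id".equals(pk) ? "1001" : "7"; // stand-ins for getValueString(...)
            groupKeyBuilder.append("-").append(pkValue);
        }
        // t_order_item-1001-7 : both key columns are part of the grouping key
        System.out.println(groupKeyBuilder);
    }
}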
......@@ -34,7 +34,7 @@ public class MysqlDataTransferSink extends RichSinkFunction<String> {
@Override
public void open(Configuration parameters) throws Exception {
executorService = new ThreadPoolExecutor(10, 10, 20, TimeUnit.MINUTES, new LinkedBlockingDeque<>());
executorService = new ThreadPoolExecutor(4, 4, 20, TimeUnit.MINUTES, new LinkedBlockingDeque<>());
//initialize and load configuration
String configTidbUrl = String.format(envProps.getDb_url(), envProps.getDb_host(), envProps.getDb_port(), envProps.getDb_database());
//System.out.println(configTidbUrl);
......@@ -72,19 +72,15 @@ public class MysqlDataTransferSink extends RichSinkFunction<String> {
//logger.error("------错误时间:{}-----,sql:{}--------异常:{}", DateUtil.now(),sql,e.getMessage());
logger.error("异常信息:",e);
SqlErrorLog errorLog = new SqlErrorLog(new Date(), sql, e.getMessage());
try {
writeErrLog(errorLog);
}catch (Exception re){
logger.error("错误日志保存异常 -> {}", re.getMessage());
}
writeErrLogDb(errorLog);
} finally {
DbUtil.close(connection);
}
}
private void writeErrLog(SqlErrorLog errorLog) {
/*private void writeErrLog(SqlErrorLog errorLog) {
writeErrLogDb(errorLog);
}
}*/
private void writeErrLogDb(SqlErrorLog errorLog) {
Snowflake snowflake = IdUtil.getSnowflake(RandomUtil.randomInt(31), RandomUtil.randomInt(31));
......
package com.dsk.flink.dsc.common.sink;
import cn.hutool.core.date.DateUtil;
import cn.hutool.core.lang.Snowflake;
import cn.hutool.core.util.IdUtil;
import cn.hutool.core.util.RandomUtil;
import com.alibaba.druid.pool.DruidDataSource;
import com.dsk.flink.dsc.common.dto.SqlErrorLog;
import com.dsk.flink.dsc.utils.EnvProperties;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
/**
* @author lww
* @date 2025-01-14
*/
public class MysqlDataTransferSinkBatch extends RichSinkFunction<String> {
static Logger logger = LoggerFactory.getLogger(MysqlDataTransferSinkBatch.class);
EnvProperties envProps;
private static transient ExecutorService executorService;
private static transient DruidDataSource dataSource;
private static final int BATCH_SIZE = 500;
private static final int FLUSH_INTERVAL = 500;
private ArrayBlockingQueue<String> sqlBatch = new ArrayBlockingQueue<>(BATCH_SIZE * 2);
private static transient ScheduledExecutorService scheduledExecutorService;
private AtomicBoolean flushing = new AtomicBoolean(false);
public MysqlDataTransferSinkBatch(EnvProperties envProps) {
this.envProps = envProps;
}
@Override
public void open(Configuration parameters) throws Exception {
executorService = new ThreadPoolExecutor(10, 10, 20, TimeUnit.MINUTES, new ArrayBlockingQueue<>(500));
// initialize and load configuration
String configTidbUrl = String.format(envProps.getDb_url(), envProps.getDb_host(), envProps.getDb_port(), envProps.getDb_database());
dataSource = new DruidDataSource();
dataSource.setDriverClassName("com.mysql.cj.jdbc.Driver");
dataSource.setUsername(envProps.getDb_username());
dataSource.setPassword(envProps.getDb_password());
dataSource.setUrl(configTidbUrl);
dataSource.setMaxActive(30);
dataSource.setInitialSize(20);
dataSource.setTestWhileIdle(true);
dataSource.setMaxWait(20000);
dataSource.setValidationQuery("select 1");
scheduledExecutorService = new ScheduledThreadPoolExecutor(1);
scheduledExecutorService.scheduleAtFixedRate(this::flush, FLUSH_INTERVAL, FLUSH_INTERVAL, TimeUnit.MILLISECONDS);
}
@Override
public void close() throws Exception {
executorService.shutdown();
scheduledExecutorService.shutdown();
dataSource.close();
}
@Override
public void invoke(String value, Context context) throws Exception {
sqlBatch.put(value);
if (sqlBatch.size() >= BATCH_SIZE) {
flush();
}
}
private void flush() {
if (flushing.compareAndSet(false, true)) {
try {
if (sqlBatch.isEmpty()) {
flushing.set(false);
return;
}
List<String> batch = new ArrayList<>(1334);
sqlBatch.drainTo(batch, BATCH_SIZE);
if (batch.isEmpty()) {
flushing.set(false);
return;
}
executorService.execute(() -> {
try (Connection connection = dataSource.getConnection()) {
connection.setAutoCommit(false);
try (Statement pt = connection.createStatement()) {
for (String sql : batch) {
pt.addBatch(sql);
}
pt.executeBatch();
connection.commit();
} catch (Exception e) {
try {
connection.rollback();
} catch (Exception ex) {
logger.error("事务回滚异常", ex);
}
logger.error("------错误时间:{}-----,sql:{}--------异常:{}", new Date(), String.join(";", batch), e);
SqlErrorLog errorLog = new SqlErrorLog(new Date(), String.join(";", batch), e.getMessage());
writeErrLogDb(errorLog);
}
} catch (Exception e) {
logger.error("获取数据库连接异常", e);
} finally {
flushing.set(false);
}
});
} catch (Exception e) {
flushing.set(false);
logger.error("Flush 操作异常", e);
}
}
}
private void writeErrLogDb(SqlErrorLog errorLog) {
Snowflake snowflake = IdUtil.getSnowflake(RandomUtil.randomInt(31), RandomUtil.randomInt(31));
String sql = "insert dsc_err_log (id,error_time, error_sql, error_msg) values (?,?,?,?)";
try (Connection conn = dataSource.getConnection();
PreparedStatement pt = conn.prepareStatement(sql)) {
pt.setLong(1, snowflake.nextId());
pt.setObject(2, errorLog.getErrorTime());
pt.setString(3, errorLog.getSql());
pt.setString(4, errorLog.getError());
pt.execute();
} catch (Exception e) {
logger.error("错误日志保存异常", e);
}
}
}
\ No newline at end of file
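The new batch sink combines a size trigger (BATCH_SIZE) with a time trigger (FLUSH_INTERVAL) and guards both with a CAS flag; the core pattern, stripped of Flink and JDBC, looks roughly like this (a standalone sketch, not the commit's code):
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
public class SizeOrTimeFlushSketch {
    private static final int BATCH_SIZE = 500;
    private final ArrayBlockingQueue<String> queue = new ArrayBlockingQueue<>(BATCH_SIZE * 2);
    private final AtomicBoolean flushing = new AtomicBoolean(false);
    private final ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();
    public SizeOrTimeFlushSketch() {
        // Time-based trigger: flush every 500 ms even if the batch is not full.
        timer.scheduleAtFixedRate(this::flush, 500, 500, TimeUnit.MILLISECONDS);
    }
    public void add(String sql) throws InterruptedException {
        queue.put(sql);
        // Size-based trigger: flush as soon as a full batch is available.
        if (queue.size() >= BATCH_SIZE) {
            flush();
        }
    }
    private void flush() {
        // The CAS flag keeps the timer thread and the caller thread from flushing concurrently.
        if (!flushing.compareAndSet(false, true)) {
            return;
        }
        try {
            List<String> batch = new ArrayList<>(BATCH_SIZE);
            queue.drainTo(batch, BATCH_SIZE);
            if (!batch.isEmpty()) {
                System.out.println("executing batch of " + batch.size() + " statements");
            }
        } finally {
            flushing.set(false);
        }
    }
}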
package com.dsk.flink.dsc.sync;
import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.util.StrUtil;
import com.alibaba.fastjson.JSONObject;
import com.dsk.flink.dsc.common.function.CanalMapToTsGroupFunction;
import com.dsk.flink.dsc.common.function.GroupTsProcessWindowFunction;
import com.dsk.flink.dsc.common.function.MysqlDataTransferFunction;
import com.dsk.flink.dsc.common.sink.MysqlDataTransferSink;
import com.dsk.flink.dsc.common.sink.MysqlDataTransferSinkBatch;
import com.dsk.flink.dsc.utils.EnvProperties;
import com.dsk.flink.dsc.utils.EnvPropertiesUtil;
import com.dsk.flink.dsc.utils.EtlUtils;
......@@ -17,7 +14,7 @@ import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
......@@ -27,12 +24,9 @@ import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.util.Comparator;
import java.util.List;
/**
* @author shezaixing
......@@ -48,9 +42,9 @@ public class SyncCustomerDataSource {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//env.setParallelism(1);
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(100, 60000));
env.enableCheckpointing(180000);
env.enableCheckpointing(120000);
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(180000);
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(120000);
env.getCheckpointConfig().setCheckpointTimeout(300000);
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
env.getCheckpointConfig().setExternalizedCheckpointCleanup(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
......@@ -73,15 +67,22 @@ public class SyncCustomerDataSource {
kafkaConsumer.setStartFromTimestamp(Long.parseLong(offsetTimestamp));
}
SingleOutputStreamOperator<JSONObject> tsGroupStream = env.addSource(kafkaConsumer)
.setParallelism(1)
DataStreamSource<String> dscKafka = env
.addSource(kafkaConsumer)
.setParallelism(1);
SingleOutputStreamOperator<JSONObject> tsGroupStream = dscKafka
.map(JSONObject::parseObject)
.setParallelism(1)
.name("dsc-json")
.uid("dsc-json")
.filter(new FilterFunction<JSONObject>() {
@Override
public boolean filter(JSONObject value) throws Exception {
return !value.getBoolean("isDdl") && !"TIDB_WATERMARK".equals(value.getString("type"));
}
})
.setParallelism(1)
.name("dsc-source")
.uid("dsc-source");
......@@ -96,16 +97,19 @@ public class SyncCustomerDataSource {
SingleOutputStreamOperator<String> groupWindowSqlResultStream = slide
.keyBy(value -> value.f1)
.window(TumblingProcessingTimeWindows.of(Time.milliseconds(50)))
.window(TumblingProcessingTimeWindows.of(Time.milliseconds(100)))
.process(new ProcessWindowFunction<Tuple3<String, String, Long>, String, String, TimeWindow>() {
@Override
public void process(String s, ProcessWindowFunction<Tuple3<String, String, Long>, String, String,
TimeWindow>.Context context, Iterable<Tuple3<String, String, Long>> elements,
Collector<String> out) throws Exception {
List<Tuple3<String, String, Long>> list = CollUtil.newArrayList(elements);
if (CollUtil.isNotEmpty(list)) {
Tuple3<String, String, Long> maxTsElement =
list.stream().max(Comparator.comparing(x -> x.f2)).get();
Tuple3<String, String, Long> maxTsElement = null;
for (Tuple3<String, String, Long> element : elements) {
if (maxTsElement == null || element.f2 > maxTsElement.f2) {
maxTsElement = element;
}
}
if (maxTsElement != null) {
out.collect(maxTsElement.f0);
}
}
......@@ -115,7 +119,7 @@ public class SyncCustomerDataSource {
//groupWindowSqlResultStream.print("dsc-max==>");
groupWindowSqlResultStream.addSink(new MysqlDataTransferSink(envProps))
groupWindowSqlResultStream.addSink(new MysqlDataTransferSinkBatch(envProps))
.name("dsc-sink")
.uid("dsc-sink");
......
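The rewritten window function keeps only the element with the largest f2 (ts) per key in a single pass, instead of materialising a list and calling Stream.max; the same reduction in isolation (tuples are made up):
import org.apache.flink.api.java.tuple.Tuple3;
import java.util.Arrays;
import java.util.List;
public class MaxTsSketch {
    public static void main(String[] args) {
        List<Tuple3<String, String, Long>> elements = Arrays.asList(
                Tuple3.of("UPDATE t SET ...", "t-1", 100L),
                Tuple3.of("UPDATE t SET ...", "t-1", 300L),
                Tuple3.of("UPDATE t SET ...", "t-1", 200L));
        // Single pass over the iterable, as in the new process() body above.
        Tuple3<String, String, Long> maxTsElement = null;
        for (Tuple3<String, String, Long> e : elements) {
            if (maxTsElement == null || e.f2 > maxTsElement.f2) {
                maxTsElement = e;
            }
        }
        System.out.println(maxTsElement.f0 + " @ " + maxTsElement.f2); // the 300L element wins
    }
}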
......@@ -16,17 +16,15 @@ public class EtlUtils {
properties.setProperty("sasl.jaas.config", getSaslJaasConfig(username,password));
properties.setProperty("security.protocol", "SASL_PLAINTEXT");
properties.setProperty("sasl.mechanism", "SCRAM-SHA-512");
properties.setProperty("fetch.max.bytes", "2097152"); //2M
//properties.setProperty("max.partition.fetch.bytes", "104857600");
properties.setProperty("flink.consumer.max.fetch.size", "1048576");//1m
//properties.setProperty("frame.size", "2097152000");
/*properties.setProperty("fetch.max.bytes", "1048576"); //1M
properties.setProperty("flink.consumer.max.fetch.size", "524288");//512k
properties.setProperty("session.timeout.ms", "30000");
properties.setProperty("heartbeat.interval.ms", "10000");
properties.setProperty("request.timeout.ms", "60000");
properties.setProperty("retries", "3");
properties.setProperty("retry.backoff.ms", "5000");
properties.setProperty("receive.buffer.bytes", "262144"); //256k
properties.setProperty("max.poll.records", "50");
properties.setProperty("receive.buffer.bytes", "65536"); //64k
properties.setProperty("max.poll.records", "10");*/
return properties;
}
......@@ -41,7 +39,7 @@ public class EtlUtils {
return envProps.getKafka_topic().concat(StrUtil.isNotBlank(envProps.getEnv()) ? envProps.getEnv() : "").concat("-group");
}
public static void main(String[] args) {
/*public static void main(String[] args) {
System.out.println(getSaslJaasConfig("szx","123456"));
}
}*/
}
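getKafkaConfig wires SCRAM-SHA-512 over SASL_PLAINTEXT; assuming getSaslJaasConfig builds the standard ScramLoginModule entry, a consumer built from these properties would look roughly like this (broker, topic, and credentials are placeholders):
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import java.util.Properties;
public class KafkaConfigSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "kafka-broker:9092"); // placeholder
        props.setProperty("group.id", "dsc-demo-group");             // placeholder
        props.setProperty("security.protocol", "SASL_PLAINTEXT");
        props.setProperty("sasl.mechanism", "SCRAM-SHA-512");
        // Standard SCRAM JAAS entry; the real value comes from getSaslJaasConfig(username, password).
        props.setProperty("sasl.jaas.config",
                "org.apache.kafka.common.security.scram.ScramLoginModule required "
                        + "username=\"demo\" password=\"secret\";");
        props.setProperty("fetch.max.bytes", "2097152"); // 2 MB, as set above
        FlinkKafkaConsumer<String> consumer =
                new FlinkKafkaConsumer<>("demo-topic", new SimpleStringSchema(), props);
        consumer.setStartFromLatest();
    }
}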