Commit 181322a7 authored by liaowenwu

Modify Kafka connection

parent c1b6e736
package com.dsk.flink.dsc.common.function;
import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.date.DateUtil;
import cn.hutool.core.map.MapUtil;
import cn.hutool.core.util.StrUtil;
import com.alibaba.fastjson.JSON;
@@ -14,6 +13,9 @@ import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.*;
/**
* Refactored code
@@ -109,7 +111,9 @@ public class MysqlDataTransferFunction extends ProcessFunction<JSONObject, Tuple
pkColumns.setLength(pkColumns.length()-1);
pkValues.setLength(pkValues.length()-1);
}
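//Escape backslashes first, then single quotes; this order avoids double-escaping the backslash that escapes the quote.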
return Tuple6.of(table,type, pkColumns.toString(), pkValues.toString().replace("'",""),dataJsonStr.replace("\\","\\\\").replace("'", "\\'"),ts);
String step1 = StrUtil.replace(dataJsonStr, "\\", "\\\\");
String step2 = StrUtil.replace(step1, "'", "\\'");
return Tuple6.of(table,type, pkColumns.toString(), pkValues.toString().replace("'",""),step2,ts);
}
private static String tranferInsertSql(String table, JSONObject dataObj, JSONObject mysqlType) {
@@ -138,6 +142,8 @@ public class MysqlDataTransferFunction extends ProcessFunction<JSONObject, Tuple
return String.format("DELETE FROM %s WHERE %s",table, whereClauseBuilder);
}
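//Thread-safe, reusable formatters for DATE and DATETIME column values.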
private static final DateTimeFormatter DATE_FORMAT = DateTimeFormatter.ofPattern("yyyy-MM-dd");
private static final DateTimeFormatter DATETIME_FORMAT = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
private static String getValueString(JSONObject dataObj,String columnKey,String mysqlType){
if(null == dataObj.get(columnKey)){
return "null";
@@ -145,12 +151,19 @@ public class MysqlDataTransferFunction extends ProcessFunction<JSONObject, Tuple
String upperCase = mysqlType.toUpperCase();
//Types that need to be rendered as quoted string literals
if(STR_SQL_TYPE.containsKey(upperCase)){
return String.format("'%s'", dataObj.getString(columnKey).replace("\\","\\\\").replace("'", "\\'") );
String step1 = StrUtil.replace(dataObj.getString(columnKey), "\\", "\\\\");
return StrUtil.replace(step1, "'", "\\'");
//return String.format("'%s'", dataObj.getString(columnKey).replace("\\","\\\\").replace("'", "\\'") );
}
//Date/time field handling
if("DATE".equals(upperCase) || "DATETIME".equals(upperCase)){
String date = "DATETIME".equals(upperCase) ? DateUtil.format(dataObj.getDate(columnKey),"yyyy-MM-dd HH:mm:ss") : DateUtil.format(dataObj.getDate(columnKey),"yyyy-MM-dd");
Date d = dataObj.getDate(columnKey);
if (d == null) {
return "";
}
LocalDateTime dateTime = LocalDateTime.ofInstant(d.toInstant(), ZoneId.systemDefault());
String date = "DATETIME".equals(upperCase) ? DATETIME_FORMAT.format(dateTime) : DATE_FORMAT.format(dateTime);
return String.format("\"%s\"",date);
}
......
package com.dsk.flink.dsc.common.sink;
import cn.hutool.core.lang.Snowflake;
import cn.hutool.core.util.IdUtil;
import cn.hutool.core.util.RandomUtil;
import cn.hutool.db.DbUtil;
import cn.hutool.db.sql.SqlExecutor;
import com.alibaba.druid.pool.DruidDataSource;
import com.dsk.flink.dsc.common.dto.SqlErrorLog;
import com.dsk.flink.dsc.utils.EnvProperties;
import org.apache.flink.api.java.tuple.Tuple6;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.util.Date;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
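/**
 * Sink that writes CDC tuples (table, op_type, pk_columns, pk_values, data_json, cdc_ts)
 * into the dsc_cdc_log table asynchronously, with bounded retries and error-log persistence.
 */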
public class MysqlDataSlideSink extends RichSinkFunction<Tuple6<String,String,String,String,String,Long>> {
static Logger logger = LoggerFactory.getLogger(MysqlDataSlideSink.class);
EnvProperties envProps;
private transient ExecutorService executorService;
private transient DruidDataSource dataSource;
private static final int MAX_RETRIES = 3; // maximum number of retries
private static final int RETRY_DELAY_MS = 100; // retry delay in milliseconds
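//Insert statement for the CDC log table; `table` is back-quoted because it is a reserved word.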
private static final String SQL = "INSERT INTO dsc_cdc_log (`table`,op_type,pk_columns,pk_values,data_json,cdc_ts) values (?,?,?,?,?,?)";
public MysqlDataSlideSink(EnvProperties envProps) {
this.envProps = envProps;
}
@Override
public void open(Configuration parameters) throws Exception {
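//Fixed pool of 5 worker threads with an unbounded queue, so writes run off the Flink task thread.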
executorService = new ThreadPoolExecutor(5, 5, 20, TimeUnit.MINUTES, new LinkedBlockingDeque<>());
//Initialize and load configuration
String configTidbUrl = String.format(envProps.getDb_url(), envProps.getDb_host(), envProps.getDb_port(), envProps.getDb_database());
//System.out.println(configTidbUrl);
dataSource = new DruidDataSource();
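//Configure the Druid pool: MySQL driver, 20 initial / 30 max connections, 20s max wait, idle validation via select 1.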
dataSource.setDriverClassName("com.mysql.cj.jdbc.Driver");
dataSource.setUsername(envProps.getDb_username());
dataSource.setPassword(envProps.getDb_password());
dataSource.setUrl(configTidbUrl);
dataSource.setMaxActive(30);
dataSource.setInitialSize(20);
dataSource.setTestWhileIdle(true);
dataSource.setMaxWait(20000);
dataSource.setValidationQuery("select 1");
}
@Override
public void close() throws Exception {
executorService.shutdown();
dataSource.close();
}
@Override
public void invoke(Tuple6<String,String,String,String,String,Long> value, Context context) throws Exception {
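//Hand the record to the worker pool; retries and error logging happen asynchronously, so a failed record does not fail the job.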
executorService.execute(() -> {
executeSqlWithRetry(value);
});
}
private void executeSqlWithRetry(Tuple6<String,String,String,String,String,Long> t) {
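//Retry up to MAX_RETRIES times with a linearly increasing delay (RETRY_DELAY_MS * attempt) between attempts.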
int retries = 0;
Exception lastException = null;
while (retries < MAX_RETRIES) {
Connection connection = null;
try {
connection = dataSource.getConnection();
SqlExecutor.execute(connection, SQL, t.f0,t.f1,t.f2,t.f3,t.f4,t.f5);
return;
} catch (Exception e) {
lastException = e;
retries++;
logger.error("SQL执行失败,重试次数: {}, SQL: {}", retries, t.toString(), e);
if (retries < MAX_RETRIES) {
try {
Thread.sleep((long) RETRY_DELAY_MS * retries);
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
logger.error("重试等待被中断", ie);
}
}
} finally {
DbUtil.close(connection);
}
}
// Record the error after all retries have failed
logger.error("SQL execution failed after reaching the maximum number of retries: {}", MAX_RETRIES);
SqlErrorLog errorLog = new SqlErrorLog(new Date(), t.toString(), lastException.getMessage());
writeErrLogDb(errorLog);
}
private void writeErrLogDb(SqlErrorLog errorLog) {
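//Persist the failed record and error message to dsc_err_log under a Snowflake-generated id.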
Snowflake snowflake = IdUtil.getSnowflake(RandomUtil.randomInt(31), RandomUtil.randomInt(31));
String sql = "insert dsc_err_log (id,error_time, error_sql, error_msg) values (?, ?, ?, ?)";
Connection conn = null;
PreparedStatement pt = null;
try {
conn = dataSource.getConnection();
pt = conn.prepareStatement(sql);
pt.setLong(1, snowflake.nextId() + RandomUtil.randomInt(10,99));
pt.setObject(2, errorLog.getErrorTime());
pt.setString(3, errorLog.getSql());
pt.setString(4, errorLog.getError());
pt.execute();
}catch (Exception e){
logger.error("错误日志保存异常 -> {}", e.getMessage());
}finally {
DbUtil.close(pt, conn);
}
}
}
@@ -36,7 +36,7 @@ public class MysqlDataTransferSink extends RichSinkFunction<String> {
@Override
public void open(Configuration parameters) throws Exception {
executorService = new ThreadPoolExecutor(5, 5, 20, TimeUnit.MINUTES, new LinkedBlockingDeque<>());
executorService = new ThreadPoolExecutor(8, 20, 20, TimeUnit.MINUTES, new LinkedBlockingDeque<>());
//Initialize and load configuration
String configTidbUrl = String.format(envProps.getDb_url(), envProps.getDb_host(), envProps.getDb_port(), envProps.getDb_database());
//System.out.println(configTidbUrl);
......
package com.dsk.flink.dsc.sync;
import cn.hutool.core.map.MapUtil;
import cn.hutool.core.util.StrUtil;
import com.alibaba.fastjson.JSONObject;
import com.dsk.flink.dsc.common.function.MysqlDataTransferFunction;
import com.dsk.flink.dsc.common.sink.MysqlDataSlideSink;
import com.dsk.flink.dsc.common.sink.MysqlDataTransferSink;
import com.dsk.flink.dsc.common.sink.MysqlDataTransferSinkBatch;
import com.dsk.flink.dsc.utils.EnvProperties;
import com.dsk.flink.dsc.utils.EnvPropertiesUtil;
import com.dsk.flink.dsc.utils.EtlUtils;
@@ -15,9 +16,6 @@ import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.tuple.Tuple6;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
import org.apache.flink.connector.jdbc.JdbcSink;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
@@ -29,11 +27,14 @@ import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTime
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.util.Arrays;
import java.util.Map;
/**
* @author shezaixing
@@ -60,6 +61,8 @@ public class SyncCustomerDataSource {
//Read the user-supplied configuration
ParameterTool parameterTool = ParameterTool.fromArgs(args);
String offsetTimestamp = parameterTool.get("offsetTimestamp");
String position = parameterTool.get("position");
String propertiesPath = parameterTool.get("propertiesPath");
EnvProperties envProps = EnvPropertiesUtil.getPropertiesFromArgsPath(propertiesPath);
System.out.println("读取到的配置文件:-> " + envProps.toString());
@@ -72,7 +75,17 @@
if (StrUtil.isNotBlank(offsetTimestamp)) {
kafkaConsumer.setStartFromTimestamp(Long.parseLong(offsetTimestamp));
}
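//Start from explicit per-partition offsets when the position argument is supplied; applied after the timestamp option, so it takes precedence when both are set.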
if (StrUtil.isNotBlank(position)) {
// format: <partition>_<offset> pairs separated by commas, e.g. 0_1234,1_222
Map<KafkaTopicPartition, Long> specificStartupOffsets = MapUtil.newHashMap();
Arrays.stream(position.split(",")).forEach(x -> {
String par = StrUtil.subBefore(x, "_", false);
String pos = StrUtil.subAfter(x, "_", false);
KafkaTopicPartition tp = new KafkaTopicPartition(envProps.getKafka_topic(),Integer.parseInt(par));
specificStartupOffsets.put(tp,Long.parseLong(pos));
});
kafkaConsumer.setStartFromSpecificOffsets(specificStartupOffsets);
}
DataStreamSource<String> dscKafka = env
.addSource(kafkaConsumer)
.setParallelism(1);
@@ -127,7 +140,7 @@ public class SyncCustomerDataSource {
DataStream<Tuple6<String,String,String,String,String,Long>> sideOutput = slide.getSideOutput(logSlideTag);
sideOutput.addSink(JdbcSink.sink(
/*sideOutput.addSink(JdbcSink.sink(
"INSERT INTO dsc_cdc_log (`table`,op_type,pk_columns,pk_values,data_json,cdc_ts) values (?,?,?,?,?,?)",
(ps,t) -> {
ps.setString(1,t.f0);
@@ -149,12 +162,15 @@ public class SyncCustomerDataSource {
.withPassword(envProps.getDb_password())
.build()
)).uid("dsc-log")
.name("dsc-log");
.name("dsc-log");*/
sideOutput.addSink(new MysqlDataSlideSink(envProps)).uid("dsc-log")
.name("dsc-log");
env.execute("dsc-client");
}
private static String getSinkUrl(EnvProperties envProps) {
/*private static String getSinkUrl(EnvProperties envProps) {
return String.format(envProps.getDb_url(), envProps.getDb_host(), envProps.getDb_port(), envProps.getDb_database());
}
}*/
}
@@ -23,7 +23,7 @@ public class EtlUtils {
properties.setProperty("request.timeout.ms", "120000");
properties.setProperty("retries", "3");
properties.setProperty("retry.backoff.ms", "5000");
properties.setProperty("receive.buffer.bytes", "2097152"); //2m
properties.setProperty("receive.buffer.bytes", "65536"); //64k
properties.setProperty("max.poll.records", "10");
return properties;
......