Commit 7a5c1c09 authored by liaowenwu

Modify Kafka connection

parent b77326eb
......@@ -25,8 +25,10 @@ public class MysqlDataTransferSink extends RichSinkFunction<String> {
static Logger logger = LoggerFactory.getLogger(MysqlDataTransferSink.class);
EnvProperties envProps;
private static transient ExecutorService executorService;
private static transient DruidDataSource dataSource;
private transient ExecutorService executorService;
private transient DruidDataSource dataSource;
private static final int MAX_RETRIES = 3; // maximum number of retries
private static final int RETRY_DELAY_MS = 100; // delay between retries (ms)
public MysqlDataTransferSink(EnvProperties envProps) {
this.envProps = envProps;
......@@ -34,7 +36,7 @@ public class MysqlDataTransferSink extends RichSinkFunction<String> {
@Override
public void open(Configuration parameters) throws Exception {
executorService = new ThreadPoolExecutor(4, 4, 20, TimeUnit.MINUTES, new LinkedBlockingDeque<>());
executorService = new ThreadPoolExecutor(5, 5, 20, TimeUnit.MINUTES, new LinkedBlockingDeque<>());
//Initialize and load configuration
String configTidbUrl = String.format(envProps.getDb_url(), envProps.getDb_host(), envProps.getDb_port(), envProps.getDb_database());
//System.out.println(configTidbUrl);
......@@ -59,11 +61,11 @@ public class MysqlDataTransferSink extends RichSinkFunction<String> {
@Override
public void invoke(String value, Context context) throws Exception {
executorService.execute(() -> {
executeSql(value);
executeSqlWithRetry(value);
});
}
private void executeSql(String sql){
/*private void executeSql(String sql){
Connection connection = null;
try {
connection = dataSource.getConnection();
......@@ -76,6 +78,37 @@ public class MysqlDataTransferSink extends RichSinkFunction<String> {
} finally {
DbUtil.close(connection);
}
}*/
private void executeSqlWithRetry(String sql) {
int retries = 0;
Exception lastException = null;
while (retries < MAX_RETRIES) {
Connection connection = null;
try {
connection = dataSource.getConnection();
SqlExecutor.execute(connection, sql);
return;
} catch (Exception e) {
lastException = e;
retries++;
logger.error("SQL执行失败,重试次数: {}, SQL: {}", retries, sql, e);
if (retries < MAX_RETRIES) {
try {
Thread.sleep((long) RETRY_DELAY_MS * retries);
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
logger.error("重试等待被中断", ie);
}
}
} finally {
DbUtil.close(connection);
}
}
// Record the error once all retries have failed
logger.error("SQL execution failed after reaching the maximum number of retries: {}", MAX_RETRIES);
SqlErrorLog errorLog = new SqlErrorLog(new Date(), sql, lastException.getMessage());
writeErrLogDb(errorLog);
}
/*private void writeErrLog(SqlErrorLog errorLog) {
......
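Note: the new executeSqlWithRetry above retries each failed statement up to MAX_RETRIES times, sleeping RETRY_DELAY_MS * attempt between attempts (linear backoff) before writing the failure to the error-log table. Below is a minimal, self-contained sketch of that pattern, for reference only; RetrySqlRunner and runWithRetry are illustrative names that do not exist in this repository, and the constants simply mirror the values in the diff.

import java.sql.Connection;
import java.sql.Statement;
import javax.sql.DataSource;

// Illustrative sketch of the retry-with-linear-backoff pattern used by executeSqlWithRetry.
public class RetrySqlRunner {
    private static final int MAX_RETRIES = 3;      // mirrors MAX_RETRIES in the diff
    private static final int RETRY_DELAY_MS = 100; // mirrors RETRY_DELAY_MS in the diff

    private final DataSource dataSource;

    public RetrySqlRunner(DataSource dataSource) {
        this.dataSource = dataSource;
    }

    // Executes the SQL, retrying on failure; rethrows the last exception once all retries are exhausted.
    public void runWithRetry(String sql) throws Exception {
        Exception last = null;
        for (int attempt = 1; attempt <= MAX_RETRIES; attempt++) {
            try (Connection conn = dataSource.getConnection();
                 Statement st = conn.createStatement()) {
                st.execute(sql);
                return; // success, stop retrying
            } catch (Exception e) {
                last = e;
                if (attempt < MAX_RETRIES) {
                    Thread.sleep((long) RETRY_DELAY_MS * attempt); // 100 ms, then 200 ms
                }
            }
        }
        throw last; // the caller decides how to record the failure, e.g. in an error-log table
    }
}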
package com.dsk.flink.dsc.common.sink;
import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.lang.Snowflake;
import cn.hutool.core.util.IdUtil;
import cn.hutool.core.util.RandomUtil;
......@@ -28,14 +29,16 @@ public class MysqlDataTransferSinkBatch extends RichSinkFunction<String> {
static Logger logger = LoggerFactory.getLogger(MysqlDataTransferSink.class);
EnvProperties envProps;
private static transient ExecutorService executorService;
private static transient DruidDataSource dataSource;
private static final int BATCH_SIZE = 100;
private static final int FLUSH_INTERVAL = 500;
private transient ExecutorService executorService;
private transient DruidDataSource dataSource;
private static final int BATCH_SIZE = 20;
private static final int FLUSH_INTERVAL = 200;
private static final int INIT_CAPACITY = (int)(BATCH_SIZE / 0.75 + 1);
private static final int MAX_RETRIES = 3;
private ArrayBlockingQueue<String> sqlBatch = new ArrayBlockingQueue<>(BATCH_SIZE * 2);
private static transient ScheduledExecutorService scheduledExecutorService;
private transient ScheduledExecutorService scheduledExecutorService;
private AtomicBoolean flushing = new AtomicBoolean(false);
private int subtaskIndex;
public MysqlDataTransferSinkBatch(EnvProperties envProps) {
this.envProps = envProps;
......@@ -43,7 +46,8 @@ public class MysqlDataTransferSinkBatch extends RichSinkFunction<String> {
@Override
public void open(Configuration parameters) throws Exception {
executorService = new ThreadPoolExecutor(4, 4, 20, TimeUnit.MINUTES, new ArrayBlockingQueue<>(1000));
subtaskIndex = getRuntimeContext().getIndexOfThisSubtask();
executorService = new ThreadPoolExecutor(2, 2, 20, TimeUnit.MINUTES, new ArrayBlockingQueue<>(1000));
// Initialize and load configuration
String configTidbUrl = String.format(envProps.getDb_url(), envProps.getDb_host(), envProps.getDb_port(), envProps.getDb_database());
dataSource = new DruidDataSource();
......@@ -51,13 +55,15 @@ public class MysqlDataTransferSinkBatch extends RichSinkFunction<String> {
dataSource.setUsername(envProps.getDb_username());
dataSource.setPassword(envProps.getDb_password());
dataSource.setUrl(configTidbUrl);
dataSource.setMaxActive(50);
dataSource.setInitialSize(30);
dataSource.setMaxActive(10);
dataSource.setInitialSize(5);
dataSource.setTestWhileIdle(true);
dataSource.setMaxWait(20000);
dataSource.setValidationQuery("select 1");
dataSource.setConnectionInitSqls(CollUtil.newArrayList("SET SESSION TRANSACTION ISOLATION LEVEL READ COMMITTED"));
scheduledExecutorService = new ScheduledThreadPoolExecutor(1);
scheduledExecutorService.scheduleAtFixedRate(this::flush, FLUSH_INTERVAL, FLUSH_INTERVAL, TimeUnit.MILLISECONDS);
logger.info("Subtask {} initialized with batch size {}", subtaskIndex, BATCH_SIZE);
}
@Override
......@@ -98,11 +104,6 @@ public class MysqlDataTransferSinkBatch extends RichSinkFunction<String> {
pt.executeBatch();
connection.commit();
} catch (Exception e) {
try {
connection.rollback();
} catch (Exception ex) {
logger.error("事务回滚异常", ex);
}
logger.error("------错误时间:{}-------------异常:", new Date(), e);
//SqlErrorLog errorLog = new SqlErrorLog(new Date(), String.join(";", batch), e.getMessage());
SqlErrorLog errorLog = new SqlErrorLog(new Date(), batch.get(0), e.getMessage());
......@@ -124,7 +125,7 @@ public class MysqlDataTransferSinkBatch extends RichSinkFunction<String> {
private void writeErrLogDb(SqlErrorLog errorLog) {
Snowflake snowflake = IdUtil.getSnowflake(RandomUtil.randomInt(31), RandomUtil.randomInt(31));
String sql = "insert dsc_err_log (id,error_time, error_sql, error_msg) values (?,?,?,?)";
String sql = "insert into dsc_err_log (id,error_time, error_sql, error_msg) values (?,?,?,?)";
try (Connection conn = dataSource.getConnection();
PreparedStatement pt = conn.prepareStatement(sql)) {
pt.setLong(1, snowflake.nextId());
......
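Note: besides dropping the static fields, this file shrinks the Druid pool (maxActive 50 -> 10, initialSize 30 -> 5) and fixes the error-log statement to use "insert into". Below is a standalone sketch of the new pool configuration, assuming the same Druid dependency; SinkDataSourceFactory is an illustrative name and the URL and credentials are placeholders.

import java.util.Collections;
import com.alibaba.druid.pool.DruidDataSource;

// Standalone sketch of the pool settings this commit moves to; values mirror the diff.
public class SinkDataSourceFactory {
    public static DruidDataSource create(String url, String user, String password) {
        DruidDataSource ds = new DruidDataSource();
        ds.setUrl(url);
        ds.setUsername(user);
        ds.setPassword(password);
        ds.setMaxActive(10);   // was 50 before this commit
        ds.setInitialSize(5);  // was 30 before this commit
        ds.setMaxWait(20000);  // max milliseconds to wait for a free connection
        ds.setTestWhileIdle(true);
        ds.setValidationQuery("select 1");
        ds.setConnectionInitSqls(
                Collections.singletonList("SET SESSION TRANSACTION ISOLATION LEVEL READ COMMITTED"));
        return ds;
    }
}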
......@@ -3,6 +3,7 @@ package com.dsk.flink.dsc.sync;
import cn.hutool.core.util.StrUtil;
import com.alibaba.fastjson.JSONObject;
import com.dsk.flink.dsc.common.function.MysqlDataTransferFunction;
import com.dsk.flink.dsc.common.sink.MysqlDataTransferSink;
import com.dsk.flink.dsc.common.sink.MysqlDataTransferSinkBatch;
import com.dsk.flink.dsc.utils.EnvProperties;
import com.dsk.flink.dsc.utils.EnvPropertiesUtil;
......@@ -100,7 +101,7 @@ public class SyncCustomerDataSource {
SingleOutputStreamOperator<String> groupWindowSqlResultStream = slide
.keyBy(value -> value.f1)
.window(TumblingProcessingTimeWindows.of(Time.milliseconds(100)))
.window(TumblingProcessingTimeWindows.of(Time.milliseconds(500)))
.process(new ProcessWindowFunction<Tuple3<String, String, Long>, String, String, TimeWindow>() {
@Override
public void process(String s, ProcessWindowFunction<Tuple3<String, String, Long>, String, String,
......@@ -120,7 +121,7 @@ public class SyncCustomerDataSource {
.name("dsc-max")
.uid("dsc-max");
groupWindowSqlResultStream.addSink(new MysqlDataTransferSinkBatch(envProps))
groupWindowSqlResultStream.addSink(new MysqlDataTransferSink(envProps))
.name("dsc-sink")
.uid("dsc-sink");
......
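Note: the job now uses a 500 ms tumbling processing-time window (previously 100 ms) and wires the windowed stream into the single-statement MysqlDataTransferSink instead of the batch sink. Below is a minimal sketch of that keyBy + tumbling-window wiring, for reference only; the source elements, key format, and the "keep the latest SQL per key" rule are simplified stand-ins for the real Tuple3 stream in SyncCustomerDataSource.

import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

// Sketch of the 500 ms tumbling-window de-duplication ahead of the sink.
public class WindowedSqlJobSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Placeholder source of (groupKey, sql) pairs; the real job derives these from the DSC events.
        DataStream<Tuple2<String, String>> sqlStream = env.fromElements(
                Tuple2.of("t1#1", "UPDATE t1 SET c = 1 WHERE id = 1"),
                Tuple2.of("t1#1", "UPDATE t1 SET c = 2 WHERE id = 1"));

        SingleOutputStreamOperator<String> windowed = sqlStream
                .keyBy(new KeySelector<Tuple2<String, String>, String>() {
                    @Override
                    public String getKey(Tuple2<String, String> value) {
                        return value.f0; // the real job keys by the tuple's key field with a lambda
                    }
                })
                .window(TumblingProcessingTimeWindows.of(Time.milliseconds(500)))
                .process(new ProcessWindowFunction<Tuple2<String, String>, String, String, TimeWindow>() {
                    @Override
                    public void process(String key, Context ctx,
                                        Iterable<Tuple2<String, String>> elements,
                                        Collector<String> out) {
                        String last = null;
                        for (Tuple2<String, String> e : elements) {
                            last = e.f1; // keep only the latest SQL per key within the window
                        }
                        if (last != null) {
                            out.collect(last);
                        }
                    }
                });

        // windowed.addSink(new MysqlDataTransferSink(envProps)); // as wired in this commit
        windowed.print();
        env.execute("dsc-sync-window-sketch");
    }
}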