Commit b573e61b authored by shezaixing

Filter out DDL events; add writing of SQL error logs to the database

parent 4279b65d
package com.dsk.flink.dsc.common.dto;

import lombok.Data;

import java.util.Date;

/**
 * Record of a failed SQL statement: when it failed, the statement text,
 * and the error message. Getters and setters are generated by Lombok's @Data.
 */
@Data
public class SqlErrorLog {

    private Long id;

    private Date errorTime;

    private String sql;

    private String error;

    public SqlErrorLog(Date errorTime, String sql, String error) {
        this.errorTime = errorTime;
        this.sql = sql;
        this.error = error;
    }

    public SqlErrorLog() {
    }
}
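A quick standalone usage sketch (not part of the commit) showing the DTO in isolation; the getters it exposes come from Lombok's @Data:

import com.dsk.flink.dsc.common.dto.SqlErrorLog;

import java.util.Date;

public class SqlErrorLogDemo {
    public static void main(String[] args) {
        // Capture a failed statement together with when and why it failed.
        SqlErrorLog log = new SqlErrorLog(new Date(),
                "INSERT INTO t_user (id, name) VALUES (1, 'a')",
                "Duplicate entry '1' for key 'PRIMARY'");
        // getErrorTime/getSql/getError are Lombok-generated.
        System.out.println(log.getErrorTime() + " | " + log.getSql() + " | " + log.getError());
    }
}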
package com.dsk.flink.dsc.common.sink;
import cn.hutool.core.date.DateUtil;
import cn.hutool.core.lang.Snowflake;
import cn.hutool.core.util.IdUtil;
import cn.hutool.core.util.RandomUtil;
import cn.hutool.db.DbUtil;
import com.alibaba.druid.pool.DruidDataSource;
import com.dsk.flink.dsc.common.dto.SqlErrorLog;
import com.dsk.flink.dsc.utils.EnvProperties;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
@@ -10,6 +15,7 @@ import org.slf4j.LoggerFactory;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.util.Date;
import java.util.concurrent.*;
public class MysqlDataTransferSink extends RichSinkFunction<String> {
@@ -70,6 +76,12 @@ public class MysqlDataTransferSink extends RichSinkFunction<String> {
System.out.println("sql报错----->" + sql);
e.printStackTrace();
logger.error("------错误时间:{}-----,sql:{}--------异常:{}", DateUtil.now(),sql,e.getMessage());
SqlErrorLog errorLog = new SqlErrorLog(new Date(), sql, e.getMessage());
try {
writeErrLog(errorLog);
}catch (Exception re){
logger.error("错误日志保存异常 -> {}", re.getMessage());
}
        } finally {
            if (pt != null) {
                pt.close();
@@ -79,4 +91,28 @@ public class MysqlDataTransferSink extends RichSinkFunction<String> {
            }
        }
    }
    private void writeErrLog(SqlErrorLog errorLog) {
        writeErrLogDb(errorLog);
    }
    private void writeErrLogDb(SqlErrorLog errorLog) {
        // Random worker/datacenter IDs (0-30) plus a random offset on the generated
        // ID reduce the chance of collisions when several sink subtasks write
        // error logs concurrently.
        Snowflake snowflake = IdUtil.getSnowflake(RandomUtil.randomInt(31), RandomUtil.randomInt(31));
        String sql = "insert into dsc_err_log (id, error_time, error_sql, error_msg) values (?, ?, ?, ?)";
        Connection conn = null;
        PreparedStatement pt = null;
        try {
            conn = dataSource.getConnection();
            pt = conn.prepareStatement(sql);
            pt.setLong(1, snowflake.nextId() + RandomUtil.randomInt(10, 99));
            pt.setObject(2, errorLog.getErrorTime());
            pt.setString(3, errorLog.getSql());
            pt.setString(4, errorLog.getError());
            pt.execute();
        } catch (Exception e) {
            logger.error("failed to persist SQL error log -> {}", e.getMessage());
        } finally {
            DbUtil.close(conn, pt);
        }
    }
}
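The INSERT above implies a dsc_err_log table with four columns. Below is a minimal bootstrap sketch that would create it; the column types and connection settings are assumptions inferred from the setters used, not taken from the commit:

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.Statement;

public class CreateErrLogTable {
    // Assumed DDL: the column names come from the INSERT in writeErrLogDb;
    // the types (BIGINT/DATETIME/TEXT) are guesses based on the setters used.
    private static final String DDL =
            "CREATE TABLE IF NOT EXISTS dsc_err_log (" +
            "  id BIGINT PRIMARY KEY," +
            "  error_time DATETIME," +
            "  error_sql TEXT," +
            "  error_msg TEXT" +
            ")";

    public static void main(String[] args) throws Exception {
        // Hypothetical connection settings; substitute the job's real database.
        try (Connection conn = DriverManager.getConnection(
                "jdbc:mysql://localhost:3306/dsc", "root", "secret");
             Statement st = conn.createStatement()) {
            st.execute(DDL);
        }
    }
}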
@@ -54,19 +54,22 @@ public class SyncCustomerDataSource {
envProps.put("providerImpl", JdbcConnectionProviderFactory.HikariDataSourceJdbcConnectionProvider.class.getName());
System.out.println("读取到的配置文件:-> " + envProps.toString());
System.out.println("读取到的数据连接配置:->" + String.format(envProps.getDb_url(), envProps.getDb_host(), envProps.getDb_port(), envProps.getDb_database()));
System.out.println("获取到的kafka消费组:->" + EtlUtils.getKafkaGroup(envProps));
System.out.println("读取到的数据库用户名:->" + envProps.getDb_username());
System.out.println("读取到的数据库密码:->" + envProps.getDb_password());
System.out.println("读取到的数据连接配置:->" + String.format(envProps.getDb_url(), envProps.getDb_host(), envProps.getDb_port(), envProps.getDb_database()));
System.out.println("获取到的kafka消费组:-> {}"+ EtlUtils.getKafkaGroup(envProps));
System.out.println("获取到的kafka消费组:-> {}"+ envProps.getKafka_topic());
System.out.println("获取到的kafka消费组:-> {}"+ envProps.getKafka_password());
logger.info("获取到的kafka消费组:-> {}", EtlUtils.getKafkaGroup(envProps));
logger.info("读取到的配置文件:-> {}", envProps.toString());
logger.info("读取到的数据连接配置:-> {}", String.format(envProps.getDb_url(), envProps.getDb_host(), envProps.getDb_port(), envProps.getDb_database()));
logger.info("获取到的kafka消费组:-> {}", EtlUtils.getKafkaGroup(envProps));
logger.info("获取到的kafka消费组:-> {}", EtlUtils.getKafkaGroup(envProps));
logger.info("获取到的kafka主题:-> {}", envProps.getKafka_topic());
logger.info("获取到的kafka用户名:-> {}", envProps.getKafka_username());
logger.info("获取到的kafka密码:-> {}", envProps.getKafka_password());
// logger.info("获取到的kafka消费组:-> {}", EtlUtils.getKafkaGroup(envProps));
// logger.info("读取到的配置文件:-> {}", envProps.toString());
// logger.info("读取到的数据连接配置:-> {}", String.format(envProps.getDb_url(), envProps.getDb_host(), envProps.getDb_port(), envProps.getDb_database()));
// logger.info("获取到的kafka消费组:-> {}", EtlUtils.getKafkaGroup(envProps));
//
// logger.info("获取到的kafka消费组:-> {}", EtlUtils.getKafkaGroup(envProps));
// logger.info("获取到的kafka主题:-> {}", envProps.getKafka_topic());
// logger.info("获取到的kafka用户名:-> {}", envProps.getKafka_username());
// logger.info("获取到的kafka密码:-> {}", envProps.getKafka_password());
        //TODO change this later to the production consumer group
        FlinkKafkaConsumer<String> kafkaConsumer = new FlinkKafkaConsumer<String>(envProps.getKafka_topic(), new SimpleStringSchema(), EtlUtils.getKafkaConfig(envProps.getKafka_brokers(), EtlUtils.getKafkaGroup(envProps), envProps.getKafka_username(), envProps.getKafka_password()));
        //System.out.println(envProps.getKafka_topic());
@@ -84,6 +87,12 @@ public class SyncCustomerDataSource {
        //kafkaSource.print("kafka stream");
        SingleOutputStreamOperator<JSONObject> canalJsonStream = kafkaSource.map(JSONObject::parseObject)
                .filter(new FilterFunction<JSONObject>() {
                    @Override
                    public boolean filter(JSONObject value) throws Exception {
                        // Drop DDL events. Boolean.TRUE.equals guards against a missing or
                        // null isDdl flag, which would otherwise NPE on auto-unboxing.
                        return !Boolean.TRUE.equals(value.getBoolean("isDdl"));
                    }
                })
                .name("canalJsonStream")
                .uid("canalJsonStream");