Commit a0b42df9 authored by shezaixing's avatar shezaixing

update

parent 2592603c
...@@ -167,7 +167,7 @@ public class AsyncMysqlDataTransferFunction extends RichAsyncFunction<JSONObject ...@@ -167,7 +167,7 @@ public class AsyncMysqlDataTransferFunction extends RichAsyncFunction<JSONObject
//需要处理成字符串加引号的类型 //需要处理成字符串加引号的类型
if(Arrays.asList(STR_SQL_TYPE).contains(mysqlType.toUpperCase())){ if(Arrays.asList(STR_SQL_TYPE).contains(mysqlType.toUpperCase())){
return String.format("'%s'",dataObj.getString(columnKey)); return String.format("'%s'", dataObj.getString(columnKey).replaceAll("'","\\\\'") );
} }
//时间字段处理 //时间字段处理
...@@ -206,9 +206,13 @@ public class AsyncMysqlDataTransferFunction extends RichAsyncFunction<JSONObject ...@@ -206,9 +206,13 @@ public class AsyncMysqlDataTransferFunction extends RichAsyncFunction<JSONObject
String s1 = "hello string sss"; String s1 = "hello string sss";
String s2 = "'kaskljsl'";
System.out.println(StrUtil.subBefore(s1,"string",true)); System.out.println(StrUtil.subBefore(s1,"string",true));
System.out.println(tranferInsertSql(table,jsonObject,mysqlType)); System.out.println(tranferInsertSql(table,jsonObject,mysqlType));
System.out.println(s2.replaceAll("'","\\\\'"));
} }
} }
...@@ -29,6 +29,7 @@ public class MysqlDataTransferSink extends RichSinkFunction<String> { ...@@ -29,6 +29,7 @@ public class MysqlDataTransferSink extends RichSinkFunction<String> {
executorService = new ThreadPoolExecutor(4, 4, 20, TimeUnit.MINUTES, new LinkedBlockingDeque<>()); executorService = new ThreadPoolExecutor(4, 4, 20, TimeUnit.MINUTES, new LinkedBlockingDeque<>());
//初始化获取配置 //初始化获取配置
String configTidbUrl = String.format(envProps.getDb_url(), envProps.getDb_host(), envProps.getDb_port(), envProps.getDb_database()); String configTidbUrl = String.format(envProps.getDb_url(), envProps.getDb_host(), envProps.getDb_port(), envProps.getDb_database());
//System.out.println(configTidbUrl);
dataSource = new DruidDataSource(); dataSource = new DruidDataSource();
dataSource.setDriverClassName("com.mysql.cj.jdbc.Driver"); dataSource.setDriverClassName("com.mysql.cj.jdbc.Driver");
dataSource.setUsername(envProps.getDb_username()); dataSource.setUsername(envProps.getDb_username());
...@@ -66,6 +67,7 @@ public class MysqlDataTransferSink extends RichSinkFunction<String> { ...@@ -66,6 +67,7 @@ public class MysqlDataTransferSink extends RichSinkFunction<String> {
pt = connection.prepareStatement(sql); pt = connection.prepareStatement(sql);
pt.execute(); pt.execute();
} catch (Exception e) { } catch (Exception e) {
System.out.println("sql报错----->" + sql);
e.printStackTrace(); e.printStackTrace();
logger.error("------错误时间:{}-----,sql:{}--------异常:{}", DateUtil.now(),sql,e.getMessage()); logger.error("------错误时间:{}-----,sql:{}--------异常:{}", DateUtil.now(),sql,e.getMessage());
} finally { } finally {
......
...@@ -51,11 +51,13 @@ public class SyncCustomerDataSource { ...@@ -51,11 +51,13 @@ public class SyncCustomerDataSource {
envProps.put("providerImpl", JdbcConnectionProviderFactory.HikariDataSourceJdbcConnectionProvider.class.getName()); envProps.put("providerImpl", JdbcConnectionProviderFactory.HikariDataSourceJdbcConnectionProvider.class.getName());
//System.out.println("读取到的配置文件:-> " + envProps.toString()); //System.out.println("读取到的配置文件:-> " + envProps.toString());
//TODO 到时需要改这里,改成正式的消费组 //TODO 到时需要改这里,改成正式的消费组
FlinkKafkaConsumer<String> kafkaConsumer = new FlinkKafkaConsumer<String>(envProps.getKafka_topic(), new SimpleStringSchema(), EtlUtils.getKafkaConfig(envProps.getKafka_brokers(), envProps.getKafka_topic().concat("-test-group"),envProps.getKafka_username(),envProps.getKafka_password())); FlinkKafkaConsumer<String> kafkaConsumer = new FlinkKafkaConsumer<String>(envProps.getKafka_topic(), new SimpleStringSchema(), EtlUtils.getKafkaConfig(envProps.getKafka_brokers(), envProps.getKafka_topic().concat("-group"),envProps.getKafka_username(),envProps.getKafka_password()));
//System.out.println(envProps.getKafka_topic());
//偏移量 //偏移量
if (StrUtil.isNotBlank(offsetTimestamp)) { if (StrUtil.isNotBlank(offsetTimestamp)) {
kafkaConsumer.setStartFromTimestamp(Long.parseLong(offsetTimestamp)); kafkaConsumer.setStartFromTimestamp(Long.parseLong(offsetTimestamp));
} }
kafkaConsumer.setStartFromEarliest();
SingleOutputStreamOperator<String> kafkaSource = env.addSource(kafkaConsumer) SingleOutputStreamOperator<String> kafkaSource = env.addSource(kafkaConsumer)
.setParallelism(1) .setParallelism(1)
......
...@@ -118,7 +118,7 @@ public class EnvProperties extends Properties { ...@@ -118,7 +118,7 @@ public class EnvProperties extends Properties {
this.db_url = db_url; this.db_url = db_url;
} }
public String getDb_host() { public String getDb_host() {
return db_host == null ? this.getProperty("tidb_host") : db_host; return db_host == null ? this.getProperty("db_host") : db_host;
} }
public void setDb_host(String db_host) { public void setDb_host(String db_host) {
this.db_host = db_host; this.db_host = db_host;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment