Commit 965215e7 authored by liaowenwu

Fix bug

parent 5abd8d28
@@ -98,9 +98,9 @@ public class AsyncMysqlDataTransferFunctionNew extends RichAsyncFunction<JSONObj
                 excueteSql = tranferInsertSql(table,dataObj,mysqlType);
             }
             if("UPDATE".equals(type)){
-                JSONObject oldDataObj = oldDataList.getJSONObject(0);
-                excueteSql = tranferUpdateSql(table,dataObj,oldDataObj,mysqlType,pkNameSet);
-                // excueteSql = tranferInsertSql(table,dataObj,mysqlType);
+                //JSONObject oldDataObj = oldDataList.getJSONObject(0);
+                //excueteSql = tranferUpdateSql(table,dataObj,oldDataObj,mysqlType,pkNameSet);
+                excueteSql = tranferInsertSql(table,dataObj,mysqlType);
             }
             if("DELETE".equals(type)){
@@ -158,7 +158,7 @@ public class AsyncMysqlDataTransferFunctionNew extends RichAsyncFunction<JSONObj
         String valueString = String.join(",",valueList);
         //return String.format("INSERT INTO %s (%s) values (%s) ON DUPLICATE KEY UPDATE %s;",table,columnString,valueString,updateString);
-        return String.format("INSERT INTO %s (%s) values (%s);",table,columnString,valueString);
+        return String.format("REPLACE INTO %s (%s) values (%s);",table,columnString,valueString);
     }
     private String tranferUpdateSql(String table, JSONObject dataObj, JSONObject oldDataObj, JSONObject mysqlType,Set<String> pkNameSet) {
...
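The switch from plain INSERT to REPLACE INTO is what makes the shared path above safe for updates: REPLACE INTO matches on the table's PRIMARY KEY or a UNIQUE index, deletes any conflicting row, and inserts the new one, so columns absent from the event fall back to their defaults, whereas the commented-out INSERT ... ON DUPLICATE KEY UPDATE form patches the existing row in place. A dependency-free sketch contrasting the two generated statements; the shape of updateString is assumed, since its construction is not shown in this diff:

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.stream.Collectors;

// Contrast of the two upsert styles; quoting is naive and for illustration only.
public class UpsertSqlSketch {

    // Old (commented-out) style: on key conflict, update the listed columns in place.
    static String onDuplicateKeySql(String table, Map<String, Object> row) {
        String cols = String.join(",", row.keySet());
        String vals = row.values().stream().map(v -> "'" + v + "'").collect(Collectors.joining(","));
        String updates = row.keySet().stream()                 // assumed shape of updateString
                .map(c -> c + "=VALUES(" + c + ")")
                .collect(Collectors.joining(","));
        return String.format("INSERT INTO %s (%s) values (%s) ON DUPLICATE KEY UPDATE %s;", table, cols, vals, updates);
    }

    // New style: delete the conflicting row (if any), then insert the new one.
    static String replaceSql(String table, Map<String, Object> row) {
        String cols = String.join(",", row.keySet());
        String vals = row.values().stream().map(v -> "'" + v + "'").collect(Collectors.joining(","));
        return String.format("REPLACE INTO %s (%s) values (%s);", table, cols, vals);
    }

    public static void main(String[] args) {
        Map<String, Object> row = new LinkedHashMap<>();
        row.put("id", 1);
        row.put("name", "demo");
        System.out.println(onDuplicateKeySql("t_demo", row));
        System.out.println(replaceSql("t_demo", row));
    }
}
```

One trade-off worth noting: REPLACE is a DELETE plus INSERT, so it fires both sets of triggers and re-checks foreign keys, and it is not a drop-in for ON DUPLICATE KEY UPDATE on every schema.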
@@ -69,7 +69,7 @@ public class SyncCustomerDataSource {
         System.out.println("获取到的kafka消费组:->" + EtlUtils.getKafkaGroup(envProps));
         FlinkKafkaConsumer<String> kafkaConsumer = new FlinkKafkaConsumer<String>(envProps.getKafka_topic(), new SimpleStringSchema(), EtlUtils.getKafkaConfig(envProps.getKafka_brokers(), EtlUtils.getKafkaGroup(envProps), envProps.getKafka_username(),envProps.getKafka_password()));
         //System.out.println(envProps.getKafka_topic());
-        long defaultOffset = LocalDateTime.now().minusMinutes(30).atZone(ZoneId.systemDefault()).toInstant().toEpochMilli();
+        long defaultOffset = LocalDateTime.now().minusMinutes(10).atZone(ZoneId.systemDefault()).toInstant().toEpochMilli();
         kafkaConsumer.setStartFromTimestamp(defaultOffset);
         //kafkaConsumer.setStartFromLatest();
         //偏移量
...
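The second file shortens the replay window when the job starts fresh: the consumer now seeks each partition to the offset closest to 10 minutes ago instead of 30. A standalone sketch of the timestamp computation; note that FlinkKafkaConsumer's start-position settings only apply when the job is not restored from a checkpoint or savepoint, in which case the restored offsets take precedence:

```java
import java.time.LocalDateTime;
import java.time.ZoneId;

// Sketch of the start-offset computation above: replay the last 10 minutes of
// the topic on a fresh start instead of the previous 30. Shown standalone
// because wiring up the consumer itself needs a running Kafka cluster.
public class StartOffsetSketch {
    public static void main(String[] args) {
        long defaultOffset = LocalDateTime.now()
                .minusMinutes(10)                 // was minusMinutes(30) before this commit
                .atZone(ZoneId.systemDefault())   // interpret wall-clock time in the JVM's zone
                .toInstant()
                .toEpochMilli();                  // Kafka timestamps are epoch milliseconds
        System.out.println("start-from timestamp (epoch ms): " + defaultOffset);
        // kafkaConsumer.setStartFromTimestamp(defaultOffset);
    }
}
```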