Commit 006d98f6 authored by liaowenwu's avatar liaowenwu

添加日志

parent 28890f5a
......@@ -109,7 +109,7 @@ public class MysqlDataTransferFunction extends ProcessFunction<JSONObject, Tuple
pkColumns.setLength(pkColumns.length()-1);
pkValues.setLength(pkValues.length()-1);
}
return Tuple6.of(table,type, pkColumns.toString(), pkValues.toString(),dataJsonStr.replace("\\","\\\\").replace("'", "\\'"),ts);
return Tuple6.of(table,type, pkColumns.toString(), pkValues.toString().replace("'",""),dataJsonStr.replace("\\","\\\\").replace("'", "\\'"),ts);
}
private static String tranferInsertSql(String table, JSONObject dataObj, JSONObject mysqlType) {
......
......@@ -46,7 +46,7 @@ public class SyncCustomerDataSource {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//env.setParallelism(1);
env.setParallelism(1);
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(100, 60000));
env.enableCheckpointing(120000);
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
......@@ -91,7 +91,7 @@ public class SyncCustomerDataSource {
.name("dsc-source")
.uid("dsc-source");
//tsGroupStream.print("source==>");
tsGroupStream.print("source==>");
OutputTag<Tuple6<String,String,String,String,String,Long>> logSlideTag = new OutputTag<Tuple6<String,String,String,String,String,Long>>("log_slide") {};
SingleOutputStreamOperator<Tuple3<String, String, Long>> slide = tsGroupStream
......@@ -99,7 +99,7 @@ public class SyncCustomerDataSource {
.name("dsc-sql")
.uid("dsc-sql");
//slide.print("dsc-sql ==>");
slide.print("dsc-sql ==>");
SingleOutputStreamOperator<String> groupWindowSqlResultStream = slide
.keyBy(value -> value.f1)
......@@ -123,13 +123,13 @@ public class SyncCustomerDataSource {
.name("dsc-max")
.uid("dsc-max");
//groupWindowSqlResultStream.print("dsc-max==>");
groupWindowSqlResultStream.print("dsc-max==>");
groupWindowSqlResultStream.addSink(new MysqlDataTransferSinkBatch(envProps))
.name("dsc-sink")
.uid("dsc-sink");
DataStream<Tuple6<String,String,String,String,String,Long>> sideOutput = slide.getSideOutput(logSlideTag);
sideOutput.print("log==>");
sideOutput.addSink(JdbcSink.sink(
"INSERT INTO dsc_cdc_log (`table`,op_type,pk_columns,pk_values,data_json,cdc_ts) values (?,?,?,?,?,?)",
(ps,t) -> {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment