Skip to content
Projects
Groups
Snippets
Help
Loading...
Help
Contribute to GitLab
Sign in / Register
Toggle navigation
D
dsk-dsc-flink
Project
Project
Details
Activity
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
shezaixing
dsk-dsc-flink
Commits
49e012e1
Commit
49e012e1
authored
Dec 04, 2024
by
liaowenwu
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
修改bug
parent
61728f63
Changes
1
Hide whitespace changes
Inline
Side-by-side
Showing
1 changed file
with
4 additions
and
4 deletions
+4
-4
AsyncMysqlDataTransferFunctionNew.java
...sc/common/function/AsyncMysqlDataTransferFunctionNew.java
+4
-4
No files found.
src/main/java/com/dsk/flink/dsc/common/function/AsyncMysqlDataTransferFunctionNew.java
View file @
49e012e1
...
@@ -109,7 +109,7 @@ public class AsyncMysqlDataTransferFunctionNew extends RichAsyncFunction<JSONObj
...
@@ -109,7 +109,7 @@ public class AsyncMysqlDataTransferFunctionNew extends RichAsyncFunction<JSONObj
resultList
.
add
(
Tuple3
.
of
(
excueteSql
,
groupKey
,
ts
));
resultList
.
add
(
Tuple3
.
of
(
excueteSql
,
groupKey
,
ts
));
Boolean
logEnable
=
MapUtil
.
getBool
(
dbInfoMap
,
"log_enable"
,
false
);
Boolean
logEnable
=
MapUtil
.
getBool
(
dbInfoMap
,
"log_enable"
,
false
);
if
(
logEnable
){
if
(
logEnable
){
String
logSql
=
buildLogData
(
type
,
table
,
pkNameSet
,
dataObj
,
ts
);
String
logSql
=
buildLogData
(
type
,
table
,
pkNameSet
,
dataObj
,
ts
,
value
.
toJSONString
()
);
resultList
.
add
(
Tuple3
.
of
(
logSql
,
"dsc_cdc_log"
,
ts
));
resultList
.
add
(
Tuple3
.
of
(
logSql
,
"dsc_cdc_log"
,
ts
));
}
}
resultFuture
.
complete
(
resultList
);
resultFuture
.
complete
(
resultList
);
...
@@ -121,16 +121,16 @@ public class AsyncMysqlDataTransferFunctionNew extends RichAsyncFunction<JSONObj
...
@@ -121,16 +121,16 @@ public class AsyncMysqlDataTransferFunctionNew extends RichAsyncFunction<JSONObj
});
});
}
}
private
static
String
logSqlFormat
=
"INSERT INTO dsc_cdc_log (`table`,op_type,pk_columns,pk_values,
cdc_ts) values (
'%s','%s','%s','%s', %d)"
;
private
static
String
logSqlFormat
=
"INSERT INTO dsc_cdc_log (`table`,op_type,pk_columns,pk_values,
data_json,cdc_ts) values ('%s',
'%s','%s','%s','%s', %d)"
;
private
String
buildLogData
(
String
type
,
String
table
,
Set
<
String
>
pkNameSet
,
JSONObject
dataObj
,
long
ts
)
{
private
String
buildLogData
(
String
type
,
String
table
,
Set
<
String
>
pkNameSet
,
JSONObject
dataObj
,
long
ts
,
String
dataJsonStr
)
{
List
<
String
>
pkValueList
=
new
ArrayList
<>();
List
<
String
>
pkValueList
=
new
ArrayList
<>();
for
(
String
pk
:
pkNameSet
)
{
for
(
String
pk
:
pkNameSet
)
{
pkValueList
.
add
(
dataObj
.
getString
(
pk
));
pkValueList
.
add
(
dataObj
.
getString
(
pk
));
}
}
String
pkColumns
=
String
.
join
(
","
,
pkNameSet
);
String
pkColumns
=
String
.
join
(
","
,
pkNameSet
);
String
pkValues
=
String
.
join
(
"-"
,
pkValueList
);
String
pkValues
=
String
.
join
(
"-"
,
pkValueList
);
return
String
.
format
(
logSqlFormat
,
table
,
type
,
pkColumns
,
pkValues
,
ts
);
return
String
.
format
(
logSqlFormat
,
table
,
type
,
pkColumns
,
pkValues
,
dataJsonStr
,
ts
);
}
}
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment