Skip to content
Projects
Groups
Snippets
Help
Loading...
Help
Contribute to GitLab
Sign in / Register
Toggle navigation
D
dsk-dsc-flink
Project
Project
Details
Activity
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
shezaixing
dsk-dsc-flink
Commits
480d7ec5
Commit
480d7ec5
authored
Sep 29, 2024
by
shezaixing
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
同步写入达梦客户端
parent
266bbd10
Changes
1
Show whitespace changes
Inline
Side-by-side
Showing
1 changed file
with
16 additions
and
15 deletions
+16
-15
AsyncDamengDataTransferFunction.java
.../dsc/common/function/AsyncDamengDataTransferFunction.java
+16
-15
No files found.
src/main/java/com/dsk/flink/dsc/common/function/AsyncDamengDataTransferFunction.java
View file @
480d7ec5
...
@@ -95,16 +95,16 @@ public class AsyncDamengDataTransferFunction extends RichAsyncFunction<JSONObjec
...
@@ -95,16 +95,16 @@ public class AsyncDamengDataTransferFunction extends RichAsyncFunction<JSONObjec
}
}
if
(
"INSERT"
.
equals
(
type
)){
if
(
"INSERT"
.
equals
(
type
)){
excueteSql
=
tranferInsertSql
(
table
,
dataObj
,
mysqlType
);
excueteSql
=
tranferInsertSql
(
table
,
dataObj
,
mysqlType
,
dbInfoMap
.
getDb_database
()
);
}
}
if
(
"UPDATE"
.
equals
(
type
)){
if
(
"UPDATE"
.
equals
(
type
)){
JSONObject
oldDataObj
=
oldDataList
.
getJSONObject
(
0
);
JSONObject
oldDataObj
=
oldDataList
.
getJSONObject
(
0
);
// excueteSql = tranferUpdateSql(table,dataObj,oldDataObj,mysqlType,pkNameSet);
// excueteSql = tranferUpdateSql(table,dataObj,oldDataObj,mysqlType,pkNameSet);
excueteSql
=
tranferDMUpdateSql
(
table
,
dataObj
,
oldDataObj
,
mysqlType
,
pkNameSet
);
excueteSql
=
tranferDMUpdateSql
(
table
,
dataObj
,
oldDataObj
,
mysqlType
,
pkNameSet
,
dbInfoMap
.
getDb_database
()
);
}
}
if
(
"DELETE"
.
equals
(
type
)){
if
(
"DELETE"
.
equals
(
type
)){
excueteSql
=
logicalDelete
?
tranferInsertSql
(
table
,
dataObj
,
mysqlType
)
:
transferDeleteSql
(
table
,
dataObj
,
mysqlType
,
pkNameSet
);
excueteSql
=
logicalDelete
?
tranferInsertSql
(
table
,
dataObj
,
mysqlType
,
dbInfoMap
.
getDb_database
())
:
transferDeleteSql
(
table
,
dataObj
,
mysqlType
,
pkNameSet
,
dbInfoMap
.
getDb_database
()
);
}
}
resultList
.
add
(
Tuple3
.
of
(
excueteSql
,
groupKey
,
ts
));
resultList
.
add
(
Tuple3
.
of
(
excueteSql
,
groupKey
,
ts
));
Boolean
logEnable
=
MapUtil
.
getBool
(
dbInfoMap
,
"log_enable"
,
false
);
Boolean
logEnable
=
MapUtil
.
getBool
(
dbInfoMap
,
"log_enable"
,
false
);
...
@@ -124,11 +124,11 @@ public class AsyncDamengDataTransferFunction extends RichAsyncFunction<JSONObjec
...
@@ -124,11 +124,11 @@ public class AsyncDamengDataTransferFunction extends RichAsyncFunction<JSONObjec
}
}
//采用事务方式先删后增
//采用事务方式先删后增
private
String
tranferDMUpdateSql
(
String
table
,
JSONObject
dataObj
,
JSONObject
oldDataObj
,
JSONObject
mysqlType
,
Set
<
String
>
pkNameSet
)
{
private
String
tranferDMUpdateSql
(
String
table
,
JSONObject
dataObj
,
JSONObject
oldDataObj
,
JSONObject
mysqlType
,
Set
<
String
>
pkNameSet
,
String
database
)
{
String
oldDeleteSql
=
transferDeleteSql
(
table
,
oldDataObj
,
mysqlType
,
pkNameSet
);
String
oldDeleteSql
=
transferDeleteSql
(
table
,
oldDataObj
,
mysqlType
,
pkNameSet
,
database
);
String
deleteSql
=
transferDeleteSql
(
table
,
dataObj
,
mysqlType
,
pkNameSet
);
String
deleteSql
=
transferDeleteSql
(
table
,
dataObj
,
mysqlType
,
pkNameSet
,
database
);
String
insertSql
=
tranferInsertSql
(
table
,
dataObj
,
mysqlType
);
String
insertSql
=
tranferInsertSql
(
table
,
dataObj
,
mysqlType
,
database
);
String
sql
=
oldDeleteSql
+
deleteSql
+
insertSql
+
"commit;"
;
String
sql
=
oldDeleteSql
+
deleteSql
+
insertSql
+
"commit;"
;
return
sql
;
return
sql
;
}
}
...
@@ -151,12 +151,12 @@ public class AsyncDamengDataTransferFunction extends RichAsyncFunction<JSONObjec
...
@@ -151,12 +151,12 @@ public class AsyncDamengDataTransferFunction extends RichAsyncFunction<JSONObjec
private
static
final
String
[]
STR_SQL_TYPE
=
new
String
[]{
"VARCHAR"
,
"CHAR"
,
"TINYBLOB"
,
"BLOB"
,
"MEDIUMBLOB"
,
"LONGBLOB"
,
"TINYTEXT"
,
"TEXT"
,
"MEDIUMTEXT"
,
"LONGTEXT"
,
"TIME"
,
"TIMESTAMP"
,
"JSON"
,
"json"
};
private
static
final
String
[]
STR_SQL_TYPE
=
new
String
[]{
"VARCHAR"
,
"CHAR"
,
"TINYBLOB"
,
"BLOB"
,
"MEDIUMBLOB"
,
"LONGBLOB"
,
"TINYTEXT"
,
"TEXT"
,
"MEDIUMTEXT"
,
"LONGTEXT"
,
"TIME"
,
"TIMESTAMP"
,
"JSON"
,
"json"
};
private
static
final
String
[]
KEYWORD
=
new
String
[]{
"limit"
};
private
static
final
String
[]
KEYWORD
=
new
String
[]{
"limit"
};
private
static
String
tranferInsertSql
(
String
table
,
JSONObject
dataObj
,
JSONObject
mysqlType
)
{
private
static
String
tranferInsertSql
(
String
table
,
JSONObject
dataObj
,
JSONObject
mysqlType
,
String
database
)
{
Set
<
String
>
columnSet
=
mysqlType
.
keySet
();
Set
<
String
>
columnSet
=
mysqlType
.
keySet
();
StringBuilder
sb
=
new
StringBuilder
();
StringBuilder
sb
=
new
StringBuilder
();
for
(
String
s
:
columnSet
)
{
for
(
String
s
:
columnSet
)
{
sb
.
append
(
s
).
append
(
","
);
sb
.
append
(
"\""
+
s
+
"\""
).
append
(
","
);
//sb.append(s).append(",");
}
}
List
<
String
>
valueList
=
new
ArrayList
<>();
List
<
String
>
valueList
=
new
ArrayList
<>();
//List<String> updateList = new ArrayList<>();
//List<String> updateList = new ArrayList<>();
...
@@ -176,7 +176,7 @@ public class AsyncDamengDataTransferFunction extends RichAsyncFunction<JSONObjec
...
@@ -176,7 +176,7 @@ public class AsyncDamengDataTransferFunction extends RichAsyncFunction<JSONObjec
String
valueString
=
String
.
join
(
","
,
valueList
);
String
valueString
=
String
.
join
(
","
,
valueList
);
//String updateString = String.join(",",updateList);
//String updateString = String.join(",",updateList);
return
String
.
format
(
"INSERT INTO
%s (%s) values (%s) ;"
,
table
,
columnString
,
valueString
);
return
String
.
format
(
"INSERT INTO
\"%s\".\"%s\" (%s) values (%s) ;"
,
database
,
table
,
columnString
,
valueString
);
}
}
private
String
tranferUpdateSql
(
String
table
,
JSONObject
dataObj
,
JSONObject
oldDataObj
,
JSONObject
mysqlType
,
Set
<
String
>
pkNameSet
)
{
private
String
tranferUpdateSql
(
String
table
,
JSONObject
dataObj
,
JSONObject
oldDataObj
,
JSONObject
mysqlType
,
Set
<
String
>
pkNameSet
)
{
...
@@ -199,15 +199,16 @@ public class AsyncDamengDataTransferFunction extends RichAsyncFunction<JSONObjec
...
@@ -199,15 +199,16 @@ public class AsyncDamengDataTransferFunction extends RichAsyncFunction<JSONObjec
return
String
.
format
(
"UPDATE %s SET %s WHERE %s"
,
table
,
setString
,
whereString
);
return
String
.
format
(
"UPDATE %s SET %s WHERE %s"
,
table
,
setString
,
whereString
);
}
}
private
String
transferDeleteSql
(
String
table
,
JSONObject
dataObj
,
JSONObject
mysqlType
,
Set
<
String
>
pkNameSet
)
{
private
String
transferDeleteSql
(
String
table
,
JSONObject
dataObj
,
JSONObject
mysqlType
,
Set
<
String
>
pkNameSet
,
String
database
)
{
List
<
String
>
whereList
=
new
ArrayList
<>();
List
<
String
>
whereList
=
new
ArrayList
<>();
for
(
String
pk
:
pkNameSet
)
{
for
(
String
pk
:
pkNameSet
)
{
String
whereString
=
pk
.
concat
(
" = "
).
concat
(
getValueString
(
dataObj
,
pk
,
mysqlType
.
getString
(
pk
)));
String
pkFormatStr
=
"\""
+
pk
+
"\""
;
String
whereString
=
pkFormatStr
.
concat
(
" = "
).
concat
(
getValueString
(
dataObj
,
pk
,
mysqlType
.
getString
(
pk
)));
whereList
.
add
(
whereString
);
whereList
.
add
(
whereString
);
}
}
String
whereString
=
String
.
join
(
" and "
,
whereList
);
String
whereString
=
String
.
join
(
" and "
,
whereList
);
return
String
.
format
(
"DELETE FROM
%s WHERE %s;"
,
table
,
whereString
);
return
String
.
format
(
"DELETE FROM
\"%s\"\"%s\" WHERE %s;"
,
database
,
table
,
whereString
);
}
}
/**
/**
...
@@ -267,7 +268,7 @@ public class AsyncDamengDataTransferFunction extends RichAsyncFunction<JSONObjec
...
@@ -267,7 +268,7 @@ public class AsyncDamengDataTransferFunction extends RichAsyncFunction<JSONObjec
System
.
out
.
println
(
StrUtil
.
subBefore
(
s1
,
"string"
,
true
));
System
.
out
.
println
(
StrUtil
.
subBefore
(
s1
,
"string"
,
true
));
System
.
out
.
println
(
tranferInsertSql
(
table
,
jsonObject
,
mysqlType
));
System
.
out
.
println
(
tranferInsertSql
(
table
,
jsonObject
,
mysqlType
,
""
));
System
.
out
.
println
(
s2
.
replaceAll
(
"'"
,
"\\\\'"
));
System
.
out
.
println
(
s2
.
replaceAll
(
"'"
,
"\\\\'"
));
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment