Skip to content
Projects
Groups
Snippets
Help
Loading...
Help
Contribute to GitLab
Sign in / Register
Toggle navigation
D
dlink
Project
Project
Details
Activity
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
zhaowei
dlink
Commits
03f7e409
Commit
03f7e409
authored
Jun 20, 2022
by
wanshicheng
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
fix #608 Can't create a flink table when table name is a sql reserved word
parent
ed54c10d
Changes
6
Hide whitespace changes
Inline
Side-by-side
Showing
6 changed files
with
156 additions
and
498 deletions
+156
-498
SQLSinkBuilder.java
...-1.11/src/main/java/com/dlink/cdc/sql/SQLSinkBuilder.java
+14
-102
SQLSinkBuilder.java
...-1.12/src/main/java/com/dlink/cdc/sql/SQLSinkBuilder.java
+18
-106
SQLSinkBuilder.java
...-1.13/src/main/java/com/dlink/cdc/sql/SQLSinkBuilder.java
+5
-85
SQLSinkBuilder.java
...-1.14/src/main/java/com/dlink/cdc/sql/SQLSinkBuilder.java
+14
-102
SQLSinkBuilder.java
...-1.15/src/main/java/com/dlink/cdc/sql/SQLSinkBuilder.java
+15
-103
FlinkBaseUtil.java
...ent-base/src/main/java/com/dlink/utils/FlinkBaseUtil.java
+90
-0
No files found.
dlink-client/dlink-client-1.11/src/main/java/com/dlink/cdc/sql/SQLSinkBuilder.java
View file @
03f7e409
package
com
.
dlink
.
cdc
.
sql
;
package
com
.
dlink
.
cdc
.
sql
;
import
com.dlink.model.Column
;
import
com.dlink.assertion.Asserts
;
import
com.dlink.model.ColumnType
;
import
com.dlink.cdc.AbstractSinkBuilder
;
import
com.dlink.cdc.CDCBuilder
;
import
com.dlink.cdc.SinkBuilder
;
import
com.dlink.executor.CustomTableEnvironment
;
import
com.dlink.model.FlinkCDCConfig
;
import
com.dlink.model.Schema
;
import
com.dlink.model.Table
;
import
com.dlink.utils.FlinkBaseUtil
;
import
com.dlink.utils.LogUtil
;
import
com.dlink.utils.LogUtil
;
import
org.apache.commons.lang3.StringUtils
;
import
org.apache.commons.lang3.StringUtils
;
import
org.apache.flink.api.common.functions.FlatMapFunction
;
import
org.apache.flink.api.common.functions.FlatMapFunction
;
...
@@ -15,11 +22,7 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
...
@@ -15,11 +22,7 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import
org.apache.flink.table.data.RowData
;
import
org.apache.flink.table.data.RowData
;
import
org.apache.flink.table.operations.ModifyOperation
;
import
org.apache.flink.table.operations.ModifyOperation
;
import
org.apache.flink.table.operations.Operation
;
import
org.apache.flink.table.operations.Operation
;
import
org.apache.flink.table.types.logical.BigIntType
;
import
org.apache.flink.table.types.logical.*
;
import
org.apache.flink.table.types.logical.DateType
;
import
org.apache.flink.table.types.logical.DecimalType
;
import
org.apache.flink.table.types.logical.LogicalType
;
import
org.apache.flink.table.types.logical.TimestampType
;
import
org.apache.flink.table.types.utils.TypeConversions
;
import
org.apache.flink.table.types.utils.TypeConversions
;
import
org.apache.flink.types.Row
;
import
org.apache.flink.types.Row
;
import
org.apache.flink.types.RowKind
;
import
org.apache.flink.types.RowKind
;
...
@@ -33,16 +36,6 @@ import java.util.ArrayList;
...
@@ -33,16 +36,6 @@ import java.util.ArrayList;
import
java.util.List
;
import
java.util.List
;
import
java.util.Map
;
import
java.util.Map
;
import
com.dlink.assertion.Asserts
;
import
com.dlink.cdc.AbstractSinkBuilder
;
import
com.dlink.cdc.CDCBuilder
;
import
com.dlink.cdc.SinkBuilder
;
import
com.dlink.executor.CustomTableEnvironment
;
import
com.dlink.model.FlinkCDCConfig
;
import
com.dlink.model.Schema
;
import
com.dlink.model.Table
;
import
com.dlink.utils.SqlUtil
;
/**
/**
* SQLSinkBuilder
* SQLSinkBuilder
*
*
...
@@ -123,15 +116,17 @@ public class SQLSinkBuilder extends AbstractSinkBuilder implements SinkBuilder,
...
@@ -123,15 +116,17 @@ public class SQLSinkBuilder extends AbstractSinkBuilder implements SinkBuilder,
Table
table
,
Table
table
,
List
<
String
>
columnNameList
)
{
List
<
String
>
columnNameList
)
{
String
sinkSchemaName
=
getSinkSchemaName
(
table
);
String
sinkTableName
=
getSinkTableName
(
table
);
String
sinkTableName
=
getSinkTableName
(
table
);
String
pkList
=
StringUtils
.
join
(
getPKList
(
table
),
"."
);
String
viewName
=
"VIEW_"
+
table
.
getSchemaTableNameWithUnderline
();
String
viewName
=
"VIEW_"
+
table
.
getSchemaTableNameWithUnderline
();
customTableEnvironment
.
createTemporaryView
(
viewName
,
rowDataDataStream
,
StringUtils
.
join
(
columnNameList
,
","
));
customTableEnvironment
.
createTemporaryView
(
viewName
,
rowDataDataStream
,
StringUtils
.
join
(
columnNameList
,
","
));
logger
.
info
(
"Create "
+
viewName
+
" temporaryView successful..."
);
logger
.
info
(
"Create "
+
viewName
+
" temporaryView successful..."
);
String
flinkDDL
=
getFlinkDDL
(
table
,
sinkTableName
);
String
flinkDDL
=
FlinkBaseUtil
.
getFlinkDDL
(
table
,
sinkTableName
,
config
,
sinkSchemaName
,
sinkTableName
,
pkList
);
logger
.
info
(
flinkDDL
);
logger
.
info
(
flinkDDL
);
customTableEnvironment
.
executeSql
(
flinkDDL
);
customTableEnvironment
.
executeSql
(
flinkDDL
);
logger
.
info
(
"Create "
+
sinkTableName
+
" FlinkSQL DDL successful..."
);
logger
.
info
(
"Create "
+
sinkTableName
+
" FlinkSQL DDL successful..."
);
String
cdcSqlInsert
=
getCDCSqlInsert
(
table
,
sinkTableName
,
viewName
);
String
cdcSqlInsert
=
FlinkBaseUtil
.
getCDCSqlInsert
(
table
,
sinkTableName
,
viewName
,
config
);
logger
.
info
(
cdcSqlInsert
);
logger
.
info
(
cdcSqlInsert
);
List
<
Operation
>
operations
=
customTableEnvironment
.
getParser
().
parse
(
cdcSqlInsert
);
List
<
Operation
>
operations
=
customTableEnvironment
.
getParser
().
parse
(
cdcSqlInsert
);
logger
.
info
(
"Create "
+
sinkTableName
+
" FlinkSQL insert into successful..."
);
logger
.
info
(
"Create "
+
sinkTableName
+
" FlinkSQL insert into successful..."
);
...
@@ -191,80 +186,6 @@ public class SQLSinkBuilder extends AbstractSinkBuilder implements SinkBuilder,
...
@@ -191,80 +186,6 @@ public class SQLSinkBuilder extends AbstractSinkBuilder implements SinkBuilder,
return
dataStreamSource
;
return
dataStreamSource
;
}
}
private
String
getFlinkDDL
(
Table
table
,
String
tableName
)
{
StringBuilder
sb
=
new
StringBuilder
();
sb
.
append
(
"CREATE TABLE IF NOT EXISTS "
);
sb
.
append
(
tableName
);
sb
.
append
(
" (\n"
);
List
<
String
>
pks
=
new
ArrayList
<>();
for
(
int
i
=
0
;
i
<
table
.
getColumns
().
size
();
i
++)
{
String
type
=
table
.
getColumns
().
get
(
i
).
getJavaType
().
getFlinkType
();
sb
.
append
(
" "
);
if
(
i
>
0
)
{
sb
.
append
(
","
);
}
sb
.
append
(
"`"
);
sb
.
append
(
table
.
getColumns
().
get
(
i
).
getName
());
sb
.
append
(
"` "
);
sb
.
append
(
convertSinkColumnType
(
type
));
sb
.
append
(
"\n"
);
if
(
table
.
getColumns
().
get
(
i
).
isKeyFlag
())
{
pks
.
add
(
table
.
getColumns
().
get
(
i
).
getName
());
}
}
StringBuilder
pksb
=
new
StringBuilder
(
"PRIMARY KEY ( "
);
for
(
int
i
=
0
;
i
<
pks
.
size
();
i
++)
{
if
(
i
>
0
)
{
pksb
.
append
(
","
);
}
pksb
.
append
(
"`"
);
pksb
.
append
(
pks
.
get
(
i
));
pksb
.
append
(
"`"
);
}
pksb
.
append
(
" ) NOT ENFORCED\n"
);
if
(
pks
.
size
()
>
0
)
{
sb
.
append
(
" ,"
);
sb
.
append
(
pksb
);
}
sb
.
append
(
") WITH (\n"
);
sb
.
append
(
getSinkConfigurationString
(
table
));
sb
.
append
(
")\n"
);
return
sb
.
toString
();
}
private
String
getCDCSqlInsert
(
Table
table
,
String
targetName
,
String
sourceName
)
{
StringBuilder
sb
=
new
StringBuilder
(
"INSERT INTO "
);
sb
.
append
(
targetName
);
sb
.
append
(
" SELECT\n"
);
for
(
int
i
=
0
;
i
<
table
.
getColumns
().
size
();
i
++)
{
sb
.
append
(
" "
);
if
(
i
>
0
)
{
sb
.
append
(
","
);
}
sb
.
append
(
getColumnProcessing
(
table
.
getColumns
().
get
(
i
))
+
" \n"
);
}
sb
.
append
(
" FROM "
);
sb
.
append
(
sourceName
);
return
sb
.
toString
();
}
private
String
getColumnProcessing
(
Column
column
)
{
if
(
"true"
.
equals
(
config
.
getSink
().
get
(
"column.replace.line-break"
))
&&
ColumnType
.
STRING
.
equals
(
column
.
getJavaType
()))
{
return
"REGEXP_REPLACE(`"
+
column
.
getName
()
+
"`, '\\n', '') AS `"
+
column
.
getName
()
+
"`"
;
}
else
{
return
"`"
+
column
.
getName
()
+
"`"
;
}
}
private
String
convertSinkColumnType
(
String
type
)
{
if
(
config
.
getSink
().
get
(
"connector"
).
equals
(
"hudi"
))
{
if
(
type
.
equals
(
"TIMESTAMP"
))
{
return
"TIMESTAMP(3)"
;
}
}
return
type
;
}
protected
Object
convertValue
(
Object
value
,
LogicalType
logicalType
)
{
protected
Object
convertValue
(
Object
value
,
LogicalType
logicalType
)
{
if
(
value
==
null
)
{
if
(
value
==
null
)
{
return
null
;
return
null
;
...
@@ -293,13 +214,4 @@ public class SQLSinkBuilder extends AbstractSinkBuilder implements SinkBuilder,
...
@@ -293,13 +214,4 @@ public class SQLSinkBuilder extends AbstractSinkBuilder implements SinkBuilder,
return
value
;
return
value
;
}
}
}
}
private
String
getSinkConfigurationString
(
Table
table
)
{
String
configurationString
=
SqlUtil
.
replaceAllParam
(
config
.
getSinkConfigurationString
(),
"schemaName"
,
getSinkSchemaName
(
table
));
configurationString
=
SqlUtil
.
replaceAllParam
(
configurationString
,
"tableName"
,
getSinkTableName
(
table
));
if
(
configurationString
.
contains
(
"${pkList}"
))
{
configurationString
=
SqlUtil
.
replaceAllParam
(
configurationString
,
"pkList"
,
StringUtils
.
join
(
getPKList
(
table
),
"."
));
}
return
configurationString
;
}
}
}
dlink-client/dlink-client-1.12/src/main/java/com/dlink/cdc/sql/SQLSinkBuilder.java
View file @
03f7e409
package
com
.
dlink
.
cdc
.
sql
;
package
com
.
dlink
.
cdc
.
sql
;
import
com.dlink.model.Column
;
import
com.dlink.assertion.Asserts
;
import
com.dlink.model.ColumnType
;
import
com.dlink.cdc.AbstractSinkBuilder
;
import
com.dlink.cdc.CDCBuilder
;
import
com.dlink.cdc.SinkBuilder
;
import
com.dlink.executor.CustomTableEnvironment
;
import
com.dlink.model.FlinkCDCConfig
;
import
com.dlink.model.Schema
;
import
com.dlink.model.Table
;
import
com.dlink.utils.FlinkBaseUtil
;
import
com.dlink.utils.LogUtil
;
import
com.dlink.utils.LogUtil
;
import
org.apache.commons.lang3.StringUtils
;
import
org.apache.commons.lang3.StringUtils
;
import
org.apache.flink.api.common.functions.FlatMapFunction
;
import
org.apache.flink.api.common.functions.FlatMapFunction
;
...
@@ -15,11 +22,7 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
...
@@ -15,11 +22,7 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import
org.apache.flink.table.data.RowData
;
import
org.apache.flink.table.data.RowData
;
import
org.apache.flink.table.operations.ModifyOperation
;
import
org.apache.flink.table.operations.ModifyOperation
;
import
org.apache.flink.table.operations.Operation
;
import
org.apache.flink.table.operations.Operation
;
import
org.apache.flink.table.types.logical.BigIntType
;
import
org.apache.flink.table.types.logical.*
;
import
org.apache.flink.table.types.logical.DateType
;
import
org.apache.flink.table.types.logical.DecimalType
;
import
org.apache.flink.table.types.logical.LogicalType
;
import
org.apache.flink.table.types.logical.TimestampType
;
import
org.apache.flink.table.types.utils.TypeConversions
;
import
org.apache.flink.table.types.utils.TypeConversions
;
import
org.apache.flink.types.Row
;
import
org.apache.flink.types.Row
;
import
org.apache.flink.types.RowKind
;
import
org.apache.flink.types.RowKind
;
...
@@ -33,16 +36,6 @@ import java.util.ArrayList;
...
@@ -33,16 +36,6 @@ import java.util.ArrayList;
import
java.util.List
;
import
java.util.List
;
import
java.util.Map
;
import
java.util.Map
;
import
com.dlink.assertion.Asserts
;
import
com.dlink.cdc.AbstractSinkBuilder
;
import
com.dlink.cdc.CDCBuilder
;
import
com.dlink.cdc.SinkBuilder
;
import
com.dlink.executor.CustomTableEnvironment
;
import
com.dlink.model.FlinkCDCConfig
;
import
com.dlink.model.Schema
;
import
com.dlink.model.Table
;
import
com.dlink.utils.SqlUtil
;
/**
/**
* SQLSinkBuilder
* SQLSinkBuilder
*
*
...
@@ -118,20 +111,22 @@ public class SQLSinkBuilder extends AbstractSinkBuilder implements SinkBuilder,
...
@@ -118,20 +111,22 @@ public class SQLSinkBuilder extends AbstractSinkBuilder implements SinkBuilder,
}
}
private
void
addTableSink
(
private
void
addTableSink
(
CustomTableEnvironment
customTableEnvironment
,
CustomTableEnvironment
customTableEnvironment
,
DataStream
<
Row
>
rowDataDataStream
,
DataStream
<
Row
>
rowDataDataStream
,
Table
table
,
Table
table
,
List
<
String
>
columnNameList
)
{
List
<
String
>
columnNameList
)
{
String
sinkSchemaName
=
getSinkSchemaName
(
table
);
String
sinkTableName
=
getSinkTableName
(
table
);
String
sinkTableName
=
getSinkTableName
(
table
);
String
pkList
=
StringUtils
.
join
(
getPKList
(
table
),
"."
);
String
viewName
=
"VIEW_"
+
table
.
getSchemaTableNameWithUnderline
();
String
viewName
=
"VIEW_"
+
table
.
getSchemaTableNameWithUnderline
();
customTableEnvironment
.
createTemporaryView
(
viewName
,
rowDataDataStream
,
StringUtils
.
join
(
columnNameList
,
","
));
customTableEnvironment
.
createTemporaryView
(
viewName
,
rowDataDataStream
,
StringUtils
.
join
(
columnNameList
,
","
));
logger
.
info
(
"Create "
+
viewName
+
" temporaryView successful..."
);
logger
.
info
(
"Create "
+
viewName
+
" temporaryView successful..."
);
String
flinkDDL
=
getFlinkDDL
(
table
,
sinkTableName
);
String
flinkDDL
=
FlinkBaseUtil
.
getFlinkDDL
(
table
,
sinkTableName
,
config
,
sinkSchemaName
,
sinkTableName
,
pkList
);
logger
.
info
(
flinkDDL
);
logger
.
info
(
flinkDDL
);
customTableEnvironment
.
executeSql
(
flinkDDL
);
customTableEnvironment
.
executeSql
(
flinkDDL
);
logger
.
info
(
"Create "
+
sinkTableName
+
" FlinkSQL DDL successful..."
);
logger
.
info
(
"Create "
+
sinkTableName
+
" FlinkSQL DDL successful..."
);
String
cdcSqlInsert
=
getCDCSqlInsert
(
table
,
sinkTableName
,
viewName
);
String
cdcSqlInsert
=
FlinkBaseUtil
.
getCDCSqlInsert
(
table
,
sinkTableName
,
viewName
,
config
);
logger
.
info
(
cdcSqlInsert
);
logger
.
info
(
cdcSqlInsert
);
List
<
Operation
>
operations
=
customTableEnvironment
.
getParser
().
parse
(
cdcSqlInsert
);
List
<
Operation
>
operations
=
customTableEnvironment
.
getParser
().
parse
(
cdcSqlInsert
);
logger
.
info
(
"Create "
+
sinkTableName
+
" FlinkSQL insert into successful..."
);
logger
.
info
(
"Create "
+
sinkTableName
+
" FlinkSQL insert into successful..."
);
...
@@ -191,80 +186,6 @@ public class SQLSinkBuilder extends AbstractSinkBuilder implements SinkBuilder,
...
@@ -191,80 +186,6 @@ public class SQLSinkBuilder extends AbstractSinkBuilder implements SinkBuilder,
return
dataStreamSource
;
return
dataStreamSource
;
}
}
private
String
getFlinkDDL
(
Table
table
,
String
tableName
)
{
StringBuilder
sb
=
new
StringBuilder
();
sb
.
append
(
"CREATE TABLE IF NOT EXISTS "
);
sb
.
append
(
tableName
);
sb
.
append
(
" (\n"
);
List
<
String
>
pks
=
new
ArrayList
<>();
for
(
int
i
=
0
;
i
<
table
.
getColumns
().
size
();
i
++)
{
String
type
=
table
.
getColumns
().
get
(
i
).
getJavaType
().
getFlinkType
();
sb
.
append
(
" "
);
if
(
i
>
0
)
{
sb
.
append
(
","
);
}
sb
.
append
(
"`"
);
sb
.
append
(
table
.
getColumns
().
get
(
i
).
getName
());
sb
.
append
(
"` "
);
sb
.
append
(
convertSinkColumnType
(
type
));
sb
.
append
(
"\n"
);
if
(
table
.
getColumns
().
get
(
i
).
isKeyFlag
())
{
pks
.
add
(
table
.
getColumns
().
get
(
i
).
getName
());
}
}
StringBuilder
pksb
=
new
StringBuilder
(
"PRIMARY KEY ( "
);
for
(
int
i
=
0
;
i
<
pks
.
size
();
i
++)
{
if
(
i
>
0
)
{
pksb
.
append
(
","
);
}
pksb
.
append
(
"`"
);
pksb
.
append
(
pks
.
get
(
i
));
pksb
.
append
(
"`"
);
}
pksb
.
append
(
" ) NOT ENFORCED\n"
);
if
(
pks
.
size
()
>
0
)
{
sb
.
append
(
" ,"
);
sb
.
append
(
pksb
);
}
sb
.
append
(
") WITH (\n"
);
sb
.
append
(
getSinkConfigurationString
(
table
));
sb
.
append
(
")\n"
);
return
sb
.
toString
();
}
private
String
getCDCSqlInsert
(
Table
table
,
String
targetName
,
String
sourceName
)
{
StringBuilder
sb
=
new
StringBuilder
(
"INSERT INTO "
);
sb
.
append
(
targetName
);
sb
.
append
(
" SELECT\n"
);
for
(
int
i
=
0
;
i
<
table
.
getColumns
().
size
();
i
++)
{
sb
.
append
(
" "
);
if
(
i
>
0
)
{
sb
.
append
(
","
);
}
sb
.
append
(
getColumnProcessing
(
table
.
getColumns
().
get
(
i
))
+
" \n"
);
}
sb
.
append
(
" FROM "
);
sb
.
append
(
sourceName
);
return
sb
.
toString
();
}
private
String
getColumnProcessing
(
Column
column
)
{
if
(
"true"
.
equals
(
config
.
getSink
().
get
(
"column.replace.line-break"
))
&&
ColumnType
.
STRING
.
equals
(
column
.
getJavaType
()))
{
return
"REGEXP_REPLACE(`"
+
column
.
getName
()
+
"`, '\\n', '') AS `"
+
column
.
getName
()
+
"`"
;
}
else
{
return
"`"
+
column
.
getName
()
+
"`"
;
}
}
private
String
convertSinkColumnType
(
String
type
)
{
if
(
config
.
getSink
().
get
(
"connector"
).
equals
(
"hudi"
))
{
if
(
type
.
equals
(
"TIMESTAMP"
))
{
return
"TIMESTAMP(3)"
;
}
}
return
type
;
}
protected
Object
convertValue
(
Object
value
,
LogicalType
logicalType
)
{
protected
Object
convertValue
(
Object
value
,
LogicalType
logicalType
)
{
if
(
value
==
null
)
{
if
(
value
==
null
)
{
return
null
;
return
null
;
...
@@ -293,13 +214,4 @@ public class SQLSinkBuilder extends AbstractSinkBuilder implements SinkBuilder,
...
@@ -293,13 +214,4 @@ public class SQLSinkBuilder extends AbstractSinkBuilder implements SinkBuilder,
return
value
;
return
value
;
}
}
}
}
private
String
getSinkConfigurationString
(
Table
table
)
{
String
configurationString
=
SqlUtil
.
replaceAllParam
(
config
.
getSinkConfigurationString
(),
"schemaName"
,
getSinkSchemaName
(
table
));
configurationString
=
SqlUtil
.
replaceAllParam
(
configurationString
,
"tableName"
,
getSinkTableName
(
table
));
if
(
configurationString
.
contains
(
"${pkList}"
))
{
configurationString
=
SqlUtil
.
replaceAllParam
(
configurationString
,
"pkList"
,
StringUtils
.
join
(
getPKList
(
table
),
"."
));
}
return
configurationString
;
}
}
}
dlink-client/dlink-client-1.13/src/main/java/com/dlink/cdc/sql/SQLSinkBuilder.java
View file @
03f7e409
...
@@ -2,6 +2,7 @@ package com.dlink.cdc.sql;
...
@@ -2,6 +2,7 @@ package com.dlink.cdc.sql;
import
com.dlink.model.*
;
import
com.dlink.model.*
;
import
com.dlink.utils.FlinkBaseUtil
;
import
com.dlink.utils.LogUtil
;
import
com.dlink.utils.LogUtil
;
import
org.apache.commons.lang3.StringUtils
;
import
org.apache.commons.lang3.StringUtils
;
import
org.apache.flink.api.common.functions.FlatMapFunction
;
import
org.apache.flink.api.common.functions.FlatMapFunction
;
...
@@ -116,15 +117,17 @@ public class SQLSinkBuilder extends AbstractSinkBuilder implements SinkBuilder,
...
@@ -116,15 +117,17 @@ public class SQLSinkBuilder extends AbstractSinkBuilder implements SinkBuilder,
Table
table
,
Table
table
,
List
<
String
>
columnNameList
)
{
List
<
String
>
columnNameList
)
{
String
sinkSchemaName
=
getSinkSchemaName
(
table
);
String
sinkTableName
=
getSinkTableName
(
table
);
String
sinkTableName
=
getSinkTableName
(
table
);
String
pkList
=
StringUtils
.
join
(
getPKList
(
table
),
"."
);
String
viewName
=
"VIEW_"
+
table
.
getSchemaTableNameWithUnderline
();
String
viewName
=
"VIEW_"
+
table
.
getSchemaTableNameWithUnderline
();
customTableEnvironment
.
createTemporaryView
(
viewName
,
rowDataDataStream
,
StringUtils
.
join
(
columnNameList
,
","
));
customTableEnvironment
.
createTemporaryView
(
viewName
,
rowDataDataStream
,
StringUtils
.
join
(
columnNameList
,
","
));
logger
.
info
(
"Create "
+
viewName
+
" temporaryView successful..."
);
logger
.
info
(
"Create "
+
viewName
+
" temporaryView successful..."
);
String
flinkDDL
=
getFlinkDDL
(
table
,
sinkTableName
);
String
flinkDDL
=
FlinkBaseUtil
.
getFlinkDDL
(
table
,
sinkTableName
,
config
,
sinkSchemaName
,
sinkTableName
,
pkList
);
logger
.
info
(
flinkDDL
);
logger
.
info
(
flinkDDL
);
customTableEnvironment
.
executeSql
(
flinkDDL
);
customTableEnvironment
.
executeSql
(
flinkDDL
);
logger
.
info
(
"Create "
+
sinkTableName
+
" FlinkSQL DDL successful..."
);
logger
.
info
(
"Create "
+
sinkTableName
+
" FlinkSQL DDL successful..."
);
String
cdcSqlInsert
=
getCDCSqlInsert
(
table
,
sinkTableName
,
viewName
);
String
cdcSqlInsert
=
FlinkBaseUtil
.
getCDCSqlInsert
(
table
,
sinkTableName
,
viewName
,
config
);
logger
.
info
(
cdcSqlInsert
);
logger
.
info
(
cdcSqlInsert
);
List
<
Operation
>
operations
=
customTableEnvironment
.
getParser
().
parse
(
cdcSqlInsert
);
List
<
Operation
>
operations
=
customTableEnvironment
.
getParser
().
parse
(
cdcSqlInsert
);
logger
.
info
(
"Create "
+
sinkTableName
+
" FlinkSQL insert into successful..."
);
logger
.
info
(
"Create "
+
sinkTableName
+
" FlinkSQL insert into successful..."
);
...
@@ -184,80 +187,6 @@ public class SQLSinkBuilder extends AbstractSinkBuilder implements SinkBuilder,
...
@@ -184,80 +187,6 @@ public class SQLSinkBuilder extends AbstractSinkBuilder implements SinkBuilder,
return
dataStreamSource
;
return
dataStreamSource
;
}
}
private
String
getFlinkDDL
(
Table
table
,
String
tableName
)
{
StringBuilder
sb
=
new
StringBuilder
();
sb
.
append
(
"CREATE TABLE IF NOT EXISTS "
);
sb
.
append
(
tableName
);
sb
.
append
(
" (\n"
);
List
<
String
>
pks
=
new
ArrayList
<>();
for
(
int
i
=
0
;
i
<
table
.
getColumns
().
size
();
i
++)
{
String
type
=
table
.
getColumns
().
get
(
i
).
getJavaType
().
getFlinkType
();
sb
.
append
(
" "
);
if
(
i
>
0
)
{
sb
.
append
(
","
);
}
sb
.
append
(
"`"
);
sb
.
append
(
table
.
getColumns
().
get
(
i
).
getName
());
sb
.
append
(
"` "
);
sb
.
append
(
convertSinkColumnType
(
type
));
sb
.
append
(
"\n"
);
if
(
table
.
getColumns
().
get
(
i
).
isKeyFlag
())
{
pks
.
add
(
table
.
getColumns
().
get
(
i
).
getName
());
}
}
StringBuilder
pksb
=
new
StringBuilder
(
"PRIMARY KEY ( "
);
for
(
int
i
=
0
;
i
<
pks
.
size
();
i
++)
{
if
(
i
>
0
)
{
pksb
.
append
(
","
);
}
pksb
.
append
(
"`"
);
pksb
.
append
(
pks
.
get
(
i
));
pksb
.
append
(
"`"
);
}
pksb
.
append
(
" ) NOT ENFORCED\n"
);
if
(
pks
.
size
()
>
0
)
{
sb
.
append
(
" ,"
);
sb
.
append
(
pksb
);
}
sb
.
append
(
") WITH (\n"
);
sb
.
append
(
getSinkConfigurationString
(
table
));
sb
.
append
(
")\n"
);
return
sb
.
toString
();
}
private
String
getCDCSqlInsert
(
Table
table
,
String
targetName
,
String
sourceName
)
{
StringBuilder
sb
=
new
StringBuilder
(
"INSERT INTO "
);
sb
.
append
(
targetName
);
sb
.
append
(
" SELECT\n"
);
for
(
int
i
=
0
;
i
<
table
.
getColumns
().
size
();
i
++)
{
sb
.
append
(
" "
);
if
(
i
>
0
)
{
sb
.
append
(
","
);
}
sb
.
append
(
getColumnProcessing
(
table
.
getColumns
().
get
(
i
))
+
" \n"
);
}
sb
.
append
(
" FROM "
);
sb
.
append
(
sourceName
);
return
sb
.
toString
();
}
private
String
getColumnProcessing
(
Column
column
)
{
if
(
"true"
.
equals
(
config
.
getSink
().
get
(
"column.replace.line-break"
))
&&
ColumnType
.
STRING
.
equals
(
column
.
getJavaType
()))
{
return
"REGEXP_REPLACE(`"
+
column
.
getName
()
+
"`, '\\n', '') AS `"
+
column
.
getName
()
+
"`"
;
}
else
{
return
"`"
+
column
.
getName
()
+
"`"
;
}
}
private
String
convertSinkColumnType
(
String
type
)
{
if
(
config
.
getSink
().
get
(
"connector"
).
equals
(
"hudi"
))
{
if
(
type
.
equals
(
"TIMESTAMP"
))
{
return
"TIMESTAMP(3)"
;
}
}
return
type
;
}
protected
Object
convertValue
(
Object
value
,
LogicalType
logicalType
)
{
protected
Object
convertValue
(
Object
value
,
LogicalType
logicalType
)
{
if
(
value
==
null
)
{
if
(
value
==
null
)
{
return
null
;
return
null
;
...
@@ -288,13 +217,4 @@ public class SQLSinkBuilder extends AbstractSinkBuilder implements SinkBuilder,
...
@@ -288,13 +217,4 @@ public class SQLSinkBuilder extends AbstractSinkBuilder implements SinkBuilder,
return
value
;
return
value
;
}
}
}
}
private
String
getSinkConfigurationString
(
Table
table
)
{
String
configurationString
=
SqlUtil
.
replaceAllParam
(
config
.
getSinkConfigurationString
(),
"schemaName"
,
getSinkSchemaName
(
table
));
configurationString
=
SqlUtil
.
replaceAllParam
(
configurationString
,
"tableName"
,
getSinkTableName
(
table
));
if
(
configurationString
.
contains
(
"${pkList}"
))
{
configurationString
=
SqlUtil
.
replaceAllParam
(
configurationString
,
"pkList"
,
StringUtils
.
join
(
getPKList
(
table
),
"."
));
}
return
configurationString
;
}
}
}
dlink-client/dlink-client-1.14/src/main/java/com/dlink/cdc/sql/SQLSinkBuilder.java
View file @
03f7e409
package
com
.
dlink
.
cdc
.
sql
;
package
com
.
dlink
.
cdc
.
sql
;
import
com.dlink.model.Column
;
import
com.dlink.assertion.Asserts
;
import
com.dlink.model.ColumnType
;
import
com.dlink.cdc.AbstractSinkBuilder
;
import
com.dlink.cdc.CDCBuilder
;
import
com.dlink.cdc.SinkBuilder
;
import
com.dlink.executor.CustomTableEnvironment
;
import
com.dlink.model.FlinkCDCConfig
;
import
com.dlink.model.Schema
;
import
com.dlink.model.Table
;
import
com.dlink.utils.FlinkBaseUtil
;
import
com.dlink.utils.LogUtil
;
import
com.dlink.utils.LogUtil
;
import
org.apache.commons.lang3.StringUtils
;
import
org.apache.commons.lang3.StringUtils
;
import
org.apache.flink.api.common.functions.FlatMapFunction
;
import
org.apache.flink.api.common.functions.FlatMapFunction
;
...
@@ -15,11 +22,7 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
...
@@ -15,11 +22,7 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import
org.apache.flink.table.data.RowData
;
import
org.apache.flink.table.data.RowData
;
import
org.apache.flink.table.operations.ModifyOperation
;
import
org.apache.flink.table.operations.ModifyOperation
;
import
org.apache.flink.table.operations.Operation
;
import
org.apache.flink.table.operations.Operation
;
import
org.apache.flink.table.types.logical.BigIntType
;
import
org.apache.flink.table.types.logical.*
;
import
org.apache.flink.table.types.logical.DateType
;
import
org.apache.flink.table.types.logical.DecimalType
;
import
org.apache.flink.table.types.logical.LogicalType
;
import
org.apache.flink.table.types.logical.TimestampType
;
import
org.apache.flink.table.types.utils.TypeConversions
;
import
org.apache.flink.table.types.utils.TypeConversions
;
import
org.apache.flink.types.Row
;
import
org.apache.flink.types.Row
;
import
org.apache.flink.types.RowKind
;
import
org.apache.flink.types.RowKind
;
...
@@ -33,16 +36,6 @@ import java.util.ArrayList;
...
@@ -33,16 +36,6 @@ import java.util.ArrayList;
import
java.util.List
;
import
java.util.List
;
import
java.util.Map
;
import
java.util.Map
;
import
com.dlink.assertion.Asserts
;
import
com.dlink.cdc.AbstractSinkBuilder
;
import
com.dlink.cdc.CDCBuilder
;
import
com.dlink.cdc.SinkBuilder
;
import
com.dlink.executor.CustomTableEnvironment
;
import
com.dlink.model.FlinkCDCConfig
;
import
com.dlink.model.Schema
;
import
com.dlink.model.Table
;
import
com.dlink.utils.SqlUtil
;
/**
/**
* SQLSinkBuilder
* SQLSinkBuilder
*
*
...
@@ -123,15 +116,17 @@ public class SQLSinkBuilder extends AbstractSinkBuilder implements SinkBuilder,
...
@@ -123,15 +116,17 @@ public class SQLSinkBuilder extends AbstractSinkBuilder implements SinkBuilder,
Table
table
,
Table
table
,
List
<
String
>
columnNameList
)
{
List
<
String
>
columnNameList
)
{
String
sinkSchemaName
=
getSinkSchemaName
(
table
);
String
sinkTableName
=
getSinkTableName
(
table
);
String
sinkTableName
=
getSinkTableName
(
table
);
String
pkList
=
StringUtils
.
join
(
getPKList
(
table
),
"."
);
String
viewName
=
"VIEW_"
+
table
.
getSchemaTableNameWithUnderline
();
String
viewName
=
"VIEW_"
+
table
.
getSchemaTableNameWithUnderline
();
customTableEnvironment
.
createTemporaryView
(
viewName
,
rowDataDataStream
,
StringUtils
.
join
(
columnNameList
,
","
));
customTableEnvironment
.
createTemporaryView
(
viewName
,
rowDataDataStream
,
StringUtils
.
join
(
columnNameList
,
","
));
logger
.
info
(
"Create "
+
viewName
+
" temporaryView successful..."
);
logger
.
info
(
"Create "
+
viewName
+
" temporaryView successful..."
);
String
flinkDDL
=
getFlinkDDL
(
table
,
sinkTableName
);
String
flinkDDL
=
FlinkBaseUtil
.
getFlinkDDL
(
table
,
sinkTableName
,
config
,
sinkSchemaName
,
sinkTableName
,
pkList
);
logger
.
info
(
flinkDDL
);
logger
.
info
(
flinkDDL
);
customTableEnvironment
.
executeSql
(
flinkDDL
);
customTableEnvironment
.
executeSql
(
flinkDDL
);
logger
.
info
(
"Create "
+
sinkTableName
+
" FlinkSQL DDL successful..."
);
logger
.
info
(
"Create "
+
sinkTableName
+
" FlinkSQL DDL successful..."
);
String
cdcSqlInsert
=
getCDCSqlInsert
(
table
,
sinkTableName
,
viewName
);
String
cdcSqlInsert
=
FlinkBaseUtil
.
getCDCSqlInsert
(
table
,
sinkTableName
,
viewName
,
config
);
logger
.
info
(
cdcSqlInsert
);
logger
.
info
(
cdcSqlInsert
);
List
<
Operation
>
operations
=
customTableEnvironment
.
getParser
().
parse
(
cdcSqlInsert
);
List
<
Operation
>
operations
=
customTableEnvironment
.
getParser
().
parse
(
cdcSqlInsert
);
logger
.
info
(
"Create "
+
sinkTableName
+
" FlinkSQL insert into successful..."
);
logger
.
info
(
"Create "
+
sinkTableName
+
" FlinkSQL insert into successful..."
);
...
@@ -191,80 +186,6 @@ public class SQLSinkBuilder extends AbstractSinkBuilder implements SinkBuilder,
...
@@ -191,80 +186,6 @@ public class SQLSinkBuilder extends AbstractSinkBuilder implements SinkBuilder,
return
dataStreamSource
;
return
dataStreamSource
;
}
}
private
String
getFlinkDDL
(
Table
table
,
String
tableName
)
{
StringBuilder
sb
=
new
StringBuilder
();
sb
.
append
(
"CREATE TABLE IF NOT EXISTS "
);
sb
.
append
(
tableName
);
sb
.
append
(
" (\n"
);
List
<
String
>
pks
=
new
ArrayList
<>();
for
(
int
i
=
0
;
i
<
table
.
getColumns
().
size
();
i
++)
{
String
type
=
table
.
getColumns
().
get
(
i
).
getJavaType
().
getFlinkType
();
sb
.
append
(
" "
);
if
(
i
>
0
)
{
sb
.
append
(
","
);
}
sb
.
append
(
"`"
);
sb
.
append
(
table
.
getColumns
().
get
(
i
).
getName
());
sb
.
append
(
"` "
);
sb
.
append
(
convertSinkColumnType
(
type
));
sb
.
append
(
"\n"
);
if
(
table
.
getColumns
().
get
(
i
).
isKeyFlag
())
{
pks
.
add
(
table
.
getColumns
().
get
(
i
).
getName
());
}
}
StringBuilder
pksb
=
new
StringBuilder
(
"PRIMARY KEY ( "
);
for
(
int
i
=
0
;
i
<
pks
.
size
();
i
++)
{
if
(
i
>
0
)
{
pksb
.
append
(
","
);
}
pksb
.
append
(
"`"
);
pksb
.
append
(
pks
.
get
(
i
));
pksb
.
append
(
"`"
);
}
pksb
.
append
(
" ) NOT ENFORCED\n"
);
if
(
pks
.
size
()
>
0
)
{
sb
.
append
(
" ,"
);
sb
.
append
(
pksb
);
}
sb
.
append
(
") WITH (\n"
);
sb
.
append
(
getSinkConfigurationString
(
table
));
sb
.
append
(
")\n"
);
return
sb
.
toString
();
}
private
String
getCDCSqlInsert
(
Table
table
,
String
targetName
,
String
sourceName
)
{
StringBuilder
sb
=
new
StringBuilder
(
"INSERT INTO "
);
sb
.
append
(
targetName
);
sb
.
append
(
" SELECT\n"
);
for
(
int
i
=
0
;
i
<
table
.
getColumns
().
size
();
i
++)
{
sb
.
append
(
" "
);
if
(
i
>
0
)
{
sb
.
append
(
","
);
}
sb
.
append
(
getColumnProcessing
(
table
.
getColumns
().
get
(
i
))
+
" \n"
);
}
sb
.
append
(
" FROM "
);
sb
.
append
(
sourceName
);
return
sb
.
toString
();
}
private
String
getColumnProcessing
(
Column
column
)
{
if
(
"true"
.
equals
(
config
.
getSink
().
get
(
"column.replace.line-break"
))
&&
ColumnType
.
STRING
.
equals
(
column
.
getJavaType
()))
{
return
"REGEXP_REPLACE(`"
+
column
.
getName
()
+
"`, '\\n', '') AS `"
+
column
.
getName
()
+
"`"
;
}
else
{
return
"`"
+
column
.
getName
()
+
"`"
;
}
}
private
String
convertSinkColumnType
(
String
type
)
{
if
(
config
.
getSink
().
get
(
"connector"
).
equals
(
"hudi"
))
{
if
(
type
.
equals
(
"TIMESTAMP"
))
{
return
"TIMESTAMP(3)"
;
}
}
return
type
;
}
protected
Object
convertValue
(
Object
value
,
LogicalType
logicalType
)
{
protected
Object
convertValue
(
Object
value
,
LogicalType
logicalType
)
{
if
(
value
==
null
)
{
if
(
value
==
null
)
{
return
null
;
return
null
;
...
@@ -295,13 +216,4 @@ public class SQLSinkBuilder extends AbstractSinkBuilder implements SinkBuilder,
...
@@ -295,13 +216,4 @@ public class SQLSinkBuilder extends AbstractSinkBuilder implements SinkBuilder,
return
value
;
return
value
;
}
}
}
}
/**
 * Expands the sink configuration template for one table: substitutes
 * ${schemaName} and ${tableName}, and — only when the template references
 * it — ${pkList} (the primary-key column names joined with ".").
 */
private String getSinkConfigurationString(Table table) {
    String result = SqlUtil.replaceAllParam(
            config.getSinkConfigurationString(), "schemaName", getSinkSchemaName(table));
    result = SqlUtil.replaceAllParam(result, "tableName", getSinkTableName(table));
    if (result.contains("${pkList}")) {
        String pkList = StringUtils.join(getPKList(table), ".");
        result = SqlUtil.replaceAllParam(result, "pkList", pkList);
    }
    return result;
}
}
}
dlink-client/dlink-client-1.15/src/main/java/com/dlink/cdc/sql/SQLSinkBuilder.java
View file @
03f7e409
package
com
.
dlink
.
cdc
.
sql
;
package
com
.
dlink
.
cdc
.
sql
;
import
com.dlink.model.Column
;
import
com.dlink.assertion.Asserts
;
import
com.dlink.model.ColumnType
;
import
com.dlink.cdc.AbstractSinkBuilder
;
import
com.dlink.cdc.CDCBuilder
;
import
com.dlink.cdc.SinkBuilder
;
import
com.dlink.executor.CustomTableEnvironment
;
import
com.dlink.model.FlinkCDCConfig
;
import
com.dlink.model.Schema
;
import
com.dlink.model.Table
;
import
com.dlink.utils.FlinkBaseUtil
;
import
com.dlink.utils.LogUtil
;
import
com.dlink.utils.LogUtil
;
import
org.apache.commons.lang3.StringUtils
;
import
org.apache.commons.lang3.StringUtils
;
import
org.apache.flink.api.common.functions.FlatMapFunction
;
import
org.apache.flink.api.common.functions.FlatMapFunction
;
...
@@ -15,11 +22,7 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
...
@@ -15,11 +22,7 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import
org.apache.flink.table.data.RowData
;
import
org.apache.flink.table.data.RowData
;
import
org.apache.flink.table.operations.ModifyOperation
;
import
org.apache.flink.table.operations.ModifyOperation
;
import
org.apache.flink.table.operations.Operation
;
import
org.apache.flink.table.operations.Operation
;
import
org.apache.flink.table.types.logical.BigIntType
;
import
org.apache.flink.table.types.logical.*
;
import
org.apache.flink.table.types.logical.DateType
;
import
org.apache.flink.table.types.logical.DecimalType
;
import
org.apache.flink.table.types.logical.LogicalType
;
import
org.apache.flink.table.types.logical.TimestampType
;
import
org.apache.flink.table.types.utils.TypeConversions
;
import
org.apache.flink.table.types.utils.TypeConversions
;
import
org.apache.flink.types.Row
;
import
org.apache.flink.types.Row
;
import
org.apache.flink.types.RowKind
;
import
org.apache.flink.types.RowKind
;
...
@@ -33,16 +36,6 @@ import java.util.ArrayList;
...
@@ -33,16 +36,6 @@ import java.util.ArrayList;
import
java.util.List
;
import
java.util.List
;
import
java.util.Map
;
import
java.util.Map
;
import
com.dlink.assertion.Asserts
;
import
com.dlink.cdc.AbstractSinkBuilder
;
import
com.dlink.cdc.CDCBuilder
;
import
com.dlink.cdc.SinkBuilder
;
import
com.dlink.executor.CustomTableEnvironment
;
import
com.dlink.model.FlinkCDCConfig
;
import
com.dlink.model.Schema
;
import
com.dlink.model.Table
;
import
com.dlink.utils.SqlUtil
;
/**
/**
* SQLSinkBuilder
* SQLSinkBuilder
*
*
...
@@ -117,21 +110,23 @@ public class SQLSinkBuilder extends AbstractSinkBuilder implements SinkBuilder,
...
@@ -117,21 +110,23 @@ public class SQLSinkBuilder extends AbstractSinkBuilder implements SinkBuilder,
},
rowTypeInfo
);
},
rowTypeInfo
);
}
}
p
ublic
void
addTableSink
(
p
rivate
void
addTableSink
(
CustomTableEnvironment
customTableEnvironment
,
CustomTableEnvironment
customTableEnvironment
,
DataStream
<
Row
>
rowDataDataStream
,
DataStream
<
Row
>
rowDataDataStream
,
Table
table
,
Table
table
,
List
<
String
>
columnNameList
)
{
List
<
String
>
columnNameList
)
{
String
sinkSchemaName
=
getSinkSchemaName
(
table
);
String
sinkTableName
=
getSinkTableName
(
table
);
String
sinkTableName
=
getSinkTableName
(
table
);
String
pkList
=
StringUtils
.
join
(
getPKList
(
table
),
"."
);
String
viewName
=
"VIEW_"
+
table
.
getSchemaTableNameWithUnderline
();
String
viewName
=
"VIEW_"
+
table
.
getSchemaTableNameWithUnderline
();
customTableEnvironment
.
createTemporaryView
(
viewName
,
rowDataDataStream
,
StringUtils
.
join
(
columnNameList
,
","
));
customTableEnvironment
.
createTemporaryView
(
viewName
,
rowDataDataStream
,
StringUtils
.
join
(
columnNameList
,
","
));
logger
.
info
(
"Create "
+
viewName
+
" temporaryView successful..."
);
logger
.
info
(
"Create "
+
viewName
+
" temporaryView successful..."
);
String
flinkDDL
=
getFlinkDDL
(
table
,
sinkTableName
);
String
flinkDDL
=
FlinkBaseUtil
.
getFlinkDDL
(
table
,
sinkTableName
,
config
,
sinkSchemaName
,
sinkTableName
,
pkList
);
logger
.
info
(
flinkDDL
);
logger
.
info
(
flinkDDL
);
customTableEnvironment
.
executeSql
(
flinkDDL
);
customTableEnvironment
.
executeSql
(
flinkDDL
);
logger
.
info
(
"Create "
+
sinkTableName
+
" FlinkSQL DDL successful..."
);
logger
.
info
(
"Create "
+
sinkTableName
+
" FlinkSQL DDL successful..."
);
String
cdcSqlInsert
=
getCDCSqlInsert
(
table
,
sinkTableName
,
viewName
);
String
cdcSqlInsert
=
FlinkBaseUtil
.
getCDCSqlInsert
(
table
,
sinkTableName
,
viewName
,
config
);
logger
.
info
(
cdcSqlInsert
);
logger
.
info
(
cdcSqlInsert
);
List
<
Operation
>
operations
=
customTableEnvironment
.
getParser
().
parse
(
cdcSqlInsert
);
List
<
Operation
>
operations
=
customTableEnvironment
.
getParser
().
parse
(
cdcSqlInsert
);
logger
.
info
(
"Create "
+
sinkTableName
+
" FlinkSQL insert into successful..."
);
logger
.
info
(
"Create "
+
sinkTableName
+
" FlinkSQL insert into successful..."
);
...
@@ -191,80 +186,6 @@ public class SQLSinkBuilder extends AbstractSinkBuilder implements SinkBuilder,
...
@@ -191,80 +186,6 @@ public class SQLSinkBuilder extends AbstractSinkBuilder implements SinkBuilder,
return
dataStreamSource
;
return
dataStreamSource
;
}
}
/**
 * Generates the CREATE TABLE DDL for the sink table from the source table's
 * column metadata: one backtick-quoted column per line, a
 * "PRIMARY KEY ( ... ) NOT ENFORCED" clause when key columns exist, and the
 * expanded WITH (...) sink configuration.
 *
 * @param table     source table metadata
 * @param tableName sink table name; backtick-quoted so SQL reserved words work (issue #608)
 * @return the complete CREATE TABLE statement
 */
private String getFlinkDDL(Table table, String tableName) {
    StringBuilder sb = new StringBuilder();
    // Quote the table name: an unquoted reserved word fails to parse.
    sb.append("CREATE TABLE IF NOT EXISTS `");
    sb.append(tableName);
    sb.append("` (\n");
    List<String> pks = new ArrayList<>();
    for (int i = 0; i < table.getColumns().size(); i++) {
        String type = table.getColumns().get(i).getJavaType().getFlinkType();
        sb.append(" ");
        if (i > 0) {
            sb.append(",");
        }
        sb.append("`");
        sb.append(table.getColumns().get(i).getName());
        sb.append("` ");
        sb.append(convertSinkColumnType(type));
        sb.append("\n");
        if (table.getColumns().get(i).isKeyFlag()) {
            pks.add(table.getColumns().get(i).getName());
        }
    }
    // Build the PK clause separately; it is only appended when keys exist.
    StringBuilder pksb = new StringBuilder("PRIMARY KEY ( ");
    for (int i = 0; i < pks.size(); i++) {
        if (i > 0) {
            pksb.append(",");
        }
        pksb.append("`");
        pksb.append(pks.get(i));
        pksb.append("`");
    }
    pksb.append(" ) NOT ENFORCED\n");
    if (pks.size() > 0) {
        sb.append(" ,");
        sb.append(pksb);
    }
    sb.append(") WITH (\n");
    sb.append(getSinkConfigurationString(table));
    sb.append(")\n");
    return sb.toString();
}
/**
 * Builds the "INSERT INTO ... SELECT ... FROM ..." FlinkSQL statement that copies
 * captured CDC rows from the temporary source view into the sink table.
 *
 * @param table      source table metadata whose columns drive the SELECT list
 * @param targetName sink table name; backtick-quoted so SQL reserved words work (issue #608)
 * @param sourceName temporary view name to read from; also backtick-quoted
 * @return the complete INSERT statement
 */
private String getCDCSqlInsert(Table table, String targetName, String sourceName) {
    // Quote both identifiers: an unquoted reserved word (e.g. "order") breaks the parse.
    StringBuilder sb = new StringBuilder("INSERT INTO `");
    sb.append(targetName);
    sb.append("` SELECT\n");
    for (int i = 0; i < table.getColumns().size(); i++) {
        sb.append(" ");
        if (i > 0) {
            sb.append(",");
        }
        // Each column expression is backtick-quoted (and optionally line-break cleaned).
        sb.append(getColumnProcessing(table.getColumns().get(i)) + " \n");
    }
    sb.append(" FROM `");
    sb.append(sourceName);
    sb.append("`");
    return sb.toString();
}
/**
 * Renders one SELECT-list expression for a column.
 * When the sink option "column.replace.line-break" is "true" and the column's Java
 * type is STRING, embedded newlines are stripped via REGEXP_REPLACE; otherwise the
 * column is referenced directly, backtick-quoted.
 */
private String getColumnProcessing(Column column) {
    String name = column.getName();
    boolean stripLineBreaks = "true".equals(config.getSink().get("column.replace.line-break"))
            && ColumnType.STRING.equals(column.getJavaType());
    if (stripLineBreaks) {
        return "REGEXP_REPLACE(`" + name + "`, '\\n', '') AS `" + name + "`";
    }
    return "`" + name + "`";
}
/**
 * Maps a Flink column type to the type expected by the sink connector.
 * Hudi rejects a plain TIMESTAMP, so it is narrowed to TIMESTAMP(3).
 *
 * @param type the Flink SQL type name derived from the source column
 * @return the (possibly adjusted) type to emit in the sink DDL
 */
private String convertSinkColumnType(String type) {
    // Constant-first equals avoids a NullPointerException when the sink
    // configuration has no "connector" entry.
    if ("hudi".equals(config.getSink().get("connector"))) {
        if ("TIMESTAMP".equals(type)) {
            return "TIMESTAMP(3)";
        }
    }
    return type;
}
protected
Object
convertValue
(
Object
value
,
LogicalType
logicalType
)
{
protected
Object
convertValue
(
Object
value
,
LogicalType
logicalType
)
{
if
(
value
==
null
)
{
if
(
value
==
null
)
{
return
null
;
return
null
;
...
@@ -295,13 +216,4 @@ public class SQLSinkBuilder extends AbstractSinkBuilder implements SinkBuilder,
...
@@ -295,13 +216,4 @@ public class SQLSinkBuilder extends AbstractSinkBuilder implements SinkBuilder,
return
value
;
return
value
;
}
}
}
}
/**
 * Expands the sink configuration template for one table: substitutes
 * ${schemaName} and ${tableName}, and — only when the template references
 * it — ${pkList} (the primary-key column names joined with ".").
 */
private String getSinkConfigurationString(Table table) {
    String result = SqlUtil.replaceAllParam(
            config.getSinkConfigurationString(), "schemaName", getSinkSchemaName(table));
    result = SqlUtil.replaceAllParam(result, "tableName", getSinkTableName(table));
    if (result.contains("${pkList}")) {
        String pkList = StringUtils.join(getPKList(table), ".");
        result = SqlUtil.replaceAllParam(result, "pkList", pkList);
    }
    return result;
}
}
}
dlink-client/dlink-client-base/src/main/java/com/dlink/utils/FlinkBaseUtil.java
View file @
03f7e409
package
com
.
dlink
.
utils
;
package
com
.
dlink
.
utils
;
import
com.dlink.constant.FlinkParamConstant
;
import
com.dlink.constant.FlinkParamConstant
;
import
com.dlink.model.Column
;
import
com.dlink.model.ColumnType
;
import
com.dlink.model.FlinkCDCConfig
;
import
com.dlink.model.Table
;
import
org.apache.flink.api.java.utils.ParameterTool
;
import
org.apache.flink.api.java.utils.ParameterTool
;
import
java.util.ArrayList
;
import
java.util.HashMap
;
import
java.util.HashMap
;
import
java.util.List
;
import
java.util.Map
;
import
java.util.Map
;
/**
/**
...
@@ -24,4 +30,88 @@ public class FlinkBaseUtil {
...
@@ -24,4 +30,88 @@ public class FlinkBaseUtil {
params
.
put
(
FlinkParamConstant
.
PASSWORD
,
parameters
.
get
(
FlinkParamConstant
.
PASSWORD
,
null
));
params
.
put
(
FlinkParamConstant
.
PASSWORD
,
parameters
.
get
(
FlinkParamConstant
.
PASSWORD
,
null
));
return
params
;
return
params
;
}
}
/**
 * Builds the "INSERT INTO ... SELECT ... FROM ..." FlinkSQL statement moving CDC
 * rows from the temporary view into the sink table. Both table identifiers are
 * backtick-quoted so SQL reserved words are valid names.
 *
 * @param table      source table metadata whose columns drive the SELECT list
 * @param targetName sink table name
 * @param sourceName temporary view name to read from
 * @param config     CDC job configuration (consulted for column processing options)
 * @return the complete INSERT statement
 */
public static String getCDCSqlInsert(Table table, String targetName, String sourceName, FlinkCDCConfig config) {
    StringBuilder sql = new StringBuilder("INSERT INTO `");
    sql.append(targetName).append("` SELECT\n");
    List<Column> columns = table.getColumns();
    for (int i = 0; i < columns.size(); i++) {
        sql.append(" ");
        if (i > 0) {
            sql.append(",");
        }
        sql.append(getColumnProcessing(columns.get(i), config)).append(" \n");
    }
    sql.append(" FROM `").append(sourceName).append("`");
    return sql.toString();
}
/**
 * Generates the CREATE TABLE DDL for the sink table from the source table's
 * column metadata: one backtick-quoted column per line, a
 * "PRIMARY KEY ( ... ) NOT ENFORCED" clause when key columns exist, and the
 * expanded WITH (...) sink configuration. The table name is backtick-quoted so
 * SQL reserved words are valid names.
 *
 * @param table          source table metadata
 * @param tableName      sink table name
 * @param config         CDC job configuration
 * @param sinkSchemaName schema name substituted into the sink configuration
 * @param sinkTableName  table name substituted into the sink configuration
 * @param pkList         joined primary-key list substituted into the sink configuration
 * @return the complete CREATE TABLE statement
 */
public static String getFlinkDDL(Table table, String tableName, FlinkCDCConfig config,
                                 String sinkSchemaName, String sinkTableName, String pkList) {
    StringBuilder ddl = new StringBuilder();
    ddl.append("CREATE TABLE IF NOT EXISTS `").append(tableName).append("` (\n");
    List<String> pks = new ArrayList<>();
    List<Column> columns = table.getColumns();
    for (int i = 0; i < columns.size(); i++) {
        Column column = columns.get(i);
        String type = column.getJavaType().getFlinkType();
        ddl.append(" ");
        if (i > 0) {
            ddl.append(",");
        }
        ddl.append("`").append(column.getName()).append("` ");
        ddl.append(convertSinkColumnType(type, config));
        ddl.append("\n");
        if (column.isKeyFlag()) {
            pks.add(column.getName());
        }
    }
    // The PK clause is built separately and only appended when keys exist.
    StringBuilder pkClause = new StringBuilder("PRIMARY KEY ( ");
    for (int i = 0; i < pks.size(); i++) {
        if (i > 0) {
            pkClause.append(",");
        }
        pkClause.append("`").append(pks.get(i)).append("`");
    }
    pkClause.append(" ) NOT ENFORCED\n");
    if (pks.size() > 0) {
        ddl.append(" ,").append(pkClause);
    }
    ddl.append(") WITH (\n");
    ddl.append(getSinkConfigurationString(table, config, sinkSchemaName, sinkTableName, pkList));
    ddl.append(")\n");
    return ddl.toString();
}
/**
 * Expands the sink configuration template: substitutes ${schemaName} and
 * ${tableName}, and — only when the template references it — ${pkList}.
 *
 * @param table          source table (currently unused; kept for call-site compatibility)
 * @param config         CDC job configuration supplying the template
 * @param sinkSchemaName replacement for ${schemaName}
 * @param sinkTableName  replacement for ${tableName}
 * @param pkList         replacement for ${pkList}
 * @return the expanded configuration string
 */
public static String getSinkConfigurationString(Table table, FlinkCDCConfig config,
                                                String sinkSchemaName, String sinkTableName, String pkList) {
    String result = SqlUtil.replaceAllParam(
            config.getSinkConfigurationString(), "schemaName", sinkSchemaName);
    result = SqlUtil.replaceAllParam(result, "tableName", sinkTableName);
    if (result.contains("${pkList}")) {
        result = SqlUtil.replaceAllParam(result, "pkList", pkList);
    }
    return result;
}
/**
 * Maps a Flink column type to the type expected by the sink connector.
 * Hudi rejects a plain TIMESTAMP, so it is narrowed to TIMESTAMP(3).
 *
 * @param type   the Flink SQL type name derived from the source column
 * @param config CDC job configuration supplying the sink connector name
 * @return the (possibly adjusted) type to emit in the sink DDL
 */
public static String convertSinkColumnType(String type, FlinkCDCConfig config) {
    // Constant-first equals avoids a NullPointerException when the sink
    // configuration has no "connector" entry.
    if ("hudi".equals(config.getSink().get("connector"))) {
        if ("TIMESTAMP".equals(type)) {
            return "TIMESTAMP(3)";
        }
    }
    return type;
}
/**
 * Renders one SELECT-list expression for a column.
 * When the sink option "column.replace.line-break" is "true" and the column's Java
 * type is STRING, embedded newlines are stripped via REGEXP_REPLACE; otherwise the
 * column is referenced directly, backtick-quoted.
 *
 * @param column source column metadata
 * @param config CDC job configuration supplying sink options
 * @return the SELECT-list expression for this column
 */
public static String getColumnProcessing(Column column, FlinkCDCConfig config) {
    String name = column.getName();
    boolean stripLineBreaks = "true".equals(config.getSink().get("column.replace.line-break"))
            && ColumnType.STRING.equals(column.getJavaType());
    if (stripLineBreaks) {
        return "REGEXP_REPLACE(`" + name + "`, '\\n', '') AS `" + name + "`";
    }
    return "`" + name + "`";
}
}
}
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment