shezaixing / dsk-dsc-flink · Commits

Commit e552acf5, authored Aug 22, 2024 by 沈毫厘
parent 0153cd84

    Sharding code for split databases and tables (分库分表代码)
Showing 6 changed files with 179 additions and 83 deletions.
src/main/java/com/dsk/flink/dsc/common/function/AsyncMysqlDataTransferFunction.java  +78 −57
src/main/java/com/dsk/flink/dsc/common/sink/MysqlDataTransferSink.java               +30 −0
src/main/java/com/dsk/flink/dsc/sync/SyncCustomerDataSource.java                     +12 −6
src/main/java/com/dsk/flink/dsc/utils/EnvProperties.java                             +39 −0
src/main/java/com/dsk/flink/dsc/utils/EnvPropertiesUtil.java                         +17 −17
src/main/java/com/dsk/flink/dsc/utils/EtlUtils.java                                  +3 −3
src/main/java/com/dsk/flink/dsc/common/function/AsyncMysqlDataTransferFunction.java
@@ -17,6 +17,7 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;

/**
 * @author shezaixing
@@ -24,7 +25,7 @@ import java.util.concurrent.TimeUnit;
 * @description Assembles SQL for the mysql/tidb sink (mysql/tidb sink 数据组装 sql)
 *
 */
public class AsyncMysqlDataTransferFunction extends RichAsyncFunction<JSONObject, String> {

    static Logger logger = LoggerFactory.getLogger(AsyncMysqlDataTransferFunction.class);

    // Database connection info (数据库连接信息)
@@ -48,11 +49,6 @@ public class AsyncMysqlDataTransferFunction extends RichAsyncFunction<JSONObject
        executorService.shutdown();
    }

    @Override
    public void timeout(JSONObject input, ResultFuture<String> resultFuture) throws Exception {
        resultFuture.complete(Collections.singleton("err"));
    }

    @Override
    public void asyncInvoke(JSONObject value, ResultFuture<String> resultFuture) throws Exception {
        executorService.submit(() -> {
@@ -64,11 +60,31 @@ public class AsyncMysqlDataTransferFunction extends RichAsyncFunction<JSONObject
            String table = value.getString("table");
            Boolean isDdl = value.getBoolean("isDdl");
            JSONArray pkNames = value.getJSONArray("pkNames");
            JSONObject dataObj = dataList.getJSONObject(0);
            Set<String> pkNameSet = new HashSet<>();
            if (CollUtil.isNotEmpty(pkNames)) {
                pkNames.forEach(name -> pkNameSet.add(String.valueOf(name)));
            }
            String shardedTable = dbInfoMap.getSharded_table();
            List<Map<String, Object>> list = JSONArray.parseObject(shardedTable, List.class);
            List<Map<String, Object>> table1 = list.stream().filter(l -> {
                return l.get("table").toString().equals(table);
            }).collect(Collectors.toList());
            String concat = "";
            // Number of table shards (分表数)
            String tbNum = table1.get(0).get("tb_num").toString();
            // Sharding key column (分表字段)
            Integer majorKey = dataObj.getInteger(table1.get(0).get("major_key").toString());
            if (!tbNum.equals("1")) {
                int i1 = majorKey.intValue() % Integer.parseInt(tbNum);
                concat = table.concat("_").concat(String.valueOf(i1));
            } else {
                concat = table;
            }
            String excueteSql = "";
            if (isDdl) {
                excueteSql = value.getString("sql");
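The new block above routes each row to a physical table: it looks up the logical table in the sharded_table JSON config, reads tb_num and major_key, and appends majorKey % tbNum as the shard suffix. A standalone check of that arithmetic, with an invented single-entry config and assuming the project's fastjson JSONArray (matching the parseObject call in the hunk):

import com.alibaba.fastjson.JSONArray;

import java.util.List;
import java.util.Map;

public class ShardRoutingDemo {
    public static void main(String[] args) {
        // Invented config; the live value comes from EnvProperties.getSharded_table().
        String shardedTable = "[{\"table\":\"t_order\",\"tb_num\":\"4\",\"major_key\":\"id\"}]";
        List<Map<String, Object>> cfg = JSONArray.parseObject(shardedTable, List.class);
        String table = "t_order";
        int tbNum = Integer.parseInt(cfg.get(0).get("tb_num").toString());
        for (int id : new int[]{1422, 1423, 1424, 1425}) {
            // Same suffix rule as the hunk: logical name + "_" + (key % shard count).
            String physical = tbNum == 1 ? table : table + "_" + (id % tbNum);
            System.out.println(id + " -> " + physical); // t_order_2, t_order_3, t_order_0, t_order_1
        }
    }
}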
@@ -78,19 +94,18 @@ public class AsyncMysqlDataTransferFunction extends RichAsyncFunction<JSONObject
                resultFuture.complete(Collections.singleton(excueteSql));
                return;
            }
-           JSONObject dataObj = dataList.getJSONObject(0);
            if ("INSERT".equals(type)) {
-               excueteSql = tranferInsertSql(table, dataObj, mysqlType);
+               excueteSql = tranferInsertSql(concat, dataObj, mysqlType);
            }
            if ("UPDATE".equals(type)) {
                JSONObject oldDataObj = oldDataList.getJSONObject(0);
                // excueteSql = tranferUpdateSql(table,dataObj,oldDataObj,mysqlType,pkNameSet);
-               excueteSql = tranferInsertSql(table, dataObj, mysqlType);
+               excueteSql = tranferInsertSql(concat, dataObj, mysqlType);
            }
            if ("DELETE".equals(type)) {
-               excueteSql = transferDeleteSql(table, dataObj, mysqlType, pkNameSet);
+               excueteSql = transferDeleteSql(concat, dataObj, mysqlType, pkNameSet);
            }
            resultFuture.complete(Collections.singleton(excueteSql));
@@ -194,54 +209,60 @@ public class AsyncMysqlDataTransferFunction extends RichAsyncFunction<JSONObject
    public static void main(String[] args) {
        JSONObject jsonObject = new JSONObject();
        jsonObject.put("id", 1);
        jsonObject.put("name", "Nana");
        jsonObject.put("age", 26);
        jsonObject.put("salary", 20000);
        jsonObject.put("date1", "2023-10-01");
        jsonObject.put("date2", "2023-10-02 11:11:00");

        JSONObject mysqlType = new JSONObject();
        mysqlType.put("id", "int");
        mysqlType.put("name", "varchar");
        mysqlType.put("age", "bigint");
        mysqlType.put("salary", "double");
        mysqlType.put("date1", "date");
        mysqlType.put("date2", "datetime");
        mysqlType.put("relation", null);

        String table = "test";
        String s = "ff8940af-c080-40cc-9d83-8c7dc8b86ed4";
        System.out.println(s.length());

        String s1 = "hello string sss";
        String s2 = "'kaskljsl'";
        System.out.println(StrUtil.subBefore(s1, "string", true));
        System.out.println(tranferInsertSql(table, jsonObject, mysqlType));
        System.out.println(s2.replaceAll("'", "\\\\'"));

        String[] ss = new String[]{"1", "2", "3"};
        StringBuilder sb = new StringBuilder();
        for (String s3 : ss) {
            sb.append("`" + s3 + "?").append(",");
        }
        for (String s3 : ss) {
            System.out.println(s3);
        }
        sb.setLength(sb.length() - 1);
        System.out.println(sb.toString());

        String s5 = "交货地址:安徽霍邱供应站及指定地点\\\",\\\"bjsm\\\":null,\\\"jhType\\\":null,\\";
        System.out.println(s5);
        System.out.println(s5.replace("\\", "\\\\").replace("'", "\\'"));
        int i = 1422;
        int i1 = i % 6 % 3;
        int i2 = i % 3;
        System.out.println(i1);
        System.out.println(i2);
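        // Note: since 3 divides 6, (i % 6) % 3 == i % 3 for every i, so i1 and i2
        // always agree; here both print 0, because 1422 = 6 * 237.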
// JSONObject jsonObject = new JSONObject();
//
// jsonObject.put("id",1);
// jsonObject.put("name","Nana");
// jsonObject.put("age",26);
// jsonObject.put("salary",20000);
// jsonObject.put("date1","2023-10-01");
// jsonObject.put("date2","2023-10-02 11:11:00");
//
// JSONObject mysqlType = new JSONObject();
// mysqlType.put("id","int");
// mysqlType.put("name","varchar");
// mysqlType.put("age","bigint");
// mysqlType.put("salary","double");
// mysqlType.put("date1","date");
// mysqlType.put("date2","datetime");
// mysqlType.put("relation",null);
//
// String table = "test";
//
// String s= "ff8940af-c080-40cc-9d83-8c7dc8b86ed4";
// System.out.println(s.length());
//
// String s1 = "hello string sss";
//
// String s2 = "'kaskljsl'";
//
// System.out.println(StrUtil.subBefore(s1,"string",true));
//
// System.out.println(tranferInsertSql(table,jsonObject,mysqlType));
//
// System.out.println(s2.replaceAll("'","\\\\'"));
//
// String[] ss = new String[]{"1","2","3" };
// StringBuilder sb = new StringBuilder();
// for (String s3 : ss)
// {
// sb.append("`"+s3+"?").append(",");
// }
// for (String s3 : ss) {
// System.out.println(s3);
// }
// sb.setLength(sb.length()-1);
// System.out.println(sb.toString());
//
// String s5 = "交货地址:安徽霍邱供应站及指定地点\\\",\\\"bjsm\\\":null,\\\"jhType\\\":null,\\";
// System.out.println(s5);
// System.out.println(s5.replace("\\","\\\\").replace("'", "\\'"));
}

src/main/java/com/dsk/flink/dsc/common/sink/MysqlDataTransferSink.java
@@ -8,14 +8,19 @@ import cn.hutool.db.DbUtil;
import com.alibaba.druid.pool.DruidDataSource;
import com.dsk.flink.dsc.common.dto.SqlErrorLog;
import com.dsk.flink.dsc.utils.EnvProperties;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.sql.Array;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Date;
import java.util.List;
import java.util.concurrent.*;

public class MysqlDataTransferSink extends RichSinkFunction<String> {
@@ -25,6 +30,8 @@ public class MysqlDataTransferSink extends RichSinkFunction<String> {
    ExecutorService executorService;
    DruidDataSource dataSource;
    List<DruidDataSource> dataSourceList;

    public MysqlDataTransferSink(EnvProperties envProps) {
        this.envProps = envProps;
    }
@@ -33,6 +40,27 @@ public class MysqlDataTransferSink extends RichSinkFunction<String> {
    public void open(Configuration parameters) throws Exception {
        executorService = new ThreadPoolExecutor(4, 4, 20, TimeUnit.MINUTES, new LinkedBlockingDeque<>());
        // String shardedDbUrls = envProps.getSharded_db_urls();
        //
        // List<String> list = Arrays.asList(shardedDbUrls.split(","));
        // logger.info("链接数组:{}",list);
        // dataSourceList = new ArrayList<>();
        // list.forEach(l ->{
        //     dataSource = new DruidDataSource();
        //     dataSource.setDriverClassName("com.mysql.cj.jdbc.Driver");
        //     dataSource.setUsername(envProps.getDb_username());
        //     dataSource.setPassword(envProps.getDb_password());
        ////     dataSource.setUrl(configTidbUrl);
        //     dataSource.setMaxActive(30);
        //     dataSource.setInitialSize(10);
        //     dataSource.setTestWhileIdle(true);
        //     dataSource.setMaxWait(20000);
        //     dataSource.setValidationQuery("select 1");
        //     dataSource.setUrl(l);
        //     dataSourceList.add(dataSource);
        // });

        // Load connection config on initialization (初始化获取配置)
        String configTidbUrl = String.format(envProps.getDb_url(), envProps.getDb_host(), envProps.getDb_port(), envProps.getDb_database());
        //System.out.println(configTidbUrl);
@@ -46,6 +74,7 @@ public class MysqlDataTransferSink extends RichSinkFunction<String> {
        dataSource.setTestWhileIdle(true);
        dataSource.setMaxWait(20000);
        dataSource.setValidationQuery("select 1");
    }

    @Override
@@ -69,6 +98,7 @@ public class MysqlDataTransferSink extends RichSinkFunction<String> {
        Connection connection = null;
        PreparedStatement pt = null;
        try {
            // connection = dataSourceList.get(dbNum).getConnection();
            connection = dataSource.getConnection();
            pt = connection.prepareStatement(sql);
            pt.execute();
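The commented-out block in open() sketches one Druid pool per sharded JDBC URL. A minimal self-contained version of that idea, assuming sharded_db_urls holds comma-separated URLs as the commented code implies (the helper name and class are hypothetical, not from the repo):

import com.alibaba.druid.pool.DruidDataSource;

import java.util.ArrayList;
import java.util.List;

public class ShardPoolSketch {
    // Mirrors the commented-out block; "urls" stands in for envProps.getSharded_db_urls().
    static List<DruidDataSource> buildShardPools(String urls, String user, String pass) {
        List<DruidDataSource> pools = new ArrayList<>();
        for (String url : urls.split(",")) {
            DruidDataSource ds = new DruidDataSource();
            ds.setDriverClassName("com.mysql.cj.jdbc.Driver");
            ds.setUrl(url);
            ds.setUsername(user);
            ds.setPassword(pass);
            ds.setMaxActive(30);
            ds.setInitialSize(10);
            ds.setTestWhileIdle(true);
            ds.setMaxWait(20000);
            ds.setValidationQuery("select 1");
            pools.add(ds);
        }
        return pools;
    }
}

Writes could then route by shard index, e.g. pools.get(dbNum).getConnection(), as the commented dataSourceList.get(dbNum) line in the invoke path suggests.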
src/main/java/com/dsk/flink/dsc/sync/SyncCustomerDataSource.java
@@ -50,7 +50,7 @@ public class SyncCustomerDataSource {
        EnvProperties envProps = EnvPropertiesUtil.getPropertiesFromArgsPath(propertiesPath);
        envProps.put("providerImpl", JdbcConnectionProviderFactory.HikariDataSourceJdbcConnectionProvider.class.getName());
        System.out.println("读取到的配置文件:-> " + envProps.toString());
-       System.out.println("读取到的数据连接配置:->" + String.format(envProps.getDb_url(), envProps.getDb_host(), envProps.getDb_port(), envProps.getDb_database()));
+       // System.out.println("读取到的数据连接配置:->" + String.format(envProps.getDb_url(), envProps.getDb_host(), envProps.getDb_port(), envProps.getDb_database()));
        System.out.println("获取到的kafka消费组:->" + EtlUtils.getKafkaGroup(envProps));
        //TODO: change this later to the production consumer group (到时需要改这里,改成正式的消费组)
        FlinkKafkaConsumer<String> kafkaConsumer = new FlinkKafkaConsumer<String>(envProps.getKafka_topic(), new SimpleStringSchema(), EtlUtils.getKafkaConfig(envProps.getKafka_brokers(), EtlUtils.getKafkaGroup(envProps), envProps.getKafka_username(), envProps.getKafka_password()));
@@ -66,7 +66,7 @@ public class SyncCustomerDataSource {
                .name("kafka-source")
                .uid("kafka-source");
-       // kafkaSource.print("kafaka stream");
+       kafkaSource.print("kafaka stream");
        SingleOutputStreamOperator<JSONObject> canalJsonStream = kafkaSource.map(JSONObject::parseObject)
                .filter(new FilterFunction<JSONObject>() {
@@ -78,10 +78,16 @@ public class SyncCustomerDataSource {
                .name("canalJsonStream")
                .uid("canalJsonStream");
-       // canalJsonStream.print("canal stream");
+       canalJsonStream.print("canal stream");

        // SingleOutputStreamOperator<Tuple2<Integer, String>> sqlResultStream = AsyncDataStream.orderedWait(canalJsonStream, new AsyncMysqlDataTransferFunction(envProps), 1200L, TimeUnit.SECONDS, 20)
        //         .filter(new FilterFunction<Tuple2<Integer, String>>() {
        //             @Override
        //             public boolean filter(Tuple2<Integer, String> value) throws Exception {
        //                 return StrUtil.isNotBlank(value.f1) && !"err".equals(value.f1);
        //             }
        //         })
        SingleOutputStreamOperator<String> sqlResultStream = AsyncDataStream.orderedWait(canalJsonStream, new AsyncMysqlDataTransferFunction(envProps), 1200L, TimeUnit.SECONDS, 20)
                .filter(new FilterFunction<String>() {
                    @Override
                    public boolean filter(String value) throws Exception {
                        return StrUtil.isNotBlank(value) && !"err".equals(value);
@@ -90,7 +96,7 @@ public class SyncCustomerDataSource {
                .name("sqlResultStream")
                .uid("sqlResultStream");
-       // sqlResultStream.print("sql result");
+       sqlResultStream.print("sql result");
        sqlResultStream.addSink(new MysqlDataTransferSink(envProps)).name("sqlSinkStream").uid("sqlSinkStream");
        env.execute();
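Error handling in this pipeline is sentinel-based: timeout() in AsyncMysqlDataTransferFunction completes the future with "err", and the filter above keeps only non-blank results that are not that sentinel, so timed-out or empty records never reach the sink. A standalone sketch of the same contract (class name and sample values are illustrative, not from the repo):

import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class SentinelFilterDemo {
    public static void main(String[] args) {
        // Async results as the filter sees them: real SQL, a blank, and the timeout sentinel.
        List<String> asyncResults = Arrays.asList(
                "INSERT INTO t_order_2 (id) VALUES (1422)", "", "err");
        List<String> forSink = asyncResults.stream()
                .filter(v -> v != null && !v.trim().isEmpty() && !"err".equals(v))
                .collect(Collectors.toList());
        System.out.println(forSink); // only the INSERT survives
    }
}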
src/main/java/com/dsk/flink/dsc/utils/EnvProperties.java
@@ -20,6 +20,13 @@ public class EnvProperties extends Properties {
    String db_password;
    String tidb_pd_addresses;
    String db_database;
    String sharded_db_urls;
    String sharded_db_num;
    String sharded_tb_num;
    String sharded_table;
    String kafka_brokers;
    String kafka_topic;
@@ -121,6 +128,38 @@ public class EnvProperties extends Properties {
        this.env = env;
    }

    public String getSharded_db_urls() {
        return sharded_db_urls == null ? this.getProperty("sharded_db_urls") : sharded_db_urls;
    }

    public void setSharded_db_urls(String sharded_db_urls) {
        this.sharded_db_urls = sharded_db_urls;
    }

    public String getSharded_db_num() {
        return sharded_db_num == null ? this.getProperty("sharded_db_num") : sharded_db_num;
    }

    public void setSharded_db_num(String sharded_db_num) {
        this.sharded_db_num = sharded_db_num;
    }

    public String getSharded_table() {
        return sharded_table == null ? this.getProperty("sharded_table") : sharded_table;
    }

    public void setSharded_table(String sharded_table) {
        this.sharded_table = sharded_table;
    }

    public String getSharded_tb_num() {
        return sharded_tb_num == null ? this.getProperty("sharded_tb_num") : sharded_tb_num;
    }

    public void setSharded_tb_num(String sharded_tb_num) {
        this.sharded_tb_num = sharded_tb_num;
    }

    public String getDb_url() {
        return db_url == null ? this.getProperty("db_url") : db_url;
    }
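Each new getter falls back to the backing Properties entry when its field is unset, so the sharding config can come straight from the properties file. A hedged usage sketch (property values are invented; the JSON shape matches what the @@ -64 hunk of AsyncMysqlDataTransferFunction parses):

// Illustrative only: exercises the field-or-Properties fallback of the new getters.
EnvProperties props = new EnvProperties();
props.setProperty("sharded_tb_num", "4");
props.setProperty("sharded_table",
        "[{\"table\":\"t_order\",\"tb_num\":\"4\",\"major_key\":\"id\"}]");

System.out.println(props.getSharded_tb_num()); // "4" -- falls back to getProperty
props.setSharded_tb_num("8");
System.out.println(props.getSharded_tb_num()); // "8" -- the field now overrides the entry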
src/main/java/com/dsk/flink/dsc/utils/EnvPropertiesUtil.java
@@ -145,7 +145,7 @@ public class EnvPropertiesUtil {
        EnvProperties envProperties = new EnvProperties();
        if (StrUtil.isBlank(filePath)) {
-           filePath = System.getProperties().getProperty("os.name").contains("Windows") ? "D:\\Env\\application_pro.properties" : "/home/module/flink-job/application.properties";
+           filePath = System.getProperties().getProperty("os.name").contains("Windows") ? "D:\\Env\\application_pro.properties" : "/Users/moumoumou/shl/dm/dm-yaml/application.properties";
        }
        File file = new File(filePath);
        if (!file.exists()) {

src/main/java/com/dsk/flink/dsc/utils/EtlUtils.java
@@ -22,9 +22,9 @@ public class EtlUtils {
        /*properties.setProperty("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"dsk\" password=\"LU1SRhTzoxssRoCp\";");
        properties.setProperty("security.protocol", "SASL_PLAINTEXT");
        properties.setProperty("sasl.mechanism", "PLAIN");*/
-       properties.setProperty("sasl.jaas.config", getSaslJaasConfig(username, password));
-       properties.setProperty("security.protocol", "SASL_PLAINTEXT");
-       properties.setProperty("sasl.mechanism", "SCRAM-SHA-512");
+       // properties.setProperty("sasl.jaas.config", getSaslJaasConfig(username,password));
+       // properties.setProperty("security.protocol", "SASL_PLAINTEXT");
+       // properties.setProperty("sasl.mechanism", "SCRAM-SHA-512");
        return properties;
    }
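getSaslJaasConfig itself is outside this diff; for the SCRAM-SHA-512 mechanism toggled above, it would conventionally return Kafka's standard ScramLoginModule JAAS entry. A sketch under that assumption (the body below is illustrative, not the repo's actual helper):

// Assumption: getSaslJaasConfig assembles Kafka's standard ScramLoginModule entry;
// the diff only shows its call site.
static String getSaslJaasConfig(String username, String password) {
    return "org.apache.kafka.common.security.scram.ScramLoginModule required"
            + " username=\"" + username + "\""
            + " password=\"" + password + "\";";
}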