shezaixing / dsk-dsc-flink · Commits · c6a18364

Commit c6a18364, authored Nov 26, 2024 by liaowenwu
gaussdb

Parent: ce59d998
Changes: 5
Showing 5 changed files with 54 additions and 50 deletions (+54 / -50)
Changed files:

AsyncMysqlDataTransferFunctionNew.java   ...sc/common/function/AsyncMysqlDataTransferFunctionNew.java   +4 / -11
CanalMapToTsGroupFunction.java           .../flink/dsc/common/function/CanalMapToTsGroupFunction.java   +1 / -1
GroupTsProcessWindowFunction.java        ...ink/dsc/common/function/GroupTsProcessWindowFunction.java   +30 / -3
SyncCustomerDataSource.java              ...n/java/com/dsk/flink/dsc/sync/SyncCustomerDataSource.java   +18 / -26
EtlUtils.java                            src/main/java/com/dsk/flink/dsc/utils/EtlUtils.java            +1 / -9
src/main/java/com/dsk/flink/dsc/common/function/AsyncMysqlDataTransferFunctionNew.java

@@ -99,8 +99,8 @@ public class AsyncMysqlDataTransferFunctionNew extends RichAsyncFunction<JSONObj
         }
         if ("UPDATE".equals(type)) {
             JSONObject oldDataObj = oldDataList.getJSONObject(0);
-            //excueteSql = tranferUpdateSql(table,dataObj,oldDataObj,mysqlType,pkNameSet);
-            excueteSql = tranferInsertSql(table, dataObj, mysqlType);
+            excueteSql = tranferUpdateSql(table, dataObj, oldDataObj, mysqlType, pkNameSet);
+            //excueteSql = tranferInsertSql(table,dataObj,mysqlType);
         }
         if ("DELETE".equals(type)) {

@@ -117,8 +117,6 @@ public class AsyncMysqlDataTransferFunctionNew extends RichAsyncFunction<JSONObj
         } catch (Exception e) {
             e.printStackTrace();
             resultFuture.complete(Collections.singleton(Tuple3.of("err", "", 0L)));
         } finally {
         }
     });
 }

@@ -148,24 +146,19 @@ public class AsyncMysqlDataTransferFunctionNew extends RichAsyncFunction<JSONObj
         sb.append("`" + s + "`").append(",");
     }
     List<String> valueList = new ArrayList<>();
     List<String> updateList = new ArrayList<>();
     for (String col : columnSet) {
         String formatCol = "`" + col + "`";
         if (Arrays.asList(KEYWORD).contains(col)) {
             valueList.add(getValueString(dataObj, col, mysqlType.getString(col)));
             updateList.add(formatCol.concat(" = VALUES(").concat(formatCol).concat(")"));
         } else {
             valueList.add(getValueString(dataObj, col, mysqlType.getString(col)));
             updateList.add(col.concat(" = VALUES(").concat(col).concat(")"));
         }
     }
     //String columnString = String.join(",",columnSet);
     sb.setLength(sb.length() - 1);
     String columnString = sb.toString();
     String valueString = String.join(",", valueList);
     String updateString = String.join(",", updateList);
-    return String.format("INSERT INTO %s (%s) values (%s) ON DUPLICATE KEY UPDATE %s;", table, columnString, valueString, updateString);
+    //return String.format("INSERT INTO %s (%s) values (%s) ON DUPLICATE KEY UPDATE %s;",table,columnString,valueString,updateString);
+    return String.format("INSERT INTO %s (%s) values (%s);", table, columnString, valueString);
 }

 private String tranferUpdateSql(String table, JSONObject dataObj, JSONObject oldDataObj, JSONObject mysqlType, Set<String> pkNameSet) {
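Net effect of this file's changes: UPDATE events are now rendered by tranferUpdateSql(...) instead of being rewritten as inserts, and tranferInsertSql(...) emits a plain INSERT rather than INSERT ... ON DUPLICATE KEY UPDATE. The following self-contained sketch only illustrates the shape of that plain INSERT; the class, the Map standing in for the fastjson row, and the naive quoting (the real code delegates to getValueString(...)) are all hypothetical, not the project's code.

import java.util.LinkedHashMap;
import java.util.Map;
import java.util.StringJoiner;

// Hypothetical sketch, not the project's tranferInsertSql: builds a plain INSERT with
// back-ticked column names and no ON DUPLICATE KEY UPDATE clause.
public class PlainInsertSketch {

    static String buildInsert(String table, Map<String, Object> row) {
        StringJoiner cols = new StringJoiner(",");
        StringJoiner vals = new StringJoiner(",");
        for (Map.Entry<String, Object> e : row.entrySet()) {
            cols.add("`" + e.getKey() + "`");
            Object v = e.getValue();
            // naive quoting for the sketch only; the real code renders values per MySQL type
            vals.add(v == null ? "NULL" : "'" + v.toString().replace("'", "''") + "'");
        }
        return String.format("INSERT INTO %s (%s) values (%s);", table, cols, vals);
    }

    public static void main(String[] args) {
        Map<String, Object> row = new LinkedHashMap<>();
        row.put("id", 1);
        row.put("company_name", "demo");
        // Prints: INSERT INTO t_demo (`id`,`company_name`) values ('1','demo');
        System.out.println(buildInsert("t_demo", row));
    }
}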
src/main/java/com/dsk/flink/dsc/common/function/CanalMapToTsGroupFunction.java

@@ -20,7 +20,7 @@ public class CanalMapToTsGroupFunction implements MapFunction<JSONObject, Tuple3
     String table = value.getString("table");
     JSONArray pkNames = value.getJSONArray("pkNames");
     Set<String> pkNameSet = new HashSet<>();
-    long ts = value.getLong("ts");
+    long ts = value.getLong("ts") == null ? System.currentTimeMillis() : value.getLong("ts");
     if (CollUtil.isNotEmpty(pkNames)) {
         pkNames.forEach(name -> pkNameSet.add(String.valueOf(name)));
     }
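The one-line change guards against Canal records that arrive without a ts field: fastjson's getLong returns a nullable Long, and unboxing null into a primitive long throws a NullPointerException, so the code now falls back to the current wall-clock time. A minimal sketch of the same guard (class and method names hypothetical):

import com.alibaba.fastjson.JSONObject;

// Minimal sketch of the guarded read, assuming the fastjson JSONObject used by the project.
public class TsFallbackSketch {

    static long extractTs(JSONObject value) {
        Long ts = value.getLong("ts");              // nullable when the record has no "ts"
        return ts == null ? System.currentTimeMillis() : ts;
    }

    public static void main(String[] args) {
        JSONObject withTs = new JSONObject();
        withTs.put("ts", 1732608000000L);
        System.out.println(extractTs(withTs));           // 1732608000000
        System.out.println(extractTs(new JSONObject())); // falls back to the current time
    }
}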
src/main/java/com/dsk/flink/dsc/common/function/GroupTsProcessWindowFunction.java

@@ -12,11 +12,38 @@ import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;

-public class GroupTsProcessWindowFunction extends ProcessWindowFunction<Tuple3<JSONObject, String, Long>, JSONObject, Long, TimeWindow> {
-    @Override
-    public void process(Long aLong, ProcessWindowFunction<Tuple3<JSONObject, String, Long>, JSONObject, Long, ...
+public class GroupTsProcessWindowFunction extends ProcessWindowFunction<Tuple3<JSONObject, String, Long>, JSONObject, String, TimeWindow> {
+
+    /*@Override
+    public void process(String aLong, ProcessWindowFunction<Tuple3<JSONObject, String, Long>, JSONObject, Long,
+            TimeWindow>.Context context, Iterable<Tuple3<JSONObject, String, Long>> elements, Collector<JSONObject> out) throws Exception {
+        List<Tuple3<JSONObject, String, Long>> list = CollUtil.list(false, elements);
+        Map<Long, List<Tuple3<JSONObject, String, Long>>> tsGroupMap = list.stream().collect(Collectors.groupingBy(x -> x.f2));
+        Long max = CollUtil.max(tsGroupMap.keySet());
+        List<Tuple3<JSONObject, String, Long>> resList = tsGroupMap.get(max);
+        if(resList.size() == 2){
+            JSONObject insertJson = new JSONObject();
+            JSONObject deleteJson = new JSONObject();
+            for (Tuple3<JSONObject, String, Long> rs : resList) {
+                if("INSERT".equals(rs.f0.getString("type"))){
+                    insertJson = rs.f0;
+                }
+                if("DELETE".equals(rs.f0.getString("type"))){
+                    deleteJson = rs.f0;
+                }
+            }
+            if(StrUtil.isNotBlank(insertJson.getString("type")) && StrUtil.isNotBlank(deleteJson.getString("type"))){
+                insertJson.put("type","UPDATE");
+                insertJson.put("old", deleteJson.getJSONArray("data"));
+                out.collect(insertJson);
+                return;
+            }
+        }
+        out.collect(resList.get(0).f0);
+    }*/
+
+    @Override
+    public void process(String key, Context context, Iterable<Tuple3<JSONObject, String, Long>> elements, Collector<JSONObject> out) throws Exception {
+        List<Tuple3<JSONObject, String, Long>> list = CollUtil.list(false, elements);
+        Map<Long, List<Tuple3<JSONObject, String, Long>>> tsGroupMap = list.stream().collect(Collectors.groupingBy(x -> x.f2));
+        Long max = CollUtil.max(tsGroupMap.keySet());
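The rewrite switches the window key type from Long (the event ts) to String, keeps the old ts-keyed implementation as a block comment, and groups each window's elements by ts so that only the newest group is emitted. A reduced stand-in for the same idea, keyed by f1 and emitting only the element with the largest ts, assuming nothing beyond Flink's public ProcessWindowFunction API (this is a simplification, not the committed class):

import com.alibaba.fastjson.JSONObject;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

import java.util.Comparator;
import java.util.stream.StreamSupport;

// Reduced stand-in (assumption, not the committed class): keyed by the String field (f1),
// emit only the element carrying the largest ts (f2) seen in the window.
public class MaxTsWindowFunctionSketch
        extends ProcessWindowFunction<Tuple3<JSONObject, String, Long>, JSONObject, String, TimeWindow> {

    @Override
    public void process(String key, Context context,
                        Iterable<Tuple3<JSONObject, String, Long>> elements,
                        Collector<JSONObject> out) {
        StreamSupport.stream(elements.spliterator(), false)
                .max(Comparator.comparing((Tuple3<JSONObject, String, Long> t) -> t.f2))
                .map(t -> t.f0)
                .ifPresent(out::collect);
    }
}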
src/main/java/com/dsk/flink/dsc/sync/SyncCustomerDataSource.java

@@ -48,28 +48,28 @@ public class SyncCustomerDataSource {
 public static void main(String[] args) throws Exception {
     StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-    env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 30000));
-    env.enableCheckpointing(60000);
     //env.setParallelism(1);
+    env.setRestartStrategy(RestartStrategies.fixedDelayRestart(50, 30000));
+    env.enableCheckpointing(120000);
     env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
-    env.getCheckpointConfig().setMinPauseBetweenCheckpoints(60000);
+    env.getCheckpointConfig().setMinPauseBetweenCheckpoints(120000);
     env.getCheckpointConfig().setCheckpointTimeout(7200000);
     env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
     env.getCheckpointConfig().setExternalizedCheckpointCleanup(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
-    env.getCheckpointConfig().setTolerableCheckpointFailureNumber(3);
+    env.getCheckpointConfig().setTolerableCheckpointFailureNumber(50);
     // read the user-supplied configuration
     ParameterTool parameterTool = ParameterTool.fromArgs(args);
     String offsetTimestamp = parameterTool.get("offsetTimestamp");
     String propertiesPath = parameterTool.get("propertiesPath");
     EnvProperties envProps = EnvPropertiesUtil.getPropertiesFromArgsPath(propertiesPath);
     envProps.put("providerImpl", JdbcConnectionProviderFactory.HikariDataSourceJdbcConnectionProvider.class.getName());
     //envProps.put("providerImpl", JdbcConnectionProviderFactory.HikariDataSourceJdbcConnectionProvider.class.getName());
     System.out.println("读取到的配置文件:-> " + envProps.toString());
     System.out.println("读取到的数据连接配置:->" + String.format(envProps.getDb_url(), envProps.getDb_host(), envProps.getDb_port(), envProps.getDb_database()));
     System.out.println("获取到的kafka消费组:->" + EtlUtils.getKafkaGroup(envProps));
     //TODO switch this to the production consumer group later
     FlinkKafkaConsumer<String> kafkaConsumer = new FlinkKafkaConsumer<String>(envProps.getKafka_topic(), new SimpleStringSchema(), EtlUtils.getKafkaConfig(envProps.getKafka_brokers(), EtlUtils.getKafkaGroup(envProps), envProps.getKafka_username(), envProps.getKafka_password()));
     //System.out.println(envProps.getKafka_topic());
-    long defaultOffset = LocalDateTime.now().minusMinutes(10).atZone(ZoneId.systemDefault()).toInstant().toEpochMilli();
+    long defaultOffset = LocalDateTime.now().minusMinutes(30).atZone(ZoneId.systemDefault()).toInstant().toEpochMilli();
     kafkaConsumer.setStartFromTimestamp(defaultOffset);
     //kafkaConsumer.setStartFromLatest();
     // offset
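This hunk loosens the job's fault-tolerance settings (restart attempts 3 → 50, checkpoint interval 60 s → 120 s, minimum pause between checkpoints 60 s → 120 s, tolerable checkpoint failures 3 → 50) and widens the Kafka replay window from 10 to 30 minutes. A tiny standalone sketch of how that start offset is derived (class name hypothetical):

import java.time.LocalDateTime;
import java.time.ZoneId;

// Sketch of the start-offset computation used after this commit: "now minus 30 minutes"
// in the JVM's default zone, converted to epoch milliseconds, which is the value handed
// to FlinkKafkaConsumer#setStartFromTimestamp.
public class StartOffsetSketch {
    public static void main(String[] args) {
        long defaultOffset = LocalDateTime.now()
                .minusMinutes(30)                 // was 10 minutes before this commit
                .atZone(ZoneId.systemDefault())
                .toInstant()
                .toEpochMilli();
        System.out.println("start-from timestamp (ms): " + defaultOffset);
    }
}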
@@ -99,26 +99,12 @@ public class SyncCustomerDataSource {
     SingleOutputStreamOperator<Tuple3<JSONObject, String, Long>> tsGroupStream = canalJsonStream.map(new CanalMapToTsGroupFunction());
-    SingleOutputStreamOperator<JSONObject> process = tsGroupStream.keyBy(x -> x.f2)
-            .window(TumblingProcessingTimeWindows.of(Time.seconds(3)))
+    SingleOutputStreamOperator<JSONObject> process = tsGroupStream.keyBy(x -> x.f1)
+            .window(TumblingProcessingTimeWindows.of(Time.seconds(1)))
             .process(new GroupTsProcessWindowFunction());
-    // SingleOutputStreamOperator<String> sqlResultStream = AsyncDataStream.orderedWait(canalJsonStream, new AsyncMysqlDataTransferFunction(envProps), 1200L, TimeUnit.SECONDS, 20)
-    //         .filter(new FilterFunction<String>() {
-    //             @Override
-    //             public boolean filter(String value) throws Exception {
-    //                 return StrUtil.isNotBlank(value) && !"err".equals(value);
-    //             }
-    //         })
-    //         .name("sqlResultStream")
-    //         .uid("sqlResultStream");
-    //
-    // //sqlResultStream.print("sql result");
-    //
-    // sqlResultStream.addSink(new MysqlDataTransferSink(envProps)).name("sqlSinkStream").uid("sqlSinkStream");
     SingleOutputStreamOperator<Tuple3<String, String, Long>> sqlResultStream1 = AsyncDataStream.orderedWait(process,
-            new AsyncMysqlDataTransferFunctionNew(envProps), 1200L, TimeUnit.SECONDS, 20)
+            new AsyncMysqlDataTransferFunctionNew(envProps), 600L, TimeUnit.SECONDS)
             .filter(new FilterFunction<Tuple3<String, String, Long>>() {
                 @Override
                 public boolean filter(Tuple3<String, String, Long> value) throws Exception {

@@ -128,8 +114,10 @@ public class SyncCustomerDataSource {
             .name("sqlResultStream")
             .uid("sqlResultStream");
     sqlResultStream1.print("async sql==>");
     SingleOutputStreamOperator<String> groupWindowSqlResultStream = sqlResultStream1.keyBy(value -> value.f1)
-            .window(TumblingProcessingTimeWindows.of(Time.seconds(3)))
+            .window(TumblingProcessingTimeWindows.of(Time.milliseconds(500)))
             .process(new ProcessWindowFunction<Tuple3<String, String, Long>, String, String, TimeWindow>() {
                 @Override
                 public void process(String s, ProcessWindowFunction<Tuple3<String, String, Long>, String, String,

@@ -143,12 +131,16 @@ public class SyncCustomerDataSource {
                     }
                     Tuple3<String, String, Long> maxTsElement = list.stream().max(Comparator.comparing(x -> x.f2)).get();
+                    if (maxTsElement.f0.contains("甘肃华羚实业集团有限公司")) {
+                        System.out.println("=======================");
+                    }
                     out.collect(maxTsElement.f0);
                     //list.forEach(x -> {out.collect(x.f0);});
                 }
             })
             .name("groupWindowSqlResultStream")
             .uid("groupWindowSqlResultStream");
-    //groupWindowSqlResultStream.print("sql result");
+    //groupWindowSqlResultStream.print("sql result ==>");
     groupWindowSqlResultStream.addSink(new MysqlDataTransferSink(envProps)).name("sqlSinkStream").uid("sqlSinkStream");
     env.execute();
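These hunks re-key the pre-dedup window on the String field f1 instead of the ts field f2, shrink the windows (3 s → 1 s before the async stage, 3 s → 500 ms after it), and call AsyncDataStream.orderedWait with a 600-second timeout and no explicit capacity instead of 1200 seconds and a capacity of 20, so Flink's default in-flight capacity applies. A toy sketch of that async stage in isolation, assuming only Flink's public API (UpperCaseAsync is a placeholder, not a project class):

import org.apache.flink.streaming.api.datastream.AsyncDataStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;

import java.util.Collections;
import java.util.concurrent.TimeUnit;

// Toy async stage: the four-argument orderedWait overload (no capacity argument) mirrors
// the form used after this commit.
public class AsyncStageSketch {

    static class UpperCaseAsync extends RichAsyncFunction<String, String> {
        @Override
        public void asyncInvoke(String input, ResultFuture<String> resultFuture) {
            resultFuture.complete(Collections.singleton(input.toUpperCase()));
        }
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<String> source = env.fromElements("a", "b", "c");
        SingleOutputStreamOperator<String> result =
                AsyncDataStream.orderedWait(source, new UpperCaseAsync(), 600L, TimeUnit.SECONDS);
        result.print();
        env.execute("async-stage-sketch");
    }
}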
src/main/java/com/dsk/flink/dsc/utils/EtlUtils.java

@@ -13,15 +13,6 @@ public class EtlUtils {
     properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
     properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
     properties.setProperty("auto.offset.reset", "earliest");
-    //properties.setProperty("auto.offset.reset", "latest");
-    //properties.setProperty("max.poll.interval.ms", "604800000");
-    //properties.setProperty("session.timeout.ms", "20160000");
-    //properties.setProperty("heartbeat.interval.ms", "6720000");
-    //properties.setProperty("max.partition.fetch.bytes", "349525");
-    //properties.setProperty("max.poll.records", "50");
-    /*properties.setProperty("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"dsk\" password=\"LU1SRhTzoxssRoCp\";");
-    properties.setProperty("security.protocol", "SASL_PLAINTEXT");
-    properties.setProperty("sasl.mechanism", "PLAIN");*/
     properties.setProperty("sasl.jaas.config", getSaslJaasConfig(username, password));
     properties.setProperty("security.protocol", "SASL_PLAINTEXT");
     properties.setProperty("sasl.mechanism", "SCRAM-SHA-512");

@@ -34,6 +25,7 @@ public class EtlUtils {
 }

 public static String getKafkaGroup(EnvProperties envProps) {
+    //return "test";
     return envProps.getKafka_topic().concat(StrUtil.isNotBlank(envProps.getEnv()) ? envProps.getEnv() : "").concat("-group");
 }
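The EtlUtils change deletes the commented-out consumer tuning and the old PLAIN-mechanism JAAS block, leaving SCRAM-SHA-512 authentication built from getSaslJaasConfig(username, password), and getKafkaGroup gains a commented-out test shortcut. A sketch of equivalent consumer properties, assuming the standard Kafka ScramLoginModule JAAS form (the project's getSaslJaasConfig is not shown in this diff, so the string below is not a quote of it):

import java.util.Properties;

// Illustrative SASL/SCRAM consumer properties; keys and values are standard Kafka client
// settings, the JAAS line uses the usual ScramLoginModule form as an assumption.
public class KafkaScramPropsSketch {

    static Properties consumerProps(String group, String username, String password) {
        Properties properties = new Properties();
        properties.setProperty("group.id", group);
        properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.setProperty("auto.offset.reset", "earliest");
        properties.setProperty("security.protocol", "SASL_PLAINTEXT");
        properties.setProperty("sasl.mechanism", "SCRAM-SHA-512");
        properties.setProperty("sasl.jaas.config",
                "org.apache.kafka.common.security.scram.ScramLoginModule required"
                        + " username=\"" + username + "\" password=\"" + password + "\";");
        return properties;
    }

    public static void main(String[] args) {
        System.out.println(consumerProps("dsk-topic-group", "user", "secret"));
    }
}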