Skip to content
Projects
Groups
Snippets
Help
Loading...
Help
Contribute to GitLab
Sign in / Register
Toggle navigation
D
dsk-dsc-flink
Project
Project
Details
Activity
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
shezaixing
dsk-dsc-flink
Commits
07075e3e
Commit
07075e3e
authored
Feb 11, 2025
by
liaowenwu
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
添加日志
parent
006d98f6
Changes
1
Show whitespace changes
Inline
Side-by-side
Showing
1 changed file
with
4 additions
and
8 deletions
+4
-8
SyncCustomerDataSource.java
...n/java/com/dsk/flink/dsc/sync/SyncCustomerDataSource.java
+4
-8
No files found.
src/main/java/com/dsk/flink/dsc/sync/SyncCustomerDataSource.java
View file @
07075e3e
...
...
@@ -46,7 +46,7 @@ public class SyncCustomerDataSource {
public
static
void
main
(
String
[]
args
)
throws
Exception
{
StreamExecutionEnvironment
env
=
StreamExecutionEnvironment
.
getExecutionEnvironment
();
env
.
setParallelism
(
1
);
//
env.setParallelism(1);
env
.
setRestartStrategy
(
RestartStrategies
.
fixedDelayRestart
(
100
,
60000
));
env
.
enableCheckpointing
(
120000
);
env
.
getCheckpointConfig
().
setCheckpointingMode
(
CheckpointingMode
.
EXACTLY_ONCE
);
...
...
@@ -91,7 +91,6 @@ public class SyncCustomerDataSource {
.
name
(
"dsc-source"
)
.
uid
(
"dsc-source"
);
tsGroupStream
.
print
(
"source==>"
);
OutputTag
<
Tuple6
<
String
,
String
,
String
,
String
,
String
,
Long
>>
logSlideTag
=
new
OutputTag
<
Tuple6
<
String
,
String
,
String
,
String
,
String
,
Long
>>(
"log_slide"
)
{};
SingleOutputStreamOperator
<
Tuple3
<
String
,
String
,
Long
>>
slide
=
tsGroupStream
...
...
@@ -99,8 +98,6 @@ public class SyncCustomerDataSource {
.
name
(
"dsc-sql"
)
.
uid
(
"dsc-sql"
);
slide
.
print
(
"dsc-sql ==>"
);
SingleOutputStreamOperator
<
String
>
groupWindowSqlResultStream
=
slide
.
keyBy
(
value
->
value
.
f1
)
.
window
(
TumblingProcessingTimeWindows
.
of
(
Time
.
milliseconds
(
100
)))
...
...
@@ -123,13 +120,12 @@ public class SyncCustomerDataSource {
.
name
(
"dsc-max"
)
.
uid
(
"dsc-max"
);
groupWindowSqlResultStream
.
print
(
"dsc-max==>"
);
groupWindowSqlResultStream
.
addSink
(
new
MysqlDataTransferSinkBatch
(
envProps
))
.
name
(
"dsc-sink"
)
.
uid
(
"dsc-sink"
);
DataStream
<
Tuple6
<
String
,
String
,
String
,
String
,
String
,
Long
>>
sideOutput
=
slide
.
getSideOutput
(
logSlideTag
);
sideOutput
.
print
(
"log==>"
);
sideOutput
.
addSink
(
JdbcSink
.
sink
(
"INSERT INTO dsc_cdc_log (`table`,op_type,pk_columns,pk_values,data_json,cdc_ts) values (?,?,?,?,?,?)"
,
(
ps
,
t
)
->
{
...
...
@@ -151,8 +147,8 @@ public class SyncCustomerDataSource {
.
withUsername
(
envProps
.
getDb_username
())
.
withPassword
(
envProps
.
getDb_password
())
.
build
()
)).
uid
(
"d
eleteProject
"
)
.
name
(
"deleteProject
"
);
)).
uid
(
"d
sc-log
"
)
.
name
(
"dsc-log
"
);
env
.
execute
(
"dsc-client"
);
}
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment