Skip to content
Projects
Groups
Snippets
Help
Loading...
Help
Contribute to GitLab
Sign in / Register
Toggle navigation
D
dsk-dsc-flink
Project
Project
Details
Activity
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
shezaixing
dsk-dsc-flink
Commits
4279b65d
Commit
4279b65d
authored
Apr 01, 2024
by
shezaixing
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
配置文件处理
parent
b240749c
Changes
2
Hide whitespace changes
Inline
Side-by-side
Showing
2 changed files
with
48 additions
and
20 deletions
+48
-20
SyncCustomerDataSource.java
...n/java/com/dsk/flink/dsc/sync/SyncCustomerDataSource.java
+16
-1
EnvProperties.java
src/main/java/com/dsk/flink/dsc/utils/EnvProperties.java
+32
-19
No files found.
src/main/java/com/dsk/flink/dsc/sync/SyncCustomerDataSource.java
View file @
4279b65d
...
@@ -19,6 +19,8 @@ import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
...
@@ -19,6 +19,8 @@ import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import
org.apache.flink.streaming.api.environment.CheckpointConfig
;
import
org.apache.flink.streaming.api.environment.CheckpointConfig
;
import
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
;
import
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
;
import
org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
;
import
org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
;
import
org.slf4j.Logger
;
import
org.slf4j.LoggerFactory
;
import
java.util.concurrent.TimeUnit
;
import
java.util.concurrent.TimeUnit
;
...
@@ -30,6 +32,7 @@ import java.util.concurrent.TimeUnit;
...
@@ -30,6 +32,7 @@ import java.util.concurrent.TimeUnit;
*/
*/
@Slf4j
@Slf4j
public
class
SyncCustomerDataSource
{
public
class
SyncCustomerDataSource
{
static
Logger
logger
=
LoggerFactory
.
getLogger
(
SyncCustomerDataSource
.
class
);
public
static
void
main
(
String
[]
args
)
throws
Exception
{
public
static
void
main
(
String
[]
args
)
throws
Exception
{
...
@@ -51,7 +54,19 @@ public class SyncCustomerDataSource {
...
@@ -51,7 +54,19 @@ public class SyncCustomerDataSource {
envProps
.
put
(
"providerImpl"
,
JdbcConnectionProviderFactory
.
HikariDataSourceJdbcConnectionProvider
.
class
.
getName
());
envProps
.
put
(
"providerImpl"
,
JdbcConnectionProviderFactory
.
HikariDataSourceJdbcConnectionProvider
.
class
.
getName
());
System
.
out
.
println
(
"读取到的配置文件:-> "
+
envProps
.
toString
());
System
.
out
.
println
(
"读取到的配置文件:-> "
+
envProps
.
toString
());
System
.
out
.
println
(
"读取到的数据连接配置:->"
+
String
.
format
(
envProps
.
getDb_url
(),
envProps
.
getDb_host
(),
envProps
.
getDb_port
(),
envProps
.
getDb_database
()));
System
.
out
.
println
(
"读取到的数据连接配置:->"
+
String
.
format
(
envProps
.
getDb_url
(),
envProps
.
getDb_host
(),
envProps
.
getDb_port
(),
envProps
.
getDb_database
()));
System
.
out
.
println
(
"获取到的kafka消费组:->"
+
EtlUtils
.
getKafkaGroup
(
envProps
));
System
.
out
.
println
(
"读取到的数据连接配置:->"
+
String
.
format
(
envProps
.
getDb_url
(),
envProps
.
getDb_host
(),
envProps
.
getDb_port
(),
envProps
.
getDb_database
()));
System
.
out
.
println
(
"获取到的kafka消费组:-> {}"
+
EtlUtils
.
getKafkaGroup
(
envProps
));
System
.
out
.
println
(
"获取到的kafka消费组:-> {}"
+
envProps
.
getKafka_topic
());
System
.
out
.
println
(
"获取到的kafka消费组:-> {}"
+
envProps
.
getKafka_password
());
logger
.
info
(
"获取到的kafka消费组:-> {}"
,
EtlUtils
.
getKafkaGroup
(
envProps
));
logger
.
info
(
"读取到的配置文件:-> {}"
,
envProps
.
toString
());
logger
.
info
(
"读取到的数据连接配置:-> {}"
,
String
.
format
(
envProps
.
getDb_url
(),
envProps
.
getDb_host
(),
envProps
.
getDb_port
(),
envProps
.
getDb_database
()));
logger
.
info
(
"获取到的kafka消费组:-> {}"
,
EtlUtils
.
getKafkaGroup
(
envProps
));
logger
.
info
(
"获取到的kafka消费组:-> {}"
,
EtlUtils
.
getKafkaGroup
(
envProps
));
logger
.
info
(
"获取到的kafka主题:-> {}"
,
envProps
.
getKafka_topic
());
logger
.
info
(
"获取到的kafka用户名:-> {}"
,
envProps
.
getKafka_username
());
logger
.
info
(
"获取到的kafka密码:-> {}"
,
envProps
.
getKafka_password
());
//TODO 到时需要改这里,改成正式的消费组
//TODO 到时需要改这里,改成正式的消费组
FlinkKafkaConsumer
<
String
>
kafkaConsumer
=
new
FlinkKafkaConsumer
<
String
>(
envProps
.
getKafka_topic
(),
new
SimpleStringSchema
(),
EtlUtils
.
getKafkaConfig
(
envProps
.
getKafka_brokers
(),
EtlUtils
.
getKafkaGroup
(
envProps
),
envProps
.
getKafka_username
(),
envProps
.
getKafka_password
()));
FlinkKafkaConsumer
<
String
>
kafkaConsumer
=
new
FlinkKafkaConsumer
<
String
>(
envProps
.
getKafka_topic
(),
new
SimpleStringSchema
(),
EtlUtils
.
getKafkaConfig
(
envProps
.
getKafka_brokers
(),
EtlUtils
.
getKafkaGroup
(
envProps
),
envProps
.
getKafka_username
(),
envProps
.
getKafka_password
()));
//System.out.println(envProps.getKafka_topic());
//System.out.println(envProps.getKafka_topic());
...
...
src/main/java/com/dsk/flink/dsc/utils/EnvProperties.java
View file @
4279b65d
package
com
.
dsk
.
flink
.
dsc
.
utils
;
package
com
.
dsk
.
flink
.
dsc
.
utils
;
import
cn.hutool.core.util.StrUtil
;
import
org.apache.commons.lang3.StringUtils
;
import
java.util.Properties
;
import
java.util.Properties
;
/**
/**
...
@@ -11,21 +14,21 @@ public class EnvProperties extends Properties {
...
@@ -11,21 +14,21 @@ public class EnvProperties extends Properties {
String
env
;
String
env
;
// #连接
// #连接
String
db_url
;
String
db_url
=
"jdbc:mysql://%s:%s/%s?useUnicode=true&characterEncoding=UTF-8&zeroDateTimeBehavior=convertToNull&useSSL=false&autoReconnect=true&&allowMultiQueries=true"
;
// #建设库TIDB库
// #建设库TIDB库
String
db_host
;
String
db_host
=
"10.23.52.133"
;
String
db_port
;
String
db_port
=
"3306"
;
String
db_username
;
String
db_username
=
"ccroad_scs"
;
String
db_password
;
String
db_password
=
"ccroad_scs@123"
;
String
tidb_pd_addresses
;
String
tidb_pd_addresses
;
String
db_database
;
String
db_database
=
"ccroad_scs"
;
String
kafka_brokers
;
String
kafka_brokers
=
"123.249.87.86:9095,123.249.87.21:9095,120.46.67.20:9095"
;
String
kafka_topic
;
String
kafka_topic
=
"dsc_ccroad"
;
String
kafka_username
;
String
kafka_username
=
"ccroad"
;
String
kafka_password
;
String
kafka_password
=
"BkYeyyb9KkFGkVua9sjzwWZVL"
;
String
st_kafka_brokers
;
String
st_kafka_brokers
;
...
@@ -128,25 +131,31 @@ public class EnvProperties extends Properties {
...
@@ -128,25 +131,31 @@ public class EnvProperties extends Properties {
this
.
db_url
=
db_url
;
this
.
db_url
=
db_url
;
}
}
public
String
getDb_host
()
{
public
String
getDb_host
()
{
return
db_host
==
null
?
this
.
getProperty
(
"db_host"
)
:
db_host
;
return
StrUtil
.
isBlank
(
this
.
getProperty
(
"db_host"
))
?
db_host
:
this
.
getProperty
(
"db_host"
);
//return db_host == null ? StringUtils.isBlank(this.getProperty("db_host")) ? "10.23.52.133":this.getProperty("db_host"): db_host;
}
}
public
void
setDb_host
(
String
db_host
)
{
public
void
setDb_host
(
String
db_host
)
{
this
.
db_host
=
db_host
;
this
.
db_host
=
db_host
;
}
}
public
String
getDb_port
()
{
public
String
getDb_port
()
{
return
db_port
==
null
?
this
.
getProperty
(
"db_port"
)
:
db_port
;
return
StrUtil
.
isBlank
(
this
.
getProperty
(
"db_port"
))
?
db_port
:
this
.
getProperty
(
"db_port"
);
//return db_port == null ? this.getProperty("db_port") : db_port;
}
}
public
void
setDb_port
(
String
db_port
)
{
public
void
setDb_port
(
String
db_port
)
{
this
.
db_port
=
db_port
;
this
.
db_port
=
db_port
;
}
}
public
String
getDb_username
()
{
public
String
getDb_username
()
{
return
db_username
==
null
?
this
.
getProperty
(
"db_username"
)
:
db_username
;
return
StrUtil
.
isBlank
(
this
.
getProperty
(
"db_username"
))
?
db_username
:
this
.
getProperty
(
"db_username"
);
//return db_username == null ? this.getProperty("db_username") : db_username;
}
}
public
void
setDb_username
(
String
db_username
)
{
public
void
setDb_username
(
String
db_username
)
{
this
.
db_username
=
db_username
;
this
.
db_username
=
db_username
;
}
}
public
String
getDb_password
()
{
public
String
getDb_password
()
{
return
db_password
==
null
?
this
.
getProperty
(
"db_password"
)
:
db_password
;
return
StrUtil
.
isBlank
(
this
.
getProperty
(
"db_password"
))
?
db_password
:
this
.
getProperty
(
"db_password"
);
//return db_password == null ? this.getProperty("db_password") : db_password;
}
}
public
void
setDb_password
(
String
db_password
)
{
public
void
setDb_password
(
String
db_password
)
{
this
.
db_password
=
db_password
;
this
.
db_password
=
db_password
;
...
@@ -159,7 +168,8 @@ public class EnvProperties extends Properties {
...
@@ -159,7 +168,8 @@ public class EnvProperties extends Properties {
}
}
public
String
getDb_database
()
{
public
String
getDb_database
()
{
return
db_database
==
null
?
this
.
getProperty
(
"db_database"
)
:
db_database
;
return
StrUtil
.
isBlank
(
this
.
getProperty
(
"db_database"
))
?
db_database
:
this
.
getProperty
(
"db_database"
);
//return db_database == null ? this.getProperty("db_database") : db_database;
}
}
public
void
setDb_database
(
String
db_database
)
{
public
void
setDb_database
(
String
db_database
)
{
...
@@ -167,14 +177,16 @@ public class EnvProperties extends Properties {
...
@@ -167,14 +177,16 @@ public class EnvProperties extends Properties {
}
}
public
String
getKafka_brokers
()
{
public
String
getKafka_brokers
()
{
return
kafka_brokers
==
null
?
this
.
getProperty
(
"kafka_brokers"
)
:
kafka_brokers
;
return
StrUtil
.
isBlank
(
this
.
getProperty
(
"kafka_brokers"
))
?
kafka_brokers
:
this
.
getProperty
(
"kafka_brokers"
);
//return kafka_brokers == null ? this.getProperty("kafka_brokers") : kafka_brokers;
}
}
public
void
setKafka_brokers
(
String
kafka_brokers
)
{
public
void
setKafka_brokers
(
String
kafka_brokers
)
{
this
.
kafka_brokers
=
kafka_brokers
;
this
.
kafka_brokers
=
kafka_brokers
;
}
}
public
String
getKafka_username
()
{
public
String
getKafka_username
()
{
return
kafka_username
==
null
?
this
.
getProperty
(
"kafka_username"
)
:
kafka_username
;
return
StrUtil
.
isBlank
(
this
.
getProperty
(
"kafka_username"
))
?
kafka_username
:
this
.
getProperty
(
"kafka_username"
);
//return kafka_username == null ? this.getProperty("kafka_username") : kafka_username;
}
}
public
void
setKafka_username
(
String
kafka_username
)
{
public
void
setKafka_username
(
String
kafka_username
)
{
...
@@ -182,7 +194,8 @@ public class EnvProperties extends Properties {
...
@@ -182,7 +194,8 @@ public class EnvProperties extends Properties {
}
}
public
String
getKafka_password
()
{
public
String
getKafka_password
()
{
return
kafka_password
==
null
?
this
.
getProperty
(
"kafka_password"
)
:
kafka_password
;
return
StrUtil
.
isBlank
(
this
.
getProperty
(
"kafka_password"
))
?
kafka_password
:
this
.
getProperty
(
"kafka_password"
);
//return kafka_password == null ? this.getProperty("kafka_password") : kafka_password;
}
}
public
void
setKafka_password
(
String
kafka_password
)
{
public
void
setKafka_password
(
String
kafka_password
)
{
...
@@ -529,7 +542,7 @@ public class EnvProperties extends Properties {
...
@@ -529,7 +542,7 @@ public class EnvProperties extends Properties {
}
}
public
String
getKafka_topic
()
{
public
String
getKafka_topic
()
{
return
kafka_topic
==
null
?
this
.
getProperty
(
"kafka_topic"
)
:
kafka_topic
;
return
kafka_topic
==
null
?
StringUtils
.
isBlank
(
this
.
getProperty
(
"kafka_topic"
))
?
"dsc_ccroad"
:
this
.
getProperty
(
"kafka_topic"
)
:
kafka_topic
;
}
}
public
void
setKafka_topic
(
String
kafka_topic
)
{
public
void
setKafka_topic
(
String
kafka_topic
)
{
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment