Commit 4fc840af authored by shezaixing's avatar shezaixing

upate

parent a0b42df9
...@@ -51,7 +51,7 @@ public class SyncCustomerDataSource { ...@@ -51,7 +51,7 @@ public class SyncCustomerDataSource {
envProps.put("providerImpl", JdbcConnectionProviderFactory.HikariDataSourceJdbcConnectionProvider.class.getName()); envProps.put("providerImpl", JdbcConnectionProviderFactory.HikariDataSourceJdbcConnectionProvider.class.getName());
//System.out.println("读取到的配置文件:-> " + envProps.toString()); //System.out.println("读取到的配置文件:-> " + envProps.toString());
//TODO 到时需要改这里,改成正式的消费组 //TODO 到时需要改这里,改成正式的消费组
FlinkKafkaConsumer<String> kafkaConsumer = new FlinkKafkaConsumer<String>(envProps.getKafka_topic(), new SimpleStringSchema(), EtlUtils.getKafkaConfig(envProps.getKafka_brokers(), envProps.getKafka_topic().concat("-group"),envProps.getKafka_username(),envProps.getKafka_password())); FlinkKafkaConsumer<String> kafkaConsumer = new FlinkKafkaConsumer<String>(envProps.getKafka_topic(), new SimpleStringSchema(), EtlUtils.getKafkaConfig(envProps.getKafka_brokers(), EtlUtils.getKafkaGroup(envProps), envProps.getKafka_username(),envProps.getKafka_password()));
//System.out.println(envProps.getKafka_topic()); //System.out.println(envProps.getKafka_topic());
//偏移量 //偏移量
if (StrUtil.isNotBlank(offsetTimestamp)) { if (StrUtil.isNotBlank(offsetTimestamp)) {
......
...@@ -8,6 +8,8 @@ import java.util.Properties; ...@@ -8,6 +8,8 @@ import java.util.Properties;
*/ */
public class EnvProperties extends Properties { public class EnvProperties extends Properties {
String env;
// #连接 // #连接
String db_url; String db_url;
...@@ -111,6 +113,14 @@ public class EnvProperties extends Properties { ...@@ -111,6 +113,14 @@ public class EnvProperties extends Properties {
String solr_urls; String solr_urls;
String solr_zk_hosts; String solr_zk_hosts;
public String getEnv() {
return env == null ? this.getProperty("env") : env;
}
public void setEnv(String env) {
this.env = env;
}
public String getDb_url() { public String getDb_url() {
return db_url == null ? this.getProperty("db_url") : db_url; return db_url == null ? this.getProperty("db_url") : db_url;
} }
......
package com.dsk.flink.dsc.utils; package com.dsk.flink.dsc.utils;
import cn.hutool.core.util.StrUtil;
import java.util.Properties; import java.util.Properties;
public class EtlUtils { public class EtlUtils {
...@@ -31,6 +33,10 @@ public class EtlUtils { ...@@ -31,6 +33,10 @@ public class EtlUtils {
return String.format(saslJaasConfig, username, password); return String.format(saslJaasConfig, username, password);
} }
public static String getKafkaGroup(EnvProperties envProps){
return envProps.getKafka_topic().concat(StrUtil.isNotBlank(envProps.getEnv()) ? envProps.getEnv() : "").concat("-group");
}
public static void main(String[] args) { public static void main(String[] args) {
System.out.println(getSaslJaasConfig("szx","123456")); System.out.println(getSaslJaasConfig("szx","123456"));
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment