Commit 2592603c authored by shezaixing's avatar shezaixing

update

parent da242a2e
......@@ -51,7 +51,7 @@ public class SyncCustomerDataSource {
envProps.put("providerImpl", JdbcConnectionProviderFactory.HikariDataSourceJdbcConnectionProvider.class.getName());
//System.out.println("读取到的配置文件:-> " + envProps.toString());
//TODO 到时需要改这里,改成正式的消费组
FlinkKafkaConsumer<String> kafkaConsumer = new FlinkKafkaConsumer<String>(envProps.getKafka_topic(), new SimpleStringSchema(), EtlUtils.getKafkaConfig(envProps.getKafka_brokers(), envProps.getKafka_topic().concat("-test-group")));
FlinkKafkaConsumer<String> kafkaConsumer = new FlinkKafkaConsumer<String>(envProps.getKafka_topic(), new SimpleStringSchema(), EtlUtils.getKafkaConfig(envProps.getKafka_brokers(), envProps.getKafka_topic().concat("-test-group"),envProps.getKafka_username(),envProps.getKafka_password()));
//偏移量
if (StrUtil.isNotBlank(offsetTimestamp)) {
kafkaConsumer.setStartFromTimestamp(Long.parseLong(offsetTimestamp));
......
......@@ -20,6 +20,11 @@ public class EnvProperties extends Properties {
String db_database;
String kafka_brokers;
String kafka_topic;
String kafka_username;
String kafka_password;
String st_kafka_brokers;
//公共组件tidb
......@@ -157,6 +162,23 @@ public class EnvProperties extends Properties {
public void setKafka_brokers(String kafka_brokers) {
this.kafka_brokers = kafka_brokers;
}
public String getKafka_username() {
return kafka_username == null ? this.getProperty("kafka_username") : kafka_username;
}
public void setKafka_username(String kafka_username) {
this.kafka_username = kafka_username;
}
public String getKafka_password() {
return kafka_password == null ? this.getProperty("kafka_password") : kafka_password;
}
public void setKafka_password(String kafka_password) {
this.kafka_password = kafka_password;
}
public String getSt_kafka_brokers() {
return st_kafka_brokers == null ? this.getProperty("st_kafka_brokers") : st_kafka_brokers;
}
......
......@@ -4,7 +4,7 @@ import java.util.Properties;
public class EtlUtils {
public static Properties getKafkaConfig(String url, String groupId) {
public static Properties getKafkaConfig(String url, String groupId, String username, String password) {
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", url);
properties.setProperty("group.id", groupId);
......@@ -20,9 +20,18 @@ public class EtlUtils {
/*properties.setProperty("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"dsk\" password=\"LU1SRhTzoxssRoCp\";");
properties.setProperty("security.protocol", "SASL_PLAINTEXT");
properties.setProperty("sasl.mechanism", "PLAIN");*/
properties.setProperty("sasl.jaas.config", "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"ccroad\" password=\"BkYeyyb9KkFGkVua9sjzwWZVL\";");
properties.setProperty("sasl.jaas.config", getSaslJaasConfig(username,password));
properties.setProperty("security.protocol", "SASL_PLAINTEXT");
properties.setProperty("sasl.mechanism", "SCRAM-SHA-512");
return properties;
}
public static String getSaslJaasConfig(String username, String password){
String saslJaasConfig = "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"%s\" password=\"%s\";";
return String.format(saslJaasConfig, username, password);
}
public static void main(String[] args) {
System.out.println(getSaslJaasConfig("szx","123456"));
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment