Commit 84e5cfc9 authored by liaowenwu's avatar liaowenwu

修改代码

parent 3730ba06
...@@ -45,13 +45,17 @@ public class SyncCustomerDataSource { ...@@ -45,13 +45,17 @@ public class SyncCustomerDataSource {
//获取用户自己的配置信息 //获取用户自己的配置信息
ParameterTool parameterTool = ParameterTool.fromArgs(args); ParameterTool parameterTool = ParameterTool.fromArgs(args);
String offsetTimestamp = parameterTool.get("offsetTimestamp");
String propertiesPath = parameterTool.get("propertiesPath"); String propertiesPath = parameterTool.get("propertiesPath");
EnvProperties envProps = EnvPropertiesUtil.getPropertiesFromArgsPath(propertiesPath); EnvProperties envProps = EnvPropertiesUtil.getPropertiesFromArgsPath(propertiesPath);
envProps.put("providerImpl", JdbcConnectionProviderFactory.HikariDataSourceJdbcConnectionProvider.class.getName()); envProps.put("providerImpl", JdbcConnectionProviderFactory.HikariDataSourceJdbcConnectionProvider.class.getName());
//System.out.println("读取到的配置文件:-> " + envProps.toString()); //System.out.println("读取到的配置文件:-> " + envProps.toString());
FlinkKafkaConsumer<String> kafkaConsumer = new FlinkKafkaConsumer<String>(envProps.getKafka_topic(), new SimpleStringSchema(), EtlUtils.getKafkaConfig(envProps.getKafka_brokers(), envProps.getKafka_topic().concat("-group"))); FlinkKafkaConsumer<String> kafkaConsumer = new FlinkKafkaConsumer<String>(envProps.getKafka_topic(), new SimpleStringSchema(), EtlUtils.getKafkaConfig(envProps.getKafka_brokers(), envProps.getKafka_topic().concat("-group")));
//kafkaConsumer.setStartFromEarliest(); //lww添加
if (StrUtil.isNotBlank(offsetTimestamp)) {
kafkaConsumer.setStartFromTimestamp(Long.parseLong(offsetTimestamp));
}
SingleOutputStreamOperator<String> kafkaSource = env.addSource(kafkaConsumer) SingleOutputStreamOperator<String> kafkaSource = env.addSource(kafkaConsumer)
.setParallelism(1) .setParallelism(1)
......
...@@ -21,7 +21,7 @@ import java.util.zip.ZipEntry; ...@@ -21,7 +21,7 @@ import java.util.zip.ZipEntry;
*/ */
public class EnvPropertiesUtil { public class EnvPropertiesUtil {
private static String SYSTEM_ENV = "dev"; private static String SYSTEM_ENV = "pro";
private static EnvProperties properties = new EnvProperties(); private static EnvProperties properties = new EnvProperties();
static { static {
SYSTEM_ENV = System.getProperty("profiles.active") == null ? SYSTEM_ENV : System.getProperty("profiles.active"); SYSTEM_ENV = System.getProperty("profiles.active") == null ? SYSTEM_ENV : System.getProperty("profiles.active");
......
...@@ -10,8 +10,8 @@ public class EtlUtils { ...@@ -10,8 +10,8 @@ public class EtlUtils {
properties.setProperty("group.id", groupId); properties.setProperty("group.id", groupId);
properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
//properties.setProperty("auto.offset.reset", "earliest"); properties.setProperty("auto.offset.reset", "earliest");
properties.setProperty("auto.offset.reset", "latest"); //properties.setProperty("auto.offset.reset", "latest");
//properties.setProperty("max.poll.interval.ms", "604800000"); //properties.setProperty("max.poll.interval.ms", "604800000");
//properties.setProperty("session.timeout.ms", "20160000"); //properties.setProperty("session.timeout.ms", "20160000");
//properties.setProperty("heartbeat.interval.ms", "6720000"); //properties.setProperty("heartbeat.interval.ms", "6720000");
...@@ -20,7 +20,7 @@ public class EtlUtils { ...@@ -20,7 +20,7 @@ public class EtlUtils {
/*properties.setProperty("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"dsk\" password=\"LU1SRhTzoxssRoCp\";"); /*properties.setProperty("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"dsk\" password=\"LU1SRhTzoxssRoCp\";");
properties.setProperty("security.protocol", "SASL_PLAINTEXT"); properties.setProperty("security.protocol", "SASL_PLAINTEXT");
properties.setProperty("sasl.mechanism", "PLAIN");*/ properties.setProperty("sasl.mechanism", "PLAIN");*/
properties.setProperty("sasl.jaas.config", "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"dsk_st\" password=\"vIfVv6esjpwU2jT\";"); properties.setProperty("sasl.jaas.config", "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"ccroad\" password=\"BkYeyyb9KkFGkVua9sjzwWZVL\";");
properties.setProperty("security.protocol", "SASL_PLAINTEXT"); properties.setProperty("security.protocol", "SASL_PLAINTEXT");
properties.setProperty("sasl.mechanism", "SCRAM-SHA-512"); properties.setProperty("sasl.mechanism", "SCRAM-SHA-512");
return properties; return properties;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment