Commit b240749c authored by shezaixing's avatar shezaixing

update

parent 159aac6a
...@@ -49,7 +49,9 @@ public class SyncCustomerDataSource { ...@@ -49,7 +49,9 @@ public class SyncCustomerDataSource {
String propertiesPath = parameterTool.get("propertiesPath"); String propertiesPath = parameterTool.get("propertiesPath");
EnvProperties envProps = EnvPropertiesUtil.getPropertiesFromArgsPath(propertiesPath); EnvProperties envProps = EnvPropertiesUtil.getPropertiesFromArgsPath(propertiesPath);
envProps.put("providerImpl", JdbcConnectionProviderFactory.HikariDataSourceJdbcConnectionProvider.class.getName()); envProps.put("providerImpl", JdbcConnectionProviderFactory.HikariDataSourceJdbcConnectionProvider.class.getName());
//System.out.println("读取到的配置文件:-> " + envProps.toString()); System.out.println("读取到的配置文件:-> " + envProps.toString());
System.out.println("读取到的数据连接配置:->" + String.format(envProps.getDb_url(), envProps.getDb_host(), envProps.getDb_port(), envProps.getDb_database()));
System.out.println("获取到的kafka消费组:->" + EtlUtils.getKafkaGroup(envProps));
//TODO 到时需要改这里,改成正式的消费组 //TODO 到时需要改这里,改成正式的消费组
FlinkKafkaConsumer<String> kafkaConsumer = new FlinkKafkaConsumer<String>(envProps.getKafka_topic(), new SimpleStringSchema(), EtlUtils.getKafkaConfig(envProps.getKafka_brokers(), EtlUtils.getKafkaGroup(envProps), envProps.getKafka_username(),envProps.getKafka_password())); FlinkKafkaConsumer<String> kafkaConsumer = new FlinkKafkaConsumer<String>(envProps.getKafka_topic(), new SimpleStringSchema(), EtlUtils.getKafkaConfig(envProps.getKafka_brokers(), EtlUtils.getKafkaGroup(envProps), envProps.getKafka_username(),envProps.getKafka_password()));
//System.out.println(envProps.getKafka_topic()); //System.out.println(envProps.getKafka_topic());
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment