Commit 3730ba06 authored by shezaixing's avatar shezaixing

update

parent 9ec79f4f
Pipeline #367 failed with stages
...@@ -48,10 +48,10 @@ public class SyncCustomerDataSource { ...@@ -48,10 +48,10 @@ public class SyncCustomerDataSource {
String propertiesPath = parameterTool.get("propertiesPath"); String propertiesPath = parameterTool.get("propertiesPath");
EnvProperties envProps = EnvPropertiesUtil.getPropertiesFromArgsPath(propertiesPath); EnvProperties envProps = EnvPropertiesUtil.getPropertiesFromArgsPath(propertiesPath);
envProps.put("providerImpl", JdbcConnectionProviderFactory.HikariDataSourceJdbcConnectionProvider.class.getName()); envProps.put("providerImpl", JdbcConnectionProviderFactory.HikariDataSourceJdbcConnectionProvider.class.getName());
System.out.println("读取到的配置文件:-> " + envProps.toString()); //System.out.println("读取到的配置文件:-> " + envProps.toString());
FlinkKafkaConsumer<String> kafkaConsumer = new FlinkKafkaConsumer<String>(envProps.getKafka_topic(), new SimpleStringSchema(), EtlUtils.getKafkaConfig(envProps.getKafka_brokers(), envProps.getKafka_topic().concat("-group"))); FlinkKafkaConsumer<String> kafkaConsumer = new FlinkKafkaConsumer<String>(envProps.getKafka_topic(), new SimpleStringSchema(), EtlUtils.getKafkaConfig(envProps.getKafka_brokers(), envProps.getKafka_topic().concat("-group")));
kafkaConsumer.setStartFromEarliest(); //kafkaConsumer.setStartFromEarliest();
SingleOutputStreamOperator<String> kafkaSource = env.addSource(kafkaConsumer) SingleOutputStreamOperator<String> kafkaSource = env.addSource(kafkaConsumer)
.setParallelism(1) .setParallelism(1)
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment