Commit 6085ef5a authored by shezaixing's avatar shezaixing

update

parent c5470ee1
...@@ -67,7 +67,7 @@ public class SyncCustomerDataSource { ...@@ -67,7 +67,7 @@ public class SyncCustomerDataSource {
//TODO 到时需要改这里,改成正式的消费组 //TODO 到时需要改这里,改成正式的消费组
FlinkKafkaConsumer<String> kafkaConsumer = new FlinkKafkaConsumer<String>(envProps.getKafka_topic(), new SimpleStringSchema(), EtlUtils.getKafkaConfig(envProps.getKafka_brokers(), EtlUtils.getKafkaGroup(envProps), envProps.getKafka_username(),envProps.getKafka_password())); FlinkKafkaConsumer<String> kafkaConsumer = new FlinkKafkaConsumer<String>(envProps.getKafka_topic(), new SimpleStringSchema(), EtlUtils.getKafkaConfig(envProps.getKafka_brokers(), EtlUtils.getKafkaGroup(envProps), envProps.getKafka_username(),envProps.getKafka_password()));
//System.out.println(envProps.getKafka_topic()); //System.out.println(envProps.getKafka_topic());
long defaultOffset = LocalDateTime.now().minusMinutes(5).atZone(ZoneId.systemDefault()).toInstant().toEpochMilli(); long defaultOffset = LocalDateTime.now().minusMinutes(10).atZone(ZoneId.systemDefault()).toInstant().toEpochMilli();
kafkaConsumer.setStartFromTimestamp(defaultOffset); kafkaConsumer.setStartFromTimestamp(defaultOffset);
//kafkaConsumer.setStartFromLatest(); //kafkaConsumer.setStartFromLatest();
//偏移量 //偏移量
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment