Commit 25931076 authored by shezaixing's avatar shezaixing

偏移量

parent 4fc840af
...@@ -53,11 +53,11 @@ public class SyncCustomerDataSource { ...@@ -53,11 +53,11 @@ public class SyncCustomerDataSource {
//TODO 到时需要改这里,改成正式的消费组 //TODO 到时需要改这里,改成正式的消费组
FlinkKafkaConsumer<String> kafkaConsumer = new FlinkKafkaConsumer<String>(envProps.getKafka_topic(), new SimpleStringSchema(), EtlUtils.getKafkaConfig(envProps.getKafka_brokers(), EtlUtils.getKafkaGroup(envProps), envProps.getKafka_username(),envProps.getKafka_password())); FlinkKafkaConsumer<String> kafkaConsumer = new FlinkKafkaConsumer<String>(envProps.getKafka_topic(), new SimpleStringSchema(), EtlUtils.getKafkaConfig(envProps.getKafka_brokers(), EtlUtils.getKafkaGroup(envProps), envProps.getKafka_username(),envProps.getKafka_password()));
//System.out.println(envProps.getKafka_topic()); //System.out.println(envProps.getKafka_topic());
kafkaConsumer.setStartFromEarliest();
//偏移量 //偏移量
if (StrUtil.isNotBlank(offsetTimestamp)) { if (StrUtil.isNotBlank(offsetTimestamp)) {
kafkaConsumer.setStartFromTimestamp(Long.parseLong(offsetTimestamp)); kafkaConsumer.setStartFromTimestamp(Long.parseLong(offsetTimestamp));
} }
kafkaConsumer.setStartFromEarliest();
SingleOutputStreamOperator<String> kafkaSource = env.addSource(kafkaConsumer) SingleOutputStreamOperator<String> kafkaSource = env.addSource(kafkaConsumer)
.setParallelism(1) .setParallelism(1)
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment