Commit da242a2e authored by liaowenwu's avatar liaowenwu

修改代码

parent 5418b84d
...@@ -52,7 +52,7 @@ public class SyncCustomerDataSource { ...@@ -52,7 +52,7 @@ public class SyncCustomerDataSource {
//System.out.println("读取到的配置文件:-> " + envProps.toString()); //System.out.println("读取到的配置文件:-> " + envProps.toString());
//TODO 到时需要改这里,改成正式的消费组 //TODO 到时需要改这里,改成正式的消费组
FlinkKafkaConsumer<String> kafkaConsumer = new FlinkKafkaConsumer<String>(envProps.getKafka_topic(), new SimpleStringSchema(), EtlUtils.getKafkaConfig(envProps.getKafka_brokers(), envProps.getKafka_topic().concat("-test-group"))); FlinkKafkaConsumer<String> kafkaConsumer = new FlinkKafkaConsumer<String>(envProps.getKafka_topic(), new SimpleStringSchema(), EtlUtils.getKafkaConfig(envProps.getKafka_brokers(), envProps.getKafka_topic().concat("-test-group")));
//lww添加 //偏移量
if (StrUtil.isNotBlank(offsetTimestamp)) { if (StrUtil.isNotBlank(offsetTimestamp)) {
kafkaConsumer.setStartFromTimestamp(Long.parseLong(offsetTimestamp)); kafkaConsumer.setStartFromTimestamp(Long.parseLong(offsetTimestamp));
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment