Commit b77326eb authored by liaowenwu's avatar liaowenwu

修改kafka连接

parent 73259a30
...@@ -30,7 +30,7 @@ public class MysqlDataTransferSinkBatch extends RichSinkFunction<String> { ...@@ -30,7 +30,7 @@ public class MysqlDataTransferSinkBatch extends RichSinkFunction<String> {
EnvProperties envProps; EnvProperties envProps;
private static transient ExecutorService executorService; private static transient ExecutorService executorService;
private static transient DruidDataSource dataSource; private static transient DruidDataSource dataSource;
private static final int BATCH_SIZE = 500; private static final int BATCH_SIZE = 100;
private static final int FLUSH_INTERVAL = 500; private static final int FLUSH_INTERVAL = 500;
private static final int INIT_CAPACITY = (int)(BATCH_SIZE / 0.75 + 1); private static final int INIT_CAPACITY = (int)(BATCH_SIZE / 0.75 + 1);
private ArrayBlockingQueue<String> sqlBatch = new ArrayBlockingQueue<>(BATCH_SIZE * 2); private ArrayBlockingQueue<String> sqlBatch = new ArrayBlockingQueue<>(BATCH_SIZE * 2);
......
...@@ -108,7 +108,7 @@ public class SyncCustomerDataSource { ...@@ -108,7 +108,7 @@ public class SyncCustomerDataSource {
Collector<String> out) throws Exception { Collector<String> out) throws Exception {
Tuple3<String, String, Long> maxTsElement = null; Tuple3<String, String, Long> maxTsElement = null;
for (Tuple3<String, String, Long> element : elements) { for (Tuple3<String, String, Long> element : elements) {
if (maxTsElement == null || element.f2 > maxTsElement.f2) { if (maxTsElement == null || element.f2 >= maxTsElement.f2) {
maxTsElement = element; maxTsElement = element;
} }
} }
......
...@@ -15,7 +15,7 @@ public class EtlUtils { ...@@ -15,7 +15,7 @@ public class EtlUtils {
properties.setProperty("auto.offset.reset", "earliest"); properties.setProperty("auto.offset.reset", "earliest");
properties.setProperty("sasl.jaas.config", getSaslJaasConfig(username,password)); properties.setProperty("sasl.jaas.config", getSaslJaasConfig(username,password));
properties.setProperty("security.protocol", "SASL_PLAINTEXT"); properties.setProperty("security.protocol", "SASL_PLAINTEXT");
properties.setProperty("sasl.mechanism", "SCRAM-SHA-256"); properties.setProperty("sasl.mechanism", "SCRAM-SHA-512");
properties.setProperty("fetch.max.bytes", "1048576"); //1M properties.setProperty("fetch.max.bytes", "1048576"); //1M
properties.setProperty("flink.consumer.max.fetch.size", "524288");//512k properties.setProperty("flink.consumer.max.fetch.size", "524288");//512k
properties.setProperty("session.timeout.ms", "30000"); properties.setProperty("session.timeout.ms", "30000");
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment