Commit 73259a30 authored by liaowenwu's avatar liaowenwu

修改kafka连接

parent 0a0fd085
......@@ -30,7 +30,7 @@ public class MysqlDataTransferSinkBatch extends RichSinkFunction<String> {
EnvProperties envProps;
private static transient ExecutorService executorService;
private static transient DruidDataSource dataSource;
private static final int BATCH_SIZE = 2000;
private static final int BATCH_SIZE = 500;
private static final int FLUSH_INTERVAL = 500;
private static final int INIT_CAPACITY = (int)(BATCH_SIZE / 0.75 + 1);
private ArrayBlockingQueue<String> sqlBatch = new ArrayBlockingQueue<>(BATCH_SIZE * 2);
......@@ -43,7 +43,7 @@ public class MysqlDataTransferSinkBatch extends RichSinkFunction<String> {
@Override
public void open(Configuration parameters) throws Exception {
executorService = new ThreadPoolExecutor(15, 20, 20, TimeUnit.MINUTES, new ArrayBlockingQueue<>(1000));
executorService = new ThreadPoolExecutor(4, 4, 20, TimeUnit.MINUTES, new ArrayBlockingQueue<>(1000));
// 初始化获取配置
String configTidbUrl = String.format(envProps.getDb_url(), envProps.getDb_host(), envProps.getDb_port(), envProps.getDb_database());
dataSource = new DruidDataSource();
......
......@@ -15,7 +15,7 @@ public class EtlUtils {
properties.setProperty("auto.offset.reset", "earliest");
properties.setProperty("sasl.jaas.config", getSaslJaasConfig(username,password));
properties.setProperty("security.protocol", "SASL_PLAINTEXT");
properties.setProperty("sasl.mechanism", "SCRAM-SHA-512");
properties.setProperty("sasl.mechanism", "SCRAM-SHA-256");
properties.setProperty("fetch.max.bytes", "1048576"); //1M
properties.setProperty("flink.consumer.max.fetch.size", "524288");//512k
properties.setProperty("session.timeout.ms", "30000");
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment