Commit 4a492dfd authored by liaowenwu's avatar liaowenwu

增加并行度

parent b9829263
......@@ -36,7 +36,7 @@ public class AsyncMysqlDataTransferFunctionNew extends RichAsyncFunction<JSONObj
@Override
public void open(Configuration parameters) throws Exception {
//初始化线程池
executorService = new ThreadPoolExecutor(4 , 4, 20, TimeUnit.MINUTES, new LinkedBlockingDeque<>());
executorService = new ThreadPoolExecutor(10 , 10, 20, TimeUnit.MINUTES, new LinkedBlockingDeque<>());
}
@Override
......
......@@ -32,7 +32,7 @@ public class MysqlDataTransferSink extends RichSinkFunction<String> {
@Override
public void open(Configuration parameters) throws Exception {
executorService = new ThreadPoolExecutor(4, 4, 20, TimeUnit.MINUTES, new LinkedBlockingDeque<>());
executorService = new ThreadPoolExecutor(10, 10, 20, TimeUnit.MINUTES, new LinkedBlockingDeque<>());
//初始化获取配置
String configTidbUrl = String.format(envProps.getDb_url(), envProps.getDb_host(), envProps.getDb_port(), envProps.getDb_database());
//System.out.println(configTidbUrl);
......
......@@ -20,6 +20,12 @@ public class EtlUtils {
properties.setProperty("max.partition.fetch.bytes", "2097152000");
properties.setProperty("flink.consumer.max.fetch.size", "2097152000");
properties.setProperty("frame.size", "2097152000");
properties.setProperty("session.timeout.ms", "30000");
properties.setProperty("heartbeat.interval.ms", "10000");
properties.setProperty("request.timeout.ms", "60000");
properties.setProperty("retries", "3");
properties.setProperty("retry.backoff.ms", "5000");
return properties;
}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment