Commit 1b7bd055 authored by liaowenwu's avatar liaowenwu

增加并行度

parent 7dbb7b90
...@@ -31,8 +31,7 @@ public class MysqlDataTransferSink extends RichSinkFunction<String> { ...@@ -31,8 +31,7 @@ public class MysqlDataTransferSink extends RichSinkFunction<String> {
@Override @Override
public void open(Configuration parameters) throws Exception { public void open(Configuration parameters) throws Exception {
executorService = new ThreadPoolExecutor(6, 6, 20, TimeUnit.MINUTES, new LinkedBlockingDeque<>());
executorService = new ThreadPoolExecutor(10, 10, 20, TimeUnit.MINUTES, new LinkedBlockingDeque<>());
//初始化获取配置 //初始化获取配置
String configTidbUrl = String.format(envProps.getDb_url(), envProps.getDb_host(), envProps.getDb_port(), envProps.getDb_database()); String configTidbUrl = String.format(envProps.getDb_url(), envProps.getDb_host(), envProps.getDb_port(), envProps.getDb_database());
//System.out.println(configTidbUrl); //System.out.println(configTidbUrl);
...@@ -42,7 +41,7 @@ public class MysqlDataTransferSink extends RichSinkFunction<String> { ...@@ -42,7 +41,7 @@ public class MysqlDataTransferSink extends RichSinkFunction<String> {
dataSource.setPassword(envProps.getDb_password()); dataSource.setPassword(envProps.getDb_password());
dataSource.setUrl(configTidbUrl); dataSource.setUrl(configTidbUrl);
dataSource.setMaxActive(30); dataSource.setMaxActive(30);
dataSource.setInitialSize(10); dataSource.setInitialSize(20);
dataSource.setTestWhileIdle(true); dataSource.setTestWhileIdle(true);
dataSource.setMaxWait(20000); dataSource.setMaxWait(20000);
dataSource.setValidationQuery("select 1"); dataSource.setValidationQuery("select 1");
......
...@@ -48,7 +48,7 @@ public class SyncCustomerDataSource { ...@@ -48,7 +48,7 @@ public class SyncCustomerDataSource {
public static void main(String[] args) throws Exception { public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//env.setParallelism(1); //env.setParallelism(3);
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(50, 30000)); env.setRestartStrategy(RestartStrategies.fixedDelayRestart(50, 30000));
env.enableCheckpointing(300000); env.enableCheckpointing(300000);
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE); env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
...@@ -78,7 +78,7 @@ public class SyncCustomerDataSource { ...@@ -78,7 +78,7 @@ public class SyncCustomerDataSource {
} }
SingleOutputStreamOperator<String> kafkaSource = env.addSource(kafkaConsumer) SingleOutputStreamOperator<String> kafkaSource = env.addSource(kafkaConsumer)
.setParallelism(1) //.setParallelism(1)
.name("kafka-source") .name("kafka-source")
.uid("kafka-source"); .uid("kafka-source");
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment