Commit 01f8cec1 authored by liaowenwu's avatar liaowenwu

优化

parent ac8354d0
...@@ -36,7 +36,7 @@ public class AsyncMysqlDataTransferFunctionNew extends RichAsyncFunction<JSONObj ...@@ -36,7 +36,7 @@ public class AsyncMysqlDataTransferFunctionNew extends RichAsyncFunction<JSONObj
@Override @Override
public void open(Configuration parameters) throws Exception { public void open(Configuration parameters) throws Exception {
//初始化线程池 //初始化线程池
executorService = new ThreadPoolExecutor(10 , 10, 20, TimeUnit.MINUTES, new LinkedBlockingDeque<>()); executorService = new ThreadPoolExecutor(12 , 12, 20, TimeUnit.MINUTES, new LinkedBlockingDeque<>());
} }
@Override @Override
......
...@@ -22,8 +22,8 @@ public class MysqlDataTransferSink extends RichSinkFunction<String> { ...@@ -22,8 +22,8 @@ public class MysqlDataTransferSink extends RichSinkFunction<String> {
static Logger logger = LoggerFactory.getLogger(MysqlDataTransferSink.class); static Logger logger = LoggerFactory.getLogger(MysqlDataTransferSink.class);
EnvProperties envProps; EnvProperties envProps;
ExecutorService executorService; private static transient ExecutorService executorService;
DruidDataSource dataSource; private static transient DruidDataSource dataSource;
public MysqlDataTransferSink(EnvProperties envProps) { public MysqlDataTransferSink(EnvProperties envProps) {
this.envProps = envProps; this.envProps = envProps;
...@@ -68,8 +68,7 @@ public class MysqlDataTransferSink extends RichSinkFunction<String> { ...@@ -68,8 +68,7 @@ public class MysqlDataTransferSink extends RichSinkFunction<String> {
pt = connection.prepareStatement(sql); pt = connection.prepareStatement(sql);
pt.execute(); pt.execute();
} catch (Exception e) { } catch (Exception e) {
System.out.println("sql报错----->" + sql); //logger.error("------错误时间:{}-----,sql:{}--------异常:{}", DateUtil.now(),sql,e.getMessage());
logger.error("------错误时间:{}-----,sql:{}--------异常:{}", DateUtil.now(),sql,e.getMessage());
logger.error("异常信息:",e); logger.error("异常信息:",e);
SqlErrorLog errorLog = new SqlErrorLog(new Date(), sql, e.getMessage()); SqlErrorLog errorLog = new SqlErrorLog(new Date(), sql, e.getMessage());
try { try {
......
...@@ -78,7 +78,7 @@ public class SyncCustomerDataSource { ...@@ -78,7 +78,7 @@ public class SyncCustomerDataSource {
} }
SingleOutputStreamOperator<String> kafkaSource = env.addSource(kafkaConsumer) SingleOutputStreamOperator<String> kafkaSource = env.addSource(kafkaConsumer)
//.setParallelism(1) .setParallelism(1)
.name("kafka-source") .name("kafka-source")
.uid("kafka-source"); .uid("kafka-source");
...@@ -99,11 +99,11 @@ public class SyncCustomerDataSource { ...@@ -99,11 +99,11 @@ public class SyncCustomerDataSource {
SingleOutputStreamOperator<Tuple3<JSONObject, String, Long>> tsGroupStream = canalJsonStream.map(new CanalMapToTsGroupFunction()); SingleOutputStreamOperator<Tuple3<JSONObject, String, Long>> tsGroupStream = canalJsonStream.map(new CanalMapToTsGroupFunction());
SingleOutputStreamOperator<JSONObject> process = tsGroupStream.keyBy(x -> x.f1) SingleOutputStreamOperator<JSONObject> process = tsGroupStream.keyBy(x -> x.f1)
.window(TumblingProcessingTimeWindows.of(Time.milliseconds(300))) .window(TumblingProcessingTimeWindows.of(Time.milliseconds(100)))
.process(new GroupTsProcessWindowFunction()); .process(new GroupTsProcessWindowFunction());
SingleOutputStreamOperator<Tuple3<String, String, Long>> sqlResultStream1 = AsyncDataStream.orderedWait(process, SingleOutputStreamOperator<Tuple3<String, String, Long>> sqlResultStream1 = AsyncDataStream.orderedWait(process,
new AsyncMysqlDataTransferFunctionNew(envProps), 300L, TimeUnit.SECONDS) new AsyncMysqlDataTransferFunctionNew(envProps), 120L, TimeUnit.SECONDS)
.filter(new FilterFunction<Tuple3<String, String, Long>>() { .filter(new FilterFunction<Tuple3<String, String, Long>>() {
@Override @Override
public boolean filter(Tuple3<String, String, Long> value) throws Exception { public boolean filter(Tuple3<String, String, Long> value) throws Exception {
...@@ -116,7 +116,7 @@ public class SyncCustomerDataSource { ...@@ -116,7 +116,7 @@ public class SyncCustomerDataSource {
//sqlResultStream1.print("async sql==>"); //sqlResultStream1.print("async sql==>");
SingleOutputStreamOperator<String> groupWindowSqlResultStream = sqlResultStream1.keyBy(value -> value.f1) SingleOutputStreamOperator<String> groupWindowSqlResultStream = sqlResultStream1.keyBy(value -> value.f1)
.window(TumblingProcessingTimeWindows.of(Time.milliseconds(300))) .window(TumblingProcessingTimeWindows.of(Time.milliseconds(100)))
.process(new ProcessWindowFunction<Tuple3<String, String, Long>, String, String, TimeWindow>() { .process(new ProcessWindowFunction<Tuple3<String, String, Long>, String, String, TimeWindow>() {
@Override @Override
public void process(String s, ProcessWindowFunction<Tuple3<String, String, Long>, String, String, public void process(String s, ProcessWindowFunction<Tuple3<String, String, Long>, String, String,
......
...@@ -16,15 +16,17 @@ public class EtlUtils { ...@@ -16,15 +16,17 @@ public class EtlUtils {
properties.setProperty("sasl.jaas.config", getSaslJaasConfig(username,password)); properties.setProperty("sasl.jaas.config", getSaslJaasConfig(username,password));
properties.setProperty("security.protocol", "SASL_PLAINTEXT"); properties.setProperty("security.protocol", "SASL_PLAINTEXT");
properties.setProperty("sasl.mechanism", "SCRAM-SHA-512"); properties.setProperty("sasl.mechanism", "SCRAM-SHA-512");
properties.setProperty("fetch.max.bytes", "2097152000"); properties.setProperty("fetch.max.bytes", "10485760"); //10M
properties.setProperty("max.partition.fetch.bytes", "2097152000"); //properties.setProperty("max.partition.fetch.bytes", "104857600");
properties.setProperty("flink.consumer.max.fetch.size", "2097152000"); properties.setProperty("flink.consumer.max.fetch.size", "5242880");//5m
properties.setProperty("frame.size", "2097152000"); //properties.setProperty("frame.size", "2097152000");
properties.setProperty("session.timeout.ms", "30000"); properties.setProperty("session.timeout.ms", "30000");
properties.setProperty("heartbeat.interval.ms", "10000"); properties.setProperty("heartbeat.interval.ms", "10000");
properties.setProperty("request.timeout.ms", "60000"); properties.setProperty("request.timeout.ms", "60000");
properties.setProperty("retries", "3"); properties.setProperty("retries", "3");
properties.setProperty("retry.backoff.ms", "5000"); properties.setProperty("retry.backoff.ms", "5000");
properties.setProperty("receive.buffer.bytes", "2621440"); //2.5m
properties.setProperty("max.poll.records", "50");
return properties; return properties;
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment