Commit 3c04f7b1 authored by liaowenwu's avatar liaowenwu

优化代码

parent 557e449f
package com.dsk.flink.dsc.common.sink; package com.dsk.flink.dsc.common.sink;
import cn.hutool.core.date.DateUtil;
import cn.hutool.core.lang.Snowflake; import cn.hutool.core.lang.Snowflake;
import cn.hutool.core.util.IdUtil; import cn.hutool.core.util.IdUtil;
import cn.hutool.core.util.RandomUtil; import cn.hutool.core.util.RandomUtil;
...@@ -18,12 +17,7 @@ import java.sql.Statement; ...@@ -18,12 +17,7 @@ import java.sql.Statement;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Date; import java.util.Date;
import java.util.List; import java.util.List;
import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.*;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
/** /**
...@@ -36,7 +30,7 @@ public class MysqlDataTransferSinkBatch extends RichSinkFunction<String> { ...@@ -36,7 +30,7 @@ public class MysqlDataTransferSinkBatch extends RichSinkFunction<String> {
EnvProperties envProps; EnvProperties envProps;
private static transient ExecutorService executorService; private static transient ExecutorService executorService;
private static transient DruidDataSource dataSource; private static transient DruidDataSource dataSource;
private static final int BATCH_SIZE = 500; private static final int BATCH_SIZE = 1000;
private static final int FLUSH_INTERVAL = 500; private static final int FLUSH_INTERVAL = 500;
private ArrayBlockingQueue<String> sqlBatch = new ArrayBlockingQueue<>(BATCH_SIZE * 2); private ArrayBlockingQueue<String> sqlBatch = new ArrayBlockingQueue<>(BATCH_SIZE * 2);
private static transient ScheduledExecutorService scheduledExecutorService; private static transient ScheduledExecutorService scheduledExecutorService;
...@@ -48,7 +42,7 @@ public class MysqlDataTransferSinkBatch extends RichSinkFunction<String> { ...@@ -48,7 +42,7 @@ public class MysqlDataTransferSinkBatch extends RichSinkFunction<String> {
@Override @Override
public void open(Configuration parameters) throws Exception { public void open(Configuration parameters) throws Exception {
executorService = new ThreadPoolExecutor(10, 10, 20, TimeUnit.MINUTES, new ArrayBlockingQueue<>(500)); executorService = new ThreadPoolExecutor(10, 10, 20, TimeUnit.MINUTES, new ArrayBlockingQueue<>(1000));
// 初始化获取配置 // 初始化获取配置
String configTidbUrl = String.format(envProps.getDb_url(), envProps.getDb_host(), envProps.getDb_port(), envProps.getDb_database()); String configTidbUrl = String.format(envProps.getDb_url(), envProps.getDb_host(), envProps.getDb_port(), envProps.getDb_database());
dataSource = new DruidDataSource(); dataSource = new DruidDataSource();
......
...@@ -16,7 +16,7 @@ public class EtlUtils { ...@@ -16,7 +16,7 @@ public class EtlUtils {
properties.setProperty("sasl.jaas.config", getSaslJaasConfig(username,password)); properties.setProperty("sasl.jaas.config", getSaslJaasConfig(username,password));
properties.setProperty("security.protocol", "SASL_PLAINTEXT"); properties.setProperty("security.protocol", "SASL_PLAINTEXT");
properties.setProperty("sasl.mechanism", "SCRAM-SHA-512"); properties.setProperty("sasl.mechanism", "SCRAM-SHA-512");
/*properties.setProperty("fetch.max.bytes", "1048576"); //1M properties.setProperty("fetch.max.bytes", "1048576"); //1M
properties.setProperty("flink.consumer.max.fetch.size", "524288");//512k properties.setProperty("flink.consumer.max.fetch.size", "524288");//512k
properties.setProperty("session.timeout.ms", "30000"); properties.setProperty("session.timeout.ms", "30000");
properties.setProperty("heartbeat.interval.ms", "10000"); properties.setProperty("heartbeat.interval.ms", "10000");
...@@ -24,7 +24,7 @@ public class EtlUtils { ...@@ -24,7 +24,7 @@ public class EtlUtils {
properties.setProperty("retries", "3"); properties.setProperty("retries", "3");
properties.setProperty("retry.backoff.ms", "5000"); properties.setProperty("retry.backoff.ms", "5000");
properties.setProperty("receive.buffer.bytes", "65536"); //64k properties.setProperty("receive.buffer.bytes", "65536"); //64k
properties.setProperty("max.poll.records", "10");*/ properties.setProperty("max.poll.records", "10");
return properties; return properties;
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment