Commit ac8354d0 authored by liaowenwu's avatar liaowenwu

增加并行度

parent 1b7bd055
...@@ -22,7 +22,7 @@ import java.util.concurrent.TimeUnit; ...@@ -22,7 +22,7 @@ import java.util.concurrent.TimeUnit;
public class AsyncMysqlDataTransferFunctionNew extends RichAsyncFunction<JSONObject, Tuple3<String,String,Long>> { public class AsyncMysqlDataTransferFunctionNew extends RichAsyncFunction<JSONObject, Tuple3<String,String,Long>> {
static Logger logger = LoggerFactory.getLogger(AsyncMysqlDataTransferFunctionNew.class); //static Logger logger = LoggerFactory.getLogger(AsyncMysqlDataTransferFunctionNew.class);
//数据库连接信息 //数据库连接信息
EnvProperties dbInfoMap; EnvProperties dbInfoMap;
......
...@@ -13,34 +13,6 @@ import java.util.Map; ...@@ -13,34 +13,6 @@ import java.util.Map;
import java.util.stream.Collectors; import java.util.stream.Collectors;
public class GroupTsProcessWindowFunction extends ProcessWindowFunction<Tuple3<JSONObject, String, Long>, JSONObject, String, TimeWindow> { public class GroupTsProcessWindowFunction extends ProcessWindowFunction<Tuple3<JSONObject, String, Long>, JSONObject, String, TimeWindow> {
/*@Override
public void process(String aLong, ProcessWindowFunction<Tuple3<JSONObject, String, Long>, JSONObject, Long,
TimeWindow>.Context context, Iterable<Tuple3<JSONObject, String, Long>> elements, Collector<JSONObject> out) throws Exception {
List<Tuple3<JSONObject, String, Long>> list = CollUtil.list(false, elements);
Map<Long, List<Tuple3<JSONObject, String, Long>>> tsGroupMap = list.stream().collect(Collectors.groupingBy(x -> x.f2));
Long max = CollUtil.max(tsGroupMap.keySet());
List<Tuple3<JSONObject, String, Long>> resList = tsGroupMap.get(max);
if(resList.size() == 2){
JSONObject insertJson = new JSONObject();
JSONObject deleteJson = new JSONObject();
for (Tuple3<JSONObject, String, Long> rs : resList) {
if("INSERT".equals(rs.f0.getString("type"))){
insertJson = rs.f0;
}
if("DELETE".equals(rs.f0.getString("type"))){
deleteJson = rs.f0;
}
}
if(StrUtil.isNotBlank(insertJson.getString("type")) && StrUtil.isNotBlank(deleteJson.getString("type"))){
insertJson.put("type","UPDATE");
insertJson.put("old", deleteJson.getJSONArray("data"));
out.collect(insertJson);
return;
}
}
out.collect(resList.get(0).f0);
}*/
@Override @Override
public void process(String key, Context context, Iterable<Tuple3<JSONObject, String, Long>> elements, Collector<JSONObject> out) throws Exception { public void process(String key, Context context, Iterable<Tuple3<JSONObject, String, Long>> elements, Collector<JSONObject> out) throws Exception {
......
...@@ -31,7 +31,7 @@ public class MysqlDataTransferSink extends RichSinkFunction<String> { ...@@ -31,7 +31,7 @@ public class MysqlDataTransferSink extends RichSinkFunction<String> {
@Override @Override
public void open(Configuration parameters) throws Exception { public void open(Configuration parameters) throws Exception {
executorService = new ThreadPoolExecutor(6, 6, 20, TimeUnit.MINUTES, new LinkedBlockingDeque<>()); executorService = new ThreadPoolExecutor(10, 10, 20, TimeUnit.MINUTES, new LinkedBlockingDeque<>());
//初始化获取配置 //初始化获取配置
String configTidbUrl = String.format(envProps.getDb_url(), envProps.getDb_host(), envProps.getDb_port(), envProps.getDb_database()); String configTidbUrl = String.format(envProps.getDb_url(), envProps.getDb_host(), envProps.getDb_port(), envProps.getDb_database());
//System.out.println(configTidbUrl); //System.out.println(configTidbUrl);
...@@ -56,15 +56,11 @@ public class MysqlDataTransferSink extends RichSinkFunction<String> { ...@@ -56,15 +56,11 @@ public class MysqlDataTransferSink extends RichSinkFunction<String> {
@Override @Override
public void invoke(String value, Context context) throws Exception { public void invoke(String value, Context context) throws Exception {
executorService.execute(() -> { executorService.execute(() -> {
try {
executeSql(value); executeSql(value);
}catch (Exception e){
e.printStackTrace();
}
}); });
} }
private void executeSql(String sql) throws Exception{ private void executeSql(String sql){
Connection connection = null; Connection connection = null;
PreparedStatement pt = null; PreparedStatement pt = null;
try { try {
...@@ -73,8 +69,8 @@ public class MysqlDataTransferSink extends RichSinkFunction<String> { ...@@ -73,8 +69,8 @@ public class MysqlDataTransferSink extends RichSinkFunction<String> {
pt.execute(); pt.execute();
} catch (Exception e) { } catch (Exception e) {
System.out.println("sql报错----->" + sql); System.out.println("sql报错----->" + sql);
e.printStackTrace();
logger.error("------错误时间:{}-----,sql:{}--------异常:{}", DateUtil.now(),sql,e.getMessage()); logger.error("------错误时间:{}-----,sql:{}--------异常:{}", DateUtil.now(),sql,e.getMessage());
logger.error("异常信息:",e);
SqlErrorLog errorLog = new SqlErrorLog(new Date(), sql, e.getMessage()); SqlErrorLog errorLog = new SqlErrorLog(new Date(), sql, e.getMessage());
try { try {
writeErrLog(errorLog); writeErrLog(errorLog);
...@@ -82,12 +78,7 @@ public class MysqlDataTransferSink extends RichSinkFunction<String> { ...@@ -82,12 +78,7 @@ public class MysqlDataTransferSink extends RichSinkFunction<String> {
logger.error("错误日志保存异常 -> {}", re.getMessage()); logger.error("错误日志保存异常 -> {}", re.getMessage());
} }
} finally { } finally {
if (pt != null) { DbUtil.close(pt, connection);
pt.close();
}
if (connection != null) {
connection.close();
}
} }
} }
...@@ -111,7 +102,7 @@ public class MysqlDataTransferSink extends RichSinkFunction<String> { ...@@ -111,7 +102,7 @@ public class MysqlDataTransferSink extends RichSinkFunction<String> {
}catch (Exception e){ }catch (Exception e){
logger.error("错误日志保存异常 -> {}", e.getMessage()); logger.error("错误日志保存异常 -> {}", e.getMessage());
}finally { }finally {
DbUtil.close(conn, pt); DbUtil.close(pt, conn);
} }
} }
} }
...@@ -88,7 +88,6 @@ public class SyncCustomerDataSource { ...@@ -88,7 +88,6 @@ public class SyncCustomerDataSource {
.filter(new FilterFunction<JSONObject>() { .filter(new FilterFunction<JSONObject>() {
@Override @Override
public boolean filter(JSONObject value) throws Exception { public boolean filter(JSONObject value) throws Exception {
return !value.getBoolean("isDdl") && !"TIDB_WATERMARK".equals(value.getString("type")); return !value.getBoolean("isDdl") && !"TIDB_WATERMARK".equals(value.getString("type"));
} }
}) })
...@@ -100,11 +99,11 @@ public class SyncCustomerDataSource { ...@@ -100,11 +99,11 @@ public class SyncCustomerDataSource {
SingleOutputStreamOperator<Tuple3<JSONObject, String, Long>> tsGroupStream = canalJsonStream.map(new CanalMapToTsGroupFunction()); SingleOutputStreamOperator<Tuple3<JSONObject, String, Long>> tsGroupStream = canalJsonStream.map(new CanalMapToTsGroupFunction());
SingleOutputStreamOperator<JSONObject> process = tsGroupStream.keyBy(x -> x.f1) SingleOutputStreamOperator<JSONObject> process = tsGroupStream.keyBy(x -> x.f1)
.window(TumblingProcessingTimeWindows.of(Time.seconds(1))) .window(TumblingProcessingTimeWindows.of(Time.milliseconds(300)))
.process(new GroupTsProcessWindowFunction()); .process(new GroupTsProcessWindowFunction());
SingleOutputStreamOperator<Tuple3<String, String, Long>> sqlResultStream1 = AsyncDataStream.orderedWait(process, SingleOutputStreamOperator<Tuple3<String, String, Long>> sqlResultStream1 = AsyncDataStream.orderedWait(process,
new AsyncMysqlDataTransferFunctionNew(envProps), 600L, TimeUnit.SECONDS) new AsyncMysqlDataTransferFunctionNew(envProps), 300L, TimeUnit.SECONDS)
.filter(new FilterFunction<Tuple3<String, String, Long>>() { .filter(new FilterFunction<Tuple3<String, String, Long>>() {
@Override @Override
public boolean filter(Tuple3<String, String, Long> value) throws Exception { public boolean filter(Tuple3<String, String, Long> value) throws Exception {
...@@ -117,7 +116,7 @@ public class SyncCustomerDataSource { ...@@ -117,7 +116,7 @@ public class SyncCustomerDataSource {
//sqlResultStream1.print("async sql==>"); //sqlResultStream1.print("async sql==>");
SingleOutputStreamOperator<String> groupWindowSqlResultStream = sqlResultStream1.keyBy(value -> value.f1) SingleOutputStreamOperator<String> groupWindowSqlResultStream = sqlResultStream1.keyBy(value -> value.f1)
.window(TumblingProcessingTimeWindows.of(Time.milliseconds(500))) .window(TumblingProcessingTimeWindows.of(Time.milliseconds(300)))
.process(new ProcessWindowFunction<Tuple3<String, String, Long>, String, String, TimeWindow>() { .process(new ProcessWindowFunction<Tuple3<String, String, Long>, String, String, TimeWindow>() {
@Override @Override
public void process(String s, ProcessWindowFunction<Tuple3<String, String, Long>, String, String, public void process(String s, ProcessWindowFunction<Tuple3<String, String, Long>, String, String,
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment