Unverified Commit b3ad0a5f authored by Kerwin, committed by GitHub

[Optimization][Style] Configure global checkstyle validation. (#953)

* Configure global checkstyle validation.

* Configure global checkstyle validation.
parent c6e87d9d
@@ -53,7 +53,7 @@ jobs:
             ${{ runner.os }}-maven-
       - name: Check Style
         run: |
-          ./mvnw -T 2C -B checkstyle:check --no-snapshot-updates
+          ./mvnw -T 2C -B --no-snapshot-updates clean checkstyle:check
       - name: Build and Package
         run: |
           ./mvnw -B clean install \
......
@@ -19,19 +19,26 @@
 package com.dlink.controller;

 import com.dlink.common.result.ProTableResult;
 import com.dlink.common.result.Result;
 import com.dlink.model.FragmentVariable;
 import com.dlink.service.FragmentVariableService;
-import com.fasterxml.jackson.databind.JsonNode;
-import lombok.extern.slf4j.Slf4j;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.web.bind.annotation.*;

 import java.util.ArrayList;
 import java.util.List;

+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.web.bind.annotation.DeleteMapping;
+import org.springframework.web.bind.annotation.PostMapping;
+import org.springframework.web.bind.annotation.PutMapping;
+import org.springframework.web.bind.annotation.RequestBody;
+import org.springframework.web.bind.annotation.RequestMapping;
+import org.springframework.web.bind.annotation.RestController;
+
+import com.fasterxml.jackson.databind.JsonNode;
+
+import lombok.extern.slf4j.Slf4j;

 /**
  * FragmentVariableController
  *
......
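Note: the reordering above follows the ImportOrder rule this commit adds to style/checkstyle.xml (its staticGroups value is com.dlink,org.apache,java,javax,org,com, with groups separated by blank lines; the non-static groups appear to mirror it). A minimal sketch of a compliant import block; the package and class names are illustrative only:

    package com.dlink.example; // hypothetical package

    import com.dlink.common.result.Result; // project group first

    import java.util.ArrayList; // then java.*
    import java.util.List;

    import org.springframework.web.bind.annotation.RestController; // then org.*

    import com.fasterxml.jackson.databind.JsonNode; // then remaining com.*

    import lombok.extern.slf4j.Slf4j; // ungrouped packages last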
@@ -19,11 +19,12 @@
 package com.dlink.dto;

+import com.dlink.assertion.Asserts;
+import com.dlink.job.JobConfig;
+
 import java.util.HashMap;
 import java.util.Map;

-import com.dlink.assertion.Asserts;
-import com.dlink.job.JobConfig;
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.ObjectMapper;
......
@@ -21,6 +21,7 @@ package com.dlink.mapper;

 import com.dlink.db.mapper.SuperMapper;
 import com.dlink.model.FragmentVariable;
+
 import org.apache.ibatis.annotations.Mapper;

 /**
......
@@ -19,10 +19,12 @@
 package com.dlink.model;

+import com.dlink.db.model.SuperEntity;
+
 import com.baomidou.mybatisplus.annotation.FieldFill;
 import com.baomidou.mybatisplus.annotation.TableField;
 import com.baomidou.mybatisplus.annotation.TableName;
-import com.dlink.db.model.SuperEntity;
+
 import lombok.AllArgsConstructor;
 import lombok.Data;
 import lombok.EqualsAndHashCode;
......
@@ -19,13 +19,12 @@
 package com.dlink.service;

+import com.dlink.db.service.ISuperService;
+import com.dlink.model.FragmentVariable;
+
 import java.util.List;
 import java.util.Map;

-import com.dlink.db.service.ISuperService;
-import com.dlink.model.FragmentVariable;
-
 /**
  * FragmentVariableService
  *
......
@@ -19,6 +19,11 @@
 package com.dlink.service.impl;

+import com.dlink.db.service.impl.SuperServiceImpl;
+import com.dlink.mapper.FragmentVariableMapper;
+import com.dlink.model.FragmentVariable;
+import com.dlink.service.FragmentVariableService;
+
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
@@ -26,10 +31,6 @@ import java.util.Map;
 import org.springframework.stereotype.Service;

 import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
-import com.dlink.db.service.impl.SuperServiceImpl;
-import com.dlink.mapper.FragmentVariableMapper;
-import com.dlink.model.FragmentVariable;
-import com.dlink.service.FragmentVariableService;

 /**
......
@@ -119,6 +119,8 @@ public class JobInstanceServiceImpl extends SuperServiceImpl<JobInstanceMapper,
                     break;
                 case UNKNOWN:
                     jobInstanceStatus.setUnknown(counts);
+                    break;
+                default:
             }
         }
         jobInstanceStatus.setAll(total);
......
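Note: the two added lines terminate the final case explicitly and document that the remaining enum values are deliberately ignored, which satisfies switch checks such as checkstyle's MissingSwitchDefault. A sketch of the resulting shape, assuming a JobStatus enum:

    // Sketch only; mirrors the hunk above.
    void count(JobStatus status, JobInstanceStatus jobInstanceStatus, Integer counts) {
        switch (status) {
            case RUNNING:
                jobInstanceStatus.setRunning(counts);
                break;
            case UNKNOWN:
                jobInstanceStatus.setUnknown(counts);
                break;  // added: the last case now ends explicitly
            default:    // added: all other statuses are intentionally ignored
        }
    }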
@@ -19,21 +19,6 @@
 package com.dlink.service.impl;

-import java.time.LocalDateTime;
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.stereotype.Service;
-
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.databind.JsonNode;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.node.ObjectNode;
-
 import com.dlink.api.FlinkAPI;
 import com.dlink.assertion.Asserts;
 import com.dlink.config.Dialect;
@@ -80,6 +65,22 @@ import com.dlink.session.SessionPool;
 import com.dlink.sql.FlinkQuery;
 import com.dlink.utils.RunTimeUtil;

+import java.time.LocalDateTime;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;

 /**
  * StudioServiceImpl
  *
@@ -222,15 +223,19 @@ public class StudioServiceImpl implements StudioService {
     private List<SqlExplainResult> explainCommonSql(StudioExecuteDTO studioExecuteDTO) {
         if (Asserts.isNull(studioExecuteDTO.getDatabaseId())) {
-            return new ArrayList<SqlExplainResult>() {{
-                add(SqlExplainResult.fail(studioExecuteDTO.getStatement(), "请指定数据源"));
-            }};
+            return new ArrayList<SqlExplainResult>() {
+                {
+                    add(SqlExplainResult.fail(studioExecuteDTO.getStatement(), "请指定数据源"));
+                }
+            };
         } else {
             DataBase dataBase = dataBaseService.getById(studioExecuteDTO.getDatabaseId());
             if (Asserts.isNull(dataBase)) {
-                return new ArrayList<SqlExplainResult>() {{
-                    add(SqlExplainResult.fail(studioExecuteDTO.getStatement(), "数据源不存在"));
-                }};
+                return new ArrayList<SqlExplainResult>() {
+                    {
+                        add(SqlExplainResult.fail(studioExecuteDTO.getStatement(), "数据源不存在"));
+                    }
+                };
             }
             Driver driver = Driver.build(dataBase.getDriverConfig());
             List<SqlExplainResult> sqlExplainResults = driver.explain(studioExecuteDTO.getStatement());
@@ -314,9 +319,9 @@ public class StudioServiceImpl implements StudioService {
                 return null;
             }
             if (studioCADTO.getDialect().equalsIgnoreCase("doris")) {
-                return com.dlink.explainer.sqlLineage.LineageBuilder.getSqlLineage(studioCADTO.getStatement(), "mysql", dataBase.getDriverConfig());
+                return com.dlink.explainer.sqllineage.LineageBuilder.getSqlLineage(studioCADTO.getStatement(), "mysql", dataBase.getDriverConfig());
             } else {
-                return com.dlink.explainer.sqlLineage.LineageBuilder.getSqlLineage(studioCADTO.getStatement(), studioCADTO.getDialect().toLowerCase(), dataBase.getDriverConfig());
+                return com.dlink.explainer.sqllineage.LineageBuilder.getSqlLineage(studioCADTO.getStatement(), studioCADTO.getDialect().toLowerCase(), dataBase.getDriverConfig());
             }
         } else {
             addFlinkSQLEnv(studioCADTO);
......
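Note: both rewritten returns keep the double-brace idiom but split the anonymous-subclass brace from the instance-initializer brace, which is the layout the brace-placement rules accept. The same shape in isolation (statement and message are hypothetical):

    List<SqlExplainResult> results = new ArrayList<SqlExplainResult>() {
        {
            // instance initializer of the anonymous ArrayList subclass
            add(SqlExplainResult.fail("select 1", "no datasource specified"));
        }
    };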
@@ -19,15 +19,15 @@
 package com.dlink.app;

-import java.io.IOException;
-import java.util.Map;

 import com.dlink.app.db.DBConfig;
 import com.dlink.app.flinksql.Submiter;
 import com.dlink.assertion.Asserts;
 import com.dlink.constant.FlinkParamConstant;
 import com.dlink.utils.FlinkBaseUtil;

+import java.io.IOException;
+import java.util.Map;

 /**
  * MainApp
  *
......
@@ -28,6 +28,7 @@ import com.dlink.executor.ExecutorSetting;
 import com.dlink.interceptor.FlinkInterceptor;
 import com.dlink.parser.SqlType;
 import com.dlink.trans.Operations;
+
 import org.apache.flink.configuration.CheckpointingOptions;

 import java.io.IOException;
@@ -73,9 +74,7 @@ public class Submiter {
         try {
             statement = DBUtil.getOneByID(getQuerySQL(id), config);
         } catch (IOException | SQLException e) {
-            logger.error("{} --> 获取 FlinkSQL 配置异常,ID 为 {}, \n" +
-                "连接信息为:{} \n" +
-                "异常信息为:{} ", LocalDateTime.now(), id, config.toString(), e.getMessage(), e);
+            logger.error("{} --> 获取 FlinkSQL 配置异常,ID 为 {}, 连接信息为:{} ,异常信息为:{} ", LocalDateTime.now(), id, config.toString(), e.getMessage(), e);
         }
         return statement;
     }
@@ -85,9 +84,7 @@ public class Submiter {
         try {
             task = DBUtil.getMapByID(getTaskInfo(id), config);
         } catch (IOException | SQLException e) {
-            logger.error("{} --> 获取 FlinkSQL 配置异常,ID 为 {}, \n" +
-                "连接信息为:{} \n" +
-                "异常信息为:{} ", LocalDateTime.now(), id, config.toString(), e.getMessage(), e);
+            logger.error("{} --> 获取 FlinkSQL 配置异常,ID 为 {}, 连接信息为:{} ,异常信息为:{} ", LocalDateTime.now(), id, config.toString(), e.getMessage(), e);
         }
         return task;
     }
......
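Note: the concatenated three-line message collapses into one parameterized string. SLF4J fills each {} from the argument list in order and, because the final argument is a Throwable, still logs the stack trace. A self-contained sketch of the pattern (class and message are hypothetical):

    import java.time.LocalDateTime;

    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    class SubmiterLoggingSketch {
        private static final Logger logger = LoggerFactory.getLogger(SubmiterLoggingSketch.class);

        void report(Integer id, String config, Exception e) {
            // Four {} placeholders consume the first four arguments; the trailing
            // Exception is treated as the throwable and printed with its stack trace.
            logger.error("{} --> failed to read FlinkSQL config, id {}, connection {}, error {}",
                    LocalDateTime.now(), id, config, e.getMessage(), e);
        }
    }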
@@ -190,6 +190,7 @@ public abstract class AbstractSinkBuilder {
                         }
                         out.collect(uagenericRowData);
                         break;
+                    default:
                 }
             } catch (Exception e) {
                 logger.error("SchameTable: {} - Row: {} - Exception: {}", schemaTableName, JSONUtil.toJsonString(value), e.getCause().getMessage());
......
@@ -141,6 +141,7 @@ public class SQLSinkBuilder extends AbstractSinkBuilder implements SinkBuilder,
                         }
                         out.collect(uarow);
                         break;
+                    default:
                 }
             } catch (Exception e) {
                 logger.error("SchameTable: {} - Row: {} - Exception: {}", schemaTableName, JSONUtil.toJsonString(value), e.getCause().getMessage());
......
@@ -190,6 +190,7 @@ public abstract class AbstractSinkBuilder {
                         }
                         out.collect(uagenericRowData);
                         break;
+                    default:
                 }
             } catch (Exception e) {
                 logger.error("SchameTable: {} - Row: {} - Exception: {}", schemaTableName, JSONUtil.toJsonString(value), e.getCause().getMessage());
......
@@ -104,6 +104,7 @@ public class MysqlCDCBuilder extends AbstractCDCBuilder implements CDCBuilder {
                 case "latest-offset":
                     sourceBuilder.startupOptions(StartupOptions.latest());
                     break;
+                default:
             }
         } else {
             sourceBuilder.startupOptions(StartupOptions.latest());
......
@@ -141,6 +141,7 @@ public class SQLSinkBuilder extends AbstractSinkBuilder implements SinkBuilder,
                         }
                         out.collect(uarow);
                         break;
+                    default:
                 }
             } catch (Exception e) {
                 logger.error("SchameTable: {} - Row: {} - Exception: {}", schemaTableName, JSONUtil.toJsonString(value), e.getCause().getMessage());
......
@@ -190,6 +190,7 @@ public abstract class AbstractSinkBuilder {
                         }
                         out.collect(uagenericRowData);
                         break;
+                    default:
                 }
             } catch (Exception e) {
                 logger.error("SchameTable: {} - Row: {} - Exception: {}", schemaTableName, JSONUtil.toJsonString(value), e.getCause().getMessage());
......
@@ -123,6 +123,7 @@ public class KafkaSinkJsonBuilder extends AbstractSinkBuilder implements SinkBui
                     before = (Map) value.get("before");
                     convertAttr(columnNameList, columnTypeList, before, value.get("op").toString(), 1, schemaName, tableName, tsMs);
                     break;
+                default:
             }
         } catch (Exception e) {
             logger.error("SchameTable: {} - Exception:", e);
......
@@ -132,6 +132,7 @@ public class MysqlCDCBuilder extends AbstractCDCBuilder implements CDCBuilder {
                 case "latest-offset":
                     sourceBuilder.startupOptions(StartupOptions.latest());
                     break;
+                default:
             }
         } else {
             sourceBuilder.startupOptions(StartupOptions.latest());
......
@@ -103,6 +103,7 @@ public class OracleCDCBuilder extends AbstractCDCBuilder implements CDCBuilder {
                 case "latest-offset":
                     sourceBuilder.startupOptions(StartupOptions.latest());
                     break;
+                default:
             }
         } else {
             sourceBuilder.startupOptions(StartupOptions.latest());
......
@@ -141,6 +141,7 @@ public class SQLSinkBuilder extends AbstractSinkBuilder implements SinkBuilder,
                         }
                         out.collect(uarow);
                         break;
+                    default:
                 }
             } catch (Exception e) {
                 logger.error("SchameTable: {} - Row: {} - Exception: {}", schemaTableName, JSONUtil.toJsonString(value), e.getCause().getMessage());
......
@@ -190,6 +190,7 @@ public abstract class AbstractSinkBuilder {
                         }
                         out.collect(uagenericRowData);
                         break;
+                    default:
                 }
             } catch (Exception e) {
                 logger.error("SchameTable: {} - Row: {} - Exception: {}", schemaTableName, JSONUtil.toJsonString(value), e.getCause().getMessage());
......
@@ -121,6 +121,7 @@ public class KafkaSinkJsonBuilder extends AbstractSinkBuilder implements SinkBui
                     before = (Map) value.get("before");
                     convertAttr(columnNameList, columnTypeList, before,value.get("op").toString(), 1,schemaName, tableName, tsMs);
                     break;
+                default:
             }
         } catch (Exception e) {
             logger.error("SchameTable: {} - Exception:", e);
......
@@ -132,6 +132,7 @@ public class MysqlCDCBuilder extends AbstractCDCBuilder implements CDCBuilder {
                 case "latest-offset":
                     sourceBuilder.startupOptions(StartupOptions.latest());
                     break;
+                default:
             }
         } else {
             sourceBuilder.startupOptions(StartupOptions.latest());
......
@@ -103,6 +103,7 @@ public class OracleCDCBuilder extends AbstractCDCBuilder implements CDCBuilder {
                 case "latest-offset":
                     sourceBuilder.startupOptions(StartupOptions.latest());
                     break;
+                default:
             }
         } else {
             sourceBuilder.startupOptions(StartupOptions.latest());
......
@@ -17,7 +17,6 @@
  *
  */
-
 package com.dlink.cdc.postgres;

 import com.dlink.assertion.Asserts;
@@ -26,8 +25,7 @@ import com.dlink.cdc.CDCBuilder;
 import com.dlink.constant.ClientConstant;
 import com.dlink.constant.FlinkParamConstant;
 import com.dlink.model.FlinkCDCConfig;
-import com.ververica.cdc.connectors.postgres.PostgreSQLSource;
-import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
+
 import org.apache.flink.streaming.api.datastream.DataStreamSource;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
@@ -36,6 +34,9 @@ import java.util.List;
 import java.util.Map;
 import java.util.Properties;

+import com.ververica.cdc.connectors.postgres.PostgreSQLSource;
+import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
+
 /**
  * postgresCDCBuilder
  *
@@ -44,8 +45,8 @@ import java.util.Properties;
 **/
 public class PostgresCDCBuilder extends AbstractCDCBuilder implements CDCBuilder {

-    private final static String KEY_WORD = "postgres-cdc";
-    private final static String METADATA_TYPE = "PostgreSql";
+    private static final String KEY_WORD = "postgres-cdc";
+    private static final String METADATA_TYPE = "PostgreSql";

     public PostgresCDCBuilder() {
     }
......
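Note: private final static becomes private static final here and in the builders below; checkstyle's ModifierOrder check enforces the order recommended by the JLS (visibility first, then abstract, static, final, and so on):

    // flagged by ModifierOrder: final before static
    // private final static String KEY_WORD = "postgres-cdc";

    // compliant JLS order
    private static final String KEY_WORD = "postgres-cdc";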
@@ -141,6 +141,7 @@ public class SQLSinkBuilder extends AbstractSinkBuilder implements SinkBuilder,
                         }
                         out.collect(uarow);
                         break;
+                    default:
                 }
             } catch (Exception e) {
                 logger.error("SchameTable: {} - Row: {} - Exception: {}", schemaTableName, JSONUtil.toJsonString(value), e.getCause().getMessage());
......
@@ -6,13 +6,9 @@ import com.dlink.cdc.CDCBuilder;
 import com.dlink.constant.ClientConstant;
 import com.dlink.constant.FlinkParamConstant;
 import com.dlink.model.FlinkCDCConfig;
-import com.ververica.cdc.connectors.sqlserver.SqlServerSource;
-import com.ververica.cdc.connectors.sqlserver.table.StartupOptions;
-import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
+
 import org.apache.flink.streaming.api.datastream.DataStreamSource;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;

 import java.util.ArrayList;
 import java.util.Collections;
@@ -21,17 +17,22 @@ import java.util.List;
 import java.util.Map;
 import java.util.Properties;

+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.ververica.cdc.connectors.sqlserver.SqlServerSource;
+import com.ververica.cdc.connectors.sqlserver.table.StartupOptions;
+
 /**
  * sql server CDC
  *
  * @author 郑文豪
- * @date 2022/8/12 18:00
  */
 public class SqlServerCDCBuilder extends AbstractCDCBuilder implements CDCBuilder {

     protected static final Logger logger = LoggerFactory.getLogger(SqlServerCDCBuilder.class);

-    private final static String KEY_WORD = "sqlserver-cdc";
-    private final static String METADATA_TYPE = "SqlServer";
+    private static final String KEY_WORD = "sqlserver-cdc";
+    private static final String METADATA_TYPE = "SqlServer";

     public SqlServerCDCBuilder() {
     }
@@ -55,8 +56,8 @@ public class SqlServerCDCBuilder extends AbstractCDCBuilde
         String database = config.getDatabase();
         Properties debeziumProperties = new Properties();
         // 为部分转换添加默认值
-        // debeziumProperties.setProperty("bigint.unsigned.handling.mode", "long");
-        // debeziumProperties.setProperty("decimal.handling.mode", "string");
+        //debeziumProperties.setProperty("bigint.unsigned.handling.mode", "long");
+        //debeziumProperties.setProperty("decimal.handling.mode", "string");
         for (Map.Entry<String, String> entry : config.getDebezium().entrySet()) {
             if (Asserts.isNotNullString(entry.getKey()) && Asserts.isNotNullString(entry.getValue())) {
                 debeziumProperties.setProperty(entry.getKey(), entry.getValue());
@@ -86,7 +87,7 @@ public class SqlServerCDCBuilder extends AbstractCDCBuilde
         } else {
             sourceBuilder.tableList(new String[0]);
         }
-        // sourceBuilder.deserializer(new JsonDebeziumDeserializationSchema());
+        //sourceBuilder.deserializer(new JsonDebeziumDeserializationSchema());
         sourceBuilder.deserializer(new SqlServerJsonDebeziumDeserializationSchema());
         if (Asserts.isNotNullString(config.getStartupMode())) {
             switch (config.getStartupMode().toLowerCase()) {
@@ -96,6 +97,7 @@ public class SqlServerCDCBuilder extends AbstractCDCBuilde
                 case "latest-offset":
                     sourceBuilder.startupOptions(StartupOptions.latest());
                     break;
+                default:
             }
         } else {
             sourceBuilder.startupOptions(StartupOptions.latest());
......
 package com.dlink.cdc.sqlserver;

-import com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.json.JsonConverter;
-import com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.source.SourceRecord;
-import com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.storage.ConverterType;
-import com.ververica.cdc.debezium.DebeziumDeserializationSchema;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.util.Collector;
@@ -12,12 +8,16 @@ import java.nio.charset.StandardCharsets;
 import java.util.HashMap;
 import java.util.Map;

+import com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.json.JsonConverter;
+import com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.source.SourceRecord;
+import com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.storage.ConverterType;
+import com.ververica.cdc.debezium.DebeziumDeserializationSchema;
+
 /**
  * @version 1.0
  * @className: com.dlink.cdc.mysql.MysqlJsonDebeziumDeserializationSchema
  * @Description:
  * @author: jack zhong
- * @date 8/2/221:43 PM
  */
 public class SqlServerJsonDebeziumDeserializationSchema implements DebeziumDeserializationSchema<String> {
     private static final long serialVersionUID = 1L;
......
@@ -19,6 +19,12 @@
 package com.dlink.executor;

+import com.dlink.assertion.Asserts;
+import com.dlink.model.LineageRel;
+import com.dlink.result.SqlExplainResult;
+import com.dlink.utils.FlinkStreamProgramWithoutPhysical;
+import com.dlink.utils.LineageContext;
+
 import org.apache.flink.api.common.RuntimeExecutionMode;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.dag.Transformation;
@@ -70,11 +76,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.Optional;

-import com.dlink.assertion.Asserts;
-import com.dlink.model.LineageRel;
-import com.dlink.result.SqlExplainResult;
-import com.dlink.utils.FlinkStreamProgramWithoutPhysical;
-import com.dlink.utils.LineageContext;
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.node.ObjectNode;
......
@@ -19,6 +19,8 @@
 package com.dlink.utils;

+import com.dlink.model.LineageRel;
+
 import org.apache.calcite.plan.RelOptTable;
 import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rel.core.Snapshot;
@@ -34,7 +36,6 @@ import org.apache.flink.table.catalog.CatalogManager;
 import org.apache.flink.table.catalog.FunctionCatalog;
 import org.apache.flink.table.operations.CatalogSinkModifyOperation;
 import org.apache.flink.table.operations.Operation;
-import org.apache.flink.table.planner.calcite.FlinkContext;
 import org.apache.flink.table.planner.calcite.FlinkRelBuilder;
 import org.apache.flink.table.planner.calcite.SqlExprToRexConverterFactory;
 import org.apache.flink.table.planner.delegation.PlannerBase;
@@ -47,8 +48,6 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.Set;

-import com.dlink.model.LineageRel;
-
 import javassist.ClassPool;
 import javassist.CtClass;
 import javassist.CtMethod;
@@ -188,10 +187,10 @@ public class LineageContext {
                 return MiniBatchInterval.NONE;
             }

             private PlannerBase getPlanner() {
                 return (PlannerBase) tableEnv.getPlanner();
             }
         });
     }
......
@@ -190,6 +190,7 @@ public abstract class AbstractSinkBuilder {
                         }
                         out.collect(uagenericRowData);
                         break;
+                    default:
                 }
             } catch (Exception e) {
                 logger.error("SchameTable: {} - Row: {} - Exception: {}", schemaTableName, JSONUtil.toJsonString(value), e.getCause().getMessage());
......
@@ -121,6 +121,7 @@ public class KafkaSinkJsonBuilder extends AbstractSinkBuilder implements SinkBui
                     before = (Map) value.get("before");
                     convertAttr(columnNameList, columnTypeList, before, value.get("op").toString(), 1, schemaName, tableName, tsMs);
                     break;
+                default:
             }
         } catch (Exception e) {
             logger.error("SchameTable: {} - Exception:", e);
......
@@ -132,6 +132,7 @@ public class MysqlCDCBuilder extends AbstractCDCBuilder implements CDCBuilder {
                 case "latest-offset":
                     sourceBuilder.startupOptions(StartupOptions.latest());
                     break;
+                default:
             }
         } else {
             sourceBuilder.startupOptions(StartupOptions.latest());
......
@@ -103,6 +103,7 @@ public class OracleCDCBuilder extends AbstractCDCBuilder implements CDCBuilder {
                 case "latest-offset":
                     sourceBuilder.startupOptions(StartupOptions.latest());
                     break;
+                default:
             }
         } else {
             sourceBuilder.startupOptions(StartupOptions.latest());
......
@@ -141,6 +141,7 @@ public class SQLSinkBuilder extends AbstractSinkBuilder implements SinkBuilder,
                         }
                         out.collect(uarow);
                         break;
+                    default:
                 }
             } catch (Exception e) {
                 logger.error("SchameTable: {} - Row: {} - Exception: {}", schemaTableName, JSONUtil.toJsonString(value), e.getCause().getMessage());
......
@@ -43,7 +43,7 @@ public class LineageRel {
     private String targetColumn;

-    private final static String DELIMITER = ".";
+    private static final String DELIMITER = ".";

     public LineageRel(String sourceCatalog, String sourceDatabase, String sourceTable, String sourceColumn, String targetCatalog, String targetDatabase, String targetTable,
                       String targetColumn) {
......
@@ -19,11 +19,12 @@
 package com.dlink.model;

+import com.dlink.assertion.Asserts;
+
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;

-import com.dlink.assertion.Asserts;
 import com.fasterxml.jackson.databind.JsonNode;

 /**
@@ -40,14 +41,16 @@ public class SystemConfiguration {
         return systemConfiguration;
     }

-    private static final List<Configuration> CONFIGURATION_LIST = new ArrayList<Configuration>() {{
-        add(systemConfiguration.sqlSubmitJarPath);
-        add(systemConfiguration.sqlSubmitJarParas);
-        add(systemConfiguration.sqlSubmitJarMainAppClass);
-        add(systemConfiguration.useRestAPI);
-        add(systemConfiguration.useLogicalPlan);
-        add(systemConfiguration.sqlSeparator);
-    }};
+    private static final List<Configuration> CONFIGURATION_LIST = new ArrayList<Configuration>() {
+        {
+            add(systemConfiguration.sqlSubmitJarPath);
+            add(systemConfiguration.sqlSubmitJarParas);
+            add(systemConfiguration.sqlSubmitJarMainAppClass);
+            add(systemConfiguration.useRestAPI);
+            add(systemConfiguration.useLogicalPlan);
+            add(systemConfiguration.sqlSeparator);
+        }
+    };

     private Configuration sqlSubmitJarPath = new Configuration(
         "sqlSubmitJarPath",
......
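Note: CONFIGURATION_LIST keeps its anonymous-subclass initializer, only reformatted. An alternative that avoids subclassing ArrayList entirely (not what this commit does, shown for contrast only):

    // Alternative sketch: an unmodifiable constant list built without double-brace init.
    private static final List<Configuration> CONFIGURATION_LIST =
            Collections.unmodifiableList(Arrays.asList(
                    systemConfiguration.sqlSubmitJarPath,
                    systemConfiguration.sqlSubmitJarParas,
                    systemConfiguration.sqlSubmitJarMainAppClass,
                    systemConfiguration.useRestAPI,
                    systemConfiguration.useLogicalPlan,
                    systemConfiguration.sqlSeparator));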
@@ -26,6 +26,7 @@ import static com.fasterxml.jackson.databind.DeserializationFeature.FAIL_ON_UNKN
 import static com.fasterxml.jackson.databind.DeserializationFeature.READ_UNKNOWN_ENUM_VALUES_AS_NULL;
 import static com.fasterxml.jackson.databind.MapperFeature.REQUIRE_SETTERS_FOR_GETTERS;

+import com.dlink.assertion.Asserts;

 import java.io.IOException;
 import java.util.ArrayList;
@@ -50,8 +51,6 @@ import com.fasterxml.jackson.databind.node.ObjectNode;
 import com.fasterxml.jackson.databind.node.TextNode;
 import com.fasterxml.jackson.databind.type.CollectionType;

-import com.dlink.assertion.Asserts;
-
 /**
  * JSONUtil
  *
......
@@ -57,8 +57,8 @@ import org.slf4j.LoggerFactory;

 /** A JDBC outputFormat that supports batching records before writing records to database. */
 @Internal
 public class JdbcBatchingOutputFormat<
-        In, JdbcIn, JdbcExec extends JdbcBatchStatementExecutor<JdbcIn>>
-        extends AbstractJdbcOutputFormat<In> {
+        I, J, E extends JdbcBatchStatementExecutor<J>>
+        extends AbstractJdbcOutputFormat<I> {

     /**
      * An interface to extract a value from given argument.
@@ -85,10 +85,10 @@ public class JdbcBatchingOutputFormat<
     private static final Logger LOG = LoggerFactory.getLogger(JdbcBatchingOutputFormat.class);

     private final JdbcExecutionOptions executionOptions;
-    private final StatementExecutorFactory<JdbcExec> statementExecutorFactory;
-    private final RecordExtractor<In, JdbcIn> jdbcRecordExtractor;
+    private final StatementExecutorFactory<E> statementExecutorFactory;
+    private final RecordExtractor<I, J> jdbcRecordExtractor;

-    private transient JdbcExec jdbcStatementExecutor;
+    private transient E jdbcStatementExecutor;
     private transient int batchCount = 0;
     private transient volatile boolean closed = false;
@@ -100,8 +100,8 @@ public class JdbcBatchingOutputFormat<
     public JdbcBatchingOutputFormat(
             @Nonnull JdbcConnectionProvider connectionProvider,
             @Nonnull JdbcExecutionOptions executionOptions,
-            @Nonnull StatementExecutorFactory<JdbcExec> statementExecutorFactory,
-            @Nonnull RecordExtractor<In, JdbcIn> recordExtractor) {
+            @Nonnull StatementExecutorFactory<E> statementExecutorFactory,
+            @Nonnull RecordExtractor<I, J> recordExtractor) {
         super(connectionProvider);
         this.executionOptions = checkNotNull(executionOptions);
         this.statementExecutorFactory = checkNotNull(statementExecutorFactory);
@@ -150,9 +150,9 @@ public class JdbcBatchingOutputFormat<
     }

-    private JdbcExec createAndOpenStatementExecutor(
-            StatementExecutorFactory<JdbcExec> statementExecutorFactory) throws IOException {
-        JdbcExec exec = statementExecutorFactory.apply(getRuntimeContext());
+    private E createAndOpenStatementExecutor(
+            StatementExecutorFactory<E> statementExecutorFactory) throws IOException {
+        E exec = statementExecutorFactory.apply(getRuntimeContext());
         try {
             exec.prepareStatements(connectionProvider.getConnection());
         } catch (SQLException e) {
@@ -168,7 +168,7 @@ public class JdbcBatchingOutputFormat<
     }

     @Override
-    public final synchronized void writeRecord(In record) throws IOException {
+    public final synchronized void writeRecord(I record) throws IOException {
         checkFlushException();

         try {
@@ -184,7 +184,7 @@ public class JdbcBatchingOutputFormat<
         }
     }

-    protected void addToBatch(In original, JdbcIn extracted) throws SQLException {
+    protected void addToBatch(I original, J extracted) throws SQLException {
         jdbcStatementExecutor.addToBatch(extracted);
     }
......
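Note: the type parameters In, JdbcIn, JdbcExec are renamed to I, J, E, presumably to satisfy a naming rule such as checkstyle's ClassTypeParameterName, whose default pattern is a single capital letter (multi-word names like JdbcExec fail it). The convention in miniature (the interface is hypothetical):

    // Single-letter type parameters pass the default ^[A-Z]$ pattern.
    interface BatchSink<I, J, E extends AutoCloseable> {
        void write(I input, J converted, E executor);
    }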
@@ -57,8 +57,8 @@ import org.slf4j.LoggerFactory;

 /** A JDBC outputFormat that supports batching records before writing records to database. */
 @Internal
 public class JdbcBatchingOutputFormat<
-        In, JdbcIn, JdbcExec extends JdbcBatchStatementExecutor<JdbcIn>>
-        extends AbstractJdbcOutputFormat<In> {
+        I, J, E extends JdbcBatchStatementExecutor<J>>
+        extends AbstractJdbcOutputFormat<I> {

     /**
      * An interface to extract a value from given argument.
@@ -85,10 +85,10 @@ public class JdbcBatchingOutputFormat<
     private static final Logger LOG = LoggerFactory.getLogger(JdbcBatchingOutputFormat.class);

     private final JdbcExecutionOptions executionOptions;
-    private final StatementExecutorFactory<JdbcExec> statementExecutorFactory;
-    private final RecordExtractor<In, JdbcIn> jdbcRecordExtractor;
+    private final StatementExecutorFactory<E> statementExecutorFactory;
+    private final RecordExtractor<I, J> jdbcRecordExtractor;

-    private transient JdbcExec jdbcStatementExecutor;
+    private transient E jdbcStatementExecutor;
     private transient int batchCount = 0;
     private transient volatile boolean closed = false;
@@ -100,8 +100,8 @@ public class JdbcBatchingOutputFormat<
     public JdbcBatchingOutputFormat(
             @Nonnull JdbcConnectionProvider connectionProvider,
             @Nonnull JdbcExecutionOptions executionOptions,
-            @Nonnull StatementExecutorFactory<JdbcExec> statementExecutorFactory,
-            @Nonnull RecordExtractor<In, JdbcIn> recordExtractor) {
+            @Nonnull StatementExecutorFactory<E> statementExecutorFactory,
+            @Nonnull RecordExtractor<I, J> recordExtractor) {
         super(connectionProvider);
         this.executionOptions = checkNotNull(executionOptions);
         this.statementExecutorFactory = checkNotNull(statementExecutorFactory);
@@ -150,9 +150,9 @@ public class JdbcBatchingOutputFormat<
     }

-    private JdbcExec createAndOpenStatementExecutor(
-            StatementExecutorFactory<JdbcExec> statementExecutorFactory) throws IOException {
-        JdbcExec exec = statementExecutorFactory.apply(getRuntimeContext());
+    private E createAndOpenStatementExecutor(
+            StatementExecutorFactory<E> statementExecutorFactory) throws IOException {
+        E exec = statementExecutorFactory.apply(getRuntimeContext());
         try {
             exec.prepareStatements(connectionProvider.getConnection());
         } catch (SQLException e) {
@@ -168,7 +168,7 @@ public class JdbcBatchingOutputFormat<
     }

     @Override
-    public final synchronized void writeRecord(In record) throws IOException {
+    public final synchronized void writeRecord(I record) throws IOException {
         checkFlushException();

         try {
@@ -184,7 +184,7 @@ public class JdbcBatchingOutputFormat<
         }
     }

-    protected void addToBatch(In original, JdbcIn extracted) throws SQLException {
+    protected void addToBatch(I original, J extracted) throws SQLException {
         jdbcStatementExecutor.addToBatch(extracted);
     }
......
@@ -133,6 +133,8 @@ public class FlinkAPI {
                 //paramMap.put("target-directory","hdfs:///flink13/ss1");
                 paramType = FlinkRestAPIConstant.SAVEPOINTS;
                 jobInfo.setStatus(JobInfo.JobStatus.RUN);
+                break;
+            default:
         }
         ObjectMapper mapper = new ObjectMapper();
         JsonNode json = null;
......
@@ -19,6 +19,12 @@
 package com.dlink.explainer.lineage;

+import com.dlink.explainer.ca.ColumnCAResult;
+import com.dlink.explainer.ca.NodeRel;
+import com.dlink.explainer.ca.TableCA;
+import com.dlink.model.LineageRel;
+import com.dlink.plus.FlinkSqlPlus;
+
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -26,12 +32,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;

-import com.dlink.explainer.ca.ColumnCAResult;
-import com.dlink.explainer.ca.NodeRel;
-import com.dlink.explainer.ca.TableCA;
-import com.dlink.model.LineageRel;
-import com.dlink.plus.FlinkSqlPlus;
-
 /**
  * LineageBuilder
  *
......
@@ -109,8 +109,10 @@ public class LineageRelation {
             return false;
         }
         LineageRelation that = (LineageRelation) o;
-        return Objects.equals(srcTableId, that.srcTableId) && Objects.equals(tgtTableId, that.tgtTableId) && Objects.equals(srcTableColName, that.srcTableColName) &&
-            Objects.equals(tgtTableColName, that.tgtTableColName);
+        return Objects.equals(srcTableId, that.srcTableId)
+            && Objects.equals(tgtTableId, that.tgtTableId)
+            && Objects.equals(srcTableColName, that.srcTableColName)
+            && Objects.equals(tgtTableColName, that.tgtTableColName);
     }

     @Override
......
@@ -19,11 +19,11 @@
 package com.dlink.explainer.lineage;

+import com.dlink.explainer.ca.TableCA;
+
 import java.util.ArrayList;
 import java.util.List;

-import com.dlink.explainer.ca.TableCA;
-
 /**
  * LineageTable
  *
......
@@ -17,7 +17,7 @@
  *
  */
-package com.dlink.explainer.sqlLineage;
+package com.dlink.explainer.sqllineage;

 import com.dlink.assertion.Asserts;
 import com.dlink.explainer.lineage.LineageRelation;
......
@@ -17,7 +17,7 @@
  *
  */
-package com.dlink.explainer.sqlLineage;
+package com.dlink.explainer.sqllineage;

 import com.dlink.assertion.Asserts;
......
@@ -17,7 +17,7 @@
  *
  */
-package com.dlink.explainer.sqlLineage;
+package com.dlink.explainer.sqllineage;

 import com.dlink.assertion.Asserts;
......
@@ -17,7 +17,7 @@
  *
  */
-package com.dlink.explainer.sqlLineage;
+package com.dlink.explainer.sqllineage;

 import java.util.HashSet;
 import java.util.Iterator;
......
@@ -17,7 +17,7 @@
  *
  */
-package com.dlink.explainer.sqlLineage;
+package com.dlink.explainer.sqllineage;

 import java.util.Iterator;
......
@@ -596,6 +596,8 @@ public class JobManager {
                 if (Asserts.isNotNull(config.getGatewayConfig()) && Asserts.isNotNullString(config.getGatewayConfig().getFlinkConfig().getJobName())) {
                     sb.append("set " + YarnConfigOptions.APPLICATION_NAME.key() + " = " + config.getGatewayConfig().getFlinkConfig().getJobName() + ";\r\n");
                 }
+                break;
+            default:
         }
         sb.append(statement);
         return sb.toString();
......
@@ -73,7 +73,7 @@ public class ResultRunnable implements Runnable {
                 catchData(ResultPool.get(jobId));
             }
         } catch (Exception e) {
-            e.printStackTrace();
         }
     }
 }
......
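Note: the bare e.printStackTrace() is dropped here, while SetOperation below gains one, apparently so its catch block is not left empty. Where the failure should stay visible, routing it to the class logger is the usual substitute; a sketch assuming an SLF4J logger like the surrounding services use:

    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    class ResultPollingSketch { // hypothetical stand-in for ResultRunnable
        private static final Logger logger = LoggerFactory.getLogger(ResultPollingSketch.class);

        void poll(String jobId, Runnable work) {
            try {
                work.run();
            } catch (Exception e) {
                // keeps the stack trace, but in the log instead of stderr
                logger.error("result polling failed for job {}", jobId, e);
            }
        }
    }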
@@ -50,10 +50,10 @@ public class SingleSqlParserFactory {
             tmp = new UpdateSqlParser(sql);
         } else if (contains(sql, "(insert\\s+into)(.+)(values)(.+)")) {
             tmp = new InsertSqlParser(sql);
-        } else if (contains(sql, "(create\\s+table)(.+)")) {
-        } else if (contains(sql, "(create\\s+database)(.+)")) {
-        } else if (contains(sql, "(show\\s+databases)")) {
-        } else if (contains(sql, "(use)(.+)")) {
+        //} else if (contains(sql, "(create\\s+table)(.+)")) {
+        //} else if (contains(sql, "(create\\s+database)(.+)")) {
+        //} else if (contains(sql, "(show\\s+databases)")) {
+        //} else if (contains(sql, "(use)(.+)")) {
         } else if (contains(sql, "(set)(.+)")) {
             tmp = new SetSqlParser(sql);
         } else if (contains(sql, "(show\\s+fragment)\\s+(.+)")) {
......
@@ -68,6 +68,7 @@ public class SetOperation extends AbstractOperation implements Operation {
                 return null;
             }
         } catch (ClassNotFoundException e) {
+            e.printStackTrace();
         }
         Map<String, List<String>> map = SingleSqlParserFactory.generateParser(statement);
         if (Asserts.isNotNullMap(map) && map.size() == 2) {
......
 <?xml version="1.0" encoding="UTF-8"?>
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
     <parent>
         <artifactId>dlink</artifactId>
         <groupId>com.dlink</groupId>
......
@@ -77,7 +77,7 @@ public class FlinkConfig {
     }

     public static FlinkConfig build(String jobName, String jobId, String actionStr, String savePointTypeStr, String savePoint, String configParasStr) {
-        // List<ConfigPara> configParasList = new ArrayList<>();
+        //List<ConfigPara> configParasList = new ArrayList<>();
         Map<String, String> configMap = new HashMap<>();
         JsonNode paras = null;
         if (Asserts.isNotNullString(configParasStr)) {
......
@@ -25,7 +25,6 @@ import com.dlink.metadata.driver.DriverConfig;
 import com.dlink.metadata.result.JdbcSelectResult;
 import com.dlink.model.Column;
 import com.dlink.model.Schema;
-import com.dlink.utils.JSONUtil;

 import java.util.List;
@@ -55,30 +54,30 @@ public class ClickHouseTest {
     @Test
     public void connectTest() {
         String test = getDriver().test();
-        System.out.println(test);
-        System.out.println("end...");
+        //System.out.println(test);
+        //System.out.println("end...");
     }

     @Test
     public void schemaTest() {
         List<Schema> schemasAndTables = getDriver().getSchemasAndTables();
-        System.out.println(JSONUtil.toJsonString(schemasAndTables));
-        System.out.println("end...");
+        //System.out.println(JSONUtil.toJsonString(schemasAndTables));
+        //System.out.println("end...");
     }

     @Test
     public void columnTest() {
         Driver driver = getDriver();
         List<Column> columns = driver.listColumns("xxx", "xxx");
-        System.out.println(JSONUtil.toJsonString(columns));
-        System.out.println("end...");
+        //System.out.println(JSONUtil.toJsonString(columns));
+        //System.out.println("end...");
     }

     @Test
     public void queryTest() {
         Driver driver = getDriver();
         JdbcSelectResult query = driver.query("select * from xxx", 10);
-        System.out.println(JSONUtil.toJsonString(query));
-        System.out.println("end...");
+        //System.out.println(JSONUtil.toJsonString(query));
+        //System.out.println("end...");
     }
 }
@@ -481,7 +481,6 @@
                     <encoding>UTF-8</encoding>
                     <configLocation>style/checkstyle.xml</configLocation>
                     <failOnViolation>true</failOnViolation>
-                    <violationSeverity>warning</violationSeverity>
                     <includeTestSourceDirectory>true</includeTestSourceDirectory>
                     <sourceDirectories>
                         <sourceDirectory>${project.build.sourceDirectory}</sourceDirectory>
......
@@ -21,7 +21,7 @@
 <module name="Checker">

     <property name="charset" value="UTF-8"/>
-    <property name="severity" value="info"/>
+    <!-- <property name="severity" value="error"/>-->
     <property name="fileExtensions" value="java, properties, xml"/>
@@ -35,6 +35,7 @@
     </module>

     <module name="RegexpSingleline">
+        <property name="severity" value="warning"/>
         <property name="format" value="System\.out\.println"/>
         <property name="message" value="Prohibit invoking System.out.println in source code !"/>
     </module>
@@ -222,6 +223,7 @@
         <module name="UnusedImports"/>
         <module name="ImportOrder">
+            <!-- <property name="severity" value="error"/>-->
             <property name="staticGroups" value="com.dlink,org.apache,java,javax,org,com"/>
             <property name="separatedStaticGroups" value="true"/>
@@ -230,6 +232,7 @@
             <property name="separated" value="true"/>
             <property name="option" value="top"/>
             <property name="sortStaticImportsAlphabetically" value="true"/>
+            <message key="import.ordering" value="Import {0} appears after other imports that it should precede"/>
         </module>

         <module name="NoWhitespaceBefore">
......