Commit ee14d2e9 authored by wenmo

Low-level optimization

parent 8e5d443e
......@@ -74,10 +74,6 @@
<groupId>com.dlink</groupId>
<artifactId>dlink-core</artifactId>
</dependency>
<!--<dependency>
<groupId>com.dlink</groupId>
<artifactId>dlink-connector</artifactId>
</dependency>-->
</dependencies>
<build>
<plugins>
......
<?xml version="1.0" encoding="UTF-8"?>
<configuration>
<contextName>${APP_NAME}</contextName>
<springProperty name="APP_NAME" scope="context" source="spring.application.name"/>
<springProperty name="LOG_FILE" scope="context" source="logging.file" defaultValue="./logs/application/${APP_NAME}"/>
<springProperty name="LOG_POINT_FILE" scope="context" source="logging.file" defaultValue="./logs/point"/>
<springProperty name="LOG_AUDIT_FILE" scope="context" source="logging.file" defaultValue="./logs/audit"/>
<springProperty name="LOG_MAXFILESIZE" scope="context" source="logback.filesize" defaultValue="50MB"/>
<springProperty name="LOG_FILEMAXDAY" scope="context" source="logback.filemaxday" defaultValue="7"/>
<springProperty name="ServerIP" scope="context" source="spring.cloud.client.ip-address" defaultValue="0.0.0.0"/>
<springProperty name="ServerPort" scope="context" source="server.port" defaultValue="0000"/>
<!-- Colored logs -->
<!-- Converter classes required for colored log rendering -->
<conversionRule conversionWord="clr" converterClass="org.springframework.boot.logging.logback.ColorConverter" />
<conversionRule conversionWord="wex" converterClass="org.springframework.boot.logging.logback.WhitespaceThrowableProxyConverter" />
<conversionRule conversionWord="wEx" converterClass="org.springframework.boot.logging.logback.ExtendedWhitespaceThrowableProxyConverter" />
<!-- Colored console log pattern -->
<property name="CONSOLE_LOG_PATTERN"
value="[${APP_NAME}:${ServerIP}:${ServerPort}] %clr(%d{yyyy-MM-dd HH:mm:ss.SSS}){faint} %clr(%level){blue} %clr(${PID}){magenta} %clr([%X{traceId}]){yellow} %clr([%thread]){orange} %clr(%-40.40logger{39}){cyan} %m%n${LOG_EXCEPTION_CONVERSION_WORD:-%wEx}" />
<property name="CONSOLE_LOG_PATTERN_NO_COLOR" value="[${APP_NAME}:${ServerIP}:${ServerPort}] %d{yyyy-MM-dd HH:mm:ss.SSS} %level ${PID} [%X{traceId}] [%thread] %-40.40logger{39} %m%n${LOG_EXCEPTION_CONVERSION_WORD:-%wEx}" />
<!-- Console appender -->
<appender name="StdoutAppender" class="ch.qos.logback.core.ConsoleAppender">
<withJansi>true</withJansi>
<encoder>
<pattern>${CONSOLE_LOG_PATTERN}</pattern>
<charset>UTF-8</charset>
</encoder>
</appender>
<!-- Roll the regular log file daily -->
<appender name="FileAppender" class="ch.qos.logback.core.rolling.RollingFileAppender">
<file>${LOG_FILE}/${APP_NAME}.log</file>
<encoder>
<pattern>${CONSOLE_LOG_PATTERN_NO_COLOR}</pattern>
<charset>UTF-8</charset>
</encoder>
<!-- Time-based rolling policy -->
<rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
<fileNamePattern>${LOG_FILE}/${APP_NAME}.%d{yyyy-MM-dd}.%i.log</fileNamePattern>
<!-- Retention period, in days -->
<maxHistory>${LOG_FILEMAXDAY}</maxHistory>
<timeBasedFileNamingAndTriggeringPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedFNATP">
<maxFileSize>${LOG_MAXFILESIZE}</maxFileSize>
</timeBasedFileNamingAndTriggeringPolicy>
</rollingPolicy>
<!-- Note: LevelFilter is a no-op without onMatch/onMismatch elements; the root level below already limits output to INFO and above. -->
<filter class="ch.qos.logback.classic.filter.LevelFilter">
<level>INFO</level>
</filter>
</appender>
<appender name="point_log" class="ch.qos.logback.core.rolling.RollingFileAppender">
<file>${LOG_POINT_FILE}/point.log</file>
<encoder>
<pattern>%d{yyyy-MM-dd HH:mm:ss.SSS}|${APP_NAME}|%msg%n</pattern>
<charset>UTF-8</charset>
</encoder>
<!-- Time-based rolling policy -->
<rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
<fileNamePattern>${LOG_POINT_FILE}/point.%d{yyyy-MM-dd}.%i.log</fileNamePattern>
<!-- Retention period, in days -->
<maxHistory>${LOG_FILEMAXDAY}</maxHistory>
<timeBasedFileNamingAndTriggeringPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedFNATP">
<maxFileSize>${LOG_MAXFILESIZE}</maxFileSize>
</timeBasedFileNamingAndTriggeringPolicy>
</rollingPolicy>
</appender>
<appender name="audit_log" class="ch.qos.logback.core.rolling.RollingFileAppender">
<file>${LOG_AUDIT_FILE}/audit.log</file>
<encoder>
<pattern>%msg%n</pattern>
<charset>UTF-8</charset>
</encoder>
<!-- Time-based rolling policy -->
<rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
<fileNamePattern>${LOG_AUDIT_FILE}/audit.%d{yyyy-MM-dd}.%i.log</fileNamePattern>
<!-- Retention period, in days -->
<maxHistory>${LOG_FILEMAXDAY}</maxHistory>
<timeBasedFileNamingAndTriggeringPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedFNATP">
<maxFileSize>${LOG_MAXFILESIZE}</maxFileSize>
</timeBasedFileNamingAndTriggeringPolicy>
</rollingPolicy>
</appender>
<appender name="point_log_async" class="ch.qos.logback.classic.AsyncAppender">
<discardingThreshold>0</discardingThreshold>
<appender-ref ref="point_log"/>
</appender>
<appender name="file_async" class="ch.qos.logback.classic.AsyncAppender">
<discardingThreshold>0</discardingThreshold>
<appender-ref ref="FileAppender"/>
</appender>
<root level="INFO">
<appender-ref ref="StdoutAppender"/>
<appender-ref ref="file_async"/>
</root>
</configuration>
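Note that only StdoutAppender and file_async are attached to the root logger above; the point_log_async and audit_log appenders are declared but not referenced by any logger in this file, so they stay idle unless dedicated <logger> entries are added. A minimal sketch of how application code could feed them through SLF4J — the logger name "point" and the extra <logger> element are assumptions, not part of this commit:

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class LoggingExample {
    // Routed by the root logger to StdoutAppender and file_async.
    private static final Logger LOG = LoggerFactory.getLogger(LoggingExample.class);
    // Hypothetical dedicated logger; it only reaches point_log_async if something like
    // <logger name="point" level="INFO" additivity="false">
    //     <appender-ref ref="point_log_async"/>
    // </logger>
    // is added to the configuration above.
    private static final Logger POINT = LoggerFactory.getLogger("point");

    public static void main(String[] args) {
        LOG.info("application event");   // console + ./logs/application/<app-name>.log
        POINT.info("metric|value=42");   // ./logs/point/point.log, given the extra <logger>
    }
}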
......@@ -3,14 +3,13 @@
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>dlink</artifactId>
<artifactId>dlink-client</artifactId>
<groupId>com.dlink</groupId>
<version>0.2.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<packaging>jar</packaging>
<artifactId>dlink-connector</artifactId>
<artifactId>dlink-client-1.12</artifactId>
<properties>
<java.version>1.8</java.version>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
......@@ -35,31 +34,32 @@
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</exclusion>
</exclusions>
<!--<scope>provided</scope>-->
<version>${flink.version}</version>
</dependency>
<!--<dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-jdbc_2.11</artifactId>
<version>${flink.version}</version>
&lt;!&ndash;<scope>provided</scope>&ndash;&gt;
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka_2.11</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-json</artifactId>
<version>${flink.version}</version>
</dependency>-->
<scope>provided</scope>
</dependency>
</dependencies>
<build>
<plugins>
......@@ -74,4 +74,5 @@
</plugins>
<finalName>${project.artifactId}</finalName>
</build>
</project>
\ No newline at end of file
......@@ -33,7 +33,7 @@ import java.util.Map;
* Customized TableEnvironmentImpl
*
* @author wenmo
* @since 2021/5/25
* @since 2021/6/7 22:06
**/
public class CustomTableEnvironmentImpl extends TableEnvironmentImpl {
......
......@@ -19,7 +19,7 @@ import java.util.concurrent.*;
/**
* Customized TableResultImpl
* @author wenmo
* @since 2021/5/25
* @since 2021/6/7 22:06
**/
@Internal
class CustomTableResultImpl implements TableResult {
......
......@@ -20,7 +20,7 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
/**
* Flink Sql Fragment Manager
* @author wenmo
* @since 2021/5/25
* @since 2021/6/7 22:06
**/
@Internal
public final class SqlManager {
......
......@@ -4,7 +4,7 @@ import org.apache.flink.table.types.DataType;
/**
* @author wenmo
* @since 2021/5/11 14:04
* @since 2021/6/7 22:06
**/
public class TableSchemaField {
private String name;
......
......@@ -6,7 +6,7 @@ import java.util.Date;
* SQL explain result
*
* @author wenmo
* @since 2021/5/25 11:41
* @since 2021/6/7 22:06
**/
public class SqlExplainResult {
private Integer index;
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.streaming.api.graph;
import org.apache.flink.annotation.Internal;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ArrayNode;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/** Helper class for generating a JSON representation from a {@link StreamGraph}. */
@Internal
public class JSONGenerator {
public static final String STEPS = "step_function";
public static final String ID = "id";
public static final String SIDE = "side";
public static final String SHIP_STRATEGY = "ship_strategy";
public static final String PREDECESSORS = "predecessors";
public static final String TYPE = "type";
public static final String PACT = "pact";
public static final String CONTENTS = "contents";
public static final String PARALLELISM = "parallelism";
private StreamGraph streamGraph;
private final ObjectMapper mapper = new ObjectMapper();
public JSONGenerator(StreamGraph streamGraph) {
this.streamGraph = streamGraph;
}
public String getJSON() {
return getJSONNode().toPrettyString();
}
public ObjectNode getJSONNode() {
ObjectNode json = mapper.createObjectNode();
ArrayNode nodes = mapper.createArrayNode();
json.put("nodes", nodes);
List<Integer> operatorIDs = new ArrayList<>(streamGraph.getVertexIDs());
Comparator<Integer> operatorIDComparator =
Comparator.comparingInt(
(Integer id) -> streamGraph.getSinkIDs().contains(id) ? 1 : 0)
.thenComparingInt(id -> id);
operatorIDs.sort(operatorIDComparator);
visit(nodes, operatorIDs, new HashMap<>());
return json;
}
private void visit(
ArrayNode jsonArray, List<Integer> toVisit, Map<Integer, Integer> edgeRemapings) {
Integer vertexID = toVisit.get(0);
StreamNode vertex = streamGraph.getStreamNode(vertexID);
if (streamGraph.getSourceIDs().contains(vertexID)
|| Collections.disjoint(vertex.getInEdges(), toVisit)) {
ObjectNode node = mapper.createObjectNode();
decorateNode(vertexID, node);
if (!streamGraph.getSourceIDs().contains(vertexID)) {
ArrayNode inputs = mapper.createArrayNode();
node.put(PREDECESSORS, inputs);
for (StreamEdge inEdge : vertex.getInEdges()) {
int inputID = inEdge.getSourceId();
Integer mappedID =
(edgeRemapings.keySet().contains(inputID))
? edgeRemapings.get(inputID)
: inputID;
decorateEdge(inputs, inEdge, mappedID);
}
}
jsonArray.add(node);
toVisit.remove(vertexID);
} else {
Integer iterationHead = -1;
for (StreamEdge inEdge : vertex.getInEdges()) {
int operator = inEdge.getSourceId();
if (streamGraph.vertexIDtoLoopTimeout.containsKey(operator)) {
iterationHead = operator;
}
}
ObjectNode obj = mapper.createObjectNode();
ArrayNode iterationSteps = mapper.createArrayNode();
obj.put(STEPS, iterationSteps);
obj.put(ID, iterationHead);
obj.put(PACT, "IterativeDataStream");
obj.put(PARALLELISM, streamGraph.getStreamNode(iterationHead).getParallelism());
obj.put(CONTENTS, "Stream Iteration");
ArrayNode iterationInputs = mapper.createArrayNode();
obj.put(PREDECESSORS, iterationInputs);
toVisit.remove(iterationHead);
visitIteration(iterationSteps, toVisit, iterationHead, edgeRemapings, iterationInputs);
jsonArray.add(obj);
}
if (!toVisit.isEmpty()) {
visit(jsonArray, toVisit, edgeRemapings);
}
}
private void visitIteration(
ArrayNode jsonArray,
List<Integer> toVisit,
int headId,
Map<Integer, Integer> edgeRemapings,
ArrayNode iterationInEdges) {
Integer vertexID = toVisit.get(0);
StreamNode vertex = streamGraph.getStreamNode(vertexID);
toVisit.remove(vertexID);
// Ignoring head and tail to avoid redundancy
if (!streamGraph.vertexIDtoLoopTimeout.containsKey(vertexID)) {
ObjectNode obj = mapper.createObjectNode();
jsonArray.add(obj);
decorateNode(vertexID, obj);
ArrayNode inEdges = mapper.createArrayNode();
obj.put(PREDECESSORS, inEdges);
for (StreamEdge inEdge : vertex.getInEdges()) {
int inputID = inEdge.getSourceId();
if (edgeRemapings.keySet().contains(inputID)) {
decorateEdge(inEdges, inEdge, inputID);
} else if (!streamGraph.vertexIDtoLoopTimeout.containsKey(inputID)) {
decorateEdge(iterationInEdges, inEdge, inputID);
}
}
edgeRemapings.put(vertexID, headId);
visitIteration(jsonArray, toVisit, headId, edgeRemapings, iterationInEdges);
}
}
private void decorateEdge(ArrayNode inputArray, StreamEdge inEdge, int mappedInputID) {
ObjectNode input = mapper.createObjectNode();
inputArray.add(input);
input.put(ID, mappedInputID);
input.put(SHIP_STRATEGY, inEdge.getPartitioner().toString());
input.put(SIDE, (inputArray.size() == 0) ? "first" : "second");
}
private void decorateNode(Integer vertexID, ObjectNode node) {
StreamNode vertex = streamGraph.getStreamNode(vertexID);
node.put(ID, vertexID);
node.put(TYPE, vertex.getOperatorName());
if (streamGraph.getSourceIDs().contains(vertexID)) {
node.put(PACT, "Data Source");
} else if (streamGraph.getSinkIDs().contains(vertexID)) {
node.put(PACT, "Data Sink");
} else {
node.put(PACT, "Operator");
}
node.put(CONTENTS, vertex.getOperatorName());
node.put(PARALLELISM, streamGraph.getStreamNode(vertexID).getParallelism());
}
}
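This class shadows Flink's own org.apache.flink.streaming.api.graph.JSONGenerator; the practical difference appears to be the public getJSONNode() method, which exposes the plan as a Jackson ObjectNode rather than only a String. A rough usage sketch against Flink 1.12 — the pipeline itself is a placeholder:

import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.graph.JSONGenerator;
import org.apache.flink.streaming.api.graph.StreamGraph;

public class JsonPlanExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.fromElements("a", "b", "c").print();

        // Build the StreamGraph without executing the job, then render it as JSON.
        StreamGraph streamGraph = env.getStreamGraph();
        JSONGenerator generator = new JSONGenerator(streamGraph);
        String jsonPlan = generator.getJSON();          // pretty-printed String
        ObjectNode planNode = generator.getJSONNode();  // Jackson tree, added by this class
        System.out.println(jsonPlan);
        System.out.println(planNode.get("nodes").size());
    }
}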
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>dlink</artifactId>
<groupId>com.dlink</groupId>
<version>0.2.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<packaging>pom</packaging>
<modules>
<module>dlink-client-1.12</module>
</modules>
<artifactId>dlink-client</artifactId>
</project>
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>dlink-connectors</artifactId>
<groupId>com.dlink</groupId>
<version>0.2.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>dlink-connector-jdbc</artifactId>
<properties>
<java.version>1.8</java.version>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<flink.version>1.12.4</flink.version>
<scala.binary.version>2.11</scala.binary.version>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
<junit.version>4.12</junit.version>
<postgres.version>42.2.10</postgres.version>
<otj-pg-embedded.version>0.13.3</otj-pg-embedded.version>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-jdbc_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<!-- Postgres dependencies -->
<dependency>
<groupId>org.postgresql</groupId>
<artifactId>postgresql</artifactId>
<version>${postgres.version}</version>
<scope>provided</scope>
</dependency>
<!-- test dependencies -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-test-utils_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-common</artifactId>
<version>${flink.version}</version>
<type>test-jar</type>
<scope>test</scope>
</dependency>
<!-- A planner dependency won't be necessary once FLIP-32 has been completed. -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<scope>test</scope>
</dependency>
<!-- Postgres test dependencies -->
<dependency>
<groupId>com.opentable.components</groupId>
<artifactId>otj-pg-embedded</artifactId>
<version>${otj-pg-embedded.version}</version>
<scope>test</scope>
</dependency>
<!-- MySQL test dependencies -->
<dependency>
<groupId>ch.vorburger.mariaDB4j</groupId>
<artifactId>mariaDB4j</artifactId>
<version>2.4.0</version>
<scope>test</scope>
</dependency>
<!-- ch.vorburger.mariaDB4j:mariaDB4j has a dependency of mariadb-java-client:2.3.0,
but we want to bump mariadb-java-client to 2.5.4 which fix a few notable bugs,
see: https://mariadb.com/kb/en/mariadb-connector-j-release-notes/
and the lower version may cause the test stability issue FLINK-18082.-->
<dependency>
<groupId>org.mariadb.jdbc</groupId>
<artifactId>mariadb-java-client</artifactId>
<version>2.5.4</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>8.0.20</version>
<scope>test</scope>
</dependency>
<!-- Derby test dependencies -->
<dependency>
<groupId>org.apache.derby</groupId>
<artifactId>derby</artifactId>
<version>10.14.2.0</version>
<scope>test</scope>
</dependency>
<!-- Oracle test dependencies -->
<dependency>
<groupId>com.oracle</groupId>
<artifactId>ojdbc6</artifactId>
<version>11.2.0.3</version>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
</plugin>
</plugins>
<finalName>${project.artifactId}</finalName>
</build>
</project>
\ No newline at end of file
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.connector.jdbc.dialect;
import org.apache.flink.connector.jdbc.internal.converter.ClickHouseRowConverter;
import org.apache.flink.connector.jdbc.internal.converter.JdbcRowConverter;
import org.apache.flink.table.types.logical.LogicalTypeRoot;
import org.apache.flink.table.types.logical.RowType;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
/**
* JDBC dialect for ClickHouse.
*
* @author wenmo
* @since 2021/6/7 21:48
*/
public class ClickHouseDialect extends AbstractDialect {
private static final long serialVersionUID = 1L;
private static final int MAX_TIMESTAMP_PRECISION = 6;
private static final int MIN_TIMESTAMP_PRECISION = 1;
private static final int MAX_DECIMAL_PRECISION = 65;
private static final int MIN_DECIMAL_PRECISION = 1;
@Override
public boolean canHandle(String url) {
return url.startsWith("jdbc:clickhouse:");
}
@Override
public JdbcRowConverter getRowConverter(RowType rowType) {
return new ClickHouseRowConverter(rowType);
}
@Override
public Optional<String> defaultDriverName() {
return Optional.of("ru.yandex.clickhouse.ClickHouseDriver");
}
@Override
public String quoteIdentifier(String identifier) {
return "`" + identifier + "`";
}
@Override
public Optional<String> getUpsertStatement(
String tableName, String[] fieldNames, String[] uniqueKeyFields) {
// ClickHouse has no native upsert syntax; fall back to a plain INSERT and let the
// target table engine (e.g. ReplacingMergeTree) deduplicate on the key columns.
return Optional.of(getInsertIntoStatement(tableName, fieldNames));
}
@Override
public String dialectName() {
return "ClickHouse";
}
@Override
public int maxDecimalPrecision() {
return MAX_DECIMAL_PRECISION;
}
@Override
public int minDecimalPrecision() {
return MIN_DECIMAL_PRECISION;
}
@Override
public int maxTimestampPrecision() {
return MAX_TIMESTAMP_PRECISION;
}
@Override
public int minTimestampPrecision() {
return MIN_TIMESTAMP_PRECISION;
}
@Override
public List<LogicalTypeRoot> unsupportedTypes() {
// TODO: We can't convert BINARY data type to
// PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO in
// LegacyTypeInfoDataTypeConverter.
return Arrays.asList(
LogicalTypeRoot.BINARY,
LogicalTypeRoot.TIMESTAMP_WITH_LOCAL_TIME_ZONE,
LogicalTypeRoot.TIMESTAMP_WITH_TIME_ZONE,
LogicalTypeRoot.INTERVAL_YEAR_MONTH,
LogicalTypeRoot.INTERVAL_DAY_TIME,
LogicalTypeRoot.ARRAY,
LogicalTypeRoot.MULTISET,
LogicalTypeRoot.MAP,
LogicalTypeRoot.ROW,
LogicalTypeRoot.DISTINCT_TYPE,
LogicalTypeRoot.STRUCTURED_TYPE,
LogicalTypeRoot.NULL,
LogicalTypeRoot.RAW,
LogicalTypeRoot.SYMBOL,
LogicalTypeRoot.UNRESOLVED);
}
}
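Once this dialect is listed in JdbcDialects (below), the stock 'jdbc' table connector resolves any URL starting with jdbc:clickhouse: to it. A hedged sketch of a Flink 1.12 table definition that would hit this code path — host, database and table names are placeholders, and a ClickHouse JDBC driver must be on the classpath:

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class ClickHouseJdbcExample {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build());

        // The 'jdbc:clickhouse:' prefix is what ClickHouseDialect.canHandle() matches on;
        // the default driver resolves to ru.yandex.clickhouse.ClickHouseDriver.
        tEnv.executeSql(
                "CREATE TABLE ods_user (" +
                "  id BIGINT," +
                "  name STRING," +
                "  PRIMARY KEY (id) NOT ENFORCED" +
                ") WITH (" +
                "  'connector' = 'jdbc'," +
                "  'url' = 'jdbc:clickhouse://127.0.0.1:8123/default'," +
                "  'table-name' = 'ods_user'" +
                ")");

        // Because a primary key is declared, writes go through getUpsertStatement(),
        // i.e. plain INSERTs for ClickHouse.
        tEnv.executeSql("INSERT INTO ods_user VALUES (1, 'dlink')");
    }
}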
package org.apache.flink.connector.jdbc.dialect;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
/**
* JdbcDialects
*
* @author wenmo
* @since 2021/6/7 19:29
*/
public final class JdbcDialects {
private static final List<JdbcDialect> DIALECTS =
Arrays.asList(new DerbyDialect(), new MySQLDialect(), new PostgresDialect(),
new OracleDialect(), new ClickHouseDialect());
/** Fetch the JdbcDialect class corresponding to a given database url. */
public static Optional<JdbcDialect> get(String url) {
for (JdbcDialect dialect : DIALECTS) {
if (dialect.canHandle(url)) {
return Optional.of(dialect);
}
}
return Optional.empty();
}
}
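This class keeps the fully qualified name of the upstream org.apache.flink.connector.jdbc.dialect.JdbcDialects, so the standard JDBC connector also resolves the two new dialects purely from the URL prefix. A small sketch of the lookup:

import org.apache.flink.connector.jdbc.dialect.JdbcDialect;
import org.apache.flink.connector.jdbc.dialect.JdbcDialects;

public class DialectLookupExample {
    public static void main(String[] args) {
        JdbcDialect clickHouse = JdbcDialects
                .get("jdbc:clickhouse://127.0.0.1:8123/default")
                .orElseThrow(IllegalStateException::new);
        JdbcDialect oracle = JdbcDialects
                .get("jdbc:oracle:thin:@127.0.0.1:1521:ORCL")
                .orElseThrow(IllegalStateException::new);

        System.out.println(clickHouse.dialectName()); // ClickHouse
        System.out.println(oracle.dialectName());     // Oracle
        System.out.println(JdbcDialects.get("jdbc:unknown://x").isPresent()); // false
    }
}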
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.connector.jdbc.dialect;
import org.apache.flink.connector.jdbc.internal.converter.JdbcRowConverter;
import org.apache.flink.connector.jdbc.internal.converter.OracleRowConverter;
import org.apache.flink.table.types.logical.LogicalTypeRoot;
import org.apache.flink.table.types.logical.RowType;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
/**
* JDBC dialect for Oracle.
*
* @author wenmo
* @since 2021/6/7 21:42
*/
public class OracleDialect extends AbstractDialect {
private static final long serialVersionUID = 1L;
// MAX/MIN fractional-second precision accepted for the TIMESTAMP type.
private static final int MAX_TIMESTAMP_PRECISION = 6;
private static final int MIN_TIMESTAMP_PRECISION = 1;
// DECIMAL precision limits carried over from the MySQL dialect
// (https://dev.mysql.com/doc/refman/8.0/en/fixed-point-types.html);
// Oracle's NUMBER type itself allows precision up to 38.
private static final int MAX_DECIMAL_PRECISION = 65;
private static final int MIN_DECIMAL_PRECISION = 1;
// jdbc:oracle:thin:@127.0.0.1:1521:ORCL
@Override
public boolean canHandle(String url) {
return url.startsWith("jdbc:oracle:thin:");
}
@Override
public JdbcRowConverter getRowConverter(RowType rowType) {
return new OracleRowConverter(rowType);
}
@Override
public Optional<String> defaultDriverName() {
return Optional.of("oracle.jdbc.driver.OracleDriver");
}
@Override
public String quoteIdentifier(String identifier) {
return identifier;
}
@Override
public Optional<String> getUpsertStatement(
String tableName, String[] fieldNames, String[] uniqueKeyFields) {
/*get update field*/
ArrayList<String> updateFieldNamesList = new ArrayList<String>(fieldNames.length);
Collections.addAll(updateFieldNamesList, fieldNames);
ArrayList<String> uniqueKeyFieldsList = new ArrayList<String>(uniqueKeyFields.length);
Collections.addAll(uniqueKeyFieldsList, uniqueKeyFields);
updateFieldNamesList.removeAll(uniqueKeyFieldsList);
String updateClause =
Arrays.stream(updateFieldNamesList.toArray(new String[0]))
.map(f -> "a." + quoteIdentifier(f) + " = :" + quoteIdentifier(f))
.collect(Collectors.joining(", "));
String onClause =
Arrays.stream(uniqueKeyFields)
.map(f -> "a." + quoteIdentifier(f) + " = :" + quoteIdentifier(f))
.collect(Collectors.joining(" AND "));
String sql =
"MERGE INTO "
+ tableName
+ " a USING ( SELECT 1 FROM dual ) b ON ( "
+ onClause
+ " )"
+ " WHEN MATCHED THEN"
+ " UPDATE SET "
+ updateClause
+ " WHEN NOT MATCHED THEN "
+ getInsertStatement(fieldNames);
return Optional.of(sql);
}
private String getInsertStatement(String[] fieldNames) {
String columns =
Arrays.stream(fieldNames)
.map(this::quoteIdentifier)
.collect(Collectors.joining(", "));
String placeholders =
Arrays.stream(fieldNames).map(f -> ":" + f).collect(Collectors.joining(", "));
return "INSERT " + "(" + columns + ")" + " VALUES (" + placeholders + ")";
}
@Override
public String dialectName() {
return "Oracle";
}
@Override
public int maxDecimalPrecision() {
return MAX_DECIMAL_PRECISION;
}
@Override
public int minDecimalPrecision() {
return MIN_DECIMAL_PRECISION;
}
@Override
public int maxTimestampPrecision() {
return MAX_TIMESTAMP_PRECISION;
}
@Override
public int minTimestampPrecision() {
return MIN_TIMESTAMP_PRECISION;
}
@Override
public List<LogicalTypeRoot> unsupportedTypes() {
// TODO: We can't convert BINARY data type to
// PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO in
// LegacyTypeInfoDataTypeConverter.
return Arrays.asList(
LogicalTypeRoot.BINARY,
LogicalTypeRoot.TIMESTAMP_WITH_LOCAL_TIME_ZONE,
LogicalTypeRoot.TIMESTAMP_WITH_TIME_ZONE,
LogicalTypeRoot.INTERVAL_YEAR_MONTH,
LogicalTypeRoot.INTERVAL_DAY_TIME,
LogicalTypeRoot.ARRAY,
LogicalTypeRoot.MULTISET,
LogicalTypeRoot.MAP,
LogicalTypeRoot.ROW,
LogicalTypeRoot.DISTINCT_TYPE,
LogicalTypeRoot.STRUCTURED_TYPE,
LogicalTypeRoot.NULL,
LogicalTypeRoot.RAW,
LogicalTypeRoot.SYMBOL,
LogicalTypeRoot.UNRESOLVED);
}
}
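For reference, a quick sketch of the MERGE statement that getUpsertStatement() above produces; the table and column names are made up, and the :NAME placeholders are bound later by the connector's FieldNamedPreparedStatement:

import java.util.Optional;

import org.apache.flink.connector.jdbc.dialect.OracleDialect;

public class OracleUpsertExample {
    public static void main(String[] args) {
        OracleDialect dialect = new OracleDialect();
        Optional<String> merge = dialect.getUpsertStatement(
                "DLINK_USER",
                new String[] {"ID", "NAME", "AGE"},
                new String[] {"ID"});
        // Expected shape (modulo whitespace):
        // MERGE INTO DLINK_USER a USING ( SELECT 1 FROM dual ) b ON ( a.ID = :ID )
        //   WHEN MATCHED THEN UPDATE SET a.NAME = :NAME, a.AGE = :AGE
        //   WHEN NOT MATCHED THEN INSERT (ID, NAME, AGE) VALUES (:ID, :NAME, :AGE)
        merge.ifPresent(System.out::println);
    }
}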
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.connector.jdbc.internal.converter;
import org.apache.flink.table.types.logical.RowType;
/**
* Runtime converter responsible for converting between JDBC objects and Flink internal
* objects for ClickHouse.
*
* @author wenmo
* @since 2021/6/7 21:49
*/
public class ClickHouseRowConverter extends AbstractJdbcRowConverter {
private static final long serialVersionUID = 1L;
@Override
public String converterName() {
return "ClickHouse";
}
public ClickHouseRowConverter(RowType rowType) {
super(rowType);
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.connector.jdbc.internal.converter;
import org.apache.flink.table.types.logical.RowType;
/**
* Runtime converter responsible for converting between JDBC objects and Flink internal
* objects for Oracle.
*
* @author wenmo
* @since 2021/6/7 21:45
*/
public class OracleRowConverter extends AbstractJdbcRowConverter {
private static final long serialVersionUID = 1L;
@Override
public String converterName() {
return "Oracle";
}
public OracleRowConverter(RowType rowType) {
super(rowType);
}
}
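Both converters inherit all standard type mappings from AbstractJdbcRowConverter and only contribute a name, so constructing one needs nothing beyond the row type. A brief sketch with arbitrary field names:

import org.apache.flink.connector.jdbc.internal.converter.OracleRowConverter;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.types.logical.RowType;

public class RowConverterExample {
    public static void main(String[] args) {
        RowType rowType = (RowType) DataTypes.ROW(
                DataTypes.FIELD("ID", DataTypes.BIGINT()),
                DataTypes.FIELD("NAME", DataTypes.STRING()))
            .getLogicalType();

        OracleRowConverter converter = new OracleRowConverter(rowType);
        System.out.println(converter.converterName()); // Oracle
    }
}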
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>dlink</artifactId>
<groupId>com.dlink</groupId>
<version>0.2.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<packaging>pom</packaging>
<modules>
<module>dlink-connector-jdbc</module>
</modules>
<artifactId>dlink-connectors</artifactId>
</project>
\ No newline at end of file
......@@ -58,7 +58,12 @@
</dependency>
<dependency>
<groupId>com.dlink</groupId>
<artifactId>dlink-connector</artifactId>
<artifactId>dlink-client-1.12</artifactId>
<!--<scope>provided</scope>-->
</dependency>
<dependency>
<groupId>com.dlink</groupId>
<artifactId>dlink-connector-jdbc</artifactId>
<!--<scope>provided</scope>-->
</dependency>
</dependencies>
......
......@@ -110,8 +110,8 @@ public class JobManager {
}
} catch (Exception e) {
e.printStackTrace();
StackTraceElement[] trace = e.getCause().getStackTrace();
/*StringBuffer resMsg = new StringBuffer("");
/*StackTraceElement[] trace = e.getStackTrace();
StringBuffer resMsg = new StringBuffer("");
for (StackTraceElement s : trace) {
resMsg.append(" \n " + s + " ");
}*/
......@@ -165,8 +165,8 @@ public class JobManager {
}
} catch (Exception e) {
e.printStackTrace();
StackTraceElement[] trace = e.getStackTrace();
/*StringBuilder resMsg = new StringBuilder();
/*StackTraceElement[] trace = e.getStackTrace();
StringBuilder resMsg = new StringBuilder();
for (StackTraceElement s : trace) {
resMsg.append(" \n " + s + " ");
}*/
......
......@@ -157,11 +157,11 @@ const StudioMenu = (props: any) => {
<Col span={8} offset={8}>
<Button
type="text"
icon={<FileAddTwoTone />}
icon={<FileAddTwoTone twoToneColor="#ddd" />}
/>
<Button
type="text"
icon={<FolderOpenTwoTone />}
icon={<FolderOpenTwoTone twoToneColor="#ddd" />}
/>
<Tooltip title="保存当前的 FlinkSql">
<Button
......@@ -173,11 +173,11 @@ const StudioMenu = (props: any) => {
<Divider type="vertical" />
<Button
type="text"
icon={<SafetyCertificateTwoTone />}
icon={<SafetyCertificateTwoTone twoToneColor="#ddd" />}
/>
<Button
type="text"
icon={<FlagTwoTone />}
icon={<FlagTwoTone twoToneColor="#ddd" />}
/>
<Tooltip title="执行当前的 FlinkSql">
<Button
......@@ -201,11 +201,11 @@ const StudioMenu = (props: any) => {
okText="Stop"
cancelText="Cancel"
>
<Tooltip title="停止所有的 FlinkSql 任务">
<Tooltip title="停止所有的 FlinkSql 任务,暂不可用">
{/*<Badge size="small" count={1} offset={[-5, 5]}>*/}
<Button
type="text"
icon={<PauseCircleTwoTone />}
icon={<PauseCircleTwoTone twoToneColor="#ddd" />}
/>
{/*</Badge>*/}
</Tooltip>
......@@ -213,15 +213,15 @@ const StudioMenu = (props: any) => {
<Divider type="vertical" />
<Button
type="text"
icon={<DiffTwoTone />}
icon={<DiffTwoTone twoToneColor="#ddd" />}
/>
<Button
type="text"
icon={<CopyTwoTone />}
icon={<CopyTwoTone twoToneColor="#ddd" />}
/>
<Button
type="text"
icon={<DeleteTwoTone />}
icon={<DeleteTwoTone twoToneColor="#ddd" />}
/>
</Col>
</Row>
......
......@@ -265,6 +265,7 @@ const StudioTree: React.FC<StudioTreeProps> = (props) => {
payload: e.node.path,
});
setRightClickNodeTreeItem(null);
toOpen(e.node);
};
return (
......
......@@ -112,6 +112,28 @@ export default (): React.ReactNode => {
</ul>
</Paragraph>
</Timeline.Item>
<Timeline.Item><Text code>0.1.1</Text> <Text type="secondary">2021-06-08</Text>
<p> </p>
<Paragraph>
<ul>
<li>
<Link href="">FlinkSql Studio 代码底层架构优化</Link>
</li>
<li>
<Link href="">支持以 SPI 的方式扩展任意 Connector,同 Flink 官网</Link>
</li>
<li>
<Link href="">提供了 dlink-connector-jdbc,额外支持 Oracle 和 ClickHouse 读写,该扩展包可直接上传 Flink 集群的 lib 进行远程使用,无需重新编译</Link>
</li>
<li>
<Link href="">提供了 dlink-client-1.12,支持 Flink 1.12.0+ 多集群的远程使用与本地隔离使用,1.10、1.11 和 1.13 集群可能存在问题</Link>
</li>
<li>
<Link href="">对 Studio 界面进行了一定的提示优化</Link>
</li>
</ul>
</Paragraph>
</Timeline.Item>
</Timeline>
</Card>
</PageContainer>
......
......@@ -11,7 +11,8 @@
<modules>
<module>dlink-core</module>
<module>dlink-admin</module>
<module>dlink-connector</module>
<module>dlink-connectors</module>
<module>dlink-client</module>
</modules>
<properties>
......@@ -123,7 +124,12 @@
</dependency>
<dependency>
<groupId>com.dlink</groupId>
<artifactId>dlink-connector</artifactId>
<artifactId>dlink-client-1.12</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>com.dlink</groupId>
<artifactId>dlink-connector-jdbc</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
......