Unverified Commit dd8341b4 authored by aiwenmo, committed by GitHub

[Feature-704][catalog,admin] Add default mysql catalog in FlinkSQLEnv (#705)

Co-authored-by: wenmo <32723967+wenmo@users.noreply.github.com>
parent 5126bead
package com.dlink.init;
import com.dlink.daemon.task.DaemonFactory;
import com.dlink.daemon.task.DaemonTaskConfig;
import com.dlink.job.FlinkJobTask;
import com.dlink.model.JobInstance;
import com.dlink.service.JobInstanceService;
import com.dlink.service.SysConfigService;
import java.util.ArrayList;
import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
......@@ -14,8 +11,13 @@ import org.springframework.boot.ApplicationRunner;
import org.springframework.core.annotation.Order;
import org.springframework.stereotype.Component;
import java.util.ArrayList;
import java.util.List;
import com.dlink.daemon.task.DaemonFactory;
import com.dlink.daemon.task.DaemonTaskConfig;
import com.dlink.job.FlinkJobTask;
import com.dlink.model.JobInstance;
import com.dlink.service.JobInstanceService;
import com.dlink.service.SysConfigService;
import com.dlink.service.TaskService;
/**
* SystemInit
......@@ -33,10 +35,13 @@ public class SystemInit implements ApplicationRunner {
private SysConfigService sysConfigService;
@Autowired
private JobInstanceService jobInstanceService;
@Autowired
private TaskService taskService;
@Override
public void run(ApplicationArguments args) throws Exception {
sysConfigService.initSysConfig();
taskService.initDefaultFlinkSQLEnv();
List<JobInstance> jobInstances = jobInstanceService.listJobInstanceActive();
List<DaemonTaskConfig> configList = new ArrayList<>();
for (JobInstance jobInstance : jobInstances) {
......
......@@ -10,7 +10,6 @@ import com.dlink.job.JobResult;
import com.dlink.model.JobInfoDetail;
import com.dlink.model.JobInstance;
import com.dlink.model.Task;
import com.dlink.model.TaskVersion;
import com.dlink.result.SqlExplainResult;
/**
......@@ -35,6 +34,8 @@ public interface TaskService extends ISuperService<Task> {
List<Task> listFlinkSQLEnv();
Task initDefaultFlinkSQLEnv();
String exportSql(Integer id);
Task getUDFByClassName(String className);
......
package com.dlink.service.impl;
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import com.dlink.api.FlinkAPI;
import com.dlink.assertion.Asserts;
import com.dlink.config.Dialect;
import com.dlink.dto.*;
import com.dlink.dto.AbstractStatementDTO;
import com.dlink.dto.SessionDTO;
import com.dlink.dto.SqlDTO;
import com.dlink.dto.StudioCADTO;
import com.dlink.dto.StudioDDLDTO;
import com.dlink.dto.StudioExecuteDTO;
import com.dlink.explainer.lineage.LineageBuilder;
import com.dlink.explainer.lineage.LineageResult;
import com.dlink.gateway.GatewayType;
......@@ -21,7 +36,12 @@ import com.dlink.model.Task;
import com.dlink.result.IResult;
import com.dlink.result.SelectResult;
import com.dlink.result.SqlExplainResult;
import com.dlink.service.*;
import com.dlink.service.ClusterConfigurationService;
import com.dlink.service.ClusterService;
import com.dlink.service.DataBaseService;
import com.dlink.service.SavepointsService;
import com.dlink.service.StudioService;
import com.dlink.service.TaskService;
import com.dlink.session.SessionConfig;
import com.dlink.session.SessionInfo;
import com.dlink.session.SessionPool;
......@@ -30,15 +50,6 @@ import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
/**
* StudioServiceImpl
......@@ -67,7 +78,7 @@ public class StudioServiceImpl implements StudioService {
if (statementDTO.isFragment() && Asserts.isNotNullString(flinkWithSql)) {
statementDTO.setStatement(flinkWithSql + "\r\n" + statementDTO.getStatement());
}
if (Asserts.isNotNull(statementDTO.getEnvId()) && statementDTO.getEnvId() != 0) {
if (Asserts.isNotNull(statementDTO.getEnvId()) && !statementDTO.getEnvId().equals(0)) {
Task task = taskService.getTaskInfoById(statementDTO.getEnvId());
if (Asserts.isNotNull(task) && Asserts.isNotNullString(task.getStatement())) {
statementDTO.setStatement(task.getStatement() + "\r\n" + statementDTO.getStatement());
......@@ -86,7 +97,7 @@ public class StudioServiceImpl implements StudioService {
public JobResult executeSql(StudioExecuteDTO studioExecuteDTO) {
if (Dialect.isSql(studioExecuteDTO.getDialect())) {
return executeCommonSql(SqlDTO.build(studioExecuteDTO.getStatement(),
studioExecuteDTO.getDatabaseId(), studioExecuteDTO.getMaxRowNum()));
studioExecuteDTO.getDatabaseId(), studioExecuteDTO.getMaxRowNum()));
} else {
return executeFlinkSql(studioExecuteDTO);
}
......@@ -227,15 +238,15 @@ public class StudioServiceImpl implements StudioService {
if (sessionDTO.isUseRemote()) {
Cluster cluster = clusterService.getById(sessionDTO.getClusterId());
SessionConfig sessionConfig = SessionConfig.build(
sessionDTO.getType(), true,
cluster.getId(), cluster.getAlias(),
clusterService.buildEnvironmentAddress(true, sessionDTO.getClusterId()));
sessionDTO.getType(), true,
cluster.getId(), cluster.getAlias(),
clusterService.buildEnvironmentAddress(true, sessionDTO.getClusterId()));
return JobManager.createSession(sessionDTO.getSession(), sessionConfig, createUser);
} else {
SessionConfig sessionConfig = SessionConfig.build(
sessionDTO.getType(), false,
null, null,
clusterService.buildEnvironmentAddress(false, null));
sessionDTO.getType(), false,
null, null,
clusterService.buildEnvironmentAddress(false, null));
return JobManager.createSession(sessionDTO.getSession(), sessionConfig, createUser);
}
}
......@@ -257,14 +268,14 @@ public class StudioServiceImpl implements StudioService {
@Override
public LineageResult getLineage(StudioCADTO studioCADTO) {
if (Asserts.isNotNullString(studioCADTO.getDialect()) && !studioCADTO.getDialect().equalsIgnoreCase("flinksql")) {
if(Asserts.isNull(studioCADTO.getDatabaseId())){
if (Asserts.isNull(studioCADTO.getDatabaseId())) {
return null;
}
DataBase dataBase = dataBaseService.getById(studioCADTO.getDatabaseId());
if (Asserts.isNull(dataBase)) {
return null;
}
if(studioCADTO.getDialect().equalsIgnoreCase("doris")){
if (studioCADTO.getDialect().equalsIgnoreCase("doris")) {
return com.dlink.explainer.sqlLineage.LineageBuilder.getSqlLineage(studioCADTO.getStatement(), "mysql", dataBase.getDriverConfig());
} else {
return com.dlink.explainer.sqlLineage.LineageBuilder.getSqlLineage(studioCADTO.getStatement(), studioCADTO.getDialect().toLowerCase(), dataBase.getDriverConfig());
......
......@@ -343,6 +343,42 @@ public class TaskServiceImpl extends SuperServiceImpl<TaskMapper, Task> implemen
return this.list(new QueryWrapper<Task>().eq("dialect", Dialect.FLINKSQLENV).eq("enabled", 1));
}
@Override
public Task initDefaultFlinkSQLEnv() {
String separator = SystemConfiguration.getInstances().getSqlSeparator();
separator = separator.replace("\\r", "\r").replace("\\n", "\n");
Task defaultFlinkSQLEnvTask = new Task();
defaultFlinkSQLEnvTask.setId(1);
defaultFlinkSQLEnvTask.setName("dlink_default_catalog");
defaultFlinkSQLEnvTask.setAlias("DefaultCatalog");
defaultFlinkSQLEnvTask.setDialect(Dialect.FLINKSQLENV.getValue());
StringBuilder sb = new StringBuilder();
sb.append("create catalog myCatalog with(\n");
sb.append(" 'type' = 'dlink_mysql',\n");
sb.append(" 'username' = '");
sb.append(username);
sb.append("',\n");
sb.append(" 'password' = '");
sb.append(password);
sb.append("',\n");
sb.append(" 'url' = '");
sb.append(url);
sb.append("'\n");
sb.append(")");
sb.append(separator);
sb.append("use catalog myCatalog");
sb.append(separator);
defaultFlinkSQLEnvTask.setStatement(sb.toString());
defaultFlinkSQLEnvTask.setFragment(true);
defaultFlinkSQLEnvTask.setEnabled(true);
saveOrUpdate(defaultFlinkSQLEnvTask);
Statement statement = new Statement();
statement.setId(1);
statement.setStatement(sb.toString());
statementService.saveOrUpdate(statement);
return defaultFlinkSQLEnvTask;
}
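// For reference, with assumed datasource values the generated default-environment statement
// (saved as task and statement id 1, "dlink_default_catalog") reads roughly:
//   create catalog myCatalog with(
//     'type' = 'dlink_mysql',
//     'username' = '<datasource user>',
//     'password' = '<datasource password>',
//     'url' = '<datasource jdbc url>'
//   );
//   use catalog myCatalog;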
@Override
public String exportSql(Integer id) {
Task task = getTaskInfoById(id);
......@@ -617,7 +653,7 @@ public class TaskServiceImpl extends SuperServiceImpl<TaskMapper, Task> implemen
task.setStatement(flinkWithSql + "\r\n" + task.getStatement());
}
}
if (!isJarTask && Asserts.isNotNull(task.getEnvId()) && task.getEnvId() != 0) {
if (!isJarTask && Asserts.isNotNull(task.getEnvId()) && !task.getEnvId().equals(0)) {
Task envTask = getTaskInfoById(task.getEnvId());
if (Asserts.isNotNull(envTask) && Asserts.isNotNullString(envTask.getStatement())) {
task.setStatement(envTask.getStatement() + "\r\n" + task.getStatement());
......
......@@ -263,6 +263,27 @@
<include>dlink-app-1.15-${project.version}-jar-with-dependencies.jar</include>
</includes>
</fileSet>
<fileSet>
<directory>${project.parent.basedir}/dlink-catalog/dlink-catalog-mysql/dlink-catalog-mysql-1.13/target</directory>
<outputDirectory>lib</outputDirectory>
<includes>
<include>dlink-catalog-mysql-1.13-${project.version}.jar</include>
</includes>
</fileSet>
<fileSet>
<directory>${project.parent.basedir}/dlink-catalog/dlink-catalog-mysql/dlink-catalog-mysql-1.14/target</directory>
<outputDirectory>extends</outputDirectory>
<includes>
<include>dlink-catalog-mysql-1.14-${project.version}.jar</include>
</includes>
</fileSet>
<fileSet>
<directory>${project.parent.basedir}/dlink-catalog/dlink-catalog-mysql/dlink-catalog-mysql-1.15/target</directory>
<outputDirectory>extends</outputDirectory>
<includes>
<include>dlink-catalog-mysql-1.15-${project.version}.jar</include>
</includes>
</fileSet>
<fileSet>
<directory>${project.parent.basedir}/dlink-client/dlink-client-base/target</directory>
<outputDirectory>jar</outputDirectory>
......
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>dlink-catalog-mysql</artifactId>
<groupId>com.dlink</groupId>
<version>0.6.6-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>dlink-catalog-mysql-1.13</artifactId>
<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
</properties>
<dependencies>
<dependency>
<groupId>com.dlink</groupId>
<artifactId>dlink-common</artifactId>
</dependency>
<dependency>
<groupId>com.dlink</groupId>
<artifactId>dlink-flink-1.13</artifactId>
<!-- <scope>provided</scope>-->
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.13.2</version>
<scope>test</scope>
</dependency>
</dependencies>
</project>
\ No newline at end of file
/*
# __
# / | ____ ___ _
# / / | / __// // / /
# /_/`_|/_/ / /_//___/
create @ 2022/6/20
*/
package com.dlink.flink.catalog;
import static org.apache.flink.util.Preconditions.checkArgument;
import static org.apache.flink.util.Preconditions.checkNotNull;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.catalog.AbstractCatalog;
import org.apache.flink.table.catalog.CatalogBaseTable;
import org.apache.flink.table.catalog.CatalogDatabase;
import org.apache.flink.table.catalog.CatalogDatabaseImpl;
import org.apache.flink.table.catalog.CatalogFunction;
import org.apache.flink.table.catalog.CatalogFunctionImpl;
import org.apache.flink.table.catalog.CatalogPartition;
import org.apache.flink.table.catalog.CatalogPartitionSpec;
import org.apache.flink.table.catalog.CatalogTable;
import org.apache.flink.table.catalog.CatalogView;
import org.apache.flink.table.catalog.FunctionLanguage;
import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.catalog.ResolvedCatalogBaseTable;
import org.apache.flink.table.catalog.ResolvedCatalogTable;
import org.apache.flink.table.catalog.ResolvedCatalogView;
import org.apache.flink.table.catalog.exceptions.CatalogException;
import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException;
import org.apache.flink.table.catalog.exceptions.DatabaseNotEmptyException;
import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
import org.apache.flink.table.catalog.exceptions.FunctionAlreadyExistException;
import org.apache.flink.table.catalog.exceptions.FunctionNotExistException;
import org.apache.flink.table.catalog.exceptions.PartitionAlreadyExistsException;
import org.apache.flink.table.catalog.exceptions.PartitionNotExistException;
import org.apache.flink.table.catalog.exceptions.PartitionSpecInvalidException;
import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException;
import org.apache.flink.table.catalog.exceptions.TableNotExistException;
import org.apache.flink.table.catalog.exceptions.TableNotPartitionedException;
import org.apache.flink.table.catalog.exceptions.TablePartitionedException;
import org.apache.flink.table.catalog.stats.CatalogColumnStatistics;
import org.apache.flink.table.catalog.stats.CatalogTableStatistics;
import org.apache.flink.table.expressions.Expression;
import org.apache.flink.table.types.DataType;
import org.apache.flink.util.StringUtils;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.dlink.flink.catalog.factory.DlinkMysqlCatalogFactoryOptions;
/**
* Custom catalog.
* Connection check: done.
* The default database is forced to default_database, no matter what name is supplied.
* Connection settings can be read from configuration instead of being hard-coded in the SQL statement.
*/
public class DlinkMysqlCatalog extends AbstractCatalog {
private final Logger logger = LoggerFactory.getLogger(this.getClass());
public static final String MYSQL_DRIVER = "com.mysql.cj.jdbc.Driver";
public static final String DEFAULT_DATABASE = "default_database";
static {
try {
Class.forName(MYSQL_DRIVER);
} catch (ClassNotFoundException e) {
throw new CatalogException("未加载 mysql 驱动!", e);
}
}
private static final String COMMENT = "comment";
/**
* Whether a SQLException has occurred; if so, the connection may be stale and must be re-checked.
*/
private boolean SQLExceptionHappened = false;
/**
* Object types, e.g. database, table, view.
*/
protected static class ObjectType {
/**
* Database
*/
public static final String DATABASE = "database";
/**
* Table
*/
public static final String TABLE = "TABLE";
/**
* View
*/
public static final String VIEW = "VIEW";
}
/**
* Column types, e.g. physical, computed, metadata, watermark.
*/
protected static class ColumnType {
/**
* Physical column
*/
public static final String PHYSICAL = "physical";
/**
* Computed column
*/
public static final String COMPUTED = "computed";
/**
* Metadata column
*/
public static final String METADATA = "metadata";
/**
* Watermark
*/
public static final String WATERMARK = "watermark";
}
/**
* Database user name.
*/
private final String user;
/**
* Database password.
*/
private final String pwd;
/**
* Database connection URL.
*/
private final String url;
/**
* Default database.
*/
private static final String defaultDatabase = "default_database";
/**
* Database user name.
*
* @return the database user name
*/
public String getUser() {
return user;
}
/**
* Database password.
*
* @return the database password
*/
public String getPwd() {
return pwd;
}
/**
* Database connection URL.
*
* @return the database connection URL
*/
public String getUrl() {
return url;
}
public DlinkMysqlCatalog(String name,
String url,
String user,
String pwd) {
super(name, defaultDatabase);
this.url = url;
this.user = user;
this.pwd = pwd;
}
public DlinkMysqlCatalog(String name) {
super(name, defaultDatabase);
this.url = DlinkMysqlCatalogFactoryOptions.URL.defaultValue();
this.user = DlinkMysqlCatalogFactoryOptions.USERNAME.defaultValue();
this.pwd = DlinkMysqlCatalogFactoryOptions.PASSWORD.defaultValue();
}
@Override
public void open() throws CatalogException {
// Verify that the connection works:
// look up the default database and create it if it does not exist yet.
Integer defaultDbId = getDatabaseId(defaultDatabase);
if (defaultDbId == null) {
try {
createDatabase(defaultDatabase, new CatalogDatabaseImpl(new HashMap<>(), "")
, true);
} catch (DatabaseAlreadyExistException a) {
logger.info("The default database already exists.");
}
}
}
@Override
public void close() throws CatalogException {
if (connection != null) {
try {
connection.close();
connection = null;
} catch (SQLException e) {
SQLExceptionHappened = true;
throw new CatalogException("Fail to close connection.", e);
}
}
}
private Connection connection;
protected Connection getConnection() throws CatalogException {
try {
// todo: extract connection creation into a dedicated method so other connection providers can be plugged in later.
// Class.forName(MYSQL_DRIVER);
if (connection == null) {
connection = DriverManager.getConnection(url, user, pwd);
}
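// After a previous SQLException the cached connection may be stale: revalidate it and rebuild it before reuse.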
if (SQLExceptionHappened) {
SQLExceptionHappened = false;
if (!connection.isValid(10)) {
connection.close();
}
if (connection.isClosed()) {
connection = null;
return getConnection();
}
connection = null;
return getConnection();
}
return connection;
} catch (Exception e) {
throw new CatalogException("Fail to get connection.", e);
}
}
@Override
public List<String> listDatabases() throws CatalogException {
List<String> myDatabases = new ArrayList<>();
String querySql = "SELECT database_name FROM metadata_database";
Connection conn = getConnection();
try (PreparedStatement ps = conn.prepareStatement(querySql)) {
ResultSet rs = ps.executeQuery();
while (rs.next()) {
String dbName = rs.getString(1);
myDatabases.add(dbName);
}
return myDatabases;
} catch (Exception e) {
throw new CatalogException(
String.format("Failed listing database in catalog %s", getName()), e);
}
}
@Override
public CatalogDatabase getDatabase(String databaseName)
throws DatabaseNotExistException, CatalogException {
String querySql = "SELECT id, database_name,description " +
" FROM metadata_database where database_name=?";
Connection conn = getConnection();
try (PreparedStatement ps = conn.prepareStatement(querySql)) {
ps.setString(1, databaseName);
ResultSet rs = ps.executeQuery();
if (rs.next()) {
int id = rs.getInt("id");
String description = rs.getString("description");
Map<String, String> map = new HashMap<>();
String sql = "select `key`,`value` " +
"from metadata_database_property " +
"where database_id=? ";
try (PreparedStatement pStat = conn.prepareStatement(sql)) {
pStat.setInt(1, id);
ResultSet prs = pStat.executeQuery();
while (prs.next()) {
map.put(prs.getString("key"), prs.getString("value"));
}
} catch (SQLException e) {
SQLExceptionHappened = true;
throw new CatalogException(
String.format("Failed get database properties in catalog %s", getName()), e);
}
return new CatalogDatabaseImpl(map, description);
} else {
throw new DatabaseNotExistException(getName(), databaseName);
}
} catch (SQLException e) {
SQLExceptionHappened = true;
throw new CatalogException(
String.format("Failed get database in catalog %s", getName()), e);
}
}
@Override
public boolean databaseExists(String databaseName) throws CatalogException {
return getDatabaseId(databaseName) != null;
}
private Integer getDatabaseId(String databaseName) throws CatalogException {
String querySql = "select id from metadata_database where database_name=?";
Connection conn = getConnection();
try (PreparedStatement ps = conn.prepareStatement(querySql)) {
ps.setString(1, databaseName);
ResultSet rs = ps.executeQuery();
boolean multiDB = false;
Integer id = null;
while (rs.next()) {
if (!multiDB) {
id = rs.getInt(1);
multiDB = true;
} else {
throw new CatalogException("存在多个同名database: " + databaseName);
}
}
return id;
} catch (SQLException e) {
SQLExceptionHappened = true;
throw new CatalogException(String.format("获取 database 信息失败:%s.%s", getName(), databaseName), e);
}
}
@Override
public void createDatabase(String databaseName, CatalogDatabase db, boolean ignoreIfExists)
throws DatabaseAlreadyExistException, CatalogException {
checkArgument(!StringUtils.isNullOrWhitespaceOnly(databaseName));
checkNotNull(db);
if (databaseExists(databaseName)) {
if (!ignoreIfExists) {
throw new DatabaseAlreadyExistException(getName(), databaseName);
}
} else {
// Create the database here.
Connection conn = getConnection();
// Run the inserts inside a transaction.
String insertSql = "insert into metadata_database(database_name, description) values(?, ?)";
try (PreparedStatement stat = conn.prepareStatement(insertSql, Statement.RETURN_GENERATED_KEYS)) {
conn.setAutoCommit(false);
stat.setString(1, databaseName);
stat.setString(2, db.getComment());
stat.executeUpdate();
ResultSet idRs = stat.getGeneratedKeys();
if (idRs.next() && db.getProperties() != null && db.getProperties().size() > 0) {
int id = idRs.getInt(1);
String propInsertSql = "insert into metadata_database_property(database_id, " +
"`key`,`value`) values (?,?,?)";
PreparedStatement pstat = conn.prepareStatement(propInsertSql);
for (Map.Entry<String, String> entry : db.getProperties().entrySet()) {
pstat.setInt(1, id);
pstat.setString(2, entry.getKey());
pstat.setString(3, entry.getValue());
pstat.addBatch();
}
pstat.executeBatch();
pstat.close();
}
conn.commit();
} catch (SQLException e) {
SQLExceptionHappened = true;
logger.error("创建 database 信息失败:", e);
}
}
}
@Override
public void dropDatabase(String name, boolean ignoreIfNotExists, boolean cascade)
throws DatabaseNotExistException, DatabaseNotEmptyException, CatalogException {
if (name.equals(defaultDatabase)) {
throw new CatalogException("默认 database 不可以删除");
}
// 1、取出db id,
Integer id = getDatabaseId(name);
if (id == null) {
if (!ignoreIfNotExists) {
throw new DatabaseNotExistException(getName(), name);
}
return;
}
Connection conn = getConnection();
try {
conn.setAutoCommit(false);
// Check whether the database still contains tables.
List<String> tables = listTables(name);
if (tables.size() > 0) {
if (!cascade) {
// Tables exist and cascade is disabled, so refuse to drop.
throw new DatabaseNotEmptyException(getName(), name);
}
// Cascade: drop every table first.
for (String table : tables) {
try {
dropTable(new ObjectPath(name, table), true);
} catch (TableNotExistException t) {
logger.warn("Table {} does not exist", name + "." + table);
}
}
}
// todo: rows are deleted permanently for now; decide later whether to keep a history instead.
String deletePropSql = "delete from metadata_database_property where database_id=?";
PreparedStatement dStat = conn.prepareStatement(deletePropSql);
dStat.setInt(1, id);
dStat.executeUpdate();
dStat.close();
String deleteDbSql = "delete from metadata_database where database_id=?";
dStat = conn.prepareStatement(deleteDbSql);
dStat.setInt(1, id);
dStat.executeUpdate();
dStat.close();
conn.commit();
} catch (SQLException e) {
SQLExceptionHappened = true;
throw new CatalogException("删除 database 信息失败:", e);
}
}
@Override
public void alterDatabase(String name, CatalogDatabase newDb, boolean ignoreIfNotExists)
throws DatabaseNotExistException, CatalogException {
if (name.equals(defaultDatabase)) {
throw new CatalogException("默认 database 不可以修改");
}
// 1、取出db id,
Integer id = getDatabaseId(name);
if (id == null) {
if (!ignoreIfNotExists) {
throw new DatabaseNotExistException(getName(), name);
}
return;
}
Connection conn = getConnection();
try {
conn.setAutoCommit(false);
// Neither the name nor the type can be changed; only the comment can be updated.
String updateCommentSql = "update metadata_database set description=? where id=?";
PreparedStatement uState = conn.prepareStatement(updateCommentSql);
uState.setString(1, newDb.getComment());
uState.setInt(2, id);
uState.executeUpdate();
uState.close();
if (newDb.getProperties() != null && newDb.getProperties().size() > 0) {
String upsertSql = "insert into metadata_database_property (database_id, `key`,`value`) \n" +
"values (?,?,?)\n" +
"on duplicate key update `value` =?, update_time = sysdate()\n";
PreparedStatement pstat = conn.prepareStatement(upsertSql);
for (Map.Entry<String, String> entry : newDb.getProperties().entrySet()) {
pstat.setInt(1, id);
pstat.setString(2, entry.getKey());
pstat.setString(3, entry.getValue());
pstat.setString(4, entry.getValue());
pstat.addBatch();
}
pstat.executeBatch();
}
conn.commit();
} catch (SQLException e) {
SQLExceptionHappened = true;
throw new CatalogException("修改 database 信息失败:", e);
}
}
@Override
public List<String> listTables(String databaseName)
throws DatabaseNotExistException, CatalogException {
return listTablesViews(databaseName, ObjectType.TABLE);
}
@Override
public List<String> listViews(String databaseName)
throws DatabaseNotExistException, CatalogException {
return listTablesViews(databaseName, ObjectType.VIEW);
}
protected List<String> listTablesViews(String databaseName, String tableType)
throws DatabaseNotExistException, CatalogException {
Integer databaseId = getDatabaseId(databaseName);
if (null == databaseId) {
throw new DatabaseNotExistException(getName(), databaseName);
}
// List either tables or views in the database, depending on the requested type.
String querySql = "SELECT table_name FROM metadata_table where table_type=? and database_id = ?";
Connection conn = getConnection();
try (PreparedStatement ps = conn.prepareStatement(querySql)) {
ps.setString(1, tableType);
ps.setInt(2, databaseId);
ResultSet rs = ps.executeQuery();
List<String> tables = new ArrayList<>();
while (rs.next()) {
String table = rs.getString(1);
tables.add(table);
}
return tables;
} catch (Exception e) {
throw new CatalogException(
String.format("Failed listing %s in catalog %s", tableType, getName()), e);
}
}
@Override
public CatalogBaseTable getTable(ObjectPath tablePath) throws TableNotExistException, CatalogException {
// Work in steps:
// 1. fetch the table row (it may be a table or a view)
// 2. fetch the columns
// 3. fetch the properties
Integer id = getTableId(tablePath);
if (id == null) {
throw new TableNotExistException(getName(), tablePath);
}
Connection conn = getConnection();
try {
String queryTable = "SELECT table_name " +
" ,description, table_type " +
" FROM metadata_table " +
" where id=?";
PreparedStatement ps = conn.prepareStatement(queryTable);
ps.setInt(1, id);
ResultSet rs = ps.executeQuery();
String description;
String tableType;
if (rs.next()) {
description = rs.getString("description");
tableType = rs.getString("table_type");
ps.close();
} else {
ps.close();
throw new TableNotExistException(getName(), tablePath);
}
if (tableType.equals(ObjectType.TABLE)) {
// This is a table.
String propSql = "SELECT `key`, `value` from metadata_table_property " +
"WHERE table_id=?";
PreparedStatement pState = conn.prepareStatement(propSql);
pState.setInt(1, id);
ResultSet prs = pState.executeQuery();
Map<String, String> props = new HashMap<>();
while (prs.next()) {
String key = prs.getString("key");
String value = prs.getString("value");
props.put(key, value);
}
pState.close();
props.put(COMMENT, description);
return CatalogTable.fromProperties(props);
} else if (tableType.equals(ObjectType.VIEW)) {
// 1. The table row has already been fetched above.
// 2. Fetch the columns.
String colSql = "SELECT column_name, column_type, data_type, description " +
" FROM metadata_column WHERE " +
" table_id=?";
PreparedStatement cStat = conn.prepareStatement(colSql);
cStat.setInt(1, id);
ResultSet crs = cStat.executeQuery();
Schema.Builder builder = Schema.newBuilder();
while (crs.next()) {
String colName = crs.getString("column_name");
String dataType = crs.getString("data_type");
builder.column(colName, dataType);
}
cStat.close();
// 3. Fetch the query.
String qSql = "SELECT `key`, value FROM metadata_table_property" +
" WHERE table_id=? ";
PreparedStatement qStat = conn.prepareStatement(qSql);
qStat.setInt(1, id);
ResultSet qrs = qStat.executeQuery();
String originalQuery = "";
String expandedQuery = "";
Map<String, String> options = new HashMap<>();
while (qrs.next()) {
String key = qrs.getString("key");
String value = qrs.getString("value");
if ("OriginalQuery".equals(key)) {
originalQuery = value;
} else if ("ExpandedQuery".equals(key)) {
expandedQuery = value;
} else {
options.put(key, value);
}
}
// Assemble the view.
return CatalogView.of(builder.build(), description
, originalQuery, expandedQuery, options);
} else {
throw new CatalogException("不支持的数据类型。" + tableType);
}
} catch (SQLException e) {
SQLExceptionHappened = true;
throw new CatalogException("获取 表信息失败。", e);
}
}
@Override
public boolean tableExists(ObjectPath tablePath) throws CatalogException {
Integer id = getTableId(tablePath);
return id != null;
}
private Integer getTableId(ObjectPath tablePath) {
Integer dbId = getDatabaseId(tablePath.getDatabaseName());
if (dbId == null) {
return null;
}
// Look up the id.
String getIdSql = "select id from metadata_table " +
" where table_name=? and database_id=?";
Connection conn = getConnection();
try (PreparedStatement gStat = conn.prepareStatement(getIdSql)) {
gStat.setString(1, tablePath.getObjectName());
gStat.setInt(2, dbId);
ResultSet rs = gStat.executeQuery();
if (rs.next()) {
return rs.getInt(1);
}
} catch (SQLException e) {
SQLExceptionHappened = true;
logger.error("get table fail", e);
throw new CatalogException("get table fail.", e);
}
return null;
}
@Override
public void dropTable(ObjectPath tablePath, boolean ignoreIfNotExists)
throws TableNotExistException, CatalogException {
Integer id = getTableId(tablePath);
if (id == null) {
throw new TableNotExistException(getName(), tablePath);
}
Connection conn = getConnection();
try {
// todo: rows are deleted permanently for now; decide later whether to keep a history instead.
conn.setAutoCommit(false);
String deletePropSql = "delete from metadata_table_property " +
" where table_id=?";
PreparedStatement dStat = conn.prepareStatement(deletePropSql);
dStat.setInt(1, id);
dStat.executeUpdate();
dStat.close();
String deleteColSql = "delete from metadata_column " +
" where table_id=?";
dStat = conn.prepareStatement(deleteColSql);
dStat.setInt(1, id);
dStat.executeUpdate();
dStat.close();
String deleteDbSql = "delete from metadata_table " +
" where id=?";
dStat = conn.prepareStatement(deleteDbSql);
dStat.setInt(1, id);
dStat.executeUpdate();
dStat.close();
conn.commit();
} catch (SQLException e) {
SQLExceptionHappened = true;
logger.error("drop table fail", e);
throw new CatalogException("drop table fail.", e);
}
}
@Override
public void renameTable(ObjectPath tablePath, String newTableName, boolean ignoreIfNotExists)
throws TableNotExistException, TableAlreadyExistException, CatalogException {
Integer id = getTableId(tablePath);
if (id == null) {
throw new TableNotExistException(getName(), tablePath);
}
ObjectPath newPath = new ObjectPath(tablePath.getDatabaseName(), newTableName);
if (tableExists(newPath)) {
throw new TableAlreadyExistException(getName(), newPath);
}
String updateSql = "UPDATE metadata_table SET table_name=? WHERE id=?";
Connection conn = getConnection();
try (PreparedStatement ps = conn.prepareStatement(updateSql)) {
ps.setString(1, newTableName);
ps.setInt(2, id);
ps.executeUpdate();
} catch (SQLException ex) {
SQLExceptionHappened = true;
throw new CatalogException("修改表名失败", ex);
}
}
@Override
public void createTable(ObjectPath tablePath, CatalogBaseTable table, boolean ignoreIfExists)
throws TableAlreadyExistException, DatabaseNotExistException, CatalogException {
Integer db_id = getDatabaseId(tablePath.getDatabaseName());
if (null == db_id) {
throw new DatabaseNotExistException(getName(), tablePath.getDatabaseName());
}
if (tableExists(tablePath)) {
if (!ignoreIfExists) {
throw new TableAlreadyExistException(getName(), tablePath);
}
return;
}
// Insert the table.
// The metadata_table row may describe either a table or a view.
// If it is a table, it is expected to be a resolved table, so it can be serialized and stored via its properties.
// If it is a view, it is assumed to contain only physical columns.
if (!(table instanceof ResolvedCatalogBaseTable)) {
throw new UnsupportedOperationException("Only ResolvedCatalogBaseTable instances are supported for now.");
}
Connection conn = getConnection();
try {
conn.setAutoCommit(false);
// First insert the table row.
CatalogBaseTable.TableKind kind = table.getTableKind();
String insertSql = "insert into metadata_table(\n" +
" table_name," +
" table_type," +
" database_id," +
" description)" +
" values(?,?,?,?)";
PreparedStatement iStat = conn.prepareStatement(insertSql, Statement.RETURN_GENERATED_KEYS);
iStat.setString(1, tablePath.getObjectName());
iStat.setString(2, kind.toString());
iStat.setInt(3, db_id);
iStat.setString(4, table.getComment());
iStat.executeUpdate();
ResultSet idRs = iStat.getGeneratedKeys();
if (!idRs.next()) {
iStat.close();
throw new CatalogException("插入元数据表信息失败");
}
int id = idRs.getInt(1);
iStat.close();
// Insert properties and columns.
if (table instanceof ResolvedCatalogTable) {
// A resolved table can be converted to properties directly.
Map<String, String> props = ((ResolvedCatalogTable) table).toProperties();
String propInsertSql = "insert into metadata_table_property(table_id," +
"`key`,`value`) values (?,?,?)";
PreparedStatement pStat = conn.prepareStatement(propInsertSql);
for (Map.Entry<String, String> entry : props.entrySet()) {
pStat.setInt(1, id);
pStat.setString(2, entry.getKey());
pStat.setString(3, entry.getValue());
pStat.addBatch();
}
pStat.executeBatch();
pStat.close();
} else {
// For a view, assume for now that it only has physical columns.
// A view also needs its original and expanded query stored.
// Insert properties and columns.
ResolvedCatalogView view = (ResolvedCatalogView) table;
List<Schema.UnresolvedColumn> cols = view.getUnresolvedSchema().getColumns();
if (cols.size() > 0) {
String colInsertSql = "insert into metadata_column(" +
" column_name, column_type, data_type" +
" , `expr`" +
" , description" +
" , table_id" +
" , `primary`) " +
" values(?,?,?,?,?,?,?)";
PreparedStatement colIStat = conn.prepareStatement(colInsertSql);
for (Schema.UnresolvedColumn col : cols) {
if (col instanceof Schema.UnresolvedPhysicalColumn) {
Schema.UnresolvedPhysicalColumn pCol = (Schema.UnresolvedPhysicalColumn) col;
if (!(pCol.getDataType() instanceof DataType)) {
throw new UnsupportedOperationException(String.format(
"类型识别失败,该列不是有效类型:%s.%s.%s : %s"
, tablePath.getDatabaseName()
, tablePath.getObjectName()
, pCol.getName(),
pCol.getDataType()
));
}
DataType dataType = (DataType) pCol.getDataType();
colIStat.setString(1, pCol.getName());
colIStat.setString(2, ColumnType.PHYSICAL);
colIStat.setString(3,
dataType.getLogicalType().asSerializableString());
colIStat.setObject(4, null);
colIStat.setString(5, "");
colIStat.setInt(6, id);
colIStat.setObject(7, null); // a view has no primary key
colIStat.addBatch();
} else {
throw new UnsupportedOperationException("暂时认为view 不会出现 非物理字段");
}
}
colIStat.executeBatch();
colIStat.close();
// Persist the query and related information.
Map<String, String> option = view.getOptions();
if (option == null) {
option = new HashMap<>();
}
option.put("OriginalQuery", view.getOriginalQuery());
option.put("ExpandedQuery", view.getExpandedQuery());
String propInsertSql = "insert into metadata_table_property(table_id," +
"`key`,`value`) values (?,?,?)";
PreparedStatement pStat = conn.prepareStatement(propInsertSql);
for (Map.Entry<String, String> entry : option.entrySet()) {
pStat.setInt(1, id);
pStat.setString(2, entry.getKey());
pStat.setString(3, entry.getValue());
pStat.addBatch();
}
pStat.executeBatch();
pStat.close();
}
}
conn.commit();
} catch (SQLException ex) {
SQLExceptionHappened = true;
logger.error("插入数据库失败", ex);
throw new CatalogException("插入数据库失败", ex);
}
}
@Override
public void alterTable(ObjectPath tablePath, CatalogBaseTable newTable, boolean ignoreIfNotExists)
throws TableNotExistException, CatalogException {
Integer id = getTableId(tablePath);
if (id == null) {
throw new TableNotExistException(getName(), tablePath);
}
Map<String, String> opts = newTable.getOptions();
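// Only the table options are upserted here; schema changes are not persisted by this method.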
if (opts != null && opts.size() > 0) {
String updateSql = "INSERT INTO metadata_table_property(table_id," +
"`key`,`value`) values (?,?,?) " +
"on duplicate key update `value` =?, update_time = sysdate()";
Connection conn = getConnection();
try (PreparedStatement ps = conn.prepareStatement(updateSql)) {
for (Map.Entry<String, String> entry : opts.entrySet()) {
ps.setInt(1, id);
ps.setString(2, entry.getKey());
ps.setString(3, entry.getValue());
ps.setString(4, entry.getValue());
ps.addBatch();
}
ps.executeBatch();
} catch (SQLException ex) {
SQLExceptionHappened = true;
throw new CatalogException("修改表名失败", ex);
}
}
}
/************************ partition *************************/
@Override
public List<CatalogPartitionSpec> listPartitions(ObjectPath tablePath)
throws TableNotExistException, TableNotPartitionedException, CatalogException {
// todo: implement this method.
throw new UnsupportedOperationException("This method is not implemented yet.");
}
@Override
public List<CatalogPartitionSpec> listPartitions(ObjectPath tablePath, CatalogPartitionSpec partitionSpec)
throws TableNotExistException, TableNotPartitionedException, PartitionSpecInvalidException, CatalogException {
// todo: implement this method.
throw new UnsupportedOperationException("This method is not implemented yet.");
}
@Override
public List<CatalogPartitionSpec> listPartitionsByFilter(ObjectPath tablePath, List<Expression> filters)
throws TableNotExistException, TableNotPartitionedException, CatalogException {
// todo: implement this method.
throw new UnsupportedOperationException("This method is not implemented yet.");
}
@Override
public CatalogPartition getPartition(ObjectPath tablePath, CatalogPartitionSpec partitionSpec)
throws PartitionNotExistException, CatalogException {
// todo: implement this method.
throw new UnsupportedOperationException("This method is not implemented yet.");
}
@Override
public boolean partitionExists(ObjectPath tablePath, CatalogPartitionSpec partitionSpec)
throws CatalogException {
// todo: implement this method.
throw new UnsupportedOperationException("This method is not implemented yet.");
}
@Override
public void createPartition(ObjectPath tablePath, CatalogPartitionSpec partitionSpec, CatalogPartition partition, boolean ignoreIfExists)
throws TableNotExistException, TableNotPartitionedException, PartitionSpecInvalidException, PartitionAlreadyExistsException, CatalogException {
// todo: implement this method.
throw new UnsupportedOperationException("This method is not implemented yet.");
}
@Override
public void dropPartition(ObjectPath tablePath, CatalogPartitionSpec partitionSpec, boolean ignoreIfNotExists) throws PartitionNotExistException, CatalogException {
// todo: implement this method.
throw new UnsupportedOperationException("This method is not implemented yet.");
}
@Override
public void alterPartition(ObjectPath tablePath, CatalogPartitionSpec partitionSpec, CatalogPartition newPartition, boolean ignoreIfNotExists) throws PartitionNotExistException, CatalogException {
// todo: implement this method.
throw new UnsupportedOperationException("This method is not implemented yet.");
}
/***********************Functions**********************/
@Override
public List<String> listFunctions(String dbName) throws DatabaseNotExistException, CatalogException {
Integer dbId = getDatabaseId(dbName);
if (null == dbId) {
throw new DatabaseNotExistException(getName(), dbName);
}
String querySql = "SELECT function_name from metadata_function " +
" WHERE database_id=?";
Connection conn = getConnection();
try (PreparedStatement gStat = conn.prepareStatement(querySql)) {
gStat.setInt(1, dbId);
ResultSet rs = gStat.executeQuery();
List<String> functions = new ArrayList<>();
while (rs.next()) {
String n = rs.getString("function_name");
functions.add(n);
}
return functions;
} catch (SQLException e) {
SQLExceptionHappened = true;
throw new CatalogException("获取 UDF 列表失败");
}
}
@Override
public CatalogFunction getFunction(ObjectPath functionPath)
throws FunctionNotExistException, CatalogException {
Integer id = getFunctionId(functionPath);
if (null == id) {
throw new FunctionNotExistException(getName(), functionPath);
}
String querySql = "SELECT class_name,function_language from metadata_function " +
" WHERE id=?";
Connection conn = getConnection();
try (PreparedStatement gStat = conn.prepareStatement(querySql)) {
gStat.setInt(1, id);
ResultSet rs = gStat.executeQuery();
if (rs.next()) {
String className = rs.getString("class_name");
String language = rs.getString("function_language");
CatalogFunctionImpl func = new CatalogFunctionImpl(className, FunctionLanguage.valueOf(language));
return func;
} else {
throw new FunctionNotExistException(getName(), functionPath);
}
} catch (SQLException e) {
SQLExceptionHappened = true;
throw new CatalogException("获取 UDF 失败:"
+ functionPath.getDatabaseName() + "."
+ functionPath.getObjectName());
}
}
@Override
public boolean functionExists(ObjectPath functionPath) throws CatalogException {
Integer id = getFunctionId(functionPath);
return id != null;
}
private Integer getFunctionId(ObjectPath functionPath) {
Integer dbId = getDatabaseId(functionPath.getDatabaseName());
if (dbId == null) {
return null;
}
// Look up the id.
String getIdSql = "select id from metadata_function " +
" where function_name=? and database_id=?";
Connection conn = getConnection();
try (PreparedStatement gStat = conn.prepareStatement(getIdSql)) {
gStat.setString(1, functionPath.getObjectName());
gStat.setInt(2, dbId);
ResultSet rs = gStat.executeQuery();
if (rs.next()) {
int id = rs.getInt(1);
return id;
}
} catch (SQLException e) {
SQLExceptionHappened = true;
logger.error("get function fail", e);
throw new CatalogException("get function fail.", e);
}
return null;
}
@Override
public void createFunction(ObjectPath functionPath, CatalogFunction function, boolean ignoreIfExists)
throws FunctionAlreadyExistException, DatabaseNotExistException, CatalogException {
Integer dbId = getDatabaseId(functionPath.getDatabaseName());
if (null == dbId) {
throw new DatabaseNotExistException(getName(), functionPath.getDatabaseName());
}
if (functionExists(functionPath)) {
if (!ignoreIfExists) {
throw new FunctionAlreadyExistException(getName(), functionPath);
}
// The function already exists and can be ignored, so do not insert it again.
return;
}
Connection conn = getConnection();
String insertSql = "Insert into metadata_function " +
"(function_name,class_name,database_id,function_language) " +
" values (?,?,?,?)";
try (PreparedStatement ps = conn.prepareStatement(insertSql)) {
ps.setString(1, functionPath.getObjectName());
ps.setString(2, function.getClassName());
ps.setInt(3, dbId);
ps.setString(4, function.getFunctionLanguage().toString());
ps.executeUpdate();
} catch (SQLException e) {
SQLExceptionHappened = true;
throw new CatalogException("创建 函数 失败", e);
}
}
@Override
public void alterFunction(ObjectPath functionPath, CatalogFunction newFunction, boolean ignoreIfNotExists)
throws FunctionNotExistException, CatalogException {
Integer id = getFunctionId(functionPath);
if (null == id) {
if (!ignoreIfNotExists) {
throw new FunctionNotExistException(getName(), functionPath);
}
return;
}
Connection conn = getConnection();
String insertSql = "update metadata_function " +
"set (class_name =?, function_language=?) " +
" where id=?";
try (PreparedStatement ps = conn.prepareStatement(insertSql)) {
ps.setString(1, newFunction.getClassName());
ps.setString(2, newFunction.getFunctionLanguage().toString());
ps.setInt(3, id);
ps.executeUpdate();
} catch (SQLException e) {
SQLExceptionHappened = true;
throw new CatalogException("修改 函数 失败", e);
}
}
@Override
public void dropFunction(ObjectPath functionPath, boolean ignoreIfNotExists)
throws FunctionNotExistException, CatalogException {
Integer id = getFunctionId(functionPath);
if (null == id) {
if (!ignoreIfNotExists) {
throw new FunctionNotExistException(getName(), functionPath);
}
return;
}
Connection conn = getConnection();
String insertSql = "delete from metadata_function " +
" where id=?";
try (PreparedStatement ps = conn.prepareStatement(insertSql)) {
ps.setInt(1, id);
ps.executeUpdate();
} catch (SQLException e) {
SQLExceptionHappened = true;
throw new CatalogException("删除 函数 失败", e);
}
}
@Override
public CatalogTableStatistics getTableStatistics(ObjectPath tablePath) throws TableNotExistException, CatalogException {
// todo: implement this method.
checkNotNull(tablePath);
if (!tableExists(tablePath)) {
throw new TableNotExistException(getName(), tablePath);
}
/*if (!isPartitionedTable(tablePath)) {
CatalogTableStatistics result = tableStats.get(tablePath);
return result != null ? result.copy() : CatalogTableStatistics.UNKNOWN;
} else {
return CatalogTableStatistics.UNKNOWN;
}*/
return CatalogTableStatistics.UNKNOWN;
}
@Override
public CatalogColumnStatistics getTableColumnStatistics(ObjectPath tablePath) throws TableNotExistException, CatalogException {
// todo: implement this method.
checkNotNull(tablePath);
if (!tableExists(tablePath)) {
throw new TableNotExistException(getName(), tablePath);
}
// CatalogColumnStatistics result = tableColumnStats.get(tablePath);
// return result != null ? result.copy() : CatalogColumnStatistics.UNKNOWN;
return CatalogColumnStatistics.UNKNOWN;
}
@Override
public CatalogTableStatistics getPartitionStatistics(ObjectPath tablePath, CatalogPartitionSpec partitionSpec) throws PartitionNotExistException, CatalogException {
// todo: implement this method.
throw new UnsupportedOperationException("This method is not implemented yet.");
}
@Override
public CatalogColumnStatistics getPartitionColumnStatistics(ObjectPath tablePath, CatalogPartitionSpec partitionSpec) throws PartitionNotExistException, CatalogException {
// todo: implement this method.
throw new UnsupportedOperationException("This method is not implemented yet.");
}
@Override
public void alterTableStatistics(ObjectPath tablePath, CatalogTableStatistics tableStatistics, boolean ignoreIfNotExists) throws TableNotExistException, CatalogException {
// todo: implement this method.
throw new UnsupportedOperationException("This method is not implemented yet.");
}
@Override
public void alterTableColumnStatistics(ObjectPath tablePath, CatalogColumnStatistics columnStatistics, boolean ignoreIfNotExists)
throws TableNotExistException, CatalogException, TablePartitionedException {
// todo: implement this method.
throw new UnsupportedOperationException("This method is not implemented yet.");
}
@Override
public void alterPartitionStatistics(ObjectPath tablePath, CatalogPartitionSpec partitionSpec, CatalogTableStatistics partitionStatistics, boolean ignoreIfNotExists)
throws PartitionNotExistException, CatalogException {
// todo: implement this method.
throw new UnsupportedOperationException("This method is not implemented yet.");
}
@Override
public void alterPartitionColumnStatistics(ObjectPath tablePath, CatalogPartitionSpec partitionSpec, CatalogColumnStatistics columnStatistics, boolean ignoreIfNotExists)
throws PartitionNotExistException, CatalogException {
// todo: implement this method.
throw new UnsupportedOperationException("This method is not implemented yet.");
}
}
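A minimal usage sketch (not part of this commit; the connection values are assumed placeholders): constructing the catalog directly and letting open() bootstrap default_database, which is always the catalog's default database.

import com.dlink.flink.catalog.DlinkMysqlCatalog;

public class DlinkMysqlCatalogUsage {
    public static void main(String[] args) throws Exception {
        // Assumed JDBC settings pointing at the dlink metadata schema.
        DlinkMysqlCatalog catalog = new DlinkMysqlCatalog(
                "myCatalog",
                "jdbc:mysql://127.0.0.1:3306/dlink",
                "dlink",
                "dlink");
        catalog.open();                              // creates default_database if it is missing
        System.out.println(catalog.listDatabases()); // always contains default_database
        catalog.close();
    }
}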
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.dlink.flink.catalog.factory;
import static com.dlink.flink.catalog.factory.DlinkMysqlCatalogFactoryOptions.PASSWORD;
import static com.dlink.flink.catalog.factory.DlinkMysqlCatalogFactoryOptions.URL;
import static com.dlink.flink.catalog.factory.DlinkMysqlCatalogFactoryOptions.USERNAME;
import static org.apache.flink.table.factories.FactoryUtil.PROPERTY_VERSION;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.table.catalog.Catalog;
import org.apache.flink.table.factories.CatalogFactory;
import org.apache.flink.table.factories.FactoryUtil;
import java.util.HashSet;
import java.util.Set;
import com.dlink.flink.catalog.DlinkMysqlCatalog;
/**
* Factory for {@link DlinkMysqlCatalog}.
*/
public class DlinkMysqlCatalogFactory implements CatalogFactory {
@Override
public String factoryIdentifier() {
return DlinkMysqlCatalogFactoryOptions.IDENTIFIER;
}
@Override
public Set<ConfigOption<?>> requiredOptions() {
final Set<ConfigOption<?>> options = new HashSet<>();
return options;
}
@Override
public Set<ConfigOption<?>> optionalOptions() {
final Set<ConfigOption<?>> options = new HashSet<>();
options.add(USERNAME);
options.add(PASSWORD);
options.add(URL);
options.add(PROPERTY_VERSION);
return options;
}
@Override
public Catalog createCatalog(Context context) {
final FactoryUtil.CatalogFactoryHelper helper =
FactoryUtil.createCatalogFactoryHelper(this, context);
helper.validate();
return new DlinkMysqlCatalog(
context.getName(),
helper.getOptions().get(URL),
helper.getOptions().get(USERNAME),
helper.getOptions().get(PASSWORD));
}
}
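A sketch of how the factory is typically exercised from SQL (the catalog name and connection values are assumed placeholders): Flink resolves the 'dlink_mysql' identifier to DlinkMysqlCatalogFactory through the SPI services file added later in this commit, and passes url/username/password on to DlinkMysqlCatalog.

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class CreateCatalogExample {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.newInstance().build());
        // Mirrors the statement generated by initDefaultFlinkSQLEnv.
        tEnv.executeSql("create catalog myCatalog with (\n"
                + " 'type' = 'dlink_mysql',\n"
                + " 'username' = 'dlink',\n"
                + " 'password' = 'dlink',\n"
                + " 'url' = 'jdbc:mysql://127.0.0.1:3306/dlink'\n"
                + ")");
        tEnv.executeSql("use catalog myCatalog");
    }
}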
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.dlink.flink.catalog.factory;
import org.apache.flink.annotation.Internal;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import com.dlink.flink.catalog.DlinkMysqlCatalog;
/**
* {@link ConfigOption}s for {@link DlinkMysqlCatalog}.
*/
@Internal
public class DlinkMysqlCatalogFactoryOptions {
public static final String IDENTIFIER = "dlink_mysql";
public static final ConfigOption<String> USERNAME = ConfigOptions.key("username").stringType().noDefaultValue();
public static final ConfigOption<String> PASSWORD = ConfigOptions.key("password").stringType().noDefaultValue();
public static final ConfigOption<String> URL = ConfigOptions.key("url").stringType().noDefaultValue();
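// Note: none of these options declare a default value, so the single-argument DlinkMysqlCatalog constructor, which reads defaultValue(), receives null for each of them.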
private DlinkMysqlCatalogFactoryOptions() {
}
}
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
com.dlink.flink.catalog.factory.DlinkMysqlCatalogFactory
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
com.dlink.flink.catalog.factory.DlinkMysqlCatalogFactory
......@@ -6,28 +6,62 @@
# /_/`_|/_/ / /_//___/
create @ 2022/6/20
*/
package com.dlink.flink.catalog;
import com.dlink.flink.catalog.factory.DlinkMysqlCatalogFactoryOptions;
import static org.apache.flink.util.Preconditions.checkArgument;
import static org.apache.flink.util.Preconditions.checkNotNull;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.catalog.*;
import org.apache.flink.table.catalog.exceptions.*;
import org.apache.flink.table.catalog.AbstractCatalog;
import org.apache.flink.table.catalog.CatalogBaseTable;
import org.apache.flink.table.catalog.CatalogDatabase;
import org.apache.flink.table.catalog.CatalogDatabaseImpl;
import org.apache.flink.table.catalog.CatalogFunction;
import org.apache.flink.table.catalog.CatalogFunctionImpl;
import org.apache.flink.table.catalog.CatalogPartition;
import org.apache.flink.table.catalog.CatalogPartitionSpec;
import org.apache.flink.table.catalog.CatalogTable;
import org.apache.flink.table.catalog.CatalogView;
import org.apache.flink.table.catalog.FunctionLanguage;
import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.catalog.ResolvedCatalogBaseTable;
import org.apache.flink.table.catalog.ResolvedCatalogTable;
import org.apache.flink.table.catalog.ResolvedCatalogView;
import org.apache.flink.table.catalog.exceptions.CatalogException;
import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException;
import org.apache.flink.table.catalog.exceptions.DatabaseNotEmptyException;
import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
import org.apache.flink.table.catalog.exceptions.FunctionAlreadyExistException;
import org.apache.flink.table.catalog.exceptions.FunctionNotExistException;
import org.apache.flink.table.catalog.exceptions.PartitionAlreadyExistsException;
import org.apache.flink.table.catalog.exceptions.PartitionNotExistException;
import org.apache.flink.table.catalog.exceptions.PartitionSpecInvalidException;
import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException;
import org.apache.flink.table.catalog.exceptions.TableNotExistException;
import org.apache.flink.table.catalog.exceptions.TableNotPartitionedException;
import org.apache.flink.table.catalog.exceptions.TablePartitionedException;
import org.apache.flink.table.catalog.stats.CatalogColumnStatistics;
import org.apache.flink.table.catalog.stats.CatalogTableStatistics;
import org.apache.flink.table.expressions.Expression;
import org.apache.flink.table.types.DataType;
import org.apache.flink.util.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.sql.*;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import static org.apache.flink.util.Preconditions.checkArgument;
import static org.apache.flink.util.Preconditions.checkNotNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.dlink.flink.catalog.factory.DlinkMysqlCatalogFactoryOptions;
/**
* Custom catalog.
......@@ -173,7 +207,7 @@ public class DlinkMysqlCatalog extends AbstractCatalog {
if (defaultDbId == null) {
try {
createDatabase(defaultDatabase, new CatalogDatabaseImpl(new HashMap<>(), "")
, true);
, true);
} catch (DatabaseAlreadyExistException a) {
logger.info("重复创建默认库");
}
......@@ -237,15 +271,15 @@ public class DlinkMysqlCatalog extends AbstractCatalog {
return myDatabases;
} catch (Exception e) {
throw new CatalogException(
String.format("Failed listing database in catalog %s", getName()), e);
String.format("Failed listing database in catalog %s", getName()), e);
}
}
@Override
public CatalogDatabase getDatabase(String databaseName)
throws DatabaseNotExistException, CatalogException {
throws DatabaseNotExistException, CatalogException {
String querySql = "SELECT id, database_name,description " +
" FROM metadata_database where database_name=?";
" FROM metadata_database where database_name=?";
Connection conn = getConnection();
try (PreparedStatement ps = conn.prepareStatement(querySql)) {
ps.setString(1, databaseName);
......@@ -258,8 +292,8 @@ public class DlinkMysqlCatalog extends AbstractCatalog {
Map<String, String> map = new HashMap<>();
String sql = "select `key`,`value` " +
"from metadata_database_property " +
"where database_id=? ";
"from metadata_database_property " +
"where database_id=? ";
try (PreparedStatement pStat = conn.prepareStatement(sql)) {
pStat.setInt(1, id);
ResultSet prs = pStat.executeQuery();
......@@ -269,7 +303,7 @@ public class DlinkMysqlCatalog extends AbstractCatalog {
} catch (SQLException e) {
SQLExceptionHappened = true;
throw new CatalogException(
String.format("Failed get database properties in catalog %s", getName()), e);
String.format("Failed get database properties in catalog %s", getName()), e);
}
return new CatalogDatabaseImpl(map, description);
......@@ -279,7 +313,7 @@ public class DlinkMysqlCatalog extends AbstractCatalog {
} catch (SQLException e) {
SQLExceptionHappened = true;
throw new CatalogException(
String.format("Failed get database in catalog %s", getName()), e);
String.format("Failed get database in catalog %s", getName()), e);
}
}
......@@ -313,7 +347,7 @@ public class DlinkMysqlCatalog extends AbstractCatalog {
@Override
public void createDatabase(String databaseName, CatalogDatabase db, boolean ignoreIfExists)
throws DatabaseAlreadyExistException, CatalogException {
throws DatabaseAlreadyExistException, CatalogException {
checkArgument(!StringUtils.isNullOrWhitespaceOnly(databaseName));
checkNotNull(db);
......@@ -336,7 +370,7 @@ public class DlinkMysqlCatalog extends AbstractCatalog {
if (idRs.next() && db.getProperties() != null && db.getProperties().size() > 0) {
int id = idRs.getInt(1);
String propInsertSql = "insert into metadata_database_property(database_id, " +
"`key`,`value`) values (?,?,?)";
"`key`,`value`) values (?,?,?)";
PreparedStatement pstat = conn.prepareStatement(propInsertSql);
for (Map.Entry<String, String> entry : db.getProperties().entrySet()) {
pstat.setInt(1, id);
......@@ -357,7 +391,7 @@ public class DlinkMysqlCatalog extends AbstractCatalog {
@Override
public void dropDatabase(String name, boolean ignoreIfNotExists, boolean cascade)
throws DatabaseNotExistException, DatabaseNotEmptyException, CatalogException {
throws DatabaseNotExistException, DatabaseNotEmptyException, CatalogException {
if (name.equals(defaultDatabase)) {
throw new CatalogException("默认 database 不可以删除");
}
......@@ -408,7 +442,7 @@ public class DlinkMysqlCatalog extends AbstractCatalog {
@Override
public void alterDatabase(String name, CatalogDatabase newDb, boolean ignoreIfNotExists)
throws DatabaseNotExistException, CatalogException {
throws DatabaseNotExistException, CatalogException {
if (name.equals(defaultDatabase)) {
throw new CatalogException("默认 database 不可以修改");
}
......@@ -432,8 +466,8 @@ public class DlinkMysqlCatalog extends AbstractCatalog {
uState.close();
if (newDb.getProperties() != null && newDb.getProperties().size() > 0) {
String upsertSql = "insert into metadata_database_property (database_id, `key`,`value`) \n" +
"values (?,?,?)\n" +
"on duplicate key update `value` =?, update_time = sysdate()\n";
"values (?,?,?)\n" +
"on duplicate key update `value` =?, update_time = sysdate()\n";
PreparedStatement pstat = conn.prepareStatement(upsertSql);
for (Map.Entry<String, String> entry : newDb.getProperties().entrySet()) {
pstat.setInt(1, id);
......@@ -454,18 +488,18 @@ public class DlinkMysqlCatalog extends AbstractCatalog {
@Override
public List<String> listTables(String databaseName)
throws DatabaseNotExistException, CatalogException {
throws DatabaseNotExistException, CatalogException {
return listTablesViews(databaseName, ObjectType.TABLE);
}
@Override
public List<String> listViews(String databaseName)
throws DatabaseNotExistException, CatalogException {
throws DatabaseNotExistException, CatalogException {
return listTablesViews(databaseName, ObjectType.VIEW);
}
protected List<String> listTablesViews(String databaseName, String tableType)
throws DatabaseNotExistException, CatalogException {
throws DatabaseNotExistException, CatalogException {
Integer databaseId = getDatabaseId(databaseName);
if (null == databaseId) {
throw new DatabaseNotExistException(getName(), databaseName);
......@@ -489,7 +523,7 @@ public class DlinkMysqlCatalog extends AbstractCatalog {
return tables;
} catch (Exception e) {
throw new CatalogException(
String.format("Failed listing %s in catalog %s", tableType, getName()), e);
String.format("Failed listing %s in catalog %s", tableType, getName()), e);
}
}
......@@ -508,9 +542,9 @@ public class DlinkMysqlCatalog extends AbstractCatalog {
Connection conn = getConnection();
try {
String queryTable = "SELECT table_name " +
" ,description, table_type " +
" FROM metadata_table " +
" where id=?";
" ,description, table_type " +
" FROM metadata_table " +
" where id=?";
PreparedStatement ps = conn.prepareStatement(queryTable);
ps.setInt(1, id);
ResultSet rs = ps.executeQuery();
......@@ -527,7 +561,7 @@ public class DlinkMysqlCatalog extends AbstractCatalog {
if (tableType.equals(ObjectType.TABLE)) {
// this is a table
String propSql = "SELECT `key`, `value` from metadata_table_property " +
"WHERE table_id=?";
"WHERE table_id=?";
PreparedStatement pState = conn.prepareStatement(propSql);
pState.setInt(1, id);
ResultSet prs = pState.executeQuery();
......@@ -544,8 +578,8 @@ public class DlinkMysqlCatalog extends AbstractCatalog {
// 1. Read the table info from the database (done above).
// 2. Read the columns.
String colSql = "SELECT column_name, column_type, data_type, description " +
" FROM metadata_column WHERE " +
" table_id=?";
" FROM metadata_column WHERE " +
" table_id=?";
PreparedStatement cStat = conn.prepareStatement(colSql);
cStat.setInt(1, id);
ResultSet crs = cStat.executeQuery();
......@@ -564,7 +598,7 @@ public class DlinkMysqlCatalog extends AbstractCatalog {
cStat.close();
// 3. Read the query.
String qSql = "SELECT `key`, value FROM metadata_table_property" +
" WHERE table_id=? ";
" WHERE table_id=? ";
PreparedStatement qStat = conn.prepareStatement(qSql);
qStat.setInt(1, id);
ResultSet qrs = qStat.executeQuery();
......@@ -584,7 +618,7 @@ public class DlinkMysqlCatalog extends AbstractCatalog {
}
// assemble the view
return CatalogView.of(builder.build(), description
, originalQuery, expandedQuery, options);
, originalQuery, expandedQuery, options);
} else {
throw new CatalogException("不支持的数据类型。" + tableType);
}
......@@ -608,7 +642,7 @@ public class DlinkMysqlCatalog extends AbstractCatalog {
}
// fetch the id
String getIdSql = "select id from metadata_table " +
" where table_name=? and database_id=?";
" where table_name=? and database_id=?";
Connection conn = getConnection();
try (PreparedStatement gStat = conn.prepareStatement(getIdSql)) {
gStat.setString(1, tablePath.getObjectName());
......@@ -627,7 +661,7 @@ public class DlinkMysqlCatalog extends AbstractCatalog {
@Override
public void dropTable(ObjectPath tablePath, boolean ignoreIfNotExists)
throws TableNotExistException, CatalogException {
throws TableNotExistException, CatalogException {
Integer id = getTableId(tablePath);
if (id == null) {
......@@ -638,19 +672,19 @@ public class DlinkMysqlCatalog extends AbstractCatalog {
// todo: this is a hard delete for now; decide later whether to keep a history record.
conn.setAutoCommit(false);
String deletePropSql = "delete from metadata_table_property " +
" where table_id=?";
" where table_id=?";
PreparedStatement dStat = conn.prepareStatement(deletePropSql);
dStat.setInt(1, id);
dStat.executeUpdate();
dStat.close();
String deleteColSql = "delete from metadata_column " +
" where table_id=?";
" where table_id=?";
dStat = conn.prepareStatement(deleteColSql);
dStat.setInt(1, id);
dStat.executeUpdate();
dStat.close();
String deleteDbSql = "delete from metadata_table " +
" where id=?";
" where id=?";
dStat = conn.prepareStatement(deleteDbSql);
dStat.setInt(1, id);
dStat.executeUpdate();
......@@ -665,7 +699,7 @@ public class DlinkMysqlCatalog extends AbstractCatalog {
@Override
public void renameTable(ObjectPath tablePath, String newTableName, boolean ignoreIfNotExists)
throws TableNotExistException, TableAlreadyExistException, CatalogException {
throws TableNotExistException, TableAlreadyExistException, CatalogException {
Integer id = getTableId(tablePath);
if (id == null) {
......@@ -689,7 +723,7 @@ public class DlinkMysqlCatalog extends AbstractCatalog {
@Override
public void createTable(ObjectPath tablePath, CatalogBaseTable table, boolean ignoreIfExists)
throws TableAlreadyExistException, DatabaseNotExistException, CatalogException {
throws TableAlreadyExistException, DatabaseNotExistException, CatalogException {
Integer db_id = getDatabaseId(tablePath.getDatabaseName());
if (null == db_id) {
throw new DatabaseNotExistException(getName(), tablePath.getDatabaseName());
......@@ -714,11 +748,11 @@ public class DlinkMysqlCatalog extends AbstractCatalog {
CatalogBaseTable.TableKind kind = table.getTableKind();
String insertSql = "insert into metadata_table(\n" +
" table_name," +
" table_type," +
" database_id," +
" description)" +
" values(?,?,?,?)";
" table_name," +
" table_type," +
" database_id," +
" description)" +
" values(?,?,?,?)";
PreparedStatement iStat = conn.prepareStatement(insertSql, Statement.RETURN_GENERATED_KEYS);
iStat.setString(1, tablePath.getObjectName());
iStat.setString(2, kind.toString());
......@@ -737,7 +771,7 @@ public class DlinkMysqlCatalog extends AbstractCatalog {
// for a table we can take the properties directly.
Map<String, String> props = ((ResolvedCatalogTable) table).toProperties();
String propInsertSql = "insert into metadata_table_property(table_id," +
"`key`,`value`) values (?,?,?)";
"`key`,`value`) values (?,?,?)";
PreparedStatement pStat = conn.prepareStatement(propInsertSql);
for (Map.Entry<String, String> entry : props.entrySet()) {
pStat.setInt(1, id);
......@@ -755,23 +789,23 @@ public class DlinkMysqlCatalog extends AbstractCatalog {
List<Schema.UnresolvedColumn> cols = view.getUnresolvedSchema().getColumns();
if (cols.size() > 0) {
String colInsertSql = "insert into metadata_column(" +
" column_name, column_type, data_type" +
" , `expr`" +
" , description" +
" , table_id" +
" , `primary`) " +
" values(?,?,?,?,?,?,?)";
" column_name, column_type, data_type" +
" , `expr`" +
" , description" +
" , table_id" +
" , `primary`) " +
" values(?,?,?,?,?,?,?)";
PreparedStatement colIStat = conn.prepareStatement(colInsertSql);
for (Schema.UnresolvedColumn col : cols) {
if (col instanceof Schema.UnresolvedPhysicalColumn) {
Schema.UnresolvedPhysicalColumn pCol = (Schema.UnresolvedPhysicalColumn) col;
if (!(pCol.getDataType() instanceof DataType)) {
throw new UnsupportedOperationException(String.format(
"类型识别失败,该列不是有效类型:%s.%s.%s : %s"
, tablePath.getDatabaseName()
, tablePath.getObjectName()
, pCol.getName(),
pCol.getDataType()
"类型识别失败,该列不是有效类型:%s.%s.%s : %s"
, tablePath.getDatabaseName()
, tablePath.getObjectName()
, pCol.getName(),
pCol.getDataType()
));
}
DataType dataType = (DataType) pCol.getDataType();
......@@ -779,7 +813,7 @@ public class DlinkMysqlCatalog extends AbstractCatalog {
colIStat.setString(1, pCol.getName());
colIStat.setString(2, ColumnType.PHYSICAL);
colIStat.setString(3,
dataType.getLogicalType().asSerializableString());
dataType.getLogicalType().asSerializableString());
colIStat.setObject(4, null);
colIStat.setString(5, pCol.getComment().orElse(""));
colIStat.setInt(6, id);
......@@ -800,7 +834,7 @@ public class DlinkMysqlCatalog extends AbstractCatalog {
option.put("OriginalQuery", view.getOriginalQuery());
option.put("ExpandedQuery", view.getExpandedQuery());
String propInsertSql = "insert into metadata_table_property(table_id," +
"`key`,`value`) values (?,?,?)";
"`key`,`value`) values (?,?,?)";
PreparedStatement pStat = conn.prepareStatement(propInsertSql);
for (Map.Entry<String, String> entry : option.entrySet()) {
pStat.setInt(1, id);
......@@ -822,7 +856,7 @@ public class DlinkMysqlCatalog extends AbstractCatalog {
@Override
public void alterTable(ObjectPath tablePath, CatalogBaseTable newTable, boolean ignoreIfNotExists)
throws TableNotExistException, CatalogException {
throws TableNotExistException, CatalogException {
Integer id = getTableId(tablePath);
if (id == null) {
......@@ -832,8 +866,8 @@ public class DlinkMysqlCatalog extends AbstractCatalog {
Map<String, String> opts = newTable.getOptions();
if (opts != null && opts.size() > 0) {
String updateSql = "INSERT INTO metadata_table_property(table_id," +
"`key`,`value`) values (?,?,?) " +
"on duplicate key update `value` =?, update_time = sysdate()";
"`key`,`value`) values (?,?,?) " +
"on duplicate key update `value` =?, update_time = sysdate()";
Connection conn = getConnection();
try (PreparedStatement ps = conn.prepareStatement(updateSql)) {
for (Map.Entry<String, String> entry : opts.entrySet()) {
......@@ -854,41 +888,42 @@ public class DlinkMysqlCatalog extends AbstractCatalog {
/************************ partition *************************/
@Override
public List<CatalogPartitionSpec> listPartitions(ObjectPath tablePath)
throws TableNotExistException, TableNotPartitionedException, CatalogException {
throws TableNotExistException, TableNotPartitionedException, CatalogException {
// todo: implement this method.
throw new UnsupportedOperationException("该方法尚未完成");
}
@Override
public List<CatalogPartitionSpec> listPartitions(ObjectPath tablePath, CatalogPartitionSpec partitionSpec)
throws TableNotExistException, TableNotPartitionedException, PartitionSpecInvalidException, CatalogException {
throws TableNotExistException, TableNotPartitionedException, PartitionSpecInvalidException, CatalogException {
// todo: implement this method.
throw new UnsupportedOperationException("该方法尚未完成");
}
@Override
public List<CatalogPartitionSpec> listPartitionsByFilter(ObjectPath tablePath, List<Expression> filters)
throws TableNotExistException, TableNotPartitionedException, CatalogException {
throws TableNotExistException, TableNotPartitionedException, CatalogException {
// todo: implement this method.
throw new UnsupportedOperationException("该方法尚未完成");
}
@Override
public CatalogPartition getPartition(ObjectPath tablePath, CatalogPartitionSpec partitionSpec)
throws PartitionNotExistException, CatalogException {
throws PartitionNotExistException, CatalogException {
// todo: implement this method.
throw new UnsupportedOperationException("该方法尚未完成");
}
@Override
public boolean partitionExists(ObjectPath tablePath, CatalogPartitionSpec partitionSpec)
throws CatalogException {
throws CatalogException {
// todo: implement this method.
throw new UnsupportedOperationException("该方法尚未完成");
}
@Override
public void createPartition(ObjectPath tablePath, CatalogPartitionSpec partitionSpec, CatalogPartition partition, boolean ignoreIfExists) throws TableNotExistException, TableNotPartitionedException, PartitionSpecInvalidException, PartitionAlreadyExistsException, CatalogException {
public void createPartition(ObjectPath tablePath, CatalogPartitionSpec partitionSpec, CatalogPartition partition, boolean ignoreIfExists)
throws TableNotExistException, TableNotPartitionedException, PartitionSpecInvalidException, PartitionAlreadyExistsException, CatalogException {
// todo: implement this method.
throw new UnsupportedOperationException("该方法尚未完成");
}
......@@ -914,7 +949,7 @@ public class DlinkMysqlCatalog extends AbstractCatalog {
throw new DatabaseNotExistException(getName(), dbName);
}
String querySql = "SELECT function_name from metadata_function " +
" WHERE database_id=?";
" WHERE database_id=?";
Connection conn = getConnection();
try (PreparedStatement gStat = conn.prepareStatement(querySql)) {
......@@ -934,14 +969,14 @@ public class DlinkMysqlCatalog extends AbstractCatalog {
@Override
public CatalogFunction getFunction(ObjectPath functionPath)
throws FunctionNotExistException, CatalogException {
throws FunctionNotExistException, CatalogException {
Integer id = getFunctionId(functionPath);
if (null == id) {
throw new FunctionNotExistException(getName(), functionPath);
}
String querySql = "SELECT class_name,function_language from metadata_function " +
" WHERE id=?";
" WHERE id=?";
Connection conn = getConnection();
try (PreparedStatement gStat = conn.prepareStatement(querySql)) {
gStat.setInt(1, id);
......@@ -957,8 +992,8 @@ public class DlinkMysqlCatalog extends AbstractCatalog {
} catch (SQLException e) {
SQLExceptionHappened = true;
throw new CatalogException("获取 UDF 失败:"
+ functionPath.getDatabaseName() + "."
+ functionPath.getObjectName());
+ functionPath.getDatabaseName() + "."
+ functionPath.getObjectName());
}
}
......@@ -975,7 +1010,7 @@ public class DlinkMysqlCatalog extends AbstractCatalog {
}
// fetch the id
String getIdSql = "select id from metadata_function " +
" where function_name=? and database_id=?";
" where function_name=? and database_id=?";
Connection conn = getConnection();
try (PreparedStatement gStat = conn.prepareStatement(getIdSql)) {
gStat.setString(1, functionPath.getObjectName());
......@@ -995,7 +1030,7 @@ public class DlinkMysqlCatalog extends AbstractCatalog {
@Override
public void createFunction(ObjectPath functionPath, CatalogFunction function, boolean ignoreIfExists)
throws FunctionAlreadyExistException, DatabaseNotExistException, CatalogException {
throws FunctionAlreadyExistException, DatabaseNotExistException, CatalogException {
Integer dbId = getDatabaseId(functionPath.getDatabaseName());
if (null == dbId) {
throw new DatabaseNotExistException(getName(), functionPath.getDatabaseName());
......@@ -1008,8 +1043,8 @@ public class DlinkMysqlCatalog extends AbstractCatalog {
Connection conn = getConnection();
String insertSql = "Insert into metadata_function " +
"(function_name,class_name,database_id,function_language) " +
" values (?,?,?,?)";
"(function_name,class_name,database_id,function_language) " +
" values (?,?,?,?)";
try (PreparedStatement ps = conn.prepareStatement(insertSql)) {
ps.setString(1, functionPath.getObjectName());
ps.setString(2, function.getClassName());
......@@ -1024,7 +1059,7 @@ public class DlinkMysqlCatalog extends AbstractCatalog {
@Override
public void alterFunction(ObjectPath functionPath, CatalogFunction newFunction, boolean ignoreIfNotExists)
throws FunctionNotExistException, CatalogException {
throws FunctionNotExistException, CatalogException {
Integer id = getFunctionId(functionPath);
if (null == id) {
if (!ignoreIfNotExists) {
......@@ -1035,8 +1070,8 @@ public class DlinkMysqlCatalog extends AbstractCatalog {
Connection conn = getConnection();
String insertSql = "update metadata_function " +
"set (class_name =?, function_language=?) " +
" where id=?";
"set (class_name =?, function_language=?) " +
" where id=?";
try (PreparedStatement ps = conn.prepareStatement(insertSql)) {
ps.setString(1, newFunction.getClassName());
ps.setString(2, newFunction.getFunctionLanguage().toString());
......@@ -1050,7 +1085,7 @@ public class DlinkMysqlCatalog extends AbstractCatalog {
@Override
public void dropFunction(ObjectPath functionPath, boolean ignoreIfNotExists)
throws FunctionNotExistException, CatalogException {
throws FunctionNotExistException, CatalogException {
Integer id = getFunctionId(functionPath);
if (null == id) {
if (!ignoreIfNotExists) {
......@@ -1061,7 +1096,7 @@ public class DlinkMysqlCatalog extends AbstractCatalog {
Connection conn = getConnection();
String insertSql = "delete from metadata_function " +
" where id=?";
" where id=?";
try (PreparedStatement ps = conn.prepareStatement(insertSql)) {
ps.setInt(1, id);
ps.executeUpdate();
......@@ -1121,19 +1156,22 @@ public class DlinkMysqlCatalog extends AbstractCatalog {
}
@Override
public void alterTableColumnStatistics(ObjectPath tablePath, CatalogColumnStatistics columnStatistics, boolean ignoreIfNotExists) throws TableNotExistException, CatalogException, TablePartitionedException {
public void alterTableColumnStatistics(ObjectPath tablePath, CatalogColumnStatistics columnStatistics, boolean ignoreIfNotExists)
throws TableNotExistException, CatalogException, TablePartitionedException {
// todo: implement this method.
throw new UnsupportedOperationException("该方法尚未完成");
}
@Override
public void alterPartitionStatistics(ObjectPath tablePath, CatalogPartitionSpec partitionSpec, CatalogTableStatistics partitionStatistics, boolean ignoreIfNotExists) throws PartitionNotExistException, CatalogException {
public void alterPartitionStatistics(ObjectPath tablePath, CatalogPartitionSpec partitionSpec, CatalogTableStatistics partitionStatistics, boolean ignoreIfNotExists)
throws PartitionNotExistException, CatalogException {
// todo: implement this method.
throw new UnsupportedOperationException("该方法尚未完成");
}
@Override
public void alterPartitionColumnStatistics(ObjectPath tablePath, CatalogPartitionSpec partitionSpec, CatalogColumnStatistics columnStatistics, boolean ignoreIfNotExists) throws PartitionNotExistException, CatalogException {
public void alterPartitionColumnStatistics(ObjectPath tablePath, CatalogPartitionSpec partitionSpec, CatalogColumnStatistics columnStatistics, boolean ignoreIfNotExists)
throws PartitionNotExistException, CatalogException {
// todo: implement this method.
throw new UnsupportedOperationException("该方法尚未完成");
}
......
......@@ -18,7 +18,11 @@
package com.dlink.flink.catalog.factory;
import com.dlink.flink.catalog.DlinkMysqlCatalog;
import static com.dlink.flink.catalog.factory.DlinkMysqlCatalogFactoryOptions.PASSWORD;
import static com.dlink.flink.catalog.factory.DlinkMysqlCatalogFactoryOptions.URL;
import static com.dlink.flink.catalog.factory.DlinkMysqlCatalogFactoryOptions.USERNAME;
import static org.apache.flink.table.factories.FactoryUtil.PROPERTY_VERSION;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.table.catalog.Catalog;
import org.apache.flink.table.factories.CatalogFactory;
......@@ -27,10 +31,11 @@ import org.apache.flink.table.factories.FactoryUtil;
import java.util.HashSet;
import java.util.Set;
import static com.dlink.flink.catalog.factory.DlinkMysqlCatalogFactoryOptions.*;
import static org.apache.flink.table.factories.FactoryUtil.PROPERTY_VERSION;
import com.dlink.flink.catalog.DlinkMysqlCatalog;
/** Factory for {@link DlinkMysqlCatalog}. */
/**
* Factory for {@link DlinkMysqlCatalog}.
*/
public class DlinkMysqlCatalogFactory implements CatalogFactory {
@Override
......@@ -57,13 +62,13 @@ public class DlinkMysqlCatalogFactory implements CatalogFactory {
@Override
public Catalog createCatalog(Context context) {
final FactoryUtil.CatalogFactoryHelper helper =
FactoryUtil.createCatalogFactoryHelper(this, context);
FactoryUtil.createCatalogFactoryHelper(this, context);
helper.validate();
return new DlinkMysqlCatalog(
context.getName(),
helper.getOptions().get(URL),
helper.getOptions().get(USERNAME),
helper.getOptions().get(PASSWORD));
context.getName(),
helper.getOptions().get(URL),
helper.getOptions().get(USERNAME),
helper.getOptions().get(PASSWORD));
}
}
......@@ -18,18 +18,11 @@
package com.dlink.flink.catalog.factory;
import com.dlink.flink.catalog.DlinkMysqlCatalog;
import org.apache.flink.annotation.Internal;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.table.catalog.CommonCatalogOptions;
import org.apache.flink.table.catalog.exceptions.CatalogException;
import java.io.File;
import java.io.FileInputStream;
import java.io.InputStream;
import java.util.Properties;
import com.dlink.flink.catalog.DlinkMysqlCatalog;
/**
* {@link ConfigOption}s for {@link DlinkMysqlCatalog}.
......@@ -37,55 +30,13 @@ import java.util.Properties;
@Internal
public class DlinkMysqlCatalogFactoryOptions {
public static final String IDENTIFIER = "dlink_mysql_catalog";
public static final ConfigOption<String> USERNAME; // =
// ConfigOptions.key("mysql-catalog-username").stringType().noDefaultValue();
public static final ConfigOption<String> PASSWORD; // =
// ConfigOptions.key("mysql-catalog-password").stringType().noDefaultValue();
public static final ConfigOption<String> URL; // =
// ConfigOptions.key("mysql-catalog-url").stringType().noDefaultValue();
public static final String IDENTIFIER = "dlink_mysql";
public static final String prefix = "dlink-mysql-catalog";
public static final ConfigOption<String> USERNAME = ConfigOptions.key("username").stringType().noDefaultValue();
static {
try {
String configPath = System.getenv(ConfigConstants.ENV_FLINK_CONF_DIR);
if (!configPath.endsWith("/")) {
configPath = configPath + "/";
}
configPath = configPath + addPrefix(".properties");
File propFile = new File(configPath);
if (!propFile.exists()) {
throw new CatalogException("配置文件不存在!");
}
InputStream is = new FileInputStream(propFile);
Properties props = new Properties();
props.load(is);
String username = props.getProperty(addPrefix("-username"));
USERNAME = ConfigOptions.key(addPrefix("-username"))
.stringType()
.defaultValue(username);
public static final ConfigOption<String> PASSWORD = ConfigOptions.key("password").stringType().noDefaultValue();
String password = props.getProperty(addPrefix("-password"));
PASSWORD = ConfigOptions.key(addPrefix("-password"))
.stringType()
.defaultValue(password);
String url = props.getProperty(addPrefix("-url"));
URL = ConfigOptions.key(addPrefix("-url"))
.stringType()
.defaultValue(url);
} catch (Exception e) {
throw new CatalogException("获取配置信息失败!", e);
}
}
private static String addPrefix(String key) {
return prefix + key;
}
public static final ConfigOption<String> URL = ConfigOptions.key("url").stringType().noDefaultValue();
private DlinkMysqlCatalogFactoryOptions() {
}
......
# catalog database username
dlink-mysql-catalog-username=root
# catalog database password
dlink-mysql-catalog-password=123456
# catalog database url
dlink-mysql-catalog-url=jdbc:mysql://localhost:3306/flink_metastore?useUnicode=true&characterEncoding=utf8&serverTimezone=UTC
\ No newline at end of file
......@@ -6,134 +6,53 @@
# /_/`_|/_/ / /_//___/
create @ 2022/7/9
*/
package com.dlink.flink.catalog;
import static org.apache.flink.table.api.config.ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.junit.Before;
import org.junit.Test;
import static org.apache.flink.table.api.config.ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM;
public class DlinkMysqlCatalogTest {
protected static String url;
protected static DlinkMysqlCatalog catalog;
protected static final String TEST_CATALOG_NAME = "mysql-catalog";
protected static final String TEST_USERNAME = "flink_metastore";
protected static final String TEST_PWD = "flink_metastore";
protected static final String TEST_CATALOG_NAME = "dlink";
protected static final String TEST_USERNAME = "dlink";
protected static final String TEST_PWD = "dlink";
private TableEnvironment tableEnv;
@Before
public void setup(){
url = "jdbc:mysql://localhost:3306/flink_metastore?useUnicode=true&characterEncoding=utf8&serverTimezone=UTC";
public void setup() {
url = "jdbc:mysql://127.0.0.1:3306/dlink?useUnicode=true&characterEncoding=utf8&serverTimezone=UTC";
catalog =
new DlinkMysqlCatalog(
TEST_CATALOG_NAME,
url,
TEST_USERNAME,
TEST_PWD);
new DlinkMysqlCatalog(
TEST_CATALOG_NAME,
url,
TEST_USERNAME,
TEST_PWD);
this.tableEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());
tableEnv.getConfig()
.getConfiguration()
.setInteger(TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM.key(), 1);
.getConfiguration()
.setInteger(TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM.key(), 1);
}
@Test
public void testSqlCatalog(){
public void testSqlCatalog() {
String createSql = "create catalog myCatalog \n" +
" with('type'='dlink_mysql_catalog',\n" +
" 'mysql-catalog-username'='flink_metastore',\n" +
" 'mysql-catalog-password'='flink_metastore',\n" +
" 'mysql-catalog-url'='jdbc:mysql://localhost:3306/" +
"flink_metastore?useUnicode=true&characterEncoding=utf8&serverTimezone=UTC')";
" with('type'='dlink_mysql',\n" +
" 'username'='dlink',\n" +
" 'password'='dlink',\n" +
" 'url'='jdbc:mysql://127.0.0.1:3306/" +
"dlink?useUnicode=true&characterEncoding=utf8&serverTimezone=UTC')";
tableEnv.executeSql(createSql);
tableEnv.executeSql("use catalog myCatalog");
}
@Test
public void test() {
// this test depends on a local environment and would break the packaging if kept enabled, but it demonstrates the usage.
/*//1. Get the context: the table environment
// use mysql-catalog
tableEnv.registerCatalog(DlinkMysqlCatalogFactoryOptions.IDENTIFIER, catalog);
tableEnv.useCatalog(DlinkMysqlCatalogFactoryOptions.IDENTIFIER);
//2. Read score.csv
String csvFile = "D:\\code\\test\\mycatalog\\src\\test\\resources\\score.csv";
String createTable = "CREATE TABLE IF NOT EXISTS player_data\n" +
"( season varchar,\n" +
" player varchar,\n" +
" play_num varchar,\n" +
" first_court int,\n" +
" `time` double,\n" +
" assists double,\n" +
" steals double,\n" +
" blocks double,\n" +
" scores double\n" +
") WITH ( \n" +
" 'connector' = 'filesystem',\n" +
" 'path' = '" + csvFile + " ',\n" +
" 'format' = 'csv'\n" +
")";
tableEnv.executeSql(createTable);
String createView= "CREATE VIEW IF NOT EXISTS test_view " +
" (player, play_num" +
" ,sumaabb)" +
" COMMENT 'test view' " +
" AS SELECT player, play_num, assists + steals as sumaabb FROM player_data";
tableEnv.executeSql(createView);
String createSinkTable = "CREATE TABLE IF NOT EXISTS mysql_player_from_view\n" +
"( " +
" player varchar,\n" +
" play_num varchar,\n" +
" sumaabb double\n" +
") WITH ( \n" +
" 'connector' = 'jdbc',\n" +
" 'url' = 'jdbc:mysql://localhost:3306/a01_rep_db',\n" +
" 'table-name' = 'mysql_player_from_view',\n" +
" 'username' = 'root',\n" +
" 'password' = '123456'\n" +
")";
tableEnv.executeSql(createSinkTable);
tableEnv.executeSql("Insert into mysql_player_from_view\n" +
"SELECT \n" +
"player ,\n" +
" play_num ,\n" +
" sumaabb \n" +
"FROM test_view");
List<Row> results =
CollectionUtil.iteratorToList(
tableEnv.sqlQuery("select * from mysql_player_from_view")
.execute()
.collect());
List<Row> tresults =
CollectionUtil.iteratorToList(
tableEnv.sqlQuery("select * from test_view")
.execute()
.collect());
List<Row> presults =
CollectionUtil.iteratorToList(
tableEnv.sqlQuery("select * from player_data")
.execute()
.collect());
*/
}
}
......@@ -6,42 +6,45 @@
# /_/`_|/_/ / /_//___/
create @ 2022/7/9
*/
package com.dlink.flink.catalog.com.dlink.flink.catalog.factory;
import com.dlink.flink.catalog.DlinkMysqlCatalog;
import com.dlink.flink.catalog.factory.DlinkMysqlCatalogFactoryOptions;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import org.apache.flink.table.catalog.Catalog;
import org.apache.flink.table.catalog.CommonCatalogOptions;
import org.apache.flink.table.factories.FactoryUtil;
import org.junit.BeforeClass;
import org.junit.Test;
import java.sql.SQLException;
import java.util.HashMap;
import java.util.Map;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import org.junit.BeforeClass;
import org.junit.Test;
import com.dlink.flink.catalog.DlinkMysqlCatalog;
import com.dlink.flink.catalog.factory.DlinkMysqlCatalogFactoryOptions;
public class DlinkMysqlCatalogFactoryTest {
protected static String url;
protected static DlinkMysqlCatalog catalog;
protected static final String TEST_CATALOG_NAME = "mysql-catalog";
protected static final String TEST_USERNAME = "flink_metastore";
protected static final String TEST_PWD = "flink_metastore";
protected static final String TEST_CATALOG_NAME = "dlink";
protected static final String TEST_USERNAME = "dlink";
protected static final String TEST_PWD = "dlink";
@BeforeClass
public static void setup() throws SQLException {
url = "jdbc:mysql://localhost:3306/flink_metastore?useUnicode=true&characterEncoding=utf8&serverTimezone=UTC";
url = "jdbc:mysql://10.1.51.25:3306/dlink?useUnicode=true&characterEncoding=utf8&serverTimezone=UTC";
catalog =
new DlinkMysqlCatalog(
TEST_CATALOG_NAME,
TEST_USERNAME,
TEST_PWD,
url);
new DlinkMysqlCatalog(
TEST_CATALOG_NAME,
url,
TEST_USERNAME,
TEST_PWD);
}
@Test
......@@ -53,15 +56,15 @@ public class DlinkMysqlCatalogFactoryTest {
options.put(DlinkMysqlCatalogFactoryOptions.URL.key(), url);
final Catalog actualCatalog =
FactoryUtil.createCatalog(
TEST_CATALOG_NAME,
options,
null,
Thread.currentThread().getContextClassLoader());
FactoryUtil.createCatalog(
TEST_CATALOG_NAME,
options,
null,
Thread.currentThread().getContextClassLoader());
checkEquals(catalog, (DlinkMysqlCatalog) actualCatalog);
assertTrue( actualCatalog instanceof DlinkMysqlCatalog);
assertTrue(actualCatalog instanceof DlinkMysqlCatalog);
}
private static void checkEquals(DlinkMysqlCatalog c1, DlinkMysqlCatalog c2) {
......
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
com.dlink.flink.catalog.factory.DlinkMysqlCatalogFactory
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>dlink-catalog-mysql</artifactId>
<groupId>com.dlink</groupId>
<version>0.6.6-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>dlink-catalog-mysql-1.15</artifactId>
<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
</properties>
<dependencies>
<dependency>
<groupId>com.dlink</groupId>
<artifactId>dlink-common</artifactId>
</dependency>
<dependency>
<groupId>com.dlink</groupId>
<artifactId>dlink-flink-1.15</artifactId>
<!-- <scope>provided</scope>-->
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.13.2</version>
<scope>test</scope>
</dependency>
</dependencies>
</project>
\ No newline at end of file
/*
# __
# / | ____ ___ _
# / / | / __// // / /
# /_/`_|/_/ / /_//___/
create @ 2022/6/20
*/
package com.dlink.flink.catalog;
import static org.apache.flink.util.Preconditions.checkArgument;
import static org.apache.flink.util.Preconditions.checkNotNull;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.catalog.AbstractCatalog;
import org.apache.flink.table.catalog.CatalogBaseTable;
import org.apache.flink.table.catalog.CatalogDatabase;
import org.apache.flink.table.catalog.CatalogDatabaseImpl;
import org.apache.flink.table.catalog.CatalogFunction;
import org.apache.flink.table.catalog.CatalogFunctionImpl;
import org.apache.flink.table.catalog.CatalogPartition;
import org.apache.flink.table.catalog.CatalogPartitionSpec;
import org.apache.flink.table.catalog.CatalogTable;
import org.apache.flink.table.catalog.CatalogView;
import org.apache.flink.table.catalog.FunctionLanguage;
import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.catalog.ResolvedCatalogBaseTable;
import org.apache.flink.table.catalog.ResolvedCatalogTable;
import org.apache.flink.table.catalog.ResolvedCatalogView;
import org.apache.flink.table.catalog.exceptions.CatalogException;
import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException;
import org.apache.flink.table.catalog.exceptions.DatabaseNotEmptyException;
import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
import org.apache.flink.table.catalog.exceptions.FunctionAlreadyExistException;
import org.apache.flink.table.catalog.exceptions.FunctionNotExistException;
import org.apache.flink.table.catalog.exceptions.PartitionAlreadyExistsException;
import org.apache.flink.table.catalog.exceptions.PartitionNotExistException;
import org.apache.flink.table.catalog.exceptions.PartitionSpecInvalidException;
import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException;
import org.apache.flink.table.catalog.exceptions.TableNotExistException;
import org.apache.flink.table.catalog.exceptions.TableNotPartitionedException;
import org.apache.flink.table.catalog.exceptions.TablePartitionedException;
import org.apache.flink.table.catalog.stats.CatalogColumnStatistics;
import org.apache.flink.table.catalog.stats.CatalogTableStatistics;
import org.apache.flink.table.expressions.Expression;
import org.apache.flink.table.types.DataType;
import org.apache.flink.util.StringUtils;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.dlink.flink.catalog.factory.DlinkMysqlCatalogFactoryOptions;
/**
* Custom catalog.
* Connection check: done.
* The default database is forced: whatever is passed in, it is replaced with default_database.
* The database connection info can be read from configuration instead of being hard-coded in the SQL statement.
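* <p>A minimal usage sketch, mirroring the DDL exercised in DlinkMysqlCatalogTest (the option keys
* 'username', 'password' and 'url' are defined in DlinkMysqlCatalogFactoryOptions; the catalog name
* and connection values below are illustrative only):
* <pre>
* CREATE CATALOG my_catalog WITH (
*     'type' = 'dlink_mysql',
*     'username' = 'dlink',
*     'password' = 'dlink',
*     'url' = 'jdbc:mysql://127.0.0.1:3306/dlink'
* );
* USE CATALOG my_catalog;
* </pre>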
*/
public class DlinkMysqlCatalog extends AbstractCatalog {
private final Logger logger = LoggerFactory.getLogger(this.getClass());
public static final String MYSQL_DRIVER = "com.mysql.cj.jdbc.Driver";
public static final String DEFAULT_DATABASE = "default_database";
static {
try {
Class.forName(MYSQL_DRIVER);
} catch (ClassNotFoundException e) {
throw new CatalogException("未加载 mysql 驱动!", e);
}
}
private static final String COMMENT = "comment";
/**
* Tracks whether a SQLException has occurred; if so, the connection may be stale and must be re-checked before reuse.
*/
private boolean SQLExceptionHappened = false;
/**
* Object types, such as database, table and view.
*/
protected static class ObjectType {
/**
* Database
*/
public static final String DATABASE = "database";
/**
* Table
*/
public static final String TABLE = "TABLE";
/**
* View
*/
public static final String VIEW = "VIEW";
}
/**
* Column types, such as physical, computed, metadata and watermark columns.
*/
protected static class ColumnType {
/**
* Physical column
*/
public static final String PHYSICAL = "physical";
/**
* Computed column
*/
public static final String COMPUTED = "computed";
/**
* Metadata column
*/
public static final String METADATA = "metadata";
/**
* Watermark
*/
public static final String WATERMARK = "watermark";
}
/**
* Database username
*/
private final String user;
/**
* Database password
*/
private final String pwd;
/**
* Database connection URL
*/
private final String url;
/**
* Default database
*/
private static final String defaultDatabase = "default_database";
/**
* Database username
*
* @return the database username
*/
public String getUser() {
return user;
}
/**
* Database password
*
* @return the database password
*/
public String getPwd() {
return pwd;
}
/**
* Database connection URL
*
* @return the database connection URL
*/
public String getUrl() {
return url;
}
public DlinkMysqlCatalog(String name,
String url,
String user,
String pwd) {
super(name, defaultDatabase);
this.url = url;
this.user = user;
this.pwd = pwd;
}
public DlinkMysqlCatalog(String name) {
super(name, defaultDatabase);
this.url = DlinkMysqlCatalogFactoryOptions.URL.defaultValue();
this.user = DlinkMysqlCatalogFactoryOptions.USERNAME.defaultValue();
this.pwd = DlinkMysqlCatalogFactoryOptions.PASSWORD.defaultValue();
}
@Override
public void open() throws CatalogException {
// verify that the connection is valid
// fetch the default db to check whether it exists
Integer defaultDbId = getDatabaseId(defaultDatabase);
if (defaultDbId == null) {
try {
createDatabase(defaultDatabase, new CatalogDatabaseImpl(new HashMap<>(), "")
, true);
} catch (DatabaseAlreadyExistException a) {
logger.info("重复创建默认库");
}
}
}
@Override
public void close() throws CatalogException {
if (connection != null) {
try {
connection.close();
connection = null;
} catch (SQLException e) {
SQLExceptionHappened = true;
throw new CatalogException("Fail to close connection.", e);
}
}
}
private Connection connection;
protected Connection getConnection() throws CatalogException {
try {
// todo: wrap connection creation in a dedicated method so that other connection providers can be plugged in later.
// Class.forName(MYSQL_DRIVER);
if (connection == null) {
connection = DriverManager.getConnection(url, user, pwd);
}
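// note: if an earlier call recorded a SQLException, the cached connection is treated as suspect;
// the checks below close it when it is no longer valid and rebuild it via a recursive call.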
if (SQLExceptionHappened) {
SQLExceptionHappened = false;
if (!connection.isValid(10)) {
connection.close();
}
if (connection.isClosed()) {
connection = null;
return getConnection();
}
connection = null;
return getConnection();
}
return connection;
} catch (Exception e) {
throw new CatalogException("Fail to get connection.", e);
}
}
@Override
public List<String> listDatabases() throws CatalogException {
List<String> myDatabases = new ArrayList<>();
String querySql = "SELECT database_name FROM metadata_database";
Connection conn = getConnection();
try (PreparedStatement ps = conn.prepareStatement(querySql)) {
ResultSet rs = ps.executeQuery();
while (rs.next()) {
String dbName = rs.getString(1);
myDatabases.add(dbName);
}
return myDatabases;
} catch (Exception e) {
throw new CatalogException(
String.format("Failed listing database in catalog %s", getName()), e);
}
}
@Override
public CatalogDatabase getDatabase(String databaseName)
throws DatabaseNotExistException, CatalogException {
String querySql = "SELECT id, database_name,description " +
" FROM metadata_database where database_name=?";
Connection conn = getConnection();
try (PreparedStatement ps = conn.prepareStatement(querySql)) {
ps.setString(1, databaseName);
ResultSet rs = ps.executeQuery();
if (rs.next()) {
int id = rs.getInt("id");
String description = rs.getString("description");
Map<String, String> map = new HashMap<>();
String sql = "select `key`,`value` " +
"from metadata_database_property " +
"where database_id=? ";
try (PreparedStatement pStat = conn.prepareStatement(sql)) {
pStat.setInt(1, id);
ResultSet prs = pStat.executeQuery();
while (prs.next()) {
map.put(prs.getString("key"), prs.getString("value"));
}
} catch (SQLException e) {
SQLExceptionHappened = true;
throw new CatalogException(
String.format("Failed get database properties in catalog %s", getName()), e);
}
return new CatalogDatabaseImpl(map, description);
} else {
throw new DatabaseNotExistException(getName(), databaseName);
}
} catch (SQLException e) {
SQLExceptionHappened = true;
throw new CatalogException(
String.format("Failed get database in catalog %s", getName()), e);
}
}
@Override
public boolean databaseExists(String databaseName) throws CatalogException {
return getDatabaseId(databaseName) != null;
}
private Integer getDatabaseId(String databaseName) throws CatalogException {
String querySql = "select id from metadata_database where database_name=?";
Connection conn = getConnection();
try (PreparedStatement ps = conn.prepareStatement(querySql)) {
ps.setString(1, databaseName);
ResultSet rs = ps.executeQuery();
boolean multiDB = false;
Integer id = null;
while (rs.next()) {
if (!multiDB) {
id = rs.getInt(1);
multiDB = true;
} else {
throw new CatalogException("存在多个同名database: " + databaseName);
}
}
return id;
} catch (SQLException e) {
SQLExceptionHappened = true;
throw new CatalogException(String.format("获取 database 信息失败:%s.%s", getName(), databaseName), e);
}
}
@Override
public void createDatabase(String databaseName, CatalogDatabase db, boolean ignoreIfExists)
throws DatabaseAlreadyExistException, CatalogException {
checkArgument(!StringUtils.isNullOrWhitespaceOnly(databaseName));
checkNotNull(db);
if (databaseExists(databaseName)) {
if (!ignoreIfExists) {
throw new DatabaseAlreadyExistException(getName(), databaseName);
}
} else {
// implement the database creation here
Connection conn = getConnection();
// start a transaction
String insertSql = "insert into metadata_database(database_name, description) values(?, ?)";
try (PreparedStatement stat = conn.prepareStatement(insertSql, Statement.RETURN_GENERATED_KEYS)) {
conn.setAutoCommit(false);
stat.setString(1, databaseName);
stat.setString(2, db.getComment());
stat.executeUpdate();
ResultSet idRs = stat.getGeneratedKeys();
if (idRs.next() && db.getProperties() != null && db.getProperties().size() > 0) {
int id = idRs.getInt(1);
String propInsertSql = "insert into metadata_database_property(database_id, " +
"`key`,`value`) values (?,?,?)";
PreparedStatement pstat = conn.prepareStatement(propInsertSql);
for (Map.Entry<String, String> entry : db.getProperties().entrySet()) {
pstat.setInt(1, id);
pstat.setString(2, entry.getKey());
pstat.setString(3, entry.getValue());
pstat.addBatch();
}
pstat.executeBatch();
pstat.close();
}
conn.commit();
} catch (SQLException e) {
SQLExceptionHappened = true;
logger.error("创建 database 信息失败:", e);
}
}
}
@Override
public void dropDatabase(String name, boolean ignoreIfNotExists, boolean cascade)
throws DatabaseNotExistException, DatabaseNotEmptyException, CatalogException {
if (name.equals(defaultDatabase)) {
throw new CatalogException("默认 database 不可以删除");
}
// 1. Fetch the database id.
Integer id = getDatabaseId(name);
if (id == null) {
if (!ignoreIfNotExists) {
throw new DatabaseNotExistException(getName(), name);
}
return;
}
Connection conn = getConnection();
try {
conn.setAutoCommit(false);
// check whether the database still contains tables
List<String> tables = listTables(name);
if (tables.size() > 0) {
if (!cascade) {
// there are tables and cascade is false, so refuse to delete.
throw new DatabaseNotEmptyException(getName(), name);
}
// cascade delete
for (String table : tables) {
try {
dropTable(new ObjectPath(name, table), true);
} catch (TableNotExistException t) {
logger.warn("表{}不存在", name + "." + table);
}
}
}
// todo: this is a hard delete for now; decide later whether to keep a history record.
String deletePropSql = "delete from metadata_database_property where database_id=?";
PreparedStatement dStat = conn.prepareStatement(deletePropSql);
dStat.setInt(1, id);
dStat.executeUpdate();
dStat.close();
String deleteDbSql = "delete from metadata_database where database_id=?";
dStat = conn.prepareStatement(deleteDbSql);
dStat.setInt(1, id);
dStat.executeUpdate();
dStat.close();
conn.commit();
} catch (SQLException e) {
SQLExceptionHappened = true;
throw new CatalogException("删除 database 信息失败:", e);
}
}
@Override
public void alterDatabase(String name, CatalogDatabase newDb, boolean ignoreIfNotExists)
throws DatabaseNotExistException, CatalogException {
if (name.equals(defaultDatabase)) {
throw new CatalogException("默认 database 不可以修改");
}
// 1. Fetch the database id.
Integer id = getDatabaseId(name);
if (id == null) {
if (!ignoreIfNotExists) {
throw new DatabaseNotExistException(getName(), name);
}
return;
}
Connection conn = getConnection();
try {
conn.setAutoCommit(false);
// 1. The name and type cannot be changed; only the comment can be updated.
String updateCommentSql = "update metadata_database set description=? where database_id=?";
PreparedStatement uState = conn.prepareStatement(updateCommentSql);
uState.setString(1, newDb.getComment());
uState.setInt(2, id);
uState.executeUpdate();
uState.close();
if (newDb.getProperties() != null && newDb.getProperties().size() > 0) {
String upsertSql = "insert into metadata_database_property (database_id, `key`,`value`) \n" +
"values (?,?,?)\n" +
"on duplicate key update `value` =?, update_time = sysdate()\n";
PreparedStatement pstat = conn.prepareStatement(upsertSql);
for (Map.Entry<String, String> entry : newDb.getProperties().entrySet()) {
pstat.setInt(1, id);
pstat.setString(2, entry.getKey());
pstat.setString(3, entry.getValue());
pstat.setString(4, entry.getValue());
pstat.addBatch();
}
pstat.executeBatch();
}
conn.commit();
} catch (SQLException e) {
SQLExceptionHappened = true;
throw new CatalogException("修改 database 信息失败:", e);
}
}
@Override
public List<String> listTables(String databaseName)
throws DatabaseNotExistException, CatalogException {
return listTablesViews(databaseName, ObjectType.TABLE);
}
@Override
public List<String> listViews(String databaseName)
throws DatabaseNotExistException, CatalogException {
return listTablesViews(databaseName, ObjectType.VIEW);
}
protected List<String> listTablesViews(String databaseName, String tableType)
throws DatabaseNotExistException, CatalogException {
Integer databaseId = getDatabaseId(databaseName);
if (null == databaseId) {
throw new DatabaseNotExistException(getName(), databaseName);
}
// get all schemas
// the tableType parameter selects TABLE or VIEW
String querySql = "SELECT table_name FROM metadata_table where table_type=? and database_id = ?";
Connection conn = getConnection();
try (PreparedStatement ps = conn.prepareStatement(querySql)) {
ps.setString(1, tableType);
ps.setInt(2, databaseId);
ResultSet rs = ps.executeQuery();
List<String> tables = new ArrayList<>();
while (rs.next()) {
String table = rs.getString(1);
tables.add(table);
}
return tables;
} catch (Exception e) {
throw new CatalogException(
String.format("Failed listing %s in catalog %s", tableType, getName()), e);
}
}
@Override
public CatalogBaseTable getTable(ObjectPath tablePath) throws TableNotExistException, CatalogException {
// proceed step by step
// 1. Fetch the table record first; it may be a view or a table
// 2. Fetch the columns
// 3. Fetch the properties
Integer id = getTableId(tablePath);
if (id == null) {
throw new TableNotExistException(getName(), tablePath);
}
Connection conn = getConnection();
try {
String queryTable = "SELECT table_name " +
" ,description, table_type " +
" FROM metadata_table " +
" where id=?";
PreparedStatement ps = conn.prepareStatement(queryTable);
ps.setInt(1, id);
ResultSet rs = ps.executeQuery();
String description;
String tableType;
if (rs.next()) {
description = rs.getString("description");
tableType = rs.getString("table_type");
ps.close();
} else {
ps.close();
throw new TableNotExistException(getName(), tablePath);
}
if (tableType.equals(ObjectType.TABLE)) {
// this is a table
String propSql = "SELECT `key`, `value` from metadata_table_property " +
"WHERE table_id=?";
PreparedStatement pState = conn.prepareStatement(propSql);
pState.setInt(1, id);
ResultSet prs = pState.executeQuery();
Map<String, String> props = new HashMap<>();
while (prs.next()) {
String key = prs.getString("key");
String value = prs.getString("value");
props.put(key, value);
}
pState.close();
props.put(COMMENT, description);
return CatalogTable.fromProperties(props);
} else if (tableType.equals(ObjectType.VIEW)) {
// 1. Read the table info from the database (done above).
// 2. Read the columns.
String colSql = "SELECT column_name, column_type, data_type, description " +
" FROM metadata_column WHERE " +
" table_id=?";
PreparedStatement cStat = conn.prepareStatement(colSql);
cStat.setInt(1, id);
ResultSet crs = cStat.executeQuery();
Schema.Builder builder = Schema.newBuilder();
while (crs.next()) {
String colName = crs.getString("column_name");
String dataType = crs.getString("data_type");
builder.column(colName, dataType);
String cDesc = crs.getString("description");
if (null != cDesc && cDesc.length() > 0) {
builder.withComment(cDesc);
}
}
cStat.close();
// 3. Read the query.
String qSql = "SELECT `key`, value FROM metadata_table_property" +
" WHERE table_id=? ";
PreparedStatement qStat = conn.prepareStatement(qSql);
qStat.setInt(1, id);
ResultSet qrs = qStat.executeQuery();
String originalQuery = "";
String expandedQuery = "";
Map<String, String> options = new HashMap<>();
while (qrs.next()) {
String key = qrs.getString("key");
String value = qrs.getString("value");
if ("OriginalQuery".equals(key)) {
originalQuery = value;
} else if ("ExpandedQuery".equals(key)) {
expandedQuery = value;
} else {
options.put(key, value);
}
}
// assemble the view
return CatalogView.of(builder.build(), description
, originalQuery, expandedQuery, options);
} else {
throw new CatalogException("不支持的数据类型。" + tableType);
}
} catch (SQLException e) {
SQLExceptionHappened = true;
throw new CatalogException("获取 表信息失败。", e);
}
}
@Override
public boolean tableExists(ObjectPath tablePath) throws CatalogException {
Integer id = getTableId(tablePath);
return id != null;
}
private Integer getTableId(ObjectPath tablePath) {
Integer dbId = getDatabaseId(tablePath.getDatabaseName());
if (dbId == null) {
return null;
}
// fetch the id
String getIdSql = "select id from metadata_table " +
" where table_name=? and database_id=?";
Connection conn = getConnection();
try (PreparedStatement gStat = conn.prepareStatement(getIdSql)) {
gStat.setString(1, tablePath.getObjectName());
gStat.setInt(2, dbId);
ResultSet rs = gStat.executeQuery();
if (rs.next()) {
return rs.getInt(1);
}
} catch (SQLException e) {
SQLExceptionHappened = true;
logger.error("get table fail", e);
throw new CatalogException("get table fail.", e);
}
return null;
}
@Override
public void dropTable(ObjectPath tablePath, boolean ignoreIfNotExists)
throws TableNotExistException, CatalogException {
Integer id = getTableId(tablePath);
if (id == null) {
throw new TableNotExistException(getName(), tablePath);
}
Connection conn = getConnection();
try {
// todo: this is a hard delete for now; decide later whether to keep a history record.
conn.setAutoCommit(false);
String deletePropSql = "delete from metadata_table_property " +
" where table_id=?";
PreparedStatement dStat = conn.prepareStatement(deletePropSql);
dStat.setInt(1, id);
dStat.executeUpdate();
dStat.close();
String deleteColSql = "delete from metadata_column " +
" where table_id=?";
dStat = conn.prepareStatement(deleteColSql);
dStat.setInt(1, id);
dStat.executeUpdate();
dStat.close();
String deleteDbSql = "delete from metadata_table " +
" where id=?";
dStat = conn.prepareStatement(deleteDbSql);
dStat.setInt(1, id);
dStat.executeUpdate();
dStat.close();
conn.commit();
} catch (SQLException e) {
SQLExceptionHappened = true;
logger.error("drop table fail", e);
throw new CatalogException("drop table fail.", e);
}
}
@Override
public void renameTable(ObjectPath tablePath, String newTableName, boolean ignoreIfNotExists)
throws TableNotExistException, TableAlreadyExistException, CatalogException {
Integer id = getTableId(tablePath);
if (id == null) {
throw new TableNotExistException(getName(), tablePath);
}
ObjectPath newPath = new ObjectPath(tablePath.getDatabaseName(), newTableName);
if (tableExists(newPath)) {
throw new TableAlreadyExistException(getName(), newPath);
}
String updateSql = "UPDATE metadata_table SET table_name=? WHERE id=?";
Connection conn = getConnection();
try (PreparedStatement ps = conn.prepareStatement(updateSql)) {
ps.setString(1, newTableName);
ps.setInt(2, id);
ps.executeUpdate();
} catch (SQLException ex) {
SQLExceptionHappened = true;
throw new CatalogException("修改表名失败", ex);
}
}
@Override
public void createTable(ObjectPath tablePath, CatalogBaseTable table, boolean ignoreIfExists)
throws TableAlreadyExistException, DatabaseNotExistException, CatalogException {
Integer db_id = getDatabaseId(tablePath.getDatabaseName());
if (null == db_id) {
throw new DatabaseNotExistException(getName(), tablePath.getDatabaseName());
}
if (tableExists(tablePath)) {
if (!ignoreIfExists) {
throw new TableAlreadyExistException(getName(), tablePath);
}
return;
}
// insert the table
// insert into the metadata table; it may be a table or a view
// if it is a table, we treat it as a resolved table, so it can be serialized and stored via its properties.
// if it is a view, we assume it only has physical columns
if (!(table instanceof ResolvedCatalogBaseTable)) {
throw new UnsupportedOperationException("暂时不支持输入非 ResolvedCatalogBaseTable 类型的表");
}
Connection conn = getConnection();
try {
conn.setAutoCommit(false);
// first, insert the table record
CatalogBaseTable.TableKind kind = table.getTableKind();
String insertSql = "insert into metadata_table(\n" +
" table_name," +
" table_type," +
" database_id," +
" description)" +
" values(?,?,?,?)";
PreparedStatement iStat = conn.prepareStatement(insertSql, Statement.RETURN_GENERATED_KEYS);
iStat.setString(1, tablePath.getObjectName());
iStat.setString(2, kind.toString());
iStat.setInt(3, db_id);
iStat.setString(4, table.getComment());
iStat.executeUpdate();
ResultSet idRs = iStat.getGeneratedKeys();
if (!idRs.next()) {
iStat.close();
throw new CatalogException("插入元数据表信息失败");
}
int id = idRs.getInt(1);
iStat.close();
// insert properties and columns
if (table instanceof ResolvedCatalogTable) {
// for a table we can take the properties directly.
Map<String, String> props = ((ResolvedCatalogTable) table).toProperties();
String propInsertSql = "insert into metadata_table_property(table_id," +
"`key`,`value`) values (?,?,?)";
PreparedStatement pStat = conn.prepareStatement(propInsertSql);
for (Map.Entry<String, String> entry : props.entrySet()) {
pStat.setInt(1, id);
pStat.setString(2, entry.getKey());
pStat.setString(3, entry.getValue());
pStat.addBatch();
}
pStat.executeBatch();
pStat.close();
} else {
// for a view, assume for now that it only has physical columns
// a view also needs to store: original query and expanded query
// insert properties and columns
ResolvedCatalogView view = (ResolvedCatalogView) table;
List<Schema.UnresolvedColumn> cols = view.getUnresolvedSchema().getColumns();
if (cols.size() > 0) {
String colInsertSql = "insert into metadata_column(" +
" column_name, column_type, data_type" +
" , `expr`" +
" , description" +
" , table_id" +
" , `primary`) " +
" values(?,?,?,?,?,?,?)";
PreparedStatement colIStat = conn.prepareStatement(colInsertSql);
for (Schema.UnresolvedColumn col : cols) {
if (col instanceof Schema.UnresolvedPhysicalColumn) {
Schema.UnresolvedPhysicalColumn pCol = (Schema.UnresolvedPhysicalColumn) col;
if (!(pCol.getDataType() instanceof DataType)) {
throw new UnsupportedOperationException(String.format(
"类型识别失败,该列不是有效类型:%s.%s.%s : %s"
, tablePath.getDatabaseName()
, tablePath.getObjectName()
, pCol.getName(),
pCol.getDataType()
));
}
DataType dataType = (DataType) pCol.getDataType();
colIStat.setString(1, pCol.getName());
colIStat.setString(2, ColumnType.PHYSICAL);
colIStat.setString(3,
dataType.getLogicalType().asSerializableString());
colIStat.setObject(4, null);
colIStat.setString(5, pCol.getComment().orElse(""));
colIStat.setInt(6, id);
colIStat.setObject(7, null); // a view has no primary key
colIStat.addBatch();
} else {
throw new UnsupportedOperationException("暂时认为view 不会出现 非物理字段");
}
}
colIStat.executeBatch();
colIStat.close();
// write the query and related info to the database
Map<String, String> option = view.getOptions();
if (option == null) {
option = new HashMap<>();
}
option.put("OriginalQuery", view.getOriginalQuery());
option.put("ExpandedQuery", view.getExpandedQuery());
String propInsertSql = "insert into metadata_table_property(table_id," +
"`key`,`value`) values (?,?,?)";
PreparedStatement pStat = conn.prepareStatement(propInsertSql);
for (Map.Entry<String, String> entry : option.entrySet()) {
pStat.setInt(1, id);
pStat.setString(2, entry.getKey());
pStat.setString(3, entry.getValue());
pStat.addBatch();
}
pStat.executeBatch();
pStat.close();
}
}
conn.commit();
} catch (SQLException ex) {
SQLExceptionHappened = true;
logger.error("插入数据库失败", ex);
throw new CatalogException("插入数据库失败", ex);
}
}
@Override
public void alterTable(ObjectPath tablePath, CatalogBaseTable newTable, boolean ignoreIfNotExists)
throws TableNotExistException, CatalogException {
Integer id = getTableId(tablePath);
if (id == null) {
throw new TableNotExistException(getName(), tablePath);
}
Map<String, String> opts = newTable.getOptions();
if (opts != null && opts.size() > 0) {
String updateSql = "INSERT INTO metadata_table_property(table_id," +
"`key`,`value`) values (?,?,?) " +
"on duplicate key update `value` =?, update_time = sysdate()";
Connection conn = getConnection();
try (PreparedStatement ps = conn.prepareStatement(updateSql)) {
for (Map.Entry<String, String> entry : opts.entrySet()) {
ps.setInt(1, id);
ps.setString(2, entry.getKey());
ps.setString(3, entry.getValue());
ps.setString(4, entry.getValue());
ps.addBatch();
}
ps.executeBatch();
} catch (SQLException ex) {
SQLExceptionHappened = true;
throw new CatalogException("修改表名失败", ex);
}
}
}
/************************ partition *************************/
@Override
public List<CatalogPartitionSpec> listPartitions(ObjectPath tablePath)
throws TableNotExistException, TableNotPartitionedException, CatalogException {
        // TODO: implement this method.
        throw new UnsupportedOperationException("This method is not implemented yet");
}
@Override
public List<CatalogPartitionSpec> listPartitions(ObjectPath tablePath, CatalogPartitionSpec partitionSpec)
throws TableNotExistException, TableNotPartitionedException, PartitionSpecInvalidException, CatalogException {
        // TODO: implement this method.
        throw new UnsupportedOperationException("This method is not implemented yet");
}
@Override
public List<CatalogPartitionSpec> listPartitionsByFilter(ObjectPath tablePath, List<Expression> filters)
throws TableNotExistException, TableNotPartitionedException, CatalogException {
        // TODO: implement this method.
        throw new UnsupportedOperationException("This method is not implemented yet");
}
@Override
public CatalogPartition getPartition(ObjectPath tablePath, CatalogPartitionSpec partitionSpec)
throws PartitionNotExistException, CatalogException {
        // TODO: implement this method.
        throw new UnsupportedOperationException("This method is not implemented yet");
}
@Override
public boolean partitionExists(ObjectPath tablePath, CatalogPartitionSpec partitionSpec)
throws CatalogException {
        // TODO: implement this method.
        throw new UnsupportedOperationException("This method is not implemented yet");
}
@Override
public void createPartition(ObjectPath tablePath, CatalogPartitionSpec partitionSpec, CatalogPartition partition, boolean ignoreIfExists)
throws TableNotExistException, TableNotPartitionedException, PartitionSpecInvalidException, PartitionAlreadyExistsException, CatalogException {
        // TODO: implement this method.
        throw new UnsupportedOperationException("This method is not implemented yet");
}
@Override
public void dropPartition(ObjectPath tablePath, CatalogPartitionSpec partitionSpec, boolean ignoreIfNotExists) throws PartitionNotExistException, CatalogException {
        // TODO: implement this method.
        throw new UnsupportedOperationException("This method is not implemented yet");
}
@Override
public void alterPartition(ObjectPath tablePath, CatalogPartitionSpec partitionSpec, CatalogPartition newPartition, boolean ignoreIfNotExists) throws PartitionNotExistException, CatalogException {
        // TODO: implement this method.
        throw new UnsupportedOperationException("This method is not implemented yet");
}
/***********************Functions**********************/
@Override
public List<String> listFunctions(String dbName) throws DatabaseNotExistException, CatalogException {
Integer dbId = getDatabaseId(dbName);
if (null == dbId) {
throw new DatabaseNotExistException(getName(), dbName);
}
String querySql = "SELECT function_name from metadata_function " +
" WHERE database_id=?";
Connection conn = getConnection();
try (PreparedStatement gStat = conn.prepareStatement(querySql)) {
gStat.setInt(1, dbId);
ResultSet rs = gStat.executeQuery();
List<String> functions = new ArrayList<>();
while (rs.next()) {
String n = rs.getString("function_name");
functions.add(n);
}
return functions;
} catch (SQLException e) {
SQLExceptionHappened = true;
throw new CatalogException("获取 UDF 列表失败");
}
}
@Override
public CatalogFunction getFunction(ObjectPath functionPath)
throws FunctionNotExistException, CatalogException {
Integer id = getFunctionId(functionPath);
if (null == id) {
throw new FunctionNotExistException(getName(), functionPath);
}
String querySql = "SELECT class_name,function_language from metadata_function " +
" WHERE id=?";
Connection conn = getConnection();
try (PreparedStatement gStat = conn.prepareStatement(querySql)) {
gStat.setInt(1, id);
ResultSet rs = gStat.executeQuery();
if (rs.next()) {
String className = rs.getString("class_name");
String language = rs.getString("function_language");
CatalogFunctionImpl func = new CatalogFunctionImpl(className, FunctionLanguage.valueOf(language));
return func;
} else {
throw new FunctionNotExistException(getName(), functionPath);
}
} catch (SQLException e) {
SQLExceptionHappened = true;
throw new CatalogException("获取 UDF 失败:"
+ functionPath.getDatabaseName() + "."
+ functionPath.getObjectName());
}
}
@Override
public boolean functionExists(ObjectPath functionPath) throws CatalogException {
Integer id = getFunctionId(functionPath);
return id != null;
}
private Integer getFunctionId(ObjectPath functionPath) {
Integer dbId = getDatabaseId(functionPath.getDatabaseName());
if (dbId == null) {
return null;
}
        // Look up the function id
String getIdSql = "select id from metadata_function " +
" where function_name=? and database_id=?";
Connection conn = getConnection();
try (PreparedStatement gStat = conn.prepareStatement(getIdSql)) {
gStat.setString(1, functionPath.getObjectName());
gStat.setInt(2, dbId);
ResultSet rs = gStat.executeQuery();
if (rs.next()) {
int id = rs.getInt(1);
return id;
}
} catch (SQLException e) {
SQLExceptionHappened = true;
logger.error("get function fail", e);
throw new CatalogException("get function fail.", e);
}
return null;
}
@Override
public void createFunction(ObjectPath functionPath, CatalogFunction function, boolean ignoreIfExists)
throws FunctionAlreadyExistException, DatabaseNotExistException, CatalogException {
Integer dbId = getDatabaseId(functionPath.getDatabaseName());
if (null == dbId) {
throw new DatabaseNotExistException(getName(), functionPath.getDatabaseName());
}
if (functionExists(functionPath)) {
if (!ignoreIfExists) {
throw new FunctionAlreadyExistException(getName(), functionPath);
}
}
Connection conn = getConnection();
String insertSql = "Insert into metadata_function " +
"(function_name,class_name,database_id,function_language) " +
" values (?,?,?,?)";
try (PreparedStatement ps = conn.prepareStatement(insertSql)) {
ps.setString(1, functionPath.getObjectName());
ps.setString(2, function.getClassName());
ps.setInt(3, dbId);
ps.setString(4, function.getFunctionLanguage().toString());
ps.executeUpdate();
} catch (SQLException e) {
SQLExceptionHappened = true;
throw new CatalogException("创建 函数 失败", e);
}
}
@Override
public void alterFunction(ObjectPath functionPath, CatalogFunction newFunction, boolean ignoreIfNotExists)
throws FunctionNotExistException, CatalogException {
Integer id = getFunctionId(functionPath);
if (null == id) {
if (!ignoreIfNotExists) {
throw new FunctionNotExistException(getName(), functionPath);
}
return;
}
Connection conn = getConnection();
String insertSql = "update metadata_function " +
"set (class_name =?, function_language=?) " +
" where id=?";
try (PreparedStatement ps = conn.prepareStatement(insertSql)) {
ps.setString(1, newFunction.getClassName());
ps.setString(2, newFunction.getFunctionLanguage().toString());
ps.setInt(3, id);
ps.executeUpdate();
} catch (SQLException e) {
SQLExceptionHappened = true;
throw new CatalogException("修改 函数 失败", e);
}
}
@Override
public void dropFunction(ObjectPath functionPath, boolean ignoreIfNotExists)
throws FunctionNotExistException, CatalogException {
Integer id = getFunctionId(functionPath);
if (null == id) {
if (!ignoreIfNotExists) {
throw new FunctionNotExistException(getName(), functionPath);
}
return;
}
Connection conn = getConnection();
String insertSql = "delete from metadata_function " +
" where id=?";
try (PreparedStatement ps = conn.prepareStatement(insertSql)) {
ps.setInt(1, id);
ps.executeUpdate();
} catch (SQLException e) {
SQLExceptionHappened = true;
throw new CatalogException("删除 函数 失败", e);
}
}
@Override
public CatalogTableStatistics getTableStatistics(ObjectPath tablePath) throws TableNotExistException, CatalogException {
        // TODO: implement this method.
checkNotNull(tablePath);
if (!tableExists(tablePath)) {
throw new TableNotExistException(getName(), tablePath);
}
/*if (!isPartitionedTable(tablePath)) {
CatalogTableStatistics result = tableStats.get(tablePath);
return result != null ? result.copy() : CatalogTableStatistics.UNKNOWN;
} else {
return CatalogTableStatistics.UNKNOWN;
}*/
return CatalogTableStatistics.UNKNOWN;
}
@Override
public CatalogColumnStatistics getTableColumnStatistics(ObjectPath tablePath) throws TableNotExistException, CatalogException {
        // TODO: implement this method.
checkNotNull(tablePath);
if (!tableExists(tablePath)) {
throw new TableNotExistException(getName(), tablePath);
}
// CatalogColumnStatistics result = tableColumnStats.get(tablePath);
// return result != null ? result.copy() : CatalogColumnStatistics.UNKNOWN;
return CatalogColumnStatistics.UNKNOWN;
}
@Override
public CatalogTableStatistics getPartitionStatistics(ObjectPath tablePath, CatalogPartitionSpec partitionSpec) throws PartitionNotExistException, CatalogException {
        // TODO: implement this method.
        throw new UnsupportedOperationException("This method is not implemented yet");
}
@Override
public CatalogColumnStatistics getPartitionColumnStatistics(ObjectPath tablePath, CatalogPartitionSpec partitionSpec) throws PartitionNotExistException, CatalogException {
        // TODO: implement this method.
        throw new UnsupportedOperationException("This method is not implemented yet");
}
@Override
public void alterTableStatistics(ObjectPath tablePath, CatalogTableStatistics tableStatistics, boolean ignoreIfNotExists) throws TableNotExistException, CatalogException {
        // TODO: implement this method.
        throw new UnsupportedOperationException("This method is not implemented yet");
}
@Override
public void alterTableColumnStatistics(ObjectPath tablePath, CatalogColumnStatistics columnStatistics, boolean ignoreIfNotExists)
throws TableNotExistException, CatalogException, TablePartitionedException {
        // TODO: implement this method.
        throw new UnsupportedOperationException("This method is not implemented yet");
}
@Override
public void alterPartitionStatistics(ObjectPath tablePath, CatalogPartitionSpec partitionSpec, CatalogTableStatistics partitionStatistics, boolean ignoreIfNotExists)
throws PartitionNotExistException, CatalogException {
        // TODO: implement this method.
        throw new UnsupportedOperationException("This method is not implemented yet");
}
@Override
public void alterPartitionColumnStatistics(ObjectPath tablePath, CatalogPartitionSpec partitionSpec, CatalogColumnStatistics columnStatistics, boolean ignoreIfNotExists)
throws PartitionNotExistException, CatalogException {
        // TODO: implement this method.
        throw new UnsupportedOperationException("This method is not implemented yet");
}
}
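For reference, a minimal usage sketch (not part of this commit) of registering the catalog above programmatically with a TableEnvironment. The four constructor arguments (name, url, username, password) mirror those passed by DlinkMysqlCatalogFactory below; the JDBC URL and credentials are placeholder assumptions.

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

import com.dlink.flink.catalog.DlinkMysqlCatalog;

public class DlinkMysqlCatalogUsageSketch {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.newInstance().build());
        // Placeholder connection settings; point these at the Dlink metadata database.
        DlinkMysqlCatalog catalog = new DlinkMysqlCatalog(
                "my_catalog",
                "jdbc:mysql://localhost:3306/dlink?useSSL=false",
                "dlink",
                "dlink");
        tEnv.registerCatalog("my_catalog", catalog);
        tEnv.useCatalog("my_catalog");
    }
}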
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.dlink.flink.catalog.factory;
import static com.dlink.flink.catalog.factory.DlinkMysqlCatalogFactoryOptions.PASSWORD;
import static com.dlink.flink.catalog.factory.DlinkMysqlCatalogFactoryOptions.URL;
import static com.dlink.flink.catalog.factory.DlinkMysqlCatalogFactoryOptions.USERNAME;
import static org.apache.flink.table.factories.FactoryUtil.PROPERTY_VERSION;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.table.catalog.Catalog;
import org.apache.flink.table.factories.CatalogFactory;
import org.apache.flink.table.factories.FactoryUtil;
import java.util.HashSet;
import java.util.Set;
import com.dlink.flink.catalog.DlinkMysqlCatalog;
/**
* Factory for {@link DlinkMysqlCatalog}.
*/
public class DlinkMysqlCatalogFactory implements CatalogFactory {
@Override
public String factoryIdentifier() {
return DlinkMysqlCatalogFactoryOptions.IDENTIFIER;
}
@Override
public Set<ConfigOption<?>> requiredOptions() {
final Set<ConfigOption<?>> options = new HashSet<>();
return options;
}
@Override
public Set<ConfigOption<?>> optionalOptions() {
final Set<ConfigOption<?>> options = new HashSet<>();
options.add(USERNAME);
options.add(PASSWORD);
options.add(URL);
options.add(PROPERTY_VERSION);
return options;
}
@Override
public Catalog createCatalog(Context context) {
final FactoryUtil.CatalogFactoryHelper helper =
FactoryUtil.createCatalogFactoryHelper(this, context);
helper.validate();
return new DlinkMysqlCatalog(
context.getName(),
helper.getOptions().get(URL),
helper.getOptions().get(USERNAME),
helper.getOptions().get(PASSWORD));
}
}
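A minimal SQL DDL sketch exercising this factory, assuming the catalog jar and its SPI registration are on the classpath; the 'dlink_mysql' identifier and the url/username/password option keys come from DlinkMysqlCatalogFactoryOptions below, while the connection values are placeholders.

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class DlinkMysqlCatalogDdlSketch {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.newInstance().build());
        // The 'type' value must match DlinkMysqlCatalogFactory#factoryIdentifier().
        tEnv.executeSql(
            "CREATE CATALOG my_catalog WITH (\n"
                + "  'type' = 'dlink_mysql',\n"
                + "  'url' = 'jdbc:mysql://localhost:3306/dlink?useSSL=false',\n"
                + "  'username' = 'dlink',\n"
                + "  'password' = 'dlink'\n"
                + ")");
        tEnv.useCatalog("my_catalog");
    }
}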
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.dlink.flink.catalog.factory;
import org.apache.flink.annotation.Internal;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import com.dlink.flink.catalog.DlinkMysqlCatalog;
/**
* {@link ConfigOption}s for {@link DlinkMysqlCatalog}.
*/
@Internal
public class DlinkMysqlCatalogFactoryOptions {
public static final String IDENTIFIER = "dlink_mysql";
public static final ConfigOption<String> USERNAME = ConfigOptions.key("username").stringType().noDefaultValue();
public static final ConfigOption<String> PASSWORD = ConfigOptions.key("password").stringType().noDefaultValue();
public static final ConfigOption<String> URL = ConfigOptions.key("url").stringType().noDefaultValue();
private DlinkMysqlCatalogFactoryOptions() {
}
}
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
com.dlink.flink.catalog.factory.DlinkMysqlCatalogFactory
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
com.dlink.flink.catalog.factory.DlinkMysqlCatalogFactory
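The two entries above are the per-module Java SPI registration files. Assuming each is packaged as META-INF/services/org.apache.flink.table.factories.Factory, Flink discovers the factory through the JDK ServiceLoader, which a sketch like the following can verify.

import java.util.ServiceLoader;

import org.apache.flink.table.factories.Factory;

public class CatalogFactoryDiscoveryCheck {
    public static void main(String[] args) {
        // Iterates every Factory registered via META-INF/services and reports the
        // Dlink catalog factory when its SPI entry is on the classpath.
        for (Factory factory : ServiceLoader.load(Factory.class)) {
            if ("dlink_mysql".equals(factory.factoryIdentifier())) {
                System.out.println("Found catalog factory: " + factory.getClass().getName());
            }
        }
    }
}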
......@@ -13,6 +13,8 @@
<packaging>pom</packaging>
<modules>
<module>dlink-catalog-mysql-1.14</module>
<module>dlink-catalog-mysql-1.13</module>
<module>dlink-catalog-mysql-1.15</module>
</modules>
<properties>
......
......@@ -75,6 +75,11 @@
<artifactId>dlink-gateway</artifactId>
<scope>${scope.runtime}</scope>
</dependency>
<dependency>
<groupId>com.dlink</groupId>
<artifactId>dlink-catalog-mysql-${dlink.flink.version}</artifactId>
<scope>${scope.runtime}</scope>
</dependency>
<dependency>
<groupId>com.dlink</groupId>
<artifactId>dlink-client-hadoop</artifactId>
......
......@@ -219,6 +219,21 @@
<artifactId>dlink-client-1.15</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>com.dlink</groupId>
<artifactId>dlink-catalog-mysql-1.13</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>com.dlink</groupId>
<artifactId>dlink-catalog-mysql-1.14</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>com.dlink</groupId>
<artifactId>dlink-catalog-mysql-1.15</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>com.dlink</groupId>
<artifactId>dlink-connector-jdbc-1.11</artifactId>
......