Skip to content
Projects
Groups
Snippets
Help
Loading...
Help
Contribute to GitLab
Sign in / Register
Toggle navigation
D
dlink
Project
Project
Details
Activity
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
zhaowei
dlink
Commits
2282fcac
Unverified
Commit
2282fcac
authored
Oct 15, 2022
by
skylines
Committed by
GitHub
Oct 15, 2022
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
add spotless plugin (#1105)
Co-authored-by:
rookiegao
<
rookiegao712@gmail
>
parent
02e300f0
Changes
5
Hide whitespace changes
Inline
Side-by-side
Showing
5 changed files
with
261 additions
and
135 deletions
+261
-135
DlinkMysqlCatalog.java
.../main/java/com/dlink/flink/catalog/DlinkMysqlCatalog.java
+113
-116
pom.xml
dlink-catalog/pom.xml
+6
-8
pom.xml
pom.xml
+67
-11
eclipse.importorder
style/eclipse.importorder
+23
-0
spotless_dlink_formatter.xml
style/spotless_dlink_formatter.xml
+52
-0
No files found.
dlink-catalog/dlink-catalog-mysql/dlink-catalog-mysql-1.14/src/main/java/com/dlink/flink/catalog/DlinkMysqlCatalog.java
View file @
2282fcac
...
...
@@ -105,6 +105,7 @@ public class DlinkMysqlCatalog extends AbstractCatalog {
* 对象类型,例如 库、表、视图等
*/
protected
static
class
ObjectType
{
/**
* 数据库
*/
...
...
@@ -125,6 +126,7 @@ public class DlinkMysqlCatalog extends AbstractCatalog {
* 对象类型,例如 库、表、视图等
*/
protected
static
class
ColumnType
{
/**
* 物理字段
*/
...
...
@@ -215,8 +217,7 @@ public class DlinkMysqlCatalog extends AbstractCatalog {
Integer
defaultDbId
=
getDatabaseId
(
defaultDatabase
);
if
(
defaultDbId
==
null
)
{
try
{
createDatabase
(
defaultDatabase
,
new
CatalogDatabaseImpl
(
new
HashMap
<>(),
""
)
,
true
);
createDatabase
(
defaultDatabase
,
new
CatalogDatabaseImpl
(
new
HashMap
<>(),
""
),
true
);
}
catch
(
DatabaseAlreadyExistException
a
)
{
logger
.
info
(
"重复创建默认库"
);
}
...
...
@@ -280,15 +281,14 @@ public class DlinkMysqlCatalog extends AbstractCatalog {
return
myDatabases
;
}
catch
(
Exception
e
)
{
throw
new
CatalogException
(
String
.
format
(
"Failed listing database in catalog %s"
,
getName
()),
e
);
String
.
format
(
"Failed listing database in catalog %s"
,
getName
()),
e
);
}
}
@Override
public
CatalogDatabase
getDatabase
(
String
databaseName
)
throws
DatabaseNotExistException
,
CatalogException
{
public
CatalogDatabase
getDatabase
(
String
databaseName
)
throws
DatabaseNotExistException
,
CatalogException
{
String
querySql
=
"SELECT id, database_name,description "
+
" FROM metadata_database where database_name=?"
;
+
" FROM metadata_database where database_name=?"
;
Connection
conn
=
getConnection
();
try
(
PreparedStatement
ps
=
conn
.
prepareStatement
(
querySql
))
{
ps
.
setString
(
1
,
databaseName
);
...
...
@@ -301,8 +301,8 @@ public class DlinkMysqlCatalog extends AbstractCatalog {
Map
<
String
,
String
>
map
=
new
HashMap
<>();
String
sql
=
"select `key`,`value` "
+
"from metadata_database_property "
+
"where database_id=? "
;
+
"from metadata_database_property "
+
"where database_id=? "
;
try
(
PreparedStatement
pStat
=
conn
.
prepareStatement
(
sql
))
{
pStat
.
setInt
(
1
,
id
);
ResultSet
prs
=
pStat
.
executeQuery
();
...
...
@@ -312,7 +312,7 @@ public class DlinkMysqlCatalog extends AbstractCatalog {
}
catch
(
SQLException
e
)
{
sqlExceptionHappened
=
true
;
throw
new
CatalogException
(
String
.
format
(
"Failed get database properties in catalog %s"
,
getName
()),
e
);
String
.
format
(
"Failed get database properties in catalog %s"
,
getName
()),
e
);
}
return
new
CatalogDatabaseImpl
(
map
,
description
);
...
...
@@ -322,7 +322,7 @@ public class DlinkMysqlCatalog extends AbstractCatalog {
}
catch
(
SQLException
e
)
{
sqlExceptionHappened
=
true
;
throw
new
CatalogException
(
String
.
format
(
"Failed get database in catalog %s"
,
getName
()),
e
);
String
.
format
(
"Failed get database in catalog %s"
,
getName
()),
e
);
}
}
...
...
@@ -355,8 +355,8 @@ public class DlinkMysqlCatalog extends AbstractCatalog {
}
@Override
public
void
createDatabase
(
String
databaseName
,
CatalogDatabase
db
,
boolean
ignoreIfExists
)
throws
DatabaseAlreadyExistException
,
CatalogException
{
public
void
createDatabase
(
String
databaseName
,
CatalogDatabase
db
,
boolean
ignoreIfExists
)
throws
DatabaseAlreadyExistException
,
CatalogException
{
checkArgument
(!
StringUtils
.
isNullOrWhitespaceOnly
(
databaseName
));
checkNotNull
(
db
);
...
...
@@ -379,7 +379,7 @@ public class DlinkMysqlCatalog extends AbstractCatalog {
if
(
idRs
.
next
()
&&
db
.
getProperties
()
!=
null
&&
db
.
getProperties
().
size
()
>
0
)
{
int
id
=
idRs
.
getInt
(
1
);
String
propInsertSql
=
"insert into metadata_database_property(database_id, "
+
"`key`,`value`) values (?,?,?)"
;
+
"`key`,`value`) values (?,?,?)"
;
PreparedStatement
pstat
=
conn
.
prepareStatement
(
propInsertSql
);
for
(
Map
.
Entry
<
String
,
String
>
entry
:
db
.
getProperties
().
entrySet
())
{
pstat
.
setInt
(
1
,
id
);
...
...
@@ -399,8 +399,8 @@ public class DlinkMysqlCatalog extends AbstractCatalog {
}
@Override
public
void
dropDatabase
(
String
name
,
boolean
ignoreIfNotExists
,
boolean
cascade
)
throws
DatabaseNotExistException
,
DatabaseNotEmptyException
,
CatalogException
{
public
void
dropDatabase
(
String
name
,
boolean
ignoreIfNotExists
,
boolean
cascade
)
throws
DatabaseNotExistException
,
DatabaseNotEmptyException
,
CatalogException
{
if
(
name
.
equals
(
defaultDatabase
))
{
throw
new
CatalogException
(
"默认 database 不可以删除"
);
}
...
...
@@ -450,8 +450,8 @@ public class DlinkMysqlCatalog extends AbstractCatalog {
}
@Override
public
void
alterDatabase
(
String
name
,
CatalogDatabase
newDb
,
boolean
ignoreIfNotExists
)
throws
DatabaseNotExistException
,
CatalogException
{
public
void
alterDatabase
(
String
name
,
CatalogDatabase
newDb
,
boolean
ignoreIfNotExists
)
throws
DatabaseNotExistException
,
CatalogException
{
if
(
name
.
equals
(
defaultDatabase
))
{
throw
new
CatalogException
(
"默认 database 不可以修改"
);
}
...
...
@@ -475,8 +475,8 @@ public class DlinkMysqlCatalog extends AbstractCatalog {
uState
.
close
();
if
(
newDb
.
getProperties
()
!=
null
&&
newDb
.
getProperties
().
size
()
>
0
)
{
String
upsertSql
=
"insert into metadata_database_property (database_id, `key`,`value`) \n"
+
"values (?,?,?)\n"
+
"on duplicate key update `value` =?, update_time = sysdate()\n"
;
+
"values (?,?,?)\n"
+
"on duplicate key update `value` =?, update_time = sysdate()\n"
;
PreparedStatement
pstat
=
conn
.
prepareStatement
(
upsertSql
);
for
(
Map
.
Entry
<
String
,
String
>
entry
:
newDb
.
getProperties
().
entrySet
())
{
pstat
.
setInt
(
1
,
id
);
...
...
@@ -496,19 +496,17 @@ public class DlinkMysqlCatalog extends AbstractCatalog {
}
@Override
public
List
<
String
>
listTables
(
String
databaseName
)
throws
DatabaseNotExistException
,
CatalogException
{
public
List
<
String
>
listTables
(
String
databaseName
)
throws
DatabaseNotExistException
,
CatalogException
{
return
listTablesViews
(
databaseName
,
ObjectType
.
TABLE
);
}
@Override
public
List
<
String
>
listViews
(
String
databaseName
)
throws
DatabaseNotExistException
,
CatalogException
{
public
List
<
String
>
listViews
(
String
databaseName
)
throws
DatabaseNotExistException
,
CatalogException
{
return
listTablesViews
(
databaseName
,
ObjectType
.
VIEW
);
}
protected
List
<
String
>
listTablesViews
(
String
databaseName
,
String
tableType
)
throws
DatabaseNotExistException
,
CatalogException
{
protected
List
<
String
>
listTablesViews
(
String
databaseName
,
String
tableType
)
throws
DatabaseNotExistException
,
CatalogException
{
Integer
databaseId
=
getDatabaseId
(
databaseName
);
if
(
null
==
databaseId
)
{
throw
new
DatabaseNotExistException
(
getName
(),
databaseName
);
...
...
@@ -532,7 +530,7 @@ public class DlinkMysqlCatalog extends AbstractCatalog {
return
tables
;
}
catch
(
Exception
e
)
{
throw
new
CatalogException
(
String
.
format
(
"Failed listing %s in catalog %s"
,
tableType
,
getName
()),
e
);
String
.
format
(
"Failed listing %s in catalog %s"
,
tableType
,
getName
()),
e
);
}
}
...
...
@@ -551,9 +549,9 @@ public class DlinkMysqlCatalog extends AbstractCatalog {
Connection
conn
=
getConnection
();
try
{
String
queryTable
=
"SELECT table_name "
+
" ,description, table_type "
+
" FROM metadata_table "
+
" where id=?"
;
+
" ,description, table_type "
+
" FROM metadata_table "
+
" where id=?"
;
PreparedStatement
ps
=
conn
.
prepareStatement
(
queryTable
);
ps
.
setInt
(
1
,
id
);
ResultSet
rs
=
ps
.
executeQuery
();
...
...
@@ -570,7 +568,7 @@ public class DlinkMysqlCatalog extends AbstractCatalog {
if
(
tableType
.
equals
(
ObjectType
.
TABLE
))
{
// 这个是 table
String
propSql
=
"SELECT `key`, `value` from metadata_table_property "
+
"WHERE table_id=?"
;
+
"WHERE table_id=?"
;
PreparedStatement
pState
=
conn
.
prepareStatement
(
propSql
);
pState
.
setInt
(
1
,
id
);
ResultSet
prs
=
pState
.
executeQuery
();
...
...
@@ -587,8 +585,8 @@ public class DlinkMysqlCatalog extends AbstractCatalog {
// 1、从库中取出table信息。(前面已做)
// 2、取出字段。
String
colSql
=
"SELECT column_name, column_type, data_type, description "
+
" FROM metadata_column WHERE "
+
" table_id=?"
;
+
" FROM metadata_column WHERE "
+
" table_id=?"
;
PreparedStatement
cStat
=
conn
.
prepareStatement
(
colSql
);
cStat
.
setInt
(
1
,
id
);
ResultSet
crs
=
cStat
.
executeQuery
();
...
...
@@ -607,7 +605,7 @@ public class DlinkMysqlCatalog extends AbstractCatalog {
cStat
.
close
();
// 3、取出query
String
qSql
=
"SELECT `key`, value FROM metadata_table_property"
+
" WHERE table_id=? "
;
+
" WHERE table_id=? "
;
PreparedStatement
qStat
=
conn
.
prepareStatement
(
qSql
);
qStat
.
setInt
(
1
,
id
);
ResultSet
qrs
=
qStat
.
executeQuery
();
...
...
@@ -626,8 +624,7 @@ public class DlinkMysqlCatalog extends AbstractCatalog {
}
}
// 合成view
return
CatalogView
.
of
(
builder
.
build
(),
description
,
originalQuery
,
expandedQuery
,
options
);
return
CatalogView
.
of
(
builder
.
build
(),
description
,
originalQuery
,
expandedQuery
,
options
);
}
else
{
throw
new
CatalogException
(
"不支持的数据类型。"
+
tableType
);
}
...
...
@@ -651,7 +648,7 @@ public class DlinkMysqlCatalog extends AbstractCatalog {
}
// 获取id
String
getIdSql
=
"select id from metadata_table "
+
" where table_name=? and database_id=?"
;
+
" where table_name=? and database_id=?"
;
Connection
conn
=
getConnection
();
try
(
PreparedStatement
gStat
=
conn
.
prepareStatement
(
getIdSql
))
{
gStat
.
setString
(
1
,
tablePath
.
getObjectName
());
...
...
@@ -669,8 +666,8 @@ public class DlinkMysqlCatalog extends AbstractCatalog {
}
@Override
public
void
dropTable
(
ObjectPath
tablePath
,
boolean
ignoreIfNotExists
)
throws
TableNotExistException
,
CatalogException
{
public
void
dropTable
(
ObjectPath
tablePath
,
boolean
ignoreIfNotExists
)
throws
TableNotExistException
,
CatalogException
{
Integer
id
=
getTableId
(
tablePath
);
if
(
id
==
null
)
{
...
...
@@ -681,19 +678,19 @@ public class DlinkMysqlCatalog extends AbstractCatalog {
// todo: 现在是真实删除,后续设计是否做记录保留。
conn
.
setAutoCommit
(
false
);
String
deletePropSql
=
"delete from metadata_table_property "
+
" where table_id=?"
;
+
" where table_id=?"
;
PreparedStatement
dStat
=
conn
.
prepareStatement
(
deletePropSql
);
dStat
.
setInt
(
1
,
id
);
dStat
.
executeUpdate
();
dStat
.
close
();
String
deleteColSql
=
"delete from metadata_column "
+
" where table_id=?"
;
+
" where table_id=?"
;
dStat
=
conn
.
prepareStatement
(
deleteColSql
);
dStat
.
setInt
(
1
,
id
);
dStat
.
executeUpdate
();
dStat
.
close
();
String
deleteDbSql
=
"delete from metadata_table "
+
" where id=?"
;
+
" where id=?"
;
dStat
=
conn
.
prepareStatement
(
deleteDbSql
);
dStat
.
setInt
(
1
,
id
);
dStat
.
executeUpdate
();
...
...
@@ -707,8 +704,8 @@ public class DlinkMysqlCatalog extends AbstractCatalog {
}
@Override
public
void
renameTable
(
ObjectPath
tablePath
,
String
newTableName
,
boolean
ignoreIfNotExists
)
throws
TableNotExistException
,
TableAlreadyExistException
,
CatalogException
{
public
void
renameTable
(
ObjectPath
tablePath
,
String
newTableName
,
boolean
ignoreIfNotExists
)
throws
TableNotExistException
,
TableAlreadyExistException
,
CatalogException
{
Integer
id
=
getTableId
(
tablePath
);
if
(
id
==
null
)
{
...
...
@@ -731,8 +728,8 @@ public class DlinkMysqlCatalog extends AbstractCatalog {
}
@Override
public
void
createTable
(
ObjectPath
tablePath
,
CatalogBaseTable
table
,
boolean
ignoreIfExists
)
throws
TableAlreadyExistException
,
DatabaseNotExistException
,
CatalogException
{
public
void
createTable
(
ObjectPath
tablePath
,
CatalogBaseTable
table
,
boolean
ignoreIfExists
)
throws
TableAlreadyExistException
,
DatabaseNotExistException
,
CatalogException
{
Integer
dbId
=
getDatabaseId
(
tablePath
.
getDatabaseName
());
if
(
null
==
dbId
)
{
throw
new
DatabaseNotExistException
(
getName
(),
tablePath
.
getDatabaseName
());
...
...
@@ -757,11 +754,11 @@ public class DlinkMysqlCatalog extends AbstractCatalog {
CatalogBaseTable
.
TableKind
kind
=
table
.
getTableKind
();
String
insertSql
=
"insert into metadata_table(\n"
+
" table_name,"
+
" table_type,"
+
" database_id,"
+
" description)"
+
" values(?,?,?,?)"
;
+
" table_name,"
+
" table_type,"
+
" database_id,"
+
" description)"
+
" values(?,?,?,?)"
;
PreparedStatement
iStat
=
conn
.
prepareStatement
(
insertSql
,
Statement
.
RETURN_GENERATED_KEYS
);
iStat
.
setString
(
1
,
tablePath
.
getObjectName
());
iStat
.
setString
(
2
,
kind
.
toString
());
...
...
@@ -780,7 +777,7 @@ public class DlinkMysqlCatalog extends AbstractCatalog {
// table 就可以直接拿properties了。
Map
<
String
,
String
>
props
=
((
ResolvedCatalogTable
)
table
).
toProperties
();
String
propInsertSql
=
"insert into metadata_table_property(table_id,"
+
"`key`,`value`) values (?,?,?)"
;
+
"`key`,`value`) values (?,?,?)"
;
PreparedStatement
pStat
=
conn
.
prepareStatement
(
propInsertSql
);
for
(
Map
.
Entry
<
String
,
String
>
entry
:
props
.
entrySet
())
{
pStat
.
setInt
(
1
,
id
);
...
...
@@ -798,31 +795,28 @@ public class DlinkMysqlCatalog extends AbstractCatalog {
List
<
Schema
.
UnresolvedColumn
>
cols
=
view
.
getUnresolvedSchema
().
getColumns
();
if
(
cols
.
size
()
>
0
)
{
String
colInsertSql
=
"insert into metadata_column("
+
" column_name, column_type, data_type"
+
" , `expr`"
+
" , description"
+
" , table_id"
+
" , `primary`) "
+
" values(?,?,?,?,?,?,?)"
;
+
" column_name, column_type, data_type"
+
" , `expr`"
+
" , description"
+
" , table_id"
+
" , `primary`) "
+
" values(?,?,?,?,?,?,?)"
;
PreparedStatement
colIStat
=
conn
.
prepareStatement
(
colInsertSql
);
for
(
Schema
.
UnresolvedColumn
col
:
cols
)
{
if
(
col
instanceof
Schema
.
UnresolvedPhysicalColumn
)
{
Schema
.
UnresolvedPhysicalColumn
pCol
=
(
Schema
.
UnresolvedPhysicalColumn
)
col
;
if
(!(
pCol
.
getDataType
()
instanceof
DataType
))
{
throw
new
UnsupportedOperationException
(
String
.
format
(
"类型识别失败,该列不是有效类型:%s.%s.%s : %s"
,
tablePath
.
getDatabaseName
()
,
tablePath
.
getObjectName
()
,
pCol
.
getName
(),
pCol
.
getDataType
()
));
"类型识别失败,该列不是有效类型:%s.%s.%s : %s"
,
tablePath
.
getDatabaseName
(),
tablePath
.
getObjectName
(),
pCol
.
getName
(),
pCol
.
getDataType
()));
}
DataType
dataType
=
(
DataType
)
pCol
.
getDataType
();
colIStat
.
setString
(
1
,
pCol
.
getName
());
colIStat
.
setString
(
2
,
ColumnType
.
PHYSICAL
);
colIStat
.
setString
(
3
,
dataType
.
getLogicalType
().
asSerializableString
());
dataType
.
getLogicalType
().
asSerializableString
());
colIStat
.
setObject
(
4
,
null
);
colIStat
.
setString
(
5
,
pCol
.
getComment
().
orElse
(
""
));
colIStat
.
setInt
(
6
,
id
);
...
...
@@ -843,7 +837,7 @@ public class DlinkMysqlCatalog extends AbstractCatalog {
option
.
put
(
"OriginalQuery"
,
view
.
getOriginalQuery
());
option
.
put
(
"ExpandedQuery"
,
view
.
getExpandedQuery
());
String
propInsertSql
=
"insert into metadata_table_property(table_id,"
+
"`key`,`value`) values (?,?,?)"
;
+
"`key`,`value`) values (?,?,?)"
;
PreparedStatement
pStat
=
conn
.
prepareStatement
(
propInsertSql
);
for
(
Map
.
Entry
<
String
,
String
>
entry
:
option
.
entrySet
())
{
pStat
.
setInt
(
1
,
id
);
...
...
@@ -864,8 +858,8 @@ public class DlinkMysqlCatalog extends AbstractCatalog {
}
@Override
public
void
alterTable
(
ObjectPath
tablePath
,
CatalogBaseTable
newTable
,
boolean
ignoreIfNotExists
)
throws
TableNotExistException
,
CatalogException
{
public
void
alterTable
(
ObjectPath
tablePath
,
CatalogBaseTable
newTable
,
boolean
ignoreIfNotExists
)
throws
TableNotExistException
,
CatalogException
{
Integer
id
=
getTableId
(
tablePath
);
if
(
id
==
null
)
{
...
...
@@ -875,8 +869,8 @@ public class DlinkMysqlCatalog extends AbstractCatalog {
Map
<
String
,
String
>
opts
=
newTable
.
getOptions
();
if
(
opts
!=
null
&&
opts
.
size
()
>
0
)
{
String
updateSql
=
"INSERT INTO metadata_table_property(table_id,"
+
"`key`,`value`) values (?,?,?) "
+
"on duplicate key update `value` =?, update_time = sysdate()"
;
+
"`key`,`value`) values (?,?,?) "
+
"on duplicate key update `value` =?, update_time = sysdate()"
;
Connection
conn
=
getConnection
();
try
(
PreparedStatement
ps
=
conn
.
prepareStatement
(
updateSql
))
{
for
(
Map
.
Entry
<
String
,
String
>
entry
:
opts
.
entrySet
())
{
...
...
@@ -896,55 +890,55 @@ public class DlinkMysqlCatalog extends AbstractCatalog {
/************************ partition *************************/
@Override
public
List
<
CatalogPartitionSpec
>
listPartitions
(
ObjectPath
tablePath
)
throws
TableNotExistException
,
TableNotPartitionedException
,
CatalogException
{
public
List
<
CatalogPartitionSpec
>
listPartitions
(
ObjectPath
tablePath
)
throws
TableNotExistException
,
TableNotPartitionedException
,
CatalogException
{
// todo: 补充完成该方法。
throw
new
UnsupportedOperationException
(
"该方法尚未完成"
);
}
@Override
public
List
<
CatalogPartitionSpec
>
listPartitions
(
ObjectPath
tablePath
,
CatalogPartitionSpec
partitionSpec
)
throws
TableNotExistException
,
TableNotPartitionedException
,
PartitionSpecInvalidException
,
CatalogException
{
public
List
<
CatalogPartitionSpec
>
listPartitions
(
ObjectPath
tablePath
,
CatalogPartitionSpec
partitionSpec
)
throws
TableNotExistException
,
TableNotPartitionedException
,
PartitionSpecInvalidException
,
CatalogException
{
// todo: 补充完成该方法。
throw
new
UnsupportedOperationException
(
"该方法尚未完成"
);
}
@Override
public
List
<
CatalogPartitionSpec
>
listPartitionsByFilter
(
ObjectPath
tablePath
,
List
<
Expression
>
filters
)
throws
TableNotExistException
,
TableNotPartitionedException
,
CatalogException
{
public
List
<
CatalogPartitionSpec
>
listPartitionsByFilter
(
ObjectPath
tablePath
,
List
<
Expression
>
filters
)
throws
TableNotExistException
,
TableNotPartitionedException
,
CatalogException
{
// todo: 补充完成该方法。
throw
new
UnsupportedOperationException
(
"该方法尚未完成"
);
}
@Override
public
CatalogPartition
getPartition
(
ObjectPath
tablePath
,
CatalogPartitionSpec
partitionSpec
)
throws
PartitionNotExistException
,
CatalogException
{
public
CatalogPartition
getPartition
(
ObjectPath
tablePath
,
CatalogPartitionSpec
partitionSpec
)
throws
PartitionNotExistException
,
CatalogException
{
// todo: 补充完成该方法。
throw
new
UnsupportedOperationException
(
"该方法尚未完成"
);
}
@Override
public
boolean
partitionExists
(
ObjectPath
tablePath
,
CatalogPartitionSpec
partitionSpec
)
throws
CatalogException
{
public
boolean
partitionExists
(
ObjectPath
tablePath
,
CatalogPartitionSpec
partitionSpec
)
throws
CatalogException
{
// todo: 补充完成该方法。
throw
new
UnsupportedOperationException
(
"该方法尚未完成"
);
}
@Override
public
void
createPartition
(
ObjectPath
tablePath
,
CatalogPartitionSpec
partitionSpec
,
CatalogPartition
partition
,
boolean
ignoreIfExists
)
throws
TableNotExistException
,
TableNotPartitionedException
,
PartitionSpecInvalidException
,
PartitionAlreadyExistsException
,
CatalogException
{
public
void
createPartition
(
ObjectPath
tablePath
,
CatalogPartitionSpec
partitionSpec
,
CatalogPartition
partition
,
boolean
ignoreIfExists
)
throws
TableNotExistException
,
TableNotPartitionedException
,
PartitionSpecInvalidException
,
PartitionAlreadyExistsException
,
CatalogException
{
// todo: 补充完成该方法。
throw
new
UnsupportedOperationException
(
"该方法尚未完成"
);
}
@Override
public
void
dropPartition
(
ObjectPath
tablePath
,
CatalogPartitionSpec
partitionSpec
,
boolean
ignoreIfNotExists
)
throws
PartitionNotExistException
,
CatalogException
{
public
void
dropPartition
(
ObjectPath
tablePath
,
CatalogPartitionSpec
partitionSpec
,
boolean
ignoreIfNotExists
)
throws
PartitionNotExistException
,
CatalogException
{
// todo: 补充完成该方法。
throw
new
UnsupportedOperationException
(
"该方法尚未完成"
);
}
@Override
public
void
alterPartition
(
ObjectPath
tablePath
,
CatalogPartitionSpec
partitionSpec
,
CatalogPartition
newPartition
,
boolean
ignoreIfNotExists
)
throws
PartitionNotExistException
,
CatalogException
{
public
void
alterPartition
(
ObjectPath
tablePath
,
CatalogPartitionSpec
partitionSpec
,
CatalogPartition
newPartition
,
boolean
ignoreIfNotExists
)
throws
PartitionNotExistException
,
CatalogException
{
// todo: 补充完成该方法。
throw
new
UnsupportedOperationException
(
"该方法尚未完成"
);
}
...
...
@@ -958,7 +952,7 @@ public class DlinkMysqlCatalog extends AbstractCatalog {
throw
new
DatabaseNotExistException
(
getName
(),
dbName
);
}
String
querySql
=
"SELECT function_name from metadata_function "
+
" WHERE database_id=?"
;
+
" WHERE database_id=?"
;
Connection
conn
=
getConnection
();
try
(
PreparedStatement
gStat
=
conn
.
prepareStatement
(
querySql
))
{
...
...
@@ -977,15 +971,14 @@ public class DlinkMysqlCatalog extends AbstractCatalog {
}
@Override
public
CatalogFunction
getFunction
(
ObjectPath
functionPath
)
throws
FunctionNotExistException
,
CatalogException
{
public
CatalogFunction
getFunction
(
ObjectPath
functionPath
)
throws
FunctionNotExistException
,
CatalogException
{
Integer
id
=
getFunctionId
(
functionPath
);
if
(
null
==
id
)
{
throw
new
FunctionNotExistException
(
getName
(),
functionPath
);
}
String
querySql
=
"SELECT class_name,function_language from metadata_function "
+
" WHERE id=?"
;
+
" WHERE id=?"
;
Connection
conn
=
getConnection
();
try
(
PreparedStatement
gStat
=
conn
.
prepareStatement
(
querySql
))
{
gStat
.
setInt
(
1
,
id
);
...
...
@@ -1001,8 +994,8 @@ public class DlinkMysqlCatalog extends AbstractCatalog {
}
catch
(
SQLException
e
)
{
sqlExceptionHappened
=
true
;
throw
new
CatalogException
(
"获取 UDF 失败:"
+
functionPath
.
getDatabaseName
()
+
"."
+
functionPath
.
getObjectName
());
+
functionPath
.
getDatabaseName
()
+
"."
+
functionPath
.
getObjectName
());
}
}
...
...
@@ -1019,7 +1012,7 @@ public class DlinkMysqlCatalog extends AbstractCatalog {
}
// 获取id
String
getIdSql
=
"select id from metadata_function "
+
" where function_name=? and database_id=?"
;
+
" where function_name=? and database_id=?"
;
Connection
conn
=
getConnection
();
try
(
PreparedStatement
gStat
=
conn
.
prepareStatement
(
getIdSql
))
{
gStat
.
setString
(
1
,
functionPath
.
getObjectName
());
...
...
@@ -1038,8 +1031,8 @@ public class DlinkMysqlCatalog extends AbstractCatalog {
}
@Override
public
void
createFunction
(
ObjectPath
functionPath
,
CatalogFunction
function
,
boolean
ignoreIfExists
)
throws
FunctionAlreadyExistException
,
DatabaseNotExistException
,
CatalogException
{
public
void
createFunction
(
ObjectPath
functionPath
,
CatalogFunction
function
,
boolean
ignoreIfExists
)
throws
FunctionAlreadyExistException
,
DatabaseNotExistException
,
CatalogException
{
Integer
dbId
=
getDatabaseId
(
functionPath
.
getDatabaseName
());
if
(
null
==
dbId
)
{
throw
new
DatabaseNotExistException
(
getName
(),
functionPath
.
getDatabaseName
());
...
...
@@ -1052,8 +1045,8 @@ public class DlinkMysqlCatalog extends AbstractCatalog {
Connection
conn
=
getConnection
();
String
insertSql
=
"Insert into metadata_function "
+
"(function_name,class_name,database_id,function_language) "
+
" values (?,?,?,?)"
;
+
"(function_name,class_name,database_id,function_language) "
+
" values (?,?,?,?)"
;
try
(
PreparedStatement
ps
=
conn
.
prepareStatement
(
insertSql
))
{
ps
.
setString
(
1
,
functionPath
.
getObjectName
());
ps
.
setString
(
2
,
function
.
getClassName
());
...
...
@@ -1067,8 +1060,8 @@ public class DlinkMysqlCatalog extends AbstractCatalog {
}
@Override
public
void
alterFunction
(
ObjectPath
functionPath
,
CatalogFunction
newFunction
,
boolean
ignoreIfNotExists
)
throws
FunctionNotExistException
,
CatalogException
{
public
void
alterFunction
(
ObjectPath
functionPath
,
CatalogFunction
newFunction
,
boolean
ignoreIfNotExists
)
throws
FunctionNotExistException
,
CatalogException
{
Integer
id
=
getFunctionId
(
functionPath
);
if
(
null
==
id
)
{
if
(!
ignoreIfNotExists
)
{
...
...
@@ -1079,8 +1072,8 @@ public class DlinkMysqlCatalog extends AbstractCatalog {
Connection
conn
=
getConnection
();
String
insertSql
=
"update metadata_function "
+
"set (class_name =?, function_language=?) "
+
" where id=?"
;
+
"set (class_name =?, function_language=?) "
+
" where id=?"
;
try
(
PreparedStatement
ps
=
conn
.
prepareStatement
(
insertSql
))
{
ps
.
setString
(
1
,
newFunction
.
getClassName
());
ps
.
setString
(
2
,
newFunction
.
getFunctionLanguage
().
toString
());
...
...
@@ -1093,8 +1086,8 @@ public class DlinkMysqlCatalog extends AbstractCatalog {
}
@Override
public
void
dropFunction
(
ObjectPath
functionPath
,
boolean
ignoreIfNotExists
)
throws
FunctionNotExistException
,
CatalogException
{
public
void
dropFunction
(
ObjectPath
functionPath
,
boolean
ignoreIfNotExists
)
throws
FunctionNotExistException
,
CatalogException
{
Integer
id
=
getFunctionId
(
functionPath
);
if
(
null
==
id
)
{
if
(!
ignoreIfNotExists
)
{
...
...
@@ -1105,7 +1098,7 @@ public class DlinkMysqlCatalog extends AbstractCatalog {
Connection
conn
=
getConnection
();
String
insertSql
=
"delete from metadata_function "
+
" where id=?"
;
+
" where id=?"
;
try
(
PreparedStatement
ps
=
conn
.
prepareStatement
(
insertSql
))
{
ps
.
setInt
(
1
,
id
);
ps
.
executeUpdate
();
...
...
@@ -1123,12 +1116,11 @@ public class DlinkMysqlCatalog extends AbstractCatalog {
if
(!
tableExists
(
tablePath
))
{
throw
new
TableNotExistException
(
getName
(),
tablePath
);
}
/*if (!isPartitionedTable(tablePath)) {
CatalogTableStatistics result = tableStats.get(tablePath);
return result != null ? result.copy() : CatalogTableStatistics.UNKNOWN;
} else {
return CatalogTableStatistics.UNKNOWN;
}*/
/*
* if (!isPartitionedTable(tablePath)) { CatalogTableStatistics result = tableStats.get(tablePath); return
* result != null ? result.copy() : CatalogTableStatistics.UNKNOWN; } else { return
* CatalogTableStatistics.UNKNOWN; }
*/
return
CatalogTableStatistics
.
UNKNOWN
;
}
...
...
@@ -1147,40 +1139,45 @@ public class DlinkMysqlCatalog extends AbstractCatalog {
}
@Override
public
CatalogTableStatistics
getPartitionStatistics
(
ObjectPath
tablePath
,
CatalogPartitionSpec
partitionSpec
)
throws
PartitionNotExistException
,
CatalogException
{
public
CatalogTableStatistics
getPartitionStatistics
(
ObjectPath
tablePath
,
CatalogPartitionSpec
partitionSpec
)
throws
PartitionNotExistException
,
CatalogException
{
// todo: 补充完成该方法。
throw
new
UnsupportedOperationException
(
"该方法尚未完成"
);
}
@Override
public
CatalogColumnStatistics
getPartitionColumnStatistics
(
ObjectPath
tablePath
,
CatalogPartitionSpec
partitionSpec
)
throws
PartitionNotExistException
,
CatalogException
{
public
CatalogColumnStatistics
getPartitionColumnStatistics
(
ObjectPath
tablePath
,
CatalogPartitionSpec
partitionSpec
)
throws
PartitionNotExistException
,
CatalogException
{
// todo: 补充完成该方法。
throw
new
UnsupportedOperationException
(
"该方法尚未完成"
);
}
@Override
public
void
alterTableStatistics
(
ObjectPath
tablePath
,
CatalogTableStatistics
tableStatistics
,
boolean
ignoreIfNotExists
)
throws
TableNotExistException
,
CatalogException
{
public
void
alterTableStatistics
(
ObjectPath
tablePath
,
CatalogTableStatistics
tableStatistics
,
boolean
ignoreIfNotExists
)
throws
TableNotExistException
,
CatalogException
{
// todo: 补充完成该方法。
throw
new
UnsupportedOperationException
(
"该方法尚未完成"
);
}
@Override
public
void
alterTableColumnStatistics
(
ObjectPath
tablePath
,
CatalogColumnStatistics
columnStatistics
,
boolean
ignoreIfNotExists
)
throws
TableNotExistException
,
CatalogException
,
TablePartitionedException
{
public
void
alterTableColumnStatistics
(
ObjectPath
tablePath
,
CatalogColumnStatistics
columnStatistics
,
boolean
ignoreIfNotExists
)
throws
TableNotExistException
,
CatalogException
,
TablePartitionedException
{
// todo: 补充完成该方法。
throw
new
UnsupportedOperationException
(
"该方法尚未完成"
);
}
@Override
public
void
alterPartitionStatistics
(
ObjectPath
tablePath
,
CatalogPartitionSpec
partitionSpec
,
CatalogTableStatistics
partitionStatistics
,
boolean
ignoreIfNotExists
)
throws
PartitionNotExistException
,
CatalogException
{
public
void
alterPartitionStatistics
(
ObjectPath
tablePath
,
CatalogPartitionSpec
partitionSpec
,
CatalogTableStatistics
partitionStatistics
,
boolean
ignoreIfNotExists
)
throws
PartitionNotExistException
,
CatalogException
{
// todo: 补充完成该方法。
throw
new
UnsupportedOperationException
(
"该方法尚未完成"
);
}
@Override
public
void
alterPartitionColumnStatistics
(
ObjectPath
tablePath
,
CatalogPartitionSpec
partitionSpec
,
CatalogColumnStatistics
columnStatistics
,
boolean
ignoreIfNotExists
)
throws
PartitionNotExistException
,
CatalogException
{
public
void
alterPartitionColumnStatistics
(
ObjectPath
tablePath
,
CatalogPartitionSpec
partitionSpec
,
CatalogColumnStatistics
columnStatistics
,
boolean
ignoreIfNotExists
)
throws
PartitionNotExistException
,
CatalogException
{
// todo: 补充完成该方法。
throw
new
UnsupportedOperationException
(
"该方法尚未完成"
);
}
...
...
dlink-catalog/pom.xml
View file @
2282fcac
...
...
@@ -15,21 +15,19 @@
~ See the License for the specific language governing permissions and
~ limitations under the License.
-->
<project
xmlns=
"http://maven.apache.org/POM/4.0.0"
xmlns:xsi=
"http://www.w3.org/2001/XMLSchema-instance"
<project
xmlns=
"http://maven.apache.org/POM/4.0.0"
xmlns:xsi=
"http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation=
"http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"
>
<modelVersion>
4.0.0
</modelVersion>
<parent>
<artifactId>
dlink
</artifactId>
<groupId>
com.dlink
</groupId>
<artifactId>
dlink
</artifactId>
<version>
0.6.8-SNAPSHOT
</version>
</parent>
<modelVersion>
4.0.0
</modelVersion>
<packaging>
pom
</packaging>
<artifactId>
dlink-catalog
</artifactId>
<packaging>
pom
</packaging>
<modules>
<module>
dlink-catalog-mysql
</module>
</modules>
</project>
\ No newline at end of file
</project>
pom.xml
View file @
2282fcac
...
...
@@ -15,9 +15,7 @@
~ See the License for the specific language governing permissions and
~ limitations under the License.
-->
<project
xmlns=
"http://maven.apache.org/POM/4.0.0"
xmlns:xsi=
"http://www.w3.org/2001/XMLSchema-instance"
<project
xmlns=
"http://maven.apache.org/POM/4.0.0"
xmlns:xsi=
"http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation=
"http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"
>
<modelVersion>
4.0.0
</modelVersion>
...
...
@@ -88,6 +86,7 @@
<poi.version>
4.1.2
</poi.version>
<commons-email>
1.5
</commons-email>
<knife4j.version>
3.0.3
</knife4j.version>
<spotless.version>
2.23.0
</spotless.version>
</properties>
<dependencyManagement>
...
...
@@ -480,13 +479,6 @@
<groupId>
org.apache.maven.plugins
</groupId>
<artifactId>
maven-checkstyle-plugin
</artifactId>
<version>
${maven-checkstyle-plugin.version}
</version>
<dependencies>
<dependency>
<groupId>
com.puppycrawl.tools
</groupId>
<artifactId>
checkstyle
</artifactId>
<version>
8.45
</version>
</dependency>
</dependencies>
<configuration>
<consoleOutput>
true
</consoleOutput>
<encoding>
UTF-8
</encoding>
...
...
@@ -498,12 +490,19 @@
</sourceDirectories>
<excludes>
**\/generated-sources\/
</excludes>
</configuration>
<dependencies>
<dependency>
<groupId>
com.puppycrawl.tools
</groupId>
<artifactId>
checkstyle
</artifactId>
<version>
8.45
</version>
</dependency>
</dependencies>
<executions>
<execution>
<phase>
compile
</phase>
<goals>
<goal>
check
</goal>
</goals>
<phase>
compile
</phase>
</execution>
</executions>
</plugin>
...
...
@@ -523,6 +522,63 @@
<groupId>
org.apache.maven.plugins
</groupId>
<artifactId>
maven-checkstyle-plugin
</artifactId>
</plugin>
<plugin>
<groupId>
com.diffplug.spotless
</groupId>
<artifactId>
spotless-maven-plugin
</artifactId>
<version>
${spotless.version}
</version>
<configuration>
<!-- optional: limit format enforcement to just the files changed by this feature branch -->
<ratchetFrom>
HEAD
</ratchetFrom>
<java>
<eclipse>
<file>
style/spotless_dlink_formatter.xml
</file>
</eclipse>
<removeUnusedImports
/>
<importOrder>
<file>
style/eclipse.importorder
</file>
</importOrder>
<replaceRegex>
<name>
Remove wildcard imports
</name>
<searchRegex>
import\s+[^\*\s]+\*;(\r\n|\r|\n)
</searchRegex>
<replacement>
$1
</replacement>
</replaceRegex>
</java>
<pom>
<sortPom>
<encoding>
UTF-8
</encoding>
<nrOfIndentSpace>
4
</nrOfIndentSpace>
<keepBlankLines>
true
</keepBlankLines>
<indentBlankLines>
false
</indentBlankLines>
<indentSchemaLocation>
true
</indentSchemaLocation>
<spaceBeforeCloseEmptyElement>
true
</spaceBeforeCloseEmptyElement>
<sortModules>
false
</sortModules>
<sortExecutions>
false
</sortExecutions>
<predefinedSortOrder>
custom_1
</predefinedSortOrder>
<expandEmptyElements>
false
</expandEmptyElements>
<sortProperties>
false
</sortProperties>
</sortPom>
<replace>
<name>
Leading blank line
</name>
<search>
project
</search>
<replacement>
project
</replacement>
</replace>
</pom>
<markdown>
<includes>
<include>
**/*.md
</include>
</includes>
<flexmark
/>
</markdown>
</configuration>
<executions>
<execution>
<goals>
<goal>
check
</goal>
</goals>
<phase>
compile
</phase>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
style/eclipse.importorder
0 → 100644
View file @
2282fcac
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
#Organize Import Order
0=com.dlink
1=org.apache
2=java
3=javax
4=org
5=com
\ No newline at end of file
style/spotless_dlink_formatter.xml
0 → 100644
View file @
2282fcac
<?xml version="1.0" encoding="UTF-8" standalone="no"?>
<!--
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-->
<profiles
version=
"22"
>
<profile
kind=
"CodeFormatterProfile"
name=
"'DLink DataLinkDC Current'"
version=
"13"
>
<setting
id=
"org.eclipse.jdt.core.compiler.source"
value=
"1.8"
/>
<setting
id=
"org.eclipse.jdt.core.compiler.compliance"
value=
"1.8"
/>
<setting
id=
"org.eclipse.jdt.core.compiler.codegen.targetPlatform"
value=
"1.8"
/>
<setting
id=
"org.eclipse.jdt.core.formatter.indent_empty_lines"
value=
"false"
/>
<setting
id=
"org.eclipse.jdt.core.formatter.tabulation.size"
value=
"4"
/>
<setting
id=
"org.eclipse.jdt.core.formatter.lineSplit"
value=
"120"
/>
<setting
id=
"org.eclipse.jdt.core.formatter.comment.line_length"
value=
"120"
/>
<setting
id=
"org.eclipse.jdt.core.formatter.tabulation.char"
value=
"space"
/>
<setting
id=
"org.eclipse.jdt.core.formatter.indentation.size"
value=
"1"
/>
<setting
id=
"org.eclipse.jdt.core.formatter.blank_lines_before_first_class_body_declaration"
value=
"1"
/>
<setting
id=
"org.eclipse.jdt.core.formatter.comment.format_javadoc_comments"
value=
"false"
/>
<setting
id=
"org.eclipse.jdt.core.formatter.join_wrapped_lines"
value=
"false"
/>
<setting
id=
"org.eclipse.jdt.core.formatter.insert_space_before_colon_in_conditional"
value=
"insert"
/>
<setting
id=
"org.eclipse.jdt.core.formatter.insert_space_before_colon_in_default"
value=
"do not insert"
/>
<setting
id=
"org.eclipse.jdt.core.formatter.alignment_for_enum_constants"
value=
"16"
/>
<setting
id=
"org.eclipse.jdt.core.formatter.insert_space_before_colon_in_labeled_statement"
value=
"do not insert"
/>
<setting
id=
"org.eclipse.jdt.core.formatter.insert_space_before_colon_in_case"
value=
"do not insert"
/>
<setting
id=
"org.eclipse.jdt.core.formatter.alignment_for_conditional_expression"
value=
"80"
/>
<setting
id=
"org.eclipse.jdt.core.formatter.alignment_for_assignment"
value=
"16"
/>
<setting
id=
"org.eclipse.jdt.core.formatter.blank_lines_after_package"
value=
"1"
/>
<setting
id=
"org.eclipse.jdt.core.formatter.continuation_indentation_for_array_initializer"
value=
"2"
/>
<setting
id=
"org.eclipse.jdt.core.formatter.alignment_for_resources_in_try"
value=
"160"
/>
<setting
id=
"org.eclipse.jdt.core.formatter.alignment_for_throws_clause_in_method_declaration"
value=
"10"
/>
<setting
id=
"org.eclipse.jdt.core.formatter.alignment_for_parameters_in_method_declaration"
value=
"106"
/>
<setting
id=
"org.eclipse.jdt.core.formatter.alignment_for_parameters_in_constructor_declaration"
value=
"106"
/>
<setting
id=
"org.eclipse.jdt.core.formatter.alignment_for_throws_clause_in_constructor_declaration"
value=
"106"
/>
<setting
id=
"org.eclipse.jdt.core.formatter.alignment_for_arguments_in_explicit_constructor_call.count_dependent"
value=
"16|5|80"
/>
<setting
id=
"org.eclipse.jdt.core.formatter.insert_new_line_at_end_of_file_if_missing"
value=
"insert"
/>
</profile>
</profiles>
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment