Unverified Commit 4f04d9ad authored by 第一片心意, committed by GitHub

[feature] [dlink-admin] File upload (#939)

* Add a table named dlink_upload_file_record

* Implement file upload

* Rename functions and update Javadoc
parent 46ae9176
......@@ -195,6 +195,25 @@
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-aop</artifactId>
</dependency>
<!-- hadoop -->
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
<version>3.3.2</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs-client</artifactId>
<version>3.3.2</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>3.3.2</version>
<scope>provided</scope>
</dependency>
</dependencies>
<build>
......
/*
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package com.dlink.constant;
import org.springframework.boot.system.ApplicationHome;
/**
* Constants for file upload.
**/
public class UploadFileConstant {
// Upload file's type constant----------------------------------------------------------------------------------------
/**
* Not an internal upload file type; this value means the file is uploaded to a caller-specified dir.
*/
public static final byte TYPE_OTHER = -1;
public static final byte HADOOP_CONF_ID = 1;
public static final String HADOOP_CONF_NAME = "hadoop-conf";
public static final byte FLINK_CONF_ID = 2;
public static final String FLINK_CONF_NAME = "flink-conf";
public static final byte FLINK_LIB_ID = 3;
public static final String FLINK_LIB_NAME = "flink-lib";
public static final byte USER_JAR_ID = 4;
public static final String USER_JAR_NAME = "user-jar";
public static final byte DLINK_JAR_ID = 5;
public static final String DLINK_JAR_NAME = "dlink-jar";
// Upload file's dir constant----------------------------------------------------------------------------------------
public static final String DLINK_HOME_DIR;
static {
// Get the admin jar's parent directory as an absolute path
DLINK_HOME_DIR = new ApplicationHome(UploadFileConstant.class).getSource().getParent();
}
public static final String HDFS_HOME_DIR = "hdfs:///";
public static final String HADOOP_CONF_DIR = DLINK_HOME_DIR + "/config/hadoop-conf";
public static final String FLINK_CONF_DIR = DLINK_HOME_DIR + "/config/flink-conf";
public static final String FLINK_LIB_DIR = HDFS_HOME_DIR + "dlink/jar/flink/lib";
public static final String DLINK_JAR_DIR = HDFS_HOME_DIR + "dlink/jar/dlink";
public static final String USER_JAR_DIR = HDFS_HOME_DIR + "dlink/jar/user";
// Upload file's target constant----------------------------------------------------------------------------------------
/**
* An unidentified upload file target.
*/
public static final byte TARGET_OTHER = -1;
public static final byte TARGET_LOCAL = 1;
public static final byte TARGET_HDFS = 2;
/**
* Get internal upload file type's dir name
*
* @param fileType Upload file type id.
* @return Internal upload file dir name, or null for an unrecognized type
*/
public static String getDirName(byte fileType) {
switch (fileType) {
case HADOOP_CONF_ID:
return HADOOP_CONF_NAME;
case FLINK_CONF_ID:
return FLINK_CONF_NAME;
case FLINK_LIB_ID:
return FLINK_LIB_NAME;
case USER_JAR_ID:
return USER_JAR_NAME;
case DLINK_JAR_ID:
return DLINK_JAR_NAME;
default:
return null;
}
}
/**
* Get internal upload file type's dir path
*
* @param fileType Upload file type id.
* @return Internal upload file dir path, or an empty string for an unrecognized type
*/
public static String getDirPath(byte fileType) {
switch (fileType) {
case HADOOP_CONF_ID:
return HADOOP_CONF_DIR;
case FLINK_CONF_ID:
return FLINK_CONF_DIR;
case FLINK_LIB_ID:
return FLINK_LIB_DIR;
case USER_JAR_ID:
return USER_JAR_DIR;
case DLINK_JAR_ID:
return DLINK_JAR_DIR;
default:
return "";
}
}
/**
* Get internal upload file type's target
*
* @param fileType Upload file type id.
* @return Upload file target: TARGET_LOCAL, TARGET_HDFS, or TARGET_OTHER for an unrecognized type
*/
public static byte getTarget(byte fileType) {
switch (fileType) {
case HADOOP_CONF_ID:
case FLINK_CONF_ID:
return TARGET_LOCAL;
case FLINK_LIB_ID:
case USER_JAR_ID:
case DLINK_JAR_ID:
return TARGET_HDFS;
default:
return TARGET_OTHER;
}
}
}
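For illustration, the three lookups compose as follows (a hypothetical snippet, not part of the commit; it assumes spring-boot is on the classpath, since loading the class resolves DLINK_HOME_DIR via ApplicationHome):

public class UploadFileConstantDemo {
    public static void main(String[] args) {
        byte type = UploadFileConstant.FLINK_LIB_ID;
        System.out.println(UploadFileConstant.getDirName(type)); // flink-lib
        System.out.println(UploadFileConstant.getDirPath(type)); // hdfs:///dlink/jar/flink/lib
        System.out.println(UploadFileConstant.getTarget(type));  // 2, i.e. TARGET_HDFS
    }
}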
/*
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package com.dlink.controller;
import com.dlink.common.result.Result;
import com.dlink.constant.UploadFileConstant;
import com.dlink.service.FileUploadService;
import org.apache.commons.lang3.StringUtils;
import javax.annotation.Resource;
import org.springframework.web.bind.annotation.PutMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RequestPart;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.multipart.MultipartFile;
import lombok.extern.slf4j.Slf4j;
/**
* FileUploadController
*/
@Slf4j
@RestController
@RequestMapping("/api/fileUpload")
public class FileUploadController {
@Resource
private FileUploadService fileUploadService;
/**
* Upload files.
*
* @param files Files to upload
* @param dir Target dir, defaults to empty; if not provided, the 'fileType' value must be set
* @param fileType See {@link UploadFileConstant}, defaults to -1; if not provided, the 'dir' value must be set
* @return {@link Result}
*/
@PutMapping
public Result upload(@RequestPart("files") MultipartFile[] files,
@RequestParam(value = "dir", defaultValue = "", required = false) String dir,
@RequestParam(value = "fileType", defaultValue = "-1", required = false) Byte fileType) {
if (!StringUtils.isEmpty(dir) && fileType != -1) {
return Result.failed("不要同时指定 dir 和 fileType 参数");
} else if (StringUtils.isEmpty(dir) && fileType == -1) {
return Result.failed("dir 和 fileType 参数必选其一");
}
if (StringUtils.isEmpty(dir)) {
return fileUploadService.upload(files, fileType);
} else {
return fileUploadService.upload(files, dir, fileType);
}
}
}
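The endpoint can be exercised from a Spring MVC test roughly as follows (a sketch assuming spring-test's MockMvc is available; the mockMvc instance and the file contents are hypothetical, not part of the commit):

import org.springframework.http.HttpMethod;
import org.springframework.mock.web.MockMultipartFile;
import static org.springframework.test.web.servlet.request.MockMvcRequestBuilders.multipart;
import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status;

// Build a multipart PUT against /api/fileUpload with one file part named "files".
MockMultipartFile jar = new MockMultipartFile(
        "files", "demo.jar", "application/java-archive", new byte[]{1, 2, 3});
// Upload to the internal user-jar location (fileType 4); 'dir' is omitted.
mockMvc.perform(multipart(HttpMethod.PUT, "/api/fileUpload")
                .file(jar)
                .param("fileType", "4"))
        .andExpect(status().isOk());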
/*
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package com.dlink.controller;
import com.dlink.common.result.Result;
import com.dlink.model.UploadFileRecord;
import com.dlink.service.UploadFileRecordService;
import java.util.List;
import javax.annotation.Resource;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import cn.hutool.json.JSONUtil;
import lombok.extern.slf4j.Slf4j;
/**
* UploadFileRecordController
*/
@Slf4j
@RestController
@RequestMapping("/api/uploadFileRecord")
public class UploadFileRecordController {
@Resource
private UploadFileRecordService uploadFileRecordService;
/**
* List {@link UploadFileRecord}s matching the given query condition.
*
* @param record {@link UploadFileRecord} whose non-null fields form the query condition
*/
@PostMapping("/list")
public Result get(@RequestBody UploadFileRecord record) {
List<UploadFileRecord> records = uploadFileRecordService.list(new QueryWrapper<>(record));
return Result.succeed(JSONUtil.toJsonStr(records), "");
}
}
/*
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package com.dlink.mapper;
import com.dlink.db.mapper.SuperMapper;
import com.dlink.model.UploadFileRecord;
import org.apache.ibatis.annotations.Mapper;
/**
* UploadFileRecordMapper
**/
@Mapper
public interface UploadFileRecordMapper extends SuperMapper<UploadFileRecord> {
}
/*
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package com.dlink.model;
import com.dlink.db.model.SuperEntity;
import com.baomidou.mybatisplus.annotation.TableName;
import lombok.Data;
import lombok.EqualsAndHashCode;
/**
* UploadFileRecord
**/
@Data
@EqualsAndHashCode(callSuper = false)
@TableName("dlink_upload_file_record")
public class UploadFileRecord extends SuperEntity {
private static final long serialVersionUID = 3769285632787490408L;
/**
* File type id: hadoop-conf(1), flink-conf(2), flink-lib(3), user-jar(4), dlink-jar(5); -1 means no file type.
*/
private Byte fileType = -1;
private String fileName;
/**
* Where the file is uploaded to: local(1), hdfs(2)
*/
private Byte target;
private String fileParentPath;
private String fileAbsolutePath;
private Boolean isFile = true;
}
/*
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package com.dlink.service;
import com.dlink.common.result.Result;
import com.dlink.constant.UploadFileConstant;
import org.springframework.web.multipart.MultipartFile;
/**
* File upload
**/
public interface FileUploadService {
/**
* Upload one file; if the target file already exists, it is replaced.
*
* @param file {@link MultipartFile} instance
* @param fileType Upload file type, refer to {@link UploadFileConstant}
* @return {@link com.dlink.common.result.Result}
*/
Result upload(MultipartFile file, Byte fileType);
/**
* Upload multiple files; existing target files are replaced.
*
* @param files {@link MultipartFile} instances
* @param fileType Upload file type, refer to {@link UploadFileConstant}
* @return {@link com.dlink.common.result.Result}
*/
Result upload(MultipartFile[] files, Byte fileType);
/**
* Upload one file to the given dir; if the target file already exists, it is replaced.
*
* @param file {@link MultipartFile} instance
* @param dir Local absolute dir
* @param fileType Upload file type, refer to {@link UploadFileConstant}
* @return {@link com.dlink.common.result.Result}
*/
Result upload(MultipartFile file, String dir, Byte fileType);
/**
* Upload multiple files to the given dir; existing target files are replaced.
*
* @param files {@link MultipartFile} instances
* @param dir Local absolute dir
* @param fileType Upload file type, refer to {@link UploadFileConstant}
* @return {@link com.dlink.common.result.Result}
*/
Result upload(MultipartFile[] files, String dir, Byte fileType);
}
/*
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package com.dlink.service;
import com.dlink.db.service.ISuperService;
import com.dlink.model.UploadFileRecord;
/**
* UploadFileRecordService
**/
public interface UploadFileRecordService extends ISuperService<UploadFileRecord> {
/**
* Save or update a file record based on file absolute path and file type.
*/
boolean saveOrUpdateFile(String fileName, String parentPath, String absolutePath, Byte fileType, Byte target);
/**
* Save or update a dir record based on file type.
*/
boolean saveOrUpdateDir(String parentPath, Byte fileType, Byte target);
}
/*
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package com.dlink.service.impl;
import com.dlink.common.result.Result;
import com.dlink.constant.UploadFileConstant;
import com.dlink.model.CodeEnum;
import com.dlink.service.FileUploadService;
import com.dlink.service.UploadFileRecordService;
import com.dlink.utils.FilePathUtil;
import com.dlink.utils.HdfsUtil;
import org.apache.commons.lang3.StringUtils;
import java.io.File;
import java.io.IOException;
import java.util.Objects;
import javax.annotation.Resource;
import org.springframework.stereotype.Service;
import org.springframework.web.multipart.MultipartFile;
import cn.hutool.core.exceptions.ExceptionUtil;
import lombok.extern.slf4j.Slf4j;
/**
* FileUploadServiceImpl
**/
@Slf4j
@Service
public class FileUploadServiceImpl implements FileUploadService {
@Resource
private HdfsUtil hdfsUtil;
@Resource
private UploadFileRecordService uploadFileRecordService;
@Override
public Result upload(MultipartFile file, String dir, Byte fileType) {
byte target = getTarget(dir, fileType);
if (Objects.equals(target, UploadFileConstant.TARGET_LOCAL)) {
new File(dir).mkdirs();
}
String filePath = FilePathUtil.addFileSeparator(dir) + file.getOriginalFilename();
switch (target) {
case UploadFileConstant.TARGET_LOCAL: {
try {
file.transferTo(new File(filePath));
if (uploadFileRecordService.saveOrUpdateFile(file.getOriginalFilename(), dir, filePath, fileType, UploadFileConstant.TARGET_LOCAL)) {
return Result.succeed("上传成功");
} else {
return Result.failed("数据库异常");
}
} catch (IOException e) {
log.error("File " + file.getOriginalFilename() + " upload to local dir fail, exception is:\n" + ExceptionUtil.stacktraceToString(e));
return Result.failed("上传失败");
}
}
case UploadFileConstant.TARGET_HDFS: {
Result result = hdfsUtil.uploadFile(filePath, file);
if (Objects.equals(result.getCode(), CodeEnum.SUCCESS.getCode())) {
if (uploadFileRecordService.saveOrUpdateFile(file.getOriginalFilename(), dir, filePath, fileType, UploadFileConstant.TARGET_HDFS)) {
return Result.succeed("上传成功");
} else {
return Result.failed("数据库异常");
}
} else {
return result;
}
}
default:
return Result.failed("非法的上传文件目的地");
}
}
@Override
public Result upload(MultipartFile file, Byte fileType) {
String dir = UploadFileConstant.getDirPath(fileType);
if (StringUtils.isEmpty(dir)) {
return Result.failed("非法的上传文件类型");
}
return upload(file, dir, fileType);
}
@Override
public Result upload(MultipartFile[] files, String dir, Byte fileType) {
if (files.length > 0) {
for (MultipartFile file : files) {
Result uploadResult = upload(file, dir, fileType);
if (Objects.equals(uploadResult.getCode(), CodeEnum.ERROR.getCode())) {
return uploadResult;
}
}
if (!uploadFileRecordService.saveOrUpdateDir(dir, fileType, getTarget(dir, fileType))) {
return Result.failed("数据库异常");
}
return Result.succeed("全部上传成功");
} else {
return Result.succeed("没有检测到要上传的文件");
}
}
@Override
public Result upload(MultipartFile[] files, Byte fileType) {
String dir = UploadFileConstant.getDirPath(fileType);
if (StringUtils.isEmpty(dir)) {
return Result.failed("非法的上传文件类型");
}
return upload(files, dir, fileType);
}
/**
* Get the upload file target.
*
* @param dir Used to detect the target when fileType is not an internal type; if empty, TARGET_OTHER (-1) is returned
* @param fileType Internal upload file type, refer to {@link UploadFileConstant}
* @return Upload file target, refer to {@link UploadFileConstant}
*/
private byte getTarget(String dir, byte fileType) {
byte target = UploadFileConstant.getTarget(fileType);
if (target == -1) {
target = FilePathUtil.getDirTarget(dir);
}
return target;
}
}
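A hypothetical caller of this service (illustrative only; 'files' is assumed to be a MultipartFile[]):

// The service resolves the dir (hdfs:///dlink/jar/user) and the HDFS target from the file type.
Result result = fileUploadService.upload(files, UploadFileConstant.USER_JAR_ID);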
......@@ -129,7 +129,7 @@ public class StudioServiceImpl implements StudioService {
@Override
public JobResult executeSql(StudioExecuteDTO studioExecuteDTO) {
- if (Dialect.isSql(studioExecuteDTO.getDialect())) {
+ if (Dialect.notFlinkSql(studioExecuteDTO.getDialect())) {
return executeCommonSql(SqlDTO.build(studioExecuteDTO.getStatement(),
studioExecuteDTO.getDatabaseId(), studioExecuteDTO.getMaxRowNum()));
} else {
......@@ -202,7 +202,7 @@ public class StudioServiceImpl implements StudioService {
@Override
public List<SqlExplainResult> explainSql(StudioExecuteDTO studioExecuteDTO) {
- if (Dialect.isSql(studioExecuteDTO.getDialect())) {
+ if (Dialect.notFlinkSql(studioExecuteDTO.getDialect())) {
return explainCommonSql(studioExecuteDTO);
} else {
return explainFlinkSql(studioExecuteDTO);
......@@ -368,7 +368,7 @@ public class StudioServiceImpl implements StudioService {
JobConfig jobConfig = new JobConfig();
jobConfig.setAddress(cluster.getJobManagerHost());
jobConfig.setType(cluster.getType());
- //If the user hosts cluster info on the dlink platform, the job must have been submitted from dlink
+ // If the user hosts cluster info on the dlink platform, the job must have been submitted from dlink
if (Asserts.isNotNull(cluster.getClusterConfigurationId())) {
Map<String, Object> gatewayConfig = clusterConfigurationService.getGatewayConfig(cluster.getClusterConfigurationId());
jobConfig.buildGatewayConfig(gatewayConfig);
......@@ -376,7 +376,7 @@ public class StudioServiceImpl implements StudioService {
jobConfig.setTaskId(cluster.getTaskId());
useGateway = true;
}
- //The user hosts cluster info on an external platform, so jobs on the cluster are not necessarily submitted through dlink
+ // The user hosts cluster info on an external platform, so jobs on the cluster are not necessarily submitted through dlink
else {
jobConfig.setTaskId(taskId);
}
......@@ -406,7 +406,7 @@ public class StudioServiceImpl implements StudioService {
@Override
public List<Catalog> getMSCatalogs(StudioMetaStoreDTO studioMetaStoreDTO) {
List<Catalog> catalogs = new ArrayList<>();
- if (Dialect.isSql(studioMetaStoreDTO.getDialect())) {
+ if (Dialect.notFlinkSql(studioMetaStoreDTO.getDialect())) {
DataBase dataBase = dataBaseService.getById(studioMetaStoreDTO.getDatabaseId());
if (!Asserts.isNull(dataBase)) {
Catalog defaultCatalog = Catalog.build(FlinkQuery.defaultCatalog());
......@@ -454,7 +454,7 @@ public class StudioServiceImpl implements StudioService {
public Schema getMSSchemaInfo(StudioMetaStoreDTO studioMetaStoreDTO) {
Schema schema = Schema.build(studioMetaStoreDTO.getDatabase());
List<Table> tables = new ArrayList<>();
- if (Dialect.isSql(studioMetaStoreDTO.getDialect())) {
+ if (Dialect.notFlinkSql(studioMetaStoreDTO.getDialect())) {
DataBase dataBase = dataBaseService.getById(studioMetaStoreDTO.getDatabaseId());
if (Asserts.isNotNull(dataBase)) {
Driver driver = Driver.build(dataBase.getDriverConfig());
......@@ -496,7 +496,7 @@ public class StudioServiceImpl implements StudioService {
@Override
public List<FlinkColumn> getMSFlinkColumns(StudioMetaStoreDTO studioMetaStoreDTO) {
List<FlinkColumn> columns = new ArrayList<>();
- if (Dialect.isSql(studioMetaStoreDTO.getDialect())) {
+ if (Dialect.notFlinkSql(studioMetaStoreDTO.getDialect())) {
// nothing to do
} else {
String baseStatement = FlinkQuery.useCatalog(studioMetaStoreDTO.getCatalog()) + FlinkQuery.separator()
......
......@@ -192,7 +192,7 @@ public class TaskServiceImpl extends SuperServiceImpl<TaskMapper, Task> implemen
public JobResult submitTask(Integer id) {
Task task = this.getTaskInfoById(id);
Asserts.checkNull(task, Tips.TASK_NOT_EXIST);
- if (Dialect.isSql(task.getDialect())) {
+ if (Dialect.notFlinkSql(task.getDialect())) {
return executeCommonSql(SqlDTO.build(task.getStatement(),
task.getDatabaseId(), null));
}
......@@ -210,7 +210,7 @@ public class TaskServiceImpl extends SuperServiceImpl<TaskMapper, Task> implemen
final Task task = (dtoTask == null ? this.getTaskInfoById(id) : dtoTask);
Asserts.checkNull(task, Tips.TASK_NOT_EXIST);
task.setStep(JobLifeCycle.ONLINE.getValue());
- if (Dialect.isSql(task.getDialect())) {
+ if (Dialect.notFlinkSql(task.getDialect())) {
return executeCommonSql(SqlDTO.build(task.getStatement(),
task.getDatabaseId(), null));
}
......@@ -230,7 +230,7 @@ public class TaskServiceImpl extends SuperServiceImpl<TaskMapper, Task> implemen
if (Asserts.isNotNull(task.getJobInstanceId()) && task.getJobInstanceId() != 0) {
savepointJobInstance(task.getJobInstanceId(), SavePointType.CANCEL.getValue());
}
- if (Dialect.isSql(task.getDialect())) {
+ if (Dialect.notFlinkSql(task.getDialect())) {
return executeCommonSql(SqlDTO.build(task.getStatement(),
task.getDatabaseId(), null));
}
......@@ -285,7 +285,7 @@ public class TaskServiceImpl extends SuperServiceImpl<TaskMapper, Task> implemen
@Override
public List<SqlExplainResult> explainTask(Integer id) {
Task task = getTaskInfoById(id);
- if (Dialect.isSql(task.getDialect())) {
+ if (Dialect.notFlinkSql(task.getDialect())) {
return explainCommonSqlTask(task);
} else {
return explainFlinkSqlTask(task);
......@@ -301,15 +301,19 @@ public class TaskServiceImpl extends SuperServiceImpl<TaskMapper, Task> implemen
private List<SqlExplainResult> explainCommonSqlTask(Task task) {
if (Asserts.isNull(task.getDatabaseId())) {
- return new ArrayList<SqlExplainResult>() {{
+ return new ArrayList<SqlExplainResult>() {
+ {
add(SqlExplainResult.fail(task.getStatement(), "Please specify a data source"));
- }};
+ }
+ };
} else {
DataBase dataBase = dataBaseService.getById(task.getDatabaseId());
if (Asserts.isNull(dataBase)) {
- return new ArrayList<SqlExplainResult>() {{
+ return new ArrayList<SqlExplainResult>() {
+ {
add(SqlExplainResult.fail(task.getStatement(), "Data source does not exist"));
- }};
+ }
+ };
}
Driver driver = Driver.build(dataBase.getDriverConfig());
List<SqlExplainResult> sqlExplainResults = driver.explain(task.getStatement());
......@@ -436,7 +440,7 @@ public class TaskServiceImpl extends SuperServiceImpl<TaskMapper, Task> implemen
public String exportSql(Integer id) {
Task task = getTaskInfoById(id);
Asserts.checkNull(task, Tips.TASK_NOT_EXIST);
- if (Dialect.isSql(task.getDialect())) {
+ if (Dialect.notFlinkSql(task.getDialect())) {
return task.getStatement();
}
JobConfig config = buildJobConfig(task);
......@@ -491,20 +495,19 @@ public class TaskServiceImpl extends SuperServiceImpl<TaskMapper, Task> implemen
taskVersion.setTaskId(taskVersion.getId());
taskVersion.setId(null);
if (Asserts.isNull(task.getVersionId())) {
- //First release, create a new version
+ // First release, create a new version
taskVersion.setVersionId(1);
task.setVersionId(1);
taskVersionService.save(taskVersion);
} else {
- //A version already exists; need to check whether this is an old version restored by rollback
- //1. The version number exists
- //2. The md5 value matches the previous version
+ // A version already exists; need to check whether this is an old version restored by rollback
+ // 1. The version number exists
+ // 2. The md5 value matches the previous version
TaskVersion version = versionMap.get(task.getVersionId());
version.setId(null);
- if (versionIds.contains(task.getVersionId()) && !taskVersion.equals(version)
+ if (versionIds.contains(task.getVersionId()) && !taskVersion.equals(version)) {
//|| !versionIds.contains(task.getVersionId()) && !taskVersion.equals(version)
- ) {
taskVersion.setVersionId(Collections.max(versionIds) + 1);
task.setVersionId(Collections.max(versionIds) + 1);
taskVersionService.save(taskVersion);
......@@ -523,7 +526,7 @@ public class TaskServiceImpl extends SuperServiceImpl<TaskMapper, Task> implemen
if (JobLifeCycle.RELEASE.equalsValue(taskInfo.getStep())
|| JobLifeCycle.ONLINE.equalsValue(taskInfo.getStep())
|| JobLifeCycle.CANCEL.equalsValue(taskInfo.getStep())) {
- //throw new BusException("This job is already " + JobLifeCycle.get(taskInfo.getStep()).getLabel() + "; rollback is forbidden!");
+ // throw new BusException("This job is already " + JobLifeCycle.get(taskInfo.getStep()).getLabel() + "; rollback is forbidden!");
return Result.failed("This job is already " + JobLifeCycle.get(taskInfo.getStep()).getLabel() + "; rollback is forbidden!");
}
......@@ -863,7 +866,7 @@ public class TaskServiceImpl extends SuperServiceImpl<TaskMapper, Task> implemen
}
// path
JsonNode jsonNode = task.parseJsonNode(mapper);
- ((ObjectNode)jsonNode).put("path",getTaskPathByTaskId(taskId));
+ ((ObjectNode) jsonNode).put("path", getTaskPathByTaskId(taskId));
// clusterConfigurationName
if (Asserts.isNotNull(task.getClusterConfigurationId())) {
ClusterConfiguration clusterConfiguration = clusterConfigurationService.getById(task.getClusterConfigurationId());
......@@ -919,10 +922,10 @@ public class TaskServiceImpl extends SuperServiceImpl<TaskMapper, Task> implemen
return buildTaskByJsonNode(jsonNode, mapper);
}
- public Result buildTaskByJsonNode(JsonNode jsonNode,ObjectMapper mapper) throws JsonProcessingException {
+ public Result buildTaskByJsonNode(JsonNode jsonNode, ObjectMapper mapper) throws JsonProcessingException {
List<JsonNode> jsonNodes = new ArrayList<>();
if (jsonNode.isArray()) {
- for (JsonNode a: jsonNode) {
+ for (JsonNode a : jsonNode) {
/*if(a.get("dialect").asText().equals("FlinkSqlEnv")){
jsonNodes.add(0,a);
}else{
......@@ -991,10 +994,10 @@ public class TaskServiceImpl extends SuperServiceImpl<TaskMapper, Task> implemen
if (Asserts.isNotNull(task.getEnvName())) {
tasks.add(task);
}
- Catalogue catalogue = new Catalogue(task.getAlias(),task.getId(),task.getDialect(),parentId,true);
+ Catalogue catalogue = new Catalogue(task.getAlias(), task.getId(), task.getDialect(), parentId, true);
catalogueService.saveOrUpdate(catalogue);
}
- for (Task task: tasks) {
+ for (Task task : tasks) {
Task task1 = getOne(new QueryWrapper<Task>().eq("name", task.getEnvName()));
if (Asserts.isNotNull(task1)) {
task.setEnvId(task1.getId());
......@@ -1071,15 +1074,15 @@ public class TaskServiceImpl extends SuperServiceImpl<TaskMapper, Task> implemen
return;
}
Integer jobInstanceId = jobInstance.getId();
- JobHistory jobHistory = jobHistoryService.getById(jobInstanceId); //Get the job history info
- String jobJson = jobHistory.getJobJson(); //Get the jobJson from the job history
+ JobHistory jobHistory = jobHistoryService.getById(jobInstanceId); // Get the job history info
+ String jobJson = jobHistory.getJobJson(); // Get the jobJson from the job history
ObjectNode jsonNodes = JSONUtil.parseObject(jobJson);
if (jsonNodes.has("errors")) {
return;
}
SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
- long asLongStartTime = jsonNodes.get("start-time").asLong(); //Get start-time from the job history
- long asLongEndTime = jsonNodes.get("end-time").asLong(); //Get end-time from the job history
+ long asLongStartTime = jsonNodes.get("start-time").asLong(); // Get start-time from the job history
+ long asLongEndTime = jsonNodes.get("end-time").asLong(); // Get end-time from the job history
if (asLongEndTime < asLongStartTime) {
asLongEndTime = System.currentTimeMillis();
......@@ -1087,9 +1090,9 @@ public class TaskServiceImpl extends SuperServiceImpl<TaskMapper, Task> implemen
String startTime = dateFormat.format(asLongStartTime);
String endTime = dateFormat.format(asLongEndTime);
// Long duration = jsonNodes.get("duration").asLong();
- String duration = getDuration(asLongStartTime, asLongEndTime); //The duration is computed from start-time and end-time; the duration field is not used
+ String duration = getDuration(asLongStartTime, asLongEndTime); // The duration is computed from start-time and end-time; the duration field is not used
- String clusterJson = jobHistory.getClusterJson(); //Get clusterJson from the job history, mainly for jobManagerHost
+ String clusterJson = jobHistory.getClusterJson(); // Get clusterJson from the job history, mainly for jobManagerHost
ObjectNode clusterJsonNodes = JSONUtil.parseObject(clusterJson);
String jobManagerHost = clusterJsonNodes.get("jobManagerHost").asText();
......
/*
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package com.dlink.service.impl;
import com.dlink.constant.UploadFileConstant;
import com.dlink.db.service.impl.SuperServiceImpl;
import com.dlink.mapper.UploadFileRecordMapper;
import com.dlink.model.UploadFileRecord;
import com.dlink.service.UploadFileRecordService;
import com.dlink.utils.FilePathUtil;
import org.springframework.stereotype.Service;
import com.baomidou.mybatisplus.core.conditions.update.UpdateWrapper;
/**
* UploadFileRecordServiceImpl
**/
@Service
public class UploadFileRecordServiceImpl extends SuperServiceImpl<UploadFileRecordMapper, UploadFileRecord> implements UploadFileRecordService {
@Override
public boolean saveOrUpdateFile(String fileName, String parentPath, String absolutePath, Byte fileType, Byte target) {
UploadFileRecord updateWrapper = new UploadFileRecord();
updateWrapper.setFileType(fileType);
updateWrapper.setTarget(target);
updateWrapper.setFileAbsolutePath(absolutePath);
UploadFileRecord entity = new UploadFileRecord();
entity.setFileType(fileType);
entity.setTarget(target);
entity.setName(UploadFileConstant.getDirName(fileType));
entity.setIsFile(true);
entity.setFileName(fileName);
entity.setFileParentPath(FilePathUtil.removeFileSeparator(parentPath));
entity.setFileAbsolutePath(absolutePath);
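// MyBatis-Plus saveOrUpdate(entity, updateWrapper): first tries an UPDATE whose WHERE clause is built
// from the wrapper entity's non-null fields; if no row matches, the entity is inserted instead.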
return saveOrUpdate(entity, new UpdateWrapper<>(updateWrapper));
}
@Override
public boolean saveOrUpdateDir(String parentPath, Byte fileType, Byte target) {
UploadFileRecord updateWrapper = new UploadFileRecord();
updateWrapper.setFileType(fileType);
updateWrapper.setTarget(target);
updateWrapper.setIsFile(false);
UploadFileRecord entity = new UploadFileRecord();
entity.setFileType(fileType);
entity.setTarget(target);
entity.setName(UploadFileConstant.getDirName(fileType));
entity.setIsFile(false);
entity.setFileParentPath(FilePathUtil.removeFileSeparator(parentPath));
return saveOrUpdate(entity, new UpdateWrapper<>(updateWrapper));
}
}
package com.dlink.utils;
import com.dlink.constant.UploadFileConstant;
import org.apache.commons.lang3.StringUtils;
/**
* File path handle
**/
public class FilePathUtil {
/**
* Add a file separator '/' at the end of the file path if it is missing.
*
* @param filePath File path
* @return The path ending with '/'
*/
public static String addFileSeparator(String filePath) {
if (StringUtils.isEmpty(filePath)) {
return filePath;
} else {
if (filePath.endsWith("/")) {
return filePath;
} else {
return filePath + "/";
}
}
}
/**
* Remove the trailing file separator '/' from the file path if present.
*
* @param filePath File path
* @return The path without a trailing '/'
*/
public static String removeFileSeparator(String filePath) {
if (StringUtils.isEmpty(filePath)) {
return filePath;
} else {
if (filePath.endsWith("/")) {
return filePath.substring(0, filePath.length() - 1);
} else {
return filePath;
}
}
}
/**
* Get the dir's upload target, refer to {@link UploadFileConstant}.
*
* @param dir Directory
* @return TARGET_OTHER for an empty path, TARGET_HDFS if the path contains "hdfs", otherwise TARGET_LOCAL
*/
public static byte getDirTarget(String dir) {
if (StringUtils.isEmpty(dir)) {
return UploadFileConstant.TARGET_OTHER;
} else if (dir.contains("hdfs")) {
return UploadFileConstant.TARGET_HDFS;
} else {
return UploadFileConstant.TARGET_LOCAL;
}
}
}
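A minimal demonstration of these helpers (hypothetical, not part of the commit):

public class FilePathUtilDemo {
    public static void main(String[] args) {
        System.out.println(FilePathUtil.addFileSeparator("/opt/dlink"));     // /opt/dlink/
        System.out.println(FilePathUtil.removeFileSeparator("/opt/dlink/")); // /opt/dlink
        // Any non-empty path containing "hdfs" is treated as an HDFS target (2); others are local (1).
        System.out.println(FilePathUtil.getDirTarget("hdfs:///dlink/jar"));  // 2
        System.out.println(FilePathUtil.getDirTarget("/opt/dlink/upload"));  // 1
    }
}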
package com.dlink.utils;
import com.dlink.common.result.Result;
import com.dlink.constant.UploadFileConstant;
import com.dlink.model.CodeEnum;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import java.io.File;
import java.io.IOException;
import java.util.Objects;
import javax.annotation.PostConstruct;
import org.springframework.stereotype.Component;
import org.springframework.web.multipart.MultipartFile;
import cn.hutool.core.exceptions.ExceptionUtil;
import lombok.extern.slf4j.Slf4j;
/**
* Hdfs Handle
**/
@Slf4j
@Component
public class HdfsUtil {
private final Configuration configuration = new Configuration();
private FileSystem hdfs = null;
/**
* Init the internal HDFS client. Runs once at startup and is also invoked
* lazily from uploadFile, so config files uploaded later are picked up.
*/
@PostConstruct
private Result init() {
if (hdfs == null) {
String coreSiteFilePath = FilePathUtil.addFileSeparator(UploadFileConstant.HADOOP_CONF_DIR) + "core-site.xml";
String hdfsSiteFilePath = FilePathUtil.addFileSeparator(UploadFileConstant.HADOOP_CONF_DIR) + "hdfs-site.xml";
if (!new File(coreSiteFilePath).exists() || !new File(hdfsSiteFilePath).exists()) {
return Result.failed("在项目根目录下没有找到 core-site.xml/hdfs-site.xml/yarn-site.xml 文件,请先上传这些文件");
}
try {
configuration.addResource(new Path(coreSiteFilePath));
configuration.addResource(new Path(hdfsSiteFilePath));
hdfs = FileSystem.get(configuration);
} catch (IOException e) {
log.error(ExceptionUtil.stacktraceToString(e));
return Result.failed("内部 hdfs 客户端初始化错误");
}
return Result.succeed("hdfs 客户端初始化成功");
}
return Result.succeed("");
}
/**
* Upload file byte content to HDFS
*
* @param path HDFS path
* @param bytes File byte content
* @return {@link com.dlink.common.result.Result}
*/
public Result uploadFile(String path, byte[] bytes) {
Result initResult = init();
if (Objects.equals(initResult.getCode(), CodeEnum.SUCCESS.getCode())) {
try (FSDataOutputStream stream = hdfs.create(new Path(path), true)) {
stream.write(bytes);
stream.flush();
return Result.succeed("");
} catch (IOException e) {
log.error(ExceptionUtil.stacktraceToString(e));
return Result.failed("文件上传失败");
}
} else {
return initResult;
}
}
/**
* Upload a {@link MultipartFile} to HDFS.
*
* @param path HDFS path
* @param file MultipartFile instance
* @return {@link com.dlink.common.result.Result}
*/
public Result uploadFile(String path, MultipartFile file) {
try {
return uploadFile(path, file.getBytes());
} catch (IOException e) {
log.error(ExceptionUtil.stacktraceToString(e));
return Result.failed("文件上传失败");
}
}
}
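Once core-site.xml and hdfs-site.xml have been uploaded to the hadoop-conf directory, the utility can be used like this (a sketch; the injected hdfsUtil field, the path, and the @Slf4j log are illustrative assumptions, not part of the commit):

import java.nio.charset.StandardCharsets;
import java.util.Objects;

// Write a small payload to HDFS, overwriting any existing file at the path.
Result result = hdfsUtil.uploadFile("hdfs:///dlink/jar/user/demo.txt",
        "demo".getBytes(StandardCharsets.UTF_8));
if (!Objects.equals(result.getCode(), CodeEnum.SUCCESS.getCode())) {
    log.error("HDFS upload failed: {}", result);
}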
......@@ -11,11 +11,11 @@ spring:
matching-strategy: ant_path_matcher
main:
allow-circular-references: true
# flyway:
# enabled: false
# clean-disabled: true
## baseline-on-migrate: true
# table: dlink_schema_history
# Redis configuration
# If sa-token needs Redis, enable the redis config below and the dependencies in pom.xml and dlink-admin/pom.xml
# redis:
......@@ -35,6 +35,11 @@ spring:
# min-idle: 5
# # Connection timeout (ms)
# timeout: 5000
servlet:
multipart:
max-file-size: 524288000
max-request-size: 524288000
enabled: true
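# note: 524288000 bytes = 500 MB, the cap for each file and for the whole request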
server:
port: 8888
......@@ -47,7 +52,7 @@ mybatis-plus:
id-type: auto
configuration:
##### mybatis-plus prints the full SQL (development environment only)
# log-impl: org.apache.ibatis.logging.stdout.StdOutImpl
log-impl: org.apache.ibatis.logging.nologging.NoLoggingImpl
# Sa-Token configuration
......
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="com.dlink.mapper.UploadFileRecordMapper">
</mapper>
......@@ -58,7 +58,13 @@ public enum Dialect {
return Dialect.FLINKSQL;
}
- public static boolean isSql(String value) {
+ /**
+  * Judge the SQL dialect.
+  *
+  * @param value {@link Dialect}
+  * @return false if the dialect is Flink SQL, true otherwise.
+  */
+ public static boolean notFlinkSql(String value) {
Dialect dialect = Dialect.get(value);
switch (dialect) {
case SQL:
......
......@@ -634,4 +634,29 @@ CREATE TABLE `dlink_fragment` (
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci ROW_FORMAT=DYNAMIC COMMENT='global variables';
-- ----------------------------
-- Table structure for dlink_upload_file_record
-- ----------------------------
DROP TABLE IF EXISTS `dlink_upload_file_record`;
CREATE TABLE `dlink_upload_file_record` (
`id` tinyint NOT NULL AUTO_INCREMENT COMMENT 'auto-increment primary key',
`name` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci DEFAULT NULL COMMENT 'upload file type name, currently: hadoop-conf(1), flink-conf(2), flink-lib(3), user-jar(4), dlink-jar(5)',
`enabled` tinyint(1) DEFAULT NULL COMMENT 'whether the record is enabled',
`file_type` tinyint DEFAULT '-1' COMMENT 'upload file type id, currently: hadoop-conf(1), flink-conf(2), flink-lib(3), user-jar(4), dlink-jar(5); the default -1 means no type',
`target` tinyint NOT NULL COMMENT 'upload destination, currently: local(1), hdfs(2)',
`file_name` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci DEFAULT NULL COMMENT 'file name',
`file_parent_path` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci DEFAULT NULL COMMENT 'file parent path',
`file_absolute_path` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci DEFAULT NULL COMMENT 'absolute file path',
`is_file` tinyint(1) NOT NULL DEFAULT '1' COMMENT 'whether it is a file',
`create_time` datetime DEFAULT NULL COMMENT 'create time',
`update_time` datetime DEFAULT NULL COMMENT 'update time',
PRIMARY KEY (`id`) USING BTREE
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci ROW_FORMAT=DYNAMIC COMMENT='upload file records';
SET FOREIGN_KEY_CHECKS = 1;
......@@ -707,3 +707,22 @@ CREATE TABLE `dlink_fragment` (
PRIMARY KEY (`id`) USING BTREE,
UNIQUE KEY `un_idx1` (`name`) USING BTREE
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci ROW_FORMAT=DYNAMIC COMMENT='global variables';
-- 0.7.7-SNAPSHOT 2022-08-22
-- -----------------------
-- DROP TABLE IF EXISTS `dlink_upload_file_record`;
CREATE TABLE `dlink_upload_file_record` (
`id` tinyint NOT NULL AUTO_INCREMENT COMMENT 'auto-increment primary key',
`name` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci DEFAULT NULL COMMENT 'upload file type name, currently: hadoop-conf(1), flink-conf(2), flink-lib(3), user-jar(4), dlink-jar(5)',
`enabled` tinyint(1) DEFAULT NULL COMMENT 'whether the record is enabled',
`file_type` tinyint DEFAULT '-1' COMMENT 'upload file type id, currently: hadoop-conf(1), flink-conf(2), flink-lib(3), user-jar(4), dlink-jar(5); the default -1 means no type',
`target` tinyint NOT NULL COMMENT 'upload destination, currently: local(1), hdfs(2)',
`file_name` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci DEFAULT NULL COMMENT 'file name',
`file_parent_path` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci DEFAULT NULL COMMENT 'file parent path',
`file_absolute_path` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci DEFAULT NULL COMMENT 'absolute file path',
`is_file` tinyint(1) NOT NULL DEFAULT '1' COMMENT 'whether it is a file',
`create_time` datetime DEFAULT NULL COMMENT 'create time',
`update_time` datetime DEFAULT NULL COMMENT 'update time',
PRIMARY KEY (`id`) USING BTREE
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci ROW_FORMAT=DYNAMIC COMMENT='upload file records';
\ No newline at end of file