Unverified commit d70fd4c7 authored by aiwenmo, committed by GitHub

[Fix][admin,web,app,core] Fix rc1 bugs (#1007)

Co-authored-by: wenmo <32723967+wenmo@users.noreply.github.com>
parent a70e0209
......@@ -72,4 +72,19 @@ public class FileUploadController {
}
}
/**
* Upload files to HDFS<br>
*
* @param files Multiple files to upload
* @param dir Target HDFS dir, default is empty. If not provided, the default Flink lib dir is used
* @param hadoopConfigPath Dir containing core-site.xml and hdfs-site.xml, refer {@link UploadFileConstant}. If not provided, the default Hadoop conf dir is used
* @return {@link Result}
*/
@PostMapping(value = "hdfs")
public Result uploadHdfs(@RequestPart("files") MultipartFile[] files,
@RequestParam(value = "dir", defaultValue = "", required = false) String dir,
@RequestParam(value = "hadoopConfigPath", required = false) String hadoopConfigPath) {
return fileUploadService.uploadHdfs(files, dir, hadoopConfigPath);
}
}
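For context, a minimal sketch of how the new hdfs endpoint might be exercised from a Spring MockMvc test. The /api/fileUpload/hdfs path and the part/parameter names come from this controller and the frontend change below; the test class name, jar bytes, and config path are illustrative only.

import static org.springframework.test.web.servlet.request.MockMvcRequestBuilders.multipart;
import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status;

import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.autoconfigure.web.servlet.AutoConfigureMockMvc;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.mock.web.MockMultipartFile;
import org.springframework.test.web.servlet.MockMvc;

@SpringBootTest
@AutoConfigureMockMvc
class HdfsUploadEndpointSketch {

    @Autowired
    private MockMvc mockMvc;

    @Test
    void uploadsAJarToHdfs() throws Exception {
        // "files" must match the @RequestPart name in FileUploadController#uploadHdfs
        MockMultipartFile jar = new MockMultipartFile(
                "files", "demo-udf.jar", "application/java-archive", new byte[]{0x50, 0x4B, 0x03, 0x04});

        mockMvc.perform(multipart("/api/fileUpload/hdfs")
                        .file(jar)
                        // target HDFS dir; an empty value falls back to the default Flink lib dir
                        .param("dir", "hdfs:///flink/lib")
                        // dir holding core-site.xml / hdfs-site.xml on the Dlink server
                        .param("hadoopConfigPath", "/etc/hadoop/conf"))
                .andExpect(status().isOk());
    }
}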
......@@ -34,7 +34,7 @@ public interface FileUploadService {
*
* @param file {@link MultipartFile} instance
* @param fileType Upload file's type, refer ${@link UploadFileConstant}
* @return {@link com.dlink.common.result.Result}
* @return {@link Result}
*/
Result upload(MultipartFile file, Byte fileType);
......@@ -43,7 +43,7 @@ public interface FileUploadService {
*
* @param files {@link MultipartFile} instance
* @param fileType Upload file's type, refer ${@link UploadFileConstant}
* @return {@link com.dlink.common.result.Result}
* @return {@link Result}
*/
Result upload(MultipartFile[] files, Byte fileType);
......@@ -53,7 +53,7 @@ public interface FileUploadService {
* @param file {@link MultipartFile} instance
* @param dir Local absolute dir
* @param fileType Upload file's type, refer ${@link UploadFileConstant}
* @return {@link com.dlink.common.result.Result}
* @return {@link Result}
*/
Result upload(MultipartFile file, String dir, Byte fileType);
......@@ -63,8 +63,25 @@ public interface FileUploadService {
* @param files {@link MultipartFile} instance
* @param dir Local absolute dir
* @param fileType Upload file's type, refer ${@link UploadFileConstant}
* @return {@link com.dlink.common.result.Result}
* @return {@link Result}
*/
Result upload(MultipartFile[] files, String dir, Byte fileType);
/**
* Upload one HDFS file; if the target file exists, it will be deleted first
*
* @param file {@link MultipartFile} instance
* @param dir Target HDFS dir
* @param hadoopConfigPath Dir containing core-site.xml, hdfs-site.xml and yarn-site.xml
* @return {@link Result}
*/
Result uploadHdfs(MultipartFile file, String dir, String hadoopConfigPath);
/**
* Upload multiple HDFS files; if a target file exists, it will be deleted first
*
* @param files {@link MultipartFile} instances
* @param dir Target HDFS dir
* @param hadoopConfigPath Dir containing core-site.xml, hdfs-site.xml and yarn-site.xml
* @return {@link Result}
*/
Result uploadHdfs(MultipartFile[] files, String dir, String hadoopConfigPath);
}
......@@ -19,6 +19,7 @@
package com.dlink.service.impl;
import com.dlink.assertion.Asserts;
import com.dlink.common.result.Result;
import com.dlink.constant.UploadFileConstant;
import com.dlink.model.CodeEnum;
......@@ -48,8 +49,6 @@ import lombok.extern.slf4j.Slf4j;
@Service
public class FileUploadServiceImpl implements FileUploadService {
@Resource
private HdfsUtil hdfsUtil;
@Resource
private UploadFileRecordService uploadFileRecordService;
......@@ -76,7 +75,7 @@ public class FileUploadServiceImpl implements FileUploadService {
}
}
case UploadFileConstant.TARGET_HDFS: {
Result result = hdfsUtil.uploadFile(filePath, file);
Result result = HdfsUtil.uploadFile(filePath, file);
if (Objects.equals(result.getCode(), CodeEnum.SUCCESS.getCode())) {
if (uploadFileRecordService.saveOrUpdateFile(file.getOriginalFilename(), dir, filePath, fileType, UploadFileConstant.TARGET_HDFS)) {
return Result.succeed("上传成功");
......@@ -119,6 +118,42 @@ public class FileUploadServiceImpl implements FileUploadService {
}
}
@Override
public Result uploadHdfs(MultipartFile file, String dir, String hadoopConfigPath) {
String filePath = FilePathUtil.addFileSeparator(dir) + file.getOriginalFilename();
Result result = HdfsUtil.uploadFile(filePath, file, hadoopConfigPath);
if (Objects.equals(result.getCode(), CodeEnum.SUCCESS.getCode())) {
if (uploadFileRecordService.saveOrUpdateFile(file.getOriginalFilename(), dir, filePath, UploadFileConstant.FLINK_LIB_ID, UploadFileConstant.TARGET_HDFS)) {
return Result.succeed("上传成功");
} else {
return Result.failed("数据库异常");
}
} else {
return result;
}
}
@Override
public Result uploadHdfs(MultipartFile[] files, String dir, String hadoopConfigPath) {
if (Asserts.isNullString(dir)) {
dir = UploadFileConstant.getDirPath(UploadFileConstant.FLINK_LIB_ID);
}
if (files.length > 0) {
for (MultipartFile file : files) {
Result uploadResult = uploadHdfs(file, dir, hadoopConfigPath);
if (Objects.equals(uploadResult.getCode(), CodeEnum.ERROR.getCode())) {
return uploadResult;
}
}
if (!uploadFileRecordService.saveOrUpdateDir(dir, UploadFileConstant.FLINK_LIB_ID, UploadFileConstant.TARGET_HDFS)) {
return Result.failed("数据库异常");
}
return Result.succeed("全部上传成功");
} else {
return Result.succeed("没有检测到要上传的文件");
}
}
@Override
public Result upload(MultipartFile[] files, Byte fileType) {
String dir = UploadFileConstant.getDirPath(fileType);
......
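A hypothetical caller of the new service overloads, to show the intended usage: an empty dir defers to UploadFileConstant.getDirPath(UploadFileConstant.FLINK_LIB_ID) as in the implementation above, and the Result code is checked the same way the implementation checks HdfsUtil's result. The class, package name, and /etc/hadoop/conf path are assumptions; only the service signatures come from this commit.

package com.dlink.examples;

import java.util.Objects;

import javax.annotation.Resource;

import org.springframework.stereotype.Component;
import org.springframework.web.multipart.MultipartFile;

import com.dlink.common.result.Result;
import com.dlink.model.CodeEnum;
import com.dlink.service.FileUploadService;

@Component
public class FlinkLibSyncSketch {

    @Resource
    private FileUploadService fileUploadService;

    public Result syncConnectorJars(MultipartFile[] jars) {
        // An empty dir lets the service fall back to the default Flink lib dir on HDFS;
        // hadoopConfigPath points at the dir holding core-site.xml / hdfs-site.xml.
        Result result = fileUploadService.uploadHdfs(jars, "", "/etc/hadoop/conf");
        if (!Objects.equals(result.getCode(), CodeEnum.SUCCESS.getCode())) {
            return result; // surface the first failure instead of swallowing it
        }
        return Result.succeed("Flink lib synced");
    }
}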
package com.dlink.utils;
import com.dlink.assertion.Asserts;
import com.dlink.common.result.Result;
import com.dlink.constant.UploadFileConstant;
import com.dlink.model.CodeEnum;
......@@ -13,9 +14,6 @@ import java.io.File;
import java.io.IOException;
import java.util.Objects;
import javax.annotation.PostConstruct;
import org.springframework.stereotype.Component;
import org.springframework.web.multipart.MultipartFile;
import cn.hutool.core.exceptions.ExceptionUtil;
......@@ -25,24 +23,27 @@ import lombok.extern.slf4j.Slf4j;
* Hdfs Handle
**/
@Slf4j
@Component
public class HdfsUtil {
private final Configuration configuration = new Configuration();
private FileSystem hdfs = null;
private static FileSystem hdfs = null;
/**
* Init internal hdfs client
*
* @param hadoopConfigPath Dir containing core-site.xml and hdfs-site.xml; if empty, the default Hadoop conf dir is used
* @return {@link Result}
*/
@PostConstruct
private Result init() {
private static Result init(String hadoopConfigPath) {
if (hdfs == null) {
String coreSiteFilePath = FilePathUtil.addFileSeparator(UploadFileConstant.HADOOP_CONF_DIR) + "core-site.xml";
String hdfsSiteFilePath = FilePathUtil.addFileSeparator(UploadFileConstant.HADOOP_CONF_DIR) + "hdfs-site.xml";
if (Asserts.isNullString(hadoopConfigPath)) {
hadoopConfigPath = FilePathUtil.removeFileSeparator(UploadFileConstant.HADOOP_CONF_DIR);
}
String coreSiteFilePath = hadoopConfigPath + "/core-site.xml";
String hdfsSiteFilePath = hadoopConfigPath + "/hdfs-site.xml";
if (!new File(coreSiteFilePath).exists() || !new File(hdfsSiteFilePath).exists()) {
return Result.failed("在项目根目录下没有找到 core-site.xml/hdfs-site.xml/yarn-site.xml 文件,请先上传这些文件");
}
try {
final Configuration configuration = new Configuration();
configuration.addResource(new Path(coreSiteFilePath));
configuration.addResource(new Path(hdfsSiteFilePath));
hdfs = FileSystem.get(configuration);
......@@ -60,10 +61,22 @@ public class HdfsUtil {
*
* @param path HDFS path
* @param bytes File byte content
* @return {@link com.dlink.common.result.Result}
* @return {@link Result}
*/
public static Result uploadFile(String path, byte[] bytes) {
return uploadFile(path, bytes, null);
}
/**
* Upload file byte content to HDFS
*
* @param path HDFS path
* @param bytes File byte content
* @param hadoopConfigPath hdfs config path
* @return {@link Result}
*/
public Result uploadFile(String path, byte[] bytes) {
Result initResult = init();
public static Result uploadFile(String path, byte[] bytes, String hadoopConfigPath) {
Result initResult = init(hadoopConfigPath);
if (Objects.equals(initResult.getCode(), CodeEnum.SUCCESS.getCode())) {
try (FSDataOutputStream stream = hdfs.create(new Path(path), true)) {
stream.write(bytes);
......@@ -83,9 +96,9 @@ public class HdfsUtil {
*
* @param path HDFS path
* @param file MultipartFile instance
* @return {@link com.dlink.common.result.Result}
* @return {@link Result}
*/
public Result uploadFile(String path, MultipartFile file) {
public static Result uploadFile(String path, MultipartFile file) {
try {
return uploadFile(path, file.getBytes());
} catch (IOException e) {
......@@ -94,4 +107,21 @@ public class HdfsUtil {
}
}
/**
* Upload a MultipartFile to HDFS
*
* @param path HDFS path
* @param file MultipartFile instance
* @param hadoopConfigPath hdfs config path
* @return {@link Result}
*/
public static Result uploadFile(String path, MultipartFile file, String hadoopConfigPath) {
try {
return uploadFile(path, file.getBytes(), hadoopConfigPath);
} catch (IOException e) {
log.error(ExceptionUtil.stacktraceToString(e));
return Result.failed("文件上传失败");
}
}
}
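With HdfsUtil now exposing static methods, no Spring bean injection is needed; a rough standalone sketch of a direct call, assuming a reachable HDFS cluster and using placeholder paths:

import java.nio.charset.StandardCharsets;
import java.util.Objects;

import com.dlink.common.result.Result;
import com.dlink.model.CodeEnum;
import com.dlink.utils.HdfsUtil;

public class HdfsUtilSketch {

    public static void main(String[] args) {
        byte[] content = "hello dlink".getBytes(StandardCharsets.UTF_8);
        // The first call lazily builds the shared FileSystem from <hadoopConfigPath>/core-site.xml
        // and hdfs-site.xml; passing null falls back to UploadFileConstant.HADOOP_CONF_DIR.
        Result result = HdfsUtil.uploadFile("/tmp/dlink/hello.txt", content, "/etc/hadoop/conf");
        System.out.println(Objects.equals(result.getCode(), CodeEnum.SUCCESS.getCode())
                ? "uploaded" : "upload failed");
    }
}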
......@@ -28,6 +28,7 @@ import com.dlink.executor.ExecutorSetting;
import com.dlink.interceptor.FlinkInterceptor;
import com.dlink.parser.SqlType;
import com.dlink.trans.Operations;
import com.dlink.utils.SqlUtil;
import org.apache.flink.configuration.CheckpointingOptions;
......@@ -65,8 +66,8 @@ public class Submiter {
throw new SQLException("请指定任务ID");
}
return "select id, name, alias as jobName, type,check_point as checkpoint,"
+ "save_point_path as savePointPath, parallelism,fragment as useSqlFragment,statement_set as useStatementSet,config_json as config,"
+ " env_id as envId,batch_model AS useBatchModel from dlink_task where id = " + id;
+ "save_point_path as savePointPath, parallelism,fragment as useSqlFragment,statement_set as useStatementSet,config_json as config,"
+ " env_id as envId,batch_model AS useBatchModel from dlink_task where id = " + id;
}
private static String getFlinkSQLStatement(Integer id, DBConfig config) {
......@@ -90,7 +91,7 @@ public class Submiter {
}
public static List<String> getStatements(String sql) {
return Arrays.asList(sql.split(FlinkSQLConstant.SEPARATOR));
return Arrays.asList(SqlUtil.getStatements(sql));
}
public static void submit(Integer id, DBConfig dbConfig) {
......@@ -110,11 +111,11 @@ public class Submiter {
String uuid = UUID.randomUUID().toString().replace("-", "");
if (executorSetting.getConfig().containsKey(CheckpointingOptions.CHECKPOINTS_DIRECTORY.key())) {
executorSetting.getConfig().put(CheckpointingOptions.CHECKPOINTS_DIRECTORY.key(),
executorSetting.getConfig().get(CheckpointingOptions.CHECKPOINTS_DIRECTORY.key()) + "/" + uuid);
executorSetting.getConfig().get(CheckpointingOptions.CHECKPOINTS_DIRECTORY.key()) + "/" + uuid);
}
if (executorSetting.getConfig().containsKey(CheckpointingOptions.SAVEPOINT_DIRECTORY.key())) {
executorSetting.getConfig().put(CheckpointingOptions.SAVEPOINT_DIRECTORY.key(),
executorSetting.getConfig().get(CheckpointingOptions.SAVEPOINT_DIRECTORY.key()) + "/" + uuid);
executorSetting.getConfig().get(CheckpointingOptions.SAVEPOINT_DIRECTORY.key()) + "/" + uuid);
}
logger.info("作业配置如下: {}", executorSetting);
Executor executor = Executor.buildAppStreamExecutor(executorSetting);
......
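The UUID suffixing in Submiter#submit above gives every submitted job its own checkpoint/savepoint subdirectory. A standalone illustration of that rewrite, where the literal key stands in for CheckpointingOptions.CHECKPOINTS_DIRECTORY.key():

import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

// Not project code: mirrors the directory rewrite done before each submission in Submiter#submit.
public class CheckpointDirSuffixSketch {

    // Stand-in for CheckpointingOptions.CHECKPOINTS_DIRECTORY.key()
    private static final String CHECKPOINTS_DIR = "state.checkpoints.dir";

    public static void main(String[] args) {
        Map<String, String> config = new HashMap<>();
        config.put(CHECKPOINTS_DIR, "hdfs:///flink/checkpoints");

        // One random, dash-free UUID per submission keeps concurrent jobs from sharing a dir
        String uuid = UUID.randomUUID().toString().replace("-", "");
        if (config.containsKey(CHECKPOINTS_DIR)) {
            config.put(CHECKPOINTS_DIR, config.get(CHECKPOINTS_DIR) + "/" + uuid);
        }
        System.out.println(config.get(CHECKPOINTS_DIR)); // e.g. hdfs:///flink/checkpoints/3f2c9d...
    }
}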
......@@ -207,82 +207,82 @@ public class FlinkAPI {
}
/**
* @return JsonNode
* @Author: zhumingye
* @date: 2022/6/24
* @Description: getJobManagerMetrics gets the JobManager metrics
* @return JsonNode
*/
public JsonNode getJobManagerMetrics() {
return get(FlinkRestAPIConstant.JOB_MANAGER + FlinkRestAPIConstant.METRICS + FlinkRestAPIConstant.GET + buildMetricsParms(FlinkRestAPIConstant.JOB_MANAGER));
}
/**
* @return JsonNode
* @Author: zhumingye
* @date: 2022/6/24
* @Description: getJobManagerConfig gets the JobManager configuration
* @return JsonNode
*/
public JsonNode getJobManagerConfig() {
return get(FlinkRestAPIConstant.JOB_MANAGER + FlinkRestAPIConstant.CONFIG);
}
/**
* @return JsonNode
* @Author: zhumingye
* @date: 2022/6/24
* @Description: getJobManagerLog gets the JobManager log
* @return JsonNode
*/
public String getJobManagerLog() {
return getResult(FlinkRestAPIConstant.JOB_MANAGER + FlinkRestAPIConstant.LOG);
}
/**
* @return String
* @Author: zhumingye
* @date: 2022/6/24
* @Description: getJobManagerStdOut gets the JobManager console (stdout) log
* @return String
*/
public String getJobManagerStdOut() {
return getResult(FlinkRestAPIConstant.JOB_MANAGER + FlinkRestAPIConstant.STDOUT);
}
/**
* @return JsonNode
* @Author: zhumingye
* @date: 2022/6/24
* @Description: getJobManagerLogList gets the JobManager log list
* @return JsonNode
*/
public JsonNode getJobManagerLogList() {
return get(FlinkRestAPIConstant.JOB_MANAGER + FlinkRestAPIConstant.LOGS);
}
/**
* @param logName log file name
* @return String
* @Author: zhumingye
* @date: 2022/6/24
* @Description: getJobManagerLogFileDetail gets the details of a JobManager log file
* @param logName log file name
* @return String
*/
public String getJobManagerLogFileDetail(String logName) {
return getResult(FlinkRestAPIConstant.JOB_MANAGER + FlinkRestAPIConstant.LOGS + logName);
}
/**
* @return JsonNode
* @Author: zhumingye
* @date: 2022/6/24
* @Description: getTaskManagers gets the list of TaskManagers
* @return JsonNode
*/
public JsonNode getTaskManagers() {
return get(FlinkRestAPIConstant.TASK_MANAGER);
}
/**
* @return String
* @Author: zhumingye
* @date: 2022/6/24
* @Description: buildMetricsParms builds the metrics query parameter string
* @Params: type: parameter type, allowed values: task-manager, job-manager
* @return String
*/
public String buildMetricsParms(String type) {
JsonNode jsonNode = get(type + FlinkRestAPIConstant.METRICS);
......@@ -290,70 +290,75 @@ public class FlinkAPI {
Iterator<JsonNode> jsonNodeIterator = jsonNode.elements();
while (jsonNodeIterator.hasNext()) {
JsonNode node = jsonNodeIterator.next();
sb.append(node.get("id").asText()).append(",");
if (Asserts.isNotNull(node) && Asserts.isNotNull(node.get("id"))) {
if (sb.length() > 0) {
sb.append(",");
}
sb.append(node.get("id").asText());
}
}
return sb.deleteCharAt(sb.length() - 1).toString();
return sb.toString();
}
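The rewritten buildMetricsParms avoids the old append-then-deleteCharAt join, which throws StringIndexOutOfBoundsException whenever the metrics endpoint returns no ids (the StringBuilder is empty). A standalone sketch of the two behaviors, using plain strings in place of the JsonNode elements:

import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Not project code: contrasts the old and new join strategies from buildMetricsParms.
public class MetricsParamJoinSketch {

    static String oldJoin(List<String> ids) {
        StringBuilder sb = new StringBuilder();
        for (String id : ids) {
            sb.append(id).append(",");
        }
        // Throws StringIndexOutOfBoundsException when ids is empty (sb.length() == 0)
        return sb.deleteCharAt(sb.length() - 1).toString();
    }

    static String newJoin(List<String> ids) {
        StringBuilder sb = new StringBuilder();
        for (String id : ids) {
            if (id == null) {
                continue; // mirrors the Asserts.isNotNull guards added above
            }
            if (sb.length() > 0) {
                sb.append(",");
            }
            sb.append(id);
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        System.out.println(newJoin(Arrays.asList("Status.JVM.Memory.Heap.Used", "Status.JVM.CPU.Load")));
        System.out.println(newJoin(Collections.emptyList())); // empty string instead of an exception
        try {
            oldJoin(Collections.emptyList());
        } catch (StringIndexOutOfBoundsException e) {
            System.out.println("old join fails on an empty id list");
        }
    }
}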
/**
* @return JsonNode
* @Author: zhumingye
* @date: 2022/6/24
* @Description: getTaskManagerMetrics gets the TaskManager metrics
* @return JsonNode
*/
public JsonNode getTaskManagerMetrics(String containerId) {
return get(FlinkRestAPIConstant.TASK_MANAGER + containerId + FlinkRestAPIConstant.METRICS + FlinkRestAPIConstant.GET + buildMetricsParms(FlinkRestAPIConstant.JOB_MANAGER));
}
/**
* @param containerId container id
* @return String
* @Author: zhumingye
* @date: 2022/6/24
* @Description: getTaskManagerLog gets the TaskManager log
* @param containerId container id
* @return String
*/
public String getTaskManagerLog(String containerId) {
return getResult(FlinkRestAPIConstant.TASK_MANAGER + containerId + FlinkRestAPIConstant.LOG);
}
/**
* @param containerId container id
* @return JsonNode
* @Author: zhumingye
* @date: 2022/6/24
* @Description: getTaskManagerStdOut gets the TaskManager stdout log
* @param containerId container id
* @return JsonNode
*/
public String getTaskManagerStdOut(String containerId) {
return getResult(FlinkRestAPIConstant.TASK_MANAGER + containerId + FlinkRestAPIConstant.STDOUT);
}
/**
* @param containerId container id
* @return JsonNode
* @Author: zhumingye
* @date: 2022/6/24
* @Description: getTaskManagerLogList gets the TaskManager log list
* @param containerId container id
* @return JsonNode
*/
public JsonNode getTaskManagerLogList(String containerId) {
return get(FlinkRestAPIConstant.TASK_MANAGER + containerId + FlinkRestAPIConstant.LOGS);
}
/**
* @param logName log name
* @return String
* @Author: zhumingye
* @date: 2022/6/24
* @Description: getTaskManagerLogFileDeatil gets the details of a specific log file
* @param logName log name
* @return String
*/
public String getTaskManagerLogFileDeatil(String containerId,String logName) {
public String getTaskManagerLogFileDeatil(String containerId, String logName) {
return getResult(FlinkRestAPIConstant.TASK_MANAGER + containerId + FlinkRestAPIConstant.LOGS + logName);
}
/**
* @return JsonNode
* @Author: zhumingye
* @date: 2022/6/24
* @Description: getTaskManagerThreadDump gets the TaskManager thread dump
* @return JsonNode
*/
public JsonNode getTaskManagerThreadDump(String containerId) {
return get(FlinkRestAPIConstant.TASK_MANAGER + containerId + FlinkRestAPIConstant.THREAD_DUMP);
......
......@@ -102,7 +102,6 @@ const ClusterConfigurationForm: React.FC<ClusterConfigurationFormProps> = (props
return {
name: 'files',
action: '/api/fileUpload',
// accept: 'application/json',
headers: {
authorization: 'authorization-text',
},
......@@ -124,6 +123,32 @@ const ClusterConfigurationForm: React.FC<ClusterConfigurationFormProps> = (props
}
};
const getUploadHdfsProps = (dir: string) => {
return {
name: 'files',
action: '/api/fileUpload/hdfs',
headers: {
authorization: 'authorization-text',
},
data: {
dir,
hadoopConfigPath
},
showUploadList: true,
onChange(info) {
if (info.file.status === 'done') {
if (info.file.response.code == CODE.SUCCESS) {
message.success(info.file.response.msg);
} else {
message.warn(info.file.response.msg);
}
} else if (info.file.status === 'error') {
message.error(`${info.file.name} 上传失败`);
}
},
}
};
const renderContent = (formValsPara: Partial<ClusterConfigurationTableListItem>) => {
return (
<>
......@@ -146,7 +171,7 @@ const ClusterConfigurationForm: React.FC<ClusterConfigurationFormProps> = (props
>
<Input placeholder="值如 /etc/hadoop/conf" addonAfter={
<Form.Item name="suffix" noStyle>
<Upload {...getUploadProps(hadoopConfigPath)}>
<Upload {...getUploadProps(hadoopConfigPath)} multiple>
<UploadOutlined/>
</Upload>
</Form.Item>}/>
......@@ -235,7 +260,7 @@ const ClusterConfigurationForm: React.FC<ClusterConfigurationFormProps> = (props
>
<Input placeholder="值如 hdfs:///flink/lib" addonAfter={
<Form.Item name="suffix" noStyle>
<Upload {...getUploadProps(flinkLibPath)}>
<Upload {...getUploadHdfsProps(flinkLibPath)} multiple>
<UploadOutlined/>
</Upload>
</Form.Item>}/>
......