Unverified Commit 02b9112a authored by aiwenmo's avatar aiwenmo Committed by GitHub

[Fix][admin] Fix job instance cause OOM (#683)

* [Feature-676][*] Release v0.6.5

* [Fix][admin] Fix job instance cause OOM
Co-authored-by: 's avatarwenmo <32723967+wenmo@users.noreply.github.com>
parent da3e00bc
...@@ -10,10 +10,8 @@ import java.time.temporal.ChronoUnit; ...@@ -10,10 +10,8 @@ import java.time.temporal.ChronoUnit;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collections; import java.util.Collections;
import java.util.Date; import java.util.Date;
import java.util.HashSet;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import javax.annotation.Resource; import javax.annotation.Resource;
...@@ -29,7 +27,6 @@ import com.dlink.alert.AlertConfig; ...@@ -29,7 +27,6 @@ import com.dlink.alert.AlertConfig;
import com.dlink.alert.AlertMsg; import com.dlink.alert.AlertMsg;
import com.dlink.alert.AlertResult; import com.dlink.alert.AlertResult;
import com.dlink.alert.ShowType; import com.dlink.alert.ShowType;
import com.dlink.api.FlinkAPI;
import com.dlink.assertion.Assert; import com.dlink.assertion.Assert;
import com.dlink.assertion.Asserts; import com.dlink.assertion.Asserts;
import com.dlink.assertion.Tips; import com.dlink.assertion.Tips;
...@@ -49,7 +46,6 @@ import com.dlink.gateway.config.SavePointStrategy; ...@@ -49,7 +46,6 @@ import com.dlink.gateway.config.SavePointStrategy;
import com.dlink.gateway.config.SavePointType; import com.dlink.gateway.config.SavePointType;
import com.dlink.gateway.model.JobInfo; import com.dlink.gateway.model.JobInfo;
import com.dlink.gateway.result.SavePointResult; import com.dlink.gateway.result.SavePointResult;
import com.dlink.job.BuildConfiguration;
import com.dlink.job.FlinkJobTask; import com.dlink.job.FlinkJobTask;
import com.dlink.job.FlinkJobTaskPool; import com.dlink.job.FlinkJobTaskPool;
import com.dlink.job.Job; import com.dlink.job.Job;
...@@ -70,13 +66,11 @@ import com.dlink.model.JobHistory; ...@@ -70,13 +66,11 @@ import com.dlink.model.JobHistory;
import com.dlink.model.JobInfoDetail; import com.dlink.model.JobInfoDetail;
import com.dlink.model.JobInstance; import com.dlink.model.JobInstance;
import com.dlink.model.JobLifeCycle; import com.dlink.model.JobLifeCycle;
import com.dlink.model.JobManagerConfiguration;
import com.dlink.model.JobStatus; import com.dlink.model.JobStatus;
import com.dlink.model.Savepoints; import com.dlink.model.Savepoints;
import com.dlink.model.Statement; import com.dlink.model.Statement;
import com.dlink.model.SystemConfiguration; import com.dlink.model.SystemConfiguration;
import com.dlink.model.Task; import com.dlink.model.Task;
import com.dlink.model.TaskManagerConfiguration;
import com.dlink.model.TaskVersion; import com.dlink.model.TaskVersion;
import com.dlink.result.SqlExplainResult; import com.dlink.result.SqlExplainResult;
import com.dlink.service.AlertGroupService; import com.dlink.service.AlertGroupService;
...@@ -94,7 +88,6 @@ import com.dlink.service.TaskService; ...@@ -94,7 +88,6 @@ import com.dlink.service.TaskService;
import com.dlink.service.TaskVersionService; import com.dlink.service.TaskVersionService;
import com.dlink.utils.CustomStringJavaCompiler; import com.dlink.utils.CustomStringJavaCompiler;
import com.dlink.utils.JSONUtil; import com.dlink.utils.JSONUtil;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ObjectNode; import com.fasterxml.jackson.databind.node.ObjectNode;
import cn.hutool.core.bean.BeanUtil; import cn.hutool.core.bean.BeanUtil;
...@@ -531,19 +524,13 @@ public class TaskServiceImpl extends SuperServiceImpl<TaskMapper, Task> implemen ...@@ -531,19 +524,13 @@ public class TaskServiceImpl extends SuperServiceImpl<TaskMapper, Task> implemen
if (Asserts.isNullString(type)) { if (Asserts.isNullString(type)) {
type = SavePointType.CANCEL.getValue(); type = SavePointType.CANCEL.getValue();
} }
if (savepointTask(id, type)) { savepointTask(id, type);
if (!JobLifeCycle.ONLINE.equalsValue(task.getStep())) { if (!JobLifeCycle.ONLINE.equalsValue(task.getStep())) {
return Result.succeed("停止成功"); return Result.succeed("停止成功");
} }
task.setStep(JobLifeCycle.RELEASE.getValue()); task.setStep(JobLifeCycle.RELEASE.getValue());
if (updateById(task)) { updateById(task);
return Result.succeed("下线成功"); return Result.succeed("下线成功");
} else {
return Result.failed("由于未知原因,下线失败");
}
} else {
return Result.failed("SavePoint失败,下线失败");
}
} }
@Override @Override
...@@ -699,27 +686,6 @@ public class TaskServiceImpl extends SuperServiceImpl<TaskMapper, Task> implemen ...@@ -699,27 +686,6 @@ public class TaskServiceImpl extends SuperServiceImpl<TaskMapper, Task> implemen
jobInfoDetail.setCluster(cluster); jobInfoDetail.setCluster(cluster);
History history = historyService.getById(jobInstance.getHistoryId()); History history = historyService.getById(jobInstance.getHistoryId());
history.setConfig(JSONUtil.parseObject(history.getConfigJson())); history.setConfig(JSONUtil.parseObject(history.getConfigJson()));
JobManagerConfiguration jobManagerConfiguration = new JobManagerConfiguration();
Set<TaskManagerConfiguration> taskManagerConfigurationList = new HashSet<>();
if (Asserts.isNotNullString(history.getJobManagerAddress()) && JobStatus.RUNNING.getValue().equals(jobInfoDetail.getInstance().getStatus())) { // 如果有jobManager地址,则使用该地址
FlinkAPI flinkAPI = FlinkAPI.build(history.getJobManagerAddress());
// 获取jobManager的配置信息 开始
BuildConfiguration.buildJobManagerConfiguration(jobManagerConfiguration, flinkAPI);
jobInfoDetail.setJobManagerConfiguration(jobManagerConfiguration);
// 获取jobManager的配置信息 结束
// 获取taskManager的配置信息 开始
JsonNode taskManagerContainers = flinkAPI.getTaskManagers(); //获取taskManager列表
BuildConfiguration.buildTaskManagerConfiguration(taskManagerConfigurationList, flinkAPI, taskManagerContainers);
jobInfoDetail.setTaskManagerConfiguration(taskManagerConfigurationList);
// 获取taskManager的配置信息 结束
}
if (Asserts.isNotNull(history) && Asserts.isNotNull(history.getClusterConfigurationId())) { if (Asserts.isNotNull(history) && Asserts.isNotNull(history.getClusterConfigurationId())) {
jobInfoDetail.setClusterConfiguration(clusterConfigurationService.getClusterConfigById(history.getClusterConfigurationId())); jobInfoDetail.setClusterConfiguration(clusterConfigurationService.getClusterConfigById(history.getClusterConfigurationId()));
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment