Unverified Commit c6e87d9d authored by windWheel's avatar windWheel Committed by GitHub

Fix cluster submission taskId is empty (#862)

* The bug that the cluster information cannot be obtained in the savepoint save list

* Add taskId in case dlink maintains sql

* format code

* enhance manage logic

* 1. fix taskId is 0
2. reformat code

* reformat code

* refactor imports
parent 6a58b9a7
......@@ -29,6 +29,10 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.dlink.api.FlinkAPI;
import com.dlink.assertion.Asserts;
......@@ -75,10 +79,6 @@ import com.dlink.session.SessionInfo;
import com.dlink.session.SessionPool;
import com.dlink.sql.FlinkQuery;
import com.dlink.utils.RunTimeUtil;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
/**
* StudioServiceImpl
......@@ -88,9 +88,9 @@ import com.fasterxml.jackson.databind.node.ObjectNode;
*/
@Service
public class StudioServiceImpl implements StudioService {
private static final Logger logger = LoggerFactory.getLogger(StudioServiceImpl.class);
@Autowired
private ClusterService clusterService;
@Autowired
......@@ -101,9 +101,10 @@ public class StudioServiceImpl implements StudioService {
private DataBaseService dataBaseService;
@Autowired
private TaskService taskService;
@Autowired
private FragmentVariableService fragmentVariableService;
private void addFlinkSQLEnv(AbstractStatementDTO statementDTO) {
statementDTO.setVariables(fragmentVariableService.listEnabledVariables());
String flinkWithSql = dataBaseService.getEnabledFlinkWithSql();
......@@ -117,14 +118,14 @@ public class StudioServiceImpl implements StudioService {
}
}
}
private void buildSession(JobConfig config) {
// If you are using a shared session, configure the current jobmanager address
if (!config.isUseSession()) {
config.setAddress(clusterService.buildEnvironmentAddress(config.isUseRemote(), config.getClusterId()));
}
}
@Override
public JobResult executeSql(StudioExecuteDTO studioExecuteDTO) {
if (Dialect.isSql(studioExecuteDTO.getDialect())) {
......@@ -134,7 +135,7 @@ public class StudioServiceImpl implements StudioService {
return executeFlinkSql(studioExecuteDTO);
}
}
private JobResult executeFlinkSql(StudioExecuteDTO studioExecuteDTO) {
addFlinkSQLEnv(studioExecuteDTO);
JobConfig config = studioExecuteDTO.getJobConfig();
......@@ -146,7 +147,7 @@ public class StudioServiceImpl implements StudioService {
RunTimeUtil.recovery(jobManager);
return jobResult;
}
private IResult executeMSFlinkSql(StudioMetaStoreDTO studioMetaStoreDTO) {
addFlinkSQLEnv(studioMetaStoreDTO);
JobConfig config = studioMetaStoreDTO.getJobConfig();
......@@ -155,7 +156,7 @@ public class StudioServiceImpl implements StudioService {
RunTimeUtil.recovery(jobManager);
return jobResult;
}
public JobResult executeCommonSql(SqlDTO sqlDTO) {
JobResult result = new JobResult();
result.setStatement(sqlDTO.getStatement());
......@@ -187,7 +188,7 @@ public class StudioServiceImpl implements StudioService {
return result;
}
}
@Override
public IResult executeDDL(StudioDDLDTO studioDDLDTO) {
JobConfig config = studioDDLDTO.getJobConfig();
......@@ -197,7 +198,7 @@ public class StudioServiceImpl implements StudioService {
JobManager jobManager = JobManager.build(config);
return jobManager.executeDDL(studioDDLDTO.getStatement());
}
@Override
public List<SqlExplainResult> explainSql(StudioExecuteDTO studioExecuteDTO) {
if (Dialect.isSql(studioExecuteDTO.getDialect())) {
......@@ -206,7 +207,7 @@ public class StudioServiceImpl implements StudioService {
return explainFlinkSql(studioExecuteDTO);
}
}
private List<SqlExplainResult> explainFlinkSql(StudioExecuteDTO studioExecuteDTO) {
addFlinkSQLEnv(studioExecuteDTO);
JobConfig config = studioExecuteDTO.getJobConfig();
......@@ -218,7 +219,7 @@ public class StudioServiceImpl implements StudioService {
JobManager jobManager = JobManager.buildPlanMode(config);
return jobManager.explainSql(studioExecuteDTO.getStatement()).getSqlExplainResults();
}
private List<SqlExplainResult> explainCommonSql(StudioExecuteDTO studioExecuteDTO) {
if (Asserts.isNull(studioExecuteDTO.getDatabaseId())) {
return new ArrayList<SqlExplainResult>() {{
......@@ -237,7 +238,7 @@ public class StudioServiceImpl implements StudioService {
return sqlExplainResults;
}
}
@Override
public ObjectNode getStreamGraph(StudioExecuteDTO studioExecuteDTO) {
addFlinkSQLEnv(studioExecuteDTO);
......@@ -248,7 +249,7 @@ public class StudioServiceImpl implements StudioService {
JobManager jobManager = JobManager.buildPlanMode(config);
return jobManager.getStreamGraph(studioExecuteDTO.getStatement());
}
@Override
public ObjectNode getJobPlan(StudioExecuteDTO studioExecuteDTO) {
addFlinkSQLEnv(studioExecuteDTO);
......@@ -268,12 +269,12 @@ public class StudioServiceImpl implements StudioService {
return objectNode;
}
}
@Override
public SelectResult getJobData(String jobId) {
return JobManager.getJobData(jobId);
}
@Override
public SessionInfo createSession(SessionDTO sessionDTO, String createUser) {
if (sessionDTO.isUseRemote()) {
......@@ -291,21 +292,17 @@ public class StudioServiceImpl implements StudioService {
return JobManager.createSession(sessionDTO.getSession(), sessionConfig, createUser);
}
}
@Override
public boolean clearSession(String session) {
if (SessionPool.remove(session) > 0) {
return true;
} else {
return false;
}
return SessionPool.remove(session) > 0;
}
@Override
public List<SessionInfo> listSession(String createUser) {
return JobManager.listSession(createUser);
}
@Override
public LineageResult getLineage(StudioCADTO studioCADTO) {
if (Asserts.isNotNullString(studioCADTO.getDialect()) && !studioCADTO.getDialect().equalsIgnoreCase("flinksql")) {
......@@ -330,7 +327,7 @@ public class StudioServiceImpl implements StudioService {
}
}
}
@Override
public List<JsonNode> listJobs(Integer clusterId) {
Cluster cluster = clusterService.getById(clusterId);
......@@ -342,7 +339,7 @@ public class StudioServiceImpl implements StudioService {
}
return new ArrayList<>();
}
@Override
public boolean cancel(Integer clusterId, String jobId) {
Cluster cluster = clusterService.getById(clusterId);
......@@ -356,28 +353,36 @@ public class StudioServiceImpl implements StudioService {
JobManager jobManager = JobManager.build(jobConfig);
return jobManager.cancel(jobId);
}
@Override
public boolean savepoint(Integer taskId, Integer clusterId, String jobId, String savePointType, String name) {
Cluster cluster = clusterService.getById(clusterId);
Asserts.checkNotNull(cluster, "该集群不存在");
boolean useGateway = false;
JobConfig jobConfig = new JobConfig();
jobConfig.setAddress(cluster.getJobManagerHost());
jobConfig.setType(cluster.getType());
//如果用户选择用dlink平台来托管集群信息 说明任务一定是从dlink发起提交的
if (Asserts.isNotNull(cluster.getClusterConfigurationId())) {
Map<String, Object> gatewayConfig = clusterConfigurationService.getGatewayConfig(cluster.getClusterConfigurationId());
jobConfig.buildGatewayConfig(gatewayConfig);
jobConfig.getGatewayConfig().getClusterConfig().setAppId(cluster.getName());
jobConfig.setTaskId(cluster.getTaskId());
useGateway = true;
} else {
}
//用户选择外部的平台来托管集群信息,但是集群上的任务不一定是通过dlink提交的
else {
jobConfig.setTaskId(taskId);
}
JobManager jobManager = JobManager.build(jobConfig);
jobManager.setUseGateway(useGateway);
SavePointResult savePointResult = jobManager.savepoint(jobId, savePointType, null);
if (Asserts.isNotNull(savePointResult)) {
if (jobConfig.getTaskId().equals(0)) {
return true;
}
for (JobInfo item : savePointResult.getJobInfos()) {
if (Asserts.isEqualsIgnoreCase(jobId, item.getJobId()) && Asserts.isNotNull(jobConfig.getTaskId())) {
Savepoints savepoints = new Savepoints();
......@@ -392,7 +397,7 @@ public class StudioServiceImpl implements StudioService {
}
return false;
}
@Override
public List<Catalog> getMSCatalogs(StudioMetaStoreDTO studioMetaStoreDTO) {
List<Catalog> catalogs = new ArrayList<>();
......@@ -439,7 +444,7 @@ public class StudioServiceImpl implements StudioService {
}
return catalogs;
}
@Override
public Schema getMSSchemaInfo(StudioMetaStoreDTO studioMetaStoreDTO) {
Schema schema = Schema.build(studioMetaStoreDTO.getDatabase());
......@@ -482,7 +487,7 @@ public class StudioServiceImpl implements StudioService {
schema.setTables(tables);
return schema;
}
@Override
public List<FlinkColumn> getMSFlinkColumns(StudioMetaStoreDTO studioMetaStoreDTO) {
List<FlinkColumn> columns = new ArrayList<>();
......@@ -515,7 +520,7 @@ public class StudioServiceImpl implements StudioService {
}
return columns;
}
private List<String> showInfo(StudioMetaStoreDTO studioMetaStoreDTO, String baseStatement, String statement) {
List<String> infos = new ArrayList<>();
String tableStatement = baseStatement + statement;
......@@ -534,7 +539,7 @@ public class StudioServiceImpl implements StudioService {
}
return infos;
}
private void initUDF(JobConfig config, String statement) {
if (!GatewayType.LOCAL.equalsValue(config.getType())) {
return;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment