Unverified Commit 7c0ed566 authored by ZackYoung's avatar ZackYoung Committed by GitHub

change UDF struct,independent UDFService (#1095)

* add UDFService

* change comment

* 修改重复项
parent c384175d
/*
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package com.dlink.service;
import com.dlink.job.JobConfig;
/**
* @author ZackYoung
* @since 0.6.8
*/
public interface UDFService {
void initUDF(JobConfig config, String statement);
}
...@@ -22,7 +22,6 @@ package com.dlink.service.impl; ...@@ -22,7 +22,6 @@ package com.dlink.service.impl;
import com.dlink.api.FlinkAPI; import com.dlink.api.FlinkAPI;
import com.dlink.assertion.Asserts; import com.dlink.assertion.Asserts;
import com.dlink.config.Dialect; import com.dlink.config.Dialect;
import com.dlink.constant.PathConstant;
import com.dlink.dto.AbstractStatementDTO; import com.dlink.dto.AbstractStatementDTO;
import com.dlink.dto.SessionDTO; import com.dlink.dto.SessionDTO;
import com.dlink.dto.SqlDTO; import com.dlink.dto.SqlDTO;
...@@ -32,7 +31,6 @@ import com.dlink.dto.StudioExecuteDTO; ...@@ -32,7 +31,6 @@ import com.dlink.dto.StudioExecuteDTO;
import com.dlink.dto.StudioMetaStoreDTO; import com.dlink.dto.StudioMetaStoreDTO;
import com.dlink.explainer.lineage.LineageBuilder; import com.dlink.explainer.lineage.LineageBuilder;
import com.dlink.explainer.lineage.LineageResult; import com.dlink.explainer.lineage.LineageResult;
import com.dlink.gateway.GatewayType;
import com.dlink.gateway.model.JobInfo; import com.dlink.gateway.model.JobInfo;
import com.dlink.gateway.result.SavePointResult; import com.dlink.gateway.result.SavePointResult;
import com.dlink.job.JobConfig; import com.dlink.job.JobConfig;
...@@ -60,16 +58,15 @@ import com.dlink.service.FragmentVariableService; ...@@ -60,16 +58,15 @@ import com.dlink.service.FragmentVariableService;
import com.dlink.service.SavepointsService; import com.dlink.service.SavepointsService;
import com.dlink.service.StudioService; import com.dlink.service.StudioService;
import com.dlink.service.TaskService; import com.dlink.service.TaskService;
import com.dlink.service.UDFService;
import com.dlink.session.SessionConfig; import com.dlink.session.SessionConfig;
import com.dlink.session.SessionInfo; import com.dlink.session.SessionInfo;
import com.dlink.session.SessionPool; import com.dlink.session.SessionPool;
import com.dlink.sql.FlinkQuery; import com.dlink.sql.FlinkQuery;
import com.dlink.utils.RunTimeUtil; import com.dlink.utils.RunTimeUtil;
import com.dlink.utils.UDFUtil;
import java.time.LocalDateTime; import java.time.LocalDateTime;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator; import java.util.Iterator;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
...@@ -84,8 +81,6 @@ import com.fasterxml.jackson.databind.JsonNode; ...@@ -84,8 +81,6 @@ import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode; import com.fasterxml.jackson.databind.node.ObjectNode;
import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.map.MapUtil;
/** /**
* StudioServiceImpl * StudioServiceImpl
...@@ -98,14 +93,6 @@ public class StudioServiceImpl implements StudioService { ...@@ -98,14 +93,6 @@ public class StudioServiceImpl implements StudioService {
private static final Logger logger = LoggerFactory.getLogger(StudioServiceImpl.class); private static final Logger logger = LoggerFactory.getLogger(StudioServiceImpl.class);
/**
* 网关类型 map
* 快速获取 session 与 application 等类型,为了减少判断
*/
private static final Map<String, List<GatewayType>> GATEWAY_TYPE_MAP = MapUtil
.builder("session", Arrays.asList(GatewayType.YARN_SESSION, GatewayType.YARN_SESSION, GatewayType.STANDALONE))
.build();
@Autowired @Autowired
private ClusterService clusterService; private ClusterService clusterService;
@Autowired @Autowired
...@@ -120,6 +107,9 @@ public class StudioServiceImpl implements StudioService { ...@@ -120,6 +107,9 @@ public class StudioServiceImpl implements StudioService {
@Autowired @Autowired
private FragmentVariableService fragmentVariableService; private FragmentVariableService fragmentVariableService;
@Autowired
private UDFService udfService;
private void addFlinkSQLEnv(AbstractStatementDTO statementDTO) { private void addFlinkSQLEnv(AbstractStatementDTO statementDTO) {
// initialize global variables // initialize global variables
statementDTO.setVariables(fragmentVariableService.listEnabledVariables()); statementDTO.setVariables(fragmentVariableService.listEnabledVariables());
...@@ -159,8 +149,7 @@ public class StudioServiceImpl implements StudioService { ...@@ -159,8 +149,7 @@ public class StudioServiceImpl implements StudioService {
JobConfig config = studioExecuteDTO.getJobConfig(); JobConfig config = studioExecuteDTO.getJobConfig();
buildSession(config); buildSession(config);
// To initialize java udf, but it only support local mode. // To initialize java udf, but it only support local mode.
String jarPath = initUDF(config, studioExecuteDTO.getStatement()); udfService.initUDF(config, studioExecuteDTO.getStatement());
config.setJarFiles(new String[]{PathConstant.UDF_PATH + jarPath});
JobManager jobManager = JobManager.build(config); JobManager jobManager = JobManager.build(config);
JobResult jobResult = jobManager.executeSql(studioExecuteDTO.getStatement()); JobResult jobResult = jobManager.executeSql(studioExecuteDTO.getStatement());
RunTimeUtil.recovery(jobManager); RunTimeUtil.recovery(jobManager);
...@@ -564,18 +553,4 @@ public class StudioServiceImpl implements StudioService { ...@@ -564,18 +553,4 @@ public class StudioServiceImpl implements StudioService {
return infos; return infos;
} }
private String initUDF(JobConfig config, String statement) {
if (GATEWAY_TYPE_MAP.get("session").contains(GatewayType.get(config.getType())) || GatewayType.LOCAL.equalsValue(config.getType())) {
List<String> udfClassNameList = JobManager.getUDFClassName(statement);
List<String> codeList = CollUtil.map(udfClassNameList, x -> {
Task task = taskService.getUDFByClassName(x);
JobManager.initMustSuccessUDF(x, task.getStatement());
return task.getStatement();
}, true);
if (codeList.size() > 0) {
return UDFUtil.getUdfNameAndBuildJar(codeList);
}
}
return null;
}
} }
...@@ -90,6 +90,7 @@ import com.dlink.service.SavepointsService; ...@@ -90,6 +90,7 @@ import com.dlink.service.SavepointsService;
import com.dlink.service.StatementService; import com.dlink.service.StatementService;
import com.dlink.service.TaskService; import com.dlink.service.TaskService;
import com.dlink.service.TaskVersionService; import com.dlink.service.TaskVersionService;
import com.dlink.service.UDFService;
import com.dlink.utils.CustomStringJavaCompiler; import com.dlink.utils.CustomStringJavaCompiler;
import com.dlink.utils.JSONUtil; import com.dlink.utils.JSONUtil;
...@@ -184,6 +185,9 @@ public class TaskServiceImpl extends SuperServiceImpl<TaskMapper, Task> implemen ...@@ -184,6 +185,9 @@ public class TaskServiceImpl extends SuperServiceImpl<TaskMapper, Task> implemen
@Value("${server.port}") @Value("${server.port}")
private String serverPort; private String serverPort;
@Autowired
private UDFService udfService;
private String buildParas(Integer id) { private String buildParas(Integer id) {
return "--id " + id + " --driver " + driver + " --url " + url + " --username " + username + " --password " + password; return "--id " + id + " --driver " + driver + " --url " + url + " --username " + username + " --password " + password;
} }
...@@ -197,6 +201,7 @@ public class TaskServiceImpl extends SuperServiceImpl<TaskMapper, Task> implemen ...@@ -197,6 +201,7 @@ public class TaskServiceImpl extends SuperServiceImpl<TaskMapper, Task> implemen
task.getDatabaseId(), null)); task.getDatabaseId(), null));
} }
JobConfig config = buildJobConfig(task); JobConfig config = buildJobConfig(task);
udfService.initUDF(config, task.getStatement());
JobManager jobManager = JobManager.build(config); JobManager jobManager = JobManager.build(config);
if (!config.isJarTask()) { if (!config.isJarTask()) {
return jobManager.executeSql(task.getStatement()); return jobManager.executeSql(task.getStatement());
......
/*
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package com.dlink.service.impl;
import com.dlink.constant.PathConstant;
import com.dlink.gateway.GatewayType;
import com.dlink.job.JobConfig;
import com.dlink.job.JobManager;
import com.dlink.model.Task;
import com.dlink.service.TaskService;
import com.dlink.service.UDFService;
import com.dlink.utils.UDFUtil;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import javax.annotation.Resource;
import org.springframework.stereotype.Service;
import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.lang.Opt;
import cn.hutool.core.map.MapUtil;
/**
* @author ZackYoung
* @since 0.6.8
*/
@Service
public class UDFServiceImpl implements UDFService {
/**
* 网关类型 map
* 快速获取 session 与 application 等类型,为了减少判断
*/
private static final Map<String, List<GatewayType>> GATEWAY_TYPE_MAP = MapUtil
.builder("session", Arrays.asList(GatewayType.YARN_SESSION, GatewayType.KUBERNETES_SESSION, GatewayType.STANDALONE))
.build();
@Resource
TaskService taskService;
@Override
public void initUDF(JobConfig config, String statement) {
Opt<String> udfJarPath = Opt.empty();
List<String> udfClassNameList = JobManager.getUDFClassName(statement);
List<String> codeList = CollUtil.map(udfClassNameList, x -> {
Task task = taskService.getUDFByClassName(x);
JobManager.initMustSuccessUDF(x, task.getStatement());
return task.getStatement();
}, true);
if (codeList.size() > 0) {
udfJarPath = Opt.ofBlankAble(UDFUtil.getUdfNameAndBuildJar(codeList));
}
udfJarPath.ifPresent(jarPath -> config.setJarFiles(new String[]{PathConstant.UDF_PATH + jarPath}));
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment