Unverified Commit 803c8d83 authored by ZackYoung's avatar ZackYoung Committed by GitHub

[fix][udf]修复udf逻辑遗留问题 (#1174)

* 因为之前udf 开发逻辑遗留问题,导致其他任务添加savepoint变null.null

* 因为之前udf 开发逻辑遗留问题,导致其他任务添加savepoint变null.null

* 更新sql模板语句
parent 0459b324
......@@ -198,7 +198,7 @@ public class TaskServiceImpl extends SuperServiceImpl<TaskMapper, Task> implemen
private String buildParas(Integer id) {
return "--id " + id + " --driver " + driver + " --url " + url + " --username " + username + " --password "
+ password;
+ password;
}
@Override
......@@ -207,7 +207,7 @@ public class TaskServiceImpl extends SuperServiceImpl<TaskMapper, Task> implemen
Asserts.checkNull(task, Tips.TASK_NOT_EXIST);
if (Dialect.notFlinkSql(task.getDialect())) {
return executeCommonSql(SqlDTO.build(task.getStatement(),
task.getDatabaseId(), null));
task.getDatabaseId(), null));
}
JobConfig config = buildJobConfig(task);
UDFPath udfPath = udfService.initUDF(task.getStatement(), GatewayType.get(config.getType()));
......@@ -228,7 +228,7 @@ public class TaskServiceImpl extends SuperServiceImpl<TaskMapper, Task> implemen
task.setStep(JobLifeCycle.ONLINE.getValue());
if (Dialect.notFlinkSql(task.getDialect())) {
return executeCommonSql(SqlDTO.build(task.getStatement(),
task.getDatabaseId(), null));
task.getDatabaseId(), null));
}
JobConfig config = buildJobConfig(task);
JobManager jobManager = JobManager.build(config);
......@@ -248,7 +248,7 @@ public class TaskServiceImpl extends SuperServiceImpl<TaskMapper, Task> implemen
}
if (Dialect.notFlinkSql(task.getDialect())) {
return executeCommonSql(SqlDTO.build(task.getStatement(),
task.getDatabaseId(), null));
task.getDatabaseId(), null));
}
if (StringUtils.isBlank(savePointPath)) {
task.setSavePointStrategy(SavePointStrategy.LATEST.getValue());
......@@ -373,20 +373,20 @@ public class TaskServiceImpl extends SuperServiceImpl<TaskMapper, Task> implemen
UDFTemplate template = udfTemplateService.getById(config.get("templateId"));
if (template != null) {
String code = UDFUtil.templateParse(task.getDialect(), template.getTemplateCode(),
config.get("className"));
config.get("className"));
task.setStatement(code);
}
}
}
// to compiler udf
if (Asserts.isNotNullString(task.getDialect()) && Dialect.JAVA.equalsVal(task.getDialect())
// to compiler udf
if (Asserts.isNotNullString(task.getDialect()) && Dialect.JAVA.equalsVal(task.getDialect())
&& Asserts.isNotNullString(task.getStatement())) {
CustomStringJavaCompiler compiler = new CustomStringJavaCompiler(task.getStatement());
task.setSavePointPath(compiler.getFullClassName());
} else if (Dialect.PYTHON.equalsVal(task.getDialect())) {
task.setSavePointPath(task.getName() + "." + UDFUtil.getPyUDFAttr(task.getStatement()));
} else {
task.setSavePointPath(UDFUtil.getScalaFullClassName(task.getStatement()));
CustomStringJavaCompiler compiler = new CustomStringJavaCompiler(task.getStatement());
task.setSavePointPath(compiler.getFullClassName());
} else if (Dialect.PYTHON.equalsVal(task.getDialect())) {
task.setSavePointPath(task.getName() + "." + UDFUtil.getPyUDFAttr(task.getStatement()));
} else if (Dialect.SCALA.equalsVal(task.getDialect())) {
task.setSavePointPath(UDFUtil.getScalaFullClassName(task.getStatement()));
}
}
// if modify task else create task
......@@ -394,8 +394,8 @@ public class TaskServiceImpl extends SuperServiceImpl<TaskMapper, Task> implemen
Task taskInfo = getById(task.getId());
Assert.check(taskInfo);
if (JobLifeCycle.RELEASE.equalsValue(taskInfo.getStep())
|| JobLifeCycle.ONLINE.equalsValue(taskInfo.getStep())
|| JobLifeCycle.CANCEL.equalsValue(taskInfo.getStep())) {
|| JobLifeCycle.ONLINE.equalsValue(taskInfo.getStep())
|| JobLifeCycle.CANCEL.equalsValue(taskInfo.getStep())) {
throw new BusException("该作业已" + JobLifeCycle.get(taskInfo.getStep()).getLabel() + ",禁止修改!");
}
task.setStep(JobLifeCycle.DEVELOP.getValue());
......@@ -506,8 +506,8 @@ public class TaskServiceImpl extends SuperServiceImpl<TaskMapper, Task> implemen
@Override
public Task getUDFByClassName(String className) {
Task task = getOne(
new QueryWrapper<Task>().in("dialect", Dialect.JAVA, Dialect.SCALA, Dialect.PYTHON).eq("enabled", 1)
.eq("save_point_path", className));
new QueryWrapper<Task>().in("dialect", Dialect.JAVA, Dialect.SCALA, Dialect.PYTHON).eq("enabled", 1)
.eq("save_point_path", className));
Assert.check(task);
task.setStatement(statementService.getById(task.getId()).getStatement());
return task;
......@@ -516,8 +516,8 @@ public class TaskServiceImpl extends SuperServiceImpl<TaskMapper, Task> implemen
@Override
public List<Task> getAllUDF() {
List<Task> tasks =
list(new QueryWrapper<Task>().in("dialect", Dialect.JAVA, Dialect.SCALA, Dialect.PYTHON)
.eq("enabled", 1).isNotNull("save_point_path"));
list(new QueryWrapper<Task>().in("dialect", Dialect.JAVA, Dialect.SCALA, Dialect.PYTHON)
.eq("enabled", 1).isNotNull("save_point_path"));
return tasks.stream().peek(task -> {
Assert.check(task);
task.setStatement(statementService.getById(task.getId()).getStatement());
......@@ -554,7 +554,7 @@ public class TaskServiceImpl extends SuperServiceImpl<TaskMapper, Task> implemen
List<TaskVersion> taskVersions = taskVersionService.getTaskVersionByTaskId(task.getId());
List<Integer> versionIds = taskVersions.stream().map(TaskVersion::getVersionId).collect(Collectors.toList());
Map<Integer, TaskVersion> versionMap =
taskVersions.stream().collect(Collectors.toMap(TaskVersion::getVersionId, t -> t));
taskVersions.stream().collect(Collectors.toMap(TaskVersion::getVersionId, t -> t));
TaskVersion taskVersion = new TaskVersion();
BeanUtil.copyProperties(task, taskVersion);
......@@ -593,14 +593,14 @@ public class TaskServiceImpl extends SuperServiceImpl<TaskMapper, Task> implemen
Task taskInfo = getTaskInfoById(dto.getId());
if (JobLifeCycle.RELEASE.equalsValue(taskInfo.getStep())
|| JobLifeCycle.ONLINE.equalsValue(taskInfo.getStep())
|| JobLifeCycle.CANCEL.equalsValue(taskInfo.getStep())) {
|| JobLifeCycle.ONLINE.equalsValue(taskInfo.getStep())
|| JobLifeCycle.CANCEL.equalsValue(taskInfo.getStep())) {
// throw new BusException("该作业已" + JobLifeCycle.get(taskInfo.getStep()).getLabel() + ",禁止回滚!");
return Result.failed("该作业已" + JobLifeCycle.get(taskInfo.getStep()).getLabel() + ",禁止回滚!");
}
LambdaQueryWrapper<TaskVersion> queryWrapper = new LambdaQueryWrapper<TaskVersion>()
.eq(TaskVersion::getTaskId, dto.getId()).eq(TaskVersion::getVersionId, dto.getVersionId());
.eq(TaskVersion::getTaskId, dto.getId()).eq(TaskVersion::getVersionId, dto.getVersionId());
TaskVersion taskVersion = taskVersionService.getOne(queryWrapper);
......@@ -747,7 +747,7 @@ public class TaskServiceImpl extends SuperServiceImpl<TaskMapper, Task> implemen
}
if (Asserts.isNotNull(cluster.getClusterConfigurationId())) {
Map<String, Object> gatewayConfig =
clusterConfigurationService.getGatewayConfig(cluster.getClusterConfigurationId());
clusterConfigurationService.getGatewayConfig(cluster.getClusterConfigurationId());
jobConfig.buildGatewayConfig(gatewayConfig);
jobConfig.getGatewayConfig().getClusterConfig().setAppId(cluster.getName());
useGateway = true;
......@@ -783,7 +783,7 @@ public class TaskServiceImpl extends SuperServiceImpl<TaskMapper, Task> implemen
private JobConfig buildJobConfig(Task task) {
boolean isJarTask = Dialect.FLINKJAR.equalsVal(task.getDialect())
|| Dialect.KUBERNETES_APPLICATION.equalsVal(task.getDialect());
|| Dialect.KUBERNETES_APPLICATION.equalsVal(task.getDialect());
if (!isJarTask && Asserts.isNotNull(task.getFragment()) ? task.getFragment() : false) {
String flinkWithSql = dataBaseService.getEnabledFlinkWithSql();
if (Asserts.isNotNullString(flinkWithSql)) {
......@@ -803,20 +803,20 @@ public class TaskServiceImpl extends SuperServiceImpl<TaskMapper, Task> implemen
}
// support custom K8s app submit, rather than clusterConfiguration
else if (Dialect.KUBERNETES_APPLICATION.equalsVal(task.getDialect())
&& GatewayType.KUBERNETES_APPLICATION.equalsValue(config.getType())) {
&& GatewayType.KUBERNETES_APPLICATION.equalsValue(config.getType())) {
Map<String, Object> gatewayConfig = JSONUtil.toMap(task.getStatement(), String.class, Object.class);
config.buildGatewayConfig(gatewayConfig);
} else {
Map<String, Object> gatewayConfig =
clusterConfigurationService.getGatewayConfig(task.getClusterConfigurationId());
clusterConfigurationService.getGatewayConfig(task.getClusterConfigurationId());
// submit application type with clusterConfiguration
if (GatewayType.YARN_APPLICATION.equalsValue(config.getType())
|| GatewayType.KUBERNETES_APPLICATION.equalsValue(config.getType())) {
|| GatewayType.KUBERNETES_APPLICATION.equalsValue(config.getType())) {
if (!isJarTask) {
SystemConfiguration systemConfiguration = SystemConfiguration.getInstances();
gatewayConfig.put("userJarPath", systemConfiguration.getSqlSubmitJarPath());
gatewayConfig.put("userJarParas",
systemConfiguration.getSqlSubmitJarParas() + buildParas(config.getTaskId()));
systemConfiguration.getSqlSubmitJarParas() + buildParas(config.getTaskId()));
gatewayConfig.put("userJarMainAppClass", systemConfiguration.getSqlSubmitJarMainAppClass());
} else {
Jar jar = jarService.getById(task.getJarId());
......@@ -874,7 +874,7 @@ public class TaskServiceImpl extends SuperServiceImpl<TaskMapper, Task> implemen
history.setConfig(JSONUtil.parseObject(history.getConfigJson()));
if (Asserts.isNotNull(history) && Asserts.isNotNull(history.getClusterConfigurationId())) {
jobInfoDetail.setClusterConfiguration(
clusterConfigurationService.getClusterConfigById(history.getClusterConfigurationId()));
clusterConfigurationService.getClusterConfigById(history.getClusterConfigurationId()));
}
jobInfoDetail.setHistory(history);
jobInfoDetail.setJobHistory(jobHistoryService.getJobHistory(id));
......@@ -884,12 +884,12 @@ public class TaskServiceImpl extends SuperServiceImpl<TaskMapper, Task> implemen
return jobInfoDetail.getInstance();
}
JobHistory jobHistoryJson =
jobHistoryService.refreshJobHistory(id, jobInfoDetail.getCluster().getJobManagerHost(),
jobInfoDetail.getInstance().getJid(), jobInfoDetail.isNeedSave());
jobHistoryService.refreshJobHistory(id, jobInfoDetail.getCluster().getJobManagerHost(),
jobInfoDetail.getInstance().getJid(), jobInfoDetail.isNeedSave());
JobHistory jobHistory = jobHistoryService.getJobHistoryInfo(jobHistoryJson);
jobInfoDetail.setJobHistory(jobHistory);
if (JobStatus.isDone(jobInfoDetail.getInstance().getStatus())
&& (Asserts.isNull(jobHistory.getJob()) || jobHistory.isError())) {
&& (Asserts.isNull(jobHistory.getJob()) || jobHistory.isError())) {
return jobInfoDetail.getInstance();
}
String status = jobInfoDetail.getInstance().getStatus();
......@@ -898,12 +898,12 @@ public class TaskServiceImpl extends SuperServiceImpl<TaskMapper, Task> implemen
jobInfoDetail.getInstance().setStatus(JobStatus.UNKNOWN.getValue());
} else {
jobInfoDetail.getInstance().setDuration(
jobInfoDetail.getJobHistory().getJob().get(FlinkRestResultConstant.JOB_DURATION).asLong() / 1000);
jobInfoDetail.getJobHistory().getJob().get(FlinkRestResultConstant.JOB_DURATION).asLong() / 1000);
jobInfoDetail.getInstance()
.setStatus(jobInfoDetail.getJobHistory().getJob().get(FlinkRestResultConstant.JOB_STATE).asText());
.setStatus(jobInfoDetail.getJobHistory().getJob().get(FlinkRestResultConstant.JOB_STATE).asText());
}
if (JobStatus.isDone(jobInfoDetail.getInstance().getStatus())
&& !status.equals(jobInfoDetail.getInstance().getStatus())) {
&& !status.equals(jobInfoDetail.getInstance().getStatus())) {
jobStatusChanged = true;
jobInfoDetail.getInstance().setFinishTime(LocalDateTime.now());
// handleJobDone(jobInfoDetail.getInstance());
......@@ -920,7 +920,7 @@ public class TaskServiceImpl extends SuperServiceImpl<TaskMapper, Task> implemen
private boolean inRefreshPlan(JobInstance jobInstance) {
if ((!JobStatus.isDone(jobInstance.getStatus())) || (Asserts.isNotNull(jobInstance.getFinishTime())
&& Duration.between(jobInstance.getFinishTime(), LocalDateTime.now()).toMinutes() < 1)) {
&& Duration.between(jobInstance.getFinishTime(), LocalDateTime.now()).toMinutes() < 1)) {
return true;
} else {
return false;
......@@ -967,9 +967,9 @@ public class TaskServiceImpl extends SuperServiceImpl<TaskMapper, Task> implemen
// clusterConfigurationName
if (Asserts.isNotNull(task.getClusterConfigurationId())) {
ClusterConfiguration clusterConfiguration =
clusterConfigurationService.getById(task.getClusterConfigurationId());
clusterConfigurationService.getById(task.getClusterConfigurationId());
((ObjectNode) jsonNode).put("clusterConfigurationName",
Asserts.isNotNull(clusterConfiguration) ? clusterConfiguration.getName() : null);
Asserts.isNotNull(clusterConfiguration) ? clusterConfiguration.getName() : null);
}
// databaseName
if (Asserts.isNotNull(task.getDatabaseId())) {
......@@ -1045,15 +1045,15 @@ public class TaskServiceImpl extends SuperServiceImpl<TaskMapper, Task> implemen
}
if (Asserts.isNotNull(task.getClusterConfigurationName())) {
ClusterConfiguration clusterConfiguration = clusterConfigurationService
.getOne(new QueryWrapper<ClusterConfiguration>().eq("name",
task.getClusterConfigurationName()));
.getOne(new QueryWrapper<ClusterConfiguration>().eq("name",
task.getClusterConfigurationName()));
if (Asserts.isNotNull(clusterConfiguration)) {
task.setClusterConfigurationId(clusterConfiguration.getId());
}
}
if (Asserts.isNotNull(task.getDatabaseName())) {
DataBase dataBase =
dataBaseService.getOne(new QueryWrapper<DataBase>().eq("name", task.getDatabaseName()));
dataBaseService.getOne(new QueryWrapper<DataBase>().eq("name", task.getDatabaseName()));
if (Asserts.isNotNull(dataBase)) {
task.setDatabaseId(dataBase.getId());
}
......@@ -1070,7 +1070,7 @@ public class TaskServiceImpl extends SuperServiceImpl<TaskMapper, Task> implemen
*/
if (Asserts.isNotNull(task.getAlertGroupName())) {
AlertGroup alertGroup =
alertGroupService.getOne(new QueryWrapper<AlertGroup>().eq("name", task.getAlertGroupName()));
alertGroupService.getOne(new QueryWrapper<AlertGroup>().eq("name", task.getAlertGroupName()));
if (Asserts.isNotNull(alertGroup)) {
task.setAlertGroupId(alertGroup.getId());
}
......@@ -1106,7 +1106,7 @@ public class TaskServiceImpl extends SuperServiceImpl<TaskMapper, Task> implemen
return Result.failed("一共" + jsonNodes.size() + "个作业,全部导入失败");
} else if (errorNumber > 0) {
return Result.failed("一共" + jsonNodes.size() + "个作业,其中成功导入" + (jsonNode.size() - errorNumber) + "个,失败"
+ errorNumber + "个");
+ errorNumber + "个");
}
return Result.succeed("成功导入" + jsonNodes.size() + "个作业");
}
......@@ -1157,7 +1157,7 @@ public class TaskServiceImpl extends SuperServiceImpl<TaskMapper, Task> implemen
long minutes = ChronoUnit.MINUTES.between(startTime, endTime);
long seconds = ChronoUnit.SECONDS.between(startTime, endTime);
String duration = days + "天 " + (hours - (days * 24)) + "小时 " + (minutes - (hours * 60)) + "分 "
+ (seconds - (minutes * 60)) + "秒";
+ (seconds - (minutes * 60)) + "秒";
return duration;
}
......@@ -1237,7 +1237,7 @@ public class TaskServiceImpl extends SuperServiceImpl<TaskMapper, Task> implemen
private void sendAlert(AlertInstance alertInstance, JobInstance jobInstance, Task task, AlertMsg alertMsg) {
AlertConfig alertConfig = AlertConfig.build(alertInstance.getName(), alertInstance.getType(),
JSONUtil.toMap(alertInstance.getParams()));
JSONUtil.toMap(alertInstance.getParams()));
Alert alert = Alert.build(alertConfig);
String title = "任务【" + task.getAlias() + "】:" + jobInstance.getStatus();
String content = alertMsg.toString();
......@@ -1256,10 +1256,10 @@ public class TaskServiceImpl extends SuperServiceImpl<TaskMapper, Task> implemen
@Override
public Result queryAllCatalogue() {
final LambdaQueryWrapper<Catalogue> queryWrapper = new LambdaQueryWrapper<Catalogue>()
.select(Catalogue::getId, Catalogue::getName, Catalogue::getParentId)
.eq(Catalogue::getIsLeaf, 0)
.eq(Catalogue::getEnabled, 1)
.isNull(Catalogue::getTaskId);
.select(Catalogue::getId, Catalogue::getName, Catalogue::getParentId)
.eq(Catalogue::getIsLeaf, 0)
.eq(Catalogue::getEnabled, 1)
.isNull(Catalogue::getTaskId);
final List<Catalogue> catalogueList = catalogueService.list(queryWrapper);
return Result.succeed(TreeUtil.build(dealWithCatalogue(catalogueList), -1).get(0));
}
......@@ -1285,7 +1285,7 @@ public class TaskServiceImpl extends SuperServiceImpl<TaskMapper, Task> implemen
public Result<List<Task>> queryOnLineTaskByDoneStatus(List<JobLifeCycle> jobLifeCycle, List<JobStatus> jobStatuses,
boolean includeNull, Integer catalogueId) {
final Tree<Integer> node = ((Tree<Integer>) queryAllCatalogue().getDatas())
.getNode(Objects.isNull(catalogueId) ? 0 : catalogueId);
.getNode(Objects.isNull(catalogueId) ? 0 : catalogueId);
final List<Integer> parentIds = new ArrayList<>(0);
parentIds.add(node.getId());
childrenNodeParse(node, parentIds);
......@@ -1296,8 +1296,8 @@ public class TaskServiceImpl extends SuperServiceImpl<TaskMapper, Task> implemen
private List<Task> getTasks(List<JobLifeCycle> jobLifeCycle, List<JobStatus> jobStatuses, boolean includeNull,
List<Integer> parentIds) {
return this.baseMapper.queryOnLineTaskByDoneStatus(parentIds,
jobLifeCycle.stream().filter(Objects::nonNull).map(JobLifeCycle::getValue).collect(Collectors.toList()),
includeNull, jobStatuses.stream().map(JobStatus::name).collect(Collectors.toList()));
jobLifeCycle.stream().filter(Objects::nonNull).map(JobLifeCycle::getValue).collect(Collectors.toList()),
includeNull, jobStatuses.stream().map(JobStatus::name).collect(Collectors.toList()));
}
private void childrenNodeParse(Tree<Integer> node, List<Integer> parentIds) {
......@@ -1317,7 +1317,7 @@ public class TaskServiceImpl extends SuperServiceImpl<TaskMapper, Task> implemen
@Override
public void selectSavepointOnLineTask(TaskOperatingResult taskOperatingResult) {
final JobInstance jobInstanceByTaskId =
jobInstanceService.getJobInstanceByTaskId(taskOperatingResult.getTask().getId());
jobInstanceService.getJobInstanceByTaskId(taskOperatingResult.getTask().getId());
if (jobInstanceByTaskId == null) {
startGoingLiveTask(taskOperatingResult, null);
return;
......@@ -1336,8 +1336,8 @@ public class TaskServiceImpl extends SuperServiceImpl<TaskMapper, Task> implemen
private void findTheConditionSavePointToOnline(TaskOperatingResult taskOperatingResult,
JobInstance jobInstanceByTaskId) {
final LambdaQueryWrapper<JobHistory> queryWrapper = new LambdaQueryWrapper<JobHistory>()
.select(JobHistory::getId, JobHistory::getCheckpointsJson)
.eq(JobHistory::getId, jobInstanceByTaskId.getId());
.select(JobHistory::getId, JobHistory::getCheckpointsJson)
.eq(JobHistory::getId, jobInstanceByTaskId.getId());
final JobHistory jobHistory = jobHistoryService.getOne(queryWrapper);
if (jobHistory != null && StringUtils.isNotBlank(jobHistory.getCheckpointsJson())) {
final ObjectNode jsonNodes = JSONUtil.parseObject(jobHistory.getCheckpointsJson());
......
......@@ -64,6 +64,13 @@ public class UDFServiceImpl implements UDFService {
@Resource
TaskService taskService;
/**
* init udf
*
* @param statement sql 语句
* @param gatewayType flink gateway类型
* @return {@link UDFPath}
*/
@Override
public UDFPath initUDF(String statement, GatewayType gatewayType) {
if (gatewayType == GatewayType.KUBERNETES_APPLICATION) {
......
......@@ -71,4 +71,3 @@ VALUES (4, 'python_udf_1', 'Python', 'UDF', 'from pyflink.table import ScalarFun
INSERT INTO `dlink_udf_template` (`id`, `name`, `code_type`, `function_type`, `template_code`, `enabled`, `create_time`, `update_time`)
VALUES (5, 'python_udf_2', 'Python', 'UDF', 'from pyflink.table import DataTypes\nfrom pyflink.table.udf import udf\n\n@udf(result_type=DataTypes.STRING())\ndef ${className}(variable1:string):\n return \'\'', NULL, '2022-10-25 09:25:13', '2022-10-25 09:34:47');
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment