Unverified Commit e311f5df authored by ZackYoung's avatar ZackYoung Committed by GitHub

[Feature][admin,core] add flink pre-job udf; (#1128)

* add flink pre-job udf;
FIX SOME  BUG(udf flink sql explain)

* change code style
parent ed477bcb
...@@ -259,7 +259,7 @@ public class StudioServiceImpl implements StudioService { ...@@ -259,7 +259,7 @@ public class StudioServiceImpl implements StudioService {
buildSession(config); buildSession(config);
process.infoSuccess(); process.infoSuccess();
// To initialize java udf, but it has a bug in the product environment now. // To initialize java udf, but it has a bug in the product environment now.
// initUDF(config,studioExecuteDTO.getStatement()); config.setJarFiles(udfService.initUDF(studioExecuteDTO.getStatement()));
process.start(); process.start();
JobManager jobManager = JobManager.buildPlanMode(config); JobManager jobManager = JobManager.buildPlanMode(config);
List<SqlExplainResult> sqlExplainResults = List<SqlExplainResult> sqlExplainResults =
...@@ -311,6 +311,7 @@ public class StudioServiceImpl implements StudioService { ...@@ -311,6 +311,7 @@ public class StudioServiceImpl implements StudioService {
// If you are using explainSql | getStreamGraph | getJobPlan, make the dialect change to local. // If you are using explainSql | getStreamGraph | getJobPlan, make the dialect change to local.
config.buildLocal(); config.buildLocal();
buildSession(config); buildSession(config);
config.setJarFiles(udfService.initUDF(studioExecuteDTO.getStatement()));
JobManager jobManager = JobManager.buildPlanMode(config); JobManager jobManager = JobManager.buildPlanMode(config);
String planJson = jobManager.getJobPlanJson(studioExecuteDTO.getStatement()); String planJson = jobManager.getJobPlanJson(studioExecuteDTO.getStatement());
ObjectMapper mapper = new ObjectMapper(); ObjectMapper mapper = new ObjectMapper();
......
...@@ -72,15 +72,20 @@ import org.apache.flink.yarn.configuration.YarnConfigOptions; ...@@ -72,15 +72,20 @@ import org.apache.flink.yarn.configuration.YarnConfigOptions;
import java.time.LocalDateTime; import java.time.LocalDateTime;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections; import java.util.Collections;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.stream.Collectors;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.databind.node.ObjectNode; import com.fasterxml.jackson.databind.node.ObjectNode;
import cn.hutool.core.io.FileUtil;
import cn.hutool.core.util.URLUtil;
/** /**
* JobManager * JobManager
* *
...@@ -168,6 +173,7 @@ public class JobManager { ...@@ -168,6 +173,7 @@ public class JobManager {
JobManager manager = new JobManager(config); JobManager manager = new JobManager(config);
manager.setPlanMode(true); manager.setPlanMode(true);
manager.init(); manager.init();
manager.executor.initUDF(config.getJarFiles());
ProcessContextHolder.getProcess().info("Build Flink plan mode success."); ProcessContextHolder.getProcess().info("Build Flink plan mode success.");
return manager; return manager;
} }
...@@ -392,6 +398,7 @@ public class JobManager { ...@@ -392,6 +398,7 @@ public class JobManager {
jobGraph.setSavepointRestoreSettings( jobGraph.setSavepointRestoreSettings(
SavepointRestoreSettings.forPath(config.getSavePointPath(), true)); SavepointRestoreSettings.forPath(config.getSavePointPath(), true));
} }
jobGraph.addJars(Arrays.stream(config.getJarFiles()).map(path -> URLUtil.getURL(FileUtil.file(path))).collect(Collectors.toList()));
gatewayResult = Gateway.build(config.getGatewayConfig()).submitJobGraph(jobGraph); gatewayResult = Gateway.build(config.getGatewayConfig()).submitJobGraph(jobGraph);
} }
job.setResult(InsertResult.success(gatewayResult.getAppId())); job.setResult(InsertResult.success(gatewayResult.getAppId()));
...@@ -454,6 +461,7 @@ public class JobManager { ...@@ -454,6 +461,7 @@ public class JobManager {
jobGraph.setSavepointRestoreSettings(SavepointRestoreSettings.forPath(config.getSavePointPath(), true)); jobGraph.setSavepointRestoreSettings(SavepointRestoreSettings.forPath(config.getSavePointPath(), true));
} }
// Perjob mode need to submit job graph. // Perjob mode need to submit job graph.
jobGraph.addJars(Arrays.stream(config.getJarFiles()).map(path -> URLUtil.getURL(FileUtil.file(path))).collect(Collectors.toList()));
gatewayResult = Gateway.build(config.getGatewayConfig()).submitJobGraph(jobGraph); gatewayResult = Gateway.build(config.getGatewayConfig()).submitJobGraph(jobGraph);
} }
return gatewayResult; return gatewayResult;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment