Unverified commit 5b2b3347, authored by ZackYoung, committed by GitHub

add support session run udf.jar (#1090)

* Add udf jar packing and compression
  Add udf version verification

* Improve message tips

* Add jarFiles parameter to RemoteStreamExecutor

* Add support for running udf.jar in session mode

* Change PathConstant

* Require UDF modification and compilation to succeed

* Add protocol header
parent 7717387f
@@ -21,6 +21,7 @@ package com.dlink.controller;
 import com.dlink.common.result.ProTableResult;
 import com.dlink.common.result.Result;
+import com.dlink.constant.PathConstant;
 import com.dlink.model.Jar;
 import com.dlink.model.Task;
 import com.dlink.service.JarService;
@@ -128,8 +129,8 @@ public class JarController {
         List<Task> allUDF = taskService.getAllUDF();
         List<String> udfCodes = allUDF.stream().map(Task::getStatement).collect(Collectors.toList());
         Map<String, List<String>> resultMap = UDFUtil.buildJar(udfCodes);
-        String msg = StrUtil.format("udf jar built successfully; the jar file is at tmp/udf/udf.jar under the dinky installation directory.\nSucceeded classes: {}.\nFailed classes: {}"
-                , resultMap.get("success"), resultMap.get("failed"));
+        String msg = StrUtil.format("udf jar built successfully; the jar file is at {}.\nSucceeded classes: {}.\nFailed classes: {}"
+                , PathConstant.UDF_JAR_TMP_PATH, resultMap.get("success"), resultMap.get("failed"));
         return Result.succeed(resultMap, msg);
     }
 }
@@ -22,6 +22,7 @@ package com.dlink.service.impl;
 import com.dlink.api.FlinkAPI;
 import com.dlink.assertion.Asserts;
 import com.dlink.config.Dialect;
+import com.dlink.constant.PathConstant;
 import com.dlink.dto.AbstractStatementDTO;
 import com.dlink.dto.SessionDTO;
 import com.dlink.dto.SqlDTO;
@@ -64,9 +65,11 @@ import com.dlink.session.SessionInfo;
 import com.dlink.session.SessionPool;
 import com.dlink.sql.FlinkQuery;
 import com.dlink.utils.RunTimeUtil;
+import com.dlink.utils.UDFUtil;

 import java.time.LocalDateTime;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -81,6 +84,9 @@ import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.node.ObjectNode;

+import cn.hutool.core.collection.CollUtil;
+import cn.hutool.core.map.MapUtil;
+
 /**
  * StudioServiceImpl
  *
@@ -92,6 +98,14 @@ public class StudioServiceImpl implements StudioService {
     private static final Logger logger = LoggerFactory.getLogger(StudioServiceImpl.class);

+    /**
+     * Gateway type map: quick lookup of session-like types (as opposed to application types) to avoid repeated checks
+     */
+    private static final Map<String, List<GatewayType>> GATEWAY_TYPE_MAP = MapUtil
+            .builder("session", Arrays.asList(GatewayType.YARN_SESSION, GatewayType.STANDALONE))
+            .build();
+
     @Autowired
     private ClusterService clusterService;
     @Autowired
@@ -134,7 +148,7 @@ public class StudioServiceImpl implements StudioService {
     public JobResult executeSql(StudioExecuteDTO studioExecuteDTO) {
         if (Dialect.notFlinkSql(studioExecuteDTO.getDialect())) {
             return executeCommonSql(SqlDTO.build(studioExecuteDTO.getStatement(),
                     studioExecuteDTO.getDatabaseId(), studioExecuteDTO.getMaxRowNum()));
         } else {
             return executeFlinkSql(studioExecuteDTO);
         }
@@ -145,7 +159,8 @@ public class StudioServiceImpl implements StudioService {
         JobConfig config = studioExecuteDTO.getJobConfig();
         buildSession(config);
         // To initialize java udf, but it only supports local mode.
-        initUDF(config, studioExecuteDTO.getStatement());
+        String jarPath = initUDF(config, studioExecuteDTO.getStatement());
+        config.setJarFiles(new String[]{PathConstant.UDF_PATH + jarPath});
         JobManager jobManager = JobManager.build(config);
         JobResult jobResult = jobManager.executeSql(studioExecuteDTO.getStatement());
         RunTimeUtil.recovery(jobManager);
@@ -161,6 +176,7 @@ public class StudioServiceImpl implements StudioService {
         return jobResult;
     }

+    @Override
     public JobResult executeCommonSql(SqlDTO sqlDTO) {
         JobResult result = new JobResult();
         result.setStatement(sqlDTO.getStatement());
@@ -288,15 +304,15 @@ public class StudioServiceImpl implements StudioService {
         if (sessionDTO.isUseRemote()) {
             Cluster cluster = clusterService.getById(sessionDTO.getClusterId());
             SessionConfig sessionConfig = SessionConfig.build(
                     sessionDTO.getType(), true,
                     cluster.getId(), cluster.getAlias(),
                     clusterService.buildEnvironmentAddress(true, sessionDTO.getClusterId()));
             return JobManager.createSession(sessionDTO.getSession(), sessionConfig, createUser);
         } else {
             SessionConfig sessionConfig = SessionConfig.build(
                     sessionDTO.getType(), false,
                     null, null,
                     clusterService.buildEnvironmentAddress(false, null));
             return JobManager.createSession(sessionDTO.getSession(), sessionConfig, createUser);
         }
     }
@@ -465,7 +481,7 @@ public class StudioServiceImpl implements StudioService {
             }
         } else {
             String baseStatement = FlinkQuery.useCatalog(studioMetaStoreDTO.getCatalog()) + FlinkQuery.separator()
                     + FlinkQuery.useDatabase(studioMetaStoreDTO.getDatabase()) + FlinkQuery.separator();
             // show tables
             String tableStatement = baseStatement + FlinkQuery.showTables();
             studioMetaStoreDTO.setStatement(tableStatement);
@@ -503,7 +519,7 @@ public class StudioServiceImpl implements StudioService {
             // nothing to do
         } else {
             String baseStatement = FlinkQuery.useCatalog(studioMetaStoreDTO.getCatalog()) + FlinkQuery.separator()
                     + FlinkQuery.useDatabase(studioMetaStoreDTO.getDatabase()) + FlinkQuery.separator();
             // desc tables
             String tableStatement = baseStatement + FlinkQuery.descTable(studioMetaStoreDTO.getTable());
             studioMetaStoreDTO.setStatement(tableStatement);
@@ -514,12 +530,12 @@ public class StudioServiceImpl implements StudioService {
             int i = 1;
             for (Map<String, Object> item : rowData) {
                 FlinkColumn column = FlinkColumn.build(i,
                         item.get(FlinkQuery.columnName()).toString(),
                         item.get(FlinkQuery.columnType()).toString(),
                         item.get(FlinkQuery.columnKey()).toString(),
                         item.get(FlinkQuery.columnNull()).toString(),
                         item.get(FlinkQuery.columnExtras()).toString(),
                         item.get(FlinkQuery.columnWatermark()).toString()
                 );
                 columns.add(column);
                 i++;
@@ -548,14 +564,18 @@ public class StudioServiceImpl implements StudioService {
         return infos;
     }

-    private void initUDF(JobConfig config, String statement) {
-        if (!GatewayType.LOCAL.equalsValue(config.getType())) {
-            return;
-        }
-        List<String> udfClassNameList = JobManager.getUDFClassName(statement);
-        for (String item : udfClassNameList) {
-            Task task = taskService.getUDFByClassName(item);
-            JobManager.initUDF(item, task.getStatement());
-        }
+    private String initUDF(JobConfig config, String statement) {
+        if (GATEWAY_TYPE_MAP.get("session").contains(GatewayType.get(config.getType())) || GatewayType.LOCAL.equalsValue(config.getType())) {
+            List<String> udfClassNameList = JobManager.getUDFClassName(statement);
+            List<String> codeList = CollUtil.map(udfClassNameList, x -> {
+                Task task = taskService.getUDFByClassName(x);
+                JobManager.initMustSuccessUDF(x, task.getStatement());
+                return task.getStatement();
+            }, true);
+            if (codeList.size() > 0) {
+                return UDFUtil.getUdfNameAndBuildJar(codeList);
+            }
+        }
+        return null;
     }
 }
/*
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package com.dlink.constant;
import java.io.File;
/**
 * File path constants
 *
 * @author ZackYoung
 * @since 0.6.8
 */
public class PathConstant {
    /**
     * Base path: the directory dinky is deployed in
     */
    public static final String WORK_DIR = System.getProperty("user.dir");
    /**
     * tmp path
     */
    public static final String TMP_PATH = WORK_DIR + File.separator + "tmp" + File.separator;
    /**
     * udf path
     */
    public static final String UDF_PATH = TMP_PATH + "udf" + File.separator;
    /**
     * Naming rule for versioned udf jars
     */
    public static final String UDF_JAR_RULE = "udf-\\d+\\.jar";
    /**
     * udf version rule
     */
    public static final String UDF_VERSION_RULE = "\\d+";
    /**
     * Name of the temporary udf jar
     */
    public static final String UDF_JAR_TMP_NAME = "udf-tmp.jar";
    /**
     * Path of the temporary udf jar
     */
    public static final String UDF_JAR_TMP_PATH = UDF_PATH + UDF_JAR_TMP_NAME;
}
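The constants above all resolve against the JVM's `user.dir`. A minimal sketch (not part of the commit) of the resolved values, assuming dinky was started from /opt/dinky:

```java
import com.dlink.constant.PathConstant;

public class PathConstantDemo {
    public static void main(String[] args) {
        // With System.getProperty("user.dir") == "/opt/dinky":
        System.out.println(PathConstant.WORK_DIR);         // /opt/dinky
        System.out.println(PathConstant.TMP_PATH);         // /opt/dinky/tmp/
        System.out.println(PathConstant.UDF_PATH);         // /opt/dinky/tmp/udf/
        System.out.println(PathConstant.UDF_JAR_TMP_PATH); // /opt/dinky/tmp/udf/udf-tmp.jar
    }
}
```

Versioned jars produced by UDFUtil (further down) sit next to the temporary jar and match UDF_JAR_RULE, e.g. udf-1.jar, udf-2.jar.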
@@ -61,6 +61,7 @@ public class JobConfig {
     private boolean isJarTask = false;
     private String address;
     private Integer taskId;
+    private String[] jarFiles;
     private String jobName;
     private boolean useSqlFragment;
     private boolean useStatementSet;
......
@@ -60,6 +60,7 @@ import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.configuration.DeploymentOptions;
 import org.apache.flink.configuration.PipelineOptions;
 import org.apache.flink.core.execution.JobClient;
+import org.apache.flink.optimizer.CompilerException;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.SavepointConfigOptions;
 import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
@@ -103,6 +104,8 @@ public class JobManager {
     private String sqlSeparator = FlinkSQLConstant.SEPARATOR;
     private GatewayType runMode = GatewayType.LOCAL;

+    private static final String FUNCTION_REGEX = "function (.*?)'(.*?)'";
+
     public JobManager() {
     }
@@ -160,6 +163,7 @@ public class JobManager {
         initGatewayConfig(config);
         JobManager manager = new JobManager(config);
         manager.init();
+        manager.executor.initUDF(config.getJarFiles());
         return manager;
     }
@@ -183,7 +187,7 @@ public class JobManager {
     public static boolean useGateway(String type) {
         return (GatewayType.YARN_PER_JOB.equalsValue(type) || GatewayType.YARN_APPLICATION.equalsValue(type)
                 || GatewayType.KUBERNETES_APPLICATION.equalsValue(type));
     }

     private Executor createExecutor() {
@@ -309,7 +313,7 @@ public class JobManager {
             if (config.isUseResult()) {
                 // Build insert result.
                 IResult result =
                         ResultBuilder.build(SqlType.INSERT, config.getMaxRowNum(), config.isUseChangeLog(), config.isUseAutoCancel(), executor.getTimeZone()).getResult(tableResult);
                 job.setResult(result);
             }
         }
@@ -333,7 +337,7 @@ public class JobManager {
             if (Asserts.isNotNull(flinkInterceptorResult.getTableResult())) {
                 if (config.isUseResult()) {
                     IResult result = ResultBuilder.build(item.getType(), config.getMaxRowNum(), config.isUseChangeLog(), config.isUseAutoCancel(), executor.getTimeZone())
                             .getResult(flinkInterceptorResult.getTableResult());
                     job.setResult(result);
                 }
             } else {
@@ -349,7 +353,8 @@ public class JobManager {
             }
             if (config.isUseResult()) {
                 IResult result =
-                        ResultBuilder.build(item.getType(), config.getMaxRowNum(), config.isUseChangeLog(), config.isUseAutoCancel(), executor.getTimeZone()).getResult(tableResult);
+                        ResultBuilder.build(item.getType(), config.getMaxRowNum(), config.isUseChangeLog()
+                                , config.isUseAutoCancel(), executor.getTimeZone()).getResult(tableResult);
                 job.setResult(result);
             }
         }
@@ -376,7 +381,7 @@ public class JobManager {
             streamGraph.setJobName(config.getJobName());
             JobGraph jobGraph = streamGraph.getJobGraph();
             if (Asserts.isNotNullString(config.getSavePointPath())) {
-                jobGraph.setSavepointRestoreSettings(SavepointRestoreSettings.forPath(config.getSavePointPath(),true));
+                jobGraph.setSavepointRestoreSettings(SavepointRestoreSettings.forPath(config.getSavePointPath(), true));
             }
             gatewayResult = Gateway.build(config.getGatewayConfig()).submitJobGraph(jobGraph);
         }
@@ -434,7 +439,7 @@ public class JobManager {
             JobGraph jobGraph = executor.getJobGraphFromInserts(inserts);
             // Per-job mode needs to set the savepoint restore path when recovering from a savepoint.
             if (Asserts.isNotNullString(config.getSavePointPath())) {
-                jobGraph.setSavepointRestoreSettings(SavepointRestoreSettings.forPath(config.getSavePointPath(),true));
+                jobGraph.setSavepointRestoreSettings(SavepointRestoreSettings.forPath(config.getSavePointPath(), true));
             }
             // Per-job mode needs to submit the job graph.
             gatewayResult = Gateway.build(config.getGatewayConfig()).submitJobGraph(jobGraph);
@@ -513,7 +518,7 @@ public class JobManager {
     public boolean cancel(String jobId) {
         if (useGateway && !useRestAPI) {
             config.getGatewayConfig().setFlinkConfig(FlinkConfig.build(jobId, ActionType.CANCEL.getValue(),
                     null, null));
             Gateway.build(config.getGatewayConfig()).savepointJob();
             return true;
         } else {
@@ -529,7 +534,7 @@ public class JobManager {
     public SavePointResult savepoint(String jobId, String savePointType, String savePoint) {
         if (useGateway && !useRestAPI) {
             config.getGatewayConfig().setFlinkConfig(FlinkConfig.build(jobId, ActionType.SAVEPOINT.getValue(),
                     savePointType, null));
             return Gateway.build(config.getGatewayConfig()).savepointJob(savePoint);
         } else {
             return FlinkAPI.build(config.getAddress()).savepoints(jobId, savePointType);
@@ -604,7 +609,7 @@ public class JobManager {
     }

     public static List<String> getUDFClassName(String statement) {
-        Pattern pattern = Pattern.compile("function (.*?)'(.*?)'", Pattern.CASE_INSENSITIVE);
+        Pattern pattern = Pattern.compile(FUNCTION_REGEX, Pattern.CASE_INSENSITIVE);
         Matcher matcher = pattern.matcher(statement);
         List<String> classNameList = new ArrayList<>();
         while (matcher.find()) {
@@ -620,4 +625,15 @@ public class JobManager {
             UDFUtil.buildClass(code);
         }
     }
+
+    public static void initMustSuccessUDF(String className, String code) {
+        if (ClassPool.exist(ClassEntity.build(className, code))) {
+            UDFUtil.initClassLoader(className);
+        } else {
+            // If compilation fails, throw an exception: functions the job depends on must compile successfully.
+            if (!UDFUtil.buildClass(code)) {
+                throw new CompilerException(String.format("class %s failed to compile, please check the code", className));
+            }
+        }
+    }
 }
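FUNCTION_REGEX is what getUDFClassName uses to pull UDF class names out of a script: group 2 captures the quoted class name of each CREATE FUNCTION clause. A self-contained sketch of that behaviour (the statements and class names are made up, and the loop body is assumed to collect group 2, as the method above does):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class UdfRegexDemo {
    public static void main(String[] args) {
        String statement = "create temporary function to_upper as 'com.example.udf.ToUpper';\n"
                + "CREATE FUNCTION my_sum AS 'com.example.udf.MySum';";
        Pattern pattern = Pattern.compile("function (.*?)'(.*?)'", Pattern.CASE_INSENSITIVE);
        Matcher matcher = pattern.matcher(statement);
        List<String> classNameList = new ArrayList<>();
        while (matcher.find()) {
            classNameList.add(matcher.group(2)); // the class name between the quotes
        }
        System.out.println(classNameList); // [com.example.udf.ToUpper, com.example.udf.MySum]
    }
}
```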
@@ -19,12 +19,19 @@
 package com.dlink.utils;

+import static com.dlink.constant.PathConstant.UDF_JAR_RULE;
+import static com.dlink.constant.PathConstant.UDF_JAR_TMP_PATH;
+import static com.dlink.constant.PathConstant.UDF_VERSION_RULE;
+
+import com.dlink.constant.PathConstant;
 import com.dlink.pool.ClassEntity;
 import com.dlink.pool.ClassPool;

-import java.io.File;
 import java.io.InputStream;
+import java.nio.charset.Charset;
 import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -32,10 +39,13 @@ import org.codehaus.groovy.control.CompilerConfiguration;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

+import cn.hutool.core.collection.CollUtil;
+import cn.hutool.core.convert.Convert;
 import cn.hutool.core.io.FileUtil;
 import cn.hutool.core.map.MapUtil;
+import cn.hutool.core.util.ReUtil;
 import cn.hutool.core.util.StrUtil;
-import cn.hutool.core.util.ZipUtil;
+import cn.hutool.crypto.digest.MD5;
 import groovy.lang.GroovyClassLoader;

 /**
@@ -45,22 +55,27 @@ import groovy.lang.GroovyClassLoader;
  * @since 2021/12/27 23:25
  */
 public class UDFUtil {
-    protected static final Logger logger = LoggerFactory.getLogger(UDFUtil.class);
+    protected static final Logger log = LoggerFactory.getLogger(UDFUtil.class);
+
+    /**
+     * Map of udf jar md5 values to their versions
+     */
+    protected static final Map<String, Integer> UDF_MD5_MAP = new HashMap<>();

-    public static void buildClass(String code) {
+    public static Boolean buildClass(String code) {
         CustomStringJavaCompiler compiler = new CustomStringJavaCompiler(code);
         boolean res = compiler.compiler();
+        String className = compiler.getFullClassName();
         if (res) {
-            String className = compiler.getFullClassName();
             byte[] compiledBytes = compiler.getJavaFileObjectMap(className).getCompiledBytes();
             ClassPool.push(new ClassEntity(className, code, compiledBytes));
-            logger.info("compiled successfully");
-            logger.info("compilerTakeTime:{}",compiler.getCompilerTakeTime());
+            log.info("class:{} compiled successfully", className);
+            log.info("compilerTakeTime:{}", compiler.getCompilerTakeTime());
             initClassLoader(className);
         } else {
-            logger.info("compilation failed");
-            logger.info(compiler.getCompilerMessage());
+            log.warn("class:{} failed to compile", className);
+            log.warn(compiler.getCompilerMessage());
         }
+        return res;
     }

     public static void initClassLoader(String name) {
@@ -72,14 +87,13 @@ public class UDFUtil {
         groovyClassLoader.setShouldRecompile(true);
         groovyClassLoader.defineClass(classEntity.getName(), classEntity.getClassByte());
         Thread.currentThread().setContextClassLoader(groovyClassLoader);
-        //Class<?> clazz = groovyClassLoader.parseClass(codeSource,"com.dlink.ud.udf.SubstringFunction");
     }

     public static Map<String, List<String>> buildJar(List<String> codeList) {
         List<String> successList = new ArrayList<>();
         List<String> failedList = new ArrayList<>();
-        String tmpPath = System.getProperty("user.dir") + File.separator + "tmp/udf/";
-        String udfJarPath = tmpPath + "udf.jar";
+        String tmpPath = PathConstant.UDF_PATH;
+        String udfJarPath = UDF_JAR_TMP_PATH;
         // Delete the cached jar
         FileUtil.del(udfJarPath);
         codeList.forEach(code -> {
@@ -87,20 +101,74 @@ public class UDFUtil {
             boolean res = compiler.compilerToTmpPath(tmpPath);
             String className = compiler.getFullClassName();
             if (res) {
-                logger.info("compiled successfully");
-                logger.info("compilerTakeTime:{}",compiler.getCompilerTakeTime());
+                log.info("class compiled successfully: {}", className);
+                log.info("compilerTakeTime:{}", compiler.getCompilerTakeTime());
                 successList.add(className);
             } else {
-                logger.info("compilation failed");
-                logger.info(compiler.getCompilerMessage());
+                log.warn("class failed to compile: {}", className);
+                log.warn(compiler.getCompilerMessage());
                 failedList.add(className);
             }
         });
         String[] clazzs = successList.stream().map(className -> StrUtil.replace(className, ".", "/") + ".class").toArray(String[]::new);
-        InputStream[] fileInputStreams = successList.stream().map(className -> tmpPath + StrUtil.replace(className, ".", "/") + ".class")
-                .map(FileUtil::getInputStream).toArray(InputStream[]::new);
+        InputStream[] fileInputStreams = successList.stream().map(className -> tmpPath + StrUtil.replace(className, ".", "/") + ".class").map(FileUtil::getInputStream).toArray(InputStream[]::new);
         // Package the compiled class files into a jar
-        ZipUtil.zip(FileUtil.file(udfJarPath), clazzs, fileInputStreams);
-        return MapUtil.builder("success", successList).put("failed", failedList).build();
+        try (ZipUtils zipWriter = new ZipUtils(FileUtil.file(udfJarPath), Charset.defaultCharset())) {
+            zipWriter.add(clazzs, fileInputStreams);
+        }
+        String md5 = md5sum(udfJarPath);
+        return MapUtil.builder("success", successList).put("failed", failedList).put("md5", Collections.singletonList(md5)).build();
     }
+
+    /**
+     * Get the udf jar version and build the jar
+     *
+     * @param codeList udf source code list
+     * @return {@link java.lang.String}
+     */
+    public static String getUdfNameAndBuildJar(List<String> codeList) {
+        // 1. Build the jar and check the versions of all existing jars, normally named udf-${version}.jar, e.g. udf-1.jar; skip if the directory does not exist
+        String md5 = buildJar(codeList).get("md5").get(0);
+        if (!FileUtil.exist(PathConstant.UDF_PATH)) {
+            FileUtil.mkdir(PathConstant.UDF_PATH);
+        }
+        try {
+            // Collect the md5 values of all udf jars into the map
+            if (UDF_MD5_MAP.isEmpty()) {
+                scanUDFMD5();
+            }
+            // 2. If there is a match, return the corresponding udf version; otherwise build the jar and record its info
+            if (UDF_MD5_MAP.containsKey(md5)) {
+                FileUtil.del(UDF_JAR_TMP_PATH);
+                return StrUtil.format("udf-{}.jar", UDF_MD5_MAP.get(md5));
+            }
+            // 3. Generate a jar with a new version number
+            Integer newVersion = UDF_MD5_MAP.values().size() > 0 ? CollUtil.max(UDF_MD5_MAP.values()) + 1 : 1;
+            String jarName = StrUtil.format("udf-{}.jar", newVersion);
+            String newName = PathConstant.UDF_PATH + jarName;
+            FileUtil.rename(FileUtil.file(UDF_JAR_TMP_PATH), newName, true);
+            UDF_MD5_MAP.put(md5, newVersion);
+            return jarName;
+        } catch (Exception e) {
+            log.warn("build jar failed! please check the environment. msg:{}", e.getMessage());
+            throw new RuntimeException(e);
+        }
+    }
+
+    /**
+     * Scan the udf jar files and write their md5 values into UDF_MD5_MAP
+     */
+    private static void scanUDFMD5() {
+        List<String> fileList = FileUtil.listFileNames(PathConstant.UDF_PATH);
+        fileList.stream().filter(fileName -> ReUtil.isMatch(UDF_JAR_RULE, fileName)).distinct().forEach(fileName -> {
+            Integer version = Convert.toInt(ReUtil.getGroup0(UDF_VERSION_RULE, fileName));
+            UDF_MD5_MAP.put(md5sum(PathConstant.UDF_PATH + fileName), version);
+        });
+    }
+
+    private static String md5sum(String filePath) {
+        return MD5.create().digestHex(FileUtil.file(filePath));
+    }
 }
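The net effect of getUdfNameAndBuildJar is content-addressed versioning: the jar is first written to udf-tmp.jar, its MD5 is compared against the jars already in tmp/udf/, and the temporary file is either discarded (on a hit) or renamed to the next free version. A rough usage sketch under that reading (the UDF source here is a hypothetical example):

```java
import java.util.Collections;
import java.util.List;

import com.dlink.utils.UDFUtil;

public class UdfVersionDemo {
    public static void main(String[] args) {
        List<String> code = Collections.singletonList(
                "package com.example.udf;\n"
                + "import org.apache.flink.table.functions.ScalarFunction;\n"
                + "public class ToUpper extends ScalarFunction {\n"
                + "    public String eval(String s) { return s == null ? null : s.toUpperCase(); }\n"
                + "}");
        String first = UDFUtil.getUdfNameAndBuildJar(code);  // e.g. "udf-1.jar", newly built
        String second = UDFUtil.getUdfNameAndBuildJar(code); // identical md5 -> "udf-1.jar" again
        System.out.println(first.equals(second));            // true
    }
}
```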
/*
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package com.dlink.utils;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.charset.Charset;
import java.util.stream.Stream;
import java.util.zip.ZipEntry;
import java.util.zip.ZipOutputStream;
import cn.hutool.core.compress.ZipWriter;
import cn.hutool.core.io.FileUtil;
import cn.hutool.core.io.IORuntimeException;
import cn.hutool.core.io.IoUtil;
import cn.hutool.core.util.ArrayUtil;
import cn.hutool.core.util.StrUtil;
/**
 * Zip archive utility class
 *
 * @author ZackYoung
 * @since 0.6.8
 */
public class ZipUtils extends ZipWriter {
public ZipUtils(File zipFile, Charset charset) {
super(zipFile, charset);
}
public ZipUtils(OutputStream out, Charset charset) {
super(out, charset);
}
public ZipUtils(ZipOutputStream out) {
super(out);
}
@Override
public ZipWriter add(String[] paths, InputStream[] ins) throws IORuntimeException {
if (ArrayUtil.isEmpty(paths) || ArrayUtil.isEmpty(ins)) {
throw new IllegalArgumentException("Paths or ins is empty !");
}
if (paths.length != ins.length) {
throw new IllegalArgumentException("Paths length is not equals to ins length !");
}
long maxTime = Stream.of(paths).map(FileUtil::file).mapToLong(File::lastModified).max().getAsLong();
for (int i = 0; i < paths.length; i++) {
add(paths[i], ins[i], maxTime);
}
return this;
}
public ZipWriter add(String path, InputStream in, long fileTime) throws IORuntimeException {
path = StrUtil.nullToEmpty(path);
if (null == in) {
            // For empty directories, normalize the path: a directory entry must end with "/"
path = StrUtil.addSuffixIfNot(path, StrUtil.SLASH);
if (StrUtil.isBlank(path)) {
return this;
}
}
return putEntry(path, in, fileTime);
}
private ZipWriter putEntry(String path, InputStream in, long fileTime) throws IORuntimeException {
try {
ZipEntry zipEntry = new ZipEntry(path);
zipEntry.setTime(fileTime);
super.getOut().putNextEntry(zipEntry);
if (null != in) {
IoUtil.copy(in, super.getOut());
}
super.getOut().closeEntry();
} catch (IOException e) {
throw new IORuntimeException(e);
} finally {
IoUtil.close(in);
}
IoUtil.flush(super.getOut());
return this;
}
}
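A sketch of how UDFUtil.buildJar drives this class (the file names are hypothetical): entry names are class-file paths inside the jar, and contents come from matching input streams. One design point in add(String[], InputStream[]) worth noting: every entry is stamped with a single shared timestamp rather than per-entry times, presumably so that rebuilding the same classes yields a byte-stable archive for the MD5 comparison in UDFUtil.

```java
import java.io.InputStream;
import java.nio.charset.Charset;

import com.dlink.utils.ZipUtils;

import cn.hutool.core.io.FileUtil;

public class ZipUtilsDemo {
    public static void main(String[] args) {
        // Hypothetical compiled UDF class under tmp/udf/:
        String[] entries = {"com/example/udf/ToUpper.class"};
        InputStream[] contents = {FileUtil.getInputStream("tmp/udf/com/example/udf/ToUpper.class")};
        try (ZipUtils writer = new ZipUtils(FileUtil.file("tmp/udf/udf-tmp.jar"), Charset.defaultCharset())) {
            writer.add(entries, contents); // one jar entry per class file
        }
    }
}
```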
@@ -37,6 +37,8 @@ import lombok.Setter;
 public class EnvironmentSetting {
     private String host;
     private int port;
+    private String[] jarFiles;
+
     private boolean useRemote;
     public static final EnvironmentSetting LOCAL = new EnvironmentSetting(false);
@@ -45,19 +47,20 @@ public class EnvironmentSetting {
         this.useRemote = useRemote;
     }

-    public EnvironmentSetting(String host, int port) {
+    public EnvironmentSetting(String host, int port, String... jarFiles) {
         this.host = host;
         this.port = port;
         this.useRemote = true;
+        this.jarFiles = jarFiles;
     }

-    public static EnvironmentSetting build(String address) {
+    public static EnvironmentSetting build(String address, String... jarFiles) {
         Asserts.checkNull(address, "The Flink address must not be empty");
         String[] strs = address.split(NetConstant.COLON);
         if (strs.length >= 2) {
-            return new EnvironmentSetting(strs[0], Integer.parseInt(strs[1]));
+            return new EnvironmentSetting(strs[0], Integer.parseInt(strs[1]), jarFiles);
         } else {
-            return new EnvironmentSetting(strs[0], FlinkConstant.FLINK_REST_DEFAULT_PORT);
+            return new EnvironmentSetting(strs[0], FlinkConstant.FLINK_REST_DEFAULT_PORT, jarFiles);
         }
     }
......
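With the varargs in place, callers can thread jar paths through the address parser unchanged. A small usage sketch (the address and jar path are hypothetical, and the import path is assumed):

```java
import com.dlink.executor.EnvironmentSetting;

public class EnvironmentSettingDemo {
    public static void main(String[] args) {
        EnvironmentSetting setting =
                EnvironmentSetting.build("192.168.1.10:8081", "/opt/dinky/tmp/udf/udf-1.jar");
        System.out.println(setting.getHost());            // 192.168.1.10
        System.out.println(setting.getPort());            // 8081
        System.out.println(setting.getJarFiles().length); // 1
    }
}
```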
@@ -41,7 +41,11 @@ import org.apache.flink.table.api.StatementSet;
 import org.apache.flink.table.api.TableConfig;
 import org.apache.flink.table.api.TableResult;
 import org.apache.flink.table.catalog.CatalogManager;
+import org.apache.flink.util.JarUtils;

+import java.lang.reflect.Method;
+import java.net.URL;
+import java.net.URLClassLoader;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -255,6 +259,41 @@ public abstract class Executor {
         }
     }

+    /**
+     * Initialize udf jars
+     *
+     * @param udfFilePath udf jar file paths
+     */
+    public void initUDF(String... udfFilePath) {
+        JarUtils.getJarFiles(udfFilePath).forEach(Executor::loadJar);
+    }
+
+    private static void loadJar(final URL jarUrl) {
+        // Obtain the addURL method from the URLClassLoader class
+        Method method = null;
+        try {
+            method = URLClassLoader.class.getDeclaredMethod("addURL", URL.class);
+        } catch (NoSuchMethodException | SecurityException e1) {
+            e1.printStackTrace();
+        }
+        // Remember the method's original accessibility
+        boolean accessible = method.isAccessible();
+        try {
+            // Make the method accessible
+            if (!accessible) {
+                method.setAccessible(true);
+            }
+            // Get the system class loader
+            URLClassLoader classLoader = (URLClassLoader) ClassLoader.getSystemClassLoader();
+            // Add the jar URL to the system class loader's search path
+            method.invoke(classLoader, jarUrl);
+        } catch (Exception e) {
+            e.printStackTrace();
+        } finally {
+            method.setAccessible(accessible);
+        }
+    }
+
     public String explainSql(String statement, ExplainDetail... extraDetails) {
         statement = pretreatStatement(statement);
         if (!pretreatExecute(statement).isNoExecute()) {
......
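A caveat on loadJar: the reflective addURL call assumes the system class loader is a URLClassLoader, which holds on Java 8 but not on Java 9+, where the cast throws a ClassCastException. Under that assumption, the intended effect looks roughly like this (paths and class names are hypothetical):

```java
// Inside code that holds an Executor instance, e.g. after JobManager.build(config)
// has run manager.executor.initUDF(config.getJarFiles()):
executor.initUDF("/opt/dinky/tmp/udf/udf-1.jar");
// Classes packaged in the jar are now resolvable from the running JVM:
Class<?> udf = Class.forName("com.example.udf.ToUpper");
```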
@@ -37,9 +37,9 @@ public class RemoteStreamExecutor extends Executor {
         this.executorSetting = executorSetting;
         if (Asserts.isNotNull(executorSetting.getConfig())) {
             Configuration configuration = Configuration.fromMap(executorSetting.getConfig());
-            this.environment = StreamExecutionEnvironment.createRemoteEnvironment(environmentSetting.getHost(), environmentSetting.getPort(), configuration);
+            this.environment = StreamExecutionEnvironment.createRemoteEnvironment(environmentSetting.getHost(), environmentSetting.getPort(), configuration, environmentSetting.getJarFiles());
         } else {
-            this.environment = StreamExecutionEnvironment.createRemoteEnvironment(environmentSetting.getHost(), environmentSetting.getPort());
+            this.environment = StreamExecutionEnvironment.createRemoteEnvironment(environmentSetting.getHost(), environmentSetting.getPort(), environmentSetting.getJarFiles());
         }
         init();
     }
......
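For context, Flink's createRemoteEnvironment accepts trailing jar paths and ships those files to the cluster with every submitted job, which is how the versioned udf jar reaches a standalone or session cluster without a manual upload. A minimal sketch (host, port, and jar path are hypothetical):

```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class RemoteEnvDemo {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment(
                "192.168.1.10", 8081, "/opt/dinky/tmp/udf/udf-1.jar");
        // Jobs executed on env now carry udf-1.jar as a user artifact.
    }
}
```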