Unverified Commit 4c7bb539 authored by ZackYoung's avatar ZackYoung Committed by GitHub

add Udf scala (#1145)

* add scala udf backend

* add scala udf frontend
parent 4947b119
......@@ -26,8 +26,11 @@ import com.dlink.model.Jar;
import com.dlink.model.Task;
import com.dlink.service.JarService;
import com.dlink.service.TaskService;
import com.dlink.udf.UDF;
import com.dlink.utils.UDFUtil;
import org.apache.flink.table.catalog.FunctionLanguage;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
......@@ -127,7 +130,10 @@ public class JarController {
@PostMapping("/udf/generateJar")
public Result<Map<String, List<String>>> generateJar() {
List<Task> allUDF = taskService.getAllUDF();
List<String> udfCodes = allUDF.stream().map(Task::getStatement).collect(Collectors.toList());
List<UDF> udfCodes = allUDF.stream().map(task -> {
return UDF.builder().code(task.getStatement()).className(task.getSavePointPath())
.functionLanguage(FunctionLanguage.valueOf(task.getDialect().toUpperCase())).build();
}).collect(Collectors.toList());
Map<String, List<String>> resultMap = UDFUtil.buildJar(udfCodes);
String msg = StrUtil.format("udf jar生成成功,jar文件在{};\n本次成功 class:{}。\n失败 class:{}"
, PathConstant.UDF_JAR_TMP_PATH, resultMap.get("success"), resultMap.get("failed"));
......
......@@ -368,6 +368,8 @@ public class TaskServiceImpl extends SuperServiceImpl<TaskMapper, Task> implemen
task.setSavePointPath(compiler.getFullClassName());
} else if (Dialect.PYTHON.equalsVal(task.getDialect())) {
task.setSavePointPath(task.getName() + "." + UDFUtil.getPyUDFAttr(task.getStatement()));
} else {
task.setSavePointPath(UDFUtil.getScalaFullClassName(task.getStatement()));
}
// if modify task else create task
if (task.getId() != null) {
......@@ -484,7 +486,7 @@ public class TaskServiceImpl extends SuperServiceImpl<TaskMapper, Task> implemen
@Override
public Task getUDFByClassName(String className) {
Task task = getOne(
new QueryWrapper<Task>().in("dialect", "Java", "Python").eq("enabled", 1).eq("save_point_path", className));
new QueryWrapper<Task>().in("dialect", "Java", "Python", "Scala").eq("enabled", 1).eq("save_point_path", className));
Assert.check(task);
task.setStatement(statementService.getById(task.getId()).getStatement());
return task;
......@@ -493,7 +495,7 @@ public class TaskServiceImpl extends SuperServiceImpl<TaskMapper, Task> implemen
@Override
public List<Task> getAllUDF() {
List<Task> tasks =
list(new QueryWrapper<Task>().in("dialect", "Java", "Python").eq("enabled", 1).isNotNull("save_point_path"));
list(new QueryWrapper<Task>().in("dialect", "Java", "Python", "Scala").eq("enabled", 1).isNotNull("save_point_path"));
return tasks.stream().peek(task -> {
Assert.check(task);
task.setStatement(statementService.getById(task.getId()).getStatement());
......
......@@ -37,7 +37,6 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import javax.annotation.Resource;
......@@ -74,15 +73,16 @@ public class UDFServiceImpl implements UDFService {
ProcessEntity process = ProcessContextHolder.getProcess();
process.info("Initializing Flink UDF...Start");
List<UDF> udfList = UDFUtil.getUDF(statement);
List<UDF> udfClassList = UDFUtil.getUDF(statement);
List<UDF> javaUdf = new ArrayList<>();
List<UDF> pythonUdf = new ArrayList<>();
udfList.forEach(udf -> {
udfClassList.forEach(udf -> {
Task task = taskService.getUDFByClassName(udf.getClassName());
udf.setCode(task.getStatement());
if (udf.getFunctionLanguage() == FunctionLanguage.PYTHON) {
pythonUdf.add(udf);
} else {
udf.setFunctionLanguage(FunctionLanguage.valueOf(task.getDialect().toUpperCase()));
javaUdf.add(udf);
}
});
......@@ -94,14 +94,13 @@ public class UDFServiceImpl implements UDFService {
}
private static String[] initPythonUDF(List<UDF> udfList) {
return new String[] {UDFUtil.buildPy(udfList)};
return (udfList.size() > 0) ? new String[] {UDFUtil.buildPy(udfList)} : new String[] {};
}
private static String[] initJavaUDF(List<UDF> udfList) {
Opt<String> udfJarPath = Opt.empty();
if (udfList.size() > 0) {
List<String> codeList = udfList.stream().map(UDF::getCode).collect(Collectors.toList());
udfJarPath = Opt.ofBlankAble(UDFUtil.getUdfFileAndBuildJar(codeList));
udfJarPath = Opt.ofBlankAble(UDFUtil.getUdfFileAndBuildJar(udfList));
}
if (udfJarPath.isPresent()) {
......
/*
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package com.dlink.utils;
import com.dlink.constant.PathConstant;
import scala.tools.nsc.GenericRunnerSettings;
import scala.tools.nsc.interpreter.IMain;
/**
* @author ZackYoung
* @since 0.6.8
*/
public class CustomStringScalaCompiler {
private static IMain interpreter;
public static IMain getInterpreter() {
if (interpreter != null) {
return interpreter;
}
GenericRunnerSettings settings = new GenericRunnerSettings((err) -> null);
settings.usejavacp().tryToSetFromPropertyValue("true");
settings.Yreploutdir().tryToSetFromPropertyValue(PathConstant.UDF_PATH);
interpreter = new IMain(settings);
return interpreter;
}
}
......@@ -33,6 +33,7 @@ import java.io.File;
import java.io.InputStream;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
......@@ -73,6 +74,8 @@ public class UDFUtil {
private static final String LANGUAGE_REGEX = "language (.*);";
public static final String PYTHON_UDF_ATTR = "(\\S)\\s+=\\s+ud(?:f|tf|af|taf)";
public static final String PYTHON_UDF_DEF = "@ud(?:f|tf|af|taf).*\\n+def\\s+(.*)\\(.*\\):";
public static final String SCALA_UDF_CLASS = "class\\s+(\\w+)(\\s*\\(.*\\)){0,1}\\s+extends";
public static final String SCALA_UDF_PACKAGE = "package\\s+(.*);";
public static List<UDF> getUDF(String statement) {
ProcessEntity process = ProcessContextHolder.getProcess();
......@@ -117,6 +120,12 @@ public class UDFUtil {
.orElse(ReUtil.getGroup1(UDFUtil.PYTHON_UDF_DEF, code));
}
public static String getScalaFullClassName(String code) {
String packageName = ReUtil.getGroup1(UDFUtil.SCALA_UDF_PACKAGE, code);
String clazz = ReUtil.getGroup1(UDFUtil.SCALA_UDF_CLASS, code);
return String.join(".", Arrays.asList(packageName, clazz));
}
public static Boolean buildClass(String code) {
CustomStringJavaCompiler compiler = new CustomStringJavaCompiler(code);
boolean res = compiler.compiler();
......@@ -145,15 +154,16 @@ public class UDFUtil {
Thread.currentThread().setContextClassLoader(groovyClassLoader);
}
public static Map<String, List<String>> buildJar(List<String> codeList) {
public static Map<String, List<String>> buildJar(List<UDF> codeList) {
List<String> successList = new ArrayList<>();
List<String> failedList = new ArrayList<>();
String tmpPath = PathConstant.UDF_PATH;
String udfJarPath = PathConstant.UDF_JAR_TMP_PATH;
// 删除jar缓存
FileUtil.del(udfJarPath);
codeList.forEach(code -> {
CustomStringJavaCompiler compiler = new CustomStringJavaCompiler(code);
codeList.forEach(udf -> {
if (udf.getFunctionLanguage() == FunctionLanguage.JAVA) {
CustomStringJavaCompiler compiler = new CustomStringJavaCompiler(udf.getCode());
boolean res = compiler.compilerToTmpPath(tmpPath);
String className = compiler.getFullClassName();
if (res) {
......@@ -165,6 +175,17 @@ public class UDFUtil {
log.warn(compiler.getCompilerMessage());
failedList.add(className);
}
} else if (udf.getFunctionLanguage() == FunctionLanguage.SCALA) {
String className = udf.getClassName();
if (CustomStringScalaCompiler.getInterpreter().compileString(udf.getCode())) {
log.info("scala class编译成功:{}" + className);
successList.add(className);
} else {
log.warn("scala class编译失败:{}" + className);
failedList.add(className);
}
}
});
String[] clazzs = successList.stream().map(className -> StrUtil.replace(className, ".", "/") + ".class")
.toArray(String[]::new);
......@@ -186,7 +207,7 @@ public class UDFUtil {
* @param codeList 代码列表
* @return {@link java.lang.String}
*/
public static String getUdfFileAndBuildJar(List<String> codeList) {
public static String getUdfFileAndBuildJar(List<UDF> codeList) {
// 1. 检查所有jar的版本,通常名字为 udf-${version}.jar;如 udf-1.jar,没有这个目录则跳过
String md5 = buildJar(codeList).get("md5").get(0);
if (!FileUtil.exist(PathConstant.UDF_PATH)) {
......
......@@ -270,8 +270,11 @@ public abstract class Executor {
}
public void initPyUDF(String... udfPyFilePath) {
if (udfPyFilePath == null || udfPyFilePath.length == 0) {
return;
}
Map<String, String> config = executorSetting.getConfig();
if (Asserts.isAllNotNullString(udfPyFilePath) && Asserts.isNotNull(config)) {
if (Asserts.isNotNull(config)) {
config.put(PythonOptions.PYTHON_FILES.key(), String.join(",", udfPyFilePath));
}
update(executorSetting);
......
......@@ -57,6 +57,9 @@ const StudioRightTool = (props: any) => {
if (DIALECT.JAVA === current.task.dialect) {
return renderUDFContent();
}
if (DIALECT.SCALA === current.task.dialect) {
return renderUDFContent();
}
if (DIALECT.PYTHON === current.task.dialect) {
return renderUDFContent();
}
......
......@@ -140,6 +140,8 @@ const EditorTabs = (props: any) => {
switch (dialect) {
case DIALECT.JAVA:
return DIALECT.JAVA.toLowerCase()
case DIALECT.SCALA:
return DIALECT.SCALA.toLowerCase()
case DIALECT.PYTHON:
return DIALECT.PYTHON.toLowerCase()
default:
......
......@@ -90,6 +90,7 @@ const SimpleTaskForm: React.FC<UpdateFormProps> = (props) => {
<Option value={DIALECT.PHOENIX}>{DIALECT.PHOENIX}</Option>
<Option value={DIALECT.STARROCKS}>{DIALECT.STARROCKS}</Option>
<Option value={DIALECT.JAVA}>{DIALECT.JAVA}</Option>
<Option value={DIALECT.SCALA}>{DIALECT.SCALA}</Option>
<Option value={DIALECT.PYTHON}>{DIALECT.PYTHON}</Option>
<Option value={DIALECT.SQL}>{DIALECT.SQL}</Option>
</Select>
......
......@@ -44,6 +44,7 @@ export const DIALECT = {
STARROCKS: 'StarRocks',
KUBERNETES_APPLICATION: 'KubernetesApplaction',
JAVA: 'Java',
SCALA: 'Scala',
PYTHON: 'Python',
};
......
......@@ -49,6 +49,8 @@ export const getIcon = (type: string) => {
return (<Icon component={StarRocksSvg}/>);
case DIALECT.JAVA:
return (<Icon component={JavaSvg}/>);
case DIALECT.SCALA:
return (<Icon component={ScalaSvg}/>);
case DIALECT.PYTHON:
return (<Icon component={PythonSvg}/>);
default:
......@@ -389,6 +391,17 @@ export const JavaSvg = () => (
fill="#6699FF" p-id="19722"></path>
</svg>
);
export const ScalaSvg = () => (
<svg t="1666600662911" className="icon" viewBox="0 0 1024 1024" version="1.1" xmlns="http://www.w3.org/2000/svg"
p-id="816" width={svgSize} height={svgSize}>
<path
d="M213.844 203.564l596.4-87.928v197.952l-596.4 87.928zM213.788 457.312l596.4-87.928v197.952l-596.4 87.928zM213.768 710.4l596.392-87.932v197.952L213.768 908.348z"
fill="#F44336" p-id="817"></path>
<path
d="M225.12 366.636l382.416 123.684-11.328 35.028-382.416-123.684zM427.76 245.496l382.416 123.688-11.328 35.028-382.416-123.688zM225.112 620.148l382.416 123.684-11.328 35.032-382.416-123.688zM427.792 498.604l382.416 123.688-11.332 35.028-382.412-123.688z"
fill="#F44336" p-id="818"></path>
</svg>
)
export const PythonSvg = () => (
<svg t="1666454409766" className="icon" viewBox="0 0 1024 1024" version="1.1" xmlns="http://www.w3.org/2000/svg"
p-id="2303" width={svgSize} height={svgSize}>
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment