Commit 2cbc96e0 authored by wenmo's avatar wenmo

Execution history and cluster auto-registration

parent 9f684bbb
......@@ -114,6 +114,14 @@
<groupId>com.dlink</groupId>
<artifactId>dlink-metadata-base</artifactId>
</dependency>
<dependency>
<groupId>com.dlink</groupId>
<artifactId>dlink-executor</artifactId>
</dependency>
<dependency>
<groupId>com.dlink</groupId>
<artifactId>dlink-gateway</artifactId>
</dependency>
</dependencies>
<build>
<plugins>
......@@ -124,6 +132,16 @@
<configuration>
<layout>ZIP</layout>
<mainClass>com.dlink.Dlink</mainClass>
<excludes>
<exclude>
<groupId>com.dlink</groupId>
<artifactId>dlink-executor</artifactId>
</exclude>
<exclude>
<groupId>com.dlink</groupId>
<artifactId>dlink-gateway</artifactId>
</exclude>
</excludes>
</configuration>
<executions>
<execution>
......
......@@ -33,13 +33,9 @@ public class ClusterController {
*/
@PutMapping
public Result saveOrUpdate(@RequestBody Cluster cluster) throws Exception {
checkHealth(cluster);
if(clusterService.saveOrUpdate(cluster)){
return Result.succeed("新增成功");
}else {
return Result.failed("新增失败");
}
cluster.setAutoRegisters(false);
clusterService.registersCluster(cluster);
return Result.succeed("新增成功");
}
/**
......@@ -91,6 +87,15 @@ public class ClusterController {
return Result.succeed(clusters,"获取成功");
}
/**
* Get the list of clusters available for sessions
*/
@GetMapping("/listSessionEnable")
public Result listSessionEnable() {
List<Cluster> clusters = clusterService.listSessionEnable();
return Result.succeed(clusters,"获取成功");
}
/**
* Heartbeat check for all clusters
*/
......@@ -99,21 +104,9 @@ public class ClusterController {
List<Cluster> clusters = clusterService.listEnabledAll();
for (int i = 0; i < clusters.size(); i++) {
Cluster cluster = clusters.get(i);
checkHealth(cluster);
clusterService.updateById(cluster);
clusterService.registersCluster(cluster);
}
return Result.succeed("状态刷新完成");
}
private void checkHealth(Cluster cluster){
FlinkClusterInfo info = clusterService.checkHeartBeat(cluster.getHosts(), cluster.getJobManagerHost());
if(!info.isEffective()){
cluster.setJobManagerHost("");
cluster.setStatus(0);
}else{
cluster.setJobManagerHost(info.getJobManagerAddress());
cluster.setStatus(1);
cluster.setVersion(info.getVersion());
}
}
}
......@@ -13,6 +13,7 @@ import lombok.Setter;
@Getter
@Setter
public class StudioDDLDTO {
private String type;
private boolean useResult;
private boolean useSession;
private String session;
......@@ -21,7 +22,7 @@ public class StudioDDLDTO {
private String statement;
public JobConfig getJobConfig() {
return new JobConfig(useResult, useSession, session, useRemote, clusterId);
return new JobConfig(type,useResult, useSession, session, useRemote, clusterId);
}
}
package com.dlink.dto;
import com.dlink.assertion.Asserts;
import com.dlink.job.JobConfig;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.Getter;
import lombok.Setter;
import java.util.HashMap;
import java.util.Map;
/**
......@@ -15,11 +20,14 @@ import java.util.Map;
@Getter
@Setter
public class StudioExecuteDTO {
private String type;
private boolean useResult;
private boolean useStatementSet;
private boolean useSession;
private String session;
private boolean useRemote;
private Integer clusterId;
private Integer clusterConfigurationId;
private boolean fragment;
private String statement;
private String jobName;
......@@ -28,17 +36,26 @@ public class StudioExecuteDTO {
private Integer checkPoint;
private Integer parallelism;
private String savePointPath;
// private Map<String,String> config;
private String configJson;
private static final ObjectMapper mapper = new ObjectMapper();
public JobConfig getJobConfig() {
return new JobConfig(useResult, useSession, session, useRemote, clusterId, taskId, jobName, fragment, maxRowNum, checkPoint, parallelism, savePointPath);
}
/*public String getSession() {
if(useRemote) {
return clusterId + "_" + session;
}else{
return "0_" + session;
Map<String,String> config = new HashMap<>();
JsonNode paras = null;
if(Asserts.isNotNullString(configJson)) {
try {
paras = mapper.readTree(configJson);
} catch (JsonProcessingException e) {
e.printStackTrace();
}
if (Asserts.isNotNull(paras)) {
// guard against a null tree when configJson fails to parse above
paras.forEach((JsonNode node) -> {
config.put(node.get("key").asText(), node.get("value").asText());
});
}
}
}*/
return new JobConfig(
type,useResult, useSession, session, useRemote, clusterId,
clusterConfigurationId, taskId, jobName, fragment,useStatementSet,
maxRowNum, checkPoint, parallelism, savePointPath,config);
}
}
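For reference, getJobConfig() above expects configJson to be a JSON array of {"key": ..., "value": ...} objects. A minimal stand-alone sketch of the same parsing step, with an illustrative class name and sample key that are not part of this commit:

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

import java.util.HashMap;
import java.util.Map;

public class ConfigJsonDemo {
    public static void main(String[] args) throws Exception {
        // a payload in the shape StudioExecuteDTO.configJson is parsed as
        String configJson = "[{\"key\":\"parallelism.default\",\"value\":\"2\"}]";
        Map<String, String> config = new HashMap<>();
        JsonNode paras = new ObjectMapper().readTree(configJson);
        // each array element contributes one key/value entry
        paras.forEach(node -> config.put(node.get("key").asText(), node.get("value").asText()));
        System.out.println(config); // {parallelism.default=2}
    }
}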
package com.dlink.job;
import cn.hutool.json.JSONUtil;
import com.dlink.assertion.Asserts;
import com.dlink.context.SpringContextUtils;
import com.dlink.model.Cluster;
import com.dlink.model.History;
import com.dlink.parser.SqlType;
import com.dlink.service.ClusterService;
import com.dlink.service.HistoryService;
import org.springframework.context.annotation.DependsOn;
......@@ -17,19 +19,23 @@ import org.springframework.context.annotation.DependsOn;
public class Job2MysqlHandler implements JobHandler {
private static HistoryService historyService;
private static ClusterService clusterService;
static {
historyService = SpringContextUtils.getBean("historyServiceImpl",HistoryService.class);
clusterService = SpringContextUtils.getBean("clusterServiceImpl",ClusterService.class);
}
@Override
public boolean init() {
Job job = JobContextHolder.getJob();
if(job.getType()!= SqlType.SELECT&&job.getType()!=SqlType.INSERT){
return false;
}
History history = new History();
history.setClusterId(job.getJobConfig().getClusterId());
history.setType(job.getType().getLongValue());
if(job.isUseGateway()) {
history.setClusterConfigurationId(job.getJobConfig().getClusterConfigurationId());
}else{
history.setClusterId(job.getJobConfig().getClusterId());
}
history.setJobManagerAddress(job.getJobManagerAddress());
history.setJobName(job.getJobConfig().getJobName());
history.setSession(job.getJobConfig().getSession());
......@@ -56,16 +62,23 @@ public class Job2MysqlHandler implements JobHandler {
@Override
public boolean success() {
Job job = JobContextHolder.getJob();
if(job.getType()!= SqlType.SELECT&&job.getType()!=SqlType.INSERT){
return false;
}
History history = new History();
history.setId(job.getId());
history.setJobId(job.getJobId());
history.setType(job.getType().getType());
history.setStatus(job.getStatus().ordinal());
history.setEndTime(job.getEndTime());
if(job.isUseGateway()){
history.setJobManagerAddress(job.getJobManagerAddress());
}
// history.setResult(JSONUtil.toJsonStr(job.getResult()));
if(job.isUseGateway()){
Cluster cluster = clusterService.registersCluster(Cluster.autoRegistersCluster(job.getJobManagerAddress(),
job.getJobConfig().getJobName(), job.getType().getLongValue(),
job.getJobConfig().getClusterConfigurationId(), job.getJobConfig().getTaskId()));
if(Asserts.isNotNull(cluster)){
history.setClusterId(cluster.getId());
}
}
historyService.updateById(history);
return true;
}
......@@ -73,14 +86,11 @@ public class Job2MysqlHandler implements JobHandler {
@Override
public boolean failed() {
Job job = JobContextHolder.getJob();
if(job.getType()!= SqlType.SELECT&&job.getType()!=SqlType.INSERT){
return false;
}
History history = new History();
history.setId(job.getId());
history.setJobId(job.getJobId());
history.setStatus(job.getStatus().ordinal());
history.setType(job.getType().getType());
history.setJobManagerAddress(job.getJobManagerAddress());
history.setEndTime(job.getEndTime());
history.setError(job.getError());
historyService.updateById(history);
......
......@@ -4,6 +4,8 @@ import com.dlink.db.mapper.SuperMapper;
import com.dlink.model.Cluster;
import org.apache.ibatis.annotations.Mapper;
import java.util.List;
/**
* ClusterMapper
*
......@@ -12,4 +14,6 @@ import org.apache.ibatis.annotations.Mapper;
**/
@Mapper
public interface ClusterMapper extends SuperMapper<Cluster> {
List<Cluster> listSessionEnable();
}
......@@ -34,4 +34,23 @@ public class Cluster extends SuperEntity {
private Integer status;
private String note;
private boolean autoRegisters;
private Integer clusterConfigurationId;
private Integer taskId;
public static Cluster autoRegistersCluster(String hosts,String name,String type,Integer clusterConfigurationId,Integer taskId){
Cluster cluster = new Cluster();
cluster.setName(name);
cluster.setAlias(name);
cluster.setHosts(hosts);
cluster.setType(type);
cluster.setClusterConfigurationId(clusterConfigurationId);
cluster.setTaskId(taskId);
cluster.setAutoRegisters(true);
cluster.setEnabled(true);
return cluster;
}
}
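As Job2MysqlHandler shows earlier in this diff, a gateway-submitted job registers its cluster through this factory; the resulting record is enabled and flagged autoRegisters = true. A call with hypothetical values:

// all values are illustrative; the handler takes them from the finished job
Cluster cluster = Cluster.autoRegistersCluster(
        "192.168.123.101:8081", // JobManager address reported back by the gateway
        "myFlinkJob",           // used as both name and alias
        "yarn-per-job",         // cluster type
        1,                      // clusterConfigurationId
        5);                     // taskId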
......@@ -23,6 +23,7 @@ public class History implements Serializable {
private Integer id;
private Integer clusterId;
private Integer clusterConfigurationId;
private String session;
private String jobId;
private String jobName;
......
......@@ -25,4 +25,8 @@ public interface ClusterService extends ISuperService<Cluster> {
String buildLocalEnvironmentAddress();
List<Cluster> listEnabledAll();
List<Cluster> listSessionEnable();
Cluster registersCluster(Cluster cluster);
}
......@@ -65,7 +65,7 @@ public class ClusterServiceImpl extends SuperServiceImpl<ClusterMapper, Cluster>
try {
InetAddress inetAddress = InetAddress.getLocalHost();
if(inetAddress!=null) {
return inetAddress.getHostAddress()+ NetConstant.COLON+FlinkConstant.PORT;
return inetAddress.getHostAddress()+ NetConstant.COLON+FlinkConstant.FLINK_REST_DEFAULT_PORT;
}
} catch (UnknownHostException e) {
e.printStackTrace();
......@@ -77,4 +77,28 @@ public class ClusterServiceImpl extends SuperServiceImpl<ClusterMapper, Cluster>
public List<Cluster> listEnabledAll() {
return this.list(new QueryWrapper<Cluster>().eq("enabled",1));
}
@Override
public List<Cluster> listSessionEnable() {
return baseMapper.listSessionEnable();
}
@Override
public Cluster registersCluster(Cluster cluster) {
checkHealth(cluster);
saveOrUpdate(cluster);
return cluster;
}
private void checkHealth(Cluster cluster){
FlinkClusterInfo info = checkHeartBeat(cluster.getHosts(), cluster.getJobManagerHost());
if(!info.isEffective()){
cluster.setJobManagerHost("");
cluster.setStatus(0);
}else{
cluster.setJobManagerHost(info.getJobManagerAddress());
cluster.setStatus(1);
cluster.setVersion(info.getVersion());
}
}
}
......@@ -13,6 +13,9 @@
<result column="version" property="version" />
<result column="status" property="status" />
<result column="note" property="note" />
<result column="auto_registers" property="autoRegisters" />
<result column="cluster_configuration_id" property="clusterConfigurationId" />
<result column="task_id" property="taskId" />
<result column="enabled" property="enabled" />
<result column="create_time" property="createTime" />
<result column="update_time" property="updateTime" />
......@@ -20,7 +23,7 @@
<!-- Common query result columns -->
<sql id="Base_Column_List">
id, name, alias, type,hosts,job_manager_host,version, status,note, enabled, create_time, update_time
id, name, alias, type,hosts,job_manager_host,version, status,note,auto_registers,cluster_configuration_id,task_id, enabled, create_time, update_time
</sql>
......@@ -51,4 +54,13 @@
</if>
</where>
</select>
<select id="listSessionEnable" resultType="com.dlink.model.Cluster">
select
a.*
from
dlink_cluster a
where enabled = 1
and (type = 'standalone' or type = 'yarn-session')
</select>
</mapper>
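The listSessionEnable statement keeps only enabled standalone and yarn-session clusters, i.e. the ones able to host shared sessions. For comparison, a sketch of the same filter in the QueryWrapper style that ClusterServiceImpl already uses for listEnabledAll (not part of this commit):

// sketch: would live in ClusterServiceImpl, reusing its existing imports
@Override
public List<Cluster> listSessionEnable() {
    return this.list(new QueryWrapper<Cluster>()
            .eq("enabled", 1)
            .in("type", "standalone", "yarn-session"));
}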
......@@ -6,6 +6,7 @@
<resultMap id="BaseResultMap" type="com.dlink.model.History">
<id column="id" property="id"/>
<result column="cluster_id" property="clusterId"/>
<result column="cluster_configuration_id" property="clusterConfigurationId"/>
<result column="session" property="session"/>
<result column="jod_id" property="jobId"/>
<result column="job_name" property="jobName"/>
......@@ -23,7 +24,7 @@
<!-- Common query result columns -->
<sql id="Base_Column_List">
id,cluster_id,session,jod_id,job_name,
id,cluster_id,cluster_configuration_id,session,jod_id,job_name,
job_manager_address,status,statement,type, error,
result,config,start_time,end_time,task_id
</sql>
......
......@@ -179,5 +179,13 @@
<include>dlink-app-${project.version}-jar-with-dependencies.jar</include>
</includes>
</fileSet>
<fileSet>
<directory>${project.parent.basedir}/dlink-web/dist</directory>
<outputDirectory>html</outputDirectory>
<includes>
<include>*</include>
<include>*/*</include>
</includes>
</fileSet>
</fileSets>
</assembly>
\ No newline at end of file
package com.dlink.constant;
/**
* FlinkConstant
*
* @author wenmo
* @since 2021/5/25 14:39
**/
public interface FlinkConstant {
/**
* Default Flink REST port
*/
Integer PORT = 8081;
/**
* Default number of Flink sessions
*/
Integer DEFAULT_SESSION_COUNT = 256;
/**
* Flink session load factor
*/
Double DEFAULT_FACTOR = 0.75;
/**
* Host used in local mode
*/
String LOCAL_HOST = "localhost:8081";
}
package com.dlink.constant;
public interface FlinkFunctionConstant {
/**
* TO_MAP function
*/
String TO_MAP = "to_map";
/**
* GET_KEY function
*/
String GET_KEY = "get_key";
/**
* TOP2 function
*/
String TOP2 = "top2";
}
package com.dlink.constant;
/**
* FlinkSQLConstant
*
* @author wenmo
* @since 2021/5/25 15:51
**/
public interface FlinkSQLConstant {
/**
* Statement separator
*/
String SEPARATOR = ";";
/**
* DDL type
*/
String DDL = "DDL";
/**
* DML type
*/
String DML = "DML";
}
......@@ -2,6 +2,7 @@ package com.dlink.job;
import com.dlink.executor.Executor;
import com.dlink.executor.ExecutorSetting;
import com.dlink.gateway.GatewayType;
import com.dlink.parser.SqlType;
import com.dlink.result.IResult;
import lombok.Getter;
......@@ -23,7 +24,7 @@ public class Job {
private JobConfig jobConfig;
private String jobManagerAddress;
private JobStatus status;
private SqlType type;
private GatewayType type;
private String statement;
private String jobId;
private String error;
......@@ -32,8 +33,9 @@ public class Job {
private LocalDateTime startTime;
private LocalDateTime endTime;
private Executor executor;
private boolean useGateway;
enum JobStatus{
enum JobStatus {
INITIALIZE,
RUNNING,
SUCCESS,
......@@ -41,17 +43,22 @@ public class Job {
CANCEL
}
public Job(JobConfig jobConfig, String jobManagerAddress, JobStatus status, String statement,ExecutorSetting executorSetting, LocalDateTime startTime, Executor executor) {
public Job(JobConfig jobConfig, GatewayType type, JobStatus status, String statement, ExecutorSetting executorSetting, Executor executor, boolean useGateway) {
this.jobConfig = jobConfig;
this.jobManagerAddress = jobManagerAddress;
this.type = type;
this.status = status;
this.statement = statement;
this.executorSetting = executorSetting;
this.startTime = startTime;
this.startTime = LocalDateTime.now();
this.executor = executor;
this.useGateway = useGateway;
}
public JobResult getJobResult(){
return new JobResult(id,jobConfig,jobManagerAddress,status,statement,jobId,error,result,startTime,endTime);
public static Job init(GatewayType type, JobConfig jobConfig, ExecutorSetting executorSetting, Executor executor, String statement, boolean useGateway) {
return new Job(jobConfig, type, JobStatus.INITIALIZE, statement, executorSetting, executor, useGateway);
}
public JobResult getJobResult() {
return new JobResult(id, jobConfig, jobManagerAddress, status, statement, jobId, error, result, startTime, endTime);
}
}
......@@ -38,27 +38,32 @@ public class JobConfig {
private String savePointPath;
private GatewayConfig gatewayConfig;
//private Map<String,String> config;
private Map<String,String> config;
public JobConfig(boolean useResult, boolean useSession, String session, boolean useRemote, Integer clusterId,
Integer taskId, String jobName, boolean useSqlFragment, Integer maxRowNum, Integer checkpoint,
Integer parallelism, String savePointPath) {
public JobConfig(String type,boolean useResult, boolean useSession, String session, boolean useRemote, Integer clusterId,
Integer clusterConfigurationId,Integer taskId, String jobName, boolean useSqlFragment,
boolean useStatementSet,Integer maxRowNum, Integer checkpoint,
Integer parallelism, String savePointPath,Map<String,String> config) {
this.type = type;
this.useResult = useResult;
this.useSession = useSession;
this.session = session;
this.useRemote = useRemote;
this.clusterId = clusterId;
this.clusterConfigurationId = clusterConfigurationId;
this.taskId = taskId;
this.jobName = jobName;
this.useSqlFragment = useSqlFragment;
this.useStatementSet = useStatementSet;
this.maxRowNum = maxRowNum;
this.checkpoint = checkpoint;
this.parallelism = parallelism;
this.savePointPath = savePointPath;
// this.config = config;
this.config = config;
}
public JobConfig(boolean useResult, boolean useSession, String session, boolean useRemote, Integer clusterId) {
public JobConfig(String type,boolean useResult, boolean useSession, String session, boolean useRemote, Integer clusterId) {
this.type = type;
this.useResult = useResult;
this.useSession = useSession;
this.session = session;
......
package com.dlink.parser;
import com.dlink.assertion.Asserts;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
* BaseSingleSqlParser
*
* @author wenmo
* @since 2021/6/14 16:43
*/
public abstract class BaseSingleSqlParser {
// the original SQL statement
protected String originalSql;
// the SQL statement segments
protected List<SqlSegment> segments;
/**
* Constructor: takes the original SQL statement and splits it into segments.
**/
public BaseSingleSqlParser(String originalSql) {
this.originalSql = originalSql;
segments = new ArrayList<SqlSegment>();
initializeSegments();
}
/**
* Initialize the segments; subclasses must implement this.
**/
protected abstract void initializeSegments();
/**
* Split originalSql into individual segments.
**/
protected Map<String,List<String>> splitSql2Segment() {
Map<String,List<String>> map = new HashMap<>();
for (SqlSegment sqlSegment : segments) {
sqlSegment.parse(originalSql);
if(Asserts.isNotNullString(sqlSegment.getStart())) {
map.put(sqlSegment.getStart().toUpperCase(), sqlSegment.getBodyPieces());
}
}
return map;
}
/**
* Get the fully parsed SQL statement.
**/
public String getParsedSql() {
StringBuffer sb = new StringBuffer();
for (SqlSegment sqlSegment : segments) {
sb.append(sqlSegment.getParsedSqlSegment() + "\n");
}
String retval = sb.toString().replaceAll("\n+", "\n");
return retval;
}
}
package com.dlink.parser;
/**
* CreateAggTableSelectSqlParser
*
* @author wenmo
* @since 2021/6/14 16:56
*/
public class CreateAggTableSelectSqlParser extends BaseSingleSqlParser {
public CreateAggTableSelectSqlParser(String originalSql) {
super(originalSql);
}
@Override
protected void initializeSegments() {
segments.add(new SqlSegment("(create\\s+aggtable)(.+)(as\\s+select)", "[,]"));
segments.add(new SqlSegment("(select)(.+)(from)", "[,]"));
segments.add(new SqlSegment("(from)(.+?)( where | on | having | group\\s+by | order\\s+by | agg\\s+by | ENDOFSQL)", "(,|\\s+left\\s+join\\s+|\\s+right\\s+join\\s+|\\s+inner\\s+join\\s+)"));
segments.add(new SqlSegment("(where|on|having)(.+?)( group\\s+by | order\\s+by | agg\\s+by | ENDOFSQL)", "(and|or)"));
segments.add(new SqlSegment("(group\\s+by)(.+?)( order\\s+by | agg\\s+by | ENDOFSQL)", "[,]"));
segments.add(new SqlSegment("(order\\s+by)(.+?)( agg\\s+by | ENDOFSQL)", "[,]"));
segments.add(new SqlSegment("(agg\\s+by)(.+?)( ENDOFSQL)", "[,]"));
}
}
package com.dlink.parser;
/**
* DeleteSqlParser
*
* @author wenmo
* @since 2021/6/14 16:51
*/
public class DeleteSqlParser extends BaseSingleSqlParser {
public DeleteSqlParser(String originalSql) {
super(originalSql);
}
@Override
protected void initializeSegments() {
segments.add(new SqlSegment("(delete\\s+from)(.+)( where | ENDOFSQL)", "[,]"));
segments.add(new SqlSegment("(where)(.+)( ENDOFSQL)", "(and|or)"));
}
}
package com.dlink.parser;
/**
* InsertSelectSqlParser
*
* @author wenmo
* @since 2021/6/14 16:53
*/
public class InsertSelectSqlParser extends BaseSingleSqlParser {
public InsertSelectSqlParser(String originalSql) {
super(originalSql);
}
@Override
protected void initializeSegments() {
segments.add(new SqlSegment("(insert\\s+into)(.+)( select )", "[,]"));
segments.add(new SqlSegment("(select)(.+)(from)", "[,]"));
segments.add(new SqlSegment("(from)(.+?)( where | on | having | group\\s+by | order\\s+by | ENDOFSQL)", "(,|\\s+left\\s+join\\s+|\\s+right\\s+join\\s+|\\s+inner\\s+join\\s+)"));
segments.add(new SqlSegment("(where|on|having)(.+?)( group\\s+by | order\\s+by | ENDOFSQL)", "(and|or)"));
segments.add(new SqlSegment("(group\\s+by)(.+?)( order\\s+by| ENDOFSQL)", "[,]"));
segments.add(new SqlSegment("(order\\s+by)(.+?)( ENDOFSQL)", "[,]"));
}
}
package com.dlink.parser;
/**
* InsertSqlParser
*
* @author wenmo
* @since 2021/6/14 16:54
*/
public class InsertSqlParser extends BaseSingleSqlParser {
public InsertSqlParser(String originalSql) {
super(originalSql);
}
@Override
protected void initializeSegments() {
segments.add(new SqlSegment("(insert\\s+into)(.+?)([(])", "[,]"));
segments.add(new SqlSegment("([(])(.+?)([)]\\s+values\\s+[(])", "[,]"));
segments.add(new SqlSegment("([)]\\s+values\\s+[(])(.+)([)]\\s+ENDOFSQL)", "[,]"));
}
public String getParsedSql() {
String retval = super.getParsedSql();
retval = retval + ")";
return retval;
}
}
package com.dlink.parser;
/**
* SelectSqlParser
*
* @author wenmo
* @since 2021/6/14 16:53
*/
public class SelectSqlParser extends BaseSingleSqlParser {
public SelectSqlParser(String originalSql) {
super(originalSql);
}
@Override
protected void initializeSegments() {
segments.add(new SqlSegment("(select)(.+)(from)", "[,]"));
segments.add(new SqlSegment("(from)(.+?)(where |group\\s+by|having|order\\s+by | ENDOFSQL)", "(,|s+lefts+joins+|s+rights+joins+|s+inners+joins+)"));
segments.add(new SqlSegment("(where)(.+?)(group\\s+by |having| order\\s+by | ENDOFSQL)", "(and|or)"));
segments.add(new SqlSegment("(group\\s+by)(.+?)(having|order\\s+by| ENDOFSQL)", "[,]"));
segments.add(new SqlSegment("(having)(.+?)(order\\s+by| ENDOFSQL)", "(and|or)"));
segments.add(new SqlSegment("(order\\s+by)(.+)( ENDOFSQL)", "[,]"));
}
}
package com.dlink.parser;
/**
* SetSqlParser
*
* @author wenmo
* @since 2021/10/21 18:41
**/
public class SetSqlParser extends BaseSingleSqlParser {
public SetSqlParser(String originalSql) {
super(originalSql);
}
@Override
protected void initializeSegments() {
//SET(\s+(\S+)\s*=(.*))?
segments.add(new SqlSegment("(set)\\s+(.+)(\\s*=)", "[.]"));
segments.add(new SqlSegment("(=)\\s*(.*)( ENDOFSQL)", ","));
}
}
package com.dlink.parser;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
/**
* SingleSqlParserFactory
*
* @author wenmo
* @since 2021/6/14 16:49
*/
public class SingleSqlParserFactory {
public static Map<String,List<String>> generateParser(String sql) {
BaseSingleSqlParser tmp = null;
// sql = sql.replace("\n"," ").replaceAll("\\s{1,}", " ") +" ENDOFSQL";
sql = sql.replace("\r\n"," ").replace("\n"," ") +" ENDOFSQL";
if (contains(sql, "(insert\\s+into)(.+)(select)(.+)(from)(.+)")) {
tmp = new InsertSelectSqlParser(sql);
} else if (contains(sql, "(create\\s+aggtable)(.+)(as\\s+select)(.+)")) {
tmp = new CreateAggTableSelectSqlParser(sql);
} else if (contains(sql, "(select)(.+)(from)(.+)")) {
tmp = new SelectSqlParser(sql);
} else if (contains(sql, "(delete\\s+from)(.+)")) {
tmp = new DeleteSqlParser(sql);
} else if (contains(sql, "(update)(.+)(set)(.+)")) {
tmp = new UpdateSqlParser(sql);
} else if (contains(sql, "(insert\\s+into)(.+)(values)(.+)")) {
tmp = new InsertSqlParser(sql);
} else if (contains(sql, "(create\\s+table)(.+)")) {
} else if (contains(sql, "(create\\s+database)(.+)")) {
} else if (contains(sql, "(show\\s+databases)")) {
} else if (contains(sql, "(use)(.+)")) {
} else if (contains(sql, "(set)(.+)")) {
tmp = new SetSqlParser(sql);
} else {
}
if (tmp == null) {
// no dedicated parser matched (e.g. CREATE TABLE, USE, SHOW); return an empty result
return Collections.emptyMap();
}
return tmp.splitSql2Segment();
}
/**
* Check whether regExp occurs in sql; regular expressions are supported.
*
* @param sql the SQL statement to parse
* @param regExp the regular expression to look for
* @return true if the expression matches
**/
private static boolean contains(String sql, String regExp) {
Pattern pattern = Pattern.compile(regExp, Pattern.CASE_INSENSITIVE);
Matcher matcher = pattern.matcher(sql);
return matcher.find();
}
}
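A hypothetical usage sketch (not in the commit) showing the shape of generateParser's result: keys are each segment's upper-cased start token, values are the split body pieces:

import com.dlink.parser.SingleSqlParserFactory;

import java.util.List;
import java.util.Map;

public class ParserDemo {
    public static void main(String[] args) {
        Map<String, List<String>> parts =
                SingleSqlParserFactory.generateParser("insert into sink select a, b from source");
        // matched by InsertSelectSqlParser; whitespace inside the pieces is preserved, roughly:
        // {INSERT INTO=[ sink], SELECT=[ a,  b ], FROM=[ source]}
        System.out.println(parts);
    }
}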
package com.dlink.parser;
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
/**
* SqlSegment
*
* @author wenmo
* @since 2021/6/14 16:12
*/
public class SqlSegment {
private static final String Crlf = "|";
@SuppressWarnings("unused")
private static final String FourSpace = "  ";
/**
* Segment type of the SQL statement, upper-cased
**/
private String type;
/**
* Leading part of the segment
**/
private String start;
/**
* Middle (body) part of the segment
**/
private String body;
/**
* Trailing part of the segment
**/
private String end;
/**
* Regular expression used to split the body
**/
private String bodySplitPattern;
/**
* Regular expression that identifies the segment
**/
private String segmentRegExp;
/**
* Body pieces after splitting
**/
private List<String> bodyPieces;
/**
* Constructor
*
* @param segmentRegExp regular expression that identifies this SQL segment
* @param bodySplitPattern regular expression used to split the body
**/
public SqlSegment(String segmentRegExp, String bodySplitPattern) {
this.type = "";
this.start = "";
this.body = "";
this.end = "";
this.segmentRegExp = segmentRegExp;
this.bodySplitPattern = bodySplitPattern;
this.bodyPieces = new ArrayList<String>();
}
/**
* Find the part of the sql that matches segmentRegExp and assign it to the start, body, and end fields
**/
public void parse(String sql) {
Pattern pattern = Pattern.compile(segmentRegExp, Pattern.CASE_INSENSITIVE);
Matcher matcher = pattern.matcher(sql);
while (matcher.find()) {
start = matcher.group(1);
body = matcher.group(2);
end = matcher.group(3);
type = start.replace("\n"," ").replaceAll("\\s{1,}", " ").toUpperCase();
parseBody();
}
}
/**
* Parse the body part
**/
private void parseBody() {
List<String> ls = new ArrayList<String>();
Pattern p = Pattern.compile(bodySplitPattern, Pattern.CASE_INSENSITIVE);
body = body.trim();
Matcher m = p.matcher(body);
StringBuffer sb = new StringBuffer();
boolean result = m.find();
while (result) {
m.appendReplacement(sb, Crlf);
result = m.find();
}
m.appendTail(sb);
//ls.add(start);
String[] arr = sb.toString().split("[|]");
int arrLength = arr.length;
for (int i = 0; i < arrLength; i++) {
ls.add(arr[i]);
}
bodyPieces = ls;
}
/**
* Get the parsed SQL segment
**/
public String getParsedSqlSegment() {
StringBuffer sb = new StringBuffer();
sb.append(start + Crlf);
for (String piece : bodyPieces) {
sb.append(piece + Crlf);
}
return sb.toString();
}
public String getType() {
return type;
}
public void setType(String type) {
this.type = type;
}
public String getStart() {
return start;
}
public void setStart(String start) {
this.start = start;
}
public String getBody() {
return body;
}
public void setBody(String body) {
this.body = body;
}
public String getEnd() {
return end;
}
public void setEnd(String end) {
this.end = end;
}
public String getBodySplitPattern() {
return bodySplitPattern;
}
public void setBodySplitPattern(String bodySplitPattern) {
this.bodySplitPattern = bodySplitPattern;
}
public String getSegmentRegExp() {
return segmentRegExp;
}
public void setSegmentRegExp(String segmentRegExp) {
this.segmentRegExp = segmentRegExp;
}
public List<String> getBodyPieces() {
return bodyPieces;
}
public void setBodyPieces(List<String> bodyPieces) {
this.bodyPieces = bodyPieces;
}
}
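To make the three capture groups concrete: parse() binds group 1 to start, group 2 to body, and group 3 to end, then parseBody() splits the body on bodySplitPattern. A hypothetical stand-alone check:

import com.dlink.parser.SqlSegment;

public class SqlSegmentDemo {
    public static void main(String[] args) {
        // the (select)(.+)(from) segment used by SelectSqlParser, body split on commas
        SqlSegment segment = new SqlSegment("(select)(.+)(from)", "[,]");
        segment.parse("select id, name from users ENDOFSQL");
        System.out.println(segment.getStart());      // select
        System.out.println(segment.getBodyPieces()); // [id,  name]
    }
}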
package com.dlink.parser;
/**
* SqlType
*
* @author wenmo
* @since 2021/7/3 11:11
*/
public enum SqlType {
SELECT("SELECT"),
CREATE("CREATE"),
DROP("DROP"),
ALTER("ALTER"),
INSERT("INSERT"),
DESCRIBE("DESCRIBE"),
EXPLAIN("EXPLAIN"),
USE("USE"),
SHOW("SHOW"),
LOAD("LOAD"),
UNLOAD("UNLOAD"),
SET("SET"),
RESET("RESET"),
UNKNOWN("UNKNOWN"),
;
private String type;
SqlType(String type) {
this.type = type;
}
public void setType(String type) {
this.type = type;
}
public String getType() {
return type;
}
public boolean equalsValue(String value){
return type.equalsIgnoreCase(value);
}
}
package com.dlink.parser;
/**
* UpdateSqlParser
*
* @author wenmo
* @since 2021/6/14 16:52
*/
public class UpdateSqlParser extends BaseSingleSqlParser {
public UpdateSqlParser(String originalSql) {
super(originalSql);
}
@Override
protected void initializeSegments() {
segments.add(new SqlSegment("(update)(.+)(set)", "[,]"));
segments.add(new SqlSegment("(set)(.+?)( where | ENDOFSQL)", "[,]"));
segments.add(new SqlSegment("(where)(.+)(ENDOFSQL)", "(and|or)"));
}
}
......@@ -23,6 +23,9 @@ public class InsertResult extends AbstractResult implements IResult {
this.endTime = LocalDateTime.now();
}
public static InsertResult success(String jobID){
return new InsertResult(jobID,true);
}
@Override
public String getJobId() {
return jobID;
......
......@@ -12,6 +12,7 @@ import com.dlink.result.SubmitResult;
import org.junit.Test;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
/**
......@@ -62,9 +63,9 @@ public class JobManagerTest {
@Test
public void cancelJobSelect(){
JobConfig config = new JobConfig(true, true, "s1", true, 2,
null, "测试", false, 100, 0,
1, null);
JobConfig config = new JobConfig("session-yarn",true, true, "s1", true, 2,
null, null, "测试", false,false, 100, 0,
1, null,new HashMap<>());
if(config.isUseRemote()) {
config.setAddress("192.168.123.157:8081");
}
......
......@@ -328,7 +328,7 @@ CREATE TABLE `dlink_history` (
INDEX `cluster_index`(`cluster_id`) USING BTREE
) ENGINE = InnoDB CHARACTER SET = utf8 COLLATE = utf8_general_ci COMMENT = '执行历史' ROW_FORMAT = Dynamic;
ALTER TABLE `dlink`.`dlink_task`
ALTER TABLE `dlink_task`
ADD COLUMN `config` text NULL COMMENT '配置' AFTER `cluster_id`;
-- ----------------------------
......@@ -358,10 +358,10 @@ CREATE TABLE `dlink_database` (
UNIQUE INDEX `db_index`(`name`) USING BTREE
) ENGINE = InnoDB CHARACTER SET = utf8 COLLATE = utf8_general_ci ROW_FORMAT = Dynamic;
ALTER TABLE `dlink`.`dlink_cluster`
ALTER TABLE `dlink_cluster`
ADD COLUMN `version` varchar(20) NULL COMMENT '版本' AFTER `job_manager_host`;
ALTER TABLE `dlink`.`dlink_flink_document`
ALTER TABLE `dlink_flink_document`
ADD COLUMN `fill_value` varchar(255) NULL COMMENT '填充值' AFTER `description`;
update dlink_flink_document set fill_value=name;
......@@ -397,10 +397,34 @@ CREATE TABLE `dlink_jar` (
PRIMARY KEY (`id`)
) ENGINE = InnoDB CHARACTER SET = utf8 COLLATE = utf8_general_ci ROW_FORMAT = Dynamic;
ALTER TABLE `dlink`.`dlink_task`
ALTER TABLE `dlink_task`
ADD COLUMN `cluster_configuration_id` int(11) NULL COMMENT '集群配置ID' AFTER `cluster_id`;
ALTER TABLE `dlink`.`dlink_task`
ALTER TABLE `dlink_task`
ADD COLUMN `statement_set` tinyint(1) NULL COMMENT '启用语句集' AFTER `fragment`;
alter table dlink_history
add cluster_configuration_id int(11) null COMMENT '集群配置ID' after cluster_id;
CREATE TABLE `dlink_sys_config` (
`id` int(11) NOT NULL AUTO_INCREMENT COMMENT 'ID',
`name` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL COMMENT '配置名',
`value` text CHARACTER SET utf8 COLLATE utf8_general_ci DEFAULT NULL COMMENT '值',
`create_time` datetime DEFAULT NULL COMMENT '创建时间',
`update_time` datetime DEFAULT NULL COMMENT '更新时间',
PRIMARY KEY (`id`)
) ENGINE = InnoDB CHARACTER SET = utf8 COLLATE = utf8_general_ci ROW_FORMAT = Dynamic;
alter table dlink_cluster
add auto_registers tinyint(1) default 0 null comment '是否自动注册' after note;
update dlink_cluster set type ='yarn-session' where type ='Yarn';
update dlink_cluster set type ='standalone' where type ='Standalone';
ALTER TABLE `dlink_cluster`
ADD COLUMN `cluster_configuration_id` int(11) NULL COMMENT '集群配置ID' AFTER `auto_registers`;
ALTER TABLE `dlink_cluster`
ADD COLUMN `task_id` int(11) NULL COMMENT '任务ID' AFTER `cluster_configuration_id`;
SET FOREIGN_KEY_CHECKS = 1;
import {StateType} from "@/pages/FlinkSqlStudio/model";
import {connect} from "umi";
import {Button, Tag, Space, Typography, Divider, Badge, Drawer, Modal,} from 'antd';
import {MessageOutlined,ClusterOutlined,FireOutlined,ReloadOutlined} from "@ant-design/icons";
import {MessageOutlined,ClusterOutlined,FireOutlined,ReloadOutlined,RocketOutlined} from "@ant-design/icons";
import ProList from '@ant-design/pro-list';
import {handleRemove, queryData} from "@/components/Common/crud";
import ProDescriptions from '@ant-design/pro-descriptions';
......@@ -31,6 +31,7 @@ type HistoryItem = {
error: string;
result: string;
config: string;
type: string;
startTime: string;
endTime: string;
taskId: number;
......@@ -121,11 +122,10 @@ const StudioHistory = (props: any) => {
description: {
search: false,
render:(_, row)=>{
let jobConfig = JSON.parse(row.config);
return (<Paragraph>
<blockquote>
<Link href={`http://${jobConfig.host}`} target="_blank">
[{jobConfig.session}:{row.jobManagerAddress}]
<Link href={`http://${row.jobManagerAddress}`} target="_blank">
[{row.jobManagerAddress}]
</Link>
<Divider type="vertical"/>开始于:{row.startTime}
<Divider type="vertical"/>完成于:{row.endTime}
......@@ -151,9 +151,14 @@ const StudioHistory = (props: any) => {
<Tag color="green" key={row.clusterAlias}>
<ClusterOutlined /> {row.clusterAlias}
</Tag>
):(<Tag color="blue" key={row.clusterAlias}>
):(<Tag color="green" key={row.clusterAlias}>
<ClusterOutlined /> 本地环境
</Tag>)}
{row.type?(
<Tag color="blue" key={row.type}>
<RocketOutlined /> {row.type}
</Tag>
):''}
{(row.status==2) ?
(<><Badge status="success"/><Text type="success">SUCCESS</Text></>):
(row.status==1) ?
......
......@@ -12,8 +12,8 @@ const StudioMsg = (props:any) => {
return (
<Typography>
{current.console.result.jobConfig?(<Paragraph>
<blockquote><Link href={`http://${current.console.result.jobConfig.host}`} target="_blank">
[{current.console.result.jobConfig.session}:{current.console.result.jobConfig.host}]
<blockquote><Link href={`http://${current.console.result.jobConfig.address}`} target="_blank">
[{current.console.result.jobConfig.session}:{current.console.result.jobConfig.address}]
</Link> <Divider type="vertical"/>{current.console.result.startTime}
<Divider type="vertical"/>{current.console.result.endTime}
<Divider type="vertical"/>
......
......@@ -109,6 +109,16 @@ export function showCluster(dispatch: any) {
});
});
}
/*--- Refresh session clusters ---*/
export function showSessionCluster(dispatch: any) {
const res = getData('api/cluster/listSessionEnable');
res.then((result) => {
result.datas && dispatch && dispatch({
type: "Studio/saveSessionCluster",
payload: result.datas,
});
});
}
/*--- Refresh data sources ---*/
export function showDataBase(dispatch: any) {
const res = getData('api/database/listEnabledAll');
......
......@@ -11,16 +11,16 @@ const { Text } = Typography;
const StudioSetting = (props: any) => {
const {cluster,clusterConfiguration,current,form,dispatch,tabs,currentSession} = props;
const {sessionCluster,clusterConfiguration,current,form,dispatch,tabs,currentSession} = props;
const getClusterOptions = ()=>{
let itemList = [(<Option value={0} label={(<><Tag color="default">Local</Tag>本地环境</>)}>
let itemList = [(<Option key={0} value={0} label={(<><Tag color="default">Local</Tag>本地环境</>)}>
<Tag color="default">Local</Tag>
本地环境
</Option>)];
for(let item of cluster){
for(let item of sessionCluster){
let tag =(<><Tag color={item.enabled?"processing":"error"}>{item.type}</Tag>{item.alias}</>);
itemList.push(<Option value={item.id} label={tag}>
itemList.push(<Option key={item.id} value={item.id} label={tag}>
{tag}
</Option>)
}
......@@ -31,23 +31,21 @@ const StudioSetting = (props: any) => {
let itemList = [];
for(let item of clusterConfiguration){
let tag =(<><Tag color={item.enabled?"processing":"error"}>{item.type}</Tag>{item.alias}</>);
itemList.push(<Option value={item.id} label={tag}>
itemList.push(<Option key={item.id} value={item.id} label={tag}>
{tag}
</Option>)
}
return itemList;
};
useEffect(()=>{
form.setFieldsValue(current.task);
},[])
form.setFieldsValue(current.task);
const onValuesChange = (change:any,all:any)=>{
let newTabs = tabs;
for(let i=0;i<newTabs.panes.length;i++){
if(newTabs.panes[i].key==newTabs.activeKey){
for(let key in change){
newTabs.panes[i].task[key]=change[key];
newTabs.panes[i].task[key]=all[key];
}
break;
}
......@@ -224,7 +222,7 @@ const StudioSetting = (props: any) => {
};
export default connect(({Studio}: { Studio: StateType }) => ({
cluster: Studio.cluster,
sessionCluster: Studio.sessionCluster,
clusterConfiguration: Studio.clusterConfiguration,
current: Studio.current,
tabs: Studio.tabs,
......
......@@ -154,7 +154,7 @@ const StudioTree: React.FC<StudioTreeProps> = (props) => {
console:{
result:[],
},
monaco: {},
monaco: React.createRef(),
};
newTabs.activeKey = node.taskId;
newTabs.panes.push(newPane);
......@@ -403,7 +403,6 @@ const StudioTree: React.FC<StudioTreeProps> = (props) => {
setTaskFormValues({});
openByKey(datas.id);
// getTreeData();
// console.log(datas);
// onSelect([],openByKey(datas.id));
}
}}
......
......@@ -13,7 +13,7 @@ import StudioLeftTool from "./StudioLeftTool";
import StudioRightTool from "./StudioRightTool";
import {
listSession, showCluster, showDataBase, getFillAllByVersion,
showClusterConfiguration
showClusterConfiguration,showSessionCluster
} from "@/components/Studio/StudioEvent/DDL";
import {loadSettings} from "@/pages/Settings/function";
......@@ -29,6 +29,7 @@ const Studio: React.FC<StudioProps> = (props) => {
loadSettings(dispatch);
getFillAllByVersion('',dispatch);
showCluster(dispatch);
showSessionCluster(dispatch);
showClusterConfiguration(dispatch);
showDataBase(dispatch);
listSession(dispatch);
......
......@@ -50,10 +50,11 @@ const ClusterForm: React.FC<ClusterFormProps> = (props) => {
name="type"
label="类型"
>
<Select defaultValue="Yarn" allowClear>
<Option value="Standalone">Standalone</Option>
<Option value="Yarn">Yarn</Option>
<Option value="Others">Others</Option>
<Select defaultValue="yarn-session" allowClear>
<Option value="standalone">Standalone</Option>
<Option value="yarn-session">Yarn Session</Option>
<Option value="yarn-per-job">Yarn Per-Job</Option>
<Option value="yarn-application">Yarn Application</Option>
</Select>
</Form.Item>
<Form.Item
......
......@@ -11,7 +11,6 @@ export type UpdateFormProps = {
updateModalVisible: boolean;
values: Partial<ClusterTableListItem>;
};
const FormItem = Form.Item;
const Option = Select.Option;
const formLayout = {
......@@ -48,29 +47,30 @@ const UpdateForm: React.FC<UpdateFormProps> = (props) => {
const renderContent = (formVals) => {
return (
<>
<FormItem
<Form.Item
name="name"
label="名称"
rules={[{required: true, message: '请输入名称!'}]}>
<Input placeholder="请输入"/>
</FormItem>
<FormItem
</Form.Item>
<Form.Item
name="alias"
label="别名"
>
<Input placeholder="请输入"/>
</FormItem>
<FormItem
</Form.Item>
<Form.Item
name="type"
label="类型"
>
<Select defaultValue="Yarn" allowClear>
<Option value="Standalone">Standalone</Option>
<Option value="Yarn">Yarn</Option>
<Option value="Others">Others</Option>
<Select defaultValue="yarn-session" allowClear>
<Option value="standalone">Standalone</Option>
<Option value="yarn-session">Yarn Session</Option>
<Option value="yarn-per-job">Yarn Per-Job</Option>
<Option value="yarn-application">Yarn Application</Option>
</Select>
</FormItem>
<FormItem
</Form.Item>
<Form.Item
name="hosts"
label="JobManager HA 地址"
>
......@@ -78,8 +78,8 @@ const UpdateForm: React.FC<UpdateFormProps> = (props) => {
placeholder="添加 Flink 集群的 JobManager 的 RestApi 地址。当 HA 模式时,地址间用英文逗号分隔,例如:192.168.123.101:8081,192.168.123.102:8081,192.168.123.103:8081"
allowClear
autoSize={{minRows: 3, maxRows: 10}}/>
</FormItem>
<FormItem
</Form.Item>
<Form.Item
name="note"
label="注释"
>
......@@ -87,14 +87,14 @@ const UpdateForm: React.FC<UpdateFormProps> = (props) => {
placeholder="请输入"
allowClear
autoSize={{minRows: 3, maxRows: 10}}/>
</FormItem>
<FormItem
</Form.Item>
<Form.Item
name="enabled"
label="是否启用"
rules={[{required: true, message: '请输入是否启用!'}]}>
<Switch checkedChildren="启用" unCheckedChildren="禁用"
defaultChecked={formVals.enabled}/>
</FormItem>
</Form.Item>
</>
);
};
......
......@@ -17,7 +17,7 @@ import {
handleAddOrUpdate, handleOption, handleRemove, queryData,
updateEnabled
} from "@/components/Common/crud";
import {showCluster} from "@/components/Studio/StudioEvent/DDL";
import {showCluster,showSessionCluster} from "@/components/Studio/StudioEvent/DDL";
const TextArea = Input.TextArea;
const url = '/api/cluster';
......@@ -111,23 +111,28 @@ const ClusterTableList: React.FC<{}> = (props: any) => {
hideInTable: false,
filters: [
{
text: 'Yarn',
value: 'Yarn',
text: 'Yarn Session',
value: 'yarn-session',
},
{
text: 'Standalone',
value: 'Standalone',
value: 'standalone',
},
{
text: 'Others',
value: 'Others',
text: 'Yarn Per-Job',
value: 'yarn-per-job',
},
{
text: 'Yarn Application',
value: 'yarn-application',
},
],
filterMultiple: false,
valueEnum: {
'Yarn': {text: 'Yarn'},
'Standalone': {text: 'Standalone'},
'Others': {text: 'Others'},
'yarn-session': {text: 'Yarn Session'},
'standalone': {text: 'Standalone'},
'yarn-per-job': {text: 'Yarn Per-Job'},
'yarn-application': {text: 'Yarn Application'},
},
},
{
......@@ -210,6 +215,28 @@ const ClusterTableList: React.FC<{}> = (props: any) => {
false: {text: '已禁用', status: 'Error'},
},
},
{
title: '注册方式',
dataIndex: 'autoRegisters',
hideInForm: true,
hideInSearch: true,
hideInTable: false,
filters: [
{
text: '自动',
value: 1,
},
{
text: '手动',
value: 0,
},
],
filterMultiple: false,
valueEnum: {
true: {text: '自动', status: 'Success'},
false: {text: '手动', status: 'Error'},
},
},
{
title: '创建时间',
dataIndex: 'createTime',
......@@ -356,6 +383,7 @@ const ClusterTableList: React.FC<{}> = (props: any) => {
actionRef.current.reload();
}
showCluster(dispatch);
showSessionCluster(dispatch);
}
}}
rowKey="id"
......@@ -374,6 +402,7 @@ const ClusterTableList: React.FC<{}> = (props: any) => {
actionRef.current.reload();
}
showCluster(dispatch);
showSessionCluster(dispatch);
}
}}
onCancel={() => {
......
......@@ -120,6 +120,7 @@ export type SessionType = {
export type StateType = {
cluster?: ClusterType[];
sessionCluster?: ClusterType[];
clusterConfiguration?: ClusterConfigurationType[];
database?: DataBaseType[];
currentSession?: SessionType;
......@@ -156,6 +157,7 @@ export type ModelType = {
quitCurrentSession: Reducer<StateType>;
saveResult: Reducer<StateType>;
saveCluster: Reducer<StateType>;
saveSessionCluster: Reducer<StateType>;
saveClusterConfiguration: Reducer<StateType>;
saveDataBase: Reducer<StateType>;
};
......@@ -165,6 +167,7 @@ const Model: ModelType = {
namespace: 'Studio',
state: {
cluster: [],
sessionCluster: [],
clusterConfiguration: [],
database: [],
currentSession: {
......@@ -178,6 +181,7 @@ const Model: ModelType = {
path: ['草稿'],
task: {
jobName: '草稿',
// type: 'standalone',
checkPoint: 0,
savePointPath: '',
parallelism: 1,
......@@ -214,6 +218,7 @@ const Model: ModelType = {
path: ['草稿'],
task: {
jobName: '草稿',
// type: 'standalone',
checkPoint: 0,
savePointPath: '',
parallelism: 1,
......@@ -424,6 +429,11 @@ const Model: ModelType = {
...state,
cluster: payload,
};
},saveSessionCluster(state, {payload}) {
return {
...state,
sessionCluster: payload,
};
},saveClusterConfiguration(state, {payload}) {
return {
...state,
......
......@@ -387,6 +387,9 @@ export default (): React.ReactNode => {
<li>
<Link>新增 yarn-application 的sql作业提交方式</Link>
</li>
<li>
<Link>新增 yarn-per-job 和 yarn-application 集群的自动注册</Link>
</li>
</ul>
</Paragraph>
</Timeline.Item>
......