Unverified Commit 1039be7f authored by George zhao's avatar George zhao Committed by GitHub

Merge branch 'DataLinkDC:dev' into dev

parents 0ec79708 2da5c901
...@@ -95,7 +95,9 @@ body: ...@@ -95,7 +95,9 @@ body:
Which version of Dinky are you running? We only accept bugs report from the LTS projects. Which version of Dinky are you running? We only accept bugs report from the LTS projects.
options: options:
- dev - dev
- 0.6.1-SNAPSHOT - 0.6.3-SNAPSHOT
- 0.6.2
- 0.6.1
- 0.6.0 - 0.6.0
- 0.5.1 - 0.5.1
- 0.5.0 - 0.5.0
......
...@@ -17,7 +17,7 @@ Dinky 基于 Apache Flink 实现 Dlink ,增强 Flink 的应用与体验,探 ...@@ -17,7 +17,7 @@ Dinky 基于 Apache Flink 实现 Dlink ,增强 Flink 的应用与体验,探
## 特点 ## 特点
一个 `开箱即用``易扩展` `一站式` 实时计算平台,以 `Apache Flink` 为基础,连接 `OLAP``数据湖` 等众多框架,致力于 `流批一体``湖仓一体` 的建设与实践。 一个 `开箱即用``易扩展` ,以 `Apache Flink` 为基础,连接 `OLAP``数据湖` 等众多框架的 `一站式` 实时计算平台,致力于 `流批一体``湖仓一体` 的建设与实践。
其主要目标如下: 其主要目标如下:
...@@ -92,17 +92,12 @@ Dinky 基于 Apache Flink 实现 Dlink ,增强 Flink 的应用与体验,探 ...@@ -92,17 +92,12 @@ Dinky 基于 Apache Flink 实现 Dlink ,增强 Flink 的应用与体验,探
## 近期计划 ## 近期计划
- 多租户及命名空间 - [ ] 多租户及命名空间
- [ ] 全局血缘与影响分析
- 全局血缘与影响分析 - [ ] 统一元数据管理
- [ ] Flink 元数据持久化
- 统一元数据管理 - [ ] 多版本 Flink-Client Server
- [ ] 整库千表同步
- Flink 元数据持久化
- 多版本 Flink-Client Server
- 整库千表同步
## 参与贡献 ## 参与贡献
......
...@@ -17,7 +17,7 @@ Dinky 基于 Apache Flink 实现 Dlink ,增强 Flink 的应用与体验,探 ...@@ -17,7 +17,7 @@ Dinky 基于 Apache Flink 实现 Dlink ,增强 Flink 的应用与体验,探
## 特点 ## 特点
一个 `开箱即用``易扩展` `一站式` 实时计算平台,以 `Apache Flink` 为基础,连接 `OLAP``数据湖` 等众多框架,致力于 `流批一体``湖仓一体` 的建设与实践。 一个 `开箱即用``易扩展` ,以 `Apache Flink` 为基础,连接 `OLAP``数据湖` 等众多框架的 `一站式` 实时计算平台,致力于 `流批一体``湖仓一体` 的建设与实践。
其主要目标如下: 其主要目标如下:
......
...@@ -17,7 +17,7 @@ Dinky 基于 Apache Flink 实现 Dlink ,增强 Flink 的应用与体验,探 ...@@ -17,7 +17,7 @@ Dinky 基于 Apache Flink 实现 Dlink ,增强 Flink 的应用与体验,探
## 特点 ## 特点
一个 `开箱即用``易扩展` `一站式` 实时计算平台,以 `Apache Flink` 为基础,连接 `OLAP``数据湖` 等众多框架,致力于 `流批一体``湖仓一体` 的建设与实践。 一个 `开箱即用``易扩展` ,以 `Apache Flink` 为基础,连接 `OLAP``数据湖` 等众多框架的 `一站式` 实时计算平台,致力于 `流批一体``湖仓一体` 的建设与实践。
其主要目标如下: 其主要目标如下:
...@@ -92,17 +92,12 @@ Dinky 基于 Apache Flink 实现 Dlink ,增强 Flink 的应用与体验,探 ...@@ -92,17 +92,12 @@ Dinky 基于 Apache Flink 实现 Dlink ,增强 Flink 的应用与体验,探
## 近期计划 ## 近期计划
- 多租户及命名空间 - [ ] 多租户及命名空间
- [ ] 全局血缘与影响分析
- 全局血缘与影响分析 - [ ] 统一元数据管理
- [ ] Flink 元数据持久化
- 统一元数据管理 - [ ] 多版本 Flink-Client Server
- [ ] 整库千表同步
- Flink 元数据持久化
- 多版本 Flink-Client Server
- 整库千表同步
## 参与贡献 ## 参与贡献
......
...@@ -5,7 +5,7 @@ ...@@ -5,7 +5,7 @@
<parent> <parent>
<artifactId>dlink</artifactId> <artifactId>dlink</artifactId>
<groupId>com.dlink</groupId> <groupId>com.dlink</groupId>
<version>0.6.1</version> <version>0.6.2</version>
</parent> </parent>
<modelVersion>4.0.0</modelVersion> <modelVersion>4.0.0</modelVersion>
...@@ -88,6 +88,10 @@ ...@@ -88,6 +88,10 @@
<groupId>com.nepxion</groupId> <groupId>com.nepxion</groupId>
<artifactId>banner</artifactId> <artifactId>banner</artifactId>
</dependency> </dependency>
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
</dependency>
<dependency> <dependency>
<groupId>org.springframework</groupId> <groupId>org.springframework</groupId>
<artifactId>spring-context-support</artifactId> <artifactId>spring-context-support</artifactId>
......
package com.dlink.controller; package com.dlink.controller;
import com.dlink.alert.AlertPool; import com.dlink.alert.AlertPool;
import com.dlink.alert.AlertResult;
import com.dlink.common.result.ProTableResult; import com.dlink.common.result.ProTableResult;
import com.dlink.common.result.Result; import com.dlink.common.result.Result;
import com.dlink.model.AlertInstance; import com.dlink.model.AlertInstance;
import com.dlink.service.AlertInstanceService; import com.dlink.service.AlertInstanceService;
import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.JsonNode;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.DeleteMapping; import org.springframework.web.bind.annotation.*;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.PutMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
...@@ -92,4 +89,17 @@ public class AlertInstanceController { ...@@ -92,4 +89,17 @@ public class AlertInstanceController {
public Result listEnabledAll() { public Result listEnabledAll() {
return Result.succeed(alertInstanceService.listEnabledAll(), "获取成功"); return Result.succeed(alertInstanceService.listEnabledAll(), "获取成功");
} }
/**
* 发送告警实例的测试信息
*/
@PostMapping("/sendTest")
public Result sendTest(@RequestBody AlertInstance alertInstance) throws Exception {
AlertResult alertResult = alertInstanceService.testAlert(alertInstance);
if (alertResult.getSuccess()) {
return Result.succeed("发送成功");
} else {
return Result.failed("发送失败");
}
}
} }
...@@ -85,11 +85,10 @@ public class CatalogueController { ...@@ -85,11 +85,10 @@ public class CatalogueController {
private String getFileText(File sourceFile) { private String getFileText(File sourceFile) {
StringBuilder sb = new StringBuilder(); StringBuilder sb = new StringBuilder();
BufferedReader br = null; try (InputStreamReader isr = new InputStreamReader(new FileInputStream(sourceFile));
try { BufferedReader br = new BufferedReader(isr);) {
if (sourceFile.isFile() && sourceFile.exists()) { if (sourceFile.isFile() && sourceFile.exists()) {
InputStreamReader isr = new InputStreamReader(new FileInputStream(sourceFile));
br = new BufferedReader(isr);
String lineText = null; String lineText = null;
while ((lineText = br.readLine()) != null) { while ((lineText = br.readLine()) != null) {
sb.append(lineText).append("\n"); sb.append(lineText).append("\n");
...@@ -97,14 +96,6 @@ public class CatalogueController { ...@@ -97,14 +96,6 @@ public class CatalogueController {
} }
} catch (Exception e) { } catch (Exception e) {
e.printStackTrace(); e.printStackTrace();
} finally {
if (br != null) {
try {
br.close();
} catch (IOException e) {
e.printStackTrace();
}
}
} }
return sb.toString(); return sb.toString();
} }
......
package com.dlink.controller; package com.dlink.controller;
import com.dlink.assertion.Asserts;
import com.dlink.common.result.ProTableResult; import com.dlink.common.result.ProTableResult;
import com.dlink.common.result.Result; import com.dlink.common.result.Result;
import com.dlink.gateway.result.TestResult; import com.dlink.gateway.result.TestResult;
...@@ -38,11 +39,11 @@ public class ClusterConfigurationController { ...@@ -38,11 +39,11 @@ public class ClusterConfigurationController {
@PutMapping @PutMapping
public Result saveOrUpdate(@RequestBody ClusterConfiguration clusterConfiguration) { public Result saveOrUpdate(@RequestBody ClusterConfiguration clusterConfiguration) {
TestResult testResult = clusterConfigurationService.testGateway(clusterConfiguration); TestResult testResult = clusterConfigurationService.testGateway(clusterConfiguration);
clusterConfiguration.setAvailable(testResult.isAvailable()); clusterConfiguration.setIsAvailable(testResult.isAvailable());
if (clusterConfigurationService.saveOrUpdate(clusterConfiguration)) { if (clusterConfigurationService.saveOrUpdate(clusterConfiguration)) {
return Result.succeed("新增成功"); return Result.succeed(Asserts.isNotNull(clusterConfiguration.getId()) ? "修改成功" : "新增成功");
} else { } else {
return Result.failed("新增失败"); return Result.failed(Asserts.isNotNull(clusterConfiguration.getId()) ? "修改失败" : "新增失败");
} }
} }
......
...@@ -9,7 +9,9 @@ import com.dlink.job.JobResult; ...@@ -9,7 +9,9 @@ import com.dlink.job.JobResult;
import com.dlink.result.IResult; import com.dlink.result.IResult;
import com.dlink.service.StudioService; import com.dlink.service.StudioService;
import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.JsonNode;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*; import org.springframework.web.bind.annotation.*;
...@@ -60,7 +62,11 @@ public class StudioController { ...@@ -60,7 +62,11 @@ public class StudioController {
*/ */
@PostMapping("/getJobPlan") @PostMapping("/getJobPlan")
public Result getJobPlan(@RequestBody StudioExecuteDTO studioExecuteDTO) { public Result getJobPlan(@RequestBody StudioExecuteDTO studioExecuteDTO) {
try {
return Result.succeed(studioService.getJobPlan(studioExecuteDTO), "获取作业计划成功"); return Result.succeed(studioService.getJobPlan(studioExecuteDTO), "获取作业计划成功");
} catch (Exception e) {
return Result.failed("目前只支持获取 INSERT 语句的作业计划");
}
} }
/** /**
......
package com.dlink.exception; package com.dlink.exception;
import cn.dev33.satoken.exception.NotLoginException; import cn.dev33.satoken.exception.NotLoginException;
import com.dlink.common.result.Result; import com.dlink.common.result.Result;
import com.dlink.model.CodeEnum; import com.dlink.model.CodeEnum;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.springframework.web.bind.annotation.ControllerAdvice; import org.springframework.web.bind.annotation.ControllerAdvice;
...@@ -41,6 +43,6 @@ public class WebExceptionHandler { ...@@ -41,6 +43,6 @@ public class WebExceptionHandler {
@ExceptionHandler @ExceptionHandler
public Result unknownException(Exception e) { public Result unknownException(Exception e) {
logger.error("ERROR:", e); logger.error("ERROR:", e);
return Result.failed("系统出现错误, 请联系平台管理员!"); return Result.failed(e.getMessage());
} }
} }
...@@ -35,7 +35,7 @@ public class Cluster extends SuperEntity { ...@@ -35,7 +35,7 @@ public class Cluster extends SuperEntity {
private String note; private String note;
private boolean autoRegisters; private Boolean autoRegisters;
private Integer clusterConfigurationId; private Integer clusterConfigurationId;
......
...@@ -33,7 +33,7 @@ public class ClusterConfiguration extends SuperEntity { ...@@ -33,7 +33,7 @@ public class ClusterConfiguration extends SuperEntity {
private String configJson; private String configJson;
private boolean isAvailable; private Boolean isAvailable;
private String note; private String note;
......
...@@ -44,7 +44,7 @@ public class DataBase extends SuperEntity { ...@@ -44,7 +44,7 @@ public class DataBase extends SuperEntity {
private String dbVersion; private String dbVersion;
private boolean status; private Boolean status;
private LocalDateTime healthTime; private LocalDateTime healthTime;
......
...@@ -8,6 +8,7 @@ import com.dlink.db.model.SuperEntity; ...@@ -8,6 +8,7 @@ import com.dlink.db.model.SuperEntity;
import com.dlink.job.JobConfig; import com.dlink.job.JobConfig;
import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.Data; import lombok.Data;
import lombok.EqualsAndHashCode; import lombok.EqualsAndHashCode;
...@@ -15,6 +16,7 @@ import java.util.ArrayList; ...@@ -15,6 +16,7 @@ import java.util.ArrayList;
import java.util.HashMap; import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Objects;
/** /**
* 任务 * 任务
...@@ -44,11 +46,11 @@ public class Task extends SuperEntity { ...@@ -44,11 +46,11 @@ public class Task extends SuperEntity {
private Integer parallelism; private Integer parallelism;
private boolean fragment; private Boolean fragment;
private boolean statementSet; private Boolean statementSet;
private boolean batchModel; private Boolean batchModel;
private Integer clusterId; private Integer clusterId;
...@@ -102,8 +104,10 @@ public class Task extends SuperEntity { ...@@ -102,8 +104,10 @@ public class Task extends SuperEntity {
} }
Map<String, String> map = new HashMap<>(); Map<String, String> map = new HashMap<>();
for (Map<String, String> item : config) { for (Map<String, String> item : config) {
if (Asserts.isNotNull(item)) {
map.put(item.get("key"), item.get("value")); map.put(item.get("key"), item.get("value"));
} }
}
return new JobConfig(type, step, false, false, useRemote, clusterId, clusterConfigurationId, jarId, getId(), return new JobConfig(type, step, false, false, useRemote, clusterId, clusterConfigurationId, jarId, getId(),
alias, fragment, statementSet, batchModel, checkPoint, parallelism, savePointStrategy, savePointPath, map); alias, fragment, statementSet, batchModel, checkPoint, parallelism, savePointStrategy, savePointPath, map);
} }
......
...@@ -38,9 +38,9 @@ public class User implements Serializable { ...@@ -38,9 +38,9 @@ public class User implements Serializable {
private String mobile; private String mobile;
private boolean enabled; private Boolean enabled;
private boolean isDelete; private Boolean isDelete;
@TableField(fill = FieldFill.INSERT) @TableField(fill = FieldFill.INSERT)
private LocalDateTime createTime; private LocalDateTime createTime;
...@@ -49,5 +49,5 @@ public class User implements Serializable { ...@@ -49,5 +49,5 @@ public class User implements Serializable {
private LocalDateTime updateTime; private LocalDateTime updateTime;
@TableField(exist = false) @TableField(exist = false)
private boolean isAdmin; private Boolean isAdmin;
} }
package com.dlink.service; package com.dlink.service;
import com.dlink.alert.AlertResult;
import com.dlink.db.service.ISuperService; import com.dlink.db.service.ISuperService;
import com.dlink.model.AlertInstance; import com.dlink.model.AlertInstance;
...@@ -14,4 +15,6 @@ import java.util.List; ...@@ -14,4 +15,6 @@ import java.util.List;
public interface AlertInstanceService extends ISuperService<AlertInstance> { public interface AlertInstanceService extends ISuperService<AlertInstance> {
List<AlertInstance> listEnabledAll(); List<AlertInstance> listEnabledAll();
AlertResult testAlert(AlertInstance alertInstance);
} }
package com.dlink.service.impl; package com.dlink.service.impl;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper; import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.dlink.alert.*;
import com.dlink.db.service.impl.SuperServiceImpl; import com.dlink.db.service.impl.SuperServiceImpl;
import com.dlink.mapper.AlertInstanceMapper; import com.dlink.mapper.AlertInstanceMapper;
import com.dlink.model.AlertInstance; import com.dlink.model.AlertInstance;
import com.dlink.service.AlertInstanceService; import com.dlink.service.AlertInstanceService;
import com.dlink.utils.JSONUtil;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import java.text.SimpleDateFormat;
import java.util.Calendar;
import java.util.List; import java.util.List;
import java.util.UUID;
/** /**
* AlertInstanceServiceImpl * AlertInstanceServiceImpl
...@@ -21,4 +27,21 @@ public class AlertInstanceServiceImpl extends SuperServiceImpl<AlertInstanceMapp ...@@ -21,4 +27,21 @@ public class AlertInstanceServiceImpl extends SuperServiceImpl<AlertInstanceMapp
public List<AlertInstance> listEnabledAll() { public List<AlertInstance> listEnabledAll() {
return list(new QueryWrapper<AlertInstance>().eq("enabled", 1)); return list(new QueryWrapper<AlertInstance>().eq("enabled", 1));
} }
@Override
public AlertResult testAlert(AlertInstance alertInstance) {
AlertConfig alertConfig = AlertConfig.build(alertInstance.getName(), alertInstance.getType(), JSONUtil.toMap(alertInstance.getParams()));
Alert alert = Alert.buildTest(alertConfig);
String currentDateTime = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(Calendar.getInstance().getTime());
String testSendMsg = "[{\"type\":\"Flink 实时监控\"," +
"\"time\":\"" + currentDateTime + "\"," +
"\"id\":\"" + UUID.randomUUID() + "\"," +
"\"name\":\"此信息仅用于测试告警信息是否发送正常 ! 请忽略此信息!\"," +
"\"status\":\"Test\"," +
"\"content\" :\"" + UUID.randomUUID() + "\"}]";
List<AlertMsg> lists = JSONUtil.toList(testSendMsg, AlertMsg.class);
String title = "任务【测试任务】:" + alertInstance.getType() + " 报警 !";
String content = JSONUtil.toJsonString(lists);
return alert.send(title, content);
}
} }
...@@ -30,6 +30,7 @@ import com.dlink.result.SqlExplainResult; ...@@ -30,6 +30,7 @@ import com.dlink.result.SqlExplainResult;
import com.dlink.service.*; import com.dlink.service.*;
import com.dlink.utils.CustomStringJavaCompiler; import com.dlink.utils.CustomStringJavaCompiler;
import com.dlink.utils.JSONUtil; import com.dlink.utils.JSONUtil;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value; import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
...@@ -222,11 +223,11 @@ public class TaskServiceImpl extends SuperServiceImpl<TaskMapper, Task> implemen ...@@ -222,11 +223,11 @@ public class TaskServiceImpl extends SuperServiceImpl<TaskMapper, Task> implemen
if (statement != null) { if (statement != null) {
task.setStatement(statement.getStatement()); task.setStatement(statement.getStatement());
} }
if (Asserts.isNull(task.getJobInstanceId()) || task.getJobInstanceId() == 0) {
JobInstance jobInstance = jobInstanceService.getJobInstanceByTaskId(id); JobInstance jobInstance = jobInstanceService.getJobInstanceByTaskId(id);
if (Asserts.isNotNull(jobInstance) && !JobStatus.isDone(jobInstance.getStatus())) { if (Asserts.isNotNull(jobInstance) && !JobStatus.isDone(jobInstance.getStatus())) {
task.setJobInstanceId(jobInstance.getId()); task.setJobInstanceId(jobInstance.getId());
} }else {
task.setJobInstanceId(0);
} }
} }
return task; return task;
...@@ -462,7 +463,7 @@ public class TaskServiceImpl extends SuperServiceImpl<TaskMapper, Task> implemen ...@@ -462,7 +463,7 @@ public class TaskServiceImpl extends SuperServiceImpl<TaskMapper, Task> implemen
return jobManager.cancel(jobId); return jobManager.cancel(jobId);
} }
SavePointResult savePointResult = jobManager.savepoint(jobId, savePointType, null); SavePointResult savePointResult = jobManager.savepoint(jobId, savePointType, null);
if (Asserts.isNotNull(savePointResult)) { if (Asserts.isNotNull(savePointResult.getJobInfos())) {
for (JobInfo item : savePointResult.getJobInfos()) { for (JobInfo item : savePointResult.getJobInfos()) {
if (Asserts.isEqualsIgnoreCase(jobId, item.getJobId()) && Asserts.isNotNull(jobConfig.getTaskId())) { if (Asserts.isEqualsIgnoreCase(jobId, item.getJobId()) && Asserts.isNotNull(jobConfig.getTaskId())) {
Savepoints savepoints = new Savepoints(); Savepoints savepoints = new Savepoints();
...@@ -486,7 +487,7 @@ public class TaskServiceImpl extends SuperServiceImpl<TaskMapper, Task> implemen ...@@ -486,7 +487,7 @@ public class TaskServiceImpl extends SuperServiceImpl<TaskMapper, Task> implemen
private JobConfig buildJobConfig(Task task) { private JobConfig buildJobConfig(Task task) {
boolean isJarTask = Dialect.FLINKJAR.equalsVal(task.getDialect()); boolean isJarTask = Dialect.FLINKJAR.equalsVal(task.getDialect());
if (!isJarTask && task.isFragment()) { if (!isJarTask && task.getFragment()) {
String flinkWithSql = dataBaseService.getEnabledFlinkWithSql(); String flinkWithSql = dataBaseService.getEnabledFlinkWithSql();
if (Asserts.isNotNullString(flinkWithSql)) { if (Asserts.isNotNullString(flinkWithSql)) {
task.setStatement(flinkWithSql + "\r\n" + task.getStatement()); task.setStatement(flinkWithSql + "\r\n" + task.getStatement());
...@@ -526,15 +527,18 @@ public class TaskServiceImpl extends SuperServiceImpl<TaskMapper, Task> implemen ...@@ -526,15 +527,18 @@ public class TaskServiceImpl extends SuperServiceImpl<TaskMapper, Task> implemen
Savepoints latestSavepoints = savepointsService.getLatestSavepointByTaskId(task.getId()); Savepoints latestSavepoints = savepointsService.getLatestSavepointByTaskId(task.getId());
if (Asserts.isNotNull(latestSavepoints)) { if (Asserts.isNotNull(latestSavepoints)) {
config.setSavePointPath(latestSavepoints.getPath()); config.setSavePointPath(latestSavepoints.getPath());
config.getConfig().put("execution.savepoint.path", latestSavepoints.getPath());
} }
break; break;
case EARLIEST: case EARLIEST:
Savepoints earliestSavepoints = savepointsService.getEarliestSavepointByTaskId(task.getId()); Savepoints earliestSavepoints = savepointsService.getEarliestSavepointByTaskId(task.getId());
if (Asserts.isNotNull(earliestSavepoints)) { if (Asserts.isNotNull(earliestSavepoints)) {
config.setSavePointPath(earliestSavepoints.getPath()); config.setSavePointPath(earliestSavepoints.getPath());
config.getConfig().put("execution.savepoint.path", earliestSavepoints.getPath());
} }
break; break;
case CUSTOM: case CUSTOM:
config.getConfig().put("execution.savepoint.path", config.getSavePointPath());
break; break;
default: default:
config.setSavePointPath(null); config.setSavePointPath(null);
......
...@@ -33,7 +33,7 @@ public class UserServiceImpl extends SuperServiceImpl<UserMapper, User> implemen ...@@ -33,7 +33,7 @@ public class UserServiceImpl extends SuperServiceImpl<UserMapper, User> implemen
} }
user.setPassword(SaSecureUtil.md5(user.getPassword())); user.setPassword(SaSecureUtil.md5(user.getPassword()));
user.setEnabled(true); user.setEnabled(true);
user.setDelete(false); user.setIsDelete(false);
if (save(user)) { if (save(user)) {
return Result.succeed("注册成功"); return Result.succeed("注册成功");
} else { } else {
...@@ -69,7 +69,7 @@ public class UserServiceImpl extends SuperServiceImpl<UserMapper, User> implemen ...@@ -69,7 +69,7 @@ public class UserServiceImpl extends SuperServiceImpl<UserMapper, User> implemen
public boolean removeUser(Integer id) { public boolean removeUser(Integer id) {
User user = new User(); User user = new User();
user.setId(id); user.setId(id);
user.setDelete(true); user.setIsDelete(true);
return updateById(user); return updateById(user);
} }
...@@ -84,10 +84,10 @@ public class UserServiceImpl extends SuperServiceImpl<UserMapper, User> implemen ...@@ -84,10 +84,10 @@ public class UserServiceImpl extends SuperServiceImpl<UserMapper, User> implemen
return Result.failed("密码不能为空"); return Result.failed("密码不能为空");
} }
if (Asserts.isEquals(SaSecureUtil.md5(password), userPassword)) { if (Asserts.isEquals(SaSecureUtil.md5(password), userPassword)) {
if (user.isDelete()) { if (user.getIsDelete()) {
return Result.failed("账号不存在"); return Result.failed("账号不存在");
} }
if (!user.isEnabled()) { if (!user.getEnabled()) {
return Result.failed("账号已被禁用"); return Result.failed("账号已被禁用");
} }
StpUtil.login(user.getId(), isRemember); StpUtil.login(user.getId(), isRemember);
...@@ -102,7 +102,7 @@ public class UserServiceImpl extends SuperServiceImpl<UserMapper, User> implemen ...@@ -102,7 +102,7 @@ public class UserServiceImpl extends SuperServiceImpl<UserMapper, User> implemen
public User getUserByUsername(String username) { public User getUserByUsername(String username) {
User user = getOne(new QueryWrapper<User>().eq("username", username).eq("is_delete", 0)); User user = getOne(new QueryWrapper<User>().eq("username", username).eq("is_delete", 0));
if (Asserts.isNotNull(user)) { if (Asserts.isNotNull(user)) {
user.setAdmin(Asserts.isEqualsIgnoreCase(username, "admin")); user.setIsAdmin(Asserts.isEqualsIgnoreCase(username, "admin"));
} }
return user; return user;
} }
......
...@@ -5,7 +5,7 @@ ...@@ -5,7 +5,7 @@
<parent> <parent>
<artifactId>dlink-alert</artifactId> <artifactId>dlink-alert</artifactId>
<groupId>com.dlink</groupId> <groupId>com.dlink</groupId>
<version>0.6.1</version> <version>0.6.2</version>
</parent> </parent>
<modelVersion>4.0.0</modelVersion> <modelVersion>4.0.0</modelVersion>
......
package com.dlink.alert; package com.dlink.alert;
import com.dlink.assertion.Asserts;
import sun.misc.Service;
import java.util.Iterator;
import java.util.Optional; import java.util.Optional;
import java.util.ServiceLoader;
import com.dlink.assertion.Asserts;
/** /**
* Alert * Alert
...@@ -16,9 +15,8 @@ public interface Alert { ...@@ -16,9 +15,8 @@ public interface Alert {
static Optional<Alert> get(AlertConfig config) { static Optional<Alert> get(AlertConfig config) {
Asserts.checkNotNull(config, "报警组件配置不能为空"); Asserts.checkNotNull(config, "报警组件配置不能为空");
Iterator<Alert> providers = Service.providers(Alert.class); ServiceLoader<Alert> alerts = ServiceLoader.load(Alert.class);
while (providers.hasNext()) { for (Alert alert : alerts) {
Alert alert = providers.next();
if (alert.canHandle(config.getType())) { if (alert.canHandle(config.getType())) {
return Optional.of(alert.setConfig(config)); return Optional.of(alert.setConfig(config));
} }
...@@ -40,6 +38,14 @@ public interface Alert { ...@@ -40,6 +38,14 @@ public interface Alert {
return driver; return driver;
} }
static Alert buildTest(AlertConfig config) {
Optional<Alert> optionalDriver = Alert.get(config);
if (!optionalDriver.isPresent()) {
throw new AlertException("不支持报警组件类型【" + config.getType() + "】,请在 lib 下添加扩展依赖");
}
return optionalDriver.get();
}
Alert setConfig(AlertConfig config); Alert setConfig(AlertConfig config);
default boolean canHandle(String type) { default boolean canHandle(String type) {
......
...@@ -8,8 +8,12 @@ package com.dlink.alert; ...@@ -8,8 +8,12 @@ package com.dlink.alert;
**/ **/
public enum ShowType { public enum ShowType {
TABLE(0, "markdown"), MARKDOWN(0, "markdown"), // 通用markdown格式
TEXT(1, "text"); TEXT(1, "text"), //通用文本格式
POST(2, "post"), // 飞书的富文本msgType
TABLE(0, "table"), // table格式
ATTACHMENT(3, "attachment"), // 邮件相关 只发送附件
TABLE_ATTACHMENT(4, "table attachment"); // 邮件相关 邮件表格+附件
private int code; private int code;
private String value; private String value;
......
...@@ -5,7 +5,7 @@ ...@@ -5,7 +5,7 @@
<parent> <parent>
<artifactId>dlink-alert</artifactId> <artifactId>dlink-alert</artifactId>
<groupId>com.dlink</groupId> <groupId>com.dlink</groupId>
<version>0.6.1</version> <version>0.6.2</version>
</parent> </parent>
<modelVersion>4.0.0</modelVersion> <modelVersion>4.0.0</modelVersion>
......
...@@ -128,7 +128,7 @@ public class DingTalkSender { ...@@ -128,7 +128,7 @@ public class DingTalkSender {
items.put("msgtype", msgType); items.put("msgtype", msgType);
Map<String, Object> text = new HashMap<>(); Map<String, Object> text = new HashMap<>();
items.put(msgType, text); items.put(msgType, text);
if (ShowType.TABLE.getValue().equals(msgType)) { if (ShowType.MARKDOWN.getValue().equals(msgType)) {
generateMarkdownMsg(title, content, text); generateMarkdownMsg(title, content, text);
} else { } else {
generateTextMsg(title, content, text); generateTextMsg(title, content, text);
......
...@@ -43,9 +43,9 @@ public class DingTalkTest { ...@@ -43,9 +43,9 @@ public class DingTalkTest {
@Before @Before
public void initDingTalkConfig() { public void initDingTalkConfig() {
config.put(DingTalkConstants.KEYWORD, "Dlinky-Fink 钉钉告警测试"); config.put(DingTalkConstants.KEYWORD, "Dinky-Fink 钉钉告警测试");
config.put(DingTalkConstants.WEB_HOOK, "url"); config.put(DingTalkConstants.WEB_HOOK, "url");
config.put(DingTalkConstants.MSG_TYPE, ShowType.TABLE.getValue()); config.put(DingTalkConstants.MSG_TYPE, ShowType.MARKDOWN.getValue());
config.put(DingTalkConstants.PROXY_ENABLE, "false"); config.put(DingTalkConstants.PROXY_ENABLE, "false");
config.put(DingTalkConstants.PASSWORD, "password"); config.put(DingTalkConstants.PASSWORD, "password");
...@@ -57,7 +57,7 @@ public class DingTalkTest { ...@@ -57,7 +57,7 @@ public class DingTalkTest {
public void sendMarkDownMsgTest() { public void sendMarkDownMsgTest() {
AlertConfig config = AlertConfig.build("MarkDownTest", "DingTalk", DingTalkTest.config); AlertConfig config = AlertConfig.build("MarkDownTest", "DingTalk", DingTalkTest.config);
Alert alert = Alert.build(config); Alert alert = Alert.build(config);
AlertResult result = alert.send("Dlinky钉钉告警测试", contentTest); AlertResult result = alert.send("Dinky钉钉告警测试", contentTest);
Assert.assertEquals(true, result.getSuccess()); Assert.assertEquals(true, result.getSuccess());
} }
...@@ -66,7 +66,7 @@ public class DingTalkTest { ...@@ -66,7 +66,7 @@ public class DingTalkTest {
config.put(DingTalkConstants.MSG_TYPE, ShowType.TEXT.getValue()); config.put(DingTalkConstants.MSG_TYPE, ShowType.TEXT.getValue());
AlertConfig config = AlertConfig.build("TextMsgTest", "DingTalk", DingTalkTest.config); AlertConfig config = AlertConfig.build("TextMsgTest", "DingTalk", DingTalkTest.config);
Alert alert = Alert.build(config); Alert alert = Alert.build(config);
AlertResult result = alert.send("Dlinky钉钉告警测试", contentTest); AlertResult result = alert.send("Dinky钉钉告警测试", contentTest);
Assert.assertEquals(true, result.getSuccess()); Assert.assertEquals(true, result.getSuccess());
} }
......
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>dlink-alert</artifactId>
<groupId>com.dlink</groupId>
<version>0.6.2</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>dlink-alert-email</artifactId>
<packaging>jar</packaging>
<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
</properties>
<dependencies>
<dependency>
<groupId>com.dlink</groupId>
<artifactId>dlink-alert-base</artifactId>
</dependency>
<dependency>
<groupId>org.apache.poi</groupId>
<artifactId>poi</artifactId>
<version>4.1.2</version>
</dependency>
<dependency>
<groupId>org.apache.poi</groupId>
<artifactId>poi-ooxml</artifactId>
<version>4.1.2</version>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>30.1.1-jre</version>
</dependency>
<dependency>
<groupId>com.sun.mail</groupId>
<artifactId>javax.mail</artifactId>
<version>1.6.2</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-email</artifactId>
<version>1.5</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-collections4</artifactId>
<version>4.2</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
</project>
\ No newline at end of file
package com.dlink.alert.email;
import com.dlink.alert.AbstractAlert;
import com.dlink.alert.AlertResult;
/**
* EmailAlert
* @author zhumingye
* @date: 2022/4/2
**/
public class EmailAlert extends AbstractAlert {
@Override
public String getType() {
return EmailConstants.TYPE;
}
@Override
public AlertResult send(String title, String content) {
MailSender mailSender=new MailSender(getConfig().getParam());
return mailSender.send(title,content);
}
}
package com.dlink.alert.email;
/**
* EmailConstants 邮件常量
* @author zhumingye
* @date: 2022/4/3
**/
public final class EmailConstants {
public static final String TYPE = "Email";
public static final String PLUGIN_DEFAULT_EMAIL_RECEIVERS = "receiver.name";
public static final String NAME_PLUGIN_DEFAULT_EMAIL_RECEIVERS = "receivers";
public static final String PLUGIN_DEFAULT_EMAIL_RECEIVERCCS = "receiverCcs";
public static final String NAME_PLUGIN_DEFAULT_EMAIL_RECEIVERCCS = "receiverCcs";
public static final String NAME_MAIL_PROTOCOL = "mail.protocol";
public static final String MAIL_SMTP_HOST = "mail.smtp.host";
public static final String NAME_MAIL_SMTP_HOST = "serverHost";
public static final String MAIL_SMTP_PORT = "mail.smtp.port";
public static final String NAME_MAIL_SMTP_PORT = "serverPort";
public static final String MAIL_SENDER = "sender.name";
public static final String NAME_MAIL_SENDER = "sender";
public static final String MAIL_SMTP_AUTH = "mail.smtp.auth";
public static final String NAME_MAIL_SMTP_AUTH = "enableSmtpAuth";
public static final String MAIL_USER = "mail.smtp.user";
public static final String NAME_MAIL_USER = "User";
public static final String MAIL_PASSWD = "mail.smtp.passwd";
public static final String NAME_MAIL_PASSWD = "Password";
public static final String MAIL_SMTP_STARTTLS_ENABLE = "mail.smtp.starttls.enable";
public static final String NAME_MAIL_SMTP_STARTTLS_ENABLE = "starttlsEnable";
public static final String MAIL_SMTP_SSL_ENABLE = "mail.smtp.ssl.enable";
public static final String NAME_MAIL_SMTP_SSL_ENABLE = "sslEnable";
public static final String MAIL_SMTP_SSL_TRUST = "mail.smtp.ssl.trust";
public static final String NAME_MAIL_SMTP_SSL_TRUST = "smtpSslTrust";
public static final String XLS_FILE_PATH = "xls.file.path";
public static final String NAME_SHOW_TYPE = "msgtype";
public static final String MAIL_TRANSPORT_PROTOCOL = "mail.transport.protocol";
public static final String TEXT_HTML_CHARSET_UTF_8 = "text/html;charset=utf-8";
public static final int NUMBER_1000 = 1000;
public static final String TR = "<tr>";
public static final String TD = "<td>";
public static final String TD_END = "</td>";
public static final String TR_END = "</tr>";
public static final String TH = "<th>";
public static final String TH_COLSPAN = "<th colspan=2 >";
public static final String TH_END = "</th>";
public static final String TAB = "\t";
public static final String LINE = "\n";
public static final String LEFT = ">";
public static final String HTML_HEADER_PREFIX = "<!DOCTYPE HTML PUBLIC '-//W3C//DTD HTML 4.01 Transitional//EN' 'http://www.w3.org/TR/html4/loose.dtd'>"
+ "<html>"
+ "<head>"
+ "<title>Dinky</title>"
+ "<meta name='Keywords' content=''>"
+ "<meta name='Description' content=''>"
+ "<style type=\"text/css\">"
+ "table {margin-top:0px;padding-top:0px;border:1px solid;font-size: 14px;color: #333333;border-width: 1px;border-color: #666666;border-collapse: collapse;}"
+ "table th {border-width: 1px;padding: 8px;border-style: solid;border-color: #666666;background-color: #dedede;text-align: left;}"
+ "table td {border-width: 1px;padding: 8px;border-style: solid;border-color: #666666;background-color: #ffffff;text-align: left;}"
+ "</style>"
+ "</head>"
+ "<body style=\"margin:0;padding:0\"><table border=\"1px\" cellpadding=\"5px\" cellspacing=\"-10px\"> ";
public static final String TABLE_BODY_HTML_TAIL = "</table></body></html>";
public static final String UTF_8 = "UTF-8";
public static final String EXCEL_SUFFIX_XLSX = ".xlsx";
public static final String SINGLE_SLASH = "/";
private EmailConstants() {
throw new UnsupportedOperationException("This is a utility class and cannot be instantiated");
}
}
package com.dlink.alert.email;
import com.dlink.alert.AlertException;
import com.dlink.utils.JSONUtil;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.poi.ss.usermodel.*;
import org.apache.poi.xssf.streaming.SXSSFWorkbook;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.FileOutputStream;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
/**
* ExcelUtils excel工具类
* @author zhumingye
* @date: 2022/4/3
**/
public final class ExcelUtils {
private static final int XLSX_WINDOW_ROW = 10000;
private static final Logger logger = LoggerFactory.getLogger(ExcelUtils.class);
private ExcelUtils() {
throw new UnsupportedOperationException("This is a utility class and cannot be instantiated");
}
/**
* generate excel file
*
* @param content the content
* @param title the title
* @param xlsFilePath the xls path
*/
public static void genExcelFile(String content, String title, String xlsFilePath) {
File file = new File(xlsFilePath);
if (!file.exists() && !file.mkdirs()) {
logger.error("Create xlsx directory error, path:{}", xlsFilePath);
throw new AlertException("Create xlsx directory error");
}
List<LinkedHashMap> itemsList = JSONUtil.toList(content, LinkedHashMap.class);
if (CollectionUtils.isEmpty(itemsList)) {
logger.error("itemsList is null");
throw new AlertException("itemsList is null");
}
LinkedHashMap<String, Object> headerMap = itemsList.get(0);
List<String> headerList = new ArrayList<>();
for (Map.Entry<String, Object> en : headerMap.entrySet()) {
headerList.add(en.getKey());
}
try (SXSSFWorkbook wb = new SXSSFWorkbook(XLSX_WINDOW_ROW);
FileOutputStream fos = new FileOutputStream(String.format("%s/%s.xlsx", xlsFilePath, title))) {
// declare a workbook
// generate a table
Sheet sheet = wb.createSheet();
Row row = sheet.createRow(0);
//set the height of the first line
row.setHeight((short) 500);
//set Horizontal right
CellStyle cellStyle = wb.createCellStyle();
cellStyle.setAlignment(HorizontalAlignment.RIGHT);
//setting excel headers
for (int i = 0; i < headerList.size(); i++) {
Cell cell = row.createCell(i);
cell.setCellStyle(cellStyle);
cell.setCellValue(headerList.get(i));
}
//setting excel body
int rowIndex = 1;
for (LinkedHashMap<String, Object> itemsMap : itemsList) {
Object[] values = itemsMap.values().toArray();
row = sheet.createRow(rowIndex);
//setting excel body height
row.setHeight((short) 500);
rowIndex++;
for (int j = 0; j < values.length; j++) {
Cell cell1 = row.createCell(j);
cell1.setCellStyle(cellStyle);
if (values[j] instanceof Number) {
cell1.setCellValue(Double.parseDouble(String.valueOf(values[j])));
} else {
cell1.setCellValue(String.valueOf(values[j]));
}
}
}
for (int i = 0; i < headerList.size(); i++) {
sheet.setColumnWidth(i, headerList.get(i).length() * 800);
}
//setting file output
wb.write(fos);
wb.dispose();
} catch (Exception e) {
throw new AlertException("generate excel error", e);
}
}
}
package com.dlink.alert.email.template;
import com.dlink.alert.ShowType;
/**
* @Author: zhumingye
* @date: 2022/4/3
* @Description: 邮件告警模板接口
*/
public interface AlertTemplate {
String getMessageFromTemplate(String title,String content, ShowType showType, boolean showAll);
/**
* default showAll is true
* @param content alert message content
* @param showType show type
* @return a message from a specified alert template
*/
default String getMessageFromTemplate(String title,String content, ShowType showType) {
return getMessageFromTemplate(title,content, showType, true);
}
}
package com.dlink.alert.email.template;
import com.dlink.alert.ShowType;
import com.dlink.alert.email.EmailConstants;
import com.dlink.utils.JSONUtil;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.*;
import static java.util.Objects.requireNonNull;
/**
* @Author: zhumingye
* @date: 2022/4/3
* @Description: 邮件告警Html模板
*/
public class DefaultHTMLTemplate implements AlertTemplate {
public static final Logger logger = LoggerFactory.getLogger(DefaultHTMLTemplate.class);
@Override
public String getMessageFromTemplate(String title , String content, ShowType showType, boolean showAll) {
switch (showType) {
case TABLE:
return getTableTypeMessage(title,content, showAll);
case TEXT:
return getTextTypeMessage(title,content);
default:
throw new IllegalArgumentException(String.format("not support showType: %s in DefaultHTMLTemplate", showType));
}
}
/**
* get alert message which type is TABLE
*
* @param content message content
* @param showAll weather to show all
* @return alert message
*/
private String getTableTypeMessage(String title ,String content, boolean showAll) {
if (StringUtils.isNotEmpty(content)) {
List<LinkedHashMap> mapItemsList = JSONUtil.toList(content,LinkedHashMap.class);
if (!showAll && mapItemsList.size() > EmailConstants.NUMBER_1000) {
mapItemsList = mapItemsList.subList(0, EmailConstants.NUMBER_1000);
}
StringBuilder contents = new StringBuilder(200);
boolean flag = true;
for (LinkedHashMap<String, Object> mapItems : mapItemsList) {
Set<Map.Entry<String, Object>> entries = mapItems.entrySet();
Iterator<Map.Entry<String, Object>> iterator = entries.iterator();
StringBuilder t = new StringBuilder(EmailConstants.TR);
StringBuilder cs = new StringBuilder(EmailConstants.TR);
while (iterator.hasNext()) {
Map.Entry<String, Object> entry = iterator.next();
t.append(EmailConstants.TH).append(entry.getKey()).append(EmailConstants.TH_END);
cs.append(EmailConstants.TD).append(entry.getValue()).append(EmailConstants.TD_END);
}
t.append(EmailConstants.TR_END);
cs.append(EmailConstants.TR_END);
if (flag) {
title = t.toString();
}
flag = false;
contents.append(cs);
}
return getMessageFromHtmlTemplate(title, contents.toString());
}
return content;
}
/**
* get alert message which type is TEXT
* @param content message content
* @return alert message
*/
private String getTextTypeMessage(String title ,String content) {
StringBuilder stringBuilder = new StringBuilder(100);
if (StringUtils.isNotEmpty(content)) {
List<LinkedHashMap> linkedHashMaps = JSONUtil.toList(content, LinkedHashMap.class);
if (linkedHashMaps.size() > EmailConstants.NUMBER_1000) {
linkedHashMaps = linkedHashMaps.subList(0, EmailConstants.NUMBER_1000);
}
stringBuilder.append(EmailConstants.TR).append(EmailConstants.TH_COLSPAN).append(title).append(EmailConstants.TH_END).append(EmailConstants.TR_END);
for (LinkedHashMap<String, Object> mapItems : linkedHashMaps) {
Set<Map.Entry<String, Object>> entries = mapItems.entrySet();
Iterator<Map.Entry<String, Object>> iterator = entries.iterator();
while (iterator.hasNext()) {
Map.Entry<String, Object> entry = iterator.next();
stringBuilder.append(EmailConstants.TR);
stringBuilder.append(EmailConstants.TD).append(entry.getKey()).append(EmailConstants.TD_END);
stringBuilder.append(EmailConstants.TD).append(entry.getValue()).append(EmailConstants.TD_END);
stringBuilder.append(EmailConstants.TR_END);
}
}
return getMessageFromHtmlTemplate(title, stringBuilder.toString());
}
return stringBuilder.toString();
}
/**
* get alert message from a html template
*
* @param title message title
* @param content message content
* @return alert message which use html template
*/
private String getMessageFromHtmlTemplate(String title, String content) {
requireNonNull(content, "content must not null");
String htmlTableThead = StringUtils.isEmpty(title) ? "" : String.format("<thead>%s</thead>%n", title);
return EmailConstants.HTML_HEADER_PREFIX + htmlTableThead + content + EmailConstants.TABLE_BODY_HTML_TAIL;
}
}
com.dlink.alert.email.EmailAlert
\ No newline at end of file
package com.dlink.alert.email;
import com.dlink.alert.AlertResult;
import com.dlink.alert.ShowType;
import com.dlink.alert.email.template.AlertTemplate;
import com.dlink.alert.email.template.DefaultHTMLTemplate;
import com.dlink.utils.JSONUtil;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.*;
public class EmailSenderTest {
private static final Logger logger = LoggerFactory.getLogger(EmailSenderTest.class);
static MailSender mailSender;
private static Map<String, String> emailConfig = new HashMap<>();
private static AlertTemplate alertTemplate;
String title = "Dinky Email Alert";
String content = "[{\"id\":\"69\","
+ "\"name\":\"UserBehavior-0--1193959466\","
+ "\"Job name\": \"Start workflow\","
+ "\"State\": \"SUCCESS\","
+ "\"Recovery\":\"NO\","
+ "\"Run time\": \"1\","
+ "\"Start time\": \"2018-08-06 10:31:34.0\","
+ "\"End time\": \"2018-08-06 10:31:49.0\","
+ "\"Host\": \"192.168.xx.xx\","
+ "\"Notify group\" :\"4\"}]";
@BeforeClass
public static void initEmailConfig() {
emailConfig.put(EmailConstants.NAME_MAIL_PROTOCOL, "smtp");
emailConfig.put(EmailConstants.NAME_MAIL_SMTP_HOST, "smtp.mxhichina.com");
emailConfig.put(EmailConstants.NAME_MAIL_SMTP_PORT, "465");
emailConfig.put(EmailConstants.NAME_MAIL_SENDER, "aliyun.sdaDXZxDFSDFasa.cn");
emailConfig.put(EmailConstants.NAME_MAIL_USER, "aliyun.sdaDXZxDFSDFasay.cn");
emailConfig.put(EmailConstants.NAME_MAIL_PASSWD, "5vffgdsf123132q8");
emailConfig.put(EmailConstants.NAME_MAIL_SMTP_AUTH, "true");
emailConfig.put(EmailConstants.NAME_MAIL_SMTP_STARTTLS_ENABLE, "true");
emailConfig.put(EmailConstants.NAME_MAIL_SMTP_SSL_ENABLE, "true");
emailConfig.put(EmailConstants.NAME_MAIL_SMTP_SSL_TRUST, "smtp.mxhichina.com");
emailConfig.put(EmailConstants.NAME_PLUGIN_DEFAULT_EMAIL_RECEIVERS, "user1@qq.com,user2@163.com");
emailConfig.put(EmailConstants.NAME_PLUGIN_DEFAULT_EMAIL_RECEIVERCCS, "user3@qq.com");
emailConfig.put(EmailConstants.NAME_SHOW_TYPE, ShowType.TEXT.getValue());
alertTemplate = new DefaultHTMLTemplate();
mailSender = new MailSender(emailConfig);
}
@Test
public void testTextSendMails() {
AlertResult alertResult = mailSender.send(title, content);
Assert.assertEquals(true, alertResult.getSuccess()); // 格式需要调整
}
@Test
public void testSendTableMail() {
emailConfig.put(EmailConstants.NAME_SHOW_TYPE, ShowType.TABLE.getValue());
mailSender = new MailSender(emailConfig);
AlertResult alertResult = mailSender.send(title, content);
Assert.assertEquals(true, alertResult.getSuccess());
}
@Test
public void testAttachmentFile() {
emailConfig.put(EmailConstants.NAME_SHOW_TYPE, ShowType.ATTACHMENT.getValue());
mailSender = new MailSender(emailConfig);
AlertResult alertResult = mailSender.send(title, content);
Assert.assertEquals(true, alertResult.getSuccess());
}
@Test
public void testTableAttachmentFile() {
emailConfig.put(EmailConstants.NAME_SHOW_TYPE, ShowType.TABLE_ATTACHMENT.getValue());
mailSender = new MailSender(emailConfig);
AlertResult alertResult = mailSender.send(title, content);
Assert.assertEquals(true, alertResult.getSuccess());
}
@Test
public void testGenTextEmail() {
List<LinkedHashMap> linkedHashMaps = JSONUtil.toList(content, LinkedHashMap.class);
if (linkedHashMaps.size() > EmailConstants.NUMBER_1000) {
linkedHashMaps = linkedHashMaps.subList(0, EmailConstants.NUMBER_1000);
}
StringBuilder stringBuilder = new StringBuilder(100);
stringBuilder.append(EmailConstants.TR).append(EmailConstants.TH_COLSPAN).append(title).append(EmailConstants.TH_END).append(EmailConstants.TR_END);
for (LinkedHashMap<String, Object> mapItems : linkedHashMaps) {
Set<Map.Entry<String, Object>> entries = mapItems.entrySet();
Iterator<Map.Entry<String, Object>> iterator = entries.iterator();
while (iterator.hasNext()) {
Map.Entry<String, Object> entry = iterator.next();
stringBuilder.append(EmailConstants.TR);
stringBuilder.append(EmailConstants.TD).append(entry.getKey()).append(EmailConstants.TD_END);
stringBuilder.append(EmailConstants.TD).append(entry.getValue()).append(EmailConstants.TD_END);
stringBuilder.append(EmailConstants.TR_END);
}
System.out.println(stringBuilder.toString());
}
}
}
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>dlink-alert</artifactId>
<groupId>com.dlink</groupId>
<version>0.6.2</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>dlink-alert-feishu</artifactId>
<packaging>jar</packaging>
<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
</properties>
<dependencies>
<dependency>
<groupId>com.dlink</groupId>
<artifactId>dlink-alert-base</artifactId>
</dependency>
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-annotations</artifactId>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
</project>
\ No newline at end of file
package com.dlink.alert.feishu;
import com.dlink.alert.AbstractAlert;
import com.dlink.alert.AlertResult;
/**
* FeiShuAlert
* @author zhumingye
* @date: 2022/4/2
**/
public class FeiShuAlert extends AbstractAlert {
@Override
public String getType() {
return FeiShuConstants.TYPE;
}
@Override
public AlertResult send(String title, String content) {
FeiShuSender sender = new FeiShuSender(getConfig().getParam());
return sender.send(title,content);
}
}
package com.dlink.alert.feishu;
/**
* @Author: zhumingye
* @date: 2022/4/2
* @Description: 参数常量
*/
public final class FeiShuConstants {
static final String TYPE = "FeiShu";
static final String MARKDOWN_QUOTE = "> ";
static final String MARKDOWN_ENTER = "\n";
static final String WEB_HOOK = "webhook";
static final String KEY_WORD = "keyword";
static final String SECRET = "secret";
static final String FEI_SHU_PROXY_ENABLE = "isEnableProxy";
static final String FEI_SHU_PROXY = "proxy";
static final String FEI_SHU_PORT = "port";
static final String FEI_SHU_USER = "user";
static final String FEI_SHU_PASSWORD = "password";
static final String MSG_TYPE = "msgtype";
static final String AT_ALL = "isAtAll";
static final String AT_USERS = "users";
static final String FEI_SHU_TEXT_TEMPLATE = "{\"msg_type\":\"{msg_type}\",\"content\":{\"{msg_type}\":\"{msg} {users} \" }}";
static final String FEI_SHU_POST_TEMPLATE ="{\"msg_type\":\"{msg_type}\",\"content\":{\"{msg_type}\":{\"zh_cn\":{\"title\":\"{keyword}\",\"content\":[[{\"tag\":\"text\",\"text\":\"{msg}\"},{users}]]}}}}";
private FeiShuConstants() {
throw new UnsupportedOperationException("This is a utility class and cannot be instantiated");
}
}
package com.dlink.alert.feishu;
import org.apache.http.HttpHost;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.CredentialsProvider;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.entity.ContentType;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
public final class HttpRequestUtil {
private HttpRequestUtil() {
throw new UnsupportedOperationException("This is a utility class and cannot be instantiated");
}
public static CloseableHttpClient getHttpClient(boolean enableProxy, String proxy, Integer port, String user, String password) {
if (enableProxy) {
HttpHost httpProxy = new HttpHost(proxy, port);
CredentialsProvider provider = new BasicCredentialsProvider();
provider.setCredentials(new AuthScope(httpProxy), new UsernamePasswordCredentials(user, password));
return HttpClients.custom().setDefaultCredentialsProvider(provider).build();
} else {
return HttpClients.createDefault();
}
}
public static HttpPost constructHttpPost(String url, String msg) {
HttpPost post = new HttpPost(url);
StringEntity entity = new StringEntity(msg, ContentType.APPLICATION_JSON);
post.setEntity(entity);
return post;
}
}
com.dlink.alert.feishu.FeiShuAlert
\ No newline at end of file
package com.dlink.alert.feishu;
import com.dlink.alert.AlertMsg;
import com.dlink.alert.AlertResult;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import java.util.HashMap;
import java.util.Map;
/**
* @Author: zhumingye
* @date: 2022/4/2
* @Description: 飞书消息发送 单元测试
*/
public class FeiShuSenderTest {
private static Map<String, String> feiShuConfig = new HashMap<>();
String alertMsgContentTemplate = "[\n"
+ " {\n"
+ " \"owner\": \"dlink\",\n"
+ " \"processEndTime\": \"2021-01-29 19:01:11\",\n"
+ " \"processHost\": \"10.81.129.4:5678\",\n"
+ " \"processId\": 2926,\n"
+ " \"processName\": \"3-20210129190038108\",\n"
+ " \"processStartTime\": \"2021-01-29 19:00:38\",\n"
+ " \"processState\": \"SUCCESS\",\n"
+ " \"processType\": \"START_PROCESS\",\n"
+ " \"projectId\": 2,\n"
+ " \"projectName\": \"testdelproject\",\n"
+ " \"recovery\": \"NO\",\n"
+ " \"retryTimes\": 0,\n"
+ " \"runTimes\": 1,\n"
+ " \"taskId\": 0\n"
+ " }\n"
+ "]";
@Before
public void initFeiShuConfig() {
feiShuConfig.put(FeiShuConstants.WEB_HOOK, "https://open.feishu.cn/open-apis/bot/v2/hook/aea3cd7f13154854541dsadsadas08f2a9");
feiShuConfig.put(FeiShuConstants.KEY_WORD, "Dinky 飞书WebHook 告警测试");
feiShuConfig.put(FeiShuConstants.MSG_TYPE,"text");
feiShuConfig.put(FeiShuConstants.AT_ALL, "false");
feiShuConfig.put(FeiShuConstants.AT_USERS, "user1,user2,user3");
}
@Test
public void testTextTypeSend() {
AlertMsg alertMsg = new AlertMsg();
alertMsg.setName("Dinky 飞书WebHook 告警测试");
alertMsg.setContent(alertMsgContentTemplate);
FeiShuSender feiShuSender = new FeiShuSender(feiShuConfig);
AlertResult alertResult = feiShuSender.send(alertMsg.getName(),alertMsg.getContent());
Assert.assertEquals(true, alertResult.getSuccess());
}
@Test
public void testPostTypeSend() {
feiShuConfig.put(FeiShuConstants.MSG_TYPE,"post");
AlertMsg alertMsg = new AlertMsg();
alertMsg.setName("Dinky 飞书WebHook 告警测试");
alertMsg.setContent(alertMsgContentTemplate);
FeiShuSender feiShuSender = new FeiShuSender(feiShuConfig);
AlertResult alertResult = feiShuSender.send(alertMsg.getName(),alertMsg.getContent());
Assert.assertEquals(true, alertResult.getSuccess());
}
}
...@@ -5,7 +5,7 @@ ...@@ -5,7 +5,7 @@
<parent> <parent>
<artifactId>dlink-alert</artifactId> <artifactId>dlink-alert</artifactId>
<groupId>com.dlink</groupId> <groupId>com.dlink</groupId>
<version>0.6.1</version> <version>0.6.2</version>
</parent> </parent>
<modelVersion>4.0.0</modelVersion> <modelVersion>4.0.0</modelVersion>
......
...@@ -81,7 +81,7 @@ public class WeChatSender { ...@@ -81,7 +81,7 @@ public class WeChatSender {
String data =""; String data ="";
if (sendType.equals(WeChatType.CHAT.getValue())) { if (sendType.equals(WeChatType.CHAT.getValue())) {
data = markdownByAlert(KeyWord, content ,userList);; data = markdownByAlert(title, content ,userList);;
}else{ }else{
data = markdownByAlert(title, content, userList); data = markdownByAlert(title, content, userList);
} }
...@@ -182,7 +182,7 @@ public class WeChatSender { ...@@ -182,7 +182,7 @@ public class WeChatSender {
private String markdownByAlert(String title, String content,List<String> userList) { private String markdownByAlert(String title, String content,List<String> userList) {
String result = ""; String result = "";
if (showType.equals(ShowType.TABLE.getValue())) { if (showType.equals(ShowType.MARKDOWN.getValue())) {
result = markdownTable(title, content,userList,sendType); result = markdownTable(title, content,userList,sendType);
} else if (showType.equals(ShowType.TEXT.getValue())) { } else if (showType.equals(ShowType.TEXT.getValue())) {
result = markdownText(title, content,userList,sendType); result = markdownText(title, content,userList,sendType);
......
...@@ -70,7 +70,7 @@ public class WeChatSenderTest { ...@@ -70,7 +70,7 @@ public class WeChatSenderTest {
); );
weChatConfig.put(WeChatConstants.USERS, "all"); weChatConfig.put(WeChatConstants.USERS, "all");
weChatConfig.put(WeChatConstants.TEAM_SEND_MSG, "msg"); weChatConfig.put(WeChatConstants.TEAM_SEND_MSG, "msg");
weChatConfig.put(WeChatConstants.SHOW_TYPE, ShowType.TABLE.getValue());// default is "table" weChatConfig.put(WeChatConstants.SHOW_TYPE, ShowType.MARKDOWN.getValue());// default is "table"
weChatConfig.put(WeChatConstants.SEND_TYPE, WeChatType.APP.getValue()); weChatConfig.put(WeChatConstants.SEND_TYPE, WeChatType.APP.getValue());
} }
...@@ -79,7 +79,7 @@ public class WeChatSenderTest { ...@@ -79,7 +79,7 @@ public class WeChatSenderTest {
@Test @Test
public void testSendAPPMarkDownMsg() { public void testSendAPPMarkDownMsg() {
WeChatSender weChatSender = new WeChatSender(weChatConfig); WeChatSender weChatSender = new WeChatSender(weChatConfig);
AlertResult alertResult = weChatSender.send("Dlinky企微APP MarkDown方式 告警测试", contentTest); AlertResult alertResult = weChatSender.send("Dinky企微APP MarkDown方式 告警测试", contentTest);
Assert.assertEquals(true, alertResult.getSuccess()); Assert.assertEquals(true, alertResult.getSuccess());
} }
...@@ -87,7 +87,7 @@ public class WeChatSenderTest { ...@@ -87,7 +87,7 @@ public class WeChatSenderTest {
public void testSendAPPTextMsg() { public void testSendAPPTextMsg() {
weChatConfig.put(WeChatConstants.SHOW_TYPE, ShowType.TEXT.getValue()); weChatConfig.put(WeChatConstants.SHOW_TYPE, ShowType.TEXT.getValue());
WeChatSender weChatSender = new WeChatSender(weChatConfig); WeChatSender weChatSender = new WeChatSender(weChatConfig);
AlertResult alertResult = weChatSender.send("Dlinky企微APP TEXT方式 告警测试", contentTest); AlertResult alertResult = weChatSender.send("Dinky企微APP TEXT方式 告警测试", contentTest);
Assert.assertEquals(true, alertResult.getSuccess()); Assert.assertEquals(true, alertResult.getSuccess());
} }
...@@ -96,8 +96,8 @@ public class WeChatSenderTest { ...@@ -96,8 +96,8 @@ public class WeChatSenderTest {
weChatConfig.put(WeChatConstants.WEBHOOK, "https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key=8xxxxxxxxxxxxxxxxx6fe13396c"); weChatConfig.put(WeChatConstants.WEBHOOK, "https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key=8xxxxxxxxxxxxxxxxx6fe13396c");
weChatConfig.put(WeChatConstants.SEND_TYPE, WeChatType.CHAT.getValue()); weChatConfig.put(WeChatConstants.SEND_TYPE, WeChatType.CHAT.getValue());
weChatConfig.put(WeChatConstants.USER_SEND_MSG,WeChatConstants.WEBHOOK_TEMPLATE); weChatConfig.put(WeChatConstants.USER_SEND_MSG,WeChatConstants.WEBHOOK_TEMPLATE);
weChatConfig.put(WeChatConstants.SHOW_TYPE, ShowType.TABLE.getValue()); weChatConfig.put(WeChatConstants.SHOW_TYPE, ShowType.MARKDOWN.getValue());
weChatConfig.put(WeChatConstants.KEYWORD, "Dlinky企微WEBHOOK MarkDown方式 告警测试"); weChatConfig.put(WeChatConstants.KEYWORD, "Dinky企微WEBHOOK MarkDown方式 告警测试");
WeChatSender weChatSender = new WeChatSender(weChatConfig); WeChatSender weChatSender = new WeChatSender(weChatConfig);
AlertResult alertResult = weChatSender.send("TEXT-TEST", contentTest); AlertResult alertResult = weChatSender.send("TEXT-TEST", contentTest);
Assert.assertEquals(true, alertResult.getSuccess()); Assert.assertEquals(true, alertResult.getSuccess());
...@@ -110,7 +110,7 @@ public class WeChatSenderTest { ...@@ -110,7 +110,7 @@ public class WeChatSenderTest {
weChatConfig.put(WeChatConstants.SEND_TYPE, WeChatType.CHAT.getValue()); weChatConfig.put(WeChatConstants.SEND_TYPE, WeChatType.CHAT.getValue());
weChatConfig.put(WeChatConstants.USER_SEND_MSG,WeChatConstants.WEBHOOK_TEMPLATE); weChatConfig.put(WeChatConstants.USER_SEND_MSG,WeChatConstants.WEBHOOK_TEMPLATE);
weChatConfig.put(WeChatConstants.SHOW_TYPE, ShowType.TEXT.getValue()); weChatConfig.put(WeChatConstants.SHOW_TYPE, ShowType.TEXT.getValue());
weChatConfig.put(WeChatConstants.KEYWORD, "Dlinky企微WEBHOOK TEXT方式 告警测试"); weChatConfig.put(WeChatConstants.KEYWORD, "Dinky企微WEBHOOK TEXT方式 告警测试");
WeChatSender weChatSender = new WeChatSender(weChatConfig); WeChatSender weChatSender = new WeChatSender(weChatConfig);
AlertResult alertResult = weChatSender.send("TEXT-TEST", contentTest); AlertResult alertResult = weChatSender.send("TEXT-TEST", contentTest);
Assert.assertEquals(true, alertResult.getSuccess()); Assert.assertEquals(true, alertResult.getSuccess());
......
...@@ -5,7 +5,7 @@ ...@@ -5,7 +5,7 @@
<parent> <parent>
<artifactId>dlink</artifactId> <artifactId>dlink</artifactId>
<groupId>com.dlink</groupId> <groupId>com.dlink</groupId>
<version>0.6.1</version> <version>0.6.2</version>
</parent> </parent>
<modelVersion>4.0.0</modelVersion> <modelVersion>4.0.0</modelVersion>
...@@ -15,6 +15,8 @@ ...@@ -15,6 +15,8 @@
<module>dlink-alert-base</module> <module>dlink-alert-base</module>
<module>dlink-alert-dingtalk</module> <module>dlink-alert-dingtalk</module>
<module>dlink-alert-wechat</module> <module>dlink-alert-wechat</module>
<module>dlink-alert-feishu</module>
<module>dlink-alert-email</module>
</modules> </modules>
<properties> <properties>
......
...@@ -5,7 +5,7 @@ ...@@ -5,7 +5,7 @@
<parent> <parent>
<artifactId>dlink-app</artifactId> <artifactId>dlink-app</artifactId>
<groupId>com.dlink</groupId> <groupId>com.dlink</groupId>
<version>0.6.1</version> <version>0.6.2</version>
</parent> </parent>
<modelVersion>4.0.0</modelVersion> <modelVersion>4.0.0</modelVersion>
...@@ -33,6 +33,10 @@ ...@@ -33,6 +33,10 @@
<dependency> <dependency>
<groupId>com.dlink</groupId> <groupId>com.dlink</groupId>
<artifactId>dlink-client-1.11</artifactId> <artifactId>dlink-client-1.11</artifactId>
</dependency>
<dependency>
<groupId>com.dlink</groupId>
<artifactId>dlink-flink-1.11</artifactId>
<scope>provided</scope> <scope>provided</scope>
</dependency> </dependency>
<dependency> <dependency>
......
package com.dlink.executor;
import org.apache.flink.annotation.Internal;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.table.api.*;
import org.apache.flink.table.utils.PrintUtils;
import org.apache.flink.types.Row;
import org.apache.flink.util.CloseableIterator;
import org.apache.flink.util.Preconditions;
import javax.annotation.Nullable;
import java.io.PrintWriter;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
/**
* 定制TableResultImpl
*
* @author wenmo
* @since 2021/6/7 22:06
**/
@Internal
class CustomTableResultImpl implements TableResult {
public static final TableResult TABLE_RESULT_OK =
CustomTableResultImpl.builder()
.resultKind(ResultKind.SUCCESS)
.tableSchema(TableSchema.builder().field("result", DataTypes.STRING()).build())
.data(Collections.singletonList(Row.of("OK")))
.build();
private final JobClient jobClient;
private final TableSchema tableSchema;
private final ResultKind resultKind;
private final CloseableIterator<Row> data;
private final PrintStyle printStyle;
private CustomTableResultImpl(
@Nullable JobClient jobClient,
TableSchema tableSchema,
ResultKind resultKind,
CloseableIterator<Row> data,
PrintStyle printStyle) {
this.jobClient = jobClient;
this.tableSchema =
Preconditions.checkNotNull(tableSchema, "tableSchema should not be null");
this.resultKind = Preconditions.checkNotNull(resultKind, "resultKind should not be null");
this.data = Preconditions.checkNotNull(data, "data should not be null");
this.printStyle = Preconditions.checkNotNull(printStyle, "printStyle should not be null");
}
public static TableResult buildTableResult(List<TableSchemaField> fields, List<Row> rows) {
Builder builder = builder().resultKind(ResultKind.SUCCESS);
if (fields.size() > 0) {
TableSchema.Builder tableSchemaBuild = TableSchema.builder();
for (int i = 0; i < fields.size(); i++) {
tableSchemaBuild.field(fields.get(i).getName(), fields.get(i).getType());
}
builder.tableSchema(tableSchemaBuild.build()).data(rows);
}
return builder.build();
}
@Override
public Optional<JobClient> getJobClient() {
return Optional.ofNullable(jobClient);
}
@Override
public TableSchema getTableSchema() {
return tableSchema;
}
@Override
public ResultKind getResultKind() {
return resultKind;
}
@Override
public CloseableIterator<Row> collect() {
return data;
}
@Override
public void print() {
Iterator<Row> it = collect();
if (printStyle instanceof TableauStyle) {
int maxColumnWidth = ((TableauStyle) printStyle).getMaxColumnWidth();
String nullColumn = ((TableauStyle) printStyle).getNullColumn();
boolean deriveColumnWidthByType =
((TableauStyle) printStyle).isDeriveColumnWidthByType();
PrintUtils.printAsTableauForm(
getTableSchema(),
it,
new PrintWriter(System.out),
maxColumnWidth,
nullColumn,
deriveColumnWidthByType);
} else if (printStyle instanceof RawContentStyle) {
while (it.hasNext()) {
System.out.println(String.join(",", PrintUtils.rowToString(it.next())));
}
} else {
throw new TableException("Unsupported print style: " + printStyle);
}
}
public static Builder builder() {
return new Builder();
}
/**
* Builder for creating a {@link CustomTableResultImpl}.
*/
public static class Builder {
private JobClient jobClient = null;
private TableSchema tableSchema = null;
private ResultKind resultKind = null;
private CloseableIterator<Row> data = null;
private PrintStyle printStyle =
PrintStyle.tableau(Integer.MAX_VALUE, PrintUtils.NULL_COLUMN, false);
private Builder() {
}
/**
* Specifies job client which associates the submitted Flink job.
*
* @param jobClient a {@link JobClient} for the submitted Flink job.
*/
public Builder jobClient(JobClient jobClient) {
this.jobClient = jobClient;
return this;
}
/**
* Specifies table schema of the execution result.
*
* @param tableSchema a {@link TableSchema} for the execution result.
*/
public Builder tableSchema(TableSchema tableSchema) {
Preconditions.checkNotNull(tableSchema, "tableSchema should not be null");
this.tableSchema = tableSchema;
return this;
}
/**
* Specifies result kind of the execution result.
*
* @param resultKind a {@link ResultKind} for the execution result.
*/
public Builder resultKind(ResultKind resultKind) {
Preconditions.checkNotNull(resultKind, "resultKind should not be null");
this.resultKind = resultKind;
return this;
}
/**
* Specifies an row iterator as the execution result.
*
* @param rowIterator a row iterator as the execution result.
*/
public Builder data(CloseableIterator<Row> rowIterator) {
Preconditions.checkNotNull(rowIterator, "rowIterator should not be null");
this.data = rowIterator;
return this;
}
/**
* Specifies an row list as the execution result.
*
* @param rowList a row list as the execution result.
*/
public Builder data(List<Row> rowList) {
Preconditions.checkNotNull(rowList, "listRows should not be null");
this.data = CloseableIterator.adapterForIterator(rowList.iterator());
return this;
}
/**
* Specifies print style. Default is {@link TableauStyle} with max integer column width.
*/
public Builder setPrintStyle(PrintStyle printStyle) {
Preconditions.checkNotNull(printStyle, "printStyle should not be null");
this.printStyle = printStyle;
return this;
}
/**
* Returns a {@link TableResult} instance.
*/
public TableResult build() {
return new CustomTableResultImpl(jobClient, tableSchema, resultKind, data, printStyle);
}
}
/**
* Root interface for all print styles.
*/
public interface PrintStyle {
/**
* Create a tableau print style with given max column width, null column, and a flag to
* indicate whether the column width is derived from type (true) or content (false), which
* prints the result schema and content as tableau form.
*/
static PrintStyle tableau(
int maxColumnWidth, String nullColumn, boolean deriveColumnWidthByType) {
Preconditions.checkArgument(
maxColumnWidth > 0, "maxColumnWidth should be greater than 0");
Preconditions.checkNotNull(nullColumn, "nullColumn should not be null");
return new TableauStyle(maxColumnWidth, nullColumn, deriveColumnWidthByType);
}
/**
* Create a raw content print style, which only print the result content as raw form. column
* delimiter is ",", row delimiter is "\n".
*/
static PrintStyle rawContent() {
return new RawContentStyle();
}
}
/**
* print the result schema and content as tableau form.
*/
private static final class TableauStyle implements PrintStyle {
/**
* A flag to indicate whether the column width is derived from type (true) or content
* (false).
*/
private final boolean deriveColumnWidthByType;
private final int maxColumnWidth;
private final String nullColumn;
private TableauStyle(
int maxColumnWidth, String nullColumn, boolean deriveColumnWidthByType) {
this.deriveColumnWidthByType = deriveColumnWidthByType;
this.maxColumnWidth = maxColumnWidth;
this.nullColumn = nullColumn;
}
public boolean isDeriveColumnWidthByType() {
return deriveColumnWidthByType;
}
int getMaxColumnWidth() {
return maxColumnWidth;
}
String getNullColumn() {
return nullColumn;
}
}
/**
* only print the result content as raw form. column delimiter is ",", row delimiter is "\n".
*/
private static final class RawContentStyle implements PrintStyle {
}
}
package com.dlink.executor;
import org.apache.flink.table.types.DataType;
/**
* @author wenmo
* @since 2021/6/7 22:06
**/
public class TableSchemaField {
private String name;
private DataType type;
public TableSchemaField(String name, DataType type) {
this.name = name;
this.type = type;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public DataType getType() {
return type;
}
public void setType(DataType type) {
this.type = type;
}
}
...@@ -5,7 +5,7 @@ ...@@ -5,7 +5,7 @@
<parent> <parent>
<artifactId>dlink-app</artifactId> <artifactId>dlink-app</artifactId>
<groupId>com.dlink</groupId> <groupId>com.dlink</groupId>
<version>0.6.1</version> <version>0.6.2</version>
</parent> </parent>
<modelVersion>4.0.0</modelVersion> <modelVersion>4.0.0</modelVersion>
...@@ -33,6 +33,10 @@ ...@@ -33,6 +33,10 @@
<dependency> <dependency>
<groupId>com.dlink</groupId> <groupId>com.dlink</groupId>
<artifactId>dlink-client-1.12</artifactId> <artifactId>dlink-client-1.12</artifactId>
</dependency>
<dependency>
<groupId>com.dlink</groupId>
<artifactId>dlink-flink-1.12</artifactId>
<scope>provided</scope> <scope>provided</scope>
</dependency> </dependency>
<dependency> <dependency>
......
package com.dlink.executor;
import org.apache.flink.table.types.DataType;
/**
* @author wenmo
* @since 2021/6/7 22:06
**/
public class TableSchemaField {
private String name;
private DataType type;
public TableSchemaField(String name, DataType type) {
this.name = name;
this.type = type;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public DataType getType() {
return type;
}
public void setType(DataType type) {
this.type = type;
}
}
...@@ -5,7 +5,7 @@ ...@@ -5,7 +5,7 @@
<parent> <parent>
<artifactId>dlink-app</artifactId> <artifactId>dlink-app</artifactId>
<groupId>com.dlink</groupId> <groupId>com.dlink</groupId>
<version>0.6.1</version> <version>0.6.2</version>
</parent> </parent>
<modelVersion>4.0.0</modelVersion> <modelVersion>4.0.0</modelVersion>
...@@ -33,6 +33,10 @@ ...@@ -33,6 +33,10 @@
<dependency> <dependency>
<groupId>com.dlink</groupId> <groupId>com.dlink</groupId>
<artifactId>dlink-client-1.13</artifactId> <artifactId>dlink-client-1.13</artifactId>
</dependency>
<dependency>
<groupId>com.dlink</groupId>
<artifactId>dlink-flink-1.13</artifactId>
<scope>provided</scope> <scope>provided</scope>
</dependency> </dependency>
<dependency> <dependency>
......
package com.dlink.executor;
import org.apache.flink.table.types.DataType;
/**
* @author wenmo
* @since 2021/6/7 22:06
**/
public class TableSchemaField {
private String name;
private DataType type;
public TableSchemaField(String name, DataType type) {
this.name = name;
this.type = type;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public DataType getType() {
return type;
}
public void setType(DataType type) {
this.type = type;
}
}
...@@ -5,7 +5,7 @@ ...@@ -5,7 +5,7 @@
<parent> <parent>
<artifactId>dlink-app</artifactId> <artifactId>dlink-app</artifactId>
<groupId>com.dlink</groupId> <groupId>com.dlink</groupId>
<version>0.6.1</version> <version>0.6.2</version>
</parent> </parent>
<modelVersion>4.0.0</modelVersion> <modelVersion>4.0.0</modelVersion>
...@@ -33,6 +33,10 @@ ...@@ -33,6 +33,10 @@
<dependency> <dependency>
<groupId>com.dlink</groupId> <groupId>com.dlink</groupId>
<artifactId>dlink-client-1.14</artifactId> <artifactId>dlink-client-1.14</artifactId>
</dependency>
<dependency>
<groupId>com.dlink</groupId>
<artifactId>dlink-flink-1.14</artifactId>
<scope>provided</scope> <scope>provided</scope>
</dependency> </dependency>
<dependency> <dependency>
......
package com.dlink.executor;
import org.apache.flink.table.types.DataType;
/**
* @author wenmo
* @since 2021/10/22 10:02
**/
public class TableSchemaField {
private String name;
private DataType type;
public TableSchemaField(String name, DataType type) {
this.name = name;
this.type = type;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public DataType getType() {
return type;
}
public void setType(DataType type) {
this.type = type;
}
}
...@@ -5,7 +5,7 @@ ...@@ -5,7 +5,7 @@
<parent> <parent>
<artifactId>dlink-app</artifactId> <artifactId>dlink-app</artifactId>
<groupId>com.dlink</groupId> <groupId>com.dlink</groupId>
<version>0.6.1</version> <version>0.6.2</version>
</parent> </parent>
<modelVersion>4.0.0</modelVersion> <modelVersion>4.0.0</modelVersion>
...@@ -25,7 +25,12 @@ ...@@ -25,7 +25,12 @@
</dependency> </dependency>
<dependency> <dependency>
<groupId>com.dlink</groupId> <groupId>com.dlink</groupId>
<artifactId>dlink-client-1.13</artifactId> <artifactId>dlink-client-${dlink.flink.version}</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.dlink</groupId>
<artifactId>dlink-flink-${dlink.flink.version}</artifactId>
<scope>provided</scope> <scope>provided</scope>
</dependency> </dependency>
<dependency> <dependency>
......
...@@ -41,7 +41,7 @@ public class Submiter { ...@@ -41,7 +41,7 @@ public class Submiter {
} }
return "select id, name, alias as jobName, type,check_point as checkpoint," + return "select id, name, alias as jobName, type,check_point as checkpoint," +
"save_point_path as savePointPath, parallelism,fragment as useSqlFragment,statement_set as useStatementSet,config_json as config," + "save_point_path as savePointPath, parallelism,fragment as useSqlFragment,statement_set as useStatementSet,config_json as config," +
" env_id as envId from dlink_task where id = " + id; " env_id as envId,batch_model AS useBatchModel from dlink_task where id = " + id;
} }
private static String getFlinkSQLStatement(Integer id, DBConfig config) { private static String getFlinkSQLStatement(Integer id, DBConfig config) {
......
...@@ -5,7 +5,7 @@ ...@@ -5,7 +5,7 @@
<parent> <parent>
<artifactId>dlink</artifactId> <artifactId>dlink</artifactId>
<groupId>com.dlink</groupId> <groupId>com.dlink</groupId>
<version>0.6.1</version> <version>0.6.2</version>
</parent> </parent>
<modelVersion>4.0.0</modelVersion> <modelVersion>4.0.0</modelVersion>
......
...@@ -5,7 +5,7 @@ ...@@ -5,7 +5,7 @@
<parent> <parent>
<artifactId>dlink</artifactId> <artifactId>dlink</artifactId>
<groupId>com.dlink</groupId> <groupId>com.dlink</groupId>
<version>0.6.1</version> <version>0.6.2</version>
</parent> </parent>
<modelVersion>4.0.0</modelVersion> <modelVersion>4.0.0</modelVersion>
......
...@@ -182,6 +182,22 @@ ...@@ -182,6 +182,22 @@
<include>dlink-alert-wechat-${project.version}.jar</include> <include>dlink-alert-wechat-${project.version}.jar</include>
</includes> </includes>
</fileSet> </fileSet>
<fileSet>
<directory>${project.parent.basedir}/dlink-alert/dlink-alert-feishu/target
</directory>
<outputDirectory>lib</outputDirectory>
<includes>
<include>dlink-alert-feishu-${project.version}.jar</include>
</includes>
</fileSet>
<fileSet>
<directory>${project.parent.basedir}/dlink-alert/dlink-alert-email/target
</directory>
<outputDirectory>lib</outputDirectory>
<includes>
<include>dlink-alert-email-${project.version}.jar</include>
</includes>
</fileSet>
<!-- 将模块dlink-extends的常用jar文件放到打包目录/plugins下 --> <!-- 将模块dlink-extends的常用jar文件放到打包目录/plugins下 -->
<!--<fileSet> <!--<fileSet>
<directory>${project.parent.basedir}/dlink-extends/target <directory>${project.parent.basedir}/dlink-extends/target
......
...@@ -5,7 +5,7 @@ ...@@ -5,7 +5,7 @@
<parent> <parent>
<artifactId>dlink-client</artifactId> <artifactId>dlink-client</artifactId>
<groupId>com.dlink</groupId> <groupId>com.dlink</groupId>
<version>0.6.1</version> <version>0.6.2</version>
</parent> </parent>
<modelVersion>4.0.0</modelVersion> <modelVersion>4.0.0</modelVersion>
...@@ -13,11 +13,8 @@ ...@@ -13,11 +13,8 @@
<properties> <properties>
<java.version>1.8</java.version> <java.version>1.8</java.version>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<flink.version>1.11.6</flink.version>
<flinkcdc.version>2.1.1</flinkcdc.version>
<maven.compiler.source>1.8</maven.compiler.source> <maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target> <maven.compiler.target>1.8</maven.compiler.target>
<junit.version>4.12</junit.version>
</properties> </properties>
<dependencies> <dependencies>
...@@ -26,81 +23,14 @@ ...@@ -26,81 +23,14 @@
<artifactId>dlink-client-base</artifactId> <artifactId>dlink-client-base</artifactId>
<version>${project.version}</version> <version>${project.version}</version>
</dependency> </dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</exclusion>
</exclusions>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_${scala.binary.version}</artifactId>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</exclusion>
</exclusions>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-yarn_2.11</artifactId>
<version>${flink.version}</version>
<exclusions>
<exclusion>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-yarn-common</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-yarn-client</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-core</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency> <dependency>
<groupId>com.dlink</groupId> <groupId>com.dlink</groupId>
<artifactId>dlink-client-hadoop</artifactId> <artifactId>dlink-common</artifactId>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-kubernetes_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>com.ververica</groupId>
<artifactId>flink-connector-mysql-cdc</artifactId>
<version>${flinkcdc.version}</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</dependency> </dependency>
<dependency> <dependency>
<groupId>com.dlink</groupId> <groupId>com.dlink</groupId>
<artifactId>dlink-common</artifactId> <artifactId>dlink-flink-1.11</artifactId>
<scope>provided</scope>
</dependency> </dependency>
</dependencies> </dependencies>
</project> </project>
\ No newline at end of file
package com.dlink.cdc;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import com.dlink.assertion.Asserts;
import com.dlink.constant.FlinkParamConstant;
import com.dlink.model.FlinkCDCConfig;
/**
* AbstractCDCBuilder
*
* @author wenmo
* @since 2022/4/12 21:28
**/
public abstract class AbstractCDCBuilder {
protected FlinkCDCConfig config;
public AbstractCDCBuilder() {
}
public AbstractCDCBuilder(FlinkCDCConfig config) {
this.config = config;
}
public FlinkCDCConfig getConfig() {
return config;
}
public void setConfig(FlinkCDCConfig config) {
this.config = config;
}
public List<String> getSchemaList() {
List<String> schemaList = new ArrayList<>();
String schema = config.getSchema();
if (Asserts.isNotNullString(schema)) {
String[] schemas = schema.split(FlinkParamConstant.SPLIT);
Collections.addAll(schemaList, schemas);
}
List<String> tableList = getTableList();
for (String tableName : tableList) {
if (Asserts.isNotNullString(tableName) && tableName.contains(".")) {
String[] names = tableName.split("\\\\.");
if (!schemaList.contains(names[0])) {
schemaList.add(names[0]);
}
}
}
return schemaList;
}
public List<String> getTableList() {
List<String> tableList = new ArrayList<>();
String table = config.getTable();
if (Asserts.isNullString(table)) {
return tableList;
}
String[] tables = table.split(FlinkParamConstant.SPLIT);
Collections.addAll(tableList, tables);
return tableList;
}
public String getSchemaFieldName() {
return "schema";
}
}
package com.dlink.cdc;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import java.util.List;
import java.util.Map;
import com.dlink.model.FlinkCDCConfig;
/**
* CDCBuilder
*
* @author wenmo
* @since 2022/4/12 21:09
**/
public interface CDCBuilder {
String getHandle();
CDCBuilder create(FlinkCDCConfig config);
DataStreamSource<String> build(StreamExecutionEnvironment env);
List<String> getSchemaList();
List<String> getTableList();
Map<String, Map<String, String>> parseMetaDataConfigs();
String getSchemaFieldName();
}
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment