Commit 2767b4b5 authored by wenmo's avatar wenmo

gateway format

parent 7698c3e4
......@@ -20,14 +20,14 @@ import java.util.ServiceLoader;
**/
public interface Gateway {
static Optional<Gateway> get(GatewayConfig config){
Asserts.checkNotNull(config,"配置不能为空");
Asserts.checkNotNull(config.getType(),"配置类型不能为空");
static Optional<Gateway> get(GatewayConfig config) {
Asserts.checkNotNull(config, "配置不能为空");
Asserts.checkNotNull(config.getType(), "配置类型不能为空");
ServiceLoader<Gateway> loader = ServiceLoader.load(Gateway.class);
Iterator<Gateway> iterator = loader.iterator();
while(iterator.hasNext()) {
while (iterator.hasNext()) {
Gateway gateway = iterator.next();
if(gateway.canHandle(config.getType())){
if (gateway.canHandle(config.getType())) {
gateway.setGatewayConfig(config);
return Optional.of(gateway);
}
......@@ -35,10 +35,10 @@ public interface Gateway {
return Optional.empty();
}
static Gateway build(GatewayConfig config){
static Gateway build(GatewayConfig config) {
Optional<Gateway> optionalGateway = Gateway.get(config);
if(!optionalGateway.isPresent()){
throw new GatewayException("不支持 Flink Gateway 类型【"+config.getType().getLongValue()+"】,请添加扩展包");
if (!optionalGateway.isPresent()) {
throw new GatewayException("不支持 Flink Gateway 类型【" + config.getType().getLongValue() + "】,请添加扩展包");
}
return optionalGateway.get();
}
......
......@@ -8,12 +8,12 @@ import com.dlink.assertion.Asserts;
* @author wenmo
* @since 2021/11/3 21:58
*/
public enum ActionType{
SAVEPOINT("savepoint"),CANCEL("cancel");
public enum ActionType {
SAVEPOINT("savepoint"), CANCEL("cancel");
private String value;
ActionType(String value){
ActionType(String value) {
this.value = value;
}
......@@ -21,9 +21,9 @@ public enum ActionType{
return value;
}
public static ActionType get(String value){
public static ActionType get(String value) {
for (ActionType type : ActionType.values()) {
if(Asserts.isEquals(type.getValue(),value)){
if (Asserts.isEquals(type.getValue(), value)) {
return type;
}
}
......
......@@ -26,11 +26,11 @@ public class AppConfig {
this.userJarMainAppClass = userJarMainAppClass;
}
public static AppConfig build(String userJarPath, String userJarParasStr, String userJarMainAppClass){
if(Asserts.isNotNullString(userJarParasStr)){
return new AppConfig(userJarPath,userJarParasStr.split(" "),userJarMainAppClass);
}else{
return new AppConfig(userJarPath,new String[]{},userJarMainAppClass);
public static AppConfig build(String userJarPath, String userJarParasStr, String userJarMainAppClass) {
if (Asserts.isNotNullString(userJarParasStr)) {
return new AppConfig(userJarPath, userJarParasStr.split(" "), userJarMainAppClass);
} else {
return new AppConfig(userJarPath, new String[]{}, userJarMainAppClass);
}
}
......
......@@ -7,9 +7,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.Getter;
import lombok.Setter;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
......@@ -36,43 +34,43 @@ public class GatewayConfig {
appConfig = new AppConfig();
}
public static GatewayConfig build(JsonNode para){
public static GatewayConfig build(JsonNode para) {
GatewayConfig config = new GatewayConfig();
if(para.has("taskId")) {
if (para.has("taskId")) {
config.setTaskId(para.get("taskId").asInt());
}
config.setType(GatewayType.get(para.get("type").asText()));
if(para.has("flinkConfigPath")) {
if (para.has("flinkConfigPath")) {
config.getClusterConfig().setFlinkConfigPath(para.get("flinkConfigPath").asText());
}
if(para.has("flinkLibPath")) {
if (para.has("flinkLibPath")) {
config.getClusterConfig().setFlinkLibPath(para.get("flinkLibPath").asText());
}
if(para.has("yarnConfigPath")) {
if (para.has("yarnConfigPath")) {
config.getClusterConfig().setYarnConfigPath(para.get("yarnConfigPath").asText());
}
if(para.has("jobName")) {
if (para.has("jobName")) {
config.getFlinkConfig().setJobName(para.get("jobName").asText());
}
if(para.has("userJarPath")) {
if (para.has("userJarPath")) {
config.getAppConfig().setUserJarPath(para.get("userJarPath").asText());
}
if(para.has("userJarParas")) {
if (para.has("userJarParas")) {
config.getAppConfig().setUserJarParas(para.get("userJarParas").asText().split("\\s+"));
}
if(para.has("userJarMainAppClass")) {
if (para.has("userJarMainAppClass")) {
config.getAppConfig().setUserJarMainAppClass(para.get("userJarMainAppClass").asText());
}
if(para.has("savePoint")) {
if (para.has("savePoint")) {
config.getFlinkConfig().setSavePoint(para.get("savePoint").asText());
}
if(para.has("configParas")) {
if (para.has("configParas")) {
try {
Map<String, String> configMap = new HashMap<>();
JsonNode paras = mapper.readTree(para.get("configParas").asText());
paras.forEach((JsonNode node)-> {
configMap.put(node.get("key").asText(),node.get("value").asText());
}
paras.forEach((JsonNode node) -> {
configMap.put(node.get("key").asText(), node.get("value").asText());
}
);
config.getFlinkConfig().setConfiguration(configMap);
} catch (JsonProcessingException e) {
......
......@@ -8,12 +8,12 @@ import com.dlink.assertion.Asserts;
* @author wenmo
* @since 2021/11/3 21:58
*/
public enum SavePointType{
TRIGGER("trigger"),DISPOSE("dispose"),STOP("stop"),CANCEL("cancel");
public enum SavePointType {
TRIGGER("trigger"), DISPOSE("dispose"), STOP("stop"), CANCEL("cancel");
private String value;
SavePointType(String value){
SavePointType(String value) {
this.value = value;
}
......@@ -21,9 +21,9 @@ public enum SavePointType{
return value;
}
public static SavePointType get(String value){
public static SavePointType get(String value) {
for (SavePointType type : SavePointType.values()) {
if(Asserts.isEqualsIgnoreCase(type.getValue(),value)){
if (Asserts.isEqualsIgnoreCase(type.getValue(), value)) {
return type;
}
}
......
......@@ -25,12 +25,12 @@ public class JobInfo {
this.status = status;
}
public enum JobStatus{
RUN("run"),STOP("stop"),CANCEL("cancel"),FAIL("fail");
public enum JobStatus {
RUN("run"), STOP("stop"), CANCEL("cancel"), FAIL("fail");
private String value;
JobStatus(String value){
JobStatus(String value) {
this.value = value;
}
}
......
......@@ -34,12 +34,12 @@ public abstract class AbstractGatewayResult implements GatewayResult {
this.exceptionMsg = exceptionMsg;
}
public void success(){
public void success() {
this.isSuccess = true;
this.endTime = LocalDateTime.now();
}
public void fail(String error){
public void fail(String error) {
this.isSuccess = false;
this.endTime = LocalDateTime.now();
this.exceptionMsg = error;
......
......@@ -55,7 +55,7 @@ public class KubernetesResult extends AbstractGatewayResult {
this.jids = jids;
}
public static KubernetesResult build(GatewayType type){
return new KubernetesResult(type,LocalDateTime.now());
public static KubernetesResult build(GatewayType type) {
return new KubernetesResult(type, LocalDateTime.now());
}
}
......@@ -43,8 +43,8 @@ public class SavePointResult extends AbstractGatewayResult {
return null;
}
public static SavePointResult build(GatewayType type){
return new SavePointResult(type,LocalDateTime.now());
public static SavePointResult build(GatewayType type) {
return new SavePointResult(type, LocalDateTime.now());
}
}
......@@ -23,11 +23,11 @@ public class TestResult {
this.error = error;
}
public static TestResult success(){
return new TestResult(true,null);
public static TestResult success() {
return new TestResult(true, null);
}
public static TestResult fail(String error){
return new TestResult(false,error);
public static TestResult fail(String error) {
return new TestResult(false, error);
}
}
package com.dlink.gateway.result;
import com.dlink.gateway.GatewayType;
import lombok.Getter;
import lombok.Setter;
import java.time.LocalDateTime;
import java.util.List;
......@@ -52,8 +50,8 @@ public class YarnResult extends AbstractGatewayResult {
this.jids = jids;
}
public static YarnResult build(GatewayType type){
return new YarnResult(type,LocalDateTime.now());
public static YarnResult build(GatewayType type) {
return new YarnResult(type, LocalDateTime.now());
}
}
......@@ -55,26 +55,26 @@ public abstract class YarnGateway extends AbstractGateway {
super(config);
}
public void init(){
public void init() {
initConfig();
initYarnClient();
}
private void initConfig(){
private void initConfig() {
configuration = GlobalConfiguration.loadConfiguration(config.getClusterConfig().getFlinkConfigPath());
if(Asserts.isNotNull(config.getFlinkConfig().getConfiguration())) {
if (Asserts.isNotNull(config.getFlinkConfig().getConfiguration())) {
addConfigParas(config.getFlinkConfig().getConfiguration());
}
configuration.set(DeploymentOptions.TARGET, getType().getLongValue());
if(Asserts.isNotNullString(config.getFlinkConfig().getSavePoint())) {
if (Asserts.isNotNullString(config.getFlinkConfig().getSavePoint())) {
configuration.setString(SavepointConfigOptions.SAVEPOINT_PATH, config.getFlinkConfig().getSavePoint());
}
configuration.set(YarnConfigOptions.PROVIDED_LIB_DIRS, Collections.singletonList(config.getClusterConfig().getFlinkLibPath()));
if(Asserts.isNotNullString(config.getFlinkConfig().getJobName())) {
if (Asserts.isNotNullString(config.getFlinkConfig().getJobName())) {
configuration.set(YarnConfigOptions.APPLICATION_NAME, config.getFlinkConfig().getJobName());
}
if(configuration.containsKey(SecurityOptions.KERBEROS_LOGIN_KEYTAB.key())) {
if (configuration.containsKey(SecurityOptions.KERBEROS_LOGIN_KEYTAB.key())) {
try {
SecurityUtils.install(new SecurityConfiguration(configuration));
UserGroupInformation currentUser = UserGroupInformation.getCurrentUser();
......@@ -85,7 +85,7 @@ public abstract class YarnGateway extends AbstractGateway {
}
}
if(getType().isApplicationMode()) {
if (getType().isApplicationMode()) {
String uuid = UUID.randomUUID().toString().replace("-", "");
if (configuration.contains(CheckpointingOptions.CHECKPOINTS_DIRECTORY)) {
configuration.set(CheckpointingOptions.CHECKPOINTS_DIRECTORY, configuration.getString(CheckpointingOptions.CHECKPOINTS_DIRECTORY) + "/" + uuid);
......@@ -97,30 +97,30 @@ public abstract class YarnGateway extends AbstractGateway {
YarnLogConfigUtil.setLogConfigFileInConfig(configuration, config.getClusterConfig().getFlinkConfigPath());
}
private void initYarnClient(){
private void initYarnClient() {
yarnConfiguration = new YarnConfiguration();
yarnConfiguration.addResource( new Path( URI.create(config.getClusterConfig().getYarnConfigPath()+"/yarn-site.xml") ) );
yarnConfiguration.addResource( new Path( URI.create(config.getClusterConfig().getYarnConfigPath()+"/core-site.xml") ) );
yarnConfiguration.addResource( new Path( URI.create(config.getClusterConfig().getYarnConfigPath()+"/hdfs-site.xml") ) );
yarnConfiguration.addResource(new Path(URI.create(config.getClusterConfig().getYarnConfigPath() + "/yarn-site.xml")));
yarnConfiguration.addResource(new Path(URI.create(config.getClusterConfig().getYarnConfigPath() + "/core-site.xml")));
yarnConfiguration.addResource(new Path(URI.create(config.getClusterConfig().getYarnConfigPath() + "/hdfs-site.xml")));
yarnClient = YarnClient.createYarnClient();
yarnClient.init(yarnConfiguration);
yarnClient.start();
}
private void addConfigParas(Map<String, String> configMap){
if(Asserts.isNotNull(configMap)) {
private void addConfigParas(Map<String, String> configMap) {
if (Asserts.isNotNull(configMap)) {
for (Map.Entry<String, String> entry : configMap.entrySet()) {
this.configuration.setString(entry.getKey(), entry.getValue());
}
}
}
public SavePointResult savepointCluster(){
public SavePointResult savepointCluster() {
return savepointCluster(null);
}
public SavePointResult savepointCluster(String savePoint){
if(Asserts.isNull(yarnClient)){
public SavePointResult savepointCluster(String savePoint) {
if (Asserts.isNull(yarnClient)) {
init();
}
/*if(Asserts.isNotNullString(config.getClusterConfig().getYarnConfigPath())) {
......@@ -132,7 +132,7 @@ public abstract class YarnGateway extends AbstractGateway {
YarnClusterClientFactory clusterClientFactory = new YarnClusterClientFactory();
configuration.set(YarnConfigOptions.APPLICATION_ID, config.getClusterConfig().getAppId());
ApplicationId applicationId = clusterClientFactory.getClusterId(configuration);
if (applicationId == null){
if (applicationId == null) {
throw new GatewayException(
"No cluster id was specified. Please specify a cluster to which you would like to connect.");
}
......@@ -141,18 +141,18 @@ public abstract class YarnGateway extends AbstractGateway {
configuration);*/
YarnClusterDescriptor clusterDescriptor = new YarnClusterDescriptor(
configuration, yarnConfiguration, yarnClient, YarnClientYarnClusterInformationRetriever.create(yarnClient), true);
try(ClusterClient<ApplicationId> clusterClient = clusterDescriptor.retrieve(
applicationId).getClusterClient()){
try (ClusterClient<ApplicationId> clusterClient = clusterDescriptor.retrieve(
applicationId).getClusterClient()) {
List<JobInfo> jobInfos = new ArrayList<>();
CompletableFuture<Collection<JobStatusMessage>> listJobsFuture = clusterClient.listJobs();
for( JobStatusMessage jobStatusMessage: listJobsFuture.get()){
for (JobStatusMessage jobStatusMessage : listJobsFuture.get()) {
JobInfo jobInfo = new JobInfo(jobStatusMessage.getJobId().toHexString());
jobInfo.setStatus(JobInfo.JobStatus.RUN);
jobInfos.add(jobInfo);
}
runSavePointJob(jobInfos,clusterClient,savePoint);
runSavePointJob(jobInfos, clusterClient, savePoint);
result.setJobInfos(jobInfos);
}catch (Exception e){
} catch (Exception e) {
e.printStackTrace();
StringWriter sw = new StringWriter();
PrintWriter pw = new PrintWriter(sw);
......@@ -163,15 +163,15 @@ public abstract class YarnGateway extends AbstractGateway {
return result;
}
public SavePointResult savepointJob(){
public SavePointResult savepointJob() {
return savepointJob(null);
}
public SavePointResult savepointJob(String savePoint){
if(Asserts.isNull(yarnClient)){
public SavePointResult savepointJob(String savePoint) {
if (Asserts.isNull(yarnClient)) {
init();
}
if(Asserts.isNull(config.getFlinkConfig().getJobId())){
if (Asserts.isNull(config.getFlinkConfig().getJobId())) {
throw new GatewayException(
"No job id was specified. Please specify a job to which you would like to savepont.");
}
......@@ -184,7 +184,7 @@ public abstract class YarnGateway extends AbstractGateway {
YarnClusterClientFactory clusterClientFactory = new YarnClusterClientFactory();
configuration.set(YarnConfigOptions.APPLICATION_ID, config.getClusterConfig().getAppId());
ApplicationId applicationId = clusterClientFactory.getClusterId(configuration);
if (Asserts.isNull(applicationId)){
if (Asserts.isNull(applicationId)) {
throw new GatewayException(
"No cluster id was specified. Please specify a cluster to which you would like to connect.");
}
......@@ -193,26 +193,26 @@ public abstract class YarnGateway extends AbstractGateway {
configuration);*/
YarnClusterDescriptor clusterDescriptor = new YarnClusterDescriptor(
configuration, yarnConfiguration, yarnClient, YarnClientYarnClusterInformationRetriever.create(yarnClient), true);
try(ClusterClient<ApplicationId> clusterClient = clusterDescriptor.retrieve(
applicationId).getClusterClient()){
try (ClusterClient<ApplicationId> clusterClient = clusterDescriptor.retrieve(
applicationId).getClusterClient()) {
List<JobInfo> jobInfos = new ArrayList<>();
jobInfos.add(new JobInfo(config.getFlinkConfig().getJobId(),JobInfo.JobStatus.FAIL));
runSavePointJob(jobInfos,clusterClient,savePoint);
jobInfos.add(new JobInfo(config.getFlinkConfig().getJobId(), JobInfo.JobStatus.FAIL));
runSavePointJob(jobInfos, clusterClient, savePoint);
result.setJobInfos(jobInfos);
}catch (Exception e){
} catch (Exception e) {
result.fail(LogUtil.getError(e));
}
return result;
}
private void runSavePointJob(List<JobInfo> jobInfos,ClusterClient<ApplicationId> clusterClient,String savePoint) throws Exception{
for( JobInfo jobInfo: jobInfos){
if(ActionType.CANCEL== config.getFlinkConfig().getAction()){
private void runSavePointJob(List<JobInfo> jobInfos, ClusterClient<ApplicationId> clusterClient, String savePoint) throws Exception {
for (JobInfo jobInfo : jobInfos) {
if (ActionType.CANCEL == config.getFlinkConfig().getAction()) {
clusterClient.cancel(JobID.fromHexString(jobInfo.getJobId()));
jobInfo.setStatus(JobInfo.JobStatus.CANCEL);
continue;
}
switch (config.getFlinkConfig().getSavePointType()){
switch (config.getFlinkConfig().getSavePointType()) {
case TRIGGER:
CompletableFuture<String> triggerFuture = clusterClient.triggerSavepoint(JobID.fromHexString(jobInfo.getJobId()), savePoint);
jobInfo.setSavePoint(triggerFuture.get());
......@@ -232,25 +232,25 @@ public abstract class YarnGateway extends AbstractGateway {
}
}
public TestResult test(){
public TestResult test() {
try {
initConfig();
}catch (Exception e){
logger.error("测试 Flink 配置失败:"+e.getMessage());
return TestResult.fail("测试 Flink 配置失败:"+e.getMessage());
} catch (Exception e) {
logger.error("测试 Flink 配置失败:" + e.getMessage());
return TestResult.fail("测试 Flink 配置失败:" + e.getMessage());
}
try {
initYarnClient();
if(yarnClient.isInState(Service.STATE.STARTED)){
if (yarnClient.isInState(Service.STATE.STARTED)) {
logger.info("配置连接测试成功");
return TestResult.success();
}else{
} else {
logger.error("该配置无对应 Yarn 集群存在");
return TestResult.fail("该配置无对应 Yarn 集群存在");
}
}catch (Exception e){
logger.error("测试 Yarn 配置失败:"+e.getMessage());
return TestResult.fail("测试 Yarn 配置失败:"+e.getMessage());
} catch (Exception e) {
logger.error("测试 Yarn 配置失败:" + e.getMessage());
return TestResult.fail("测试 Yarn 配置失败:" + e.getMessage());
}
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment