Commit 7698c3e4 authored by wenmo's avatar wenmo

gateway format

parent 4bb764c3
......@@ -26,8 +26,8 @@ public class ClusterConfig {
this.yarnConfigPath = yarnConfigPath;
}
public static ClusterConfig build(String flinkConfigPath, String flinkLibPath, String yarnConfigPath){
return new ClusterConfig(flinkConfigPath,flinkLibPath,yarnConfigPath);
public static ClusterConfig build(String flinkConfigPath, String flinkLibPath, String yarnConfigPath) {
return new ClusterConfig(flinkConfigPath, flinkLibPath, yarnConfigPath);
}
@Override
......
......@@ -7,9 +7,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.Getter;
import lombok.Setter;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
......@@ -26,12 +24,13 @@ public class FlinkConfig {
private ActionType action;
private SavePointType savePointType;
private String savePoint;
// private List<ConfigPara> configParas;
// private List<ConfigPara> configParas;
private Map<String, String> configuration = new HashMap<>();
private static final ObjectMapper mapper = new ObjectMapper();
public static final String DEFAULT_SAVEPOINT_PREFIX = "hdfs:///flink/savepoints/";
public FlinkConfig() {
}
......@@ -48,7 +47,7 @@ public class FlinkConfig {
this.configuration = configuration;
}
public static FlinkConfig build(Map<String, String> paras){
public static FlinkConfig build(Map<String, String> paras) {
/*List<ConfigPara> configParasList = new ArrayList<>();
for (Map.Entry<String, String> entry : paras.entrySet()) {
configParasList.add(new ConfigPara(entry.getKey(),entry.getValue()));
......@@ -56,27 +55,27 @@ public class FlinkConfig {
return new FlinkConfig(paras);
}
public static FlinkConfig build(String jobName, String jobId, String actionStr, String savePointTypeStr, String savePoint, String configParasStr){
public static FlinkConfig build(String jobName, String jobId, String actionStr, String savePointTypeStr, String savePoint, String configParasStr) {
// List<ConfigPara> configParasList = new ArrayList<>();
Map<String, String> configMap = new HashMap<>();
JsonNode paras = null;
if(Asserts.isNotNullString(configParasStr)) {
if (Asserts.isNotNullString(configParasStr)) {
try {
paras = mapper.readTree(configParasStr);
} catch (JsonProcessingException e) {
e.printStackTrace();
}
paras.forEach((JsonNode node) -> {
configMap.put(node.get("key").asText(),node.get("value").asText());
configMap.put(node.get("key").asText(), node.get("value").asText());
// configParasList.add(new ConfigPara(node.get("key").asText(), node.get("value").asText()));
}
);
}
return new FlinkConfig(jobName,jobId,ActionType.get(actionStr),SavePointType.get(savePointTypeStr),savePoint,configMap);
return new FlinkConfig(jobName, jobId, ActionType.get(actionStr), SavePointType.get(savePointTypeStr), savePoint, configMap);
}
public static FlinkConfig build(String jobId, String actionStr, String savePointTypeStr, String savePoint){
return new FlinkConfig(null,jobId,ActionType.get(actionStr),SavePointType.get(savePointTypeStr),savePoint,null);
public static FlinkConfig build(String jobId, String actionStr, String savePointTypeStr, String savePoint) {
return new FlinkConfig(null, jobId, ActionType.get(actionStr), SavePointType.get(savePointTypeStr), savePoint, null);
}
}
......@@ -22,7 +22,6 @@ import org.apache.flink.kubernetes.kubeclient.FlinkKubeClient;
import org.apache.flink.kubernetes.kubeclient.FlinkKubeClientFactory;
import org.apache.flink.runtime.client.JobStatusMessage;
import org.apache.flink.runtime.jobgraph.SavepointConfigOptions;
import org.apache.flink.yarn.configuration.YarnConfigOptions;
import java.util.*;
import java.util.concurrent.CompletableFuture;
......@@ -44,24 +43,24 @@ public abstract class KubernetesGateway extends AbstractGateway {
super(config);
}
public void init(){
public void init() {
initConfig();
initKubeClient();
}
private void initConfig(){
private void initConfig() {
configuration = GlobalConfiguration.loadConfiguration(config.getClusterConfig().getFlinkConfigPath());
if(Asserts.isNotNull(config.getFlinkConfig().getConfiguration())) {
if (Asserts.isNotNull(config.getFlinkConfig().getConfiguration())) {
addConfigParas(config.getFlinkConfig().getConfiguration());
}
configuration.set(DeploymentOptions.TARGET, getType().getLongValue());
if(Asserts.isNotNullString(config.getFlinkConfig().getSavePoint())) {
if (Asserts.isNotNullString(config.getFlinkConfig().getSavePoint())) {
configuration.setString(SavepointConfigOptions.SAVEPOINT_PATH, config.getFlinkConfig().getSavePoint());
}
if(Asserts.isNotNullString(config.getFlinkConfig().getJobName())) {
if (Asserts.isNotNullString(config.getFlinkConfig().getJobName())) {
configuration.set(KubernetesConfigOptions.CLUSTER_ID, config.getFlinkConfig().getJobName());
}
if(getType().isApplicationMode()) {
if (getType().isApplicationMode()) {
String uuid = UUID.randomUUID().toString().replace("-", "");
if (configuration.contains(CheckpointingOptions.CHECKPOINTS_DIRECTORY)) {
configuration.set(CheckpointingOptions.CHECKPOINTS_DIRECTORY, configuration.getString(CheckpointingOptions.CHECKPOINTS_DIRECTORY) + "/" + uuid);
......@@ -72,94 +71,94 @@ public abstract class KubernetesGateway extends AbstractGateway {
}
}
private void initKubeClient(){
private void initKubeClient() {
client = FlinkKubeClientFactory.getInstance().fromConfiguration(configuration, "client");
}
private void addConfigParas(Map<String, String> configMap){
if(Asserts.isNotNull(configMap)) {
private void addConfigParas(Map<String, String> configMap) {
if (Asserts.isNotNull(configMap)) {
for (Map.Entry<String, String> entry : configMap.entrySet()) {
this.configuration.setString(entry.getKey(), entry.getValue());
}
}
}
public SavePointResult savepointCluster(){
public SavePointResult savepointCluster() {
return savepointCluster(null);
}
public SavePointResult savepointCluster(String savePoint){
if(Asserts.isNull(client)){
public SavePointResult savepointCluster(String savePoint) {
if (Asserts.isNull(client)) {
init();
}
SavePointResult result = SavePointResult.build(getType());
configuration.set(KubernetesConfigOptions.CLUSTER_ID, config.getClusterConfig().getAppId());
KubernetesClusterClientFactory clusterClientFactory = new KubernetesClusterClientFactory();
String clusterId = clusterClientFactory.getClusterId(configuration);
if (Asserts.isNull(clusterId)){
if (Asserts.isNull(clusterId)) {
throw new GatewayException("No cluster id was specified. Please specify a cluster to which you would like to connect.");
}
KubernetesClusterDescriptor clusterDescriptor = clusterClientFactory.createClusterDescriptor(configuration);
try(ClusterClient<String> clusterClient = clusterDescriptor.retrieve(
clusterId).getClusterClient()){
try (ClusterClient<String> clusterClient = clusterDescriptor.retrieve(
clusterId).getClusterClient()) {
List<JobInfo> jobInfos = new ArrayList<>();
CompletableFuture<Collection<JobStatusMessage>> listJobsFuture = clusterClient.listJobs();
for( JobStatusMessage jobStatusMessage: listJobsFuture.get()){
for (JobStatusMessage jobStatusMessage : listJobsFuture.get()) {
JobInfo jobInfo = new JobInfo(jobStatusMessage.getJobId().toHexString());
jobInfo.setStatus(JobInfo.JobStatus.RUN);
jobInfos.add(jobInfo);
}
runSavePointJob(jobInfos,clusterClient,savePoint);
runSavePointJob(jobInfos, clusterClient, savePoint);
result.setJobInfos(jobInfos);
}catch (Exception e){
} catch (Exception e) {
result.fail(LogUtil.getError(e));
}
return null;
}
public SavePointResult savepointJob(){
public SavePointResult savepointJob() {
return savepointJob(null);
}
public SavePointResult savepointJob(String savePoint){
if(Asserts.isNull(client)){
public SavePointResult savepointJob(String savePoint) {
if (Asserts.isNull(client)) {
init();
}
if(Asserts.isNull(config.getFlinkConfig().getJobId())){
if (Asserts.isNull(config.getFlinkConfig().getJobId())) {
throw new GatewayException("No job id was specified. Please specify a job to which you would like to savepont.");
}
if(Asserts.isNotNullString(config.getClusterConfig().getYarnConfigPath())) {
if (Asserts.isNotNullString(config.getClusterConfig().getYarnConfigPath())) {
configuration = GlobalConfiguration.loadConfiguration(config.getClusterConfig().getYarnConfigPath());
}else {
} else {
configuration = new Configuration();
}
SavePointResult result = SavePointResult.build(getType());
configuration.set(KubernetesConfigOptions.CLUSTER_ID, config.getClusterConfig().getAppId());
KubernetesClusterClientFactory clusterClientFactory = new KubernetesClusterClientFactory();
String clusterId = clusterClientFactory.getClusterId(configuration);
if (Asserts.isNull(clusterId)){
if (Asserts.isNull(clusterId)) {
throw new GatewayException("No cluster id was specified. Please specify a cluster to which you would like to connect.");
}
KubernetesClusterDescriptor clusterDescriptor = clusterClientFactory.createClusterDescriptor(configuration);
try(ClusterClient<String> clusterClient = clusterDescriptor.retrieve(clusterId).getClusterClient()){
try (ClusterClient<String> clusterClient = clusterDescriptor.retrieve(clusterId).getClusterClient()) {
List<JobInfo> jobInfos = new ArrayList<>();
jobInfos.add(new JobInfo(config.getFlinkConfig().getJobId(),JobInfo.JobStatus.FAIL));
runSavePointJob(jobInfos,clusterClient,savePoint);
jobInfos.add(new JobInfo(config.getFlinkConfig().getJobId(), JobInfo.JobStatus.FAIL));
runSavePointJob(jobInfos, clusterClient, savePoint);
result.setJobInfos(jobInfos);
}catch (Exception e){
} catch (Exception e) {
result.fail(LogUtil.getError(e));
}
return result;
}
private void runSavePointJob(List<JobInfo> jobInfos,ClusterClient<String> clusterClient,String savePoint) throws Exception{
for( JobInfo jobInfo: jobInfos){
if(ActionType.CANCEL== config.getFlinkConfig().getAction()){
private void runSavePointJob(List<JobInfo> jobInfos, ClusterClient<String> clusterClient, String savePoint) throws Exception {
for (JobInfo jobInfo : jobInfos) {
if (ActionType.CANCEL == config.getFlinkConfig().getAction()) {
clusterClient.cancel(JobID.fromHexString(jobInfo.getJobId()));
jobInfo.setStatus(JobInfo.JobStatus.CANCEL);
continue;
}
switch (config.getFlinkConfig().getSavePointType()){
switch (config.getFlinkConfig().getSavePointType()) {
case TRIGGER:
CompletableFuture<String> triggerFuture = clusterClient.triggerSavepoint(JobID.fromHexString(jobInfo.getJobId()), savePoint);
jobInfo.setSavePoint(triggerFuture.get());
......@@ -179,20 +178,20 @@ public abstract class KubernetesGateway extends AbstractGateway {
}
}
public TestResult test(){
public TestResult test() {
try {
initConfig();
}catch (Exception e){
logger.error("测试 Flink 配置失败:"+e.getMessage());
return TestResult.fail("测试 Flink 配置失败:"+e.getMessage());
} catch (Exception e) {
logger.error("测试 Flink 配置失败:" + e.getMessage());
return TestResult.fail("测试 Flink 配置失败:" + e.getMessage());
}
try {
initKubeClient();
logger.info("配置连接测试成功");
return TestResult.success();
}catch (Exception e){
logger.error("测试 Kubernetes 配置失败:"+e.getMessage());
return TestResult.fail("测试 Kubernetes 配置失败:"+e.getMessage());
} catch (Exception e) {
logger.error("测试 Kubernetes 配置失败:" + e.getMessage());
return TestResult.fail("测试 Kubernetes 配置失败:" + e.getMessage());
}
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment