Commit 308bc440 authored by godkaikai's avatar godkaikai

savepoint改进

parent f9d7cdd8
......@@ -75,7 +75,7 @@ public class Job2MysqlHandler implements JobHandler {
// history.setResult(JSONUtil.toJsonStr(job.getResult()));
if(job.isUseGateway()){
Cluster cluster = clusterService.registersCluster(Cluster.autoRegistersCluster(job.getJobManagerAddress(),
job.getJobConfig().getJobName()+ LocalDateTime.now(), job.getType().getLongValue(),
job.getJobId(),job.getJobConfig().getJobName()+ LocalDateTime.now(), job.getType().getLongValue(),
job.getJobConfig().getClusterConfigurationId(),job.getJobConfig().getClusterConfigurationId()));
if(Asserts.isNotNull(cluster)){
history.setClusterId(cluster.getId());
......
......@@ -41,10 +41,10 @@ public class Cluster extends SuperEntity {
private Integer taskId;
public static Cluster autoRegistersCluster(String hosts,String name,String type,Integer clusterConfigurationId,Integer taskId){
public static Cluster autoRegistersCluster(String hosts,String name,String alias,String type,Integer clusterConfigurationId,Integer taskId){
Cluster cluster = new Cluster();
cluster.setName(name);
cluster.setAlias(name);
cluster.setAlias(alias);
cluster.setHosts(hosts);
cluster.setType(type);
cluster.setClusterConfigurationId(clusterConfigurationId);
......
......@@ -175,6 +175,7 @@ public class StudioServiceImpl implements StudioService {
if(Asserts.isNotNull(cluster.getClusterConfigurationId())){
Map<String, String> gatewayConfig = clusterConfigurationService.getGatewayConfig(cluster.getClusterConfigurationId());
jobConfig.buildGatewayConfig(gatewayConfig);
jobConfig.getGatewayConfig().getClusterConfig().setAppId(cluster.getName());
}
jobConfig.setUseRestAPI(SystemConfiguration.getInstances().isUseRestAPI());
JobManager jobManager = JobManager.build(jobConfig);
......
......@@ -15,6 +15,7 @@ public class ClusterConfig {
private String flinkConfigPath;
private String flinkLibPath;
private String yarnConfigPath;
private String appId;
public ClusterConfig() {
}
......@@ -28,4 +29,14 @@ public class ClusterConfig {
public static ClusterConfig build(String flinkConfigPath, String flinkLibPath, String yarnConfigPath){
return new ClusterConfig(flinkConfigPath,flinkLibPath,yarnConfigPath);
}
@Override
public String toString() {
return "ClusterConfig{" +
"flinkConfigPath='" + flinkConfigPath + '\'' +
", flinkLibPath='" + flinkLibPath + '\'' +
", yarnConfigPath='" + yarnConfigPath + '\'' +
", appId='" + appId + '\'' +
'}';
}
}
......@@ -57,13 +57,17 @@ public abstract class YarnGateway extends AbstractGateway {
private void initConfig(){
configuration = GlobalConfiguration.loadConfiguration(config.getClusterConfig().getFlinkConfigPath());
if(Asserts.isNotNull(config.getFlinkConfig().getConfigParas())) {
addConfigParas(config.getFlinkConfig().getConfigParas());
}
configuration.set(DeploymentOptions.TARGET, getType().getLongValue());
if(Asserts.isNotNullString(config.getFlinkConfig().getSavePoint())) {
configuration.setString(SavepointConfigOptions.SAVEPOINT_PATH, config.getFlinkConfig().getSavePoint());
}
configuration.set(YarnConfigOptions.PROVIDED_LIB_DIRS, Collections.singletonList(config.getClusterConfig().getFlinkLibPath()));
if(Asserts.isNotNullString(config.getFlinkConfig().getJobName())) {
configuration.set(YarnConfigOptions.APPLICATION_NAME, config.getFlinkConfig().getJobName());
}
YarnLogConfigUtil.setLogConfigFileInConfig(configuration, config.getClusterConfig().getFlinkConfigPath());
}
......@@ -91,6 +95,7 @@ public abstract class YarnGateway extends AbstractGateway {
}
SavePointResult result = SavePointResult.build(getType());
YarnClusterClientFactory clusterClientFactory = new YarnClusterClientFactory();
configuration.set(YarnConfigOptions.APPLICATION_ID, config.getClusterConfig().getAppId());
ApplicationId applicationId = clusterClientFactory.getClusterId(configuration);
if (applicationId == null){
throw new GatewayException(
......@@ -123,12 +128,15 @@ public abstract class YarnGateway extends AbstractGateway {
if(Asserts.isNull(yarnClient)){
init();
}
System.out.println(config.getClusterConfig().toString());
logger.warn(config.getClusterConfig().toString());
if(Asserts.isNull(config.getFlinkConfig().getJobId())){
throw new GatewayException(
"No job id was specified. Please specify a job to which you would like to savepont.");
}
SavePointResult result = SavePointResult.build(getType());
YarnClusterClientFactory clusterClientFactory = new YarnClusterClientFactory();
configuration.set(YarnConfigOptions.APPLICATION_ID, config.getClusterConfig().getAppId());
ApplicationId applicationId = clusterClientFactory.getClusterId(configuration);
if (Asserts.isNull(applicationId)){
throw new GatewayException(
......
......@@ -21,9 +21,10 @@ const StudioSavePoint: React.FC<{}> = (props: any) => {
title: '名称',
dataIndex: 'name',
sorter: true,
render: (dom, entity) => {
hideInTable: true,
/*render: (dom, entity) => {
return <a onClick={() => setRow(entity)}>{dom}</a>;
},
},*/
},
{
title: 'id',
......@@ -58,7 +59,9 @@ const StudioSavePoint: React.FC<{}> = (props: any) => {
dataIndex: 'createTime',
sorter: true,
valueType: 'dateTime',
hideInTable: true,
render: (dom, entity) => {
return <a onClick={() => setRow(entity)}>{dom}</a>;
},
},
];
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment