Commit 4006fdfa authored by wenmo's avatar wenmo

savepoint触发bug解决

parent 308bc440
......@@ -12,10 +12,12 @@ import com.dlink.gateway.result.GatewayResult;
import com.dlink.gateway.result.SavePointResult;
import org.apache.flink.api.common.JobID;
import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.DeploymentOptions;
import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.runtime.client.JobStatusMessage;
import org.apache.flink.runtime.jobgraph.SavepointConfigOptions;
import org.apache.flink.yarn.YarnClientYarnClusterInformationRetriever;
import org.apache.flink.yarn.YarnClusterClientFactory;
import org.apache.flink.yarn.YarnClusterDescriptor;
import org.apache.flink.yarn.configuration.YarnConfigOptions;
......@@ -24,7 +26,9 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
import java.util.Collection;
......@@ -93,6 +97,11 @@ public abstract class YarnGateway extends AbstractGateway {
if(Asserts.isNull(yarnClient)){
init();
}
/*if(Asserts.isNotNullString(config.getClusterConfig().getYarnConfigPath())) {
configuration = GlobalConfiguration.loadConfiguration(config.getClusterConfig().getYarnConfigPath());
}else {
configuration = new Configuration();
}*/
SavePointResult result = SavePointResult.build(getType());
YarnClusterClientFactory clusterClientFactory = new YarnClusterClientFactory();
configuration.set(YarnConfigOptions.APPLICATION_ID, config.getClusterConfig().getAppId());
......@@ -101,10 +110,11 @@ public abstract class YarnGateway extends AbstractGateway {
throw new GatewayException(
"No cluster id was specified. Please specify a cluster to which you would like to connect.");
}
YarnClusterDescriptor clusterDescriptor = clusterClientFactory
/*YarnClusterDescriptor clusterDescriptor = clusterClientFactory
.createClusterDescriptor(
configuration);
configuration);*/
YarnClusterDescriptor clusterDescriptor = new YarnClusterDescriptor(
configuration, yarnConfiguration, yarnClient, YarnClientYarnClusterInformationRetriever.create(yarnClient), true);
try(ClusterClient<ApplicationId> clusterClient = clusterDescriptor.retrieve(
applicationId).getClusterClient()){
List<JobInfo> jobInfos = new ArrayList<>();
......@@ -128,12 +138,15 @@ public abstract class YarnGateway extends AbstractGateway {
if(Asserts.isNull(yarnClient)){
init();
}
System.out.println(config.getClusterConfig().toString());
logger.warn(config.getClusterConfig().toString());
if(Asserts.isNull(config.getFlinkConfig().getJobId())){
throw new GatewayException(
"No job id was specified. Please specify a job to which you would like to savepont.");
}
/*if(Asserts.isNotNullString(config.getClusterConfig().getYarnConfigPath())) {
configuration = GlobalConfiguration.loadConfiguration(config.getClusterConfig().getYarnConfigPath());
}else {
configuration = new Configuration();
}*/
SavePointResult result = SavePointResult.build(getType());
YarnClusterClientFactory clusterClientFactory = new YarnClusterClientFactory();
configuration.set(YarnConfigOptions.APPLICATION_ID, config.getClusterConfig().getAppId());
......@@ -142,9 +155,11 @@ public abstract class YarnGateway extends AbstractGateway {
throw new GatewayException(
"No cluster id was specified. Please specify a cluster to which you would like to connect.");
}
YarnClusterDescriptor clusterDescriptor = clusterClientFactory
/*YarnClusterDescriptor clusterDescriptor = clusterClientFactory
.createClusterDescriptor(
configuration);
configuration);*/
YarnClusterDescriptor clusterDescriptor = new YarnClusterDescriptor(
configuration, yarnConfiguration, yarnClient, YarnClientYarnClusterInformationRetriever.create(yarnClient), true);
try(ClusterClient<ApplicationId> clusterClient = clusterDescriptor.retrieve(
applicationId).getClusterClient()){
List<JobInfo> jobInfos = new ArrayList<>();
......
......@@ -10,7 +10,7 @@ import {SavePointTableListItem} from "@/components/Studio/StudioRightTool/Studio
import {StateType} from "@/pages/FlinkSqlStudio/model";
import {connect} from "umi";
const url = '/api/clusterConfiguration';
const url = '/api/savepoints';
const StudioSavePoint: React.FC<{}> = (props: any) => {
const {current,dispatch} = props;
const [row, setRow] = useState<SavePointTableListItem>();
......@@ -22,6 +22,8 @@ const StudioSavePoint: React.FC<{}> = (props: any) => {
dataIndex: 'name',
sorter: true,
hideInTable: true,
hideInForm: true,
hideInSearch: true,
/*render: (dom, entity) => {
return <a onClick={() => setRow(entity)}>{dom}</a>;
},*/
......@@ -68,12 +70,8 @@ const StudioSavePoint: React.FC<{}> = (props: any) => {
return (
<PageContainer>
<ProTable<SavePointTableListItem>
headerTitle="savepoints"
actionRef={actionRef}
rowKey="id"
search={{
labelWidth: 120,
}}
request={(params, sorter, filter) => queryData(url, {taskId:current.taskId,...params, sorter, filter})}
columns={columns}
/>
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment