Commit 4aae1539 authored by wenmo's avatar wenmo

新增 Application 模式自增修正checkpoint和savepoint存储路径

parent 57f62d2c
......@@ -9,6 +9,8 @@ import com.dlink.executor.ExecutorSetting;
import com.dlink.interceptor.FlinkInterceptor;
import com.dlink.parser.SqlType;
import com.dlink.trans.Operations;
import org.apache.flink.configuration.CheckpointingOptions;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......@@ -83,7 +85,16 @@ public class Submiter {
}
sb.append(getFlinkSQLStatement(id, dbConfig));
List<String> statements = Submiter.getStatements(sb.toString());
ExecutorSetting executorSetting = ExecutorSetting.build(Submiter.getTaskConfig(id, dbConfig));
ExecutorSetting executorSetting = ExecutorSetting.build(taskConfig);
String uuid = UUID.randomUUID().toString().replace("-", "");
if(executorSetting.getConfig().containsKey(CheckpointingOptions.CHECKPOINTS_DIRECTORY.key())){
executorSetting.getConfig().put(CheckpointingOptions.CHECKPOINTS_DIRECTORY.key(),
executorSetting.getConfig().get(CheckpointingOptions.CHECKPOINTS_DIRECTORY.key())+"/"+uuid);
}
if(executorSetting.getConfig().containsKey(CheckpointingOptions.SAVEPOINT_DIRECTORY.key())){
executorSetting.getConfig().put(CheckpointingOptions.SAVEPOINT_DIRECTORY.key(),
executorSetting.getConfig().get(CheckpointingOptions.SAVEPOINT_DIRECTORY.key())+"/"+uuid);
}
logger.info("作业配置如下: " + executorSetting.toString());
Executor executor = Executor.buildAppStreamExecutor(executorSetting);
List<StatementParam> ddl = new ArrayList<>();
......
......@@ -11,6 +11,7 @@ import com.dlink.gateway.result.TestResult;
import com.dlink.utils.LogUtil;
import org.apache.flink.api.common.JobID;
import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.configuration.CheckpointingOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.DeploymentOptions;
import org.apache.flink.configuration.GlobalConfiguration;
......@@ -60,6 +61,15 @@ public abstract class KubernetesGateway extends AbstractGateway {
if(Asserts.isNotNullString(config.getFlinkConfig().getJobName())) {
configuration.set(KubernetesConfigOptions.CLUSTER_ID, config.getFlinkConfig().getJobName());
}
if(getType().isApplicationMode()) {
String uuid = UUID.randomUUID().toString().replace("-", "");
if (configuration.contains(CheckpointingOptions.CHECKPOINTS_DIRECTORY)) {
configuration.set(CheckpointingOptions.CHECKPOINTS_DIRECTORY, configuration.getString(CheckpointingOptions.CHECKPOINTS_DIRECTORY) + "/" + uuid);
}
if (configuration.contains(CheckpointingOptions.SAVEPOINT_DIRECTORY)) {
configuration.set(CheckpointingOptions.SAVEPOINT_DIRECTORY, configuration.getString(CheckpointingOptions.SAVEPOINT_DIRECTORY) + "/" + uuid);
}
}
}
private void initKubeClient(){
......
......@@ -11,6 +11,7 @@ import com.dlink.gateway.result.TestResult;
import com.dlink.utils.LogUtil;
import org.apache.flink.api.common.JobID;
import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.configuration.CheckpointingOptions;
import org.apache.flink.configuration.DeploymentOptions;
import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.configuration.SecurityOptions;
......@@ -84,6 +85,15 @@ public abstract class YarnGateway extends AbstractGateway {
}
}
if(getType().isApplicationMode()) {
String uuid = UUID.randomUUID().toString().replace("-", "");
if (configuration.contains(CheckpointingOptions.CHECKPOINTS_DIRECTORY)) {
configuration.set(CheckpointingOptions.CHECKPOINTS_DIRECTORY, configuration.getString(CheckpointingOptions.CHECKPOINTS_DIRECTORY) + "/" + uuid);
}
if (configuration.contains(CheckpointingOptions.SAVEPOINT_DIRECTORY)) {
configuration.set(CheckpointingOptions.SAVEPOINT_DIRECTORY, configuration.getString(CheckpointingOptions.SAVEPOINT_DIRECTORY) + "/" + uuid);
}
}
YarnLogConfigUtil.setLogConfigFileInConfig(configuration, config.getClusterConfig().getFlinkConfigPath());
}
......
......@@ -722,6 +722,9 @@ export default (): React.ReactNode => {
<li>
<Link>修复 Oracle无法正确获取元数据的bug</Link>
</li>
<li>
<Link>新增 Application 模式自增修正checkpoint和savepoint存储路径</Link>
</li>
</ul>
</Paragraph>
</Timeline.Item>
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment