Commit 5502068c authored by shezaixing

init commit

package com.dsk.flink.dsc.common.function;
import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.util.StrUtil;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.dsk.flink.dsc.utils.EnvProperties;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.text.SimpleDateFormat;
import java.util.*;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
* @author shezaixing
* @date 2023/12/6 15:40
* @description Assembles SQL statements for the MySQL/TiDB sink
*
*/
public class AsyncMysqlDataTransferFunction extends RichAsyncFunction<JSONObject,String> {
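/*
 * Input records are Canal-format JSON change events. A minimal illustrative example (field names
 * match the parsing in asyncInvoke below; the table name and values are hypothetical):
 * {
 *   "type": "INSERT",
 *   "isDdl": false,
 *   "table": "t_user",
 *   "pkNames": ["id"],
 *   "mysqlType": {"id": "int", "name": "varchar"},
 *   "data": [{"id": 1, "name": "Nana"}],
 *   "old": null
 * }
 * The function emits the assembled SQL string, or the sentinel "err" on failure/timeout.
 */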
static Logger logger = LoggerFactory.getLogger(AsyncMysqlDataTransferFunction.class);
//Database connection configuration
EnvProperties dbInfoMap;
//Worker thread pool
private transient ExecutorService executorService;
public AsyncMysqlDataTransferFunction(EnvProperties dbInfoMap) {
this.dbInfoMap = dbInfoMap;
}
@Override
public void open(Configuration parameters) throws Exception {
//Initialize the worker thread pool
executorService = new ThreadPoolExecutor(4 , 4, 20, TimeUnit.MINUTES, new LinkedBlockingDeque<>());
}
@Override
public void close() throws Exception {
executorService.shutdown();
}
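//On timeout, emit the "err" sentinel so the downstream filter in SyncCustomerDataSource drops the record.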
@Override
public void timeout(JSONObject input, ResultFuture<String> resultFuture) throws Exception {
resultFuture.complete(Collections.singleton("err"));
}
@Override
public void asyncInvoke(JSONObject value, ResultFuture<String> resultFuture) throws Exception {
executorService.submit(() -> {
try {
String type = value.getString("type");
JSONArray dataList = value.getJSONArray("data");
JSONObject mysqlType = value.getJSONObject("mysqlType");
JSONArray oldDataList = value.getJSONArray("old");
String table = value.getString("table");
Boolean isDdl = value.getBoolean("isDdl");
JSONArray pkNames = value.getJSONArray("pkNames");
Set<String> pkNameSet = new HashSet<>();
if(CollUtil.isNotEmpty(pkNames)){
pkNames.forEach(name -> pkNameSet.add(String.valueOf(name)));
}
String excueteSql = "";
if(isDdl){
excueteSql = value.getString("sql");
if(StrUtil.isNotBlank(excueteSql)){
excueteSql = StrUtil.subBefore(excueteSql,"AFTER",true);
}
resultFuture.complete(Collections.singleton(excueteSql));
return;
}
JSONObject dataObj = dataList.getJSONObject(0);
if("INSERT".equals(type)){
excueteSql = tranferInsertSql(table,dataObj,mysqlType);
}
if("UPDATE".equals(type)){
JSONObject oldDataObj = oldDataList.getJSONObject(0);
// excueteSql = tranferUpdateSql(table,dataObj,oldDataObj,mysqlType,pkNameSet);
//UPDATE events are written as the same upsert statement (INSERT ... ON DUPLICATE KEY UPDATE); tranferUpdateSql below is currently unused.
excueteSql = tranferInsertSql(table,dataObj,mysqlType);
}
if("DELETE".equals(type)){
excueteSql = transferDeleteSql(table,dataObj,mysqlType,pkNameSet);
}
resultFuture.complete(Collections.singleton(excueteSql));
}catch (Exception e){
logger.error("Failed to assemble SQL for record: {}", value, e);
resultFuture.complete(Collections.singleton("err"));
}
});
}
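//MySQL column types whose values are rendered as quoted string literals when the SQL is assembled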
private static final String[] STR_SQL_TYPE = new String[]{"VARCHAR","CHAR","TINYBLOB","BLOB","MEDIUMBLOB","LONGBLOB","TINYTEXT","TEXT","MEDIUMTEXT","LONGTEXT","TIME","TIMESTAMP"};
private static String tranferInsertSql(String table, JSONObject dataObj, JSONObject mysqlType) {
Set<String> columnSet = mysqlType.keySet();
List<String> valueList = new ArrayList<>();
List<String> updateList = new ArrayList<>();
for (String col : columnSet) {
valueList.add(getValueString(dataObj,col,mysqlType.getString(col)));
updateList.add(col.concat(" = VALUES(").concat(col).concat(")"));
}
String columnString = String.join(",",columnSet);
String valueString = String.join(",",valueList);
String updateString = String.join(",",updateList);
return String.format("INSERT INTO %s (%s) values (%s) ON DUPLICATE KEY UPDATE %s;",table,columnString,valueString,updateString);
}
private String tranferUpdateSql(String table, JSONObject dataObj, JSONObject oldDataObj, JSONObject mysqlType,Set<String> pkNameSet) {
Set<String> columnSet = mysqlType.keySet();
List<String> setList = new ArrayList<>();
List<String> whereList = new ArrayList<>();
for (String col : columnSet) {
String setString = col.concat(" = ").concat(getValueString(dataObj,col,mysqlType.getString(col)));
setList.add(setString);
}
for (String pk : pkNameSet) {
String whereString = pk.concat(" = ").concat(getValueString(oldDataObj,pk,mysqlType.getString(pk)));
whereList.add(whereString);
}
String setString = String.join(",",setList);
String whereString = String.join(" and ",whereList);
return String.format("UPDATE %s SET %s WHERE %s",table,setString,whereString);
}
private String transferDeleteSql(String table, JSONObject dataObj, JSONObject mysqlType, Set<String> pkNameSet) {
List<String> whereList = new ArrayList<>();
for (String pk : pkNameSet) {
String whereString = pk.concat(" = ").concat(getValueString(dataObj,pk,mysqlType.getString(pk)));
whereList.add(whereString);
}
String whereString = String.join(" and ",whereList);
return String.format("DELETE FROM %s WHERE %s",table,whereString);
}
/**
* @author shezaixing
* @date 2023/12/7 14:23
* @description Formats a column value for SQL assembly, deciding by MySQL type whether quoting is required
*
*/
private static String getValueString(JSONObject dataObj,String columnKey,String mysqlType){
if(null == dataObj.get(columnKey)){
return "null";
}
//Types whose values must be wrapped in single quotes
if(Arrays.asList(STR_SQL_TYPE).contains(mysqlType.toUpperCase())){
return String.format("'%s'",dataObj.getString(columnKey));
}
//Date/datetime column handling
if("DATE".equalsIgnoreCase(mysqlType) || "DATETIME".equalsIgnoreCase(mysqlType)){
SimpleDateFormat df = "DATETIME".equalsIgnoreCase(mysqlType) ? new SimpleDateFormat("yyyy-MM-dd HH:mm:ss") : new SimpleDateFormat("yyyy-MM-dd");
return String.format("\"%s\"",df.format(dataObj.getDate(columnKey)));
}
return dataObj.getString(columnKey);
}
public static void main(String[] args) {
JSONObject jsonObject = new JSONObject();
jsonObject.put("id",1);
jsonObject.put("name","Nana");
jsonObject.put("age",26);
jsonObject.put("salary",20000);
jsonObject.put("date1","2023-10-01");
jsonObject.put("date2","2023-10-02 11:11:00");
JSONObject mysqlType = new JSONObject();
mysqlType.put("id","int");
mysqlType.put("name","varchar");
mysqlType.put("age","bigint");
mysqlType.put("salary","double");
mysqlType.put("date1","date");
mysqlType.put("date2","datetime");
mysqlType.put("relation",null);
String table = "test";
String s= "ff8940af-c080-40cc-9d83-8c7dc8b86ed4";
System.out.println(s.length());
String s1 = "hello string sss";
System.out.println(StrUtil.subBefore(s1,"string",true));
System.out.println(tranferInsertSql(table,jsonObject,mysqlType));
}
}
package com.dsk.flink.dsc.common.sink;
import cn.hutool.core.date.DateUtil;
import com.alibaba.druid.pool.DruidDataSource;
import com.dsk.flink.dsc.utils.EnvProperties;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.util.concurrent.*;
public class MysqlDataTransferSink extends RichSinkFunction<String> {
static Logger logger = LoggerFactory.getLogger(MysqlDataTransferSink.class);
EnvProperties envProps;
ExecutorService executorService;
DruidDataSource dataSource;
public MysqlDataTransferSink(EnvProperties envProps) {
this.envProps = envProps;
}
@Override
public void open(Configuration parameters) throws Exception {
executorService = new ThreadPoolExecutor(4, 4, 20, TimeUnit.MINUTES, new LinkedBlockingDeque<>());
//Initialize the connection pool from the loaded configuration
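//Assumption (not confirmed by this commit): db_url in the properties file is a String.format template, e.g. "jdbc:mysql://%s:%s/%s?useUnicode=true&characterEncoding=utf8" (illustrative value only), filled with host, port and database below.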
String configTidbUrl = String.format(envProps.getDb_url(), envProps.getDb_host(), envProps.getDb_port(), envProps.getDb_database());
dataSource = new DruidDataSource();
dataSource.setDriverClassName("com.mysql.cj.jdbc.Driver");
dataSource.setUsername(envProps.getDb_username());
dataSource.setPassword(envProps.getDb_password());
dataSource.setUrl(configTidbUrl);
dataSource.setMaxActive(30);
dataSource.setInitialSize(10);
dataSource.setTestWhileIdle(true);
dataSource.setMaxWait(20000);
dataSource.setValidationQuery("select 1");
}
@Override
public void close() throws Exception {
executorService.shutdown();
dataSource.close();
}
@Override
public void invoke(String value, Context context) throws Exception {
executorService.execute(() -> {
try {
executeSql(value);
}catch (Exception e){
e.printStackTrace();
}
});
}
private void executeSql(String sql) throws Exception{
Connection connection = null;
PreparedStatement pt = null;
try {
connection = dataSource.getConnection();
pt = connection.prepareStatement(sql);
pt.execute();
} catch (Exception e) {
logger.error("SQL execution failed at {}, sql: {}, error: {}", DateUtil.now(), sql, e.getMessage());
} finally {
if (pt != null) {
pt.close();
}
if (connection != null) {
connection.close();
}
}
}
}
package com.dsk.flink.dsc.sync;
import cn.hutool.core.util.StrUtil;
import com.alibaba.fastjson.JSONObject;
import com.dsk.flink.dsc.common.function.AsyncMysqlDataTransferFunction;
import com.dsk.flink.dsc.common.sink.MysqlDataTransferSink;
import com.dsk.flink.dsc.utils.EnvProperties;
import com.dsk.flink.dsc.utils.EnvPropertiesUtil;
import com.dsk.flink.dsc.utils.EtlUtils;
import io.tidb.bigdata.tidb.JdbcConnectionProviderFactory;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.AsyncDataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import java.util.concurrent.TimeUnit;
/**
* @author shezaixing
* @date 2023/12/5 14:44
* @description Job that syncs data to the customer's target data source
*
*/
@Slf4j
public class SyncCustomerDataSource {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 30000));
env.enableCheckpointing(60000);
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(60000);
env.getCheckpointConfig().setCheckpointTimeout(7200000);
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
env.getCheckpointConfig().setExternalizedCheckpointCleanup(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
env.getCheckpointConfig().setTolerableCheckpointFailureNumber(3);
//Load the user-supplied configuration
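//Example launch (the jar name is hypothetical; --propertiesPath and the default path come from EnvPropertiesUtil):
//  flink run -c com.dsk.flink.dsc.sync.SyncCustomerDataSource flink-dsc.jar --propertiesPath /home/module/flink-job/application.properties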
ParameterTool parameterTool = ParameterTool.fromArgs(args);
String propertiesPath = parameterTool.get("propertiesPath");
EnvProperties envProps = EnvPropertiesUtil.getPropertiesFromArgsPath(propertiesPath);
envProps.put("providerImpl", JdbcConnectionProviderFactory.HikariDataSourceJdbcConnectionProvider.class.getName());
System.out.println("读取到的配置文件:-> " + envProps.toString());
FlinkKafkaConsumer<String> kafkaConsumer = new FlinkKafkaConsumer<String>(envProps.getKafka_topic(), new SimpleStringSchema(), EtlUtils.getKafkaConfig(envProps.getKafka_brokers(), envProps.getKafka_topic().concat("-group")));
kafkaConsumer.setStartFromEarliest();
SingleOutputStreamOperator<String> kafkaSource = env.addSource(kafkaConsumer)
.setParallelism(1)
.name("kafka-source")
.uid("kafka-source");
//kafkaSource.print("kafaka stream");
SingleOutputStreamOperator<JSONObject> canalJsonStream = kafkaSource.map(JSONObject::parseObject)
.name("canalJsonStream")
.uid("canalJsonStream");
canalJsonStream.print("canal stream");
SingleOutputStreamOperator<String> sqlResultStream = AsyncDataStream.orderedWait(canalJsonStream, new AsyncMysqlDataTransferFunction(envProps), 1200L, TimeUnit.SECONDS, 20)
.filter(new FilterFunction<String>() {
@Override
public boolean filter(String value) throws Exception {
return StrUtil.isNotBlank(value) && !"err".equals(value);
}
})
.name("sqlResultStream")
.uid("sqlResultStream");
sqlResultStream.print("sql result");
sqlResultStream.addSink(new MysqlDataTransferSink(envProps)).name("sqlSinkStream").uid("sqlSinkStream");
env.execute();
}
}
package com.dsk.flink.dsc.utils;
import cn.hutool.core.util.StrUtil;
import java.io.*;
import java.net.URI;
import java.net.URISyntaxException;
import java.nio.file.Files;
import java.util.AbstractMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.jar.JarFile;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.zip.ZipEntry;
/**
* @Description:
* @author zhaowei 2022年9月29日
*/
public class EnvPropertiesUtil {
private static String SYSTEM_ENV = "dev";
private static EnvProperties properties = new EnvProperties();
static {
SYSTEM_ENV = System.getProperty("profiles.active") == null ? SYSTEM_ENV : System.getProperty("profiles.active");
System.out.println("Env: " + SYSTEM_ENV + " properties");
URI uri = null;
try {
uri = EnvPropertiesUtil.class.getProtectionDomain().getCodeSource().getLocation().toURI();
} catch (URISyntaxException e1) {
e1.printStackTrace();
}
if (uri.toString().endsWith(".jar")) {
// Running from inside a jar
String jarPath = uri.toString();
uri = URI.create(jarPath.substring(jarPath.indexOf("file:"),jarPath.indexOf(".jar") + 4));
try {
allPropertiesJar(uri, properties);
}catch (Exception e) {
e.printStackTrace();
}
} else {
//Running locally (classes directory)
try {
allPropertiesLocal(uri, properties);
}catch (Exception e) {
e.printStackTrace();
}
}
}
/**
*
* @Description: Returns a copy of the default configuration
* @return
*/
public static EnvProperties getProperties() {
return (EnvProperties) properties.clone();
}
/**
*
* @Description: Returns the value for the given configuration key
* @param key
* @return
*/
public static String getProperty(String key) {
return properties.getProperty(key);
}
/**
* Reads .properties resources from the local classes directory
* @throws IOException
*/
private static void allPropertiesLocal(URI uri, Properties properties) throws IOException {
File resources = new File(uri.getPath());
if (!resources.exists() || !resources.isDirectory()) {
return ;
}
File[] propertiesFiles = resources.listFiles(new FilenameFilter() {
@Override
public boolean accept(File dir, String name) {
if(!name.endsWith(".properties")) return false;
if(name.matches(getApplicationPropertiesRegx()) && !name.equals(getApplicationPropertiesName())) {
return false;
}
return true;
}
});
if(propertiesFiles == null || propertiesFiles.length == 0) {
return ;
}
for(File file : propertiesFiles) {
try (FileInputStream in = new FileInputStream(file)) {
properties.load(in);
}
}
}
/**
* Reads .properties resources from inside the jar
* @throws IOException
*/
private static void allPropertiesJar(URI uri, Properties properties) throws IOException {
List<Map.Entry<ZipEntry, InputStream>> collect = readJarFile(new JarFile(uri.getPath())).collect(Collectors.toList());
if(collect == null || collect.isEmpty()) {
return;
}
for (Map.Entry<ZipEntry, InputStream> entry : collect) {
// Relative path of the entry inside the jar
String key = entry.getKey().getName();
// Input stream of the entry
InputStream stream = entry.getValue();
properties.load(stream);
}
}
private static Stream<Map.Entry<ZipEntry, InputStream>> readJarFile(JarFile jarFile) {
Stream<Map.Entry<ZipEntry, InputStream>> readingStream = jarFile.stream()
.filter(entry -> {
if(entry.getName().matches(getApplicationPropertiesRegx()) && !entry.getName().equals(getApplicationPropertiesName())) {
return false;
}
return !entry.isDirectory() && entry.getName().endsWith(".properties");
})
.map(entry -> {
try {
return new AbstractMap.SimpleEntry<>(entry, jarFile.getInputStream(entry));
} catch (IOException e) {
return new AbstractMap.SimpleEntry<>(entry, null);
}
});
return readingStream.onClose(() -> {
try {
jarFile.close();
} catch (IOException e) {
e.printStackTrace();
}
});
}
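/*
 * Sketch of the expected properties file, assuming the EnvProperties getters used elsewhere in this
 * job (getDb_url, getKafka_topic, ...) map one-to-one to these keys; all values below are hypothetical:
 *   db_url=jdbc:mysql://%s:%s/%s?useUnicode=true&characterEncoding=utf8
 *   db_host=127.0.0.1
 *   db_port=3306
 *   db_database=test
 *   db_username=root
 *   db_password=secret
 *   kafka_brokers=localhost:9092
 *   kafka_topic=dsc-topic
 */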
public static EnvProperties getPropertiesFromArgsPath(String filePath) throws IOException {
EnvProperties envProperties = new EnvProperties();
if(StrUtil.isBlank(filePath)){
filePath = System.getProperties().getProperty("os.name").contains("Windows") ? "D:\\Env\\application_pro.properties" : "/home/module/flink-job/application.properties";
}
File file = new File(filePath);
if (!file.exists()) {
return new EnvProperties();
}
try (InputStream in = Files.newInputStream(file.toPath())) {
envProperties.load(in);
}
//System.out.println("Loaded kafka_topic: " + envProperties.getKafka_topic());
return envProperties;
}
private static String getApplicationPropertiesName() {
return "application_"+SYSTEM_ENV+".properties";
}
private static String getApplicationPropertiesRegx() {
return "application_[a-z]{3,5}\\.properties";
}
public static void main(String[] args) throws IOException {
String filePath = "D:\\Env\\application_pro.properties";
System.out.println(getPropertiesFromArgsPath(filePath).toString());
EnvProperties envProperties = getPropertiesFromArgsPath(filePath);
System.out.println(envProperties.getDb_database());
}
}
package com.dsk.flink.dsc.utils;
import java.util.Properties;
public class EtlUtils {
public static Properties getKafkaConfig(String url, String groupId) {
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", url);
properties.setProperty("group.id", groupId);
properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
//properties.setProperty("auto.offset.reset", "earliest");
properties.setProperty("auto.offset.reset", "latest");
//properties.setProperty("max.poll.interval.ms", "604800000");
//properties.setProperty("session.timeout.ms", "20160000");
//properties.setProperty("heartbeat.interval.ms", "6720000");
//properties.setProperty("max.partition.fetch.bytes", "349525");
//properties.setProperty("max.poll.records", "50");
/*properties.setProperty("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"dsk\" password=\"LU1SRhTzoxssRoCp\";");
properties.setProperty("security.protocol", "SASL_PLAINTEXT");
properties.setProperty("sasl.mechanism", "PLAIN");*/
properties.setProperty("sasl.jaas.config", "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"dsk_st\" password=\"vIfVv6esjpwU2jT\";");
properties.setProperty("security.protocol", "SASL_PLAINTEXT");
properties.setProperty("sasl.mechanism", "SCRAM-SHA-512");
return properties;
}
}