Commit 9663144d authored by gaogao110's avatar gaogao110

update Phoenix connector

parent adc02868
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.connector.phoenix;
import org.apache.flink.annotation.Experimental;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.connector.phoenix.internal.JdbcBatchingOutputFormat;
import org.apache.flink.connector.phoenix.internal.connection.JdbcConnectionProvider;
import org.apache.flink.connector.phoenix.internal.connection.SimpleJdbcConnectionProvider;
import org.apache.flink.connector.phoenix.internal.executor.JdbcBatchStatementExecutor;
import org.apache.flink.types.Row;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.function.Function;
import static org.apache.flink.connector.phoenix.utils.JdbcUtils.setRecordToStatement;
/**
* OutputFormat to write Rows into a JDBC database. The OutputFormat has to be configured using the
* supplied OutputFormatBuilder.
*/
@Experimental
public class JdbcOutputFormat
extends JdbcBatchingOutputFormat<Row, Row, JdbcBatchStatementExecutor<Row>> {
private static final long serialVersionUID = 1L;
private static final Logger LOG = LoggerFactory.getLogger(JdbcOutputFormat.class);
private JdbcOutputFormat(
JdbcConnectionProvider connectionProvider,
String sql,
int[] typesArray,
int batchSize) {
super(
connectionProvider,
new JdbcExecutionOptions.Builder().withBatchSize(batchSize).build(),
ctx -> createRowExecutor(sql, typesArray, ctx),
RecordExtractor.identity());
}
private static JdbcBatchStatementExecutor<Row> createRowExecutor(
String sql, int[] typesArray, RuntimeContext ctx) {
JdbcStatementBuilder<Row> statementBuilder =
(st, record) -> setRecordToStatement(st, typesArray, record);
return JdbcBatchStatementExecutor.simple(
sql,
statementBuilder,
ctx.getExecutionConfig().isObjectReuseEnabled() ? Row::copy : Function.identity());
}
public static JdbcOutputFormatBuilder buildJdbcOutputFormat() {
return new JdbcOutputFormatBuilder();
}
/** Builder for {@link JdbcOutputFormat}. */
public static class JdbcOutputFormatBuilder {
private String username;
private String password;
private String drivername;
private String dbURL;
private String query;
private int batchSize = JdbcExecutionOptions.DEFAULT_SIZE;
private int[] typesArray;
private JdbcOutputFormatBuilder() {}
public JdbcOutputFormatBuilder setUsername(String username) {
this.username = username;
return this;
}
public JdbcOutputFormatBuilder setPassword(String password) {
this.password = password;
return this;
}
public JdbcOutputFormatBuilder setDrivername(String drivername) {
this.drivername = drivername;
return this;
}
public JdbcOutputFormatBuilder setDBUrl(String dbURL) {
this.dbURL = dbURL;
return this;
}
public JdbcOutputFormatBuilder setQuery(String query) {
this.query = query;
return this;
}
public JdbcOutputFormatBuilder setBatchSize(int batchSize) {
this.batchSize = batchSize;
return this;
}
public JdbcOutputFormatBuilder setSqlTypes(int[] typesArray) {
this.typesArray = typesArray;
return this;
}
/**
* Finalizes the configuration and checks validity.
*
* @return Configured JdbcOutputFormat
*/
public JdbcOutputFormat finish() {
return new JdbcOutputFormat(
new SimpleJdbcConnectionProvider(buildConnectionOptions()),
query,
typesArray,
batchSize);
}
public JdbcConnectionOptions buildConnectionOptions() {
if (this.username == null) {
LOG.info("Username was not supplied.");
}
if (this.password == null) {
LOG.info("Password was not supplied.");
}
return new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
.withUrl(dbURL)
.withDriverName(drivername)
.withUsername(username)
.withPassword(password)
.build();
}
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.connector.phoenix;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.connector.phoenix.internal.GenericJdbcSinkFunction;
import org.apache.flink.connector.phoenix.internal.JdbcBatchingOutputFormat;
import org.apache.flink.connector.phoenix.internal.connection.SimpleJdbcConnectionProvider;
import org.apache.flink.connector.phoenix.internal.executor.JdbcBatchStatementExecutor;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.util.Preconditions;
import javax.sql.XADataSource;
import java.util.function.Function;
/** Facade to create JDBC {@link SinkFunction sinks}. */
@PublicEvolving
public class JdbcSink {
/**
* Create a JDBC sink with the default {@link JdbcExecutionOptions}.
*
* @see #sink(String, JdbcStatementBuilder, JdbcExecutionOptions, JdbcConnectionOptions)
*/
public static <T> SinkFunction<T> sink(
String sql,
JdbcStatementBuilder<T> statementBuilder,
JdbcConnectionOptions connectionOptions) {
return sink(sql, statementBuilder, JdbcExecutionOptions.defaults(), connectionOptions);
}
/**
* Create a JDBC sink.
*
* <p>Note: the objects passed to the return sink can be processed in batch and retried.
* Therefore, objects can not be {@link
* org.apache.flink.api.common.ExecutionConfig#enableObjectReuse() reused}.
*
* @param sql arbitrary DML query (e.g. insert, update, upsert)
* @param statementBuilder sets parameters on {@link java.sql.PreparedStatement} according to
* the query
* @param <T> type of data in {@link
* org.apache.flink.streaming.runtime.streamrecord.StreamRecord StreamRecord}.
* @param executionOptions parameters of execution, such as batch size and maximum retries
* @param connectionOptions parameters of connection, such as JDBC URL
*/
public static <T> SinkFunction<T> sink(
String sql,
JdbcStatementBuilder<T> statementBuilder,
JdbcExecutionOptions executionOptions,
JdbcConnectionOptions connectionOptions) {
return new GenericJdbcSinkFunction<>(
new JdbcBatchingOutputFormat<>(
new SimpleJdbcConnectionProvider(connectionOptions),
executionOptions,
context -> {
Preconditions.checkState(
!context.getExecutionConfig().isObjectReuseEnabled(),
"objects can not be reused with JDBC sink function");
return JdbcBatchStatementExecutor.simple(
sql, statementBuilder, Function.identity());
},
JdbcBatchingOutputFormat.RecordExtractor.identity()));
}
/**
* Create JDBC sink which provides exactly-once guarantee.
*
* <p>Note: the objects passed to the return sink can be processed in batch and retried.
* Therefore, objects can not be {@link
* org.apache.flink.api.common.ExecutionConfig#enableObjectReuse() reused}.
*
* @param sql arbitrary DML query (e.g. insert, update, upsert)
* @param statementBuilder sets parameters on {@link java.sql.PreparedStatement} according to
* the query
* @param <T> type of data in {@link
* org.apache.flink.streaming.runtime.streamrecord.StreamRecord StreamRecord}.
* @param executionOptions parameters of execution, such as batch size and maximum retries
* @param exactlyOnceOptions exactly-once options
* @param dataSourceSupplier supplies the {@link XADataSource}
*/
/*
public static <T> SinkFunction<T> exactlyOnceSink(
String sql,
JdbcStatementBuilder<T> statementBuilder,
JdbcExecutionOptions executionOptions,
JdbcExactlyOnceOptions exactlyOnceOptions,
SerializableSupplier<XADataSource> dataSourceSupplier) {
return new JdbcXaSinkFunction<>(
sql,
statementBuilder,
XaFacade.fromXaDataSourceSupplier(
dataSourceSupplier,
exactlyOnceOptions.getTimeoutSec(),
exactlyOnceOptions.isTransactionPerConnection()),
executionOptions,
exactlyOnceOptions);
}
*/
private JdbcSink() {}
}
...@@ -93,11 +93,11 @@ import java.util.Arrays; ...@@ -93,11 +93,11 @@ import java.util.Arrays;
* @see DriverManager * @see DriverManager
*/ */
@Experimental @Experimental
public class JdbcInputFormat extends RichInputFormat<Row, InputSplit> public class PhoenixInputFormat extends RichInputFormat<Row, InputSplit>
implements ResultTypeQueryable<Row> { implements ResultTypeQueryable<Row> {
protected static final long serialVersionUID = 2L; protected static final long serialVersionUID = 2L;
protected static final Logger LOG = LoggerFactory.getLogger(JdbcInputFormat.class); protected static final Logger LOG = LoggerFactory.getLogger(PhoenixInputFormat.class);
protected JdbcConnectionProvider connectionProvider; protected JdbcConnectionProvider connectionProvider;
protected String queryTemplate; protected String queryTemplate;
...@@ -118,7 +118,7 @@ public class JdbcInputFormat extends RichInputFormat<Row, InputSplit> ...@@ -118,7 +118,7 @@ public class JdbcInputFormat extends RichInputFormat<Row, InputSplit>
protected boolean hasNext; protected boolean hasNext;
protected Object[][] parameterValues; protected Object[][] parameterValues;
public JdbcInputFormat() { public PhoenixInputFormat() {
} }
@Override @Override
...@@ -339,73 +339,73 @@ public class JdbcInputFormat extends RichInputFormat<Row, InputSplit> ...@@ -339,73 +339,73 @@ public class JdbcInputFormat extends RichInputFormat<Row, InputSplit>
* *
* @return builder * @return builder
*/ */
public static JdbcInputFormatBuilder buildJdbcInputFormat() { public static PhoenixInputFormatBuilder buildJdbcInputFormat() {
return new JdbcInputFormatBuilder(); return new PhoenixInputFormatBuilder();
} }
/** /**
* Builder for {@link JdbcInputFormat}. * Builder for {@link PhoenixInputFormat}.
*/ */
public static class JdbcInputFormatBuilder { public static class PhoenixInputFormatBuilder {
private final JdbcConnectionOptions.JdbcConnectionOptionsBuilder connOptionsBuilder; private final JdbcConnectionOptions.JdbcConnectionOptionsBuilder connOptionsBuilder;
private final JdbcInputFormat format; private final PhoenixInputFormat format;
public JdbcInputFormatBuilder() { public PhoenixInputFormatBuilder() {
//this.connOptionsBuilder = new JdbcConnectionOptions.JdbcConnectionOptionsBuilder(); //this.connOptionsBuilder = new JdbcConnectionOptions.JdbcConnectionOptionsBuilder();
this.connOptionsBuilder = new JdbcConnectionOptions.JdbcConnectionOptionsBuilder(); this.connOptionsBuilder = new JdbcConnectionOptions.JdbcConnectionOptionsBuilder();
this.format = new JdbcInputFormat(); this.format = new PhoenixInputFormat();
// using TYPE_FORWARD_ONLY for high performance reads // using TYPE_FORWARD_ONLY for high performance reads
this.format.resultSetType = ResultSet.TYPE_FORWARD_ONLY; this.format.resultSetType = ResultSet.TYPE_FORWARD_ONLY;
this.format.resultSetConcurrency = ResultSet.CONCUR_READ_ONLY; this.format.resultSetConcurrency = ResultSet.CONCUR_READ_ONLY;
} }
public JdbcInputFormatBuilder setUsername(String username) { public PhoenixInputFormatBuilder setUsername(String username) {
connOptionsBuilder.withUsername(username); connOptionsBuilder.withUsername(username);
return this; return this;
} }
public JdbcInputFormatBuilder setPassword(String password) { public PhoenixInputFormatBuilder setPassword(String password) {
connOptionsBuilder.withPassword(password); connOptionsBuilder.withPassword(password);
return this; return this;
} }
public JdbcInputFormatBuilder setDrivername(String drivername) { public PhoenixInputFormatBuilder setDrivername(String drivername) {
connOptionsBuilder.withDriverName(drivername); connOptionsBuilder.withDriverName(drivername);
return this; return this;
} }
public JdbcInputFormatBuilder setDBUrl(String dbURL) { public PhoenixInputFormatBuilder setDBUrl(String dbURL) {
connOptionsBuilder.withUrl(dbURL); connOptionsBuilder.withUrl(dbURL);
return this; return this;
} }
public JdbcInputFormatBuilder setQuery(String query) { public PhoenixInputFormatBuilder setQuery(String query) {
format.queryTemplate = query; format.queryTemplate = query;
return this; return this;
} }
public JdbcInputFormatBuilder setResultSetType(int resultSetType) { public PhoenixInputFormatBuilder setResultSetType(int resultSetType) {
format.resultSetType = resultSetType; format.resultSetType = resultSetType;
return this; return this;
} }
public JdbcInputFormatBuilder setResultSetConcurrency(int resultSetConcurrency) { public PhoenixInputFormatBuilder setResultSetConcurrency(int resultSetConcurrency) {
format.resultSetConcurrency = resultSetConcurrency; format.resultSetConcurrency = resultSetConcurrency;
return this; return this;
} }
public JdbcInputFormatBuilder setParametersProvider( public PhoenixInputFormatBuilder setParametersProvider(
JdbcParameterValuesProvider parameterValuesProvider) { JdbcParameterValuesProvider parameterValuesProvider) {
format.parameterValues = parameterValuesProvider.getParameterValues(); format.parameterValues = parameterValuesProvider.getParameterValues();
return this; return this;
} }
public JdbcInputFormatBuilder setRowTypeInfo(RowTypeInfo rowTypeInfo) { public PhoenixInputFormatBuilder setRowTypeInfo(RowTypeInfo rowTypeInfo) {
format.rowTypeInfo = rowTypeInfo; format.rowTypeInfo = rowTypeInfo;
return this; return this;
} }
public JdbcInputFormatBuilder setFetchSize(int fetchSize) { public PhoenixInputFormatBuilder setFetchSize(int fetchSize) {
Preconditions.checkArgument( Preconditions.checkArgument(
fetchSize == Integer.MIN_VALUE || fetchSize > 0, fetchSize == Integer.MIN_VALUE || fetchSize > 0,
"Illegal value %s for fetchSize, has to be positive or Integer.MIN_VALUE.", "Illegal value %s for fetchSize, has to be positive or Integer.MIN_VALUE.",
...@@ -414,23 +414,23 @@ public class JdbcInputFormat extends RichInputFormat<Row, InputSplit> ...@@ -414,23 +414,23 @@ public class JdbcInputFormat extends RichInputFormat<Row, InputSplit>
return this; return this;
} }
public JdbcInputFormatBuilder setAutoCommit(Boolean autoCommit) { public PhoenixInputFormatBuilder setAutoCommit(Boolean autoCommit) {
format.autoCommit = autoCommit; format.autoCommit = autoCommit;
return this; return this;
} }
public JdbcInputFormatBuilder setNamespaceMappingEnabled(Boolean namespaceMappingEnabled) { public PhoenixInputFormatBuilder setNamespaceMappingEnabled(Boolean namespaceMappingEnabled) {
format.namespaceMappingEnabled = namespaceMappingEnabled; format.namespaceMappingEnabled = namespaceMappingEnabled;
return this; return this;
} }
public JdbcInputFormatBuilder setMapSystemTablesEnabled(Boolean mapSystemTablesEnabled) { public PhoenixInputFormatBuilder setMapSystemTablesEnabled(Boolean mapSystemTablesEnabled) {
format.mapSystemTablesEnabled = mapSystemTablesEnabled; format.mapSystemTablesEnabled = mapSystemTablesEnabled;
return this; return this;
} }
public JdbcInputFormat finish() { public PhoenixInputFormat finish() {
format.connectionProvider = format.connectionProvider =
//new SimpleJdbcConnectionProvider(connOptionsBuilder.build()); //new SimpleJdbcConnectionProvider(connOptionsBuilder.build());
new PhoneixJdbcConnectionProvider(connOptionsBuilder.build(), format.namespaceMappingEnabled, format.namespaceMappingEnabled); new PhoneixJdbcConnectionProvider(connOptionsBuilder.build(), format.namespaceMappingEnabled, format.namespaceMappingEnabled);
......
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.connector.phoenix.internal;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.phoenix.internal.connection.PhoneixJdbcConnectionProvider;
import org.apache.flink.connector.phoenix.internal.options.JdbcDmlOptions;
import org.apache.flink.connector.phoenix.internal.options.JdbcOptions;
import org.apache.flink.connector.phoenix.statement.FieldNamedPreparedStatementImpl;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.table.data.RowData;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.HashMap;
import org.apache.flink.types.Row;
import static org.apache.flink.connector.phoenix.utils.JdbcUtils.setRecordToStatement;
import static org.apache.flink.util.Preconditions.checkNotNull;
/**
* A JDBC outputFormat that supports batching records before writing records to database.
*/
@Internal
public class PhoenixSinkFunction extends RichSinkFunction<Tuple2<Boolean, Row>>
implements CheckpointedFunction {
private static final Logger logger = LoggerFactory.getLogger(PhoenixSinkFunction.class);
private static Connection connection = null;
private PreparedStatement psUp = null;
private static int batchcount = 0;
private static int totalcount = 0;
private final JdbcOptions jdbcOptions;
private final PhoneixJdbcConnectionProvider connectionProvider;
private String[] fieldNames;
private String[] keyFields;
private int[] fieldTypes;
public PhoenixSinkFunction(JdbcOptions jdbcOptions, PhoneixJdbcConnectionProvider connectionProvider,String[] fieldNames,String[] keyFields,int[] fieldTypes) {
super();
this.jdbcOptions = jdbcOptions;
this.connectionProvider = connectionProvider;
this.fieldNames = fieldNames;
this.keyFields = keyFields;
this.fieldTypes = fieldTypes;
}
@Override
public void open(Configuration parameters) throws Exception {
logger.info("打开连接!!!");
try {
connection = connectionProvider.getOrEstablishConnection();
} catch (Exception e) {
throw new IOException("unable to open JDBC writer", e);
}
checkNotNull(jdbcOptions, "No options supplied.");
checkNotNull(fieldNames, "No fieldNames supplied.");
JdbcDmlOptions dml =
JdbcDmlOptions.builder()
.withTableName(jdbcOptions.getTableName())
.withDialect(jdbcOptions.getDialect())
.withFieldNames(fieldNames)
.withKeyFields(keyFields)
.withFieldTypes(fieldTypes)
.build();
String sql =
FieldNamedPreparedStatementImpl.parseNamedStatement(
jdbcOptions.getDialect()
.getInsertIntoStatement(
dml.getTableName(), dml.getFieldNames()),
new HashMap<>());
psUp = connection.prepareStatement(sql);
logger.info("创建prepareStatement!!! sql: "+sql);
}
@Override
public void invoke(Tuple2<Boolean, Row> value, Context context) throws Exception {
setRecordToStatement(psUp, fieldTypes, value.f1);
psUp.executeUpdate();
batchcount++;
if (batchcount == 1000) {
connection.commit();
batchcount = 0;
}
}
@Override
public void close() throws Exception {
logger.info("关闭连接!!!");
connection.commit();
if (psUp != null ) {
try {
psUp.close();
} catch (SQLException throwables) {
throwables.printStackTrace();
}
}
if (connection != null) {
try {
connection.close();
} catch (SQLException throwables) {
throwables.printStackTrace();
}
}
}
@Override
public void snapshotState(FunctionSnapshotContext functionSnapshotContext) throws Exception {
}
@Override
public void initializeState(FunctionInitializationContext functionInitializationContext) throws Exception {
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.connector.phoenix.internal.connection;
import org.apache.flink.connector.phoenix.JdbcConnectionOptions;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.annotation.concurrent.NotThreadSafe;
import java.io.Serializable;
import java.sql.Connection;
import java.sql.Driver;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.util.Enumeration;
import java.util.Properties;
/** Simple JDBC connection provider. */
@NotThreadSafe
public class SimpleJdbcConnectionProvider implements JdbcConnectionProvider, Serializable {
private static final Logger LOG = LoggerFactory.getLogger(SimpleJdbcConnectionProvider.class);
private static final long serialVersionUID = 1L;
private final JdbcConnectionOptions jdbcOptions;
private transient Driver loadedDriver;
private transient Connection connection;
static {
// Load DriverManager first to avoid deadlock between DriverManager's
// static initialization block and specific driver class's static
// initialization block when two different driver classes are loading
// concurrently using Class.forName while DriverManager is uninitialized
// before.
//
// This could happen in JDK 8 but not above as driver loading has been
// moved out of DriverManager's static initialization block since JDK 9.
DriverManager.getDrivers();
}
public SimpleJdbcConnectionProvider(JdbcConnectionOptions jdbcOptions) {
this.jdbcOptions = jdbcOptions;
}
@Override
public Connection getConnection() {
return connection;
}
@Override
public boolean isConnectionValid() throws SQLException {
return connection != null
&& connection.isValid(jdbcOptions.getConnectionCheckTimeoutSeconds());
}
private static Driver loadDriver(String driverName)
throws SQLException, ClassNotFoundException {
Preconditions.checkNotNull(driverName);
Enumeration<Driver> drivers = DriverManager.getDrivers();
while (drivers.hasMoreElements()) {
Driver driver = drivers.nextElement();
if (driver.getClass().getName().equals(driverName)) {
return driver;
}
}
// We could reach here for reasons:
// * Class loader hell of DriverManager(see JDK-8146872).
// * driver is not installed as a service provider.
Class<?> clazz =
Class.forName(driverName, true, Thread.currentThread().getContextClassLoader());
try {
return (Driver) clazz.newInstance();
} catch (Exception ex) {
throw new SQLException("Fail to create driver of class " + driverName, ex);
}
}
private Driver getLoadedDriver() throws SQLException, ClassNotFoundException {
if (loadedDriver == null) {
loadedDriver = loadDriver(jdbcOptions.getDriverName());
}
return loadedDriver;
}
@Override
public Connection getOrEstablishConnection() throws SQLException, ClassNotFoundException {
if (connection != null) {
return connection;
}
if (jdbcOptions.getDriverName() == null) {
connection =
DriverManager.getConnection(
jdbcOptions.getDbURL(),
jdbcOptions.getUsername().orElse(null),
jdbcOptions.getPassword().orElse(null));
} else {
Driver driver = getLoadedDriver();
Properties info = new Properties();
jdbcOptions.getUsername().ifPresent(user -> info.setProperty("user", user));
jdbcOptions.getPassword().ifPresent(password -> info.setProperty("password", password));
connection = driver.connect(jdbcOptions.getDbURL(), info);
if (connection == null) {
// Throw same exception as DriverManager.getConnection when no driver found to match
// caller expectation.
throw new SQLException(
"No suitable driver found for " + jdbcOptions.getDbURL(), "08001");
}
}
return connection;
}
@Override
public void closeConnection() {
if (connection != null) {
try {
connection.close();
} catch (SQLException e) {
LOG.warn("JDBC connection close failed.", e);
} finally {
connection = null;
}
}
}
@Override
public Connection reestablishConnection() throws SQLException, ClassNotFoundException {
closeConnection();
return getOrEstablishConnection();
}
}
...@@ -20,6 +20,7 @@ package org.apache.flink.connector.phoenix.internal.executor; ...@@ -20,6 +20,7 @@ package org.apache.flink.connector.phoenix.internal.executor;
import org.apache.flink.annotation.Internal; import org.apache.flink.annotation.Internal;
import org.apache.flink.connector.phoenix.JdbcStatementBuilder; import org.apache.flink.connector.phoenix.JdbcStatementBuilder;
import org.apache.flink.connector.phoenix.table.PhoenixUpsertTableSink;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
...@@ -41,7 +42,7 @@ import static org.apache.flink.util.Preconditions.checkNotNull; ...@@ -41,7 +42,7 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
* and inserting otherwise. Used in Table API. * and inserting otherwise. Used in Table API.
* *
* @deprecated This has been replaced with {@link TableInsertOrUpdateStatementExecutor}, will remove * @deprecated This has been replaced with {@link TableInsertOrUpdateStatementExecutor}, will remove
* this once {@link org.apache.flink.connector.phoenix.table.JdbcUpsertTableSink} is removed. * this once {@link PhoenixUpsertTableSink} is removed.
*/ */
@Internal @Internal
public final class InsertOrUpdateJdbcExecutor<R, K, V> implements JdbcBatchStatementExecutor<R> { public final class InsertOrUpdateJdbcExecutor<R, K, V> implements JdbcBatchStatementExecutor<R> {
...@@ -103,10 +104,8 @@ public final class InsertOrUpdateJdbcExecutor<R, K, V> implements JdbcBatchState ...@@ -103,10 +104,8 @@ public final class InsertOrUpdateJdbcExecutor<R, K, V> implements JdbcBatchState
for (Map.Entry<K, V> entry : batch.entrySet()) { for (Map.Entry<K, V> entry : batch.entrySet()) {
processOneRowInBatch(entry.getKey(), entry.getValue()); processOneRowInBatch(entry.getKey(), entry.getValue());
} }
//updateStatement.executeBatch();
//insertStatement.executeBatch();
//batch.clear();
conn.commit(); conn.commit();
batch.clear();
} }
} }
...@@ -114,11 +113,8 @@ public final class InsertOrUpdateJdbcExecutor<R, K, V> implements JdbcBatchState ...@@ -114,11 +113,8 @@ public final class InsertOrUpdateJdbcExecutor<R, K, V> implements JdbcBatchState
if (exist(pk)) { if (exist(pk)) {
updateSetter.accept(updateStatement, row); updateSetter.accept(updateStatement, row);
updateStatement.executeUpdate(); updateStatement.executeUpdate();
//updateStatement.addBatch();
} else { } else {
insertSetter.accept(insertStatement, row); insertSetter.accept(insertStatement, row);
// insertStatement.addBatch();
insertStatement.executeUpdate(); insertStatement.executeUpdate();
} }
} }
......
...@@ -72,12 +72,11 @@ class KeyedBatchStatementExecutor<T, K> implements JdbcBatchStatementExecutor<T> ...@@ -72,12 +72,11 @@ class KeyedBatchStatementExecutor<T, K> implements JdbcBatchStatementExecutor<T>
if (!batch.isEmpty()) { if (!batch.isEmpty()) {
for (K entry : batch) { for (K entry : batch) {
parameterSetter.accept(st, entry); parameterSetter.accept(st, entry);
//st.addBatch();
st.executeUpdate(); st.executeUpdate();
} }
//st.executeBatch(); LOG.info("connection commit datasize:" + batch.size());
//batch.clear();
conn.commit(); conn.commit();
batch.clear();
} }
} }
......
...@@ -60,7 +60,6 @@ class SimpleBatchStatementExecutor<T, V> implements JdbcBatchStatementExecutor<T ...@@ -60,7 +60,6 @@ class SimpleBatchStatementExecutor<T, V> implements JdbcBatchStatementExecutor<T
@Override @Override
public void addToBatch(T record) { public void addToBatch(T record) {
LOG.info("添加数据:" + record.toString());
batch.add(valueTransformer.apply(record)); batch.add(valueTransformer.apply(record));
} }
...@@ -69,12 +68,9 @@ class SimpleBatchStatementExecutor<T, V> implements JdbcBatchStatementExecutor<T ...@@ -69,12 +68,9 @@ class SimpleBatchStatementExecutor<T, V> implements JdbcBatchStatementExecutor<T
if (!batch.isEmpty()) { if (!batch.isEmpty()) {
for (V r : batch) { for (V r : batch) {
parameterSetter.accept(st, r); parameterSetter.accept(st, r);
//st.addBatch();
st.executeUpdate(); st.executeUpdate();
} }
//st.executeBatch(); LOG.info("connection commit dataSize:" + batch.size());
LOG.info("提交数据:" +batch.size() );
connection.commit(); connection.commit();
batch.clear(); batch.clear();
} }
......
...@@ -85,12 +85,10 @@ public final class TableInsertOrUpdateStatementExecutor ...@@ -85,12 +85,10 @@ public final class TableInsertOrUpdateStatementExecutor
private void processOneRowInBatch(RowData pk, RowData row) throws SQLException { private void processOneRowInBatch(RowData pk, RowData row) throws SQLException {
if (exist(pk)) { if (exist(pk)) {
updateSetter.toExternal(row, updateStatement); updateSetter.toExternal(row, updateStatement);
//updateStatement.addBatch(); updateStatement.addBatch();
updateStatement.executeBatch();
} else { } else {
insertSetter.toExternal(row, insertStatement); insertSetter.toExternal(row, insertStatement);
//insertStatement.addBatch(); insertStatement.addBatch();
insertStatement.executeBatch();
} }
} }
...@@ -103,9 +101,6 @@ public final class TableInsertOrUpdateStatementExecutor ...@@ -103,9 +101,6 @@ public final class TableInsertOrUpdateStatementExecutor
@Override @Override
public void executeBatch(Connection conn) throws SQLException { public void executeBatch(Connection conn) throws SQLException {
//updateStatement.executeBatch();
//insertStatement.executeBatch();
conn.commit(); conn.commit();
} }
......
...@@ -19,13 +19,13 @@ ...@@ -19,13 +19,13 @@
package org.apache.flink.connector.phoenix.split; package org.apache.flink.connector.phoenix.split;
import org.apache.flink.annotation.Experimental; import org.apache.flink.annotation.Experimental;
import org.apache.flink.connector.phoenix.JdbcInputFormat; import org.apache.flink.connector.phoenix.PhoenixInputFormat;
import java.io.Serializable; import java.io.Serializable;
/** /**
* This splits generator actually does nothing but wrapping the query parameters computed by the * This splits generator actually does nothing but wrapping the query parameters computed by the
* user before creating the {@link JdbcInputFormat} instance. * user before creating the {@link PhoenixInputFormat} instance.
*/ */
@Experimental @Experimental
public class JdbcGenericParameterValuesProvider implements JdbcParameterValuesProvider { public class JdbcGenericParameterValuesProvider implements JdbcParameterValuesProvider {
......
...@@ -19,12 +19,12 @@ ...@@ -19,12 +19,12 @@
package org.apache.flink.connector.phoenix.split; package org.apache.flink.connector.phoenix.split;
import org.apache.flink.annotation.Experimental; import org.apache.flink.annotation.Experimental;
import org.apache.flink.connector.phoenix.JdbcInputFormat; import org.apache.flink.connector.phoenix.PhoenixInputFormat;
import java.io.Serializable; import java.io.Serializable;
/** /**
* This interface is used by the {@link JdbcInputFormat} to compute the list of parallel query to * This interface is used by the {@link PhoenixInputFormat} to compute the list of parallel query to
* run (i.e. splits). Each query will be parameterized using a row of the matrix provided by each * run (i.e. splits). Each query will be parameterized using a row of the matrix provided by each
* {@link JdbcParameterValuesProvider} implementation. * {@link JdbcParameterValuesProvider} implementation.
*/ */
......
...@@ -101,6 +101,15 @@ public interface FieldNamedPreparedStatement extends AutoCloseable { ...@@ -101,6 +101,15 @@ public interface FieldNamedPreparedStatement extends AutoCloseable {
*/ */
int[] executeBatch() throws SQLException; int[] executeBatch() throws SQLException;
/**
* Phoenix add Batch method
*
* @see PreparedStatement#executeBatch()
*/
void executeUpdate() throws SQLException;
/** /**
* Sets the designated parameter to SQL <code>NULL</code>. * Sets the designated parameter to SQL <code>NULL</code>.
* *
......
...@@ -51,7 +51,12 @@ public class FieldNamedPreparedStatementImpl implements FieldNamedPreparedStatem ...@@ -51,7 +51,12 @@ public class FieldNamedPreparedStatementImpl implements FieldNamedPreparedStatem
@Override @Override
public void addBatch() throws SQLException { public void addBatch() throws SQLException {
statement.addBatch(); statement.executeUpdate();
}
@Override
public void executeUpdate() throws SQLException {
statement.executeUpdate();
} }
@Override @Override
......
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.connector.phoenix.table;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.connector.phoenix.JdbcExecutionOptions;
import org.apache.flink.connector.phoenix.internal.GenericJdbcSinkFunction;
import org.apache.flink.connector.phoenix.internal.options.JdbcDmlOptions;
import org.apache.flink.connector.phoenix.internal.options.JdbcOptions;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.sink.DynamicTableSink;
import org.apache.flink.table.connector.sink.SinkFunctionProvider;
import org.apache.flink.table.data.RowData;
import org.apache.flink.types.RowKind;
import java.util.Objects;
import static org.apache.flink.util.Preconditions.checkState;
/** A {@link DynamicTableSink} for JDBC. */
@Internal
public class JdbcDynamicTableSink implements DynamicTableSink {
private final JdbcOptions jdbcOptions;
private final JdbcExecutionOptions executionOptions;
private final JdbcDmlOptions dmlOptions;
private final TableSchema tableSchema;
private final String dialectName;
public JdbcDynamicTableSink(
JdbcOptions jdbcOptions,
JdbcExecutionOptions executionOptions,
JdbcDmlOptions dmlOptions,
TableSchema tableSchema) {
this.jdbcOptions = jdbcOptions;
this.executionOptions = executionOptions;
this.dmlOptions = dmlOptions;
this.tableSchema = tableSchema;
this.dialectName = dmlOptions.getDialect().dialectName();
}
@Override
public ChangelogMode getChangelogMode(ChangelogMode requestedMode) {
validatePrimaryKey(requestedMode);
return ChangelogMode.newBuilder()
.addContainedKind(RowKind.INSERT)
.addContainedKind(RowKind.DELETE)
.addContainedKind(RowKind.UPDATE_AFTER)
.build();
}
private void validatePrimaryKey(ChangelogMode requestedMode) {
checkState(
ChangelogMode.insertOnly().equals(requestedMode)
|| dmlOptions.getKeyFields().isPresent(),
"please declare primary key for sink table when query contains update/delete record.");
}
@Override
public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
final TypeInformation<RowData> rowDataTypeInformation =
context.createTypeInformation(tableSchema.toRowDataType());
final JdbcDynamicOutputFormatBuilder builder = new JdbcDynamicOutputFormatBuilder();
builder.setJdbcOptions(jdbcOptions);
builder.setJdbcDmlOptions(dmlOptions);
builder.setJdbcExecutionOptions(executionOptions);
builder.setRowDataTypeInfo(rowDataTypeInformation);
builder.setFieldDataTypes(tableSchema.getFieldDataTypes());
return SinkFunctionProvider.of(
new GenericJdbcSinkFunction<>(builder.build()), jdbcOptions.getParallelism());
}
@Override
public DynamicTableSink copy() {
return new JdbcDynamicTableSink(jdbcOptions, executionOptions, dmlOptions, tableSchema);
}
@Override
public String asSummaryString() {
return "JDBC:" + dialectName;
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (!(o instanceof JdbcDynamicTableSink)) {
return false;
}
JdbcDynamicTableSink that = (JdbcDynamicTableSink) o;
return Objects.equals(jdbcOptions, that.jdbcOptions)
&& Objects.equals(executionOptions, that.executionOptions)
&& Objects.equals(dmlOptions, that.dmlOptions)
&& Objects.equals(tableSchema, that.tableSchema)
&& Objects.equals(dialectName, that.dialectName);
}
@Override
public int hashCode() {
return Objects.hash(jdbcOptions, executionOptions, dmlOptions, tableSchema, dialectName);
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.connector.phoenix.table;
import org.apache.flink.annotation.Internal;
import org.apache.flink.connector.phoenix.dialect.JdbcDialect;
import org.apache.flink.connector.phoenix.internal.options.JdbcLookupOptions;
import org.apache.flink.connector.phoenix.internal.options.JdbcOptions;
import org.apache.flink.connector.phoenix.internal.options.JdbcReadOptions;
import org.apache.flink.connector.phoenix.split.JdbcNumericBetweenParametersProvider;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.connector.source.InputFormatProvider;
import org.apache.flink.table.connector.source.LookupTableSource;
import org.apache.flink.table.connector.source.ScanTableSource;
import org.apache.flink.table.connector.source.TableFunctionProvider;
import org.apache.flink.table.connector.source.abilities.SupportsLimitPushDown;
import org.apache.flink.table.connector.source.abilities.SupportsProjectionPushDown;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.utils.TableSchemaUtils;
import org.apache.flink.util.Preconditions;
import java.util.Objects;
/** A {@link DynamicTableSource} for JDBC. */
@Internal
public class JdbcDynamicTableSource
implements ScanTableSource,
LookupTableSource,
SupportsProjectionPushDown,
SupportsLimitPushDown {
private final JdbcOptions options;
private final JdbcReadOptions readOptions;
private final JdbcLookupOptions lookupOptions;
private TableSchema physicalSchema;
private final String dialectName;
private long limit = -1;
public JdbcDynamicTableSource(
JdbcOptions options,
JdbcReadOptions readOptions,
JdbcLookupOptions lookupOptions,
TableSchema physicalSchema) {
this.options = options;
this.readOptions = readOptions;
this.lookupOptions = lookupOptions;
this.physicalSchema = physicalSchema;
this.dialectName = options.getDialect().dialectName();
}
@Override
public LookupRuntimeProvider getLookupRuntimeProvider(LookupContext context) {
// JDBC only support non-nested look up keys
String[] keyNames = new String[context.getKeys().length];
for (int i = 0; i < keyNames.length; i++) {
int[] innerKeyArr = context.getKeys()[i];
Preconditions.checkArgument(
innerKeyArr.length == 1, "JDBC only support non-nested look up keys");
keyNames[i] = physicalSchema.getFieldNames()[innerKeyArr[0]];
}
final RowType rowType = (RowType) physicalSchema.toRowDataType().getLogicalType();
return TableFunctionProvider.of(
new JdbcRowDataLookupFunction(
options,
lookupOptions,
physicalSchema.getFieldNames(),
physicalSchema.getFieldDataTypes(),
keyNames,
rowType));
}
@Override
public ScanRuntimeProvider getScanRuntimeProvider(ScanContext runtimeProviderContext) {
final JdbcRowDataInputFormat.Builder builder =
JdbcRowDataInputFormat.builder()
.setDrivername(options.getDriverName())
.setDBUrl(options.getDbURL())
.setUsername(options.getUsername().orElse(null))
.setPassword(options.getPassword().orElse(null))
.setAutoCommit(readOptions.getAutoCommit());
if (readOptions.getFetchSize() != 0) {
builder.setFetchSize(readOptions.getFetchSize());
}
final JdbcDialect dialect = options.getDialect();
String query =
dialect.getSelectFromStatement(
options.getTableName(), physicalSchema.getFieldNames(), new String[0]);
if (readOptions.getPartitionColumnName().isPresent()) {
long lowerBound = readOptions.getPartitionLowerBound().get();
long upperBound = readOptions.getPartitionUpperBound().get();
int numPartitions = readOptions.getNumPartitions().get();
builder.setParametersProvider(
new JdbcNumericBetweenParametersProvider(lowerBound, upperBound)
.ofBatchNum(numPartitions));
query +=
" WHERE "
+ dialect.quoteIdentifier(readOptions.getPartitionColumnName().get())
+ " BETWEEN ? AND ?";
}
if (limit >= 0) {
query = String.format("%s %s", query, dialect.getLimitClause(limit));
}
builder.setQuery(query);
final RowType rowType = (RowType) physicalSchema.toRowDataType().getLogicalType();
builder.setRowConverter(dialect.getRowConverter(rowType));
builder.setRowDataTypeInfo(
runtimeProviderContext.createTypeInformation(physicalSchema.toRowDataType()));
return InputFormatProvider.of(builder.build());
}
@Override
public ChangelogMode getChangelogMode() {
return ChangelogMode.insertOnly();
}
@Override
public boolean supportsNestedProjection() {
// JDBC doesn't support nested projection
return false;
}
@Override
public void applyProjection(int[][] projectedFields) {
this.physicalSchema = TableSchemaUtils.projectSchema(physicalSchema, projectedFields);
}
@Override
public DynamicTableSource copy() {
return new JdbcDynamicTableSource(options, readOptions, lookupOptions, physicalSchema);
}
@Override
public String asSummaryString() {
return "JDBC:" + dialectName;
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (!(o instanceof JdbcDynamicTableSource)) {
return false;
}
JdbcDynamicTableSource that = (JdbcDynamicTableSource) o;
return Objects.equals(options, that.options)
&& Objects.equals(readOptions, that.readOptions)
&& Objects.equals(lookupOptions, that.lookupOptions)
&& Objects.equals(physicalSchema, that.physicalSchema)
&& Objects.equals(dialectName, that.dialectName)
&& Objects.equals(limit, that.limit);
}
@Override
public int hashCode() {
return Objects.hash(
options, readOptions, lookupOptions, physicalSchema, dialectName, limit);
}
@Override
public void applyLimit(long limit) {
this.limit = limit;
}
}
...@@ -8,11 +8,12 @@ import org.apache.flink.connector.phoenix.internal.options.PhoenixJdbcOptions; ...@@ -8,11 +8,12 @@ import org.apache.flink.connector.phoenix.internal.options.PhoenixJdbcOptions;
import org.apache.flink.connector.phoenix.split.JdbcNumericBetweenParametersProvider; import org.apache.flink.connector.phoenix.split.JdbcNumericBetweenParametersProvider;
import org.apache.flink.table.api.TableSchema; import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.connector.ChangelogMode; import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.source.DynamicTableSource; import org.apache.flink.table.connector.source.*;
import org.apache.flink.table.connector.source.InputFormatProvider; import org.apache.flink.table.connector.source.abilities.SupportsLimitPushDown;
import org.apache.flink.table.connector.source.ScanTableSource; import org.apache.flink.table.connector.source.abilities.SupportsProjectionPushDown;
import org.apache.flink.table.types.logical.RowType; import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.utils.TableSchemaUtils; import org.apache.flink.table.utils.TableSchemaUtils;
import org.apache.flink.util.Preconditions;
import java.util.Objects; import java.util.Objects;
...@@ -22,7 +23,8 @@ import java.util.Objects; ...@@ -22,7 +23,8 @@ import java.util.Objects;
* @author gy * @author gy
* @since 2022/3/17 10:40 * @since 2022/3/17 10:40
**/ **/
public class PhoenixDynamicTableSource implements ScanTableSource { public class PhoenixDynamicTableSource implements ScanTableSource, LookupTableSource, SupportsProjectionPushDown,
SupportsLimitPushDown {
private final PhoenixJdbcOptions options; private final PhoenixJdbcOptions options;
private final JdbcReadOptions readOptions; private final JdbcReadOptions readOptions;
...@@ -40,6 +42,28 @@ public class PhoenixDynamicTableSource implements ScanTableSource { ...@@ -40,6 +42,28 @@ public class PhoenixDynamicTableSource implements ScanTableSource {
} }
@Override
public LookupTableSource.LookupRuntimeProvider getLookupRuntimeProvider(LookupTableSource.LookupContext context) {
// JDBC only support non-nested look up keys
String[] keyNames = new String[context.getKeys().length];
for (int i = 0; i < keyNames.length; i++) {
int[] innerKeyArr = context.getKeys()[i];
Preconditions.checkArgument(
innerKeyArr.length == 1, "JDBC only support non-nested look up keys");
keyNames[i] = physicalSchema.getFieldNames()[innerKeyArr[0]];
}
final RowType rowType = (RowType) physicalSchema.toRowDataType().getLogicalType();
return TableFunctionProvider.of(
new PhoenixRowDataLookupFunction(
options,
lookupOptions,
physicalSchema.getFieldNames(),
physicalSchema.getFieldDataTypes(),
keyNames,
rowType));
}
@Override @Override
public ScanRuntimeProvider getScanRuntimeProvider(ScanContext runtimeProviderContext) { public ScanRuntimeProvider getScanRuntimeProvider(ScanContext runtimeProviderContext) {
PhoenixJdbcRowDataInputFormat.Builder builder = PhoenixJdbcRowDataInputFormat.builder() PhoenixJdbcRowDataInputFormat.Builder builder = PhoenixJdbcRowDataInputFormat.builder()
...@@ -82,11 +106,11 @@ public class PhoenixDynamicTableSource implements ScanTableSource { ...@@ -82,11 +106,11 @@ public class PhoenixDynamicTableSource implements ScanTableSource {
public ChangelogMode getChangelogMode() { public ChangelogMode getChangelogMode() {
return ChangelogMode.insertOnly(); return ChangelogMode.insertOnly();
} }
@Override
public boolean supportsNestedProjection() { public boolean supportsNestedProjection() {
return false; return false;
} }
@Override
public void applyProjection(int[][] projectedFields) { public void applyProjection(int[][] projectedFields) {
this.physicalSchema = TableSchemaUtils.projectSchema(this.physicalSchema, projectedFields); this.physicalSchema = TableSchemaUtils.projectSchema(this.physicalSchema, projectedFields);
} }
......
...@@ -22,19 +22,17 @@ import org.apache.flink.annotation.VisibleForTesting; ...@@ -22,19 +22,17 @@ import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.RowTypeInfo; import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.connector.phoenix.internal.connection.JdbcConnectionProvider; import org.apache.flink.connector.phoenix.internal.connection.JdbcConnectionProvider;
import org.apache.flink.connector.phoenix.internal.connection.SimpleJdbcConnectionProvider; import org.apache.flink.connector.phoenix.internal.connection.PhoneixJdbcConnectionProvider;
import org.apache.flink.connector.phoenix.internal.options.JdbcLookupOptions; import org.apache.flink.connector.phoenix.internal.options.JdbcLookupOptions;
import org.apache.flink.connector.phoenix.internal.options.JdbcOptions; import org.apache.flink.connector.phoenix.internal.options.JdbcOptions;
import org.apache.flink.connector.phoenix.statement.FieldNamedPreparedStatementImpl; import org.apache.flink.connector.phoenix.statement.FieldNamedPreparedStatementImpl;
import org.apache.flink.connector.phoenix.utils.JdbcTypeUtil; import org.apache.flink.connector.phoenix.utils.JdbcTypeUtil;
import org.apache.flink.connector.phoenix.utils.JdbcUtils; import org.apache.flink.connector.phoenix.utils.JdbcUtils;
import org.apache.flink.shaded.guava18.com.google.common.cache.Cache;
import org.apache.flink.shaded.guava18.com.google.common.cache.CacheBuilder;
import org.apache.flink.table.functions.FunctionContext; import org.apache.flink.table.functions.FunctionContext;
import org.apache.flink.table.functions.TableFunction; import org.apache.flink.table.functions.TableFunction;
import org.apache.flink.types.Row; import org.apache.flink.types.Row;
import org.apache.flink.shaded.guava18.com.google.common.cache.Cache;
import org.apache.flink.shaded.guava18.com.google.common.cache.CacheBuilder;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
...@@ -63,9 +61,9 @@ import static org.apache.flink.util.Preconditions.checkNotNull; ...@@ -63,9 +61,9 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
* <p>Support cache the result to avoid frequent accessing to remote databases. 1.The cacheMaxSize * <p>Support cache the result to avoid frequent accessing to remote databases. 1.The cacheMaxSize
* is -1 means not use cache. 2.For real-time data, you need to set the TTL of cache. * is -1 means not use cache. 2.For real-time data, you need to set the TTL of cache.
*/ */
public class JdbcLookupFunction extends TableFunction<Row> { public class PhoenixLookupFunction extends TableFunction<Row> {
private static final Logger LOG = LoggerFactory.getLogger(JdbcLookupFunction.class); private static final Logger LOG = LoggerFactory.getLogger(PhoenixLookupFunction.class);
private static final long serialVersionUID = 2L; private static final long serialVersionUID = 2L;
private final String query; private final String query;
...@@ -83,13 +81,13 @@ public class JdbcLookupFunction extends TableFunction<Row> { ...@@ -83,13 +81,13 @@ public class JdbcLookupFunction extends TableFunction<Row> {
private transient PreparedStatement statement; private transient PreparedStatement statement;
private transient Cache<Row, List<Row>> cache; private transient Cache<Row, List<Row>> cache;
public JdbcLookupFunction( public PhoenixLookupFunction(
JdbcOptions options, JdbcOptions options,
JdbcLookupOptions lookupOptions, JdbcLookupOptions lookupOptions,
String[] fieldNames, String[] fieldNames,
TypeInformation[] fieldTypes, TypeInformation[] fieldTypes,
String[] keyNames) { String[] keyNames) {
this.connectionProvider = new SimpleJdbcConnectionProvider(options); this.connectionProvider = new PhoneixJdbcConnectionProvider(options);
this.fieldNames = fieldNames; this.fieldNames = fieldNames;
this.fieldTypes = fieldTypes; this.fieldTypes = fieldTypes;
this.keyNames = keyNames; this.keyNames = keyNames;
...@@ -255,7 +253,7 @@ public class JdbcLookupFunction extends TableFunction<Row> { ...@@ -255,7 +253,7 @@ public class JdbcLookupFunction extends TableFunction<Row> {
return keyTypes; return keyTypes;
} }
/** Builder for a {@link JdbcLookupFunction}. */ /** Builder for a {@link PhoenixLookupFunction}. */
public static class Builder { public static class Builder {
private JdbcOptions options; private JdbcOptions options;
private JdbcLookupOptions lookupOptions; private JdbcLookupOptions lookupOptions;
...@@ -298,7 +296,7 @@ public class JdbcLookupFunction extends TableFunction<Row> { ...@@ -298,7 +296,7 @@ public class JdbcLookupFunction extends TableFunction<Row> {
* *
* @return Configured JdbcLookupFunction * @return Configured JdbcLookupFunction
*/ */
public JdbcLookupFunction build() { public PhoenixLookupFunction build() {
checkNotNull(options, "No JdbcOptions supplied."); checkNotNull(options, "No JdbcOptions supplied.");
if (lookupOptions == null) { if (lookupOptions == null) {
lookupOptions = JdbcLookupOptions.builder().build(); lookupOptions = JdbcLookupOptions.builder().build();
...@@ -307,7 +305,7 @@ public class JdbcLookupFunction extends TableFunction<Row> { ...@@ -307,7 +305,7 @@ public class JdbcLookupFunction extends TableFunction<Row> {
checkNotNull(fieldTypes, "No fieldTypes supplied."); checkNotNull(fieldTypes, "No fieldTypes supplied.");
checkNotNull(keyNames, "No keyNames supplied."); checkNotNull(keyNames, "No keyNames supplied.");
return new JdbcLookupFunction(options, lookupOptions, fieldNames, fieldTypes, keyNames); return new PhoenixLookupFunction(options, lookupOptions, fieldNames, fieldTypes, keyNames);
} }
} }
} }
...@@ -23,11 +23,13 @@ import org.apache.flink.annotation.VisibleForTesting; ...@@ -23,11 +23,13 @@ import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.connector.phoenix.dialect.JdbcDialect; import org.apache.flink.connector.phoenix.dialect.JdbcDialect;
import org.apache.flink.connector.phoenix.dialect.JdbcDialects; import org.apache.flink.connector.phoenix.dialect.JdbcDialects;
import org.apache.flink.connector.phoenix.internal.connection.JdbcConnectionProvider; import org.apache.flink.connector.phoenix.internal.connection.JdbcConnectionProvider;
import org.apache.flink.connector.phoenix.internal.connection.SimpleJdbcConnectionProvider; import org.apache.flink.connector.phoenix.internal.connection.PhoneixJdbcConnectionProvider;
import org.apache.flink.connector.phoenix.internal.converter.JdbcRowConverter;
import org.apache.flink.connector.phoenix.internal.options.JdbcLookupOptions; import org.apache.flink.connector.phoenix.internal.options.JdbcLookupOptions;
import org.apache.flink.connector.phoenix.internal.options.JdbcOptions; import org.apache.flink.connector.phoenix.internal.options.PhoenixJdbcOptions;
import org.apache.flink.connector.phoenix.statement.FieldNamedPreparedStatement; import org.apache.flink.connector.phoenix.statement.FieldNamedPreparedStatement;
import org.apache.flink.connector.phoenix.internal.converter.JdbcRowConverter; import org.apache.flink.shaded.guava18.com.google.common.cache.Cache;
import org.apache.flink.shaded.guava18.com.google.common.cache.CacheBuilder;
import org.apache.flink.table.data.GenericRowData; import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData; import org.apache.flink.table.data.RowData;
import org.apache.flink.table.functions.FunctionContext; import org.apache.flink.table.functions.FunctionContext;
...@@ -35,10 +37,6 @@ import org.apache.flink.table.functions.TableFunction; ...@@ -35,10 +37,6 @@ import org.apache.flink.table.functions.TableFunction;
import org.apache.flink.table.types.DataType; import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.LogicalType; import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType; import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.shaded.guava18.com.google.common.cache.Cache;
import org.apache.flink.shaded.guava18.com.google.common.cache.CacheBuilder;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
...@@ -56,9 +54,9 @@ import static org.apache.flink.util.Preconditions.checkNotNull; ...@@ -56,9 +54,9 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
/** A lookup function for {@link JdbcDynamicTableSource}. */ /** A lookup function for {@link JdbcDynamicTableSource}. */
@Internal @Internal
public class JdbcRowDataLookupFunction extends TableFunction<RowData> { public class PhoenixRowDataLookupFunction extends TableFunction<RowData> {
private static final Logger LOG = LoggerFactory.getLogger(JdbcRowDataLookupFunction.class); private static final Logger LOG = LoggerFactory.getLogger(PhoenixRowDataLookupFunction.class);
private static final long serialVersionUID = 2L; private static final long serialVersionUID = 2L;
private final String query; private final String query;
...@@ -75,8 +73,8 @@ public class JdbcRowDataLookupFunction extends TableFunction<RowData> { ...@@ -75,8 +73,8 @@ public class JdbcRowDataLookupFunction extends TableFunction<RowData> {
private transient FieldNamedPreparedStatement statement; private transient FieldNamedPreparedStatement statement;
private transient Cache<RowData, List<RowData>> cache; private transient Cache<RowData, List<RowData>> cache;
public JdbcRowDataLookupFunction( public PhoenixRowDataLookupFunction(
JdbcOptions options, PhoenixJdbcOptions options,
JdbcLookupOptions lookupOptions, JdbcLookupOptions lookupOptions,
String[] fieldNames, String[] fieldNames,
DataType[] fieldTypes, DataType[] fieldTypes,
...@@ -86,7 +84,7 @@ public class JdbcRowDataLookupFunction extends TableFunction<RowData> { ...@@ -86,7 +84,7 @@ public class JdbcRowDataLookupFunction extends TableFunction<RowData> {
checkNotNull(fieldNames, "No fieldNames supplied."); checkNotNull(fieldNames, "No fieldNames supplied.");
checkNotNull(fieldTypes, "No fieldTypes supplied."); checkNotNull(fieldTypes, "No fieldTypes supplied.");
checkNotNull(keyNames, "No keyNames supplied."); checkNotNull(keyNames, "No keyNames supplied.");
this.connectionProvider = new SimpleJdbcConnectionProvider(options); this.connectionProvider = new PhoneixJdbcConnectionProvider(options,options.getNamespaceMappingEnabled(),options.getMapSystemTablesToNamespace());
this.keyNames = keyNames; this.keyNames = keyNames;
List<String> nameList = Arrays.asList(fieldNames); List<String> nameList = Arrays.asList(fieldNames);
this.keyTypes = this.keyTypes =
......
...@@ -19,7 +19,7 @@ ...@@ -19,7 +19,7 @@
package org.apache.flink.connector.phoenix.table; package org.apache.flink.connector.phoenix.table;
import org.apache.flink.api.java.typeutils.RowTypeInfo; import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.connector.phoenix.JdbcInputFormat; import org.apache.flink.connector.phoenix.PhoenixInputFormat;
import org.apache.flink.connector.phoenix.dialect.JdbcDialect; import org.apache.flink.connector.phoenix.dialect.JdbcDialect;
import org.apache.flink.connector.phoenix.internal.options.JdbcLookupOptions; import org.apache.flink.connector.phoenix.internal.options.JdbcLookupOptions;
import org.apache.flink.connector.phoenix.internal.options.JdbcOptions; import org.apache.flink.connector.phoenix.internal.options.JdbcOptions;
...@@ -48,7 +48,7 @@ import static org.apache.flink.table.types.utils.TypeConversions.fromDataTypeToL ...@@ -48,7 +48,7 @@ import static org.apache.flink.table.types.utils.TypeConversions.fromDataTypeToL
import static org.apache.flink.util.Preconditions.checkNotNull; import static org.apache.flink.util.Preconditions.checkNotNull;
/** {@link TableSource} for JDBC. */ /** {@link TableSource} for JDBC. */
public class JdbcTableSource public class PhoenixTableSource
implements StreamTableSource<Row>, ProjectableTableSource<Row>, LookupableTableSource<Row> { implements StreamTableSource<Row>, ProjectableTableSource<Row>, LookupableTableSource<Row> {
private final JdbcOptions options; private final JdbcOptions options;
...@@ -60,7 +60,7 @@ public class JdbcTableSource ...@@ -60,7 +60,7 @@ public class JdbcTableSource
private final int[] selectFields; private final int[] selectFields;
private final DataType producedDataType; private final DataType producedDataType;
private JdbcTableSource( private PhoenixTableSource(
JdbcOptions options, JdbcOptions options,
JdbcReadOptions readOptions, JdbcReadOptions readOptions,
JdbcLookupOptions lookupOptions, JdbcLookupOptions lookupOptions,
...@@ -68,7 +68,7 @@ public class JdbcTableSource ...@@ -68,7 +68,7 @@ public class JdbcTableSource
this(options, readOptions, lookupOptions, schema, null); this(options, readOptions, lookupOptions, schema, null);
} }
private JdbcTableSource( private PhoenixTableSource(
JdbcOptions options, JdbcOptions options,
JdbcReadOptions readOptions, JdbcReadOptions readOptions,
JdbcLookupOptions lookupOptions, JdbcLookupOptions lookupOptions,
...@@ -112,7 +112,7 @@ public class JdbcTableSource ...@@ -112,7 +112,7 @@ public class JdbcTableSource
@Override @Override
public TableFunction<Row> getLookupFunction(String[] lookupKeys) { public TableFunction<Row> getLookupFunction(String[] lookupKeys) {
final RowTypeInfo rowTypeInfo = (RowTypeInfo) fromDataTypeToLegacyInfo(producedDataType); final RowTypeInfo rowTypeInfo = (RowTypeInfo) fromDataTypeToLegacyInfo(producedDataType);
return JdbcLookupFunction.builder() return PhoenixLookupFunction.builder()
.setOptions(options) .setOptions(options)
.setLookupOptions(lookupOptions) .setLookupOptions(lookupOptions)
.setFieldTypes(rowTypeInfo.getFieldTypes()) .setFieldTypes(rowTypeInfo.getFieldTypes())
...@@ -128,7 +128,7 @@ public class JdbcTableSource ...@@ -128,7 +128,7 @@ public class JdbcTableSource
@Override @Override
public TableSource<Row> projectFields(int[] fields) { public TableSource<Row> projectFields(int[] fields) {
return new JdbcTableSource(options, readOptions, lookupOptions, schema, fields); return new PhoenixTableSource(options, readOptions, lookupOptions, schema, fields);
} }
@Override @Override
...@@ -156,10 +156,10 @@ public class JdbcTableSource ...@@ -156,10 +156,10 @@ public class JdbcTableSource
return new Builder(); return new Builder();
} }
private JdbcInputFormat getInputFormat() { private PhoenixInputFormat getInputFormat() {
final RowTypeInfo rowTypeInfo = (RowTypeInfo) fromDataTypeToLegacyInfo(producedDataType); final RowTypeInfo rowTypeInfo = (RowTypeInfo) fromDataTypeToLegacyInfo(producedDataType);
JdbcInputFormat.JdbcInputFormatBuilder builder = PhoenixInputFormat.PhoenixInputFormatBuilder builder =
JdbcInputFormat.buildJdbcInputFormat() PhoenixInputFormat.buildJdbcInputFormat()
.setDrivername(options.getDriverName()) .setDrivername(options.getDriverName())
.setDBUrl(options.getDbURL()) .setDBUrl(options.getDbURL())
.setRowTypeInfo( .setRowTypeInfo(
...@@ -210,8 +210,8 @@ public class JdbcTableSource ...@@ -210,8 +210,8 @@ public class JdbcTableSource
@Override @Override
public boolean equals(Object o) { public boolean equals(Object o) {
if (o instanceof JdbcTableSource) { if (o instanceof PhoenixTableSource) {
JdbcTableSource source = (JdbcTableSource) o; PhoenixTableSource source = (PhoenixTableSource) o;
return Objects.equals(options, source.options) return Objects.equals(options, source.options)
&& Objects.equals(readOptions, source.readOptions) && Objects.equals(readOptions, source.readOptions)
&& Objects.equals(lookupOptions, source.lookupOptions) && Objects.equals(lookupOptions, source.lookupOptions)
...@@ -222,7 +222,7 @@ public class JdbcTableSource ...@@ -222,7 +222,7 @@ public class JdbcTableSource
} }
} }
/** Builder for a {@link JdbcTableSource}. */ /** Builder for a {@link PhoenixTableSource}. */
public static class Builder { public static class Builder {
private JdbcOptions options; private JdbcOptions options;
...@@ -265,7 +265,7 @@ public class JdbcTableSource ...@@ -265,7 +265,7 @@ public class JdbcTableSource
* *
* @return Configured JdbcTableSource * @return Configured JdbcTableSource
*/ */
public JdbcTableSource build() { public PhoenixTableSource build() {
checkNotNull(options, "No options supplied."); checkNotNull(options, "No options supplied.");
checkNotNull(schema, "No schema supplied."); checkNotNull(schema, "No schema supplied.");
if (readOptions == null) { if (readOptions == null) {
...@@ -274,7 +274,7 @@ public class JdbcTableSource ...@@ -274,7 +274,7 @@ public class JdbcTableSource
if (lookupOptions == null) { if (lookupOptions == null) {
lookupOptions = JdbcLookupOptions.builder().build(); lookupOptions = JdbcLookupOptions.builder().build();
} }
return new JdbcTableSource(options, readOptions, lookupOptions, schema); return new PhoenixTableSource(options, readOptions, lookupOptions, schema);
} }
} }
} }
...@@ -125,7 +125,7 @@ public class PhoenixTableSourceSinkFactory ...@@ -125,7 +125,7 @@ public class PhoenixTableSourceSinkFactory
TableSchema schema = TableSchema schema =
TableSchemaUtils.getPhysicalSchema(descriptorProperties.getTableSchema(SCHEMA)); TableSchemaUtils.getPhysicalSchema(descriptorProperties.getTableSchema(SCHEMA));
return JdbcTableSource.builder() return PhoenixTableSource.builder()
.setOptions(getJdbcOptions(descriptorProperties)) .setOptions(getJdbcOptions(descriptorProperties))
.setReadOptions(getJdbcReadOptions(descriptorProperties)) .setReadOptions(getJdbcReadOptions(descriptorProperties))
.setLookupOptions(getJdbcLookupOptions(descriptorProperties)) .setLookupOptions(getJdbcLookupOptions(descriptorProperties))
...@@ -140,8 +140,8 @@ public class PhoenixTableSourceSinkFactory ...@@ -140,8 +140,8 @@ public class PhoenixTableSourceSinkFactory
TableSchema schema = TableSchema schema =
TableSchemaUtils.getPhysicalSchema(descriptorProperties.getTableSchema(SCHEMA)); TableSchemaUtils.getPhysicalSchema(descriptorProperties.getTableSchema(SCHEMA));
final JdbcUpsertTableSink.Builder builder = final PhoenixUpsertTableSink.Builder builder =
JdbcUpsertTableSink.builder() PhoenixUpsertTableSink.builder()
.setOptions(getJdbcOptions(descriptorProperties)) .setOptions(getJdbcOptions(descriptorProperties))
.setTableSchema(schema); .setTableSchema(schema);
......
...@@ -27,8 +27,6 @@ import org.apache.flink.connector.phoenix.JdbcExecutionOptions; ...@@ -27,8 +27,6 @@ import org.apache.flink.connector.phoenix.JdbcExecutionOptions;
import org.apache.flink.connector.phoenix.internal.AbstractJdbcOutputFormat; import org.apache.flink.connector.phoenix.internal.AbstractJdbcOutputFormat;
import org.apache.flink.connector.phoenix.internal.GenericJdbcSinkFunction; import org.apache.flink.connector.phoenix.internal.GenericJdbcSinkFunction;
import org.apache.flink.connector.phoenix.internal.JdbcBatchingOutputFormat; import org.apache.flink.connector.phoenix.internal.JdbcBatchingOutputFormat;
import org.apache.flink.connector.phoenix.internal.PhoenixSinkFunction;
import org.apache.flink.connector.phoenix.internal.connection.PhoneixJdbcConnectionProvider;
import org.apache.flink.connector.phoenix.internal.executor.JdbcBatchStatementExecutor; import org.apache.flink.connector.phoenix.internal.executor.JdbcBatchStatementExecutor;
import org.apache.flink.connector.phoenix.internal.options.JdbcOptions; import org.apache.flink.connector.phoenix.internal.options.JdbcOptions;
import org.apache.flink.connector.phoenix.utils.JdbcTypeUtil; import org.apache.flink.connector.phoenix.utils.JdbcTypeUtil;
...@@ -48,7 +46,7 @@ import java.util.Objects; ...@@ -48,7 +46,7 @@ import java.util.Objects;
import static org.apache.flink.util.Preconditions.checkNotNull; import static org.apache.flink.util.Preconditions.checkNotNull;
/** An upsert {@link UpsertStreamTableSink} for JDBC. */ /** An upsert {@link UpsertStreamTableSink} for JDBC. */
public class JdbcUpsertTableSink implements UpsertStreamTableSink<Row> { public class PhoenixUpsertTableSink implements UpsertStreamTableSink<Row> {
private final TableSchema schema; private final TableSchema schema;
private final JdbcOptions options; private final JdbcOptions options;
...@@ -59,7 +57,7 @@ public class JdbcUpsertTableSink implements UpsertStreamTableSink<Row> { ...@@ -59,7 +57,7 @@ public class JdbcUpsertTableSink implements UpsertStreamTableSink<Row> {
private String[] keyFields; private String[] keyFields;
private boolean isAppendOnly; private boolean isAppendOnly;
private JdbcUpsertTableSink( private PhoenixUpsertTableSink(
TableSchema schema, TableSchema schema,
JdbcOptions options, JdbcOptions options,
int flushMaxSize, int flushMaxSize,
...@@ -97,11 +95,6 @@ public class JdbcUpsertTableSink implements UpsertStreamTableSink<Row> { ...@@ -97,11 +95,6 @@ public class JdbcUpsertTableSink implements UpsertStreamTableSink<Row> {
@Override @Override
public DataStreamSink<?> consumeDataStream(DataStream<Tuple2<Boolean, Row>> dataStream) { public DataStreamSink<?> consumeDataStream(DataStream<Tuple2<Boolean, Row>> dataStream) {
// sql types
int[] jdbcSqlTypes =
Arrays.stream(schema.getFieldTypes())
.mapToInt(JdbcTypeUtil::typeInformationToSqlType)
.toArray();
return dataStream return dataStream
.addSink(new GenericJdbcSinkFunction<>(newFormat())) .addSink(new GenericJdbcSinkFunction<>(newFormat()))
...@@ -166,8 +159,8 @@ public class JdbcUpsertTableSink implements UpsertStreamTableSink<Row> { ...@@ -166,8 +159,8 @@ public class JdbcUpsertTableSink implements UpsertStreamTableSink<Row> {
+ Arrays.toString(fieldTypes)); + Arrays.toString(fieldTypes));
} }
JdbcUpsertTableSink copy = PhoenixUpsertTableSink copy =
new JdbcUpsertTableSink( new PhoenixUpsertTableSink(
schema, options, flushMaxSize, flushIntervalMills, maxRetryTime); schema, options, flushMaxSize, flushIntervalMills, maxRetryTime);
copy.keyFields = keyFields; copy.keyFields = keyFields;
return copy; return copy;
...@@ -179,8 +172,8 @@ public class JdbcUpsertTableSink implements UpsertStreamTableSink<Row> { ...@@ -179,8 +172,8 @@ public class JdbcUpsertTableSink implements UpsertStreamTableSink<Row> {
@Override @Override
public boolean equals(Object o) { public boolean equals(Object o) {
if (o instanceof JdbcUpsertTableSink) { if (o instanceof PhoenixUpsertTableSink) {
JdbcUpsertTableSink sink = (JdbcUpsertTableSink) o; PhoenixUpsertTableSink sink = (PhoenixUpsertTableSink) o;
return Objects.equals(schema, sink.schema) return Objects.equals(schema, sink.schema)
&& Objects.equals(options, sink.options) && Objects.equals(options, sink.options)
&& Objects.equals(flushMaxSize, sink.flushMaxSize) && Objects.equals(flushMaxSize, sink.flushMaxSize)
...@@ -193,7 +186,7 @@ public class JdbcUpsertTableSink implements UpsertStreamTableSink<Row> { ...@@ -193,7 +186,7 @@ public class JdbcUpsertTableSink implements UpsertStreamTableSink<Row> {
} }
} }
/** Builder for a {@link JdbcUpsertTableSink}. */ /** Builder for a {@link PhoenixUpsertTableSink}. */
public static class Builder { public static class Builder {
protected TableSchema schema; protected TableSchema schema;
private JdbcOptions options; private JdbcOptions options;
...@@ -234,10 +227,10 @@ public class JdbcUpsertTableSink implements UpsertStreamTableSink<Row> { ...@@ -234,10 +227,10 @@ public class JdbcUpsertTableSink implements UpsertStreamTableSink<Row> {
return this; return this;
} }
public JdbcUpsertTableSink build() { public PhoenixUpsertTableSink build() {
checkNotNull(schema, "No schema supplied."); checkNotNull(schema, "No schema supplied.");
checkNotNull(options, "No options supplied."); checkNotNull(options, "No options supplied.");
return new JdbcUpsertTableSink( return new PhoenixUpsertTableSink(
schema, options, flushMaxSize, flushIntervalMills, maxRetryTimes); schema, options, flushMaxSize, flushIntervalMills, maxRetryTimes);
} }
} }
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.connector.phoenix.xa;
import org.apache.flink.annotation.Internal;
import org.apache.flink.util.Preconditions;
import javax.annotation.concurrent.ThreadSafe;
import javax.transaction.xa.Xid;
import java.util.Objects;
/**
* A pair of checkpoint id and {@link Xid} representing a checkpoint and an associated pending
* (prepared) XA transaction. Thread-safe (assuming immutable {@link Xid} implementation).
*/
@ThreadSafe
@Internal
public final class CheckpointAndXid {
final long checkpointId;
final Xid xid;
final int attempts;
final boolean restored;
public Xid getXid() {
return xid;
}
private CheckpointAndXid(long checkpointId, Xid xid, int attempts, boolean restored) {
this.checkpointId = checkpointId;
this.xid = Preconditions.checkNotNull(xid);
this.attempts = attempts;
this.restored = restored;
}
@Override
public String toString() {
return String.format("checkpointId=%d, xid=%s, restored=%s", checkpointId, xid, restored);
}
CheckpointAndXid asRestored() {
return restored ? this : new CheckpointAndXid(checkpointId, xid, attempts, true);
}
static CheckpointAndXid createRestored(long checkpointId, int attempts, Xid xid) {
return new CheckpointAndXid(checkpointId, xid, attempts, true);
}
static CheckpointAndXid createNew(long checkpointId, Xid xid) {
return new CheckpointAndXid(checkpointId, xid, 0, false);
}
CheckpointAndXid withAttemptsIncremented() {
return new CheckpointAndXid(checkpointId, xid, attempts + 1, restored);
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (!(o instanceof CheckpointAndXid)) {
return false;
}
CheckpointAndXid that = (CheckpointAndXid) o;
return checkpointId == that.checkpointId
&& attempts == that.attempts
&& restored == that.restored
&& Objects.equals(xid, that.xid);
}
@Override
public int hashCode() {
return Objects.hash(checkpointId, xid, attempts, restored);
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.connector.phoenix.xa;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.typeutils.SimpleTypeSerializerSnapshot;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
import javax.transaction.xa.Xid;
import java.io.IOException;
import java.util.Objects;
/** {@link CheckpointAndXid} serializer. */
@Internal
public final class CheckpointAndXidSerializer extends TypeSerializer<CheckpointAndXid> {
private static final long serialVersionUID = 1L;
public static final TypeSerializerSnapshot<CheckpointAndXid> SNAPSHOT =
new CheckpointAndXidSimpleTypeSerializerSnapshot();
private final TypeSerializer<Xid> xidSerializer = new XidSerializer();
@Override
public boolean isImmutableType() {
return xidSerializer.isImmutableType();
}
@Override
public TypeSerializer<CheckpointAndXid> duplicate() {
return this;
}
@Override
public CheckpointAndXid createInstance() {
return CheckpointAndXid.createRestored(0L, 0, xidSerializer.createInstance());
}
@Override
public CheckpointAndXid copy(CheckpointAndXid from) {
return CheckpointAndXid.createRestored(
from.checkpointId, from.attempts, xidSerializer.copy(from.xid));
}
@Override
public CheckpointAndXid copy(CheckpointAndXid from, CheckpointAndXid reuse) {
return from;
}
@Override
public int getLength() {
return -1;
}
@Override
public void serialize(CheckpointAndXid record, DataOutputView target) throws IOException {
target.writeLong(record.checkpointId);
target.writeInt(record.attempts);
xidSerializer.serialize(record.xid, target);
}
@Override
public CheckpointAndXid deserialize(DataInputView source) throws IOException {
return CheckpointAndXid.createRestored(
source.readLong(), source.readInt(), xidSerializer.deserialize(source));
}
@Override
public CheckpointAndXid deserialize(CheckpointAndXid reuse, DataInputView source)
throws IOException {
return deserialize(source);
}
@Override
public void copy(DataInputView source, DataOutputView target) throws IOException {
serialize(deserialize(source), target);
}
@Override
public boolean equals(Object o) {
return o instanceof CheckpointAndXidSerializer;
}
@Override
public int hashCode() {
return Objects.hash(xidSerializer);
}
@Override
public TypeSerializerSnapshot<CheckpointAndXid> snapshotConfiguration() {
return SNAPSHOT;
}
/** SImple {@link TypeSerializerSnapshot} for {@link CheckpointAndXidSerializer}. */
public static class CheckpointAndXidSimpleTypeSerializerSnapshot
extends SimpleTypeSerializerSnapshot<CheckpointAndXid> {
private static final int VERSION = 1;
public CheckpointAndXidSimpleTypeSerializerSnapshot() {
super(CheckpointAndXidSerializer::new);
}
@Override
public void writeSnapshot(DataOutputView out) throws IOException {
super.writeSnapshot(out);
out.writeInt(VERSION);
}
@Override
public void readSnapshot(int readVersion, DataInputView in, ClassLoader classLoader)
throws IOException {
super.readSnapshot(readVersion, in, classLoader);
in.readInt();
}
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.connector.phoenix.xa;
import javax.annotation.concurrent.ThreadSafe;
import javax.transaction.xa.Xid;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import static java.util.Collections.unmodifiableCollection;
/** Thread-safe (assuming immutable {@link Xid} implementation). */
@ThreadSafe
class JdbcXaSinkFunctionState {
private final Collection<CheckpointAndXid> prepared;
private final Collection<Xid> hanging;
static JdbcXaSinkFunctionState empty() {
return new JdbcXaSinkFunctionState(Collections.emptyList(), Collections.emptyList());
}
static JdbcXaSinkFunctionState of(
Collection<CheckpointAndXid> prepared, Collection<Xid> hanging) {
return new JdbcXaSinkFunctionState(
unmodifiableCollection(new ArrayList<>(prepared)),
unmodifiableCollection(new ArrayList<>(hanging)));
}
private JdbcXaSinkFunctionState(
Collection<CheckpointAndXid> prepared, Collection<Xid> hanging) {
this.prepared = prepared;
this.hanging = hanging;
}
/**
* @return immutable collection of prepared XA transactions to {@link
* javax.transaction.xa.XAResource#commit commit}.
*/
public Collection<CheckpointAndXid> getPrepared() {
return prepared;
}
/**
* @return immutable collection of XA transactions to {@link
* javax.transaction.xa.XAResource#rollback rollback} (if they were prepared) or {@link
* javax.transaction.xa.XAResource#end end} (if they were only started).
*/
Collection<Xid> getHanging() {
return hanging;
}
@Override
public String toString() {
return "prepared=" + prepared + ", hanging=" + hanging;
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.connector.phoenix.xa;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.functions.RuntimeContext;
import javax.transaction.xa.Xid;
import java.security.SecureRandom;
import java.util.Arrays;
/**
* Generates {@link Xid} from:
*
* <ol>
* <li>To provide uniqueness over other jobs and apps, and other instances
* <li>of this job, gtrid consists of
* <li>job id (16 bytes)
* <li>subtask index (4 bytes)
* <li>checkpoint id (4 bytes)
* <li>bqual consists of 4 random bytes (generated using {@link SecureRandom})
* </ol>
*
* <p>Each {@link SemanticXidGenerator} instance MUST be used for only one Sink (otherwise Xids will
* collide).
*/
@Internal
class SemanticXidGenerator implements XidGenerator {
private static final long serialVersionUID = 1L;
private static final SecureRandom SECURE_RANDOM = new SecureRandom();
private static final int FORMAT_ID = 201;
private transient byte[] gtridBuffer;
private transient byte[] bqualBuffer;
@Override
public void open() {
// globalTransactionId = job id + task index + checkpoint id
gtridBuffer = new byte[JobID.SIZE + Integer.BYTES + Long.BYTES];
// branchQualifier = random bytes
bqualBuffer = getRandomBytes(Integer.BYTES);
}
@Override
public Xid generateXid(RuntimeContext runtimeContext, long checkpointId) {
byte[] jobIdBytes = runtimeContext.getJobId().getBytes();
System.arraycopy(jobIdBytes, 0, gtridBuffer, 0, JobID.SIZE);
writeNumber(runtimeContext.getIndexOfThisSubtask(), Integer.BYTES, gtridBuffer, JobID.SIZE);
writeNumber(checkpointId, Long.BYTES, gtridBuffer, JobID.SIZE + Integer.BYTES);
// relying on arrays copying inside XidImpl constructor
return new XidImpl(FORMAT_ID, gtridBuffer, bqualBuffer);
}
@Override
public boolean belongsToSubtask(Xid xid, RuntimeContext ctx) {
if (xid.getFormatId() != FORMAT_ID) {
return false;
}
int subtaskIndex = readNumber(xid.getGlobalTransactionId(), JobID.SIZE, Integer.BYTES);
if (subtaskIndex != ctx.getIndexOfThisSubtask()
&& subtaskIndex <= ctx.getNumberOfParallelSubtasks() - 1) {
return false;
}
byte[] jobIdBytes = new byte[JobID.SIZE];
System.arraycopy(xid.getGlobalTransactionId(), 0, jobIdBytes, 0, JobID.SIZE);
return Arrays.equals(jobIdBytes, ctx.getJobId().getBytes());
}
private static int readNumber(byte[] bytes, int offset, int numBytes) {
int result = 0;
for (int i = 0; i < numBytes; i++) {
result |= (bytes[offset + i] & 0xff) << Byte.SIZE * i;
}
return result;
}
private static void writeNumber(long number, int numBytes, byte[] dst, int dstOffset) {
for (int i = dstOffset; i < dstOffset + numBytes; i++) {
dst[i] = (byte) number;
number >>>= Byte.SIZE;
}
}
private byte[] getRandomBytes(int size) {
byte[] bytes = new byte[size];
SECURE_RANDOM.nextBytes(bytes);
return bytes;
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.connector.phoenix.xa;
import org.apache.flink.annotation.Internal;
import org.apache.flink.connector.phoenix.internal.connection.JdbcConnectionProvider;
import org.apache.flink.util.FlinkRuntimeException;
import javax.sql.XADataSource;
import javax.transaction.xa.XAException;
import javax.transaction.xa.Xid;
import java.io.Serializable;
import java.util.Collection;
import java.util.function.Supplier;
/**
* Facade to the XA operations relevant to {@link
* org.apache.flink.streaming.api.functions.sink.SinkFunction sink}.
*
* <p>Typical workflow:
*
* <ol>
* <li>{@link #open}
* <li>{@link #start} transaction
* <li>{@link #getConnection}, write some data
* <li>{@link #endAndPrepare} (or {@link #failAndRollback})
* <li>{@link #commit} / {@link #rollback}
* <li>{@link #close}
* </ol>
*
* {@link #recover} can be used to get abandoned prepared transactions for cleanup.
*/
@Internal
public interface XaFacade extends JdbcConnectionProvider, Serializable, AutoCloseable {
/** @return a non-serializable instance. */
static XaFacade fromXaDataSourceSupplier(
Supplier<XADataSource> dataSourceSupplier,
Integer timeoutSec,
boolean transactionPerConnection) {
return transactionPerConnection
? new XaFacadePoolingImpl(() -> new XaFacadeImpl(dataSourceSupplier, timeoutSec))
: new XaFacadeImpl(dataSourceSupplier, timeoutSec);
}
void open() throws Exception;
boolean isOpen();
/** Start a new transaction. */
void start(Xid xid) throws Exception;
/** End and then prepare the transaction. Transaction can't be resumed afterwards. */
void endAndPrepare(Xid xid) throws Exception;
/**
* Commit previously prepared transaction.
*
* @param ignoreUnknown whether to ignore {@link XAException#XAER_NOTA
* XAER_NOTA} error.
*/
void commit(Xid xid, boolean ignoreUnknown) throws TransientXaException;
/** Rollback previously prepared transaction. */
void rollback(Xid xid) throws TransientXaException;
/**
* End transaction as {@link javax.transaction.xa.XAResource#TMFAIL failed}; in case of error,
* try to roll it back.
*/
void failAndRollback(Xid xid) throws TransientXaException;
/**
* Note: this can block on some non-MVCC databases if there are ended not prepared transactions.
*/
Collection<Xid> recover() throws TransientXaException;
/**
* Thrown by {@link XaFacade} when RM responds with {@link
* javax.transaction.xa.XAResource#XA_RDONLY XA_RDONLY} indicating that the transaction doesn't
* include any changes. When such a transaction is committed RM may return an error (usually,
* {@link XAException#XAER_NOTA XAER_NOTA}).
*/
class EmptyXaTransactionException extends FlinkRuntimeException {
private final Xid xid;
EmptyXaTransactionException(Xid xid) {
super("end response XA_RDONLY, xid: " + xid);
this.xid = xid;
}
public Xid getXid() {
return xid;
}
}
/**
* Indicates a transient or unknown failure from the resource manager (see {@link
* XAException#XA_RBTRANSIENT XA_RBTRANSIENT}, {@link XAException#XAER_RMFAIL XAER_RMFAIL}).
*/
class TransientXaException extends FlinkRuntimeException {
TransientXaException(XAException cause) {
super(cause);
}
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.connector.phoenix.xa;
import org.apache.flink.annotation.Internal;
import org.apache.flink.util.function.ThrowingConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.annotation.Nullable;
import javax.transaction.xa.Xid;
import java.io.Serializable;
import java.sql.Connection;
import java.sql.SQLException;
import java.util.Collection;
import java.util.Deque;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.Map;
import java.util.function.Supplier;
import static org.apache.flink.util.ExceptionUtils.rethrow;
import static org.apache.flink.util.Preconditions.checkState;
/**
* A "pooling" implementation of {@link XaFacade}. Some database implement XA such that one
* connection is limited to a single transaction. As a workaround, this implementation creates a new
* XA resource after each xa_start call is made (and associates it with the xid to commit later).
*/
@Internal
class XaFacadePoolingImpl implements XaFacade {
private static final long serialVersionUID = 1L;
public interface FacadeSupplier extends Serializable, Supplier<XaFacade> {}
private static final transient Logger LOG = LoggerFactory.getLogger(XaFacadePoolingImpl.class);
private final FacadeSupplier facadeSupplier;
private transient XaFacade active;
private transient Map<Xid, XaFacade> mappedToXids;
private transient Deque<XaFacade> pooled;
XaFacadePoolingImpl(FacadeSupplier facadeSupplier) {
this.facadeSupplier = facadeSupplier;
}
@Override
public void open() throws Exception {
checkState(active == null);
pooled = new LinkedList<>();
mappedToXids = new HashMap<>();
}
@Override
public boolean isOpen() {
return active != null && active.isOpen();
}
@Override
public void start(Xid xid) throws Exception {
checkState(active == null);
if (pooled.isEmpty()) {
active = facadeSupplier.get();
active.open();
} else {
active = pooled.poll();
}
active.start(xid);
mappedToXids.put(xid, active);
}
/**
* Must be called after {@link #start(Xid)} with the same {@link Xid}.
*
* @see XaFacade#endAndPrepare(Xid)
*/
@Override
public void endAndPrepare(Xid xid) throws Exception {
checkState(active == mappedToXids.get(xid));
try {
active.endAndPrepare(xid);
} finally {
active = null;
}
}
@Override
public void commit(Xid xid, boolean ignoreUnknown) throws TransientXaException {
runForXid(xid, facade -> facade.commit(xid, ignoreUnknown));
}
@Override
public void rollback(Xid xid) throws TransientXaException {
runForXid(xid, facade -> facade.rollback(xid));
}
@Override
public void failAndRollback(Xid xid) throws TransientXaException {
runForXid(xid, facade -> facade.failAndRollback(xid));
}
@Override
public Collection<Xid> recover() throws TransientXaException {
return peekPooled().recover();
}
@Override
public void close() throws Exception {
for (XaFacade facade : mappedToXids.values()) {
facade.close();
}
for (XaFacade facade : pooled) {
facade.close();
}
if (active != null && active.isOpen()) {
active.close();
}
}
@Nullable
@Override
public Connection getConnection() {
return active.getConnection();
}
@Override
public boolean isConnectionValid() throws SQLException {
return active.isConnectionValid();
}
@Override
public Connection getOrEstablishConnection() throws SQLException, ClassNotFoundException {
return active.getOrEstablishConnection();
}
@Override
public void closeConnection() {
active.closeConnection();
}
@Override
public Connection reestablishConnection() throws SQLException, ClassNotFoundException {
return active.reestablishConnection();
}
// WARN: action MUST leave the facade in IDLE state (i.e. not start/end/prepare any tx)
private void runForXid(Xid xid, ThrowingConsumer<XaFacade, TransientXaException> action) {
XaFacade mapped = mappedToXids.remove(xid);
if (mapped == null) {
// a transaction can be not known during recovery
LOG.debug("No XA resource found associated with XID: {}", xid);
action.accept(peekPooled());
} else {
LOG.debug("Found mapped XA resource for XID: {} {}", xid, mapped);
try {
action.accept(mapped);
} finally {
pooled.offer(mapped);
}
}
}
// WARN: the returned facade MUST be left in IDLE state (i.e. not start/end/prepare any tx)
private XaFacade peekPooled() {
XaFacade xaFacade = pooled.peek();
if (xaFacade == null) {
xaFacade = facadeSupplier.get();
try {
xaFacade.open();
} catch (Exception e) {
rethrow(e);
}
pooled.offer(xaFacade);
}
return xaFacade;
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.connector.phoenix.xa;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.util.FlinkRuntimeException;
import javax.transaction.xa.Xid;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Optional;
@Internal
interface XaGroupOps extends Serializable {
GroupXaOperationResult<CheckpointAndXid> commit(
List<CheckpointAndXid> xids, boolean allowOutOfOrderCommits, int maxCommitAttempts);
GroupXaOperationResult<Xid> failOrRollback(Collection<Xid> xids);
void recoverAndRollback(RuntimeContext runtimeContext, XidGenerator xidGenerator);
class GroupXaOperationResult<T> {
private final List<T> succeeded = new ArrayList<>();
private final List<T> failed = new ArrayList<>();
private final List<T> toRetry = new ArrayList<>();
private Optional<Exception> failure = Optional.empty();
private Optional<Exception> transientFailure = Optional.empty();
void failedTransiently(T x, XaFacade.TransientXaException e) {
toRetry.add(x);
transientFailure =
getTransientFailure().isPresent() ? getTransientFailure() : Optional.of(e);
}
void failed(T x, Exception e) {
failed.add(x);
failure = failure.isPresent() ? failure : Optional.of(e);
}
void succeeded(T x) {
succeeded.add(x);
}
private FlinkRuntimeException wrapFailure(
Exception error, String formatWithCounts, int errCount) {
return new FlinkRuntimeException(
String.format(formatWithCounts, errCount, total()), error);
}
private int total() {
return succeeded.size() + failed.size() + toRetry.size();
}
List<T> getForRetry() {
return toRetry;
}
Optional<Exception> getTransientFailure() {
return transientFailure;
}
boolean hasNoFailures() {
return !failure.isPresent() && !transientFailure.isPresent();
}
void throwIfAnyFailed(String action) {
failure.map(
f ->
wrapFailure(
f,
"failed to " + action + " %d transactions out of %d",
toRetry.size() + failed.size()))
.ifPresent(
f -> {
throw f;
});
}
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.connector.phoenix.xa;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.connector.phoenix.xa.XaFacade.TransientXaException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.transaction.xa.Xid;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
@Internal
class XaGroupOpsImpl implements XaGroupOps {
private static final long serialVersionUID = 1L;
private static final Logger LOG = LoggerFactory.getLogger(XaGroupOpsImpl.class);
private final XaFacade xaFacade;
XaGroupOpsImpl(XaFacade xaFacade) {
this.xaFacade = xaFacade;
}
@Override
public GroupXaOperationResult<CheckpointAndXid> commit(
List<CheckpointAndXid> xids, boolean allowOutOfOrderCommits, int maxCommitAttempts) {
GroupXaOperationResult<CheckpointAndXid> result = new GroupXaOperationResult<>();
int origSize = xids.size();
LOG.debug("commit {} transactions", origSize);
for (Iterator<CheckpointAndXid> i = xids.iterator();
i.hasNext() && (result.hasNoFailures() || allowOutOfOrderCommits); ) {
CheckpointAndXid x = i.next();
i.remove();
try {
xaFacade.commit(x.xid, x.restored);
result.succeeded(x);
} catch (TransientXaException e) {
result.failedTransiently(x.withAttemptsIncremented(), e);
} catch (Exception e) {
result.failed(x, e);
}
}
result.getForRetry().addAll(xids);
result.throwIfAnyFailed("commit");
throwIfAnyReachedMaxAttempts(result, maxCommitAttempts);
result.getTransientFailure()
.ifPresent(
f ->
LOG.warn(
"failed to commit {} transactions out of {} (keep them to retry later)",
result.getForRetry().size(),
origSize,
f));
return result;
}
@Override
public GroupXaOperationResult<Xid> failOrRollback(Collection<Xid> xids) {
GroupXaOperationResult<Xid> result = new GroupXaOperationResult<>();
if (xids.isEmpty()) {
return result;
}
if (LOG.isDebugEnabled()) {
LOG.debug("rolling back {} transactions: {}", xids.size(), xids);
}
for (Xid x : xids) {
try {
xaFacade.failAndRollback(x);
result.succeeded(x);
} catch (TransientXaException e) {
LOG.info("unable to fail/rollback transaction, xid={}: {}", x, e.getMessage());
result.failedTransiently(x, e);
} catch (Exception e) {
LOG.warn("unable to fail/rollback transaction, xid={}: {}", x, e.getMessage());
result.failed(x, e);
}
}
if (!result.getForRetry().isEmpty()) {
LOG.info("failed to roll back {} transactions", result.getForRetry().size());
}
return result;
}
@Override
public void recoverAndRollback(RuntimeContext runtimeContext, XidGenerator xidGenerator) {
Collection<Xid> recovered = xaFacade.recover();
if (recovered.isEmpty()) {
return;
}
LOG.warn("rollback {} recovered transactions", recovered.size());
for (Xid xid : recovered) {
if (xidGenerator.belongsToSubtask(xid, runtimeContext)) {
try {
xaFacade.rollback(xid);
} catch (Exception e) {
LOG.info("unable to rollback recovered transaction, xid={}", xid, e);
}
}
}
}
private static void throwIfAnyReachedMaxAttempts(
GroupXaOperationResult<CheckpointAndXid> result, int maxAttempts) {
List<CheckpointAndXid> reached = null;
for (CheckpointAndXid x : result.getForRetry()) {
if (x.attempts >= maxAttempts) {
if (reached == null) {
reached = new ArrayList<>();
}
reached.add(x);
}
}
if (reached != null) {
throw new RuntimeException(
String.format(
"reached max number of commit attempts (%d) for transactions: %s",
maxAttempts, reached));
}
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.connector.phoenix.xa;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import java.io.Serializable;
@PublicEvolving
interface XaSinkStateHandler extends Serializable {
JdbcXaSinkFunctionState load(FunctionInitializationContext context) throws Exception;
void store(JdbcXaSinkFunctionState state) throws Exception;
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.connector.phoenix.xa;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.annotation.Nullable;
import javax.transaction.xa.Xid;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
@Internal
class XaSinkStateHandlerImpl implements XaSinkStateHandler {
private static final long serialVersionUID = 1L;
private static final Logger LOG = LoggerFactory.getLogger(XaSinkStateHandlerImpl.class);
private final TypeSerializer<JdbcXaSinkFunctionState> serializer;
// state could be stored as two separate lists
// on one hand this would allow more even distribution on re-scale
// on the other it would lead to more IO calls and less data locality
private transient ListState<JdbcXaSinkFunctionState> states;
XaSinkStateHandlerImpl() {
this(new XaSinkStateSerializer());
}
XaSinkStateHandlerImpl(TypeSerializer<JdbcXaSinkFunctionState> serializer) {
this.serializer = serializer;
}
@Override
public JdbcXaSinkFunctionState load(FunctionInitializationContext context) throws Exception {
states = getListState(context, serializer, "XaSinkState");
return context.isRestored() ? merge(states.get()) : JdbcXaSinkFunctionState.empty();
}
@Override
public void store(JdbcXaSinkFunctionState state) throws Exception {
if (LOG.isDebugEnabled()) {
LOG.debug("store state snapshot: {}", state);
}
states.update(Collections.singletonList(state));
}
private <T> ListState<T> getListState(
FunctionInitializationContext context, TypeSerializer<T> serializer, String name) {
try {
return context.getOperatorStateStore()
.getListState(new ListStateDescriptor<>(name, serializer));
} catch (Exception e) {
throw new RuntimeException(e);
}
}
private JdbcXaSinkFunctionState merge(@Nullable Iterable<JdbcXaSinkFunctionState> states) {
if (states == null) {
return JdbcXaSinkFunctionState.empty();
}
List<Xid> hanging = new ArrayList<>();
List<CheckpointAndXid> prepared = new ArrayList<>();
states.forEach(
i -> {
hanging.addAll(i.getHanging());
prepared.addAll(i.getPrepared());
});
return JdbcXaSinkFunctionState.of(prepared, hanging);
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.connector.phoenix.xa;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.typeutils.SimpleTypeSerializerSnapshot;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
import javax.transaction.xa.Xid;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
/** XaSinkStateSerializer. */
@Internal
public final class XaSinkStateSerializer extends TypeSerializer<JdbcXaSinkFunctionState> {
private static final TypeSerializerSnapshot<JdbcXaSinkFunctionState> SNAPSHOT =
new XaSinkStateSimpleXaTypeSerializerSnapshot();
private final TypeSerializer<Xid> xidSerializer;
private final TypeSerializer<CheckpointAndXid> checkpointAndXidSerializer;
public XaSinkStateSerializer() {
this(new XidSerializer(), new CheckpointAndXidSerializer());
}
private XaSinkStateSerializer(
TypeSerializer<Xid> xidSerializer,
TypeSerializer<CheckpointAndXid> checkpointAndXidSerializer) {
this.xidSerializer = xidSerializer;
this.checkpointAndXidSerializer = checkpointAndXidSerializer;
}
@Override
public boolean isImmutableType() {
return true;
}
@Override
public TypeSerializer<JdbcXaSinkFunctionState> duplicate() {
return this;
}
@Override
public JdbcXaSinkFunctionState createInstance() {
return JdbcXaSinkFunctionState.empty();
}
@Override
public JdbcXaSinkFunctionState copy(JdbcXaSinkFunctionState from) {
return from;
}
@Override
public JdbcXaSinkFunctionState copy(
JdbcXaSinkFunctionState from, JdbcXaSinkFunctionState reuse) {
return from;
}
@Override
public int getLength() {
return -1;
}
@Override
public void serialize(JdbcXaSinkFunctionState state, DataOutputView target) throws IOException {
target.writeInt(state.getHanging().size());
for (Xid h : state.getHanging()) {
xidSerializer.serialize(h, target);
}
target.writeInt(state.getPrepared().size());
for (CheckpointAndXid checkpointAndXid : state.getPrepared()) {
checkpointAndXidSerializer.serialize(checkpointAndXid, target);
}
}
@Override
public JdbcXaSinkFunctionState deserialize(DataInputView source) throws IOException {
int hangingSize = source.readInt();
List<Xid> hanging = new ArrayList<>(hangingSize);
for (int i = 0; i < hangingSize; i++) {
hanging.add(xidSerializer.deserialize(source));
}
int preparedSize = source.readInt();
List<CheckpointAndXid> prepared = new ArrayList<>(preparedSize);
for (int i = 0; i < preparedSize; i++) {
prepared.add(checkpointAndXidSerializer.deserialize(source));
}
return JdbcXaSinkFunctionState.of(prepared, hanging);
}
@Override
public JdbcXaSinkFunctionState deserialize(JdbcXaSinkFunctionState reuse, DataInputView source)
throws IOException {
return deserialize(source);
}
@Override
public void copy(DataInputView source, DataOutputView target) throws IOException {
serialize(deserialize(source), target);
}
@Override
public boolean equals(Object obj) {
return obj instanceof XaSinkStateSerializer;
}
@Override
public int hashCode() {
return 0;
}
@Override
public TypeSerializerSnapshot<JdbcXaSinkFunctionState> snapshotConfiguration() {
return SNAPSHOT;
}
/** Simple {@link TypeSerializerSnapshot} for {@link XaSinkStateSerializer}. */
public static class XaSinkStateSimpleXaTypeSerializerSnapshot
extends SimpleTypeSerializerSnapshot<JdbcXaSinkFunctionState> {
private static final int VERSION = 1;
public XaSinkStateSimpleXaTypeSerializerSnapshot() {
super(XaSinkStateSerializer::new);
}
@Override
public void writeSnapshot(DataOutputView out) throws IOException {
super.writeSnapshot(out);
out.writeInt(VERSION);
}
@Override
public void readSnapshot(int readVersion, DataInputView in, ClassLoader classLoader)
throws IOException {
super.readSnapshot(readVersion, in, classLoader);
in.readInt();
}
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.connector.phoenix.xa;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.functions.RuntimeContext;
import javax.transaction.xa.Xid;
import java.io.Serializable;
import java.security.SecureRandom;
/** {@link Xid} generator. */
@Internal
public interface XidGenerator extends Serializable, AutoCloseable {
/**
* Generate new {@link Xid}. Requirements for generated Xids:
*
* <ul>
* <li>Global Transaction Id MUST be unique across Flink job, and probably across Xids
* generated by other jobs and applications - depends on the usage of this class
* <li>SHOULD be immutable
* <li>SHOULD override {@link Object#hashCode hashCode} and {@link Object#equals equals}
* </ul>
*
* @param runtimeContext can be used for example to derive global transaction id
* @param checkpointId can be used for example to derive global transaction id
*/
Xid generateXid(RuntimeContext runtimeContext, long checkpointId);
default void open() {}
/** @return true if the provided transaction belongs to this subtask */
boolean belongsToSubtask(Xid xid, RuntimeContext ctx);
@Override
default void close() {}
/**
* Creates a {@link XidGenerator} that generates {@link Xid xids} from:
*
* <ol>
* <li>job id
* <li>subtask index
* <li>checkpoint id
* <li>four random bytes generated using {@link SecureRandom})
* </ol>
*
* <p>Each created {@link XidGenerator} instance MUST be used for only one Sink instance
* (otherwise Xids could collide).
*/
static XidGenerator semanticXidGenerator() {
return new SemanticXidGenerator();
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.connector.phoenix.xa;
import org.apache.flink.annotation.Internal;
import org.apache.flink.util.Preconditions;
import javax.annotation.Nonnull;
import javax.transaction.xa.Xid;
import java.io.Serializable;
import java.util.Arrays;
import java.util.Objects;
import static org.apache.flink.util.StringUtils.byteToHexString;
/**
* A simple {@link Xid} implementation that stores branch and global transaction identifiers as byte
* arrays.
*/
@Internal
final class XidImpl implements Xid, Serializable {
private static final long serialVersionUID = 1L;
private final int formatId;
@Nonnull private final byte[] globalTransactionId;
@Nonnull private final byte[] branchQualifier;
XidImpl(int formatId, byte[] globalTransactionId, byte[] branchQualifier) {
Preconditions.checkArgument(globalTransactionId.length <= Xid.MAXGTRIDSIZE);
Preconditions.checkArgument(branchQualifier.length <= Xid.MAXBQUALSIZE);
this.formatId = formatId;
this.globalTransactionId = Arrays.copyOf(globalTransactionId, globalTransactionId.length);
this.branchQualifier = Arrays.copyOf(branchQualifier, branchQualifier.length);
}
@Override
public int getFormatId() {
return formatId;
}
@Override
public byte[] getGlobalTransactionId() {
return globalTransactionId;
}
@Override
public byte[] getBranchQualifier() {
return branchQualifier;
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (!(o instanceof XidImpl)) {
return false;
}
XidImpl xid = (XidImpl) o;
return formatId == xid.formatId
&& Arrays.equals(globalTransactionId, xid.globalTransactionId)
&& Arrays.equals(branchQualifier, xid.branchQualifier);
}
@Override
public int hashCode() {
int result = Objects.hash(formatId);
result = 31 * result + Arrays.hashCode(globalTransactionId);
result = 31 * result + Arrays.hashCode(branchQualifier);
return result;
}
@Override
public String toString() {
return formatId
+ ":"
+ byteToHexString(globalTransactionId)
+ ":"
+ byteToHexString(branchQualifier);
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.connector.phoenix.xa;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.typeutils.SimpleTypeSerializerSnapshot;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
import javax.transaction.xa.Xid;
import java.io.IOException;
/** {@link Xid} serializer. */
@Internal
public final class XidSerializer extends TypeSerializer<Xid> {
private static final long serialVersionUID = 1L;
private static final TypeSerializerSnapshot<Xid> SNAPSHOT =
new XidSimpleTypeSerializerSnapshot();
@Override
public boolean isImmutableType() {
return true;
}
@Override
public TypeSerializer<Xid> duplicate() {
return this;
}
@Override
public Xid createInstance() {
return new XidImpl(0, new byte[0], new byte[0]);
}
@Override
public Xid copy(Xid from) {
return from;
}
@Override
public Xid copy(Xid from, Xid reuse) {
return from;
}
@Override
public int getLength() {
return -1;
}
@Override
public void serialize(Xid xid, DataOutputView target) throws IOException {
target.writeInt(xid.getFormatId());
writeBytesWithSize(target, xid.getGlobalTransactionId());
writeBytesWithSize(target, xid.getBranchQualifier());
}
@Override
public Xid deserialize(DataInputView source) throws IOException {
return new XidImpl(source.readInt(), readBytesWithSize(source), readBytesWithSize(source));
}
private void writeBytesWithSize(DataOutputView target, byte[] bytes) throws IOException {
target.writeByte(bytes.length);
target.write(bytes, 0, bytes.length);
}
private byte[] readBytesWithSize(DataInputView source) throws IOException {
byte len = source.readByte();
byte[] bytes = new byte[len];
source.read(bytes, 0, len);
return bytes;
}
@Override
public Xid deserialize(Xid reuse, DataInputView source) throws IOException {
return deserialize(source);
}
@Override
public void copy(DataInputView source, DataOutputView target) throws IOException {
serialize(deserialize(source), target);
}
@Override
public boolean equals(Object obj) {
return obj instanceof XidSerializer;
}
@Override
public int hashCode() {
return XidSerializer.class.hashCode();
}
@Override
public TypeSerializerSnapshot<Xid> snapshotConfiguration() {
return SNAPSHOT;
}
/** Simple {@link TypeSerializerSnapshot} for {@link XidSerializer}. */
public static class XidSimpleTypeSerializerSnapshot extends SimpleTypeSerializerSnapshot<Xid> {
private static final int VERSION = 1;
public XidSimpleTypeSerializerSnapshot() {
super(XidSerializer::new);
}
@Override
public void writeSnapshot(DataOutputView out) throws IOException {
super.writeSnapshot(out);
out.writeInt(VERSION);
}
@Override
public void readSnapshot(int readVersion, DataInputView in, ClassLoader classLoader)
throws IOException {
super.readSnapshot(readVersion, in, classLoader);
in.readInt();
}
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/**
* Provides exactly-once JDBC sink implementation using Java XA transactions API (JTA).
*
* @see org.apache.flink.connector.phoenix.xa.JdbcXaSinkFunction JdbcXaExactlyOnceSinkFunction
*/
package org.apache.flink.connector.phoenix.xa;
...@@ -481,7 +481,7 @@ public class JobManager { ...@@ -481,7 +481,7 @@ public class JobManager {
try { try {
return FlinkAPI.build(config.getAddress()).stop(jobId); return FlinkAPI.build(config.getAddress()).stop(jobId);
} catch (Exception e) { } catch (Exception e) {
logger.info("停止作业时集群不存在"); logger.error("停止作业时集群不存在: " + e);
} }
return false; return false;
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment