Skip to content

Commit 632cf22

Browse files
committed
[Feature] Reuse connection in StarRocksCatalog
Signed-off-by: ClownXC <[email protected]>
1 parent df6f7b4 commit 632cf22

File tree

1 file changed

+76
-69
lines changed

1 file changed

+76
-69
lines changed

src/main/java/com/starrocks/connector/flink/catalog/StarRocksCatalog.java

Lines changed: 76 additions & 69 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@ public class StarRocksCatalog implements Serializable {
4949
private final String jdbcUrl;
5050
private final String username;
5151
private final String password;
52+
private Connection conn;
5253
private boolean checkDriver;
5354

5455
public StarRocksCatalog(String jdbcUrl, String username, String password) {
@@ -66,7 +67,8 @@ public void open() throws StarRocksCatalogException {
6667
try {
6768
JdbcUtils.checkJdbcDriver();
6869
// test connection, fail early if we cannot connect to starrocks
69-
try (Connection conn = getConnection()) {
70+
try {
71+
conn = getConnection();
7072
} catch (SQLException e) {
7173
throw new RuntimeException(
7274
String.format("Failed to connect StarRocks via JDBC: %s.", jdbcUrl), e);
@@ -87,6 +89,14 @@ public void open() throws StarRocksCatalogException {
8789
*/
8890
public void close() throws StarRocksCatalogException {
8991
LOG.info("Close StarRocks catalog");
92+
try {
93+
if (conn != null) {
94+
conn.close();
95+
}
96+
} catch (SQLException e) {
97+
LOG.error("Failed to close StarRocks catalog", e);
98+
throw new StarRocksCatalogException("Failed to close StarRocks catalog", e);
99+
}
90100
}
91101

92102
/**
@@ -165,54 +175,52 @@ public Optional<StarRocksTable> getTable(String databaseName, String tableName)
165175
StarRocksTable.TableType tableType = StarRocksTable.TableType.UNKNOWN;
166176
List<StarRocksColumn> columns = new ArrayList<>();
167177
List<String> tableKeys = new ArrayList<>();
168-
try (Connection connection = getConnection()) {
169-
try (PreparedStatement statement = connection.prepareStatement(tableSchemaQuery)) {
170-
statement.setObject(1, databaseName);
171-
statement.setObject(2, tableName);
172-
try (ResultSet resultSet = statement.executeQuery()) {
173-
while (resultSet.next()) {
174-
String name = resultSet.getString("COLUMN_NAME");
175-
String type = resultSet.getString("DATA_TYPE");
176-
Integer size = resultSet.getInt("COLUMN_SIZE");
177-
if (resultSet.wasNull()) {
178-
size = null;
179-
}
180-
// mysql does not have boolean type, and starrocks `information_schema`.`COLUMNS` will return
181-
// a "tinyint" data type for both StarRocks BOOLEAN and TINYINT type, Distinguish them by
182-
// column size, and the size of BOOLEAN is null
183-
if ("tinyint".equalsIgnoreCase(type) && size == null) {
184-
type = "boolean";
185-
}
186-
int position = resultSet.getInt("ORDINAL_POSITION");
187-
Integer scale = resultSet.getInt("DECIMAL_DIGITS");
188-
if (resultSet.wasNull()) {
189-
scale = null;
190-
}
191-
String isNullable = resultSet.getString("IS_NULLABLE");
192-
String comment = resultSet.getString("COLUMN_COMMENT");
193-
StarRocksColumn column =
194-
new StarRocksColumn.Builder()
195-
.setColumnName(name)
196-
.setOrdinalPosition(position - 1)
197-
.setDataType(type)
198-
.setColumnSize(size)
199-
.setDecimalDigits(scale)
200-
.setNullable(
201-
isNullable == null
202-
|| !isNullable.equalsIgnoreCase("NO"))
203-
.setColumnComment(comment)
204-
.build();
205-
columns.add(column);
206-
207-
// Only primary key table has value in this field. and the value is "PRI"
208-
String columnKey = resultSet.getString("COLUMN_KEY");
209-
if (!StringUtils.isNullOrWhitespaceOnly(columnKey)) {
210-
if (columnKey.equalsIgnoreCase("PRI")
211-
&& tableType == StarRocksTable.TableType.UNKNOWN) {
212-
tableType = StarRocksTable.TableType.PRIMARY_KEY;
213-
}
214-
tableKeys.add(column.getColumnName());
178+
try (PreparedStatement statement = getConnection().prepareStatement(tableSchemaQuery)) {
179+
statement.setObject(1, databaseName);
180+
statement.setObject(2, tableName);
181+
try (ResultSet resultSet = statement.executeQuery()) {
182+
while (resultSet.next()) {
183+
String name = resultSet.getString("COLUMN_NAME");
184+
String type = resultSet.getString("DATA_TYPE");
185+
Integer size = resultSet.getInt("COLUMN_SIZE");
186+
if (resultSet.wasNull()) {
187+
size = null;
188+
}
189+
// mysql does not have boolean type, and starrocks `information_schema`.`COLUMNS` will return
190+
// a "tinyint" data type for both StarRocks BOOLEAN and TINYINT type, Distinguish them by
191+
// column size, and the size of BOOLEAN is null
192+
if ("tinyint".equalsIgnoreCase(type) && size == null) {
193+
type = "boolean";
194+
}
195+
int position = resultSet.getInt("ORDINAL_POSITION");
196+
Integer scale = resultSet.getInt("DECIMAL_DIGITS");
197+
if (resultSet.wasNull()) {
198+
scale = null;
199+
}
200+
String isNullable = resultSet.getString("IS_NULLABLE");
201+
String comment = resultSet.getString("COLUMN_COMMENT");
202+
StarRocksColumn column =
203+
new StarRocksColumn.Builder()
204+
.setColumnName(name)
205+
.setOrdinalPosition(position - 1)
206+
.setDataType(type)
207+
.setColumnSize(size)
208+
.setDecimalDigits(scale)
209+
.setNullable(
210+
isNullable == null
211+
|| !isNullable.equalsIgnoreCase("NO"))
212+
.setColumnComment(comment)
213+
.build();
214+
columns.add(column);
215+
216+
// Only primary key table has value in this field. and the value is "PRI"
217+
String columnKey = resultSet.getString("COLUMN_KEY");
218+
if (!StringUtils.isNullOrWhitespaceOnly(columnKey)) {
219+
if (columnKey.equalsIgnoreCase("PRI")
220+
&& tableType == StarRocksTable.TableType.UNKNOWN) {
221+
tableType = StarRocksTable.TableType.PRIMARY_KEY;
215222
}
223+
tableKeys.add(column.getColumnName());
216224
}
217225
}
218226
}
@@ -236,7 +244,7 @@ public Optional<StarRocksTable> getTable(String databaseName, String tableName)
236244
}
237245

238246
/**
239-
* check if a table exists in this databse.
247+
* check if a table exists in this database.
240248
*/
241249
public boolean tableExists(String database, String table){
242250
List<String> tableList = executeSingleColumnStatement(
@@ -249,8 +257,7 @@ public boolean tableExists(String database, String table){
249257
}
250258

251259
private List<String> executeSingleColumnStatement(String sql, Object... params) {
252-
try (Connection conn = getConnection();
253-
PreparedStatement statement = conn.prepareStatement(sql)) {
260+
try (PreparedStatement statement = getConnection().prepareStatement(sql)) {
254261
List<String> columnValues = new ArrayList<>();
255262
if (params != null) {
256263
for (int i = 0; i < params.length; i++) {
@@ -465,29 +472,26 @@ private AlterJobState getAlterJobState(String databaseName, String tableName)
465472
String showAlterSql = String.format(
466473
"SHOW ALTER TABLE COLUMN FROM `%s` WHERE TableName = '%s' ORDER BY JobId DESC LIMIT 1;",
467474
databaseName, tableName);
468-
try (Connection connection = getConnection()) {
469-
try (PreparedStatement statement = connection.prepareStatement(showAlterSql)) {
470-
try (ResultSet resultSet = statement.executeQuery()) {
471-
if (resultSet.next()) {
472-
return new AlterJobState(
473-
resultSet.getString("JobId"),
474-
resultSet.getString("TableName"),
475-
resultSet.getString("CreateTime"),
476-
resultSet.getString("FinishTime"),
477-
resultSet.getString("TransactionId"),
478-
resultSet.getString("State"),
479-
resultSet.getString("Msg"));
480-
}
481-
}
475+
try (PreparedStatement statement = getConnection().prepareStatement(showAlterSql);
476+
ResultSet resultSet = statement.executeQuery()) {
477+
if (resultSet.next()) {
478+
return new AlterJobState(
479+
resultSet.getString("JobId"),
480+
resultSet.getString("TableName"),
481+
resultSet.getString("CreateTime"),
482+
resultSet.getString("FinishTime"),
483+
resultSet.getString("TransactionId"),
484+
resultSet.getString("State"),
485+
resultSet.getString("Msg"));
486+
482487
}
483488
}
484489
throw new SQLException(
485490
String.format("Alter job state for %s.%s does not exsit", databaseName, tableName));
486491
}
487492

488493
private List<String> executeSingleColumnStatement(String sql) throws SQLException {
489-
try (Connection conn = getConnection();
490-
PreparedStatement statement = conn.prepareStatement(sql)) {
494+
try (PreparedStatement statement = getConnection().prepareStatement(sql)) {
491495
List<String> columnValues = new ArrayList<>();
492496
try (ResultSet rs = statement.executeQuery()) {
493497
while (rs.next()) {
@@ -500,13 +504,16 @@ private List<String> executeSingleColumnStatement(String sql) throws SQLExceptio
500504
}
501505

502506
private void executeUpdateStatement(String sql) throws SQLException {
503-
try (Connection connection = getConnection();
504-
Statement statement = connection.createStatement()) {
507+
try (PreparedStatement statement = getConnection().prepareStatement(sql)) {
505508
statement.executeUpdate(sql);
506509
}
507510
}
508511

509512
private Connection getConnection() throws SQLException {
513+
514+
if (conn != null && !conn.isClosed()) {
515+
return conn;
516+
}
510517
return DriverManager.getConnection(jdbcUrl, username, password);
511518
}
512519

0 commit comments

Comments
 (0)