Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ public class StarRocksCatalog implements Serializable {
private final String jdbcUrl;
private final String username;
private final String password;
private Connection conn;
private boolean checkDriver;

public StarRocksCatalog(String jdbcUrl, String username, String password) {
Expand All @@ -66,7 +67,8 @@ public void open() throws StarRocksCatalogException {
try {
JdbcUtils.checkJdbcDriver();
// test connection, fail early if we cannot connect to starrocks
try (Connection conn = getConnection()) {
try {
conn = getConnection();
} catch (SQLException e) {
throw new RuntimeException(
String.format("Failed to connect StarRocks via JDBC: %s.", jdbcUrl), e);
Expand All @@ -87,6 +89,14 @@ public void open() throws StarRocksCatalogException {
*/
public void close() throws StarRocksCatalogException {
LOG.info("Close StarRocks catalog");
try {
if (conn != null) {
conn.close();
}
} catch (SQLException e) {
LOG.error("Failed to close StarRocks catalog", e);
throw new StarRocksCatalogException("Failed to close StarRocks catalog", e);
}
}

/**
Expand Down Expand Up @@ -165,54 +175,52 @@ public Optional<StarRocksTable> getTable(String databaseName, String tableName)
StarRocksTable.TableType tableType = StarRocksTable.TableType.UNKNOWN;
List<StarRocksColumn> columns = new ArrayList<>();
List<String> tableKeys = new ArrayList<>();
try (Connection connection = getConnection()) {
try (PreparedStatement statement = connection.prepareStatement(tableSchemaQuery)) {
statement.setObject(1, databaseName);
statement.setObject(2, tableName);
try (ResultSet resultSet = statement.executeQuery()) {
while (resultSet.next()) {
String name = resultSet.getString("COLUMN_NAME");
String type = resultSet.getString("DATA_TYPE");
Integer size = resultSet.getInt("COLUMN_SIZE");
if (resultSet.wasNull()) {
size = null;
}
// mysql does not have boolean type, and starrocks `information_schema`.`COLUMNS` will return
// a "tinyint" data type for both StarRocks BOOLEAN and TINYINT type, Distinguish them by
// column size, and the size of BOOLEAN is null
if ("tinyint".equalsIgnoreCase(type) && size == null) {
type = "boolean";
}
int position = resultSet.getInt("ORDINAL_POSITION");
Integer scale = resultSet.getInt("DECIMAL_DIGITS");
if (resultSet.wasNull()) {
scale = null;
}
String isNullable = resultSet.getString("IS_NULLABLE");
String comment = resultSet.getString("COLUMN_COMMENT");
StarRocksColumn column =
new StarRocksColumn.Builder()
.setColumnName(name)
.setOrdinalPosition(position - 1)
.setDataType(type)
.setColumnSize(size)
.setDecimalDigits(scale)
.setNullable(
isNullable == null
|| !isNullable.equalsIgnoreCase("NO"))
.setColumnComment(comment)
.build();
columns.add(column);

// Only primary key table has value in this field. and the value is "PRI"
String columnKey = resultSet.getString("COLUMN_KEY");
if (!StringUtils.isNullOrWhitespaceOnly(columnKey)) {
if (columnKey.equalsIgnoreCase("PRI")
&& tableType == StarRocksTable.TableType.UNKNOWN) {
tableType = StarRocksTable.TableType.PRIMARY_KEY;
}
tableKeys.add(column.getColumnName());
try (PreparedStatement statement = getConnection().prepareStatement(tableSchemaQuery)) {
statement.setObject(1, databaseName);
statement.setObject(2, tableName);
try (ResultSet resultSet = statement.executeQuery()) {
while (resultSet.next()) {
String name = resultSet.getString("COLUMN_NAME");
String type = resultSet.getString("DATA_TYPE");
Integer size = resultSet.getInt("COLUMN_SIZE");
if (resultSet.wasNull()) {
size = null;
}
// mysql does not have boolean type, and starrocks `information_schema`.`COLUMNS` will return
// a "tinyint" data type for both StarRocks BOOLEAN and TINYINT type, Distinguish them by
// column size, and the size of BOOLEAN is null
if ("tinyint".equalsIgnoreCase(type) && size == null) {
type = "boolean";
}
int position = resultSet.getInt("ORDINAL_POSITION");
Integer scale = resultSet.getInt("DECIMAL_DIGITS");
if (resultSet.wasNull()) {
scale = null;
}
String isNullable = resultSet.getString("IS_NULLABLE");
String comment = resultSet.getString("COLUMN_COMMENT");
StarRocksColumn column =
new StarRocksColumn.Builder()
.setColumnName(name)
.setOrdinalPosition(position - 1)
.setDataType(type)
.setColumnSize(size)
.setDecimalDigits(scale)
.setNullable(
isNullable == null
|| !isNullable.equalsIgnoreCase("NO"))
.setColumnComment(comment)
.build();
columns.add(column);

// Only primary key table has value in this field. and the value is "PRI"
String columnKey = resultSet.getString("COLUMN_KEY");
if (!StringUtils.isNullOrWhitespaceOnly(columnKey)) {
if (columnKey.equalsIgnoreCase("PRI")
&& tableType == StarRocksTable.TableType.UNKNOWN) {
tableType = StarRocksTable.TableType.PRIMARY_KEY;
}
tableKeys.add(column.getColumnName());
}
}
}
Expand All @@ -236,7 +244,7 @@ public Optional<StarRocksTable> getTable(String databaseName, String tableName)
}

/**
* check if a table exists in this databse.
* check if a table exists in this database.
*/
public boolean tableExists(String database, String table){
List<String> tableList = executeSingleColumnStatement(
Expand All @@ -249,8 +257,7 @@ public boolean tableExists(String database, String table){
}

private List<String> executeSingleColumnStatement(String sql, Object... params) {
try (Connection conn = getConnection();
PreparedStatement statement = conn.prepareStatement(sql)) {
try (PreparedStatement statement = getConnection().prepareStatement(sql)) {
List<String> columnValues = new ArrayList<>();
if (params != null) {
for (int i = 0; i < params.length; i++) {
Expand Down Expand Up @@ -465,29 +472,26 @@ private AlterJobState getAlterJobState(String databaseName, String tableName)
String showAlterSql = String.format(
"SHOW ALTER TABLE COLUMN FROM `%s` WHERE TableName = '%s' ORDER BY JobId DESC LIMIT 1;",
databaseName, tableName);
try (Connection connection = getConnection()) {
try (PreparedStatement statement = connection.prepareStatement(showAlterSql)) {
try (ResultSet resultSet = statement.executeQuery()) {
if (resultSet.next()) {
return new AlterJobState(
resultSet.getString("JobId"),
resultSet.getString("TableName"),
resultSet.getString("CreateTime"),
resultSet.getString("FinishTime"),
resultSet.getString("TransactionId"),
resultSet.getString("State"),
resultSet.getString("Msg"));
}
}
try (PreparedStatement statement = getConnection().prepareStatement(showAlterSql);
ResultSet resultSet = statement.executeQuery()) {
if (resultSet.next()) {
return new AlterJobState(
resultSet.getString("JobId"),
resultSet.getString("TableName"),
resultSet.getString("CreateTime"),
resultSet.getString("FinishTime"),
resultSet.getString("TransactionId"),
resultSet.getString("State"),
resultSet.getString("Msg"));

}
}
throw new SQLException(
String.format("Alter job state for %s.%s does not exsit", databaseName, tableName));
}

private List<String> executeSingleColumnStatement(String sql) throws SQLException {
try (Connection conn = getConnection();
PreparedStatement statement = conn.prepareStatement(sql)) {
try (PreparedStatement statement = getConnection().prepareStatement(sql)) {
List<String> columnValues = new ArrayList<>();
try (ResultSet rs = statement.executeQuery()) {
while (rs.next()) {
Expand All @@ -500,13 +504,16 @@ private List<String> executeSingleColumnStatement(String sql) throws SQLExceptio
}

private void executeUpdateStatement(String sql) throws SQLException {
try (Connection connection = getConnection();
Statement statement = connection.createStatement()) {
try (PreparedStatement statement = getConnection().prepareStatement(sql)) {
statement.executeUpdate(sql);
}
}

private Connection getConnection() throws SQLException {

if (conn != null && !conn.isClosed()) {
return conn;
}
return DriverManager.getConnection(jdbcUrl, username, password);
}

Expand Down