From fa57b0f9f421837e7993272e711821c7e0f4680f Mon Sep 17 00:00:00 2001 From: yuxiqian <34335406+yuxiqian@users.noreply.github.com> Date: Thu, 18 Apr 2024 14:51:09 +0800 Subject: [PATCH] feat: support alter modify columns' type and position Signed-off-by: yuxiqian <34335406+yuxiqian@users.noreply.github.com> --- .../flink/catalog/StarRocksCatalog.java | 70 +++++++++++++++++++ .../it/catalog/StarRocksCatalogTest.java | 55 +++++++++++++++ 2 files changed, 125 insertions(+) diff --git a/src/main/java/com/starrocks/connector/flink/catalog/StarRocksCatalog.java b/src/main/java/com/starrocks/connector/flink/catalog/StarRocksCatalog.java index 403f8ac64..9b7be981a 100644 --- a/src/main/java/com/starrocks/connector/flink/catalog/StarRocksCatalog.java +++ b/src/main/java/com/starrocks/connector/flink/catalog/StarRocksCatalog.java @@ -35,8 +35,11 @@ import java.sql.SQLException; import java.sql.Statement; import java.util.ArrayList; +import java.util.HashSet; import java.util.List; +import java.util.Map; import java.util.Optional; +import java.util.Set; import java.util.stream.Collectors; /** Responsible for reading and writing metadata such as database/table from StarRocks. */ @@ -367,6 +370,48 @@ public void alterDropColumns( } } + /** + * Rename columns of a table. + * + * @param databaseName Name of the database + * @param tableName Name of the table + * @param modifyColumnsTypeMap Columns to modify types + * @param modifyColumnsPositionMap Columns to modify positions + * @param timeoutSecond Timeout for a schema change on StarRocks side + * @throws StarRocksCatalogException in case of any runtime exception + */ + public void alterModifyColumns( + String databaseName, String tableName, Map modifyColumnsTypeMap, + Map modifyColumnsPositionMap, + long timeoutSecond) + throws StarRocksCatalogException { + Preconditions.checkArgument( + !StringUtils.isNullOrWhitespaceOnly(databaseName), + "database name cannot be null or empty."); + Preconditions.checkArgument( + !StringUtils.isNullOrWhitespaceOnly(tableName), + "table name cannot be null or empty."); + + Preconditions.checkArgument(!modifyColumnsTypeMap.isEmpty(), "Modified columns should not be empty."); + + Set extraneousColumns = new HashSet<>(modifyColumnsPositionMap.keySet()); + extraneousColumns.removeAll(modifyColumnsTypeMap.keySet()); + Preconditions.checkArgument(extraneousColumns.isEmpty(), "Position-modified columns should provide types too."); + + String alterSql = + buildAlterModifyColumnsSql(databaseName, tableName, modifyColumnsTypeMap, modifyColumnsPositionMap, timeoutSecond); + try { + long startTimeMillis = System.currentTimeMillis(); + executeAlter(databaseName, tableName, alterSql, timeoutSecond); + LOG.info("Success to modify columns from {}.{}, duration: {}ms, sql: {}", + databaseName, tableName, System.currentTimeMillis() - startTimeMillis, alterSql); + } catch (Exception e) { + LOG.error("Failed to modify columns from {}.{}, sql: {}", databaseName, tableName, alterSql); + throw new StarRocksCatalogException( + String.format("Failed to modify columns from %s.%s ", databaseName, tableName), e); + } + } + private void executeAlter( String databaseName, String tableName, String alterSql, long timeoutSecond) throws StarRocksCatalogException { @@ -607,6 +652,31 @@ private String buildAlterDropColumnsSql( return builder.toString(); } + private String buildAlterModifyColumnsSql( + String databaseName, String tableName, + Map modifyColumnsTypeMap, + Map modifyColumnsPositionMap, + long timeoutSecond + ) { + StringBuilder builder = new StringBuilder(); + builder.append(String.format("ALTER TABLE `%s`.`%s` ", databaseName, tableName)); + String columnsStmt = + modifyColumnsTypeMap.entrySet().stream() + .map(col -> { + String stmt = String.format("MODIFY COLUMN `%s` %s", col.getKey(), col.getValue()); + if (modifyColumnsPositionMap.containsKey(col.getKey())) { + String position = modifyColumnsPositionMap.get(col.getKey()); + stmt += position.isEmpty() ? " FIRST" : String.format(" AFTER `%s`", position); + } + return stmt; + }) + .collect(Collectors.joining(", ")); + builder.append(columnsStmt); + builder.append(String.format(" PROPERTIES (\"timeout\" = \"%s\")", timeoutSecond)); + builder.append(";"); + return builder.toString(); + } + private String buildColumnStmt(StarRocksColumn column) { StringBuilder builder = new StringBuilder(); builder.append("`"); diff --git a/src/test/java/com/starrocks/connector/flink/it/catalog/StarRocksCatalogTest.java b/src/test/java/com/starrocks/connector/flink/it/catalog/StarRocksCatalogTest.java index 8b81ef370..555d21d2e 100644 --- a/src/test/java/com/starrocks/connector/flink/it/catalog/StarRocksCatalogTest.java +++ b/src/test/java/com/starrocks/connector/flink/it/catalog/StarRocksCatalogTest.java @@ -30,7 +30,9 @@ import java.util.ArrayList; import java.util.Arrays; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.stream.Collectors; import static org.junit.Assert.assertEquals; @@ -136,4 +138,57 @@ public void testAlterDropColumns() throws Exception { .collect(Collectors.toList()); assertEquals(expectedColumns, newTable.getColumns()); } + + @Test + public void testAlterModifyColumns() throws Exception { + StarRocksTable oldTable = catalog.getTable(DB_NAME, tableName).orElse(null); + assertNotNull(oldTable); + Map modifyColumnsTypeMap = new HashMap<>(); + modifyColumnsTypeMap.put("c1", "DOUBLE"); + modifyColumnsTypeMap.put("c2", "BOOLEAN"); + Map modifyColumnsPositionMap = new HashMap<>(); + modifyColumnsPositionMap.put("c1", "c3"); + + catalog.alterModifyColumns(DB_NAME, tableName, modifyColumnsTypeMap, modifyColumnsPositionMap, 60); + StarRocksTable newTable = catalog.getTable(DB_NAME, tableName).orElse(null); + assertNotNull(newTable); + + List expectedColumns = Arrays.asList( + new StarRocksColumn.Builder() + .setColumnName("c0") + .setOrdinalPosition(0) + .setDataType("int") + .setNullable(false) + .setColumnSize(10) + .setDecimalDigits(0) + .setColumnComment("") + .build(), + new StarRocksColumn.Builder() + .setColumnName("c2") + .setOrdinalPosition(1) + .setDataType("boolean") + .setDecimalDigits(0) + .setNullable(true) + .setColumnComment("") + .build(), + new StarRocksColumn.Builder() + .setColumnName("c3") + .setOrdinalPosition(2) + .setDataType("date") + .setNullable(true) + .setColumnComment("") + .build(), + new StarRocksColumn.Builder() + .setColumnName("c1") + .setOrdinalPosition(3) + .setDataType("double") + .setColumnSize(15) + .setDecimalDigits(15) + .setNullable(true) + .setColumnComment("") + .build() + ); + + assertEquals(expectedColumns, newTable.getColumns()); + } }