-
Notifications
You must be signed in to change notification settings - Fork 191
Open
Description


如何复现:
`package org.example;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import java.time.Duration;
public class StarRocksSqlDimApp {
public static void main(String[] args) {
Configuration configuration = new Configuration();
configuration.set(ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL,
Duration.ofSeconds(30));
StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment(configuration);
StreamTableEnvironment streamTableEnvironment = StreamTableEnvironment.create(executionEnvironment);
streamTableEnvironment.executeSql("CREATE TABLE source (\n" +
" id INT,\n" +
" name STRING,\n" +
" proc_time AS PROCTIME()" +
") WITH (\n" +
" 'connector' = 'starrocks',\n" +
" 'password' = '',\n" +
" 'table-name' = 'source', \n" +
" 'scan-url' = '', \n" +
" 'username'='root',\n" +
" 'jdbc-url' = '',\n" +
" 'database-name' = 'test'\n" +
");");
streamTableEnvironment.executeSql("CREATE TABLE dim (\n" +
" id INT,\n" +
" name STRING\n" +
") WITH (\n" +
" 'connector' = 'starrocks',\n" +
" 'password' = '',\n" +
" 'table-name' = 'dim', \n" +
" 'scan-url' = '', \n" +
" 'username'='root',\n" +
" 'jdbc-url' = 'jdbc:mysql:///',\n" +
" 'database-name' = 'test'\n" +
");");
streamTableEnvironment.executeSql("CREATE TABLE sink (\n" +
" id INT,\n" +
" name STRING\n" +
") WITH (\n" +
" 'connector' = 'starrocks',\n" +
" 'password' = '',\n" +
" 'table-name' = 'sink', \n" +
" 'load-url' = '', \n" +
" 'username'='root',\n" +
" 'jdbc-url' = 'jdbc:mysql:///',\n" +
" 'database-name' = 'test'\n" +
");");
TableResult tableResult = streamTableEnvironment.executeSql(
"INSERT INTO sink\n" +
"SELECT source.id, dim.name FROM source\n" +
"LEFT JOIN\n" +
"dim for SYSTEM_TIME as OF source.proc_time\n" +
"on dim.name = source.name;");
tableResult.print();
}
}
`
Metadata
Metadata
Assignees
Labels
No labels