Skip to content

Commit 3a81ffa

Browse files
committed
[BugFix] Fix predicate push-down error on time dimension tables (#375)
Signed-off-by: Author Name hhoao <[email protected]>
1 parent cc8689d commit 3a81ffa

File tree

5 files changed: 118 additions (+118) and 42 deletions (-42)

src/main/java/com/starrocks/connector/flink/table/source/StarRocksDynamicLRUFunction.java

Lines changed: 13 additions & 8 deletions
Original file line number | Diff line number | Diff line change
@@ -34,15 +34,17 @@
3434
import java.text.SimpleDateFormat;
3535
import java.time.format.DateTimeFormatter;
3636
import java.util.ArrayList;
37+
import java.util.Arrays;
3738
import java.util.Calendar;
3839
import java.util.Date;
3940
import java.util.List;
4041
import java.util.concurrent.TimeUnit;
42+
import java.util.stream.Collectors;
4143

4244
public class StarRocksDynamicLRUFunction extends TableFunction<RowData> {
4345

4446
private static final Logger LOG = LoggerFactory.getLogger(StarRocksDynamicLRUFunction.class);
45-
47+
4648
private final ColumnRichInfo[] filterRichInfos;
4749
private final StarRocksSourceOptions sourceOptions;
4850
private final ArrayList<String> filterList;
@@ -56,7 +58,7 @@ public class StarRocksDynamicLRUFunction extends TableFunction<RowData> {
5658
private final long cacheExpireMs;
5759
private final int maxRetryTimes;
5860

59-
public StarRocksDynamicLRUFunction(StarRocksSourceOptions sourceOptions,
61+
public StarRocksDynamicLRUFunction(StarRocksSourceOptions sourceOptions,
6062
ColumnRichInfo[] filterRichInfos,
6163
List<ColumnRichInfo> columnRichInfos,
6264
SelectColumn[] selectColumns) {
@@ -72,7 +74,7 @@ public StarRocksDynamicLRUFunction(StarRocksSourceOptions sourceOptions,
7274
this.filterList = new ArrayList<>();
7375
this.dataReaderList = new ArrayList<>();
7476
}
75-
77+
7678
@Override
7779
public void open(FunctionContext context) throws Exception {
7880
super.open(context);
@@ -101,14 +103,17 @@ public void eval(Object... keys) {
101103
}
102104
String filter = String.join(" and ", filterList);
103105
filterList.clear();
104-
String SQL = "select * from " + sourceOptions.getDatabaseName() + "." + sourceOptions.getTableName() + " where " + filter;
106+
String columns = Arrays.stream(selectColumns)
107+
.map(col -> "`" + col.getColumnName() + "`")
108+
.collect(Collectors.joining(","));
109+
String SQL = "select " + columns + " from " + sourceOptions.getDatabaseName() + "." + sourceOptions.getTableName() + " where " + filter;
105110
LOG.info("LookUpFunction SQL [{}]", SQL);
106111
this.queryInfo = StarRocksSourceCommonFunc.getQueryInfo(this.sourceOptions, SQL);
107112
List<List<QueryBeXTablets>> lists = StarRocksSourceCommonFunc.splitQueryBeXTablets(1, queryInfo);
108113
lists.get(0).forEach(beXTablets -> {
109114
StarRocksSourceBeReader beReader = new StarRocksSourceBeReader(beXTablets.getBeNode(),
110115
columnRichInfos,
111-
selectColumns,
116+
selectColumns,
112117
sourceOptions);
113118
beReader.openScanner(beXTablets.getTabletIds(), queryInfo.getQueryPlan().getOpaqued_query_plan(), sourceOptions);
114119
beReader.startToRead();
@@ -132,7 +137,7 @@ public void eval(Object... keys) {
132137
});
133138
rows.trimToSize();
134139
cache.put(keyRow, rows);
135-
}
140+
}
136141
}
137142

138143
private void getFieldValue(Object obj, ColumnRichInfo columnRichInfo) {
@@ -147,9 +152,9 @@ private void getFieldValue(Object obj, ColumnRichInfo columnRichInfo) {
147152
filter = columnRichInfo.getColumnName() + " = '" + sdf.format(d).toString() + "'";
148153
}
149154
if (flinkTypeRoot == LogicalTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE ||
150-
flinkTypeRoot == LogicalTypeRoot.TIMESTAMP_WITH_LOCAL_TIME_ZONE ||
155+
flinkTypeRoot == LogicalTypeRoot.TIMESTAMP_WITH_LOCAL_TIME_ZONE ||
151156
flinkTypeRoot == LogicalTypeRoot.TIMESTAMP_WITH_TIME_ZONE) {
152-
157+
153158
DateTimeFormatter dtf = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSSSSS");
154159
String strDateTime = dtf.format(((TimestampData)obj).toLocalDateTime());
155160
filter = columnRichInfo.getColumnName() + " = '" + strDateTime + "'";

src/main/java/com/starrocks/connector/flink/table/source/StarRocksDynamicTableSource.java

Lines changed: 65 additions & 18 deletions
Original file line number | Diff line number | Diff line change
@@ -25,11 +25,13 @@
2525
import org.apache.flink.table.connector.source.abilities.SupportsFilterPushDown;
2626
import org.apache.flink.table.connector.source.abilities.SupportsLimitPushDown;
2727
import org.apache.flink.table.connector.source.abilities.SupportsProjectionPushDown;
28+
import org.apache.flink.table.data.RowData;
2829
import org.apache.flink.table.expressions.ResolvedExpression;
2930

3031
import com.starrocks.connector.flink.table.source.struct.ColumnRichInfo;
3132
import com.starrocks.connector.flink.table.source.struct.PushDownHolder;
3233
import com.starrocks.connector.flink.table.source.struct.SelectColumn;
34+
import org.apache.flink.table.functions.TableFunction;
3335

3436
import java.util.ArrayList;
3537
import java.util.Arrays;
@@ -58,32 +60,77 @@ public ChangelogMode getChangelogMode() {
5860
@Override
5961
public ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext) {
6062
StarRocksDynamicSourceFunction sourceFunction = new StarRocksDynamicSourceFunction(
61-
options, flinkSchema,
62-
this.pushDownHolder.getFilter(),
63-
this.pushDownHolder.getLimit(),
64-
this.pushDownHolder.getSelectColumns(),
63+
options, flinkSchema,
64+
this.pushDownHolder.getFilter(),
65+
this.pushDownHolder.getLimit(),
66+
this.pushDownHolder.getSelectColumns(),
6567
this.pushDownHolder.getQueryType());
6668
return SourceFunctionProvider.of(sourceFunction, true);
6769
}
6870

6971
@Override
7072
public LookupRuntimeProvider getLookupRuntimeProvider(LookupContext context) {
71-
int[] projectedFields = Arrays.stream(context.getKeys()).mapToInt(value -> value[0]).toArray();
73+
Map<String, ColumnRichInfo> columnMap = StarRocksSourceCommonFunc.genColumnMap(flinkSchema);
74+
List<ColumnRichInfo> allColumnRichInfos =
75+
StarRocksSourceCommonFunc.genColumnRichInfo(columnMap);
76+
SelectColumn[] pushDownSelectColumns = pushDownHolder.getSelectColumns();
77+
SelectColumn[] selectColumns;
78+
List<ColumnRichInfo> columnRichInfos;
79+
int[] projectedFields =
80+
Arrays.stream(context.getKeys()).mapToInt(value -> value[0]).toArray();
7281
ColumnRichInfo[] filerRichInfo = new ColumnRichInfo[projectedFields.length];
73-
for (int i = 0; i < projectedFields.length; i ++) {
74-
ColumnRichInfo columnRichInfo = new ColumnRichInfo(
75-
this.flinkSchema.getFieldName(projectedFields[i]).get(),
76-
projectedFields[i],
77-
this.flinkSchema.getFieldDataType(projectedFields[i]).get()
78-
);
79-
filerRichInfo[i] = columnRichInfo;
82+
StarRocksSourceQueryType queryType = pushDownHolder.getQueryType();
83+
if (queryType == StarRocksSourceQueryType.QuerySomeColumns) {
84+
columnRichInfos = new ArrayList<>();
85+
selectColumns = new SelectColumn[pushDownSelectColumns.length];
86+
for (int i = 0; i < pushDownSelectColumns.length; i++) {
87+
ColumnRichInfo columnRichInfo =
88+
allColumnRichInfos.get(
89+
pushDownSelectColumns[i].getColumnIndexInFlinkTable());
90+
columnRichInfos.add(
91+
new ColumnRichInfo(
92+
columnRichInfo.getColumnName(), i, columnRichInfo.getDataType()));
93+
selectColumns[i] = new SelectColumn(columnRichInfo.getColumnName(), i);
94+
}
95+
for (int i = 0; i < projectedFields.length; i++) {
96+
int columnIndexInFlinkTable = pushDownSelectColumns[i].getColumnIndexInFlinkTable();
97+
ColumnRichInfo columnRichInfo =
98+
new ColumnRichInfo(
99+
this.flinkSchema.getFieldName(columnIndexInFlinkTable).get(),
100+
i,
101+
this.flinkSchema.getFieldDataType(columnIndexInFlinkTable).get());
102+
103+
filerRichInfo[i] = columnRichInfo;
104+
}
105+
} else {
106+
columnRichInfos = allColumnRichInfos;
107+
selectColumns =
108+
StarRocksSourceCommonFunc.genSelectedColumns(
109+
columnMap, this.options, allColumnRichInfos);
110+
for (int i = 0; i < projectedFields.length; i++) {
111+
ColumnRichInfo columnRichInfo =
112+
new ColumnRichInfo(
113+
this.flinkSchema.getFieldName(i).get(),
114+
projectedFields[i],
115+
this.flinkSchema.getFieldDataType(i).get());
116+
filerRichInfo[i] = columnRichInfo;
117+
}
80118
}
81119

82-
Map<String, ColumnRichInfo> columnMap = StarRocksSourceCommonFunc.genColumnMap(flinkSchema);
83-
List<ColumnRichInfo> ColumnRichInfos = StarRocksSourceCommonFunc.genColumnRichInfo(columnMap);
84-
SelectColumn[] selectColumns = StarRocksSourceCommonFunc.genSelectedColumns(columnMap, this.options, ColumnRichInfos);
85-
86-
StarRocksDynamicLookupFunction tableFunction = new StarRocksDynamicLookupFunction(this.options, filerRichInfo, ColumnRichInfos, selectColumns);
120+
TableFunction<RowData> tableFunction = null;
121+
StarRocksSourceOptions.CacheType lookupCacheType = options.getLookupCacheType();
122+
switch (lookupCacheType) {
123+
case ALL:
124+
tableFunction =
125+
new StarRocksDynamicLookupFunction(
126+
this.options, filerRichInfo, columnRichInfos, selectColumns);
127+
break;
128+
case LRU:
129+
tableFunction =
130+
new StarRocksDynamicLRUFunction(
131+
this.options, filerRichInfo, columnRichInfos, selectColumns);
132+
break;
133+
}
87134
return TableFunctionProvider.of(tableFunction);
88135
}
89136

@@ -113,7 +160,7 @@ public void applyProjection(int[][] projectedFields) {
113160
this.pushDownHolder.setQueryType(StarRocksSourceQueryType.QuerySomeColumns);
114161

115162
ArrayList<String> columnList = new ArrayList<>();
116-
ArrayList<SelectColumn> selectColumns = new ArrayList<SelectColumn>();
163+
ArrayList<SelectColumn> selectColumns = new ArrayList<SelectColumn>();
117164
for (int index : curProjectedFields) {
118165
String columnName = flinkSchema.getFieldName(index).get();
119166
columnList.add(columnName);

src/main/java/com/starrocks/connector/flink/table/source/StarRocksDynamicTableSourceFactory.java

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -79,6 +79,7 @@ public Set<ConfigOption<?>> optionalOptions() {
7979
options.add(StarRocksSourceOptions.LOOKUP_CACHE_TTL_MS);
8080
options.add(StarRocksSourceOptions.LOOKUP_CACHE_MAX_ROWS);
8181
options.add(StarRocksSourceOptions.LOOKUP_MAX_RETRIES);
82+
options.add(StarRocksSourceOptions.LOOKUP_CACHE_TYPE);
8283
return options;
8384
}
8485
}

src/main/java/com/starrocks/connector/flink/table/source/StarRocksSourceCommonFunc.java

Lines changed: 11 additions & 7 deletions
Original file line number | Diff line number | Diff line change
@@ -37,11 +37,11 @@
3737

3838

3939
public class StarRocksSourceCommonFunc {
40-
40+
4141
private static volatile StarRocksQueryVisitor starrocksQueryVisitor;
4242

4343
private static volatile StarRocksQueryPlanVisitor starRocksQueryPlanVisitor;
44-
44+
4545

4646
private static StarRocksQueryVisitor getStarRocksQueryVisitor(StarRocksSourceOptions sourceOptions) {
4747
if (null == starrocksQueryVisitor) {
@@ -84,15 +84,15 @@ public static List<List<QueryBeXTablets>> splitQueryBeXTablets(int subTaskCount,
8484
curBeXTabletList.set(i, Collections.singletonList(queryInfo.getBeXTablets().get(i)));
8585
}
8686
return curBeXTabletList;
87-
}
87+
}
8888
if (subTaskCount < beXTabletsListCount) {
8989
for (int i = 0; i < beXTabletsListCount; i ++) {
9090
List<QueryBeXTablets> tList = curBeXTabletList.get(i%subTaskCount);
9191
tList.add(queryInfo.getBeXTablets().get(i));
9292
curBeXTabletList.set(i%subTaskCount, tList);
9393
}
9494
return curBeXTabletList;
95-
}
95+
}
9696
List<QueryBeXTablets> beWithSingleTabletList = new ArrayList<>();
9797
queryInfo.getBeXTablets().forEach(beXTablets -> {
9898
beXTablets.getTabletIds().forEach(tabletId -> {
@@ -106,7 +106,7 @@ public static List<List<QueryBeXTablets>> splitQueryBeXTablets(int subTaskCount,
106106
curBeXTabletList.set(i, Collections.singletonList(beWithSingleTabletList.get(i)));
107107
}
108108
return curBeXTabletList;
109-
}
109+
}
110110
long newx = Math.round(x);
111111
for (int i = 0; i < subTaskCount; i ++) {
112112
int start = (int)(i * newx);
@@ -124,7 +124,7 @@ public static List<List<QueryBeXTablets>> splitQueryBeXTablets(int subTaskCount,
124124
curBxTs = beWithSingleTabletList.subList(start, end);
125125
Map<String, List<Long>> beXTabletsMap = new HashMap<>();
126126
curBxTs.forEach(curBxT -> {
127-
List<Long> tablets = new ArrayList<>();
127+
List<Long> tablets = new ArrayList<>();
128128
if (beXTabletsMap.containsKey(curBxT.getBeNode())) {
129129
tablets = beXTabletsMap.get(curBxT.getBeNode());
130130
} else {
@@ -174,8 +174,12 @@ public static List<ColumnRichInfo> genColumnRichInfo(Map<String, ColumnRichInfo>
174174
return columnMap.values().stream().sorted(Comparator.comparing(ColumnRichInfo::getColumnIndexInSchema)).collect(Collectors.toList());
175175
}
176176

177+
public static List<ColumnRichInfo> getSelectSql(Map<String, ColumnRichInfo> columnMap) {
178+
return columnMap.values().stream().sorted(Comparator.comparing(ColumnRichInfo::getColumnIndexInSchema)).collect(Collectors.toList());
179+
}
180+
177181
public static SelectColumn[] genSelectedColumns(Map<String, ColumnRichInfo> columnMap,
178-
StarRocksSourceOptions sourceOptions,
182+
StarRocksSourceOptions sourceOptions,
179183
List<ColumnRichInfo> columnRichInfos) {
180184
List<SelectColumn> selectedColumns = new ArrayList<>();
181185
// user selected columns from sourceOptions

src/main/java/com/starrocks/connector/flink/table/source/StarRocksSourceOptions.java

Lines changed: 28 additions & 9 deletions
Original file line number | Diff line number | Diff line change
@@ -53,24 +53,24 @@ public class StarRocksSourceOptions implements Serializable {
5353

5454
public static final ConfigOption<String> TABLE_NAME = ConfigOptions.key("table-name")
5555
.stringType().noDefaultValue().withDescription("Table name");
56-
57-
56+
57+
5858
// optional Options
5959
public static final ConfigOption<Integer> SCAN_CONNECT_TIMEOUT = ConfigOptions.key("scan.connect.timeout-ms")
6060
.intType().defaultValue(1000).withDescription("Connect timeout");
61-
61+
6262
public static final ConfigOption<Integer> SCAN_BATCH_ROWS = ConfigOptions.key("scan.params.batch-rows")
6363
.intType().defaultValue(1000).withDescription("Batch rows");
6464

6565
public static final ConfigOption<String> SCAN_PROPERTIES = ConfigOptions.key("scan.params.properties")
6666
.stringType().noDefaultValue().withDescription("Reserved params for use");
67-
67+
6868
public static final ConfigOption<Integer> SCAN_LIMIT = ConfigOptions.key("scan.params.limit")
6969
.intType().defaultValue(1).withDescription("The query limit, if specified.");
7070

7171
public static final ConfigOption<Integer> SCAN_KEEP_ALIVE_MIN = ConfigOptions.key("scan.params.keep-alive-min")
7272
.intType().defaultValue(10).withDescription("Max keep alive time min");
73-
73+
7474
public static final ConfigOption<Integer> SCAN_QUERTY_TIMEOUT_S = ConfigOptions.key("scan.params.query-timeout-s")
7575
.intType().defaultValue(600).withDescription("Query timeout for a single query");
7676

@@ -88,7 +88,7 @@ public class StarRocksSourceOptions implements Serializable {
8888

8989
public static final ConfigOption<String> SCAN_BE_HOST_MAPPING_LIST = ConfigOptions.key("scan.be-host-mapping-list")
9090
.stringType().defaultValue("").withDescription("List of be host mapping");
91-
91+
9292
// lookup Options
9393
public static final ConfigOption<Long> LOOKUP_CACHE_MAX_ROWS = ConfigOptions.key("lookup.cache.max-rows")
9494
.longType().defaultValue(-1L).withDescription(
@@ -102,6 +102,12 @@ public class StarRocksSourceOptions implements Serializable {
102102
public static final ConfigOption<Integer> LOOKUP_MAX_RETRIES = ConfigOptions.key("lookup.max-retries")
103103
.intType().defaultValue(1).withDescription("the max retry times if lookup database failed.");
104104

105+
public static final ConfigOption<CacheType> LOOKUP_CACHE_TYPE =
106+
ConfigOptions.key("lookup.cache-type")
107+
.enumType(CacheType.class)
108+
.defaultValue(CacheType.ALL)
109+
.withDescription("lookup type.");
110+
105111

106112
public static final String SOURCE_PROPERTIES_PREFIX = "scan.params.";
107113

@@ -150,7 +156,7 @@ public String getScanUrl() {
150156
public String getJdbcUrl() {
151157
return tableOptions.get(JDBC_URL);
152158
}
153-
159+
154160
public String getUsername() {
155161
return tableOptions.get(USERNAME);
156162
}
@@ -169,8 +175,8 @@ public String getTableName() {
169175

170176

171177
// optional Options
172-
public int getConnectTimeoutMs() {
173-
return tableOptions.get(SCAN_CONNECT_TIMEOUT).intValue();
178+
public int getConnectTimeoutMs() {
179+
return tableOptions.get(SCAN_CONNECT_TIMEOUT).intValue();
174180
}
175181

176182
public int getBatchRows() {
@@ -236,10 +242,23 @@ public int getLookupMaxRetries() {
236242
return tableOptions.get(LOOKUP_MAX_RETRIES).intValue();
237243
}
238244

245+
239246
public static Builder builder() {
240247
return new Builder();
241248
}
242249

250+
public CacheType getLookupCacheType() {
251+
return tableOptions.get(LOOKUP_CACHE_TYPE);
252+
}
253+
254+
/**
255+
* Cache Type
256+
*/
257+
public enum CacheType {
258+
LRU,
259+
ALL
260+
}
261+
243262
/**
244263
* Builder for {@link StarRocksSourceOptions}.
245264
*/

0 commit comments

Comments
 (0)