3
3
// Use of this source code is governed by an MIT-style license that can be found in the LICENSE file or at
4
4
// https://opensource.org/licenses/MIT.
5
5
6
+ use std:: sync:: Arc ;
7
+
6
8
use anyhow:: { bail, Result } ;
7
9
use datafusion:: common:: DFSchema ;
8
10
use datafusion:: logical_expr:: { self , logical_plan, LogicalPlan , Operator } ;
@@ -15,7 +17,7 @@ use optd_datafusion_repr::plan_nodes::{
15
17
ConstantPred , DfReprPlanNode , DfReprPredNode , ExternColumnRefPred , FuncPred , FuncType ,
16
18
InListPred , JoinType , LikePred , ListPred , LogOpPred , LogOpType , LogicalAgg ,
17
19
LogicalEmptyRelation , LogicalFilter , LogicalJoin , LogicalLimit , LogicalProjection , LogicalScan ,
18
- LogicalSort , RawDependentJoin , SortOrderPred , SortOrderType ,
20
+ LogicalSort , RawDependentJoin , SortOrderPred , SortOrderType , SubqueryType ,
19
21
} ;
20
22
use optd_datafusion_repr:: properties:: schema:: Schema as OptdSchema ;
21
23
@@ -24,15 +26,18 @@ use crate::OptdPlanContext;
24
26
impl OptdPlanContext < ' _ > {
25
27
fn subqueries_to_dependent_joins (
26
28
& mut self ,
27
- subqueries : & [ & Subquery ] ,
29
+ subqueries : Vec < ( & Subquery , SubqueryType ) > ,
28
30
input : ArcDfPlanNode ,
29
31
input_schema : & DFSchema ,
30
32
) -> Result < ArcDfPlanNode > {
31
33
let mut node = input;
32
- for Subquery {
33
- subquery,
34
- outer_ref_columns,
35
- } in subqueries. iter ( )
34
+ for (
35
+ Subquery {
36
+ subquery,
37
+ outer_ref_columns,
38
+ } ,
39
+ sq_typ,
40
+ ) in subqueries. into_iter ( )
36
41
{
37
42
let subquery_root = self . conv_into_optd_plan_node ( subquery, Some ( input_schema) ) ?;
38
43
let dep_join = RawDependentJoin :: new (
@@ -56,7 +61,7 @@ impl OptdPlanContext<'_> {
56
61
} )
57
62
. collect ( ) ,
58
63
) ,
59
- JoinType :: Cross ,
64
+ sq_typ ,
60
65
) ;
61
66
node = dep_join. into_plan_node ( ) ;
62
67
}
@@ -92,7 +97,7 @@ impl OptdPlanContext<'_> {
92
97
expr : & ' a logical_expr:: Expr ,
93
98
context : & DFSchema ,
94
99
dep_ctx : Option < & DFSchema > ,
95
- subqueries : & mut Vec < & ' a Subquery > ,
100
+ subqueries : & mut Vec < ( & ' a Subquery , SubqueryType ) > ,
96
101
) -> Result < ArcDfPredNode > {
97
102
use logical_expr:: Expr ;
98
103
match expr {
@@ -257,6 +262,18 @@ impl OptdPlanContext<'_> {
257
262
)
258
263
. into_pred_node ( ) )
259
264
}
265
+ Expr :: Not ( x) => {
266
+ let expr = self . conv_into_optd_expr ( x. as_ref ( ) , context, dep_ctx, subqueries) ?;
267
+ Ok ( FuncPred :: new ( FuncType :: Not , ListPred :: new ( vec ! [ expr] ) ) . into_pred_node ( ) )
268
+ }
269
+ Expr :: IsNull ( x) => {
270
+ let expr = self . conv_into_optd_expr ( x. as_ref ( ) , context, dep_ctx, subqueries) ?;
271
+ Ok ( FuncPred :: new ( FuncType :: IsNull , ListPred :: new ( vec ! [ expr] ) ) . into_pred_node ( ) )
272
+ }
273
+ Expr :: IsNotNull ( x) => {
274
+ let expr = self . conv_into_optd_expr ( x. as_ref ( ) , context, dep_ctx, subqueries) ?;
275
+ Ok ( FuncPred :: new ( FuncType :: IsNotNull , ListPred :: new ( vec ! [ expr] ) ) . into_pred_node ( ) )
276
+ }
260
277
Expr :: Between ( x) => {
261
278
let expr =
262
279
self . conv_into_optd_expr ( x. expr . as_ref ( ) , context, dep_ctx, subqueries) ?;
@@ -288,9 +305,53 @@ impl OptdPlanContext<'_> {
288
305
// This relies on a left-deep tree of dependent joins being
289
306
// generated below this node, in response to all pushed subqueries.
290
307
let new_column_ref_idx = context. fields ( ) . len ( ) + subqueries. len ( ) ;
291
- subqueries. push ( sq ) ;
308
+ subqueries. push ( ( sq , SubqueryType :: Scalar ) ) ;
292
309
Ok ( ColumnRefPred :: new ( new_column_ref_idx) . into_pred_node ( ) )
293
310
}
311
+ Expr :: Exists ( ex) => {
312
+ let sq = & ex. subquery ;
313
+ let negated = ex. negated ;
314
+
315
+ let new_column_ref_idx = context. fields ( ) . len ( ) + subqueries. len ( ) ;
316
+ subqueries. push ( ( sq, SubqueryType :: Exists ) ) ;
317
+ if negated {
318
+ Ok ( FuncPred :: new (
319
+ FuncType :: Not ,
320
+ ListPred :: new (
321
+ vec ! [ ColumnRefPred :: new( new_column_ref_idx) . into_pred_node( ) ] ,
322
+ ) ,
323
+ )
324
+ . into_pred_node ( ) )
325
+ } else {
326
+ Ok ( ColumnRefPred :: new ( new_column_ref_idx) . into_pred_node ( ) )
327
+ }
328
+ }
329
+ Expr :: InSubquery ( insq) => {
330
+ let sq = & insq. subquery ;
331
+ let expr =
332
+ self . conv_into_optd_expr ( insq. expr . as_ref ( ) , context, dep_ctx, subqueries) ?;
333
+ let negated = insq. negated ;
334
+
335
+ let new_column_ref_idx = context. fields ( ) . len ( ) + subqueries. len ( ) ;
336
+ subqueries. push ( (
337
+ sq,
338
+ SubqueryType :: Any {
339
+ pred : Arc :: unwrap_or_clone ( expr) ,
340
+ op : BinOpType :: Eq ,
341
+ } ,
342
+ ) ) ;
343
+ if negated {
344
+ Ok ( FuncPred :: new (
345
+ FuncType :: Not ,
346
+ ListPred :: new (
347
+ vec ! [ ColumnRefPred :: new( new_column_ref_idx) . into_pred_node( ) ] ,
348
+ ) ,
349
+ )
350
+ . into_pred_node ( ) )
351
+ } else {
352
+ Ok ( ColumnRefPred :: new ( new_column_ref_idx) . into_pred_node ( ) )
353
+ }
354
+ }
294
355
_ => bail ! ( "Unsupported expression: {:?}" , expr) ,
295
356
}
296
357
}
@@ -308,7 +369,7 @@ impl OptdPlanContext<'_> {
308
369
dep_ctx,
309
370
& mut subqueries,
310
371
) ?;
311
- let input = self . subqueries_to_dependent_joins ( & subqueries, input, node. input . schema ( ) ) ?;
372
+ let input = self . subqueries_to_dependent_joins ( subqueries, input, node. input . schema ( ) ) ?;
312
373
Ok ( LogicalProjection :: new ( input, expr_list) )
313
374
}
314
375
@@ -326,7 +387,7 @@ impl OptdPlanContext<'_> {
326
387
& mut subqueries,
327
388
) ?;
328
389
329
- let input = self . subqueries_to_dependent_joins ( & subqueries, input, node. input . schema ( ) ) ?;
390
+ let input = self . subqueries_to_dependent_joins ( subqueries, input, node. input . schema ( ) ) ?;
330
391
331
392
Ok ( LogicalFilter :: new ( input, expr) )
332
393
}
@@ -336,7 +397,7 @@ impl OptdPlanContext<'_> {
336
397
exprs : & ' a [ logical_expr:: Expr ] ,
337
398
context : & DFSchema ,
338
399
dep_ctx : Option < & DFSchema > ,
339
- subqueries : & mut Vec < & ' a Subquery > ,
400
+ subqueries : & mut Vec < ( & ' a Subquery , SubqueryType ) > ,
340
401
) -> Result < ListPred > {
341
402
let exprs = exprs
342
403
. iter ( )
@@ -350,7 +411,7 @@ impl OptdPlanContext<'_> {
350
411
exprs : & ' a [ logical_expr:: SortExpr ] ,
351
412
context : & DFSchema ,
352
413
dep_ctx : Option < & DFSchema > ,
353
- subqueries : & mut Vec < & ' a Subquery > ,
414
+ subqueries : & mut Vec < ( & ' a Subquery , SubqueryType ) > ,
354
415
) -> Result < ListPred > {
355
416
let exprs = exprs
356
417
. iter ( )
@@ -453,7 +514,7 @@ impl OptdPlanContext<'_> {
453
514
DFJoinType :: RightAnti => JoinType :: RightAnti ,
454
515
DFJoinType :: LeftSemi => JoinType :: LeftSemi ,
455
516
DFJoinType :: RightSemi => JoinType :: RightSemi ,
456
- _ => unimplemented ! ( ) ,
517
+ DFJoinType :: LeftMark => JoinType :: LeftMark ,
457
518
} ;
458
519
let mut log_ops = Vec :: with_capacity ( node. on . len ( ) ) ;
459
520
let mut subqueries = vec ! [ ] ;
0 commit comments