@@ -118,7 +118,11 @@ abstract class Optimizer(catalogManager: CatalogManager)
     Batch("Infer Filters", Once,
       InferFiltersFromConstraints) ::
     Batch("Operator Optimization after Inferring Filters", fixedPoint,
-      rulesWithoutInferFiltersFromConstraints: _*) :: Nil
+      rulesWithoutInferFiltersFromConstraints: _*) ::
+    // Set strategy to Once to avoid pushing filter every time because we do not change the
+    // join condition.
+    Batch("Push predicate through join by conjunctive normal form", Once,
+      PushPredicateThroughJoinByCNF) :: Nil
   }

   val batches = (Batch("Eliminate Distinct", Once, EliminateDistinct) ::
@@ -1372,6 +1376,108 @@ object PushPredicateThroughJoin extends Rule[LogicalPlan] with PredicateHelper {
}
}

/**
 * Rewrite the join condition into conjunctive normal form (CNF) so that we can push down
 * more predicates.
 */
object PushPredicateThroughJoinByCNF extends Rule[LogicalPlan] with PredicateHelper {
Contributor:

Does it apply to all the predicates, e.g. when we push down filters to the data source?

Member Author:

Yes. For example, the scan of the web_sales table in TPC-DS q85:

Before this PR:

+- *(8) Filter (((isnotnull(ws_item_sk#3) AND isnotnull(ws_order_number#17)) AND isnotnull(ws_web_page_sk#12)) AND isnotnull(ws_sold_date_sk#0))
   +- *(8) ColumnarToRow
      +- FileScan parquet default.web_sales[ws_sold_date_sk#0,ws_item_sk#3,ws_web_page_sk#12,ws_order_number#17,ws_quantity#18,ws_sales_price#21,ws_net_profit#33] Batched: true, DataFilters: [isnotnull(ws_item_sk#3), isnotnull(ws_order_number#17), isnotnull(ws_web_page_sk#12), isnotnull(ws_sold_date_sk#0)], Format: Parquet, Location: InMemoryFileIndex[file:/Users/yumwang/spark/SPARK-28216/sql/core/spark-warehouse/org.apache.spark.sql.TPCDSQuerySuite/web_sales], PartitionFilters: [], PushedFilters: [IsNotNull(ws_item_sk), IsNotNull(ws_order_number), IsNotNull(ws_web_page_sk), IsNotNull(ws_sold_date_sk)], ReadSchema: struct<ws_sold_date_sk:int,ws_item_sk:int,ws_web_page_sk:int,ws_order_number:int,ws_quantity:int,ws_sales_price:decimal(7,2),ws_net_profit:decimal(7,2)>

After this PR:

+- *(8) Filter (((((isnotnull(ws_item_sk#3) AND isnotnull(ws_order_number#17)) AND isnotnull(ws_web_page_sk#12)) AND isnotnull(ws_sold_date_sk#0)) AND ((((ws_sales_price#21 >= 100.00) AND (ws_sales_price#21 <= 150.00)) OR ((ws_sales_price#21 >= 50.00) AND (ws_sales_price#21 <= 100.00))) OR ((ws_sales_price#21 >= 150.00) AND (ws_sales_price#21 <= 200.00)))) AND ((((ws_net_profit#33 >= 100.00) AND (ws_net_profit#33 <= 200.00)) OR ((ws_net_profit#33 >= 150.00) AND (ws_net_profit#33 <= 300.00))) OR ((ws_net_profit#33 >= 50.00) AND (ws_net_profit#33 <= 250.00))))
   +- *(8) ColumnarToRow
      +- FileScan parquet default.web_sales[ws_sold_date_sk#0,ws_item_sk#3,ws_web_page_sk#12,ws_order_number#17,ws_quantity#18,ws_sales_price#21,ws_net_profit#33] Batched: true, DataFilters: [isnotnull(ws_item_sk#3), isnotnull(ws_order_number#17), isnotnull(ws_web_page_sk#12), isnotnull(ws_sold_date_sk#0), ((((ws_sales_price#21 >= 100.00) AND (ws_sales_price#21 <= 150.00)) OR ((ws_sales_price#21 >= 50.00) AND (ws_sales_price#21 <= 100.00))) OR ((ws_sales_price#21 >= 150.00) AND (ws_sales_price#21 <= 200.00))), ((((ws_net_profit#33 >= 100.00) AND (ws_net_profit#33 <= 200.00)) OR ((ws_net_profit#33 >= 150.00) AND (ws_net_profit#33 <= 300.00))) OR ((ws_net_profit#33 >= 50.00) AND (ws_net_profit#33 <= 250.00)))], Format: Parquet, Location: InMemoryFileIndex[file:/Users/yumwang/spark/SPARK-28216/sql/core/spark-warehouse/org.apache.spark.sql.TPCDSQuerySuite/web_sales], PartitionFilters: [], PushedFilters: [IsNotNull(ws_item_sk), IsNotNull(ws_order_number), IsNotNull(ws_web_page_sk), IsNotNull(ws_sold_date_sk), Or(Or(And(GreaterThanOrEqual(ws_sales_price,100.00),LessThanOrEqual(ws_sales_price,150.00)),And(GreaterThanOrEqual(ws_sales_price,50.00),LessThanOrEqual(ws_sales_price,100.00))),And(GreaterThanOrEqual(ws_sales_price,150.00),LessThanOrEqual(ws_sales_price,200.00))), Or(Or(And(GreaterThanOrEqual(ws_net_profit,100.00),LessThanOrEqual(ws_net_profit,200.00)),And(GreaterThanOrEqual(ws_net_profit,150.00),LessThanOrEqual(ws_net_profit,300.00))),And(GreaterThanOrEqual(ws_net_profit,50.00),LessThanOrEqual(ws_net_profit,250.00)))], ReadSchema: struct<ws_sold_date_sk:int,ws_item_sk:int,ws_web_page_sk:int,ws_order_number:int,ws_quantity:int,ws_sales_price:decimal(7,2),ws_net_profit:decimal(7,2)>
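For readers who want to reproduce this kind of comparison locally, here is a minimal sketch pasteable into spark-shell (the table names, column names, and /tmp paths are made up for illustration, not the TPC-DS data): it writes two small Parquet tables, joins them on a disjunctive condition, and prints the plan so the DataFilters/PushedFilters of each FileScan can be inspected.

```scala
// Hypothetical reproduction sketch; it only illustrates how to inspect which
// predicates reach the Parquet scan via explain().
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").appName("cnf-pushdown-check").getOrCreate()

// Two toy tables backed by Parquet files, so that data source pushdown applies.
spark.range(100).selectExpr("id AS ws_item_sk", "id % 10 * 20.0 AS ws_sales_price")
  .write.mode("overwrite").parquet("/tmp/web_sales_demo")
spark.read.parquet("/tmp/web_sales_demo").createOrReplaceTempView("web_sales_demo")

spark.range(100).selectExpr("id AS wr_item_sk", "id % 3 AS wr_flag")
  .write.mode("overwrite").parquet("/tmp/web_returns_demo")
spark.read.parquet("/tmp/web_returns_demo").createOrReplaceTempView("web_returns_demo")

// The extra join condition is a disjunction of per-table conjunctions; after CNF
// conversion, the parts that reference only web_sales_demo can show up in the
// PushedFilters of its FileScan node.
spark.sql(
  """
    |SELECT *
    |FROM web_sales_demo ws JOIN web_returns_demo wr
    |  ON ws.ws_item_sk = wr.wr_item_sk
    | AND ((ws.ws_sales_price >= 100.0 AND wr.wr_flag = 0)
    |   OR (ws.ws_sales_price <= 50.0  AND wr.wr_flag = 1))
  """.stripMargin).explain()
```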

  /**
   * Rewrite pattern:
   * 1. (a && b) || c --> (a || c) && (b || c)
   * 2. a || (b && c) --> (a || b) && (a || c)
   *
   * To avoid generating too many predicates, we first group the filter columns from the same table.
   */
  private def toCNF(condition: Expression, depth: Int = 0): Expression = {
    if (depth < SQLConf.get.maxRewritingCNFDepth) {
      condition match {
        case or @ Or(left: And, right: And) =>
          val lhs = splitConjunctivePredicates(left).groupBy(_.references.map(_.qualifier))
Member Author (@wangyum, May 31, 2020):

We group by qualifier to avoid generating too many predicates. For example, TPC-DS q85.

Without grouping by qualifier:

== Physical Plan ==
TakeOrderedAndProject(limit=100, orderBy=[substr(r_reason_desc, 1, 20)#137 ASC NULLS FIRST,aggOrder#142 ASC NULLS FIRST,avg(wr_refunded_cash)#139 ASC NULLS FIRST,avg(wr_fee)#140 ASC NULLS FIRST], output=[substr(r_reason_desc, 1, 20)#137,avg(ws_quantity)#138,avg(wr_refunded_cash)#139,avg(wr_fee)#140])
+- *(9) HashAggregate(keys=[r_reason_desc#124], functions=[avg(cast(ws_quantity#18 as bigint)), avg(UnscaledValue(wr_refunded_cash#54)), avg(UnscaledValue(wr_fee#52))])
   +- Exchange hashpartitioning(r_reason_desc#124, 5), true, [id=#351]
      +- *(8) HashAggregate(keys=[r_reason_desc#124], functions=[partial_avg(cast(ws_quantity#18 as bigint)), partial_avg(UnscaledValue(wr_refunded_cash#54)), partial_avg(UnscaledValue(wr_fee#52))])
         +- *(8) Project [ws_quantity#18, wr_fee#52, wr_refunded_cash#54, r_reason_desc#124]
            +- *(8) BroadcastHashJoin [wr_reason_sk#46L], [cast(r_reason_sk#122 as bigint)], Inner, BuildRight
               :- *(8) Project [ws_quantity#18, wr_reason_sk#46L, wr_fee#52, wr_refunded_cash#54]
               :  +- *(8) BroadcastHashJoin [ws_sold_date_sk#0], [d_date_sk#94], Inner, BuildRight
               :     :- *(8) Project [ws_sold_date_sk#0, ws_quantity#18, wr_reason_sk#46L, wr_fee#52, wr_refunded_cash#54]
               :     :  +- *(8) BroadcastHashJoin [wr_refunded_addr_sk#40L], [cast(ca_address_sk#81 as bigint)], Inner, BuildRight, ((((ca_state#89 IN (IN,OH,NJ) AND (ws_net_profit#33 >= 100.00)) AND (ws_net_profit#33 <= 200.00)) OR ((ca_state#89 IN (WI,CT,KY) AND (ws_net_profit#33 >= 150.00)) AND (ws_net_profit#33 <= 300.00))) OR ((ca_state#89 IN (LA,IA,AR) AND (ws_net_profit#33 >= 50.00)) AND (ws_net_profit#33 <= 250.00)))
               :     :     :- *(8) Project [ws_sold_date_sk#0, ws_quantity#18, ws_net_profit#33, wr_refunded_addr_sk#40L, wr_reason_sk#46L, wr_fee#52, wr_refunded_cash#54]
               :     :     :  +- *(8) BroadcastHashJoin [wr_returning_cdemo_sk#42L, cd_marital_status#74, cd_education_status#75], [cast(cd_demo_sk#125 as bigint), cd_marital_status#127, cd_education_status#128], Inner, BuildRight
               :     :     :     :- *(8) Project [ws_sold_date_sk#0, ws_quantity#18, ws_net_profit#33, wr_refunded_addr_sk#40L, wr_returning_cdemo_sk#42L, wr_reason_sk#46L, wr_fee#52, wr_refunded_cash#54, cd_marital_status#74, cd_education_status#75]
               :     :     :     :  +- *(8) BroadcastHashJoin [wr_refunded_cdemo_sk#38L], [cast(cd_demo_sk#72 as bigint)], Inner, BuildRight, ((((((cd_marital_status#74 = M) AND (cd_education_status#75 = Advanced Degree)) AND (ws_sales_price#21 >= 100.00)) AND (ws_sales_price#21 <= 150.00)) OR ((((cd_marital_status#74 = S) AND (cd_education_status#75 = College)) AND (ws_sales_price#21 >= 50.00)) AND (ws_sales_price#21 <= 100.00))) OR ((((cd_marital_status#74 = W) AND (cd_education_status#75 = 2 yr Degree)) AND (ws_sales_price#21 >= 150.00)) AND (ws_sales_price#21 <= 200.00)))
               :     :     :     :     :- *(8) Project [ws_sold_date_sk#0, ws_quantity#18, ws_sales_price#21, ws_net_profit#33, wr_refunded_cdemo_sk#38L, wr_refunded_addr_sk#40L, wr_returning_cdemo_sk#42L, wr_reason_sk#46L, wr_fee#52, wr_refunded_cash#54]
               :     :     :     :     :  +- *(8) BroadcastHashJoin [ws_web_page_sk#12], [wp_web_page_sk#58], Inner, BuildRight
               :     :     :     :     :     :- *(8) Project [ws_sold_date_sk#0, ws_web_page_sk#12, ws_quantity#18, ws_sales_price#21, ws_net_profit#33, wr_refunded_cdemo_sk#38L, wr_refunded_addr_sk#40L, wr_returning_cdemo_sk#42L, wr_reason_sk#46L, wr_fee#52, wr_refunded_cash#54]
               :     :     :     :     :     :  +- *(8) BroadcastHashJoin [cast(ws_item_sk#3 as bigint), cast(ws_order_number#17 as bigint)], [wr_item_sk#36L, wr_order_number#47L], Inner, BuildRight
               :     :     :     :     :     :     :- *(8) Project [ws_sold_date_sk#0, ws_item_sk#3, ws_web_page_sk#12, ws_order_number#17, ws_quantity#18, ws_sales_price#21, ws_net_profit#33]
               :     :     :     :     :     :     :  +- *(8) Filter (((((((((((((((((((isnotnull(ws_item_sk#3) AND isnotnull(ws_order_number#17)) AND isnotnull(ws_web_page_sk#12)) AND isnotnull(ws_sold_date_sk#0)) AND (((ws_sales_price#21 >= 100.00) OR (ws_sales_price#21 >= 50.00)) OR (ws_sales_price#21 >= 150.00))) AND (((ws_sales_price#21 >= 100.00) OR (ws_sales_price#21 <= 100.00)) OR (ws_sales_price#21 >= 150.00))) AND (((ws_sales_price#21 <= 150.00) OR (ws_sales_price#21 >= 50.00)) OR (ws_sales_price#21 >= 150.00))) AND (((ws_sales_price#21 <= 150.00) OR (ws_sales_price#21 <= 100.00)) OR (ws_sales_price#21 >= 150.00))) AND (((ws_sales_price#21 >= 100.00) OR (ws_sales_price#21 >= 50.00)) OR (ws_sales_price#21 <= 200.00))) AND (((ws_sales_price#21 >= 100.00) OR (ws_sales_price#21 <= 100.00)) OR (ws_sales_price#21 <= 200.00))) AND (((ws_sales_price#21 <= 150.00) OR (ws_sales_price#21 >= 50.00)) OR (ws_sales_price#21 <= 200.00))) AND (((ws_sales_price#21 <= 150.00) OR (ws_sales_price#21 <= 100.00)) OR (ws_sales_price#21 <= 200.00))) AND (((ws_net_profit#33 >= 100.00) OR (ws_net_profit#33 >= 150.00)) OR (ws_net_profit#33 >= 50.00))) AND (((ws_net_profit#33 >= 100.00) OR (ws_net_profit#33 <= 300.00)) OR (ws_net_profit#33 >= 50.00))) AND (((ws_net_profit#33 <= 200.00) OR (ws_net_profit#33 >= 150.00)) OR (ws_net_profit#33 >= 50.00))) AND (((ws_net_profit#33 <= 200.00) OR (ws_net_profit#33 <= 300.00)) OR (ws_net_profit#33 >= 50.00))) AND (((ws_net_profit#33 >= 100.00) OR (ws_net_profit#33 >= 150.00)) OR (ws_net_profit#33 <= 250.00))) AND (((ws_net_profit#33 >= 100.00) OR (ws_net_profit#33 <= 300.00)) OR (ws_net_profit#33 <= 250.00))) AND (((ws_net_profit#33 <= 200.00) OR (ws_net_profit#33 >= 150.00)) OR (ws_net_profit#33 <= 250.00))) AND (((ws_net_profit#33 <= 200.00) OR (ws_net_profit#33 <= 300.00)) OR (ws_net_profit#33 <= 250.00)))
               :     :     :     :     :     :     :     +- *(8) ColumnarToRow
               :     :     :     :     :     :     :        +- FileScan parquet default.web_sales[ws_sold_date_sk#0,ws_item_sk#3,ws_web_page_sk#12,ws_order_number#17,ws_quantity#18,ws_sales_price#21,ws_net_profit#33] Batched: true, DataFilters: [isnotnull(ws_item_sk#3), isnotnull(ws_order_number#17), isnotnull(ws_web_page_sk#12), isnotnull(..., Format: Parquet, Location: InMemoryFileIndex[file:/Users/yumwang/spark/SPARK-28216/sql/core/spark-warehouse/org.apache.spark..., PartitionFilters: [], PushedFilters: [IsNotNull(ws_item_sk), IsNotNull(ws_order_number), IsNotNull(ws_web_page_sk), IsNotNull(ws_sold_..., ReadSchema: struct<ws_sold_date_sk:int,ws_item_sk:int,ws_web_page_sk:int,ws_order_number:int,ws_quantity:int,...
               :     :     :     :     :     :     +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, true], input[5, bigint, true])), [id=#291]
               :     :     :     :     :     :        +- *(1) Project [wr_item_sk#36L, wr_refunded_cdemo_sk#38L, wr_refunded_addr_sk#40L, wr_returning_cdemo_sk#42L, wr_reason_sk#46L, wr_order_number#47L, wr_fee#52, wr_refunded_cash#54]
               :     :     :     :     :     :           +- *(1) Filter (((((isnotnull(wr_item_sk#36L) AND isnotnull(wr_order_number#47L)) AND isnotnull(wr_refunded_cdemo_sk#38L)) AND isnotnull(wr_returning_cdemo_sk#42L)) AND isnotnull(wr_refunded_addr_sk#40L)) AND isnotnull(wr_reason_sk#46L))
               :     :     :     :     :     :              +- *(1) ColumnarToRow
               :     :     :     :     :     :                 +- FileScan parquet default.web_returns[wr_item_sk#36L,wr_refunded_cdemo_sk#38L,wr_refunded_addr_sk#40L,wr_returning_cdemo_sk#42L,wr_reason_sk#46L,wr_order_number#47L,wr_fee#52,wr_refunded_cash#54] Batched: true, DataFilters: [isnotnull(wr_item_sk#36L), isnotnull(wr_order_number#47L), isnotnull(wr_refunded_cdemo_sk#38L), ..., Format: Parquet, Location: InMemoryFileIndex[file:/Users/yumwang/spark/SPARK-28216/sql/core/spark-warehouse/org.apache.spark..., PartitionFilters: [], PushedFilters: [IsNotNull(wr_item_sk), IsNotNull(wr_order_number), IsNotNull(wr_refunded_cdemo_sk), IsNotNull(wr..., ReadSchema: struct<wr_item_sk:bigint,wr_refunded_cdemo_sk:bigint,wr_refunded_addr_sk:bigint,wr_returning_cdem...
               :     :     :     :     :     +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint))), [id=#300]
               :     :     :     :     :        +- *(2) Project [wp_web_page_sk#58]
               :     :     :     :     :           +- *(2) Filter isnotnull(wp_web_page_sk#58)
               :     :     :     :     :              +- *(2) ColumnarToRow
               :     :     :     :     :                 +- FileScan parquet default.web_page[wp_web_page_sk#58] Batched: true, DataFilters: [isnotnull(wp_web_page_sk#58)], Format: Parquet, Location: InMemoryFileIndex[file:/Users/yumwang/spark/SPARK-28216/sql/core/spark-warehouse/org.apache.spark..., PartitionFilters: [], PushedFilters: [IsNotNull(wp_web_page_sk)], ReadSchema: struct<wp_web_page_sk:int>
               :     :     :     :     +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint))), [id=#309]
               :     :     :     :        +- *(3) Project [cd_demo_sk#72, cd_marital_status#74, cd_education_status#75]
               :     :     :     :           +- *(3) Filter ((((((((((isnotnull(cd_demo_sk#72) AND isnotnull(cd_education_status#75)) AND isnotnull(cd_marital_status#74)) AND (((cd_marital_status#74 = M) OR (cd_marital_status#74 = S)) OR (cd_marital_status#74 = W))) AND (((cd_marital_status#74 = M) OR (cd_marital_status#74 = S)) OR (cd_education_status#75 = 2 yr Degree))) AND (((cd_marital_status#74 = M) OR (cd_education_status#75 = College)) OR (cd_marital_status#74 = W))) AND (((cd_marital_status#74 = M) OR (cd_education_status#75 = College)) OR (cd_education_status#75 = 2 yr Degree))) AND (((cd_education_status#75 = Advanced Degree) OR (cd_marital_status#74 = S)) OR (cd_marital_status#74 = W))) AND (((cd_education_status#75 = Advanced Degree) OR (cd_marital_status#74 = S)) OR (cd_education_status#75 = 2 yr Degree))) AND (((cd_education_status#75 = Advanced Degree) OR (cd_education_status#75 = College)) OR (cd_marital_status#74 = W))) AND (((cd_education_status#75 = Advanced Degree) OR (cd_education_status#75 = College)) OR (cd_education_status#75 = 2 yr Degree)))
               :     :     :     :              +- *(3) ColumnarToRow
               :     :     :     :                 +- FileScan parquet default.customer_demographics[cd_demo_sk#72,cd_marital_status#74,cd_education_status#75] Batched: true, DataFilters: [isnotnull(cd_demo_sk#72), isnotnull(cd_education_status#75), isnotnull(cd_marital_status#74), ((..., Format: Parquet, Location: InMemoryFileIndex[file:/Users/yumwang/spark/SPARK-28216/sql/core/spark-warehouse/org.apache.spark..., PartitionFilters: [], PushedFilters: [IsNotNull(cd_demo_sk), IsNotNull(cd_education_status), IsNotNull(cd_marital_status), Or(Or(Equal..., ReadSchema: struct<cd_demo_sk:int,cd_marital_status:string,cd_education_status:string>
               :     :     :     +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint), input[1, string, true], input[2, string, true])), [id=#318]
               :     :     :        +- *(4) Project [cd_demo_sk#125, cd_marital_status#127, cd_education_status#128]
               :     :     :           +- *(4) Filter ((isnotnull(cd_demo_sk#125) AND isnotnull(cd_education_status#128)) AND isnotnull(cd_marital_status#127))
               :     :     :              +- *(4) ColumnarToRow
               :     :     :                 +- FileScan parquet default.customer_demographics[cd_demo_sk#125,cd_marital_status#127,cd_education_status#128] Batched: true, DataFilters: [isnotnull(cd_demo_sk#125), isnotnull(cd_education_status#128), isnotnull(cd_marital_status#127)], Format: Parquet, Location: InMemoryFileIndex[file:/Users/yumwang/spark/SPARK-28216/sql/core/spark-warehouse/org.apache.spark..., PartitionFilters: [], PushedFilters: [IsNotNull(cd_demo_sk), IsNotNull(cd_education_status), IsNotNull(cd_marital_status)], ReadSchema: struct<cd_demo_sk:int,cd_marital_status:string,cd_education_status:string>
               :     :     +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint))), [id=#327]
               :     :        +- *(5) Project [ca_address_sk#81, ca_state#89]
               :     :           +- *(5) Filter (((isnotnull(ca_country#91) AND (ca_country#91 = United States)) AND isnotnull(ca_address_sk#81)) AND ((ca_state#89 IN (IN,OH,NJ) OR ca_state#89 IN (WI,CT,KY)) OR ca_state#89 IN (LA,IA,AR)))
               :     :              +- *(5) ColumnarToRow
               :     :                 +- FileScan parquet default.customer_address[ca_address_sk#81,ca_state#89,ca_country#91] Batched: true, DataFilters: [isnotnull(ca_country#91), (ca_country#91 = United States), isnotnull(ca_address_sk#81), ((ca_sta..., Format: Parquet, Location: InMemoryFileIndex[file:/Users/yumwang/spark/SPARK-28216/sql/core/spark-warehouse/org.apache.spark..., PartitionFilters: [], PushedFilters: [IsNotNull(ca_country), EqualTo(ca_country,United States), IsNotNull(ca_address_sk), Or(Or(In(ca_..., ReadSchema: struct<ca_address_sk:int,ca_state:string,ca_country:string>
               :     +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint))), [id=#336]
               :        +- *(6) Project [d_date_sk#94]
               :           +- *(6) Filter ((isnotnull(d_year#100) AND (d_year#100 = 2000)) AND isnotnull(d_date_sk#94))
               :              +- *(6) ColumnarToRow
               :                 +- FileScan parquet default.date_dim[d_date_sk#94,d_year#100] Batched: true, DataFilters: [isnotnull(d_year#100), (d_year#100 = 2000), isnotnull(d_date_sk#94)], Format: Parquet, Location: InMemoryFileIndex[file:/Users/yumwang/spark/SPARK-28216/sql/core/spark-warehouse/org.apache.spark..., PartitionFilters: [], PushedFilters: [IsNotNull(d_year), EqualTo(d_year,2000), IsNotNull(d_date_sk)], ReadSchema: struct<d_date_sk:int,d_year:int>
               +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint))), [id=#345]
                  +- *(7) Project [r_reason_sk#122, r_reason_desc#124]
                     +- *(7) Filter isnotnull(r_reason_sk#122)
                        +- *(7) ColumnarToRow
                           +- FileScan parquet default.reason[r_reason_sk#122,r_reason_desc#124] Batched: true, DataFilters: [isnotnull(r_reason_sk#122)], Format: Parquet, Location: InMemoryFileIndex[file:/Users/yumwang/spark/SPARK-28216/sql/core/spark-warehouse/org.apache.spark..., PartitionFilters: [], PushedFilters: [IsNotNull(r_reason_sk)], ReadSchema: struct<r_reason_sk:int,r_reason_desc:string>

With grouping by qualifier:

== Physical Plan ==
TakeOrderedAndProject(limit=100, orderBy=[substr(r_reason_desc, 1, 20)#137 ASC NULLS FIRST,aggOrder#142 ASC NULLS FIRST,avg(wr_refunded_cash)#139 ASC NULLS FIRST,avg(wr_fee)#140 ASC NULLS FIRST], output=[substr(r_reason_desc, 1, 20)#137,avg(ws_quantity)#138,avg(wr_refunded_cash)#139,avg(wr_fee)#140])
+- *(9) HashAggregate(keys=[r_reason_desc#124], functions=[avg(cast(ws_quantity#18 as bigint)), avg(UnscaledValue(wr_refunded_cash#54)), avg(UnscaledValue(wr_fee#52))])
   +- Exchange hashpartitioning(r_reason_desc#124, 5), true, [id=#351]
      +- *(8) HashAggregate(keys=[r_reason_desc#124], functions=[partial_avg(cast(ws_quantity#18 as bigint)), partial_avg(UnscaledValue(wr_refunded_cash#54)), partial_avg(UnscaledValue(wr_fee#52))])
         +- *(8) Project [ws_quantity#18, wr_fee#52, wr_refunded_cash#54, r_reason_desc#124]
            +- *(8) BroadcastHashJoin [wr_reason_sk#46L], [cast(r_reason_sk#122 as bigint)], Inner, BuildRight
               :- *(8) Project [ws_quantity#18, wr_reason_sk#46L, wr_fee#52, wr_refunded_cash#54]
               :  +- *(8) BroadcastHashJoin [ws_sold_date_sk#0], [d_date_sk#94], Inner, BuildRight
               :     :- *(8) Project [ws_sold_date_sk#0, ws_quantity#18, wr_reason_sk#46L, wr_fee#52, wr_refunded_cash#54]
               :     :  +- *(8) BroadcastHashJoin [wr_refunded_addr_sk#40L], [cast(ca_address_sk#81 as bigint)], Inner, BuildRight, ((((ca_state#89 IN (IN,OH,NJ) AND (ws_net_profit#33 >= 100.00)) AND (ws_net_profit#33 <= 200.00)) OR ((ca_state#89 IN (WI,CT,KY) AND (ws_net_profit#33 >= 150.00)) AND (ws_net_profit#33 <= 300.00))) OR ((ca_state#89 IN (LA,IA,AR) AND (ws_net_profit#33 >= 50.00)) AND (ws_net_profit#33 <= 250.00)))
               :     :     :- *(8) Project [ws_sold_date_sk#0, ws_quantity#18, ws_net_profit#33, wr_refunded_addr_sk#40L, wr_reason_sk#46L, wr_fee#52, wr_refunded_cash#54]
               :     :     :  +- *(8) BroadcastHashJoin [wr_returning_cdemo_sk#42L, cd_marital_status#74, cd_education_status#75], [cast(cd_demo_sk#125 as bigint), cd_marital_status#127, cd_education_status#128], Inner, BuildRight
               :     :     :     :- *(8) Project [ws_sold_date_sk#0, ws_quantity#18, ws_net_profit#33, wr_refunded_addr_sk#40L, wr_returning_cdemo_sk#42L, wr_reason_sk#46L, wr_fee#52, wr_refunded_cash#54, cd_marital_status#74, cd_education_status#75]
               :     :     :     :  +- *(8) BroadcastHashJoin [wr_refunded_cdemo_sk#38L], [cast(cd_demo_sk#72 as bigint)], Inner, BuildRight, ((((((cd_marital_status#74 = M) AND (cd_education_status#75 = Advanced Degree)) AND (ws_sales_price#21 >= 100.00)) AND (ws_sales_price#21 <= 150.00)) OR ((((cd_marital_status#74 = S) AND (cd_education_status#75 = College)) AND (ws_sales_price#21 >= 50.00)) AND (ws_sales_price#21 <= 100.00))) OR ((((cd_marital_status#74 = W) AND (cd_education_status#75 = 2 yr Degree)) AND (ws_sales_price#21 >= 150.00)) AND (ws_sales_price#21 <= 200.00)))
               :     :     :     :     :- *(8) Project [ws_sold_date_sk#0, ws_quantity#18, ws_sales_price#21, ws_net_profit#33, wr_refunded_cdemo_sk#38L, wr_refunded_addr_sk#40L, wr_returning_cdemo_sk#42L, wr_reason_sk#46L, wr_fee#52, wr_refunded_cash#54]
               :     :     :     :     :  +- *(8) BroadcastHashJoin [ws_web_page_sk#12], [wp_web_page_sk#58], Inner, BuildRight
               :     :     :     :     :     :- *(8) Project [ws_sold_date_sk#0, ws_web_page_sk#12, ws_quantity#18, ws_sales_price#21, ws_net_profit#33, wr_refunded_cdemo_sk#38L, wr_refunded_addr_sk#40L, wr_returning_cdemo_sk#42L, wr_reason_sk#46L, wr_fee#52, wr_refunded_cash#54]
               :     :     :     :     :     :  +- *(8) BroadcastHashJoin [cast(ws_item_sk#3 as bigint), cast(ws_order_number#17 as bigint)], [wr_item_sk#36L, wr_order_number#47L], Inner, BuildRight
               :     :     :     :     :     :     :- *(8) Project [ws_sold_date_sk#0, ws_item_sk#3, ws_web_page_sk#12, ws_order_number#17, ws_quantity#18, ws_sales_price#21, ws_net_profit#33]
               :     :     :     :     :     :     :  +- *(8) Filter (((((isnotnull(ws_item_sk#3) AND isnotnull(ws_order_number#17)) AND isnotnull(ws_web_page_sk#12)) AND isnotnull(ws_sold_date_sk#0)) AND ((((ws_sales_price#21 >= 100.00) AND (ws_sales_price#21 <= 150.00)) OR ((ws_sales_price#21 >= 50.00) AND (ws_sales_price#21 <= 100.00))) OR ((ws_sales_price#21 >= 150.00) AND (ws_sales_price#21 <= 200.00)))) AND ((((ws_net_profit#33 >= 100.00) AND (ws_net_profit#33 <= 200.00)) OR ((ws_net_profit#33 >= 150.00) AND (ws_net_profit#33 <= 300.00))) OR ((ws_net_profit#33 >= 50.00) AND (ws_net_profit#33 <= 250.00))))
               :     :     :     :     :     :     :     +- *(8) ColumnarToRow
               :     :     :     :     :     :     :        +- FileScan parquet default.web_sales[ws_sold_date_sk#0,ws_item_sk#3,ws_web_page_sk#12,ws_order_number#17,ws_quantity#18,ws_sales_price#21,ws_net_profit#33] Batched: true, DataFilters: [isnotnull(ws_item_sk#3), isnotnull(ws_order_number#17), isnotnull(ws_web_page_sk#12), isnotnull(..., Format: Parquet, Location: InMemoryFileIndex[file:/Users/yumwang/spark/SPARK-28216/sql/core/spark-warehouse/org.apache.spark..., PartitionFilters: [], PushedFilters: [IsNotNull(ws_item_sk), IsNotNull(ws_order_number), IsNotNull(ws_web_page_sk), IsNotNull(ws_sold_..., ReadSchema: struct<ws_sold_date_sk:int,ws_item_sk:int,ws_web_page_sk:int,ws_order_number:int,ws_quantity:int,...
               :     :     :     :     :     :     +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, true], input[5, bigint, true])), [id=#291]
               :     :     :     :     :     :        +- *(1) Project [wr_item_sk#36L, wr_refunded_cdemo_sk#38L, wr_refunded_addr_sk#40L, wr_returning_cdemo_sk#42L, wr_reason_sk#46L, wr_order_number#47L, wr_fee#52, wr_refunded_cash#54]
               :     :     :     :     :     :           +- *(1) Filter (((((isnotnull(wr_item_sk#36L) AND isnotnull(wr_order_number#47L)) AND isnotnull(wr_refunded_cdemo_sk#38L)) AND isnotnull(wr_returning_cdemo_sk#42L)) AND isnotnull(wr_refunded_addr_sk#40L)) AND isnotnull(wr_reason_sk#46L))
               :     :     :     :     :     :              +- *(1) ColumnarToRow
               :     :     :     :     :     :                 +- FileScan parquet default.web_returns[wr_item_sk#36L,wr_refunded_cdemo_sk#38L,wr_refunded_addr_sk#40L,wr_returning_cdemo_sk#42L,wr_reason_sk#46L,wr_order_number#47L,wr_fee#52,wr_refunded_cash#54] Batched: true, DataFilters: [isnotnull(wr_item_sk#36L), isnotnull(wr_order_number#47L), isnotnull(wr_refunded_cdemo_sk#38L), ..., Format: Parquet, Location: InMemoryFileIndex[file:/Users/yumwang/spark/SPARK-28216/sql/core/spark-warehouse/org.apache.spark..., PartitionFilters: [], PushedFilters: [IsNotNull(wr_item_sk), IsNotNull(wr_order_number), IsNotNull(wr_refunded_cdemo_sk), IsNotNull(wr..., ReadSchema: struct<wr_item_sk:bigint,wr_refunded_cdemo_sk:bigint,wr_refunded_addr_sk:bigint,wr_returning_cdem...
               :     :     :     :     :     +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint))), [id=#300]
               :     :     :     :     :        +- *(2) Project [wp_web_page_sk#58]
               :     :     :     :     :           +- *(2) Filter isnotnull(wp_web_page_sk#58)
               :     :     :     :     :              +- *(2) ColumnarToRow
               :     :     :     :     :                 +- FileScan parquet default.web_page[wp_web_page_sk#58] Batched: true, DataFilters: [isnotnull(wp_web_page_sk#58)], Format: Parquet, Location: InMemoryFileIndex[file:/Users/yumwang/spark/SPARK-28216/sql/core/spark-warehouse/org.apache.spark..., PartitionFilters: [], PushedFilters: [IsNotNull(wp_web_page_sk)], ReadSchema: struct<wp_web_page_sk:int>
               :     :     :     :     +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint))), [id=#309]
               :     :     :     :        +- *(3) Project [cd_demo_sk#72, cd_marital_status#74, cd_education_status#75]
               :     :     :     :           +- *(3) Filter (((isnotnull(cd_demo_sk#72) AND isnotnull(cd_education_status#75)) AND isnotnull(cd_marital_status#74)) AND ((((cd_marital_status#74 = M) AND (cd_education_status#75 = Advanced Degree)) OR ((cd_marital_status#74 = S) AND (cd_education_status#75 = College))) OR ((cd_marital_status#74 = W) AND (cd_education_status#75 = 2 yr Degree))))
               :     :     :     :              +- *(3) ColumnarToRow
               :     :     :     :                 +- FileScan parquet default.customer_demographics[cd_demo_sk#72,cd_marital_status#74,cd_education_status#75] Batched: true, DataFilters: [isnotnull(cd_demo_sk#72), isnotnull(cd_education_status#75), isnotnull(cd_marital_status#74), ((..., Format: Parquet, Location: InMemoryFileIndex[file:/Users/yumwang/spark/SPARK-28216/sql/core/spark-warehouse/org.apache.spark..., PartitionFilters: [], PushedFilters: [IsNotNull(cd_demo_sk), IsNotNull(cd_education_status), IsNotNull(cd_marital_status), Or(Or(And(E..., ReadSchema: struct<cd_demo_sk:int,cd_marital_status:string,cd_education_status:string>
               :     :     :     +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint), input[1, string, true], input[2, string, true])), [id=#318]
               :     :     :        +- *(4) Project [cd_demo_sk#125, cd_marital_status#127, cd_education_status#128]
               :     :     :           +- *(4) Filter ((isnotnull(cd_demo_sk#125) AND isnotnull(cd_education_status#128)) AND isnotnull(cd_marital_status#127))
               :     :     :              +- *(4) ColumnarToRow
               :     :     :                 +- FileScan parquet default.customer_demographics[cd_demo_sk#125,cd_marital_status#127,cd_education_status#128] Batched: true, DataFilters: [isnotnull(cd_demo_sk#125), isnotnull(cd_education_status#128), isnotnull(cd_marital_status#127)], Format: Parquet, Location: InMemoryFileIndex[file:/Users/yumwang/spark/SPARK-28216/sql/core/spark-warehouse/org.apache.spark..., PartitionFilters: [], PushedFilters: [IsNotNull(cd_demo_sk), IsNotNull(cd_education_status), IsNotNull(cd_marital_status)], ReadSchema: struct<cd_demo_sk:int,cd_marital_status:string,cd_education_status:string>
               :     :     +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint))), [id=#327]
               :     :        +- *(5) Project [ca_address_sk#81, ca_state#89]
               :     :           +- *(5) Filter (((isnotnull(ca_country#91) AND (ca_country#91 = United States)) AND isnotnull(ca_address_sk#81)) AND ((ca_state#89 IN (IN,OH,NJ) OR ca_state#89 IN (WI,CT,KY)) OR ca_state#89 IN (LA,IA,AR)))
               :     :              +- *(5) ColumnarToRow
               :     :                 +- FileScan parquet default.customer_address[ca_address_sk#81,ca_state#89,ca_country#91] Batched: true, DataFilters: [isnotnull(ca_country#91), (ca_country#91 = United States), isnotnull(ca_address_sk#81), ((ca_sta..., Format: Parquet, Location: InMemoryFileIndex[file:/Users/yumwang/spark/SPARK-28216/sql/core/spark-warehouse/org.apache.spark..., PartitionFilters: [], PushedFilters: [IsNotNull(ca_country), EqualTo(ca_country,United States), IsNotNull(ca_address_sk), Or(Or(In(ca_..., ReadSchema: struct<ca_address_sk:int,ca_state:string,ca_country:string>
               :     +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint))), [id=#336]
               :        +- *(6) Project [d_date_sk#94]
               :           +- *(6) Filter ((isnotnull(d_year#100) AND (d_year#100 = 2000)) AND isnotnull(d_date_sk#94))
               :              +- *(6) ColumnarToRow
               :                 +- FileScan parquet default.date_dim[d_date_sk#94,d_year#100] Batched: true, DataFilters: [isnotnull(d_year#100), (d_year#100 = 2000), isnotnull(d_date_sk#94)], Format: Parquet, Location: InMemoryFileIndex[file:/Users/yumwang/spark/SPARK-28216/sql/core/spark-warehouse/org.apache.spark..., PartitionFilters: [], PushedFilters: [IsNotNull(d_year), EqualTo(d_year,2000), IsNotNull(d_date_sk)], ReadSchema: struct<d_date_sk:int,d_year:int>
               +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint))), [id=#345]
                  +- *(7) Project [r_reason_sk#122, r_reason_desc#124]
                     +- *(7) Filter isnotnull(r_reason_sk#122)
                        +- *(7) ColumnarToRow
                           +- FileScan parquet default.reason[r_reason_sk#122,r_reason_desc#124] Batched: true, DataFilters: [isnotnull(r_reason_sk#122)], Format: Parquet, Location: InMemoryFileIndex[file:/Users/yumwang/spark/SPARK-28216/sql/core/spark-warehouse/org.apache.spark..., PartitionFilters: [], PushedFilters: [IsNotNull(r_reason_sk)], ReadSchema: struct<r_reason_sk:int,r_reason_desc:string>
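A toy sketch of why the grouping matters, pasteable into a Scala REPL (Pred and its qualifier tag are made-up stand-ins for Catalyst expressions and their references): distributing every conjunct individually produces the cross-product of the two sides, while folding each qualifier group into a single conjunct first, mirroring groupBy(_.references.map(_.qualifier)) followed by reduceLeft(And), keeps the number of generated disjunctions down to one per pair of tables.

```scala
// Toy model: a predicate tagged with the table (qualifier) it references.
case class Pred(qualifier: String, text: String)

// Conjuncts of the left and right And branches of an Or, analogous to
// splitConjunctivePredicates(left) / splitConjunctivePredicates(right).
val left = Seq(
  Pred("cd", "cd_marital_status = 'M'"), Pred("cd", "cd_education_status = 'Advanced Degree'"),
  Pred("ws", "ws_sales_price >= 100.00"), Pred("ws", "ws_sales_price <= 150.00"))
val right = Seq(
  Pred("cd", "cd_marital_status = 'S'"), Pred("cd", "cd_education_status = 'College'"),
  Pred("ws", "ws_sales_price >= 50.00"), Pred("ws", "ws_sales_price <= 100.00"))

// Naive distribution: one OR per (left conjunct, right conjunct) pair.
val naive = for (l <- left; r <- right) yield s"(${l.text} OR ${r.text})"
println(naive.size) // 16 generated disjunctions

// Group each side by qualifier and fold each group into a single conjunct first.
def grouped(ps: Seq[Pred]): Iterable[String] =
  ps.groupBy(_.qualifier).values.map(_.map(_.text).mkString("(", " AND ", ")"))

val compact = for (l <- grouped(left); r <- grouped(right)) yield s"($l OR $r)"
println(compact.size) // 4 generated disjunctions
```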


          val rhs = splitConjunctivePredicates(right).groupBy(_.references.map(_.qualifier))
          if (lhs.size > 1) {
Contributor:

Shall we pick rhs if it has more conjunctive predicates?

Member Author:

Yes. We pick it up in the case or @ Or(left, right: And) => or case or @ Or(left: And, right: And) => branches.
E.g. for (a && b) || (c && d), the rewriting steps are:
(a && b) || (c && d) --> (a || (c && d)) && (b || (c && d)) --> (a || c) && (a || d) && (b || c) && (b || d).

Once a is fixed, we pick up the right-hand side in case or @ Or(left, right: And) =>.
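For illustration only, here is a standalone sketch of the two rewrite patterns on a toy expression tree, pasteable into a Scala REPL (Expr, Leaf, And, Or below are local toy classes, not the Catalyst ones, and the depth cap stands in for maxRewritingCNFDepth); it reproduces the rewriting steps above.

```scala
// Toy expression tree, not Catalyst's Expression hierarchy.
sealed trait Expr
case class Leaf(name: String) extends Expr
case class And(l: Expr, r: Expr) extends Expr
case class Or(l: Expr, r: Expr) extends Expr

def toCNF(e: Expr, depth: Int = 0, maxDepth: Int = 10): Expr =
  if (depth >= maxDepth) e
  else e match {
    // (a && b) || c --> (a || c) && (b || c)
    case Or(And(a, b), c) => And(toCNF(Or(a, c), depth + 1), toCNF(Or(b, c), depth + 1))
    // a || (b && c) --> (a || b) && (a || c)
    case Or(a, And(b, c)) => And(toCNF(Or(a, b), depth + 1), toCNF(Or(a, c), depth + 1))
    case And(l, r)        => And(toCNF(l, depth + 1), toCNF(r, depth + 1))
    case other            => other
  }

// (a && b) || (c && d) becomes ((a || c) && (a || d)) && ((b || c) && (b || d)).
println(toCNF(Or(And(Leaf("a"), Leaf("b")), And(Leaf("c"), Leaf("d")))))
```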

            lhs.values.map(_.reduceLeft(And)).map { c =>
Member:

What is the time complexity here? I am concerned about the performance.

Member Author:

07:35:05.863 WARN org.apache.spark.sql.TPCDSQuerySuite: 
=== Metrics of Analyzer/Optimizer Rules ===
Total number of runs: 7249
Total time: 1.949092121 seconds

Rule                                                                              Effective Time / Total Time                     Effective Runs / Total Runs                    

org.apache.spark.sql.catalyst.analysis.ResolveSessionCatalog                      151465071 / 249555919                           24 / 59                                        
org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveAggregateFunctions         138746642 / 168406459                           1 / 59                                         
org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveReferences                 78878999 / 132411189                            3 / 59                                         
org.apache.spark.sql.execution.datasources.FindDataSourceTable                    95372289 / 99326980                             1 / 59                                         
org.apache.spark.sql.catalyst.analysis.DecimalPrecision                           56750800 / 66170980                             2 / 59                                         
org.apache.spark.sql.execution.datasources.PreprocessTableCreation                0 / 48910600                                    0 / 28                                         
org.apache.spark.sql.catalyst.analysis.TypeCoercion$ImplicitTypeCasts             20820860 / 44731509                             1 / 59                                         
org.apache.spark.sql.catalyst.optimizer.ColumnPruning                             12919681 / 44543112                             2 / 105                                        
org.apache.spark.sql.catalyst.optimizer.BooleanSimplification                     24718048 / 43686766                             1 / 55                                         
org.apache.spark.sql.execution.datasources.SchemaPruning                          0 / 32795196                                    0 / 25                                         
org.apache.spark.sql.catalyst.analysis.ResolveCatalogs                            0 / 30645089                                    0 / 59                                         
org.apache.spark.sql.catalyst.analysis.TypeCoercion$FunctionArgumentConversion    17902985 / 27578796                             2 / 59                                         
org.apache.spark.sql.catalyst.analysis.Analyzer$ExtractGenerator                  0 / 26144312                                    0 / 59                                         
org.apache.spark.sql.catalyst.optimizer.PushDownPredicates                        17354780 / 25365681                             5 / 80                                         
org.apache.spark.sql.catalyst.optimizer.InferFiltersFromConstraints               22137478 / 24736530                             1 / 25                                         
org.apache.spark.sql.catalyst.optimizer.RemoveRedundantAliases                    0 / 24508932                                    0 / 55                                         
org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveFunctions                  10536555 / 24417169                             2 / 59                                         
org.apache.spark.sql.catalyst.optimizer.ReorderJoin                               17311087 / 22391786                             1 / 55                                         
org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveMissingReferences          0 / 21142565                                    0 / 59                                         
org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveBinaryArithmetic           0 / 20704002                                    0 / 59                                         
org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations                  15024644 / 20411277                             1 / 59                                         
org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveAliases                    15859386 / 18096012                             1 / 59                                         
org.apache.spark.sql.catalyst.optimizer.ReorderAssociativeOperator                0 / 17026758                                    0 / 55                                         
org.apache.spark.sql.catalyst.analysis.UpdateAttributeNullability                 0 / 16870561                                    0 / 28                                         
org.apache.spark.sql.catalyst.optimizer.PruneFilters                              0 / 15220754                                    0 / 80                                         
org.apache.spark.sql.catalyst.optimizer.CollapseProject                           8979563 / 13211028                              1 / 80                                         
org.apache.spark.sql.catalyst.optimizer.LikeSimplification                        0 / 12837312                                    0 / 55                                         
org.apache.spark.sql.catalyst.optimizer.ConstantFolding                           7099416 / 12652039                              1 / 55                                         
org.apache.spark.sql.catalyst.analysis.TimeWindowing                              0 / 12566615                                    0 / 59                                         
org.apache.spark.sql.catalyst.optimizer.ComputeCurrentTime                        0 / 12279377                                    0 / 25                                         
org.apache.spark.sql.catalyst.optimizer.FoldablePropagation                       0 / 12158420                                    0 / 55                                         
org.apache.spark.sql.catalyst.analysis.ResolveTimeZone                            7331904 / 11661433                              5 / 59                                         
org.apache.spark.sql.execution.datasources.v2.V2ScanRelationPushDown              0 / 11651482                                    0 / 25                                         
org.apache.spark.sql.catalyst.analysis.Analyzer$HandleNullInputsForUDF            0 / 11312278                                    0 / 28                                         
org.apache.spark.sql.catalyst.analysis.Analyzer$PullOutNondeterministic           0 / 11111621                                    0 / 28                                         
org.apache.spark.sql.catalyst.analysis.TypeCoercion$PromoteStrings                0 / 11032240                                    0 / 59                                         
org.apache.spark.sql.catalyst.optimizer.OptimizeIn                                0 / 10792662                                    0 / 55                                         
org.apache.spark.sql.catalyst.optimizer.SimplifyExtractValueOps                   0 / 10435223                                    0 / 55                                         
org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveGroupingAnalytics          0 / 10147394                                    0 / 59                                         
org.apache.spark.sql.catalyst.analysis.Analyzer$LookupFunctions                   0 / 9906569                                     0 / 28                                         
org.apache.spark.sql.catalyst.optimizer.SimplifyBinaryComparison                  0 / 9434962                                     0 / 55                                         
org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRandomSeed                 0 / 8745520                                     0 / 59                                         
org.apache.spark.sql.catalyst.optimizer.NullPropagation                           0 / 8673323                                     0 / 55                                         
org.apache.spark.sql.catalyst.analysis.ResolveLambdaVariables                     0 / 8618322                                     0 / 59                                         
org.apache.spark.sql.catalyst.optimizer.DecimalAggregates                         1133548 / 8344365                               1 / 26                                         
org.apache.spark.sql.catalyst.analysis.TypeCoercion$EltCoercion                   0 / 8306604                                     0 / 59                                         
org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveInsertInto                 0 / 8272186                                     0 / 59                                         
org.apache.spark.sql.catalyst.optimizer.EliminateResolvedHint                     0 / 8269481                                     0 / 25                                         
org.apache.spark.sql.catalyst.analysis.TypeCoercion$BooleanEquality               0 / 8164425                                     0 / 59                                         
org.apache.spark.sql.catalyst.analysis.TypeCoercion$InConversion                  0 / 7999784                                     0 / 59                                         
org.apache.spark.sql.catalyst.analysis.TypeCoercion$WindowFrameCoercion           0 / 7744630                                     0 / 59                                         
org.apache.spark.sql.catalyst.optimizer.EliminateSorts                            0 / 7664352                                     0 / 25                                         
org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveNamespace                  0 / 7333421                                     0 / 59                                         
org.apache.spark.sql.catalyst.optimizer.RewriteCorrelatedScalarSubquery           0 / 7288581                                     0 / 55                                         
org.apache.spark.sql.catalyst.optimizer.ReassignLambdaVariableID                  0 / 7094275                                     0 / 25                                         
org.apache.spark.sql.catalyst.optimizer.NormalizeFloatingNumbers                  0 / 6952061                                     0 / 25                                         
org.apache.spark.sql.catalyst.analysis.CleanupAliases                             1507275 / 6818821                               1 / 29                                         
org.apache.spark.sql.catalyst.analysis.TypeCoercion$DateTimeOperations            0 / 6621755                                     0 / 59                                         
org.apache.spark.sql.catalyst.optimizer.SimplifyConditionals                      0 / 6594834                                     0 / 55                                         
org.apache.spark.sql.execution.dynamicpruning.PartitionPruning                    0 / 6435655                                     0 / 25                                         
org.apache.spark.sql.catalyst.optimizer.SimplifyCasts                             1726589 / 6431193                               1 / 55                                         
org.apache.spark.sql.catalyst.analysis.TypeCoercion$IfCoercion                    0 / 6390305                                     0 / 59                                         
org.apache.spark.sql.catalyst.analysis.Analyzer$GlobalAggregates                  0 / 6364189                                     0 / 59                                         
org.apache.spark.sql.catalyst.analysis.TypeCoercion$ConcatCoercion                0 / 6344767                                     0 / 59                                         
org.apache.spark.sql.catalyst.optimizer.PushLeftSemiLeftAntiThroughJoin           0 / 6300823                                     0 / 55                                         
org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveSubquery                   0 / 6237735                                     0 / 59                                         
org.apache.spark.sql.catalyst.optimizer.SimplifyCaseConversionExpressions         0 / 6221240                                     0 / 55                                         
org.apache.spark.sql.catalyst.analysis.TypeCoercion$StringLiteralCoercion         0 / 6192720                                     0 / 59                                         
org.apache.spark.sql.catalyst.analysis.CTESubstitution                            0 / 6150743                                     0 / 28                                         
org.apache.spark.sql.catalyst.optimizer.GetCurrentDatabase                        0 / 6148608                                     0 / 25                                         
org.apache.spark.sql.catalyst.analysis.ResolveCreateNamedStruct                   0 / 6101190                                     0 / 59                                         
org.apache.spark.sql.catalyst.optimizer.ReplaceNullWithFalseInPredicate           0 / 6100257                                     0 / 55                                         
org.apache.spark.sql.catalyst.optimizer.ConstantPropagation                       0 / 6087339                                     0 / 55                                         
org.apache.spark.sql.catalyst.analysis.ResolveHigherOrderFunctions                0 / 6067619                                     0 / 59                                         
org.apache.spark.sql.catalyst.analysis.Analyzer$ExtractWindowExpressions          0 / 6061035                                     0 / 59                                         
org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveUpCast                     0 / 6007788                                     0 / 59                                         
org.apache.spark.sql.catalyst.analysis.TypeCoercion$Division                      0 / 5925219                                     0 / 59                                         
org.apache.spark.sql.execution.datasources.PruneFileSourcePartitions              0 / 5824041                                     0 / 25                                         
org.apache.spark.sql.catalyst.analysis.Analyzer$ResolvePivot                      0 / 5559831                                     0 / 59                                         
org.apache.spark.sql.catalyst.analysis.ResolveHints$ResolveJoinStrategyHints      0 / 5524307                                     0 / 28                                         
org.apache.spark.sql.catalyst.optimizer.RemoveDispensableExpressions              0 / 5510061                                     0 / 55                                         
org.apache.spark.sql.execution.datasources.DataSourceAnalysis                     5412159 / 5475347                               24 / 28                                        
org.apache.spark.sql.catalyst.optimizer.RewritePredicateSubquery                  2055027 / 5324950                               1 / 25                                         
org.apache.spark.sql.catalyst.optimizer.PushPredicateThroughJoinByCNF             0 / 5285288                                     0 / 25                                         
org.apache.spark.sql.catalyst.optimizer.RemoveNoopOperators                       0 / 4998930                                     0 / 105                                        
org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveOrdinalInOrderByAndGroupBy 0 / 4914629                                     0 / 59                                         
org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveWindowOrder                0 / 4899787                                     0 / 59                                         
org.apache.spark.sql.catalyst.analysis.SubstituteUnresolvedOrdinals               0 / 4897924                                     0 / 28                                         
org.apache.spark.sql.catalyst.optimizer.PushDownLeftSemiAntiJoin                  0 / 4835640                                     0 / 55                                         
org.apache.spark.sql.catalyst.optimizer.PropagateEmptyRelation                    0 / 4795908                                     0 / 50                                         
org.apache.spark.sql.catalyst.optimizer.ObjectSerializerPruning                   0 / 4763236                                     0 / 25                                         
org.apache.spark.sql.catalyst.analysis.ResolveTableValuedFunctions                0 / 4607355                                     0 / 59                                         
org.apache.spark.sql.execution.analysis.DetectAmbiguousSelfJoin                   0 / 4562616                                     0 / 28                                         
org.apache.spark.sql.catalyst.analysis.TypeCoercion$CaseWhenCoercion              0 / 4517751                                     0 / 59                                         
org.apache.spark.sql.execution.datasources.FallBackFileSourceV2                   0 / 4397375                                     0 / 59                                         
org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveWindowFrame                0 / 4313997                                     0 / 59                                         
org.apache.spark.sql.catalyst.analysis.TypeCoercion$MapZipWithCoercion            0 / 4309833                                     0 / 59                                         
org.apache.spark.sql.catalyst.optimizer.RewriteNonCorrelatedExists                0 / 4253433                                     0 / 25                                         
org.apache.spark.sql.catalyst.analysis.TypeCoercion$StackCoercion                 0 / 4208225                                     0 / 59                                         
org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveDeserializer               0 / 4145215                                     0 / 59                                         
org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveAggAliasInGroupBy          0 / 4095335                                     0 / 59                                         
org.apache.spark.sql.execution.python.ExtractPythonUDFs                           0 / 4001274                                     0 / 25                                         
org.apache.spark.sql.catalyst.optimizer.PullupCorrelatedPredicates                0 / 3920586                                     0 / 25                                         
org.apache.spark.sql.catalyst.optimizer.CollapseWindow                            0 / 3888524                                     0 / 55                                         
org.apache.spark.sql.catalyst.optimizer.RewriteDistinctAggregates                 0 / 3803019                                     0 / 25                                         
org.apache.spark.sql.catalyst.optimizer.CollapseRepartition                       0 / 3755697                                     0 / 55                                         
org.apache.spark.sql.catalyst.optimizer.CombineUnions                             0 / 3704733                                     0 / 80                                         
org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveNewInstance                0 / 3695526                                     0 / 59                                         
org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases                   1421178 / 3635064                               1 / 25                                         
org.apache.spark.sql.catalyst.optimizer.EliminateSerialization                    0 / 3550180                                     0 / 55                                         
org.apache.spark.sql.catalyst.optimizer.Optimizer$OptimizeSubqueries              0 / 3536272                                     0 / 50                                         
org.apache.spark.sql.catalyst.optimizer.CombineLimits                             0 / 3504846                                     0 / 55                                         
org.apache.spark.sql.catalyst.optimizer.TransposeWindow                           0 / 3499057                                     0 / 55                                         
org.apache.spark.sql.catalyst.optimizer.PushProjectionThroughUnion                0 / 3355877                                     0 / 55                                         
org.apache.spark.sql.catalyst.optimizer.EliminateOuterJoin                        0 / 3233735                                     0 / 55                                         
org.apache.spark.sql.catalyst.optimizer.LimitPushDown                             0 / 3208991                                     0 / 55                                         
org.apache.spark.sql.catalyst.optimizer.OptimizeLimitZero                         0 / 3190070                                     0 / 25                                         
org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveTables                     0 / 3094747                                     0 / 59                                         
org.apache.spark.sql.catalyst.optimizer.RewriteIntersectAll                       0 / 3085769                                     0 / 25                                         
org.apache.spark.sql.catalyst.analysis.Analyzer$WindowsSubstitution               0 / 2979758                                     0 / 28                                         
org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveAlterTableChanges          0 / 2948089                                     0 / 28                                         
org.apache.spark.sql.catalyst.optimizer.ExtractPythonUDFFromJoinCondition         0 / 2915414                                     0 / 25                                         
org.apache.spark.sql.catalyst.analysis.EliminateView                              0 / 2805178                                     0 / 25                                         
org.apache.spark.sql.catalyst.analysis.ResolveHints$ResolveCoalesceHints          0 / 2710440                                     0 / 28                                         
org.apache.spark.sql.catalyst.optimizer.ReplaceExpressions                        0 / 2535170                                     0 / 25                                         
org.apache.spark.sql.catalyst.optimizer.EliminateMapObjects                       0 / 2520470                                     0 / 25                                         
org.apache.spark.sql.execution.dynamicpruning.CleanupDynamicPruningFilters        0 / 2519862                                     0 / 25                                         
org.apache.spark.sql.execution.python.ExtractPythonUDFFromAggregate               0 / 2495766                                     0 / 25                                         
org.apache.spark.sql.execution.python.ExtractGroupingPythonUDFFromAggregate       0 / 2417625                                     0 / 25                                         
org.apache.spark.sql.catalyst.optimizer.RewriteExceptAll                          0 / 2301441                                     0 / 25                                         
org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveSubqueryColumnAliases      0 / 2277596                                     0 / 59                                         
org.apache.spark.sql.catalyst.optimizer.CombineTypedFilters                       0 / 2267944                                     0 / 25                                         
org.apache.spark.sql.catalyst.optimizer.ReplaceExceptWithFilter                   0 / 2252560                                     0 / 25                                         
org.apache.spark.sql.catalyst.optimizer.CombineFilters                            0 / 2236246                                     0 / 55                                         
org.apache.spark.sql.execution.datasources.ResolveSQLOnFile                       0 / 2214162                                     0 / 59                                         
org.apache.spark.sql.catalyst.analysis.TypeCoercion$WidenSetOperationTypes        0 / 2164273                                     0 / 59                                         
org.apache.spark.sql.catalyst.optimizer.RemoveLiteralFromGroupExpressions         0 / 2092068                                     0 / 25                                         
org.apache.spark.sql.catalyst.optimizer.ReplaceIntersectWithSemiJoin              0 / 1952290                                     0 / 25                                         
org.apache.spark.sql.catalyst.optimizer.ReplaceDeduplicateWithAggregate           0 / 1927408                                     0 / 25                                         
org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveGenerate                   0 / 1915572                                     0 / 59                                         
org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveNaturalAndUsingJoin        0 / 1910467                                     0 / 59                                         
org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveOutputRelation             0 / 1839082                                     0 / 59                                         
org.apache.spark.sql.catalyst.optimizer.ReplaceExceptWithAntiJoin                 0 / 1651671                                     0 / 25                                         
org.apache.spark.sql.catalyst.analysis.ResolveInlineTables                        0 / 1536486                                     0 / 59                                         
org.apache.spark.sql.catalyst.optimizer.RemoveRepetitionFromGroupExpressions      0 / 1474769                                     0 / 25                                         
org.apache.spark.sql.execution.datasources.PreprocessTableInsertion               0 / 1406859                                     0 / 28                                         
org.apache.spark.sql.catalyst.analysis.EliminateUnions                            0 / 1327002                                     0 / 28                                         
org.apache.spark.sql.catalyst.optimizer.CombineConcats                            0 / 1217837                                     0 / 55                                         
org.apache.spark.sql.catalyst.optimizer.ReplaceDistinctWithAggregate              0 / 1201090                                     0 / 25                                         
org.apache.spark.sql.catalyst.analysis.ResolveHints$RemoveAllHints                0 / 1199850                                     0 / 28                                         
org.apache.spark.sql.catalyst.optimizer.EliminateDistinct                         0 / 1098290                                     0 / 25                                         
org.apache.spark.sql.catalyst.analysis.UpdateOuterReferences                      0 / 1028033                                     0 / 28                                         
org.apache.spark.sql.catalyst.optimizer.CheckCartesianProducts                    0 / 703053                                      0 / 50                                         
org.apache.spark.sql.catalyst.optimizer.PushPredicateThroughNonJoin               0 / 617000                                      0 / 25                                         
org.apache.spark.sql.execution.OptimizeMetadataOnlyQuery                          0 / 371820                                      0 / 25                                         
org.apache.spark.sql.catalyst.optimizer.CostBasedJoinReorder                      0 / 309745                                      0 / 25                                         

toCNF(Or(toCNF(c, depth + 1), toCNF(right, depth + 1)), depth + 1)
Contributor

I don't really understand this. At least I can't see how it relates to (a && b) || c --> (a || c) && (b || c).

Member Author

This is a special case of (a && b) || c, e.g. (a && b) || (c && d). The rewriting steps are:
(a && b) || (c && d) --> (a || (c && d)) && (b || (c && d)) --> (a || c) && (a || d) && (b || c) && (b || d).

TPC-DS Q13:
[image: TPC-DS Q13 query plan screenshot]
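As an illustrative, self-contained sketch of that distribution step (not the Catalyst rule itself; the Expr, Pred, conjuncts and toCnfSketch names below are made up for this example):

sealed trait Expr
case class Pred(name: String) extends Expr
case class And(l: Expr, r: Expr) extends Expr
case class Or(l: Expr, r: Expr) extends Expr

// Collect the conjuncts of a (possibly nested) And.
def conjuncts(e: Expr): Seq[Expr] = e match {
  case And(l, r) => conjuncts(l) ++ conjuncts(r)
  case other => Seq(other)
}

// Distribute Or over And:
// (a && b) || (c && d) => (a || c) && (a || d) && (b || c) && (b || d)
def toCnfSketch(e: Expr): Expr = e match {
  case Or(l, r) =>
    val pairs: Seq[Expr] = for {
      x <- conjuncts(toCnfSketch(l))
      y <- conjuncts(toCnfSketch(r))
    } yield Or(x, y)
    pairs.reduceLeft(And)
  case And(l, r) => And(toCnfSketch(l), toCnfSketch(r))
  case other => other
}

// toCnfSketch(Or(And(Pred("a"), Pred("b")), And(Pred("c"), Pred("d"))))
// yields (a || c) && (a || d) && (b || c) && (b || d)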

Member Author

I added the complete rewriting steps to the PR description to explain how it works.

}.reduce(And)
} else if (rhs.size > 1) {
rhs.values.map(_.reduceLeft(And)).map { c =>
toCNF(Or(toCNF(left, depth + 1), toCNF(c, depth + 1)), depth + 1)
}.reduce(And)
} else {
or
}

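// The left side of the Or is a conjunction: group its conjuncts by qualifier and
// distribute them over the Or, i.e. (a && b) || c => (a || c) && (b || c).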
case or @ Or(left: And, right) =>
val lhs = splitConjunctivePredicates(left).groupBy(_.references.map(_.qualifier))
if (lhs.size > 1) {
lhs.values.map(_.reduceLeft(And)).map {
c => toCNF(Or(toCNF(c, depth + 1), toCNF(right, depth + 1)), depth + 1)
}.reduce(And)
} else {
or
}

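// Symmetric case, the right side of the Or is a conjunction:
// a || (b && c) => (a || b) && (a || c), again grouping conjuncts by qualifier.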
case or @ Or(left, right: And) =>
val rhs = splitConjunctivePredicates(right).groupBy(_.references.map(_.qualifier))
if (rhs.size > 1) {
rhs.values.map(_.reduceLeft(And)).map { c =>
toCNF(Or(toCNF(left, depth + 1), toCNF(c, depth + 1)), depth + 1)
}.reduce(And)
} else {
or
}

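// Conjunctions are already in the right shape; just normalize both sides.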
case And(left, right) =>
And(toCNF(left, depth + 1), toCNF(right, depth + 1))

case other =>
other
}
} else {
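// The depth limit (spark.sql.maxRewritingCNFDepth) has been reached:
// stop rewriting and return the condition unchanged.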
condition
}
}

private def maybeWithFilter(joinCondition: Option[Expression], plan: LogicalPlan) = {
(joinCondition, plan) match {
// Avoid adding the same filter.
case (Some(condition), filter: Filter) if condition.semanticEquals(filter.condition) =>
Contributor

Is there an existing optimizer rule to remove duplicated predicates?

Member Author

Removed it.

plan
case (Some(condition), _) =>
Filter(condition, plan)
case _ =>
plan
}
}

def apply(plan: LogicalPlan): LogicalPlan = plan transform applyLocally

val applyLocally: PartialFunction[LogicalPlan, LogicalPlan] = {
case j @ Join(left, right, joinType, Some(joinCondition), hint) =>

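// Rewrite the join condition to CNF, keep only the deterministic conjuncts, and
// partition them by which side of the join can evaluate them.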
val pushDownCandidates =
splitConjunctivePredicates(toCNF(joinCondition)).filter(_.deterministic)
val (leftEvaluateCondition, rest) =
pushDownCandidates.partition(_.references.subsetOf(left.outputSet))
val (rightEvaluateCondition, _) =
rest.partition(expr => expr.references.subsetOf(right.outputSet))

val newLeft = maybeWithFilter(leftEvaluateCondition.reduceLeftOption(And), left)
val newRight = maybeWithFilter(rightEvaluateCondition.reduceLeftOption(And), right)

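// Which side may receive the pushed-down filters depends on the join type;
// a full outer join cannot push the join condition to either side.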
joinType match {
case _: InnerLike | LeftSemi =>
Join(newLeft, newRight, joinType, Some(joinCondition), hint)
case RightOuter =>
Join(newLeft, right, RightOuter, Some(joinCondition), hint)
case LeftOuter | LeftAnti | ExistenceJoin(_) =>
Join(left, newRight, joinType, Some(joinCondition), hint)
case FullOuter => j
case NaturalJoin(_) => sys.error("Untransformed NaturalJoin node")
case UsingJoin(_, _) => sys.error("Untransformed Using join node")
}
}
}

/**
* Combines two adjacent [[Limit]] operators into one, merging the
* expressions into one single expression.
@@ -544,6 +544,18 @@ object SQLConf {
.booleanConf
.createWithDefault(true)

val MAX_REWRITING_CNF_DEPTH =
buildConf("spark.sql.maxRewritingCNFDepth")
.internal()
.doc("The maximum depth of rewriting a join condition to conjunctive normal form " +
"expression. The deeper, the more predicate may be found, but the optimization time " +
"will increase. The default is 6. By setting this value to 0 this feature can be disabled.")
.version("3.1.0")
.intConf
.checkValue(_ >= 0,
"The depth of the maximum rewriting conjunction normal form must be positive.")
.createWithDefault(10)
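// For illustration only (not part of this patch): the depth defined above could be
// tuned, or the rewrite disabled, at runtime from a running SparkSession, e.g.
//   spark.conf.set("spark.sql.maxRewritingCNFDepth", "0")   // disable the rewrite
//   spark.conf.set("spark.sql.maxRewritingCNFDepth", "10")  // restore the default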

val ESCAPED_STRING_LITERALS = buildConf("spark.sql.parser.escapedStringLiterals")
.internal()
.doc("When true, string literals (including regex patterns) remain escaped in our SQL " +
@@ -2845,6 +2857,8 @@ class SQLConf extends Serializable with Logging {

def constraintPropagationEnabled: Boolean = getConf(CONSTRAINT_PROPAGATION_ENABLED)

def maxRewritingCNFDepth: Int = getConf(MAX_REWRITING_CNF_DEPTH)

def escapedStringLiterals: Boolean = getConf(ESCAPED_STRING_LITERALS)

def fileCompressionFactor: Double = getConf(FILE_COMPRESSION_FACTOR)
@@ -25,6 +25,7 @@ import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans._
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.rules._
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.{BooleanType, IntegerType}
import org.apache.spark.unsafe.types.CalendarInterval

@@ -39,7 +40,9 @@ class FilterPushdownSuite extends PlanTest {
PushPredicateThroughNonJoin,
BooleanSimplification,
PushPredicateThroughJoin,
CollapseProject) :: Nil
CollapseProject) ::
Batch("PushPredicateThroughJoinByCNF", Once,
PushPredicateThroughJoinByCNF) :: Nil
}

val attrA = 'a.int
Expand Down Expand Up @@ -1230,4 +1233,154 @@ class FilterPushdownSuite extends PlanTest {

comparePlans(Optimize.execute(query.analyze), expected)
}

test("inner join: rewrite filter predicates to conjunctive normal form") {
val x = testRelation.subquery('x)
val y = testRelation.subquery('y)

val originalQuery = {
x.join(y)
.where(("x.b".attr === "y.b".attr)
&& (("x.a".attr > 3) && ("y.a".attr > 13) || ("x.a".attr > 1) && ("y.a".attr > 11)))
}

val optimized = Optimize.execute(originalQuery.analyze)
val left = testRelation.where(('a > 3 || 'a > 1)).subquery('x)
val right = testRelation.where('a > 13 || 'a > 11).subquery('y)
val correctAnswer =
left.join(right, condition = Some("x.b".attr === "y.b".attr
&& (("x.a".attr > 3) && ("y.a".attr > 13) || ("x.a".attr > 1) && ("y.a".attr > 11))))
.analyze

comparePlans(optimized, correctAnswer)
}

test("inner join: rewrite join predicates to conjunctive normal form") {
val x = testRelation.subquery('x)
val y = testRelation.subquery('y)

val originalQuery = {
x.join(y, condition = Some(("x.b".attr === "y.b".attr)
&& (("x.a".attr > 3) && ("y.a".attr > 13) || ("x.a".attr > 1) && ("y.a".attr > 11))))
}

val optimized = Optimize.execute(originalQuery.analyze)
val left = testRelation.where('a > 3 || 'a > 1).subquery('x)
val right = testRelation.where('a > 13 || 'a > 11).subquery('y)
val correctAnswer =
left.join(right, condition = Some("x.b".attr === "y.b".attr
&& (("x.a".attr > 3) && ("y.a".attr > 13) || ("x.a".attr > 1) && ("y.a".attr > 11))))
.analyze

comparePlans(optimized, correctAnswer)
Member Author

Matches PostgreSQL's plan:

postgres=# explain select x.* from x join y on (x.b = y.b and ( (x.a > 3 and y.a > 13 ) or (x.a > 1 and y.a > 11) ));
                                QUERY PLAN
---------------------------------------------------------------------------
 Merge Join  (cost=178.36..315.60 rows=3593 width=16)
   Merge Cond: (x.b = y.b)
   Join Filter: (((x.a > 3) AND (y.a > 13)) OR ((x.a > 1) AND (y.a > 11)))
   ->  Sort  (cost=89.18..91.75 rows=1028 width=16)
         Sort Key: x.b
         ->  Seq Scan on x  (cost=0.00..37.75 rows=1028 width=16)
               Filter: ((a > 3) OR (a > 1))
   ->  Sort  (cost=89.18..91.75 rows=1028 width=8)
         Sort Key: y.b
         ->  Seq Scan on y  (cost=0.00..37.75 rows=1028 width=8)
               Filter: ((a > 13) OR (a > 11))
(11 rows)

}

test("inner join: rewrite join predicates(with NOT predicate) to conjunctive normal form") {
val x = testRelation.subquery('x)
val y = testRelation.subquery('y)

val originalQuery = {
x.join(y, condition = Some(("x.b".attr === "y.b".attr)
&& Not(("x.a".attr > 3)
&& ("x.a".attr < 2 || ("y.a".attr > 13)) || ("x.a".attr > 1) && ("y.a".attr > 11))))
}

val optimized = Optimize.execute(originalQuery.analyze)
val left = testRelation.where('a <= 3 || 'a >= 2).subquery('x)
val right = testRelation.subquery('y)
val correctAnswer =
left.join(right, condition = Some("x.b".attr === "y.b".attr
&& (("x.a".attr <= 3) || (("x.a".attr >= 2) && ("y.a".attr <= 13)))
&& (("x.a".attr <= 1) || ("y.a".attr <= 11))))
.analyze
comparePlans(optimized, correctAnswer)
Member Author

Matches PostgreSQL's plan:

postgres=# explain select x.* from x join y on ((x.b = y.b) and Not((x.a > 3) and (x.a < 2 or (y.a > 13)) or (x.a > 1) and (y.a > 11)));
                                          QUERY PLAN
-----------------------------------------------------------------------------------------------
 Merge Join  (cost=218.07..484.71 rows=3874 width=16)
   Merge Cond: (x.b = y.b)
   Join Filter: (((x.a <= 1) OR (y.a <= 11)) AND ((x.a <= 3) OR ((x.a >= 2) AND (y.a <= 13))))
   ->  Sort  (cost=89.18..91.75 rows=1028 width=16)
         Sort Key: x.b
         ->  Seq Scan on x  (cost=0.00..37.75 rows=1028 width=16)
               Filter: ((a <= 3) OR (a >= 2))
   ->  Sort  (cost=128.89..133.52 rows=1850 width=8)
         Sort Key: y.b
         ->  Seq Scan on y  (cost=0.00..28.50 rows=1850 width=8)
(10 rows)

}

test("left join: rewrite join predicates to conjunctive normal form") {
val x = testRelation.subquery('x)
val y = testRelation.subquery('y)

val originalQuery = {
x.join(y, joinType = LeftOuter, condition = Some(("x.b".attr === "y.b".attr)
&& (("x.a".attr > 3) && ("y.a".attr > 13) || ("x.a".attr > 1) && ("y.a".attr > 11))))
}

val optimized = Optimize.execute(originalQuery.analyze)
val left = testRelation.subquery('x)
val right = testRelation.where('a > 13 || 'a > 11).subquery('y)
val correctAnswer =
left.join(right, joinType = LeftOuter, condition = Some("x.b".attr === "y.b".attr
&& (("x.a".attr > 3) && ("y.a".attr > 13) || ("x.a".attr > 1) && ("y.a".attr > 11))))
.analyze

comparePlans(optimized, correctAnswer)
Member Author

Matches PostgreSQL's plan:

postgres=# explain select x.* from x left join y on (x.b = y.b and ( (x.a > 3 and y.a > 13 ) or (x.a > 1 and y.a > 11) ));
                                QUERY PLAN
---------------------------------------------------------------------------
 Merge Left Join  (cost=218.07..465.05 rows=1996 width=16)
   Merge Cond: (x.b = y.b)
   Join Filter: (((x.a > 3) AND (y.a > 13)) OR ((x.a > 1) AND (y.a > 11)))
   ->  Sort  (cost=128.89..133.52 rows=1850 width=16)
         Sort Key: x.b
         ->  Seq Scan on x  (cost=0.00..28.50 rows=1850 width=16)
   ->  Sort  (cost=89.18..91.75 rows=1028 width=8)
         Sort Key: y.b
         ->  Seq Scan on y  (cost=0.00..37.75 rows=1028 width=8)
               Filter: ((a > 13) OR (a > 11))
(10 rows)

}

test("right join: rewrite join predicates to conjunctive normal form") {
val x = testRelation.subquery('x)
val y = testRelation.subquery('y)

val originalQuery = {
x.join(y, joinType = RightOuter, condition = Some(("x.b".attr === "y.b".attr)
&& (("x.a".attr > 3) && ("y.a".attr > 13) || ("x.a".attr > 1) && ("y.a".attr > 11))))
}

val optimized = Optimize.execute(originalQuery.analyze)
val left = testRelation.where('a > 3 || 'a > 1).subquery('x)
val right = testRelation.subquery('y)
val correctAnswer =
left.join(right, joinType = RightOuter, condition = Some("x.b".attr === "y.b".attr
&& (("x.a".attr > 3) && ("y.a".attr > 13) || ("x.a".attr > 1) && ("y.a".attr > 11))))
.analyze

comparePlans(optimized, correctAnswer)
Member Author

Matches PostgreSQL's plan:

postgres=# explain select x.* from x right join y on (x.b = y.b and ( (x.a > 3 and y.a > 13 ) or (x.a > 1 and y.a > 11) ));
                                QUERY PLAN
---------------------------------------------------------------------------
 Merge Left Join  (cost=218.07..465.05 rows=1996 width=16)
   Merge Cond: (y.b = x.b)
   Join Filter: (((x.a > 3) AND (y.a > 13)) OR ((x.a > 1) AND (y.a > 11)))
   ->  Sort  (cost=128.89..133.52 rows=1850 width=8)
         Sort Key: y.b
         ->  Seq Scan on y  (cost=0.00..28.50 rows=1850 width=8)
   ->  Sort  (cost=89.18..91.75 rows=1028 width=16)
         Sort Key: x.b
         ->  Seq Scan on x  (cost=0.00..37.75 rows=1028 width=16)
               Filter: ((a > 3) OR (a > 1))
(10 rows)

}

test("inner join: rewrite to conjunctive normal form avoid genereting too many predicates") {
val x = testRelation.subquery('x)
val y = testRelation.subquery('y)

val originalQuery = {
x.join(y, condition = Some(("x.b".attr === "y.b".attr)
&& ((("x.a".attr > 3) && ("x.a".attr < 13) && ("y.c".attr <= 5))
|| (("y.a".attr > 2) && ("y.c".attr < 1)))))
}

val optimized = Optimize.execute(originalQuery.analyze)
val left = testRelation.subquery('x)
val right = testRelation.where('c <= 5 || ('a > 2 && 'c < 1)).subquery('y)
val correctAnswer = left.join(right, condition = Some("x.b".attr === "y.b".attr
&& ((("x.a".attr > 3) && ("x.a".attr < 13) && ("y.c".attr <= 5))
|| (("y.a".attr > 2) && ("y.c".attr < 1))))).analyze

comparePlans(optimized, correctAnswer)
Member Author

Matches PostgreSQL's plan:

postgres=# explain select x.* from x join y on (x.b = y.b and ( (x.a > 3 and x.a < 13 and y.c <= 5) or (y.a > 2 and y.c < 1) ));
                                       QUERY PLAN
-----------------------------------------------------------------------------------------
 Merge Join  (cost=207.30..402.86 rows=1927 width=16)
   Merge Cond: (y.b = x.b)
   Join Filter: (((x.a > 3) AND (x.a < 13) AND (y.c <= 5)) OR ((y.a > 2) AND (y.c < 1)))
   ->  Sort  (cost=78.41..80.30 rows=754 width=12)
         Sort Key: y.b
         ->  Seq Scan on y  (cost=0.00..42.38 rows=754 width=12)
               Filter: ((c <= 5) OR ((a > 2) AND (c < 1)))
   ->  Sort  (cost=128.89..133.52 rows=1850 width=16)
         Sort Key: x.b
         ->  Seq Scan on x  (cost=0.00..28.50 rows=1850 width=16)
(10 rows)

}

test(s"Disable rewrite to CNF by setting ${SQLConf.MAX_REWRITING_CNF_DEPTH.key}=0") {
val x = testRelation.subquery('x)
val y = testRelation.subquery('y)

val originalQuery = {
x.join(y, condition = Some(("x.b".attr === "y.b".attr)
&& ((("x.a".attr > 3) && ("x.a".attr < 13) && ("y.c".attr <= 5))
|| (("y.a".attr > 2) && ("y.c".attr < 1)))))
}

Seq(0, 10).foreach { depth =>
withSQLConf(SQLConf.MAX_REWRITING_CNF_DEPTH.key -> depth.toString) {
val optimized = Optimize.execute(originalQuery.analyze)
val (left, right) = if (depth == 0) {
(testRelation.subquery('x), testRelation.subquery('y))
} else {
(testRelation.subquery('x),
testRelation.where('c <= 5 || ('a > 2 && 'c < 1)).subquery('y))
}
val correctAnswer = left.join(right, condition = Some("x.b".attr === "y.b".attr
&& ((("x.a".attr > 3) && ("x.a".attr < 13) && ("y.c".attr <= 5))
|| (("y.a".attr > 2) && ("y.c".attr < 1))))).analyze

comparePlans(optimized, correctAnswer)
}
}
}
}