gengliangwang
Member

@gengliangwang gengliangwang commented Jul 12, 2020

What changes were proposed in this pull request?

After #28805, predicates are converted into CNF for partition pruning. However, the CNF result can be very long and the Hive metastore will fail to execute it.
For example, the following partition filter:

(p0 = '1' AND p1 = '1') OR (p0 = '2' AND p1 = '2') OR (p0 = '3' AND p1 = '3') OR (p0 = '4' AND p1 = '4') OR (p0 = '5' AND p1 = '5') OR (p0 = '6' AND p1 = '6') OR (p0 = '7' AND p1 = '7') OR (p0 = '8' AND p1 = '8') OR (p0 = '9' AND p1 = '9') OR (p0 = '10' AND p1 = '10') OR (p0 = '11' AND p1 = '11') OR (p0 = '12' AND p1 = '12') OR (p0 = '13' AND p1 = '13') OR (p0 = '14' AND p1 = '14') OR (p0 = '15' AND p1 = '15') OR (p0 = '16' AND p1 = '16') OR (p0 = '17' AND p1 = '17') OR (p0 = '18' AND p1 = '18') OR (p0 = '19' AND p1 = '19') OR (p0 = '20' AND p1 = '20')

will be converted into a long query (130K characters) in the Hive metastore, which fails with the following error:

javax.jdo.JDOException: Exception thrown when executing query : SELECT DISTINCT 'org.apache.hadoop.hive.metastore.model.MPartition' AS NUCLEUS_TYPE,A0.CREATE_TIME,A0.LAST_ACCESS_TIME,A0.PART_NAME,A0.PART_ID,A0.PART_NAME AS NUCORDER0 FROM PARTITIONS A0 LEFT OUTER JOIN TBLS B0 ON A0.TBL_ID = B0.TBL_ID LEFT OUTER JOIN DBS C0 ON B0.DB_ID = C0.DB_ID WHERE B0.TBL_NAME = ? AND C0."NAME" = ? AND ((((((A0.PART_NAME LIKE '%/p1=1' ESCAPE '\' ) OR (A0.PART_NAME LIKE '%/p1=2' ESCAPE '\' )) OR (A0.PART_NAME LIKE '%/p1=3' ESCAPE '\' )) OR ((A0.PART_NAME LIKE '%/p1=4' ESCAPE '\' ) O ...
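
To illustrate why a filter of this shape grows so quickly, here is a minimal sketch of a naive CNF conversion. It uses a hypothetical mini expression tree (`Atom`, `And`, `Or`), not Spark's Catalyst classes, and it does not group sub-expressions the way Spark's conversion does, so the exact sizes differ from what Spark produces.

```scala
// Hypothetical mini expression tree for illustration; not Spark's Catalyst API.
object CnfGrowthSketch {
  sealed trait Expr
  final case class Atom(name: String) extends Expr
  final case class And(left: Expr, right: Expr) extends Expr
  final case class Or(left: Expr, right: Expr) extends Expr

  // Naive CNF: returns the top-level conjuncts, each of which is an OR-clause.
  def toCnf(e: Expr): Seq[Expr] = e match {
    case And(l, r) => toCnf(l) ++ toCnf(r)
    case Or(l, r) =>
      // Distribute OR over AND: every clause on the left pairs with every clause on the right.
      for (x <- toCnf(l); y <- toCnf(r)) yield (Or(x, y): Expr)
    case atom => Seq(atom)
  }

  // Builds (p0 = '1' AND p1 = '1') OR ... OR (p0 = 'n' AND p1 = 'n').
  def disjunctionOfPairs(n: Int): Expr =
    (1 to n)
      .map(i => And(Atom(s"p0 = '$i'"), Atom(s"p1 = '$i'")): Expr)
      .reduceLeft((a, b) => Or(a, b))
}
```

In this sketch, a disjunction of n two-term conjunctions yields 2^n clauses, so the clause count doubles with every extra `OR` branch.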

To mitigate the regression caused by the previous improvement #28805:

  1. We should push down the convertible original queries as they are, instead of converting all predicates into CNF.
  2. We can skip grouping expressions so that we can stop the CNF conversion when the predicates become too long.
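
The two points above can be sketched roughly as follows. This is an illustration on a hypothetical mini expression tree, not the code in this PR; the `maxClauses` parameter and the `predicatesToPush` helper are assumptions made for the example.

```scala
// Hypothetical sketch: bounded CNF conversion with a fallback to the original predicate.
object BoundedCnfSketch {
  sealed trait Expr
  final case class Atom(name: String) extends Expr
  final case class And(left: Expr, right: Expr) extends Expr
  final case class Or(left: Expr, right: Expr) extends Expr

  // Returns None as soon as the result would exceed maxClauses (assumed limit).
  def toCnf(e: Expr, maxClauses: Int): Option[Seq[Expr]] = e match {
    case And(l, r) =>
      for {
        ls <- toCnf(l, maxClauses)
        rs <- toCnf(r, maxClauses)
        if ls.size + rs.size <= maxClauses
      } yield ls ++ rs
    case Or(l, r) =>
      for {
        ls <- toCnf(l, maxClauses)
        rs <- toCnf(r, maxClauses)
        // Check the product before materializing the cross product.
        if ls.size.toLong * rs.size <= maxClauses
      } yield for (x <- ls; y <- rs) yield (Or(x, y): Expr)
    case atom => Some(Seq(atom))
  }

  // Use the CNF form only when it stays small; otherwise keep the original predicate as is.
  def predicatesToPush(e: Expr, maxClauses: Int): Seq[Expr] =
    toCnf(e, maxClauses).getOrElse(Seq(e))
}
```

Giving up early keeps both the pushed-down filter and the work spent on conversion bounded, at the cost of possibly missing some pruning opportunities.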

Why are the changes needed?

Mitigate potential regressions in partition pruning from #28805.

Does this PR introduce any user-facing change?

No

How was this patch tested?

Unit test

@gengliangwang
Member Author

cc @AngersZhuuuu @cloud-fan

Member

@dongjoon-hyun dongjoon-hyun left a comment

Although this aims to fix the too-many-predicates issue in HMS, "Avoid pushing down too many predicated in partition pruning" sounds ambiguous as a PR title. Can we have a more specific title describing what the PR code does?

@gengliangwang gengliangwang changed the title [SPARK-32284][SQL] Avoid pushing down too many predicated in partition pruning [SPARK-32284][SQL] Avoid expanding too many CNF predicates in partition pruning Jul 12, 2020
@gengliangwang
Member Author

@dongjoon-hyun Thanks for the suggestion. I have updated the title.

@SparkQA

SparkQA commented Jul 12, 2020

Test build #125715 has finished for PR 29075 at commit ccba836.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@dongjoon-hyun
Member

dongjoon-hyun commented Jul 12, 2020

Thank you for updating, @gengliangwang . Shall we adjust this test case name accordingly together?

test("SPARK-32284: Avoid pushing down too many predicates in partition pruning") {

BTW, in the test case, 20 looks like a reasonably small number in the Spark world. Could you use more functional wording to describe the change? For example, this PR does not impose a limit on the number of predicates (such as: 10 is allowed, but 20 is not). Apache Spark will still hit the HMS issue after this PR when we have a very long SQL query with too many predicates. So this PR doesn't "Avoid pushing down too many predicates in partition pruning". Instead, it looks like it mitigates the regression caused by the previous improvement PR:

We should push down the convertible original queries as they are, instead of converting all predicates into CNF
We can skip grouping expressions so that we can stop the CNF conversion when the predicates become too long.

  case op @ PhysicalOperation(projections, filters, relation: HiveTableRelation)
      if filters.nonEmpty && relation.isPartitioned && relation.prunedPartitions.isEmpty =>
-   val predicates = CNFWithGroupExpressionsByReference(filters.reduceLeft(And))
+   val predicates = CNFConversion(filters.reduceLeft(And))
Member

nit: conjunctiveNormalForm(filters.reduceLeft(And), identity)?

val extraPartitionFilters =
remainingFilterInCnf.filter(f => f.references.subsetOf(partitionSet))

(ExpressionSet(partitionFilters ++ extraPartitionFilters), remainingFilters)
Member

    val (extraPartitionFilters, otherFilters) = remainingFilterInCnf.partition(f =>
      f.references.subsetOf(partitionSet)
    )
    (ExpressionSet(partitionFilters ++ extraPartitionFilters), otherFilters)

?

Member Author

That way, otherFilters can be very long, which leads to longer codegen... I am avoiding that on purpose. Let me add a comment here.
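
The trade-off in this thread can be seen on a small example. The sketch below uses a hypothetical `Pred` stand-in (predicate text plus referenced column names) rather than Catalyst expressions, and the column names are made up for illustration.

```scala
// Hypothetical stand-ins for Catalyst expressions; only the references matter here.
object FilterVsPartitionSketch {
  final case class Pred(sql: String, references: Set[String])

  val partitionSet: Set[String] = Set("p0", "p1")

  // One original filter that mixes partition columns (p0, p1) and data columns (a, b).
  val remainingFilters: Seq[Pred] = Seq(
    Pred("(p0 = 1 AND a = 1) OR (p1 = 2 AND b = 2)", Set("p0", "p1", "a", "b")))

  // Its CNF form: four clauses, only the first of which references partition columns alone.
  val remainingFilterInCnf: Seq[Pred] = Seq(
    Pred("p0 = 1 OR p1 = 2", Set("p0", "p1")),
    Pred("p0 = 1 OR b = 2", Set("p0", "b")),
    Pred("a = 1 OR p1 = 2", Set("a", "p1")),
    Pred("a = 1 OR b = 2", Set("a", "b")))

  // filter: pick only the extra partition filters; the original remainingFilters stay as the rest.
  val extraPartitionFilters: Seq[Pred] =
    remainingFilterInCnf.filter(_.references.subsetOf(partitionSet))

  // partition: the second half would carry the expanded CNF clauses forward instead.
  val (_, otherFilters) =
    remainingFilterInCnf.partition(_.references.subsetOf(partitionSet))
}
```

With `filter`, one original predicate remains to be evaluated after pruning; with `partition`, three expanded clauses would be carried forward, and that number grows with the size of the CNF result.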

Member

okay.

import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution.datasources.DataSourceStrategy
import org.apache.spark.sql.execution.datasources.PruneFileSourcePartitions.CNFConversion
import org.apache.spark.sql.internal.SQLConf
Contributor

This import is not necessary.

val remainingFilterInCnf = remainingFilters.flatMap(CNFConversion)
val extraPartitionFilters = remainingFilterInCnf.filter(f =>
!f.references.isEmpty && f.references.subsetOf(partitionColumnSet))
ExpressionSet(partitionFilters ++ extraPartitionFilters)
Contributor

I am confused: it seems CNFConversion won't change references. Don't you need to call splitConjunctivePredicates on each expression in remainingFilterInCnf to extract more predicates?

Member Author

The filters here are already processed with splitConjunctivePredicates in PhysicalOperation.unapply. That's why the original code before #28805 doesn't call splitConjunctivePredicates either.
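
For readers unfamiliar with the helper, the idea of splitting conjunctive predicates can be sketched on a hypothetical mini expression tree (this is not Spark's `PredicateHelper`, only an illustration of the same idea): nested top-level `And` nodes are flattened into a list, and splitting an already split list changes nothing.

```scala
// Hypothetical sketch of conjunct splitting; not Spark's implementation.
object SplitConjunctsSketch {
  sealed trait Expr
  final case class Atom(name: String) extends Expr
  final case class And(left: Expr, right: Expr) extends Expr
  final case class Or(left: Expr, right: Expr) extends Expr

  // Flattens nested top-level ANDs into a list; ORs and atoms are left intact.
  def splitConjuncts(e: Expr): Seq[Expr] = e match {
    case And(l, r) => splitConjuncts(l) ++ splitConjuncts(r)
    case other => Seq(other)
  }
}
```

Because no element of the result has `And` at its top level, applying the split a second time to each element returns the same list.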

@SparkQA

SparkQA commented Jul 13, 2020

Test build #125746 has finished for PR 29075 at commit df08390.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Jul 13, 2020

Test build #125758 has finished for PR 29075 at commit 6fe106c.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

* @param condition to be converted into CNF.
* @param condition Condition to be converted into CNF.
* @param groupExpsFunc A method for grouping intermediate results so that the final result can be
* shorter.
Member

@maropu maropu Jul 13, 2020

nit: A method to group expressions for reducing the size of pushed down predicates and corresponding codegen?

@gengliangwang
Member Author

#29101 is a better solution to me. Close this one now.
