@@ -207,13 +207,15 @@ trait PredicateHelper extends Logging {
   * CNF can explode exponentially in the size of the input expression when converting [[Or]]
   * clauses. Use a configuration [[SQLConf.MAX_CNF_NODE_COUNT]] to prevent such cases.
   *
-  * @param condition to be converted into CNF.
+  * @param condition Condition to be converted into CNF.
+  * @param groupExpsFunc A method for grouping intermediate results so that the final result can be
+  *                      shorter.
maropu (Member) commented on Jul 13, 2020:

    nit: `A method to group expressions for reducing the size of pushed down predicates and corresponding codegen`?
   * @return the CNF result as sequence of disjunctive expressions. If the number of expressions
   *         exceeds threshold on converting `Or`, `Seq.empty` is returned.
   */
-  protected def conjunctiveNormalForm(
+  protected def CNFConversion(
       condition: Expression,
-      groupExpsFunc: Seq[Expression] => Seq[Expression]): Seq[Expression] = {
+      groupExpsFunc: Seq[Expression] => Seq[Expression] = identity): Seq[Expression] = {
    val postOrderNodes = postOrderTraversal(condition)
    val resultStack = new mutable.Stack[Seq[Expression]]
    val maxCnfNodeCount = SQLConf.get.maxCnfNodeCount
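As a reading aid for the hunk above: a minimal sketch of what the conversion computes, on a hypothetical toy boolean AST rather than Catalyst's `Expression` tree. The real method is iterative (see `postOrderTraversal` below) and bails out once `SQLConf.MAX_CNF_NODE_COUNT` is exceeded; this naive recursion shows only the distribution rule.

    // Toy AST standing in for Catalyst's And/Or expressions.
    sealed trait Expr
    case class Leaf(name: String) extends Expr
    case class And(left: Expr, right: Expr) extends Expr
    case class Or(left: Expr, right: Expr) extends Expr

    // Returns the CNF as a sequence of disjunctive clauses.
    def toCnf(e: Expr): Seq[Expr] = e match {
      // And simply concatenates the clauses of both sides.
      case And(l, r) => toCnf(l) ++ toCnf(r)
      // Or distributes over the clauses of both sides; this cross
      // product is where the result can grow as |left| * |right|.
      case Or(l, r)  => for (lc <- toCnf(l); rc <- toCnf(r)) yield Or(lc, rc)
      case leaf      => Seq(leaf)
    }

    // (a And b) Or c  ==>  Seq((a Or c), (b Or c)), i.e. (a Or c) And (b Or c)
    toCnf(Or(And(Leaf("a"), Leaf("b")), Leaf("c")))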
@@ -256,33 +258,15 @@ trait PredicateHelper extends Logging {
   * when expanding predicates, we can group them by qualifier to avoid generating unnecessary
   * expressions and to control the length of the final result, since multiple tables are involved.
   *
-  * @param condition condition need to be converted
+  * @param condition Condition to be converted into CNF.
   * @return the CNF result as sequence of disjunctive expressions. If the number of expressions
   *         exceeds threshold on converting `Or`, `Seq.empty` is returned.
   */
  def CNFWithGroupExpressionsByQualifier(condition: Expression): Seq[Expression] = {
-   conjunctiveNormalForm(condition, (expressions: Seq[Expression]) =>
+   CNFConversion(condition, (expressions: Seq[Expression]) =>
      expressions.groupBy(_.references.map(_.qualifier)).map(_._2.reduceLeft(And)).toSeq)
  }

- /**
-  * Convert an expression to conjunctive normal form for predicate pushdown and partition pruning.
-  * When expanding predicates, this method groups expressions by their references for reducing
-  * the size of pushed down predicates and corresponding codegen. In partition pruning strategies,
-  * we split filters by [[splitConjunctivePredicates]] and identify partition filters by checking
-  * whether their references are a subset of partCols. Combining expressions grouped by reference
-  * when expanding [[Or]] predicates does not affect the final pruning result, since
-  * [[splitConjunctivePredicates]] won't split [[Or]] expressions.
-  *
-  * @param condition condition need to be converted
-  * @return the CNF result as sequence of disjunctive expressions. If the number of expressions
-  *         exceeds threshold on converting `Or`, `Seq.empty` is returned.
-  */
- def CNFWithGroupExpressionsByReference(condition: Expression): Seq[Expression] = {
-   conjunctiveNormalForm(condition, (expressions: Seq[Expression]) =>
-     expressions.groupBy(e => AttributeSet(e.references)).map(_._2.reduceLeft(And)).toSeq)
- }

  /**
   * Iterative post order traversal over a binary tree built by And/Or clauses with two stacks.
   * For example, for a condition `(a And b) Or c`, the post-order traversal is
@@ -294,7 +278,7 @@
   * 2.1 Pop a node from first stack and push it to second stack
   * 2.2 Push the children of the popped node to first stack
   *
-  * @param condition to be traversed as binary tree
+  * @param condition Condition to be traversed as binary tree
   * @return sub-expressions in post order traversal as a stack.
   *         The first element of result stack is the leftmost node.
   */
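The two-stack scheme in this doc comment is a standard iterative post-order traversal; sketched below on the same toy AST as the earlier sketch (again, not the Catalyst implementation, which also folds in the CNF node-count guard):

    import scala.collection.mutable

    // Iterative post-order traversal with two stacks, as described above.
    def postOrder(root: Expr): mutable.Stack[Expr] = {
      val first = new mutable.Stack[Expr]
      val second = new mutable.Stack[Expr]
      first.push(root)
      while (first.nonEmpty) {
        val node = first.pop()
        // Step 2.1: move the node to the second stack.
        second.push(node)
        // Step 2.2: push its children (if any) to the first stack.
        node match {
          case And(l, r) => first.push(l); first.push(r)
          case Or(l, r)  => first.push(l); first.push(r)
          case _         => // leaf: nothing to push
        }
      }
      // For (a And b) Or c, popping `second` yields a, b, And, c, Or —
      // post order, with the leftmost leaf `a` as the first element.
      second
    }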
@@ -53,11 +53,20 @@ private[sql] object PruneFileSourcePartitions
    val partitionColumns =
      relation.resolve(partitionSchema, sparkSession.sessionState.analyzer.resolver)
    val partitionSet = AttributeSet(partitionColumns)
-   val (partitionFilters, dataFilters) = normalizedFilters.partition(f =>
+   val (partitionFilters, remainingFilters) = normalizedFilters.partition(f =>
      f.references.subsetOf(partitionSet)
    )

-   (ExpressionSet(partitionFilters), dataFilters)
+   // Try extracting more convertible partition filters from the remaining filters by converting
+   // them into CNF.
+   val remainingFilterInCnf = remainingFilters.flatMap(CNFConversion(_))
+   val extraPartitionFilters =
+     remainingFilterInCnf.filter(f => f.references.subsetOf(partitionSet))
+
+   // For the filters that can't be used for partition pruning, we simply use `remainingFilters`
+   // instead of using the non-convertible part from `remainingFilterInCnf`. Otherwise, the
+   // result filters can be very long.
+   (ExpressionSet(partitionFilters ++ extraPartitionFilters), remainingFilters)
Member commented:

        val (extraPartitionFilters, otherFilters) = remainingFilterInCnf.partition(f =>
          f.references.subsetOf(partitionSet)
        )
        (ExpressionSet(partitionFilters ++ extraPartitionFilters), otherFilters)

    ?

Member Author replied:

    In that way, otherFilters can be very long, which leads to a longer codegen... I am avoiding that on purpose. Let me add a comment here.

Member replied:

    okay.
}

private def rebuildPhysicalOperation(
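A worked example of why the CNF step above finds extra partition filters: if `p` is a partition column and `id` a data column, `(p = 1 AND id = 1) OR (p = 2 AND id = 2)` references both columns, so `partition` leaves it in `remainingFilters`; its CNF, however, contains the clause `p = 1 OR p = 2`, which references only `p` and can prune partitions. The toy model below mimics the extraction, reusing `toCnf` from the earlier sketch; `references` is a made-up helper keyed on the first token of a leaf, not a Catalyst API:

    // Attribute references of a toy predicate, e.g. Leaf("p = 1") -> Set("p").
    def references(e: Expr): Set[String] = e match {
      case Leaf(s)   => Set(s.takeWhile(_ != ' '))
      case And(l, r) => references(l) ++ references(r)
      case Or(l, r)  => references(l) ++ references(r)
    }

    val partitionSet = Set("p")
    val filter = Or(And(Leaf("p = 1"), Leaf("id = 1")),
                    And(Leaf("p = 2"), Leaf("id = 2")))

    // The whole filter references Set("p", "id"): not a partition filter.
    val (partitionFilters, remaining) =
      Seq(filter).partition(f => references(f).subsetOf(partitionSet))

    // CNF exposes the clause (p = 1 Or p = 2), which references only "p".
    val extraPartitionFilters =
      remaining.flatMap(toCnf).filter(f => references(f).subsetOf(partitionSet))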
@@ -88,12 +97,9 @@ private[sql] object PruneFileSourcePartitions
            _,
            _))
          if filters.nonEmpty && fsRelation.partitionSchemaOption.isDefined =>
-       val predicates = CNFWithGroupExpressionsByReference(filters.reduceLeft(And))
-       val finalPredicates = if (predicates.nonEmpty) predicates else filters
        val (partitionKeyFilters, _) = getPartitionKeyFiltersAndDataFilters(
-         fsRelation.sparkSession, logicalRelation, partitionSchema, finalPredicates,
+         fsRelation.sparkSession, logicalRelation, partitionSchema, filters,
          logicalRelation.output)

        if (partitionKeyFilters.nonEmpty) {
          val prunedFileIndex = catalogFileIndex.filterPartitions(partitionKeyFilters.toSeq)
          val prunedFsRelation =
@@ -54,9 +54,15 @@ private[sql] class PruneHiveTablePartitions(session: SparkSession)
    val normalizedFilters = DataSourceStrategy.normalizeExprs(
      filters.filter(f => f.deterministic && !SubqueryExpression.hasSubquery(f)), relation.output)
    val partitionColumnSet = AttributeSet(relation.partitionCols)
-   ExpressionSet(normalizedFilters.filter { f =>
+   val (partitionFilters, remainingFilters) = normalizedFilters.partition { f =>
      !f.references.isEmpty && f.references.subsetOf(partitionColumnSet)
-   })
+   }
+   // Try extracting more convertible partition filters from the remaining filters by converting
+   // them into CNF.
+   val remainingFilterInCnf = remainingFilters.flatMap(CNFConversion(_))
+   val extraPartitionFilters = remainingFilterInCnf.filter(f =>
+     !f.references.isEmpty && f.references.subsetOf(partitionColumnSet))
+   ExpressionSet(partitionFilters ++ extraPartitionFilters)
Contributor commented:

    I am confused: since CNFConversion won't change references, don't you need to call splitConjunctivePredicates on each expression in remainingFilterInCnf to extract more predicates?

Member Author replied:

    The filters here are already processed with splitConjunctivePredicates in PhysicalOperation.unapply. That's why the original code before #28805 doesn't call splitConjunctivePredicates either.

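For readers following the thread above: `splitConjunctivePredicates` (a `PredicateHelper` method) splits only top-level `And` chains and never looks inside `Or`, so an `Or`-only predicate reaches this rule as a single filter that only CNF conversion can break down further. A minimal sketch of that splitting behavior on the toy AST from the earlier sketch (not the Catalyst classes):

    // Splits a And (b Or c) And d into Seq(a, (b Or c), d); a top-level
    // Or comes back as a single element, untouched.
    def splitConjunctive(e: Expr): Seq[Expr] = e match {
      case And(l, r) => splitConjunctive(l) ++ splitConjunctive(r)
      case other     => Seq(other)
    }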
}

/**
@@ -103,9 +109,7 @@ private[sql] class PruneHiveTablePartitions(session: SparkSession)
  override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
    case op @ PhysicalOperation(projections, filters, relation: HiveTableRelation)
        if filters.nonEmpty && relation.isPartitioned && relation.prunedPartitions.isEmpty =>
-     val predicates = CNFWithGroupExpressionsByReference(filters.reduceLeft(And))
-     val finalPredicates = if (predicates.nonEmpty) predicates else filters
-     val partitionKeyFilters = getPartitionKeyFilters(finalPredicates, relation)
+     val partitionKeyFilters = getPartitionKeyFilters(filters, relation)
      if (partitionKeyFilters.nonEmpty) {
        val newPartitions = prunePartitions(relation, partitionKeyFilters)
        val newTableMeta = updateTableMeta(relation.tableMeta, newPartitions)
@@ -67,6 +67,31 @@ abstract class PrunePartitionSuiteBase extends QueryTest with SQLTestUtils with
}
}

test("SPARK-32284: Avoid expanding too many CNF predicates in partition pruning") {
withTempView("temp") {
withTable("t") {
sql(
s"""
|CREATE TABLE t(i INT, p0 INT, p1 INT)
|USING $format
|PARTITIONED BY (p0, p1)""".stripMargin)

spark.range(0, 10, 1).selectExpr("id as col")
.createOrReplaceTempView("temp")

for (part <- (0 to 25)) {
sql(
s"""
|INSERT OVERWRITE TABLE t PARTITION (p0='$part', p1='$part')
|SELECT col FROM temp""".stripMargin)
}
val scale = 20
val predicate = (1 to scale).map(i => s"(p0 = '$i' AND p1 = '$i')").mkString(" OR ")
assertPrunedPartitions(s"SELECT * FROM t WHERE $predicate", scale)
}
}
}

protected def assertPrunedPartitions(query: String, expected: Long): Unit = {
val plan = sql(query).queryExecution.sparkPlan
assert(getScanExecPartitionSize(plan) == expected)
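A note on the constants in the test above: each of the `scale` = 20 disjuncts contributes two conjuncts, so a naive CNF expansion could produce 2^20 clauses, which is exactly the blow-up `SQLConf.MAX_CNF_NODE_COUNT` guards against, while the pruning itself should keep exactly the 20 matching partitions out of 26. Illustrative arithmetic only:

    val scale = 20
    // Clauses in a naive CNF of (a1 && b1) || ... || (a20 && b20):
    // one pick from each disjunct's two conjuncts per clause.
    val naiveCnfClauses = BigInt(2).pow(scale)  // 1048576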