-
Notifications
You must be signed in to change notification settings - Fork 28.8k
[SPARK-32284][SQL] Avoid expanding too many CNF predicates in partition pruning #29075
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -53,11 +53,20 @@ private[sql] object PruneFileSourcePartitions | |
val partitionColumns = | ||
relation.resolve(partitionSchema, sparkSession.sessionState.analyzer.resolver) | ||
val partitionSet = AttributeSet(partitionColumns) | ||
val (partitionFilters, dataFilters) = normalizedFilters.partition(f => | ||
val (partitionFilters, remainingFilters) = normalizedFilters.partition(f => | ||
f.references.subsetOf(partitionSet) | ||
) | ||
|
||
(ExpressionSet(partitionFilters), dataFilters) | ||
// Try extracting more convertible partition filters from the remaining filters by converting | ||
// them into CNF. | ||
val remainingFilterInCnf = remainingFilters.flatMap(CNFConversion(_)) | ||
val extraPartitionFilters = | ||
remainingFilterInCnf.filter(f => f.references.subsetOf(partitionSet)) | ||
|
||
// For the filters that can't be used for partition pruning, we simply use `remainingFilters` | ||
// instead of using the non-convertible part from `remainingFilterInCnf`. Otherwise, the | ||
// result filters can be very long. | ||
(ExpressionSet(partitionFilters ++ extraPartitionFilters), remainingFilters) | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more.
? There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. In that way, There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. okay. |
||
} | ||
|
||
private def rebuildPhysicalOperation( | ||
|
@@ -88,12 +97,9 @@ private[sql] object PruneFileSourcePartitions | |
_, | ||
_)) | ||
if filters.nonEmpty && fsRelation.partitionSchemaOption.isDefined => | ||
val predicates = CNFWithGroupExpressionsByReference(filters.reduceLeft(And)) | ||
val finalPredicates = if (predicates.nonEmpty) predicates else filters | ||
val (partitionKeyFilters, _) = getPartitionKeyFiltersAndDataFilters( | ||
fsRelation.sparkSession, logicalRelation, partitionSchema, finalPredicates, | ||
fsRelation.sparkSession, logicalRelation, partitionSchema, filters, | ||
logicalRelation.output) | ||
|
||
if (partitionKeyFilters.nonEmpty) { | ||
val prunedFileIndex = catalogFileIndex.filterPartitions(partitionKeyFilters.toSeq) | ||
val prunedFsRelation = | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -54,9 +54,15 @@ private[sql] class PruneHiveTablePartitions(session: SparkSession) | |
val normalizedFilters = DataSourceStrategy.normalizeExprs( | ||
filters.filter(f => f.deterministic && !SubqueryExpression.hasSubquery(f)), relation.output) | ||
val partitionColumnSet = AttributeSet(relation.partitionCols) | ||
ExpressionSet(normalizedFilters.filter { f => | ||
val (partitionFilters, remainingFilters) = normalizedFilters.partition { f => | ||
!f.references.isEmpty && f.references.subsetOf(partitionColumnSet) | ||
}) | ||
} | ||
// Try extracting more convertible partition filters from the remaining filters by converting | ||
// them into CNF. | ||
val remainingFilterInCnf = remainingFilters.flatMap(CNFConversion(_)) | ||
val extraPartitionFilters = remainingFilterInCnf.filter(f => | ||
!f.references.isEmpty && f.references.subsetOf(partitionColumnSet)) | ||
ExpressionSet(partitionFilters ++ extraPartitionFilters) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I am confused that seems CNFConversion won't change references, You don't need to call a There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The |
||
} | ||
|
||
/** | ||
|
@@ -103,9 +109,7 @@ private[sql] class PruneHiveTablePartitions(session: SparkSession) | |
override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators { | ||
case op @ PhysicalOperation(projections, filters, relation: HiveTableRelation) | ||
if filters.nonEmpty && relation.isPartitioned && relation.prunedPartitions.isEmpty => | ||
val predicates = CNFWithGroupExpressionsByReference(filters.reduceLeft(And)) | ||
val finalPredicates = if (predicates.nonEmpty) predicates else filters | ||
val partitionKeyFilters = getPartitionKeyFilters(finalPredicates, relation) | ||
val partitionKeyFilters = getPartitionKeyFilters(filters, relation) | ||
if (partitionKeyFilters.nonEmpty) { | ||
val newPartitions = prunePartitions(relation, partitionKeyFilters) | ||
val newTableMeta = updateTableMeta(relation.tableMeta, newPartitions) | ||
|
Uh oh!
There was an error while loading. Please reload this page.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit:
A method to group expressions for reducing the size of pushed down predicates and corresponding codegen
?