[SPARK-32284][SQL] Avoid expanding too many CNF predicates in partition pruning #29075
Although this aims to fix the too-many-predicates issue in HMS, "Avoid pushing down too many predicates in partition pruning" sounds ambiguous as a PR title. Can we have a more specific title describing what the PR code does?
@dongjoon-hyun Thanks for the suggestion. I have updated the title.
Test build #125715 has finished for PR 29075 at commit
Thank you for updating, @gengliangwang . Shall we adjust this test case name accordingly together?
BTW, in the test case, since
case op @ PhysicalOperation(projections, filters, relation: HiveTableRelation)
  if filters.nonEmpty && relation.isPartitioned && relation.prunedPartitions.isEmpty =>
val predicates = CNFWithGroupExpressionsByReference(filters.reduceLeft(And))
val predicates = CNFConversion(filters.reduceLeft(And))
nit: conjunctiveNormalForm(filters.reduceLeft(And), identity)?
val extraPartitionFilters =
  remainingFilterInCnf.filter(f => f.references.subsetOf(partitionSet))

(ExpressionSet(partitionFilters ++ extraPartitionFilters), remainingFilters)
val (extraPartitionFilters, otherFilters) = remainingFilterInCnf.partition(f =>
f.references.subsetOf(partitionSet)
)
(ExpressionSet(partitionFilters ++ extraPartitionFilters), otherFilters)
?
In that way, otherFilters can be very long, which leads to longer codegen... I am avoiding that on purpose. Let me add a comment here.
okay.
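The trade-off discussed in this thread can be sketched with a toy model. This is only an illustration under stated assumptions: `Clause`, `keepOriginalRemainder`, and `splitCnf` are hypothetical names, not Spark APIs, and the CNF clauses are written out by hand rather than produced by Catalyst.

```scala
object RemainderSketch {
  // Hypothetical stand-in for a predicate: its text plus the columns it references.
  final case class Clause(sql: String, references: Set[String])

  // Approach kept in the PR: take extra partition filters from the CNF form,
  // but return the ORIGINAL remaining filters, so the non-partition side stays short.
  def keepOriginalRemainder(
      original: Seq[Clause],
      cnf: Seq[Clause],
      partitionCols: Set[String]): (Seq[Clause], Seq[Clause]) =
    (cnf.filter(_.references.subsetOf(partitionCols)), original)

  // Suggested alternative: split the CNF clauses themselves, so the remainder
  // is made of CNF clauses and can be much longer.
  def splitCnf(
      cnf: Seq[Clause],
      partitionCols: Set[String]): (Seq[Clause], Seq[Clause]) =
    cnf.partition(_.references.subsetOf(partitionCols))

  // One original filter over partition column p and data column c.
  val original: Seq[Clause] =
    Seq(Clause("(p = 1 AND c = 2) OR (p = 3 AND c = 4)", Set("p", "c")))

  // Its CNF form, expanded by hand: four clauses.
  val cnf: Seq[Clause] = Seq(
    Clause("p = 1 OR p = 3", Set("p")),
    Clause("p = 1 OR c = 4", Set("p", "c")),
    Clause("c = 2 OR p = 3", Set("p", "c")),
    Clause("c = 2 OR c = 4", Set("c")))
}
```

In this toy input both approaches find the same single partition filter, but the first leaves one remaining filter while the second leaves three, which is the growth the author wanted to keep out of codegen.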
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution.datasources.DataSourceStrategy
import org.apache.spark.sql.execution.datasources.PruneFileSourcePartitions.CNFConversion
import org.apache.spark.sql.internal.SQLConf
This import is not necessary.
val remainingFilterInCnf = remainingFilters.flatMap(CNFConversion)
val extraPartitionFilters = remainingFilterInCnf.filter(f =>
  !f.references.isEmpty && f.references.subsetOf(partitionColumnSet))
ExpressionSet(partitionFilters ++ extraPartitionFilters)
I am confused: it seems CNFConversion won't change references. Don't you need to call splitConjunctivePredicates on each expr in remainingFilterInCnf to extract more predicates?
The filters here are already processed with splitConjunctivePredicates in PhysicalOperation.unapply. That's why the original code before #28805 doesn't call splitConjunctivePredicates either.
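As a rough illustration of the two steps discussed here (splitting on top-level ANDs, then keeping only the predicates that reference partition columns), the following sketch uses a toy expression tree. The types and the helpers `splitConjuncts` and `partitionOnly` are assumptions made for this example; they only mimic the idea and are not Catalyst's `Expression` or `splitConjunctivePredicates`.

```scala
object PartitionFilterSketch {
  // Toy expression tree (an assumption for this sketch, not Spark's Expression).
  sealed trait Expr { def references: Set[String] }
  final case class Cmp(column: String, op: String, value: Int) extends Expr {
    def references: Set[String] = Set(column)
  }
  final case class And(left: Expr, right: Expr) extends Expr {
    def references: Set[String] = left.references ++ right.references
  }
  final case class Or(left: Expr, right: Expr) extends Expr {
    def references: Set[String] = left.references ++ right.references
  }

  // Splits only on top-level ANDs; an OR is kept as a single predicate.
  def splitConjuncts(e: Expr): Seq[Expr] = e match {
    case And(l, r) => splitConjuncts(l) ++ splitConjuncts(r)
    case other     => Seq(other)
  }

  // Keeps predicates that reference at least one column, all of them partition columns.
  def partitionOnly(filters: Seq[Expr], partitionCols: Set[String]): Seq[Expr] =
    filters.filter(f => f.references.nonEmpty && f.references.subsetOf(partitionCols))

  // p > 1 AND (p = 5 OR c = 2), with p as the only partition column.
  val condition: Expr = And(Cmp("p", ">", 1), Or(Cmp("p", "=", 5), Cmp("c", "=", 2)))
  val conjuncts: Seq[Expr] = splitConjuncts(condition)
  val pruning: Seq[Expr] = partitionOnly(conjuncts, Set("p"))
}
```

Here `conjuncts` has two elements and only `p > 1` survives as a pruning predicate, because the OR branch also references the data column `c`.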
Test build #125746 has finished for PR 29075 at commit
Test build #125758 has finished for PR 29075 at commit
* @param condition to be converted into CNF.
* @param condition Condition to be converted into CNF.
* @param groupExpsFunc A method for grouping intermediate results so that the final result can be
*                      shorter.
nit: "A method to group expressions for reducing the size of pushed down predicates and corresponding codegen"?
#29101 is a better solution to me. Close this one now.
What changes were proposed in this pull request?
After #28805, predicates are converted into CNF for partition pruning. However, the CNF result can be very long and the Hive metastore will fail to execute it.
For example, the following partition filter:
will be converted into a long query (130K characters) in the Hive metastore, which fails with an error:
To mitigate the regression introduced by the previous improvement #28805:
Why are the changes needed?
Mitigating potential regressions in partition pruning from #28805
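To see why a CNF result can get very long, consider a disjunction of n two-way conjunctions: distributing OR over AND yields 2^n clauses. The sketch below is a toy model written for this illustration; `CnfSketch`, `toCnf`, and `disjunctionOfPairs` are hypothetical names, and the conversion is a naive textbook one rather than Spark's implementation.

```scala
object CnfSketch {
  // Toy expression tree (an assumption for this sketch, not Spark's Expression).
  sealed trait Expr
  final case class Pred(name: String) extends Expr
  final case class And(left: Expr, right: Expr) extends Expr
  final case class Or(left: Expr, right: Expr) extends Expr

  // Returns the CNF as a list of clauses; each clause is the OR of its predicates.
  def toCnf(e: Expr): List[Set[String]] = e match {
    case Pred(n)   => List(Set(n))
    case And(l, r) => toCnf(l) ++ toCnf(r)
    // Distribute OR over AND: pair every left clause with every right clause.
    case Or(l, r)  => for (a <- toCnf(l); b <- toCnf(r)) yield a ++ b
  }

  // Builds (a1 AND b1) OR (a2 AND b2) OR ... OR (an AND bn).
  def disjunctionOfPairs(n: Int): Expr =
    (1 to n).map(i => And(Pred(s"a$i"), Pred(s"b$i")): Expr).reduceLeft[Expr](Or(_, _))

  // Number of CNF clauses for a disjunction of n pairs.
  def clauseCount(n: Int): Int = toCnf(disjunctionOfPairs(n)).size
}
```

With this model, `CnfSketch.clauseCount(3)` is 8 and `CnfSketch.clauseCount(10)` is 1024, so a filter with a few dozen OR-ed branches is enough to produce a predicate string far larger than the original.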
Does this PR introduce any user-facing change?
No
How was this patch tested?
Unit test