          Refactor state management in HashJoinExec and use CASE expressions for more precise filters
          #18451
        
/// Fixed seed used for hash repartitioning to ensure consistent behavior across
/// executions and runs.
pub const REPARTITION_HASH_SEED: [u64; 4] = [0u64; 4];
Would it be better to just share a RandomState instance (e.g. pub fn repartition_random_state() -> RandomState)?
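A minimal sketch of what sharing the hasher state through one function could look like. It is an illustration only: the standard library's `BuildHasherDefault<DefaultHasher>` stands in for the seeded `RandomState` the comment refers to, and `RepartitionState` and `partition_for` are hypothetical names that do not come from the PR.

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{BuildHasher, BuildHasherDefault, Hash, Hasher};

/// Hypothetical stand-in for the shared hasher state. The PR's code would
/// return its own seeded `RandomState`; this alias is an assumption.
pub type RepartitionState = BuildHasherDefault<DefaultHasher>;

/// Single source of truth for the repartitioning hasher state, so callers
/// never have to rebuild it from a raw seed constant themselves.
pub fn repartition_random_state() -> RepartitionState {
    BuildHasherDefault::default()
}

/// Maps a value to a partition index in `0..num_partitions`.
/// `num_partitions` must be non-zero.
pub fn partition_for<T: Hash>(value: &T, num_partitions: u64) -> u64 {
    let mut hasher = repartition_random_state().build_hasher();
    value.hash(&mut hasher);
    hasher.finish() % num_partitions
}
```

Because every caller goes through the same function, the repartitioning side and the filter side cannot drift apart by constructing the state with different seeds.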
let build_data = match self.mode {
    PartitionMode::Partitioned => PartitionBuildDataReport::Partitioned {
        partition_id: left_side_partition_id,
        bounds: left_data.bounds.clone(),
It's not much data but it could be interesting to avoid the clone here as a future optimization.
// In `PartitionMode::CollectLeft`, all streams on the left side share the same partition id (0).
// Since this function can be called multiple times for that same partition, we must deduplicate
// by checking against the last recorded bound.
last_bound.partition != left_side_partition_id
This cleanup removes this sort of hacky comparison and instead matches on Options
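As a rough illustration of the difference, the sketch below deduplicates repeated reports by matching on an `Option` rather than comparing partition ids. The types `Bounds` and `CollectLeftState` and the `report` method are hypothetical and much simpler than the PR's actual state.

```rust
/// Hypothetical min/max bounds for a single join key.
#[derive(Debug, Clone, PartialEq)]
pub struct Bounds {
    pub min: i64,
    pub max: i64,
}

/// Hypothetical CollectLeft state: every stream shares one build side,
/// so there is at most one set of bounds to record.
#[derive(Debug, Default)]
pub struct CollectLeftState {
    pub bounds: Option<Bounds>,
}

impl CollectLeftState {
    /// Records the bounds on the first call and ignores later calls.
    /// Returns `true` only when this call actually stored the bounds.
    pub fn report(&mut self, bounds: Bounds) -> bool {
        match self.bounds {
            // Already reported by another stream: nothing to do.
            Some(_) => false,
            // First report: store it.
            None => {
                self.bounds = Some(bounds);
                true
            }
        }
    }
}
```

The "has this been reported yet" question is answered by the type itself, so no sentinel partition id or last-element comparison is needed.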
}

// Wait for all partitions to report
if self.barrier.wait().await.is_leader() {
It would now be pretty easy to always update the filter and use let fall_through_case = !self.barrier.wait().await.is_leader() to set the default case for the CASE expression, so that it is true while we haven't received data for all partitions and false once we have. That would allow immediate pushdown for partitions whose build side completes before the others.
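To make the idea concrete, here is a small sketch of how a fall-through value that depends on outstanding partitions could behave. It models only the decision logic, not the barrier or the real filter expression; `PartitionState` and `keep_row` are hypothetical names, and the bounds are simplified to a single `i64` range.

```rust
/// Hypothetical per-partition build-side status.
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum PartitionState {
    /// Build side has not reported yet.
    Pending,
    /// Build side reported and contained no rows.
    Empty,
    /// Build side reported these inclusive bounds.
    Bounds { min: i64, max: i64 },
}

/// Decides whether a probe-side row may be kept.
/// `hash` is the row's repartitioning hash; `states` must be non-empty.
pub fn keep_row(states: &[PartitionState], hash: u64, value: i64) -> bool {
    // Default (ELSE) branch: true while any partition is still pending,
    // false once every partition has reported.
    let fall_through = states
        .iter()
        .any(|s| matches!(s, PartitionState::Pending));

    let partition = (hash % states.len() as u64) as usize;
    match states[partition] {
        // A WHEN clause exists for this partition: apply its bounds.
        PartitionState::Bounds { min, max } => value >= min && value <= max,
        // No WHEN clause: take the fall-through branch.
        PartitionState::Pending | PartitionState::Empty => fall_through,
    }
}
```

In this sketch, rows hashing to a partition that has already reported are filtered right away, while rows for partitions that are still pending are conservatively kept.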
- HashJoinExec: mode=Partitioned, join_type=Inner, on=[(a@0, d@0)]
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[d, e, f], file_type=test, pushdown_supported=true, predicate=DynamicFilter [ d@0 >= aa AND d@0 <= ab ] AND DynamicFilter [ e@1 IS NULL OR e@1 < bb ]
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[d, e, f], file_type=test, pushdown_supported=true, predicate=DynamicFilter [ CASE hash_repartition % 1 WHEN 0 THEN d@0 >= aa AND d@0 <= ab ELSE false END ] AND DynamicFilter [ e@1 IS NULL OR e@1 < bb ]
Note that even though there is only 1 WHEN clause, we keep the CASE statement because there are 2 partitions: anything that had hash_repartition % 1 = 1 has no data in the build side, so it can immediately be discarded by the false fall-through condition.
Background
This PR is part of an EPIC to push down hash table references from HashJoinExec into scans. The EPIC is tracked in #17171.
A "target state" is tracked in #18393.
There is a series of PRs to get us to this target state in smaller, more reviewable changes that are still valuable on their own:
- Refactor state management in HashJoinExec and use CASE expressions for more precise filters #18451

Changes in this PR
This PR refactors state management in HashJoinExec to make filter pushdown more efficient and prepare for pushing down membership tests.
- Option instead of comparing integers, etc.)
- CASE hash_repartition % N WHEN partition_id THEN condition ELSE false END
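The shape of that CASE expression can be sketched as a small string builder. This is only an illustration of the template above, assuming N is the number of partitions and that partitions with an empty build side get no WHEN clause; `render_case` and its parameters are hypothetical and not part of the PR.

```rust
/// Renders the textual form of the per-partition filter.
/// `bounds[i]` holds the (min, max) literals for partition `i`,
/// or `None` when that partition's build side had no rows.
pub fn render_case(column: &str, bounds: &[Option<(String, String)>]) -> String {
    // N in the template is taken to be the number of partitions.
    let n = bounds.len();
    let mut expr = format!("CASE hash_repartition % {n}");

    for (partition_id, partition_bounds) in bounds.iter().enumerate() {
        // Only partitions with build-side data contribute a WHEN clause.
        if let Some((min, max)) = partition_bounds {
            expr.push_str(&format!(
                " WHEN {partition_id} THEN {column} >= {min} AND {column} <= {max}"
            ));
        }
    }

    // Rows that hash to a partition without data match no WHEN clause
    // and are discarded by the ELSE branch.
    expr.push_str(" ELSE false END");
    expr
}
```

With two partitions where only partition 0 has data, the CASE keeps a single WHEN clause, and every row hashing to partition 1 falls through to `ELSE false`.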