adriangb (Contributor) commented Nov 2, 2025

Background

This PR is part of an EPIC to push down hash table references from HashJoinExec into scans. The EPIC is tracked in #17171.

A "target state" is tracked in #18393.
A series of PRs gets us to this target state in smaller, more reviewable changes that are still valuable on their own.

Changes in this PR

This PR refactors state management in HashJoinExec to make filter pushdown more efficient and prepare for pushing down membership tests.

  • Refactor internal data structures to clean up state management and make usage more idiomatic (use Option instead of comparing integers, etc.)
  • Use CASE expressions to evaluate pushed-down filters selectively by partition. Example: CASE hash_repartition % N WHEN partition_id THEN condition ELSE false END
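
To make the CASE idea concrete, here is a minimal sketch of how one probe-side row would be evaluated against such an expression. It is not the code in this PR: `Bounds`, `passes_filter`, and the use of plain `i64` min/max bounds are assumptions made for illustration only.

```rust
/// Hypothetical min/max bounds collected from one build-side partition.
/// (Illustrative type, not the one used in the PR.)
#[derive(Clone, Copy, Debug)]
pub struct Bounds {
    pub min: i64,
    pub max: i64,
}

/// Row-wise equivalent of
/// `CASE hash % N WHEN partition_id THEN condition ELSE false END`,
/// where `N = partitions.len()` and `condition` is a min/max range check.
///
/// `partitions[i]` is `None` when build partition `i` contributed no bounds,
/// which corresponds to a missing WHEN clause and hits the `ELSE false` arm.
pub fn passes_filter(hash: u64, value: i64, partitions: &[Option<Bounds>]) -> bool {
    if partitions.is_empty() {
        return false;
    }
    // Route the row to the partition its hash would be repartitioned into.
    let partition_id = (hash % partitions.len() as u64) as usize;
    match partitions[partition_id] {
        Some(b) => b.min <= value && value <= b.max,
        None => false,
    }
}
```

The point of routing by hash first is that each row is only checked against the bounds of the partition it can actually join with, rather than against the union of all partitions' bounds.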

github-actions bot added the core (Core DataFusion crate) and physical-plan (Changes to the physical-plan crate) labels on Nov 2, 2025
adriangb force-pushed the refactor-hash-join-state branch from 0a934fc to 692217e on November 2, 2025 20:40
and use CASE expressions to evaluate pushed down filters only for the given partition.
Comment on lines +318 to +320
/// Fixed seed used for hash repartitioning to ensure consistent behavior across
/// executions and runs.
pub const REPARTITION_HASH_SEED: [u64; 4] = [0u64; 4];
Contributor Author

Would it be better to just share a RandomState instance (e.g. pub fn repartition_random_state() -> RandomState)?
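
A rough sketch of what the function-based alternative could look like. To keep it self-contained it uses only the standard library: `SeededState` and `SeededHasher` are hypothetical stand-ins (a simple FNV-1a style hasher) for the seeded `RandomState` the real code would return, and only `REPARTITION_HASH_SEED` and the function name come from this thread.

```rust
use std::hash::{BuildHasher, Hasher};

/// Fixed seed, mirroring the constant in the diff above.
pub const REPARTITION_HASH_SEED: [u64; 4] = [0u64; 4];

/// Hypothetical stand-in for a seeded `RandomState`; not a real DataFusion type.
#[derive(Clone, Debug)]
pub struct SeededState {
    seeds: [u64; 4],
}

/// Simple FNV-1a style hasher, used here only so the sketch is deterministic.
pub struct SeededHasher {
    state: u64,
}

impl Hasher for SeededHasher {
    fn write(&mut self, bytes: &[u8]) {
        for &byte in bytes {
            self.state ^= u64::from(byte);
            self.state = self.state.wrapping_mul(0x0000_0100_0000_01b3);
        }
    }

    fn finish(&self) -> u64 {
        self.state
    }
}

impl BuildHasher for SeededState {
    type Hasher = SeededHasher;

    fn build_hasher(&self) -> SeededHasher {
        // Fold the seeds into the initial state so that equal seeds always
        // produce equal hashers.
        let mut state = 0xcbf2_9ce4_8422_2325u64;
        for seed in self.seeds {
            state = (state ^ seed).wrapping_mul(0x0000_0100_0000_01b3);
        }
        SeededHasher { state }
    }
}

/// Single place that constructs the hashing state, so callers never have to
/// handle the raw seed themselves.
pub fn repartition_random_state() -> SeededState {
    SeededState {
        seeds: REPARTITION_HASH_SEED,
    }
}
```

With this shape, both the repartitioning code and the filter code would call `repartition_random_state()` and could not accidentally construct their hashers from different seeds.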

let build_data = match self.mode {
PartitionMode::Partitioned => PartitionBuildDataReport::Partitioned {
partition_id: left_side_partition_id,
bounds: left_data.bounds.clone(),
Contributor Author

It's not much data, but avoiding the clone here could be an interesting future optimization.

// In `PartitionMode::CollectLeft`, all streams on the left side share the same partition id (0).
// Since this function can be called multiple times for that same partition, we must deduplicate
// by checking against the last recorded bound.
last_bound.partition != left_side_partition_id
Contributor Author

This cleanup removes this sort of hacky comparison and instead matches on Option values.
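
For readers who have not seen the new code, here is a minimal sketch of the pattern, under the assumption of one slot per build partition. `PartitionBounds`, `Accumulator`, and `report` are hypothetical names for illustration and are not the PR's actual types.

```rust
/// Hypothetical per-partition bounds report (illustrative only).
#[derive(Clone, Debug, PartialEq)]
pub struct PartitionBounds {
    pub partition: usize,
    pub min: i64,
    pub max: i64,
}

/// One slot per build partition: `None` means "not reported yet".
pub struct Accumulator {
    slots: Vec<Option<PartitionBounds>>,
}

impl Accumulator {
    pub fn new(num_partitions: usize) -> Self {
        Self {
            slots: vec![None; num_partitions],
        }
    }

    /// Records a report. Returns `false` for a duplicate or out-of-range
    /// partition, so deduplication no longer needs to compare against the
    /// last recorded partition id.
    pub fn report(&mut self, bounds: PartitionBounds) -> bool {
        match self.slots.get_mut(bounds.partition) {
            Some(slot) if slot.is_none() => {
                *slot = Some(bounds);
                true
            }
            _ => false,
        }
    }

    /// True once every partition has reported.
    pub fn all_reported(&self) -> bool {
        self.slots.iter().all(Option::is_some)
    }
}
```

Here a repeated report for the same partition is rejected by the `Option` state itself rather than by an integer comparison against the previous entry.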

}

// Wait for all partitions to report
if self.barrier.wait().await.is_leader() {
Contributor Author

It would now be pretty easy to always update the filter and use let fall_through_case = !self.barrier.wait().await.is_leader() to set the default case of the CASE expression, so that it is true while we have not yet received data for all partitions and false once we have. That would allow immediate pushdown for partitions whose build side completes before the others.
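
A small sketch of the semantics I have in mind, not an implementation: `CaseFilter` and its fields are hypothetical, the condition is reduced to an `i64` range check, and `fall_through_case` is passed in as a plain bool instead of being derived from the barrier.

```rust
/// Hypothetical model of the pushed-down CASE expression (illustrative only).
pub struct CaseFilter {
    /// N in `hash % N`.
    pub num_partitions: u64,
    /// One `(partition_id, min, max)` entry per WHEN clause, i.e. per build
    /// partition that has reported bounds so far.
    pub when_clauses: Vec<(u64, i64, i64)>,
    /// Value of the ELSE arm: `true` while some partitions have not reported
    /// yet (rows must be kept), `false` once all of them have.
    pub fall_through_case: bool,
}

impl CaseFilter {
    /// Evaluates the CASE expression for one probe-side row.
    pub fn matches(&self, hash: u64, value: i64) -> bool {
        if self.num_partitions == 0 {
            return self.fall_through_case;
        }
        let id = hash % self.num_partitions;
        self.when_clauses
            .iter()
            .find(|(partition_id, _, _)| *partition_id == id)
            .map(|&(_, min, max)| min <= value && value <= max)
            .unwrap_or(self.fall_through_case)
    }
}
```

While `fall_through_case` is true, rows for partitions without a WHEN clause pass through unfiltered, which keeps results correct; rows for partitions that have already reported are filtered right away.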

- HashJoinExec: mode=Partitioned, join_type=Inner, on=[(a@0, d@0)]
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[d, e, f], file_type=test, pushdown_supported=true, predicate=DynamicFilter [ d@0 >= aa AND d@0 <= ab ] AND DynamicFilter [ e@1 IS NULL OR e@1 < bb ]
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[d, e, f], file_type=test, pushdown_supported=true, predicate=DynamicFilter [ CASE hash_repartition % 1 WHEN 0 THEN d@0 >= aa AND d@0 <= ab ELSE false END ] AND DynamicFilter [ e@1 IS NULL OR e@1 < bb ]
Contributor Author

Note that even though there is only one WHEN clause, we keep the CASE expression because there are 2 partitions: any row with hash_repartition % 1 = 1 has no data on the build side, so it can be discarded immediately by the false fall-through condition.
