14 changes: 7 additions & 7 deletions datafusion/core/tests/physical_optimizer/filter_pushdown/mod.rs
@@ -278,7 +278,7 @@ async fn test_dynamic_filter_pushdown_through_hash_join_with_topk() {
- SortExec: TopK(fetch=2), expr=[e@4 ASC], preserve_partitioning=[false], filter=[e@4 IS NULL OR e@4 < bb]
- HashJoinExec: mode=Partitioned, join_type=Inner, on=[(a@0, d@0)]
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[d, e, f], file_type=test, pushdown_supported=true, predicate=DynamicFilter [ d@0 >= aa AND d@0 <= ab ] AND DynamicFilter [ e@1 IS NULL OR e@1 < bb ]
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[d, e, f], file_type=test, pushdown_supported=true, predicate=DynamicFilter [ CASE hash_repartition % 1 WHEN 0 THEN d@0 >= aa AND d@0 <= ab AND lookup_p0 ELSE false END ] AND DynamicFilter [ e@1 IS NULL OR e@1 < bb ]
"
);
}
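
The snapshots above (and those that follow) show the new predicate shape: instead of pushing down only min/max bounds, the dynamic filter now routes each probe row with `CASE hash_repartition % N WHEN p THEN bounds_p AND lookup_p ELSE false END`, so a row is checked only against the bounds and hash-table lookup of the build partition it would be repartitioned into. Below is a minimal sketch of that evaluation logic — not DataFusion's actual code; `PartitionFilter` and `row_matches` are hypothetical names, and a plain `Vec` stands in for the per-partition hash table:

```rust
use std::hash::BuildHasher;

use ahash::RandomState;

/// Per-partition filter state: key bounds plus a stand-in for the
/// partition's hash-table membership check (`lookup_p` in the plans above).
struct PartitionFilter {
    min: i64,
    max: i64,
    build_keys: Vec<i64>,
}

/// CASE hash_repartition % N WHEN p THEN bounds_p AND lookup_p ELSE false END
fn row_matches(key: i64, filters: &[PartitionFilter], state: &RandomState) -> bool {
    // Route the row exactly as RepartitionExec would: hash the key, take the modulo.
    let p = (state.hash_one(key) % filters.len() as u64) as usize;
    let f = &filters[p];
    key >= f.min && key <= f.max && f.build_keys.contains(&key)
}

fn main() {
    // Seeds (0, 0, 0, 0) mirror the comment added in exec.rs below: the filter must
    // hash with the same RandomState as RepartitionExec, or probe rows would be
    // checked against the wrong partition's filter.
    let state = RandomState::with_seeds(0, 0, 0, 0);
    let num_partitions = 4u64;

    // Build side: partition the build keys with the same hash the probe side
    // will use, recording per-partition bounds and membership.
    let build_keys = [10i64, 15, 20, 30, 40];
    let mut filters: Vec<PartitionFilter> = (0..num_partitions)
        .map(|_| PartitionFilter { min: i64::MAX, max: i64::MIN, build_keys: Vec::new() })
        .collect();
    for &k in &build_keys {
        let p = (state.hash_one(k) % num_partitions) as usize;
        let f = &mut filters[p];
        f.min = f.min.min(k);
        f.max = f.max.max(k);
        f.build_keys.push(k);
    }

    // Probe side: 10 and 40 match; 25 is filtered out.
    for key in [10i64, 25, 40] {
        println!("{key}: {}", row_matches(key, &filters, &state));
    }
}
```

With a single build input the plan degenerates to `CASE hash_repartition % 1 WHEN 0 THEN ... ELSE false END`, and CollectLeft joins (one shared build side) skip the CASE entirely, appending a plain `hash_lookup` term to the bounds, as the later snapshots show.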
@@ -1078,7 +1078,7 @@ async fn test_hashjoin_dynamic_filter_pushdown() {
@r"
- HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(a@0, a@0), (b@1, b@1)]
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, e], file_type=test, pushdown_supported=true, predicate=DynamicFilter [ a@0 >= aa AND a@0 <= ab AND b@1 >= ba AND b@1 <= bb ]
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, e], file_type=test, pushdown_supported=true, predicate=DynamicFilter [ a@0 >= aa AND a@0 <= ab AND b@1 >= ba AND b@1 <= bb AND hash_lookup ]
"
);
}
@@ -1309,7 +1309,7 @@ async fn test_hashjoin_dynamic_filter_pushdown_partitioned() {
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true
- CoalesceBatchesExec: target_batch_size=8192
- RepartitionExec: partitioning=Hash([a@0, b@1], 12), input_partitions=1
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, e], file_type=test, pushdown_supported=true, predicate=DynamicFilter [ a@0 >= ab AND a@0 <= ab AND b@1 >= bb AND b@1 <= bb OR a@0 >= aa AND a@0 <= aa AND b@1 >= ba AND b@1 <= ba ]
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, e], file_type=test, pushdown_supported=true, predicate=DynamicFilter [ CASE hash_repartition % 12 WHEN 0 THEN lookup_p0 WHEN 1 THEN lookup_p1 WHEN 2 THEN a@0 >= ab AND a@0 <= ab AND b@1 >= bb AND b@1 <= bb AND lookup_p2 WHEN 3 THEN lookup_p3 WHEN 4 THEN a@0 >= aa AND a@0 <= aa AND b@1 >= ba AND b@1 <= ba AND lookup_p4 WHEN 5 THEN lookup_p5 WHEN 6 THEN lookup_p6 WHEN 7 THEN lookup_p7 WHEN 8 THEN lookup_p8 WHEN 9 THEN lookup_p9 WHEN 10 THEN lookup_p10 WHEN 11 THEN lookup_p11 ELSE false END ]
"
);

@@ -1326,7 +1326,7 @@ async fn test_hashjoin_dynamic_filter_pushdown_partitioned() {
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true
- CoalesceBatchesExec: target_batch_size=8192
- RepartitionExec: partitioning=Hash([a@0, b@1], 12), input_partitions=1
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, e], file_type=test, pushdown_supported=true, predicate=DynamicFilter [ a@0 >= aa AND a@0 <= ab AND b@1 >= ba AND b@1 <= bb ]
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, e], file_type=test, pushdown_supported=true, predicate=DynamicFilter [ CASE hash_repartition % 12 WHEN 0 THEN a@0 >= aa AND a@0 <= ab AND b@1 >= ba AND b@1 <= bb AND lookup_p0 WHEN 1 THEN lookup_p1 WHEN 2 THEN lookup_p2 WHEN 3 THEN lookup_p3 WHEN 4 THEN lookup_p4 WHEN 5 THEN lookup_p5 WHEN 6 THEN lookup_p6 WHEN 7 THEN lookup_p7 WHEN 8 THEN lookup_p8 WHEN 9 THEN lookup_p9 WHEN 10 THEN lookup_p10 WHEN 11 THEN lookup_p11 ELSE false END ]
"
);

@@ -1503,7 +1503,7 @@ async fn test_hashjoin_dynamic_filter_pushdown_collect_left() {
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true
- CoalesceBatchesExec: target_batch_size=8192
- RepartitionExec: partitioning=Hash([a@0, b@1], 12), input_partitions=1
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, e], file_type=test, pushdown_supported=true, predicate=DynamicFilter [ a@0 >= aa AND a@0 <= ab AND b@1 >= ba AND b@1 <= bb ]
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, e], file_type=test, pushdown_supported=true, predicate=DynamicFilter [ a@0 >= aa AND a@0 <= ab AND b@1 >= ba AND b@1 <= bb AND hash_lookup ]
"
);

@@ -1671,8 +1671,8 @@ async fn test_nested_hashjoin_dynamic_filter_pushdown() {
- HashJoinExec: mode=Partitioned, join_type=Inner, on=[(a@0, b@0)]
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, x], file_type=test, pushdown_supported=true
- HashJoinExec: mode=Partitioned, join_type=Inner, on=[(c@1, d@0)]
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[b, c, y], file_type=test, pushdown_supported=true, predicate=DynamicFilter [ b@0 >= aa AND b@0 <= ab ]
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[d, z], file_type=test, pushdown_supported=true, predicate=DynamicFilter [ d@0 >= ca AND d@0 <= cb ]
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[b, c, y], file_type=test, pushdown_supported=true, predicate=DynamicFilter [ CASE hash_repartition % 1 WHEN 0 THEN b@0 >= aa AND b@0 <= ab AND lookup_p0 ELSE false END ]
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[d, z], file_type=test, pushdown_supported=true, predicate=DynamicFilter [ CASE hash_repartition % 1 WHEN 0 THEN d@0 >= ca AND d@0 <= cb AND lookup_p0 ELSE false END ]
"
);
}
39 changes: 26 additions & 13 deletions datafusion/physical-plan/src/joins/hash_join/exec.rs
@@ -26,7 +26,7 @@ use crate::filter_pushdown::{
ChildPushdownResult, FilterDescription, FilterPushdownPhase,
FilterPushdownPropagation,
};
use crate::joins::hash_join::shared_bounds::{ColumnBounds, SharedBoundsAccumulator};
use crate::joins::hash_join::shared_bounds::{ColumnBounds, SharedBuildAccumulator};
use crate::joins::hash_join::stream::{
BuildSide, BuildSideInitialState, HashJoinStream, HashJoinStreamState,
};
@@ -87,7 +87,8 @@ const HASH_JOIN_SEED: RandomState =
/// HashTable and input data for the left (build side) of a join
pub(super) struct JoinLeftData {
/// The hash table with indices into `batch`
pub(super) hash_map: Box<dyn JoinHashMapType>,
/// Arc is used to allow sharing with SharedBuildAccumulator for hash map pushdown
pub(super) hash_map: Arc<dyn JoinHashMapType>,
/// The input rows for the build side
batch: RecordBatch,
/// The build side on expressions values
@@ -109,7 +110,7 @@ pub(super) struct JoinLeftData {
impl JoinLeftData {
/// Create a new `JoinLeftData` from its parts
pub(super) fn new(
hash_map: Box<dyn JoinHashMapType>,
hash_map: Arc<dyn JoinHashMapType>,
batch: RecordBatch,
values: Vec<ArrayRef>,
visited_indices_bitmap: SharedBitmapBuilder,
@@ -133,6 +134,11 @@ impl JoinLeftData {
&*self.hash_map
}

/// returns an Arc clone of the hash map for sharing with SharedBuildAccumulator
pub(super) fn hash_map_arc(&self) -> Arc<dyn JoinHashMapType> {
Arc::clone(&self.hash_map)
}

/// returns a reference to the build side batch
pub(super) fn batch(&self) -> &RecordBatch {
&self.batch
@@ -364,9 +370,9 @@ pub struct HashJoinExec {
struct HashJoinExecDynamicFilter {
/// Dynamic filter that we'll update with the results of the build side once it completes.
filter: Arc<DynamicFilterPhysicalExpr>,
/// Bounds accumulator to keep track of the min/max bounds on the join keys for each partition.
/// Build accumulator to collect build-side information (hash maps and/or bounds) from each partition.
/// It is lazily initialized during execution to make sure we use the actual execution time partition counts.
bounds_accumulator: OnceLock<Arc<SharedBoundsAccumulator>>,
build_accumulator: OnceLock<Arc<SharedBuildAccumulator>>,
}
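
The `OnceLock` here carries the lazy initialization the comment describes. A minimal sketch of the pattern, assuming a hypothetical `Accumulator` type: `get_or_init` runs its closure at most once, so however many partitions call `execute()`, they all share the one accumulator built with the runtime partition count.

```rust
use std::sync::{Arc, OnceLock};

/// Hypothetical stand-in for SharedBuildAccumulator.
struct Accumulator {
    partitions: usize,
}

struct DynamicFilterState {
    accumulator: OnceLock<Arc<Accumulator>>,
}

impl DynamicFilterState {
    /// Called from every partition's execute(); only the first call constructs.
    fn accumulator(&self, runtime_partitions: usize) -> Arc<Accumulator> {
        Arc::clone(
            self.accumulator
                .get_or_init(|| Arc::new(Accumulator { partitions: runtime_partitions })),
        )
    }
}

fn main() {
    let state = DynamicFilterState { accumulator: OnceLock::new() };
    let a = state.accumulator(12);
    let b = state.accumulator(12);
    // Both calls observe the same instance.
    assert!(Arc::ptr_eq(&a, &b));
    println!("partitions = {}", a.partitions);
}
```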

impl fmt::Debug for HashJoinExec {
@@ -977,8 +983,10 @@ impl ExecutionPlan for HashJoinExec {

let batch_size = context.session_config().batch_size();

// Initialize bounds_accumulator lazily with runtime partition counts (only if enabled)
let bounds_accumulator = enable_dynamic_filter_pushdown
// Initialize build_accumulator lazily with runtime partition counts (only if enabled)
// Use RepartitionExec's random state (seeds: 0,0,0,0) for partition routing
let repartition_random_state = RandomState::with_seeds(0, 0, 0, 0);
let build_accumulator = enable_dynamic_filter_pushdown
.then(|| {
self.dynamic_filter.as_ref().map(|df| {
let filter = Arc::clone(&df.filter);
@@ -987,13 +995,14 @@ impl ExecutionPlan for HashJoinExec {
.iter()
.map(|(_, right_expr)| Arc::clone(right_expr))
.collect::<Vec<_>>();
Some(Arc::clone(df.bounds_accumulator.get_or_init(|| {
Arc::new(SharedBoundsAccumulator::new_from_partition_mode(
Some(Arc::clone(df.build_accumulator.get_or_init(|| {
Arc::new(SharedBuildAccumulator::new_from_partition_mode(
self.mode,
self.left.as_ref(),
self.right.as_ref(),
filter,
on_right,
repartition_random_state,
))
})))
})
@@ -1036,7 +1045,7 @@ impl ExecutionPlan for HashJoinExec {
batch_size,
vec![],
self.right.output_ordering().is_some(),
bounds_accumulator,
build_accumulator,
self.mode,
)))
}
@@ -1197,7 +1206,7 @@ impl ExecutionPlan for HashJoinExec {
cache: self.cache.clone(),
dynamic_filter: Some(HashJoinExecDynamicFilter {
filter: dynamic_filter,
bounds_accumulator: OnceLock::new(),
build_accumulator: OnceLock::new(),
}),
});
result = result.with_updated_node(new_node as Arc<dyn ExecutionPlan>);
@@ -1346,7 +1355,7 @@ impl BuildSideState {
/// When `should_compute_bounds` is true, this function computes the min/max bounds
/// for each join key column but does NOT update the dynamic filter. Instead, the
/// bounds are stored in the returned `JoinLeftData` and later coordinated by
/// `SharedBoundsAccumulator` to ensure all partitions contribute their bounds
/// `SharedBuildAccumulator` to ensure all partitions contribute their bounds
/// before updating the filter exactly once.
///
/// # Returns
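
The coordination this doc comment describes — every partition contributes its bounds, and the filter is updated exactly once — amounts to a report-and-merge barrier. A hedged sketch of that protocol (not the actual `SharedBuildAccumulator`, which also handles hash maps and partition modes): the call that delivers the final report receives the merged bounds and performs the single filter update.

```rust
use std::sync::Mutex;

#[derive(Clone, Copy)]
struct Bounds {
    min: i64,
    max: i64,
}

/// Hypothetical accumulator: collects one Bounds per build partition.
struct SharedBounds {
    expected: usize,
    collected: Mutex<Vec<Bounds>>,
}

impl SharedBounds {
    /// Returns Some(merged) only for the final report, so the dynamic filter
    /// is updated exactly once no matter how many partitions report.
    fn report(&self, b: Bounds) -> Option<Bounds> {
        let mut collected = self.collected.lock().unwrap();
        collected.push(b);
        if collected.len() == self.expected {
            collected.iter().copied().reduce(|acc, b| Bounds {
                min: acc.min.min(b.min),
                max: acc.max.max(b.max),
            })
        } else {
            None
        }
    }
}

fn main() {
    let shared = SharedBounds { expected: 2, collected: Mutex::new(Vec::new()) };
    assert!(shared.report(Bounds { min: 5, max: 9 }).is_none());
    let merged = shared.report(Bounds { min: 1, max: 7 }).unwrap();
    println!("filter range: [{}, {}]", merged.min, merged.max);
}
```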
@@ -1417,6 +1426,7 @@ async fn collect_left_input(

// Use `u32` indices for the JoinHashMap when num_rows ≤ u32::MAX, otherwise use the
// `u64` index variant
// The map is built behind a Box here and converted to an Arc once the build
// completes, so it can be shared with SharedBuildAccumulator for hash map pushdown
let mut hashmap: Box<dyn JoinHashMapType> = if num_rows > u32::MAX as usize {
let estimated_hashtable_size =
estimate_memory_size::<(u64, u64)>(num_rows, fixed_size_u64)?;
@@ -1487,8 +1497,11 @@ async fn collect_left_input(
_ => None,
};

// Convert Box to Arc for sharing with SharedBuildAccumulator
let hashmap_arc: Arc<dyn JoinHashMapType> = hashmap.into();

let data = JoinLeftData::new(
hashmap,
hashmap_arc,
single_batch,
left_values.clone(),
Mutex::new(visited_indices_bitmap),
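
The `hashmap.into()` conversion above relies on the standard library's `From<Box<T>> for Arc<T>` impl, which also covers unsized `T` such as trait objects. A minimal sketch under a toy trait — not the real `JoinHashMapType` — showing the build-mutably-then-share sequence:

```rust
use std::sync::Arc;

trait JoinMap {
    fn num_keys(&self) -> usize;
}

struct MapU32(Vec<u32>);

impl JoinMap for MapU32 {
    fn num_keys(&self) -> usize {
        self.0.len()
    }
}

fn main() {
    // Built (and mutated) behind a Box, like `hashmap` in collect_left_input...
    let boxed: Box<dyn JoinMap> = Box::new(MapU32(vec![1, 2, 3]));
    // ...then converted once building is done: the value moves into a
    // reference-counted allocation, and clones are just refcount bumps.
    let shared: Arc<dyn JoinMap> = boxed.into();
    let for_accumulator = Arc::clone(&shared);
    println!("{}", for_accumulator.num_keys());
}
```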
1 change: 1 addition & 0 deletions datafusion/physical-plan/src/joins/hash_join/mod.rs
@@ -20,5 +20,6 @@
pub use exec::HashJoinExec;

mod exec;
mod partitioned_hash_eval;
mod shared_bounds;
mod stream;