Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
32 commits
Select commit — hold Shift and click to select a range
a35a342
Push down hash table references from HashJoinExec into scans using CA…
adriangb Oct 27, 2025
be006e8
fix lints
adriangb Oct 27, 2025
7e28cc4
update snapshot
adriangb Oct 27, 2025
3332644
implement for CollectLeft
adriangb Oct 28, 2025
018fbfc
use inlist expressions for build sides <128MB
adriangb Oct 30, 2025
6bbba43
wip on InList pushdown
adriangb Oct 30, 2025
0cbf645
fix struct building
adriangb Oct 31, 2025
dbaa756
omit empty partitions, update slts
adriangb Oct 31, 2025
488c64b
fmt
adriangb Oct 31, 2025
b727111
update snaps
adriangb Oct 31, 2025
2a9033a
lots of cleanup/refactor
adriangb Oct 31, 2025
83a7d42
working but InList sets don't support Struct arrays
adriangb Oct 31, 2025
6908152
add optimizations
adriangb Oct 31, 2025
feb9c40
avoid cloning arrays, tweak default value
adriangb Oct 31, 2025
3aa305e
lints, typos, formatting
adriangb Oct 31, 2025
8d4cdfd
fix test
adriangb Oct 31, 2025
932438d
some nits
adriangb Oct 31, 2025
5654512
fix
adriangb Oct 31, 2025
bf5a0b6
fmt
adriangb Oct 31, 2025
ba073f7
fix some more tests
adriangb Oct 31, 2025
c309d56
fix dict handling
adriangb Oct 31, 2025
e32c080
update tpch slt
adriangb Nov 1, 2025
31d7cc3
use array storage for InListExpr
adriangb Nov 2, 2025
56ded1f
Fix error handling and formatting issues in InList pushdown
adriangb Nov 2, 2025
b9c595e
clean up hash_utils.rs
adriangb Nov 2, 2025
6a90a9c
attempt to unify in_list.rs (failing tests)
adriangb Nov 2, 2025
533d650
in_list.rs test passing
adriangb Nov 2, 2025
e608c58
fmt
adriangb Nov 2, 2025
71faf93
Improve InListExpr::list docstring
adriangb Nov 2, 2025
5a01cdc
fix null handling in InListExpr
adriangb Nov 2, 2025
f34d341
fix PartialEq for InListExpr
adriangb Nov 2, 2025
d198ab3
fix
adriangb Nov 2, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

16 changes: 16 additions & 0 deletions datafusion/common/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -955,6 +955,22 @@ config_namespace! {
/// will be collected into a single partition
pub hash_join_single_partition_threshold_rows: usize, default = 1024 * 128

/// Maximum size in bytes for the build side of a hash join to be pushed down as an InList expression for dynamic filtering.
/// Build sides larger than this will use hash table lookups instead.
/// Set to 0 to always use hash table lookups.
///
/// InList pushdown can be more efficient for small build sides because it can result in better
/// statistics pruning as well as use any bloom filters present on the scan side.
/// InList expressions are also more transparent and easier to serialize over the network in distributed uses of DataFusion.
/// On the other hand, InList pushdown requires making a copy of the data and thus adds some overhead to the build side and uses more memory.
///
/// This setting is per-partition, so we may end up using `hash_join_inlist_pushdown_max_size` * `target_partitions` memory.
///
/// The default is 128kB per partition.
/// This should allow point lookup joins (e.g. joining on a unique primary key) to use InList pushdown in most cases
/// but avoids excessive memory usage or overhead for larger joins.
pub hash_join_inlist_pushdown_max_size: usize, default = 128 * 1024

/// The default filter selectivity used by Filter Statistics
/// when an exact selectivity cannot be determined. Valid values are
/// between 0 (no selectivity) and 100 (all rows are selected).
Expand Down
182 changes: 112 additions & 70 deletions datafusion/common/src/hash_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,6 @@

//! Functionality used both on logical and physical plans

#[cfg(not(feature = "force_hash_collisions"))]
use std::sync::Arc;

use ahash::RandomState;
use arrow::array::types::{IntervalDayTime, IntervalMonthDayNano};
use arrow::array::*;
Expand Down Expand Up @@ -215,12 +212,11 @@ fn hash_dictionary<K: ArrowDictionaryKeyType>(
// Hash each dictionary value once, and then use that computed
// hash for each key value to avoid a potentially expensive
// redundant hashing for large dictionary elements (e.g. strings)
let dict_values = Arc::clone(array.values());
let dict_values = array.values();
let mut dict_hashes = vec![0; dict_values.len()];
create_hashes(&[dict_values], random_state, &mut dict_hashes)?;
create_hashes_from_arrays(&[dict_values.as_ref()], random_state, &mut dict_hashes)?;

// combine hash for each index in values
let dict_values = array.values();
for (hash, key) in hashes_buffer.iter_mut().zip(array.keys().iter()) {
if let Some(key) = key {
let idx = key.as_usize();
Expand Down Expand Up @@ -308,11 +304,11 @@ fn hash_list_array<OffsetSize>(
where
OffsetSize: OffsetSizeTrait,
{
let values = Arc::clone(array.values());
let values = array.values();
let offsets = array.value_offsets();
let nulls = array.nulls();
let mut values_hashes = vec![0u64; values.len()];
create_hashes(&[values], random_state, &mut values_hashes)?;
create_hashes_from_arrays(&[values.as_ref()], random_state, &mut values_hashes)?;
if let Some(nulls) = nulls {
for (i, (start, stop)) in offsets.iter().zip(offsets.iter().skip(1)).enumerate() {
if nulls.is_valid(i) {
Expand All @@ -339,11 +335,11 @@ fn hash_fixed_list_array(
random_state: &RandomState,
hashes_buffer: &mut [u64],
) -> Result<()> {
let values = Arc::clone(array.values());
let values = array.values();
let value_length = array.value_length() as usize;
let nulls = array.nulls();
let mut values_hashes = vec![0u64; values.len()];
create_hashes(&[values], random_state, &mut values_hashes)?;
create_hashes_from_arrays(&[values.as_ref()], random_state, &mut values_hashes)?;
if let Some(nulls) = nulls {
for i in 0..array.len() {
if nulls.is_valid(i) {
Expand All @@ -366,83 +362,113 @@ fn hash_fixed_list_array(
Ok(())
}

/// Hashes a single array into `hashes_buffer`, dispatching on the array's
/// [`DataType`].
///
/// When `rehash` is `false` the buffer entries are overwritten (used for the
/// first column); when `true` each newly computed hash is combined with the
/// value already in the buffer so multi-column hashes incorporate every column.
///
/// Returns an internal error for data types with no hashing support here;
/// callers are expected to have validated the schema beforehand.
#[cfg(not(feature = "force_hash_collisions"))]
fn hash_single_array(
    array: &dyn Array,
    random_state: &RandomState,
    hashes_buffer: &mut [u64],
    rehash: bool,
) -> Result<()> {
    // `downcast_primitive_array!` handles every primitive type in its first
    // arm; the remaining arms cover the non-primitive types explicitly.
    downcast_primitive_array! {
        array => hash_array_primitive(array, random_state, hashes_buffer, rehash),
        DataType::Null => hash_null(random_state, hashes_buffer, rehash),
        DataType::Boolean => hash_array(as_boolean_array(array)?, random_state, hashes_buffer, rehash),
        DataType::Utf8 => hash_array(as_string_array(array)?, random_state, hashes_buffer, rehash),
        DataType::Utf8View => hash_array(as_string_view_array(array)?, random_state, hashes_buffer, rehash),
        DataType::LargeUtf8 => hash_array(as_largestring_array(array), random_state, hashes_buffer, rehash),
        DataType::Binary => hash_array(as_generic_binary_array::<i32>(array)?, random_state, hashes_buffer, rehash),
        DataType::BinaryView => hash_array(as_binary_view_array(array)?, random_state, hashes_buffer, rehash),
        DataType::LargeBinary => hash_array(as_generic_binary_array::<i64>(array)?, random_state, hashes_buffer, rehash),
        DataType::FixedSizeBinary(_) => {
            let array: &FixedSizeBinaryArray = array.as_any().downcast_ref().unwrap();
            hash_array(array, random_state, hashes_buffer, rehash)
        }
        DataType::Dictionary(_, _) => downcast_dictionary_array! {
            array => hash_dictionary(array, random_state, hashes_buffer, rehash)?,
            _ => unreachable!()
        }
        DataType::Struct(_) => {
            let array = as_struct_array(array)?;
            hash_struct_array(array, random_state, hashes_buffer)?;
        }
        DataType::List(_) => {
            let array = as_list_array(array)?;
            hash_list_array(array, random_state, hashes_buffer)?;
        }
        DataType::LargeList(_) => {
            let array = as_large_list_array(array)?;
            hash_list_array(array, random_state, hashes_buffer)?;
        }
        DataType::Map(_, _) => {
            let array = as_map_array(array)?;
            hash_map_array(array, random_state, hashes_buffer)?;
        }
        DataType::FixedSizeList(_,_) => {
            let array = as_fixed_size_list_array(array)?;
            hash_fixed_list_array(array, random_state, hashes_buffer)?;
        }
        _ => {
            // This is internal because we should have caught this before.
            return _internal_err!(
                "Unsupported data type in hasher: {}",
                array.data_type()
            );
        }
    }
    Ok(())
}

/// Test version of `hash_single_array` that forces every hash to collide.
///
/// Compiled only with the `force_hash_collisions` feature; it writes zero for
/// every row so all rows land in the same hash bucket, exercising collision
/// handling in downstream consumers (joins, aggregations).
#[cfg(feature = "force_hash_collisions")]
fn hash_single_array(
    _array: &dyn Array,
    _random_state: &RandomState,
    hashes_buffer: &mut [u64],
    _rehash: bool,
) -> Result<()> {
    // Deliberately ignore the input values: every row hashes to 0.
    hashes_buffer.fill(0);
    Ok(())
}

/// Creates hash values for every row, based on the values in the columns.
///
/// The number of rows to hash is determined by `hashes_buffer.len()`;
/// `hashes_buffer` should be pre-sized appropriately.
///
/// This is the same as [`create_hashes`] but accepts `&dyn Array`s instead of
/// requiring `ArrayRef`s.
pub fn create_hashes_from_arrays<'a>(
    arrays: &[&dyn Array],
    random_state: &RandomState,
    hashes_buffer: &'a mut Vec<u64>,
) -> Result<&'a mut Vec<u64>> {
    for (i, &array) in arrays.iter().enumerate() {
        // The first column overwrites the buffer; every subsequent column is
        // combined (`combine_hashes`) with the hashes accumulated so far.
        let rehash = i >= 1;
        hash_single_array(array, random_state, hashes_buffer, rehash)?;
    }
    Ok(hashes_buffer)
}

/// Creates hash values for every row, based on the values in the columns.
///
/// The number of rows to hash is determined by `hashes_buffer.len()`.
/// `hashes_buffer` should be pre-sized appropriately.
///
/// This is the same as [`create_hashes_from_arrays`] but accepts `ArrayRef`s.
pub fn create_hashes<'a>(
arrays: &[ArrayRef],
random_state: &RandomState,
hashes_buffer: &'a mut Vec<u64>,
) -> Result<&'a mut Vec<u64>> {
for (i, col) in arrays.iter().enumerate() {
let array = col.as_ref();
for (i, array) in arrays.iter().enumerate() {
// combine hashes with `combine_hashes` for all columns besides the first
let rehash = i >= 1;
downcast_primitive_array! {
array => hash_array_primitive(array, random_state, hashes_buffer, rehash),
DataType::Null => hash_null(random_state, hashes_buffer, rehash),
DataType::Boolean => hash_array(as_boolean_array(array)?, random_state, hashes_buffer, rehash),
DataType::Utf8 => hash_array(as_string_array(array)?, random_state, hashes_buffer, rehash),
DataType::Utf8View => hash_array(as_string_view_array(array)?, random_state, hashes_buffer, rehash),
DataType::LargeUtf8 => hash_array(as_largestring_array(array), random_state, hashes_buffer, rehash),
DataType::Binary => hash_array(as_generic_binary_array::<i32>(array)?, random_state, hashes_buffer, rehash),
DataType::BinaryView => hash_array(as_binary_view_array(array)?, random_state, hashes_buffer, rehash),
DataType::LargeBinary => hash_array(as_generic_binary_array::<i64>(array)?, random_state, hashes_buffer, rehash),
DataType::FixedSizeBinary(_) => {
let array: &FixedSizeBinaryArray = array.as_any().downcast_ref().unwrap();
hash_array(array, random_state, hashes_buffer, rehash)
}
DataType::Dictionary(_, _) => downcast_dictionary_array! {
array => hash_dictionary(array, random_state, hashes_buffer, rehash)?,
_ => unreachable!()
}
DataType::Struct(_) => {
let array = as_struct_array(array)?;
hash_struct_array(array, random_state, hashes_buffer)?;
}
DataType::List(_) => {
let array = as_list_array(array)?;
hash_list_array(array, random_state, hashes_buffer)?;
}
DataType::LargeList(_) => {
let array = as_large_list_array(array)?;
hash_list_array(array, random_state, hashes_buffer)?;
}
DataType::Map(_, _) => {
let array = as_map_array(array)?;
hash_map_array(array, random_state, hashes_buffer)?;
}
DataType::FixedSizeList(_,_) => {
let array = as_fixed_size_list_array(array)?;
hash_fixed_list_array(array, random_state, hashes_buffer)?;
}
_ => {
// This is internal because we should have caught this before.
return _internal_err!(
"Unsupported data type in hasher: {}",
col.data_type()
);
}
}
hash_single_array(array.as_ref(), random_state, hashes_buffer, rehash)?;
}
Ok(hashes_buffer)
}
Expand Down Expand Up @@ -896,4 +922,20 @@ mod tests {

assert_ne!(one_col_hashes, two_col_hashes);
}

/// Verifies that `create_hashes_from_arrays` produces a hash per row and
/// agrees exactly with the `ArrayRef`-based `create_hashes` entry point.
#[test]
fn test_create_hashes_from_arrays() {
    let int_array: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3, 4]));
    let float_array: ArrayRef = Arc::new(Float64Array::from(vec![1.0, 2.0, 3.0, 4.0]));

    let random_state = RandomState::with_seeds(0, 0, 0, 0);

    let mut dyn_hashes = vec![0; int_array.len()];
    create_hashes_from_arrays(
        &[int_array.as_ref(), float_array.as_ref()],
        &random_state,
        &mut dyn_hashes,
    )
    .unwrap();
    assert_eq!(dyn_hashes.len(), 4);

    // Both entry points must produce identical hashes for the same columns.
    let mut ref_hashes = vec![0; int_array.len()];
    create_hashes(
        &[Arc::clone(&int_array), Arc::clone(&float_array)],
        &random_state,
        &mut ref_hashes,
    )
    .unwrap();
    assert_eq!(dyn_hashes, ref_hashes);
}
}
Loading