feat: support lambda function for scalar udf #17220

Status: Open (10 commits to merge into base: main)

@chenkovsky chenkovsky commented Aug 17, 2025

Which issue does this PR close?

  • Closes #.

Rationale for this change

Some array-related UDFs need to accept a lambda function as an argument.

What changes are included in this PR?

Support lambda functions in scalar UDFs, and implement array_filter as an example.

Are these changes tested?

Yes, with unit tests.

Are there any user-facing changes?

No

@github-actions github-actions bot added the sql, logical-expr, physical-expr, optimizer, sqllogictest, substrait, catalog, and proto labels Aug 17, 2025
@chenkovsky chenkovsky marked this pull request as draft August 17, 2025 14:59
@github-actions github-actions bot added the documentation label Aug 17, 2025
@chenkovsky chenkovsky marked this pull request as ready for review August 18, 2025 01:19
Comment on lines 179 to 185
fn return_type(&self, arg_types: &[DataType]) -> Result<DataType> {
    let [arg_type] = take_function_args(self.name(), arg_types)?;
    match arg_type {
        List(_) | LargeList(_) => Ok(arg_type.clone()),
        _ => plan_err!("{} does not support type {}", self.name(), arg_type),
    }
}

Can you please implement return_field_from_args instead, so that the result is not nullable when the input is not nullable?
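
A minimal sketch of the nullability point, using a hypothetical stand-in type `SimpleField` rather than Arrow's real `Field` (the type and the function name `return_field_like_input` are assumptions made for illustration). The idea is only that the output field copies the input's nullable flag instead of defaulting to nullable:

```rust
/// Hypothetical stand-in for an Arrow field: just a name and a nullable flag.
#[derive(Debug, Clone, PartialEq)]
pub struct SimpleField {
    pub name: String,
    pub nullable: bool,
}

/// Derive the output field of a list-in, list-out function from its input field.
/// Filtering elements never turns a non-null list into a null one,
/// so the nullable flag carries over unchanged.
pub fn return_field_like_input(fn_name: &str, input: &SimpleField) -> SimpleField {
    SimpleField {
        name: fn_name.to_string(),
        nullable: input.nullable,
    }
}
```

With `return_type` alone there is no input field to inspect, which is presumably why the reviewer asks for the field-based method.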

Comment on lines 281 to 286
fn coerce_types(&self, _arg_types: &[DataType]) -> Result<Vec<DataType>> {
    datafusion_common::not_impl_err!(
        "Function {} does not implement coerce_types",
        self.name()
    )
}

This is the default implementation; can you please remove it?

lambda: &dyn PhysicalLambda,
field: &Arc<Field>,
) -> Result<ArrayRef> {
let mut offsets = vec![OffsetSize::zero()];

Can you use OffsetBufferBuilder instead, so we don't have to manage the offsets ourselves?
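
To illustrate what "not managing the offsets ourselves" means, here is a std-only sketch of a length-based offset builder. `LengthOffsetBuilder` is a hypothetical stand-in written for this example, not the arrow-buffer type, and its method names are chosen for illustration; check the arrow-buffer documentation for the real `OffsetBufferBuilder` API before relying on it.

```rust
/// Hypothetical std-only stand-in that mimics the push-a-length style of an
/// offset builder: callers report each list's length, never raw offsets.
pub struct LengthOffsetBuilder {
    offsets: Vec<i32>,
}

impl LengthOffsetBuilder {
    /// `capacity` is the expected number of lists; offsets need one extra slot.
    pub fn new(capacity: usize) -> Self {
        let mut offsets = Vec::with_capacity(capacity + 1);
        offsets.push(0); // an offsets buffer always starts at zero
        Self { offsets }
    }

    /// Append one list of `len` elements; the running total becomes the next offset.
    pub fn push_length(&mut self, len: usize) {
        let last = *self.offsets.last().expect("offsets is never empty");
        self.offsets.push(last + len as i32);
    }

    /// Consume the builder and return the finished offsets.
    pub fn finish(self) -> Vec<i32> {
        self.offsets
    }
}
```

Pushing lengths 2, 0, and 3 yields the offsets 0, 2, 2, 5, and the caller never indexes into the offsets vector directly.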

/// Implementation of the `array_filter` scalar user-defined function.
///
/// This function filters array elements using a lambda function, returning a new array
/// containing only the elements for which the lambda function returns true.

Please also note that nulls returned by the lambda will count as false.

),
argument(
name = "lambda",
description = "Lambda function with one argument that returns a boolean. The lambda is applied to each element of the array."

Please add that returning null is treated the same as returning false.
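
The null-as-false rule can be sketched on plain Rust values. This is an illustration of the semantics only, not the PR's implementation; `filter_null_as_false` is a hypothetical helper, and `Option<bool>` stands in for a nullable boolean result.

```rust
/// Keep the elements for which the predicate returns `Some(true)`.
/// `None` (a null predicate result) is treated like `Some(false)`.
pub fn filter_null_as_false<T: Clone>(
    items: &[T],
    predicate: impl Fn(&T) -> Option<bool>,
) -> Vec<T> {
    items
        .iter()
        .filter(|item| predicate(*item).unwrap_or(false))
        .cloned()
        .collect()
}
```

For example, filtering `[Some(1), None, Some(3)]` with a "greater than 1" predicate that returns `None` for a null element keeps only `Some(3)`: the null element produces a null result and is dropped, just like the element that fails the comparison.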

@rluvaton rluvaton left a comment

Good job, I left some comments.

Comment on lines +320 to +331
let values = list_array.values();
let value_offsets = list_array.value_offsets();
let nulls = list_array.nulls();

let batch = RecordBatch::try_new(
    Schema::new(vec![field
        .as_ref()
        .clone()
        .with_name(lambda.params()[0].clone())])
    .into(),
    vec![Arc::clone(values)],
)?;

I can do it in a separate PR.

This will lead to unnecessary computation, because the lambda is evaluated on values that are not part of the list's "visible" values in either of the following cases:

  1. The list is sliced, so evaluate works on more data than is needed.
    This is how to create such a list:
let data = vec![
    Some(vec![Some(0), Some(1), Some(2)]),
    Some(vec![Some(3), Some(4), Some(5)]),
    Some(vec![Some(6), Some(7)]),
    Some(vec![Some(8)]),
];
let list_array = ListArray::from_iter_primitive::<Int32Type, _, _>(data);
let list_sliced_values = list_array.slice(1, 2);
  2. The list contains null entries that are not backed by an empty list, so the null entry's offsets still cover values.
    This is how to create such a list:
let data = vec![
    Some(vec![Some(0), Some(1), Some(2)]),
    Some(vec![Some(3), Some(4), Some(5)]),
    Some(vec![Some(6), Some(7)]),
];
let list_array = ListArray::from_iter_primitive::<Int32Type, _, _>(data);
let (field, offsets, values, nulls) = list_array.into_parts();
let list_array_with_null_pointing_to_non_empty_list = ListArray::try_new(
    field,
    offsets,
    values,
    Some(NullBuffer::from(&[true, false, true]))
)?;
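
Both cases can be sketched on plain offsets, without Arrow. The helpers below are hypothetical and only model the idea: a list array is treated as an `offsets` slice plus a per-row `validity` slice, and the question is which child values a row can actually reach.

```rust
use std::ops::Range;

/// Range of the child values that a (possibly sliced) list array can reach.
/// `offsets` holds `len + 1` entries for the rows still in the slice.
pub fn visible_value_range(offsets: &[i32]) -> Range<usize> {
    let start = *offsets.first().expect("offsets has at least one entry") as usize;
    let end = *offsets.last().expect("offsets has at least one entry") as usize;
    start..end
}

/// Per-row value ranges worth evaluating: rows marked null in `validity`
/// are skipped even when their offsets still cover child values.
pub fn ranges_to_evaluate(offsets: &[i32], validity: &[bool]) -> Vec<Range<usize>> {
    offsets
        .windows(2)
        .zip(validity)
        .filter(|(_, valid)| **valid)
        .map(|(w, _)| w[0] as usize..w[1] as usize)
        .collect()
}
```

For the first example, the full array has offsets 0, 3, 6, 8, 9 over nine values; after `slice(1, 2)` the remaining offsets are 3, 6, 8, so only values 3..8 are visible. For the second example, offsets 0, 3, 6, 8 with validity true, false, true leave the ranges 0..3 and 6..8, and the three values behind the null row need no evaluation.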

Comment on lines +334 to +338
let ColumnarValue::Array(filter_array) = filter_array else {
    return exec_err!(
        "array_filter requires a lambda that returns an array of booleans"
    );
};

You can add an optimization for the scalar case if you want, or I can do it in a different PR.
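
One possible shape of such a scalar fast path, sketched on plain vectors. The `Mask` enum and `filter_lists` function are hypothetical stand-ins for the lambda result and the filter kernel, and the sketch assumes every row is valid and the offsets start at 0:

```rust
/// Hypothetical stand-in for a lambda result: one value for all elements,
/// or one nullable boolean per element.
pub enum Mask {
    Scalar(Option<bool>),
    Array(Vec<Option<bool>>),
}

/// Filter the child values of a list array described by `offsets`.
/// Returns the new offsets and the new child values.
pub fn filter_lists(offsets: &[i32], values: &[i32], mask: &Mask) -> (Vec<i32>, Vec<i32>) {
    match mask {
        // Constant true: nothing is removed, so the input is returned unchanged.
        Mask::Scalar(Some(true)) => (offsets.to_vec(), values.to_vec()),
        // Constant false or null: every list becomes empty.
        Mask::Scalar(_) => (vec![0; offsets.len()], Vec::new()),
        // General case: test each element, treating null as false.
        Mask::Array(keep) => {
            let mut new_offsets = vec![0];
            let mut new_values = Vec::new();
            for w in offsets.windows(2) {
                for i in w[0] as usize..w[1] as usize {
                    if keep[i].unwrap_or(false) {
                        new_values.push(values[i]);
                    }
                }
                new_offsets.push(new_values.len() as i32);
            }
            (new_offsets, new_values)
        }
    }
}
```

The two scalar arms avoid touching individual elements at all, which is the saving the comment appears to have in mind.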

Comment on lines 344 to 345
// Handle null arrays by keeping the offset unchanged
offsets.push(offsets[row_index]);

This has a bug when a null entry points to a non-empty list and none of the underlying values are filtered out.
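
A sketch of one way to keep offsets and values consistent in that situation, on plain vectors rather than Arrow arrays. `filter_with_null_rows` is a hypothetical helper, and `keep[i]` is assumed to hold the already-evaluated lambda result for child value `i`. The point is that each output offset is derived from the values actually written, for null rows as well:

```rust
/// Filter a list array given as `offsets`, child `values`, and a row `validity` mask.
/// Null rows contribute no output values, even if their offsets cover kept values,
/// so the output offsets always agree with the output values.
pub fn filter_with_null_rows(
    offsets: &[i32],
    values: &[i32],
    validity: &[bool],
    keep: &[bool],
) -> (Vec<i32>, Vec<i32>) {
    let mut new_offsets = vec![0i32];
    let mut new_values = Vec::new();
    for (row, w) in offsets.windows(2).enumerate() {
        if validity[row] {
            for i in w[0] as usize..w[1] as usize {
                if keep[i] {
                    new_values.push(values[i]);
                }
            }
        }
        // Derive the offset from what was actually written, for null rows too.
        new_offsets.push(new_values.len() as i32);
    }
    (new_offsets, new_values)
}
```

With offsets 0, 3, 6, 8, a null second row, and a lambda that keeps everything, the output values are 0, 1, 2, 6, 7 with offsets 0, 3, 3, 5. If the three values behind the null row were kept in the output while its offset stayed put, the third row would start at the wrong position.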


# array_filter with multiple array columns
statement ok
CREATE TABLE test_arrays (arr1 ARRAY<INTEGER>, arr2 ARRAY<INTEGER>) AS VALUES ([1, 2, 3], [4, 5, 6]);

Can you please add a null list here as well, and null items?

2 participants