feat: support lambda function for scalar udf #17220
base: main
fn return_type(&self, arg_types: &[DataType]) -> Result<DataType> {
    let [arg_type] = take_function_args(self.name(), arg_types)?;
    match arg_type {
        List(_) | LargeList(_) => Ok(arg_type.clone()),
        _ => plan_err!("{} does not support type {}", self.name(), arg_type),
    }
}
Can you please implement `return_field_from_args` instead, so that the result is not nullable when the input is not nullable?
fn coerce_types(&self, _arg_types: &[DataType]) -> Result<Vec<DataType>> {
    datafusion_common::not_impl_err!(
        "Function {} does not implement coerce_types",
        self.name()
    )
}
This is the default implementation, can you please remove it?
    lambda: &dyn PhysicalLambda,
    field: &Arc<Field>,
) -> Result<ArrayRef> {
    let mut offsets = vec![OffsetSize::zero()];
Can you use `OffsetBufferBuilder` instead, so we don't have to manage the offsets ourselves?
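To illustrate the suggestion, here is a minimal std-only sketch of what a length-based offsets builder does. `LengthOffsets` and `offsets_from_lengths` are hypothetical names used only for this illustration; they stand in for the arrow builder mentioned above and are not its API.

```rust
/// Hypothetical stand-in for an offsets builder (not the arrow type):
/// callers push per-row lengths and the builder keeps the running offsets.
pub struct LengthOffsets {
    offsets: Vec<i32>,
}

impl LengthOffsets {
    /// Start with the single leading zero offset that every list layout needs.
    pub fn new(rows: usize) -> Self {
        let mut offsets = Vec::with_capacity(rows + 1);
        offsets.push(0);
        Self { offsets }
    }

    /// Append one row that contains `len` elements.
    pub fn push_length(&mut self, len: usize) {
        let last = *self.offsets.last().expect("offsets always starts with 0");
        let len = i32::try_from(len).expect("row length overflows i32 offsets");
        self.offsets.push(last + len);
    }

    /// Return the finished offsets: one more entry than the number of rows.
    pub fn finish(self) -> Vec<i32> {
        self.offsets
    }
}

/// Build offsets for a list column from per-row lengths.
pub fn offsets_from_lengths(lengths: &[usize]) -> Vec<i32> {
    let mut builder = LengthOffsets::new(lengths.len());
    for &len in lengths {
        builder.push_length(len);
    }
    builder.finish()
}
```

With this shape, the filtering loop only reports how many elements each row kept and never indexes into a hand-managed offsets vector.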
/// Implementation of the `array_filter` scalar user-defined function.
///
/// This function filters array elements using a lambda function, returning a new array
/// containing only the elements for which the lambda function returns true.
Please also note that nulls will count as false
    ),
    argument(
        name = "lambda",
        description = "Lambda function with one argument that returns a boolean. The lambda is applied to each element of the array."
Please add that returning null is treated the same as returning false.
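The null-as-false rule requested above can be sketched with plain `Option<bool>` values. This is a std-only model, not the PR's code; `keep_mask` and `filter_elements` are hypothetical helper names.

```rust
/// Turn per-element lambda results into a keep mask.
/// A null result (`None`) is treated exactly like `Some(false)`.
pub fn keep_mask(predicate_results: &[Option<bool>]) -> Vec<bool> {
    predicate_results
        .iter()
        .map(|result| result.unwrap_or(false))
        .collect()
}

/// Keep only the elements whose lambda result is `Some(true)`.
pub fn filter_elements<T: Clone>(elements: &[T], predicate_results: &[Option<bool>]) -> Vec<T> {
    elements
        .iter()
        .zip(keep_mask(predicate_results))
        .filter(|(_, keep)| *keep)
        .map(|(element, _)| element.clone())
        .collect()
}
```

For example, filtering `[1, 2, 3, 4]` with results `[Some(true), None, Some(false), Some(true)]` keeps only the first and last elements.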
Good job, left some comments
let values = list_array.values();
let value_offsets = list_array.value_offsets();
let nulls = list_array.nulls();

let batch = RecordBatch::try_new(
    Schema::new(vec![field
        .as_ref()
        .clone()
        .with_name(lambda.params()[0].clone())])
    .into(),
    vec![Arc::clone(values)],
)?;
I can do it in a separate PR.
This will lead to unnecessary computation, because the batch will include values that are not part of the list's "visible" values in either of the following cases.
- The list is sliced, so the lambda is evaluated on more data than is needed.
This is how to create such a list:
let data = vec![
    Some(vec![Some(0), Some(1), Some(2)]),
    Some(vec![Some(3), Some(4), Some(5)]),
    Some(vec![Some(6), Some(7)]),
    Some(vec![Some(8)]),
];
let list_array = ListArray::from_iter_primitive::<Int32Type, _, _>(data);
let list_sliced_values = list_array.slice(1, 2);
- The list array has null entries that point to a non-empty range of values.
This is how to create such a list:
let data = vec![
    Some(vec![Some(0), Some(1), Some(2)]),
    Some(vec![Some(3), Some(4), Some(5)]),
    Some(vec![Some(6), Some(7)]),
];
let list_array = ListArray::from_iter_primitive::<Int32Type, _, _>(data);
let (field, offsets, values, nulls) = list_array.into_parts();
let list_array_with_null_pointing_to_non_empty_list = ListArray::try_new(
    field,
    offsets,
    values,
    Some(NullBuffer::from(&[true, false, true]))
)?;
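The sliced case can be modelled without arrow to show how much of the values buffer is actually visible. This is a sketch under the assumption that a sliced list keeps the full values buffer and only narrows its offsets window; `slice_offsets` and `visible_values_range` are hypothetical helpers, not arrow functions.

```rust
use std::ops::Range;

/// Model of slicing a list: the values buffer is untouched and only the
/// window of offsets changes. A list of `len` rows needs `len + 1` offsets.
pub fn slice_offsets(offsets: &[usize], start: usize, len: usize) -> &[usize] {
    &offsets[start..=start + len]
}

/// Range of the values buffer that the given offsets can actually reach.
/// Evaluating the lambda only on this range avoids the wasted work.
pub fn visible_values_range(offsets: &[usize]) -> Range<usize> {
    match (offsets.first(), offsets.last()) {
        (Some(&start), Some(&end)) => start..end,
        _ => 0..0,
    }
}
```

For the four-row example above, the full offsets are `[0, 3, 6, 8, 9]`. After `slice(1, 2)` the offsets window is `[3, 6, 8]`, so only values at positions 3 through 7 are visible, while the underlying buffer still holds all nine values.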
let ColumnarValue::Array(filter_array) = filter_array else {
    return exec_err!(
        "array_filter requires a lambda that returns an array of booleans"
    );
};
You can add an optimization for the scalar case if you want, or I can do it in a different PR.
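One possible shape for that scalar fast path, sketched with std-only types: when the lambda yields a single boolean (for example, a constant predicate), every element shares the same answer and no mask has to be walked. `PredicateResult` and `apply_filter` are hypothetical names that loosely mirror the scalar and array variants of `ColumnarValue`; this is an assumption about how the optimization could look, not the PR's implementation.

```rust
/// Hypothetical model of a lambda result: one value for all elements,
/// or one value per element.
pub enum PredicateResult {
    Scalar(Option<bool>),
    Array(Vec<Option<bool>>),
}

/// Filter `values` by the lambda result, treating null as false.
pub fn apply_filter(values: &[i32], result: &PredicateResult) -> Vec<i32> {
    match result {
        // Constant true: keep everything without inspecting a mask.
        PredicateResult::Scalar(Some(true)) => values.to_vec(),
        // Constant false or null: nothing survives.
        PredicateResult::Scalar(_) => Vec::new(),
        // General case: keep the elements whose mask entry is exactly true.
        PredicateResult::Array(mask) => values
            .iter()
            .zip(mask)
            .filter(|(_, keep)| **keep == Some(true))
            .map(|(value, _)| *value)
            .collect(),
    }
}
```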
// Handle null arrays by keeping the offset unchanged
offsets.push(offsets[row_index]);
This has a bug when a null entry points to a non-empty list and none of the underlying values were filtered out.
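My reading of the problem, as a std-only sketch: if values that sit under a null row survive the filter while that row's offset is left unchanged, every later row's offsets point at the wrong values. One way to keep values and offsets consistent is to drop the values under null rows while building the output. `filter_list` is a hypothetical helper for illustration, not the PR's code.

```rust
/// Filter a list column modelled as (values, offsets, validity).
/// `keep[i]` says whether `values[i]` passed the lambda.
/// Values that sit under a null row are always dropped, so the returned
/// offsets stay aligned with the returned values.
pub fn filter_list(
    values: &[i32],
    offsets: &[usize],
    validity: &[bool],
    keep: &[bool],
) -> (Vec<i32>, Vec<usize>) {
    let mut out_values = Vec::new();
    let mut out_offsets = vec![0usize];
    for (row, window) in offsets.windows(2).enumerate() {
        if validity[row] {
            for i in window[0]..window[1] {
                if keep[i] {
                    out_values.push(values[i]);
                }
            }
        }
        // For a null row nothing was pushed, so this repeats the previous
        // offset and the row stays empty in the output.
        out_offsets.push(out_values.len());
    }
    (out_values, out_offsets)
}
```

With values `0..=7`, offsets `[0, 3, 6, 8]`, validity `[true, false, true]`, and a mask that keeps everything, the result is values `[0, 1, 2, 6, 7]` with offsets `[0, 3, 3, 5]`: the values `3, 4, 5` under the null row are gone and the last row still maps to `[6, 7]`.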
# array_filter with multiple array columns
statement ok
CREATE TABLE test_arrays (arr1 ARRAY<INTEGER>, arr2 ARRAY<INTEGER>) AS VALUES ([1, 2, 3], [4, 5, 6]);
Can you please add a null list here as well, and null items?
Which issue does this PR close?
Rationale for this change
Some array-related UDFs need to accept a lambda as an argument.
What changes are included in this PR?
Support lambda functions in scalar UDFs, and implement `array_filter` as an example.
Are these changes tested?
Yes, with unit tests.
Are there any user-facing changes?
No