Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions datafusion/catalog-listing/src/helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@ pub fn expr_applicable_for_cols(col_names: &[&str], expr: &Expr) -> bool {
| Expr::WindowFunction { .. }
| Expr::Wildcard { .. }
| Expr::Unnest { .. }
| Expr::Lambda(_)
| Expr::Placeholder(_) => {
is_applicable = false;
Ok(TreeNodeRecursion::Stop)
Expand Down
93 changes: 92 additions & 1 deletion datafusion/expr/src/expr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -365,6 +365,8 @@ pub enum Expr {
OuterReferenceColumn(DataType, Column),
/// Unnest expression
Unnest(Unnest),
/// Lambda expression
Lambda(LambdaFunction),
}

impl Default for Expr {
Expand Down Expand Up @@ -1466,6 +1468,7 @@ impl Expr {
#[expect(deprecated)]
Expr::Wildcard { .. } => "Wildcard",
Expr::Unnest { .. } => "Unnest",
Expr::Lambda(..) => "LambdaFunction",
}
}

Expand Down Expand Up @@ -2020,7 +2023,8 @@ impl Expr {
| Expr::Wildcard { .. }
| Expr::WindowFunction(..)
| Expr::Literal(..)
| Expr::Placeholder(..) => false,
| Expr::Placeholder(..)
| Expr::Lambda(..) => false,
}
}

Expand Down Expand Up @@ -2607,6 +2611,12 @@ impl HashNode for Expr {
column.hash(state);
}
Expr::Unnest(Unnest { expr: _expr }) => {}
Expr::Lambda(LambdaFunction {
params: arguments,
body: _body,
}) => {
arguments.hash(state);
}
};
}
}
Expand Down Expand Up @@ -2911,6 +2921,17 @@ impl Display for SchemaDisplay<'_> {
}
}
}
Expr::Lambda(LambdaFunction {
params: arguments,
body,
}) => {
write!(
f,
"({arguments}) -> {body}",
arguments = arguments.join(", "),
body = SchemaDisplay(body)
)
}
}
}
}
Expand Down Expand Up @@ -3156,6 +3177,41 @@ fn schema_name_from_exprs_inner(exprs: &[Expr], sep: &str) -> Result<String, fmt
Ok(s)
}

/// Creates a schema name from a slice of expression references.
///
/// This function generates a comma-separated string representation of expressions
/// suitable for use in schema names. It's particularly useful for functions that
/// work with lambda expressions where argument names need to be preserved.
///
/// # Arguments
/// * `exprs` - A slice of expression references to convert to schema names
///
/// # Returns
/// A comma-separated string representation of the expressions
pub fn schema_name_from_exprs_ref(exprs: &[&Expr]) -> Result<String, fmt::Error> {
schema_name_from_exprs_inner_ref(exprs, ", ")
}

/// Internal helper function for creating schema names with custom separator.
///
/// # Arguments
/// * `exprs` - A slice of expression references
/// * `sep` - The separator to use between expressions
fn schema_name_from_exprs_inner_ref(
exprs: &[&Expr],
sep: &str,
) -> Result<String, fmt::Error> {
let mut s = String::new();
for (i, e) in exprs.iter().enumerate() {
if i > 0 {
write!(&mut s, "{sep}")?;
}
write!(&mut s, "{}", SchemaDisplay(e))?;
}

Ok(s)
}

pub fn schema_name_from_sorts(sorts: &[Sort]) -> Result<String, fmt::Error> {
let mut s = String::new();
for (i, e) in sorts.iter().enumerate() {
Expand Down Expand Up @@ -3393,10 +3449,45 @@ impl Display for Expr {
Expr::Unnest(Unnest { expr }) => {
write!(f, "{UNNEST_COLUMN_PREFIX}({expr})")
}
Expr::Lambda(LambdaFunction { params, body }) => {
write!(
f,
"({params}) -> {body}",
params = params.join(", "),
body = SchemaDisplay(body)
)
}
}
}
}

/// Represents a lambda function expression with parameters and a body.
///
/// Lambda functions are anonymous functions that can be used in higher-order
/// functions like `array_filter`. They consist of parameter names and an
/// expression body that can reference those parameters.
///
/// # Example
/// In SQL: `x -> x > 3` represents a lambda with parameter `x` and body `x > 3`
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Hash)]
pub struct LambdaFunction {
/// The parameter names for this lambda function
pub params: Vec<String>,
/// The expression body that references the parameters
pub body: Box<Expr>,
}

impl LambdaFunction {
/// Creates a new lambda function with the given parameters and body.
///
/// # Arguments
/// * `params` - The parameter names for the lambda function
/// * `body` - The expression body that can reference the parameters
pub fn new(params: Vec<String>, body: Box<Expr>) -> Self {
Self { params, body }
}
}

fn fmt_function(
f: &mut Formatter,
fun: &str,
Expand Down
3 changes: 3 additions & 0 deletions datafusion/expr/src/expr_schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,7 @@ impl ExprSchemable for Expr {
// Grouping sets do not really have a type and do not appear in projections
Ok(DataType::Null)
}
Expr::Lambda(..) => Ok(DataType::Null),
}
}

Expand Down Expand Up @@ -342,6 +343,7 @@ impl ExprSchemable for Expr {
// in projections
Ok(true)
}
Expr::Lambda(..) => Ok(true),
}
}

Expand Down Expand Up @@ -559,6 +561,7 @@ impl ExprSchemable for Expr {
| Expr::Wildcard { .. }
| Expr::GroupingSet(_)
| Expr::Placeholder(_)
| Expr::Lambda(..)
| Expr::Unnest(_) => Ok(Arc::new(Field::new(
&schema_name,
self.get_type(schema)?,
Expand Down
66 changes: 66 additions & 0 deletions datafusion/expr/src/lambda.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use std::fmt::Debug;

use arrow::array::RecordBatch;
use datafusion_common::{DFSchema, Result};
use datafusion_expr_common::columnar_value::ColumnarValue;

use crate::expr::LambdaFunction;

/// Trait for planning lambda functions into their physical representation.
///
/// This trait is implemented by query planners to convert logical lambda expressions
/// into executable physical lambda functions that can be evaluated at runtime.
pub trait LambdaPlanner {
/// Plans a logical lambda function into a physical lambda implementation.
///
/// # Arguments
/// * `lambda` - The logical lambda function to plan
/// * `df_schema` - The schema context for the lambda function
///
/// # Returns
/// A boxed physical lambda that can be executed
fn plan_lambda(
&self,
lambda: &LambdaFunction,
df_schema: &DFSchema,
) -> Result<Box<dyn PhysicalLambda>>;
}

/// Trait for physical lambda functions that can be executed on record batches.
///
/// Physical lambda functions are the runtime representation of lambda expressions
/// that have been planned and optimized for execution. They can evaluate lambda
/// logic against columnar data in record batches.
pub trait PhysicalLambda: Send + Sync + Debug {
/// Returns the parameter names for this lambda function.
///
/// # Returns
/// A slice of parameter names that this lambda expects
fn params(&self) -> &[String];

/// Evaluates the lambda function against a record batch.
///
/// # Arguments
/// * `batch` - The record batch containing the input data
///
/// # Returns
/// The result of evaluating the lambda function as a columnar value
fn evaluate(&self, batch: &RecordBatch) -> Result<ColumnarValue>;
}
2 changes: 2 additions & 0 deletions datafusion/expr/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
//!
//! The [expr_fn] module contains functions for creating expressions.
mod lambda;
mod literal;
mod operation;
mod partition_evaluator;
Expand Down Expand Up @@ -97,6 +98,7 @@ pub use function::{
AccumulatorFactoryFunction, PartitionEvaluatorFactory, ReturnTypeFunction,
ScalarFunctionImplementation, StateTypeFunction,
};
pub use lambda::{LambdaPlanner, PhysicalLambda};
pub use literal::{
lit, lit_timestamp_nano, lit_with_metadata, Literal, TimestampLiteral,
};
Expand Down
13 changes: 11 additions & 2 deletions datafusion/expr/src/tree_node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@

use crate::expr::{
AggregateFunction, AggregateFunctionParams, Alias, Between, BinaryExpr, Case, Cast,
GroupingSet, InList, InSubquery, Like, Placeholder, ScalarFunction, TryCast, Unnest,
WindowFunction, WindowFunctionParams,
GroupingSet, InList, InSubquery, LambdaFunction, Like, Placeholder, ScalarFunction,
TryCast, Unnest, WindowFunction, WindowFunctionParams,
};
use crate::{Expr, ExprFunctionExt};

Expand Down Expand Up @@ -105,6 +105,9 @@ impl TreeNode for Expr {
Expr::InList(InList { expr, list, .. }) => {
(expr, list).apply_ref_elements(f)
}
Expr::Lambda(LambdaFunction { body, .. }) => {
body.apply_elements(f)
}
}
}

Expand Down Expand Up @@ -312,6 +315,12 @@ impl TreeNode for Expr {
.update_data(|(new_expr, new_list)| {
Expr::InList(InList::new(new_expr, new_list, negated))
}),
Expr::Lambda(LambdaFunction {
params: arguments,
body,
}) => body.map_elements(f)?.update_data(|new_body| {
Expr::Lambda(LambdaFunction::new(arguments, new_body))
}),
})
}
}
Loading