2 changes: 1 addition & 1 deletion Cargo.toml
@@ -26,7 +26,7 @@ debug = true
 debug = "line-tables-only"
 
 [workspace.dependencies]
-delta_kernel = { version = "0.11.0", features = ["arrow-55", "internal-api"] }
+delta_kernel = { git = "https://github.com/OussamaSaoudi/delta-kernel-rs", branch = "history_5_history_high_level_and_integration", features = ["arrow-55", "internal-api"] }
 
 # arrow
 arrow = { version = "=55.0.0" }
130 changes: 1 addition & 129 deletions crates/core/src/delta_datafusion/cdf/mod.rs
@@ -1,130 +1,2 @@
 //! Logical operators and physical executions for CDF
+pub mod scan;
-use std::collections::HashMap;
-use std::sync::LazyLock;
-
-use arrow_schema::{DataType, Field, TimeUnit};
-
-pub(crate) use self::scan_utils::*;
-use crate::kernel::{Add, AddCDCFile, Remove};
-use crate::DeltaResult;
-
-// //! Logical operators and physical executions for CDF
-pub mod scan;
-mod scan_utils;
-
-/// Change type column name
-pub const CHANGE_TYPE_COL: &str = "_change_type";
-/// Commit version column name
-pub const COMMIT_VERSION_COL: &str = "_commit_version";
-/// Commit Timestamp column name
-pub const COMMIT_TIMESTAMP_COL: &str = "_commit_timestamp";
-
-pub(crate) static CDC_PARTITION_SCHEMA: LazyLock<Vec<Field>> = LazyLock::new(|| {
-    vec![
-        Field::new(COMMIT_VERSION_COL, DataType::Int64, true),
-        Field::new(
-            COMMIT_TIMESTAMP_COL,
-            DataType::Timestamp(TimeUnit::Millisecond, None),
-            true,
-        ),
-    ]
-});
-pub(crate) static ADD_PARTITION_SCHEMA: LazyLock<Vec<Field>> = LazyLock::new(|| {
-    vec![
-        Field::new(CHANGE_TYPE_COL, DataType::Utf8, true),
-        Field::new(COMMIT_VERSION_COL, DataType::Int64, true),
-        Field::new(
-            COMMIT_TIMESTAMP_COL,
-            DataType::Timestamp(TimeUnit::Millisecond, None),
-            true,
-        ),
-    ]
-});
-
-#[derive(Debug)]
-pub(crate) struct CdcDataSpec<F: FileAction> {
-    version: i64,
-    timestamp: i64,
-    actions: Vec<F>,
-}
-
-impl<F: FileAction> CdcDataSpec<F> {
-    pub fn new(version: i64, timestamp: i64, actions: Vec<F>) -> Self {
-        Self {
-            version,
-            timestamp,
-            actions,
-        }
-    }
-}
-
-/// This trait defines a generic set of operations used by CDF Reader
-pub trait FileAction {
-    /// Adds partition values
-    fn partition_values(&self) -> DeltaResult<&HashMap<String, Option<String>>>;
-    /// Physical Path to the data
-    fn path(&self) -> String;
-    /// Byte size of the physical file
-    fn size(&self) -> DeltaResult<usize>;
-}
-
-impl FileAction for Add {
-    fn partition_values(&self) -> DeltaResult<&HashMap<String, Option<String>>> {
-        Ok(&self.partition_values)
-    }
-
-    fn path(&self) -> String {
-        self.path.clone()
-    }
-
-    fn size(&self) -> DeltaResult<usize> {
-        Ok(self.size as usize)
-    }
-}
-
-impl FileAction for AddCDCFile {
-    fn partition_values(&self) -> DeltaResult<&HashMap<String, Option<String>>> {
-        Ok(&self.partition_values)
-    }
-
-    fn path(&self) -> String {
-        self.path.clone()
-    }
-
-    fn size(&self) -> DeltaResult<usize> {
-        Ok(self.size as usize)
-    }
-}
-
-impl FileAction for Remove {
-    fn partition_values(&self) -> DeltaResult<&HashMap<String, Option<String>>> {
-        // If extended_file_metadata is true, it should be required to have this filled in
-        if self.extended_file_metadata.unwrap_or_default() {
-            Ok(self.partition_values.as_ref().unwrap())
-        } else {
-            match self.partition_values {
-                Some(ref part_map) => Ok(part_map),
-                _ => Err(crate::DeltaTableError::MetadataError(
-                    "Remove action is missing required field: 'partition_values'".to_string(),
-                )),
-            }
-        }
-    }
-
-    fn path(&self) -> String {
-        self.path.clone()
-    }
-
-    fn size(&self) -> DeltaResult<usize> {
-        // If extended_file_metadata is true, it should be required to have this filled in
-        if self.extended_file_metadata.unwrap_or_default() {
-            Ok(self.size.unwrap() as usize)
-        } else {
-            match self.size {
-                Some(size) => Ok(size as usize),
-                _ => Err(crate::DeltaTableError::MetadataError(
-                    "Remove action is missing required field: 'size'".to_string(),
-                )),
-            }
-        }
-    }
-}
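The hand-rolled FileAction plumbing above is superseded by the kernel-driven scan, but the CDF metadata columns it declared (_change_type, _commit_version, _commit_timestamp) still surface in the new plan's schema. A minimal sketch, assuming those column names are unchanged, of how downstream code could locate them:

use arrow_schema::SchemaRef;

// CDF metadata columns formerly declared via ADD_PARTITION_SCHEMA.
// Assumption: the kernel-built plan keeps the same column names.
const CDF_COLS: [&str; 3] = ["_change_type", "_commit_version", "_commit_timestamp"];

/// Returns the index of each CDF metadata column in a scan schema,
/// erroring if any of them is missing.
fn cdf_column_indices(schema: &SchemaRef) -> Result<Vec<usize>, String> {
    CDF_COLS
        .iter()
        .map(|name| {
            schema
                .index_of(name)
                .map_err(|_| format!("missing CDF column: {name}"))
        })
        .collect()
}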
34 changes: 11 additions & 23 deletions crates/core/src/delta_datafusion/cdf/scan.rs
@@ -1,7 +1,7 @@
 use std::any::Any;
 use std::sync::Arc;
 
-use arrow_schema::{Schema, SchemaRef};
+use arrow_schema::SchemaRef;
 use async_trait::async_trait;
 use datafusion::catalog::Session;
 use datafusion::catalog::TableProvider;
@@ -15,12 +15,9 @@ use datafusion_physical_plan::limit::GlobalLimitExec;
 use datafusion_physical_plan::projection::ProjectionExec;
 use datafusion_physical_plan::ExecutionPlan;
 
+use crate::operations::table_changes::TableChangesBuilder;
+use crate::DeltaResult;
 use crate::DeltaTableError;
-use crate::{
-    delta_datafusion::DataFusionMixins, operations::load_cdf::CdfLoadBuilder, DeltaResult,
-};
-
-use super::ADD_PARTITION_SCHEMA;
 
 fn session_state_from_session(session: &dyn Session) -> DataFusionResult<&SessionState> {
     session
@@ -31,21 +28,16 @@ fn session_state_from_session(session: &dyn Session) -> DataFusionResult<&SessionState> {
 
 #[derive(Debug)]
 pub struct DeltaCdfTableProvider {
-    cdf_builder: CdfLoadBuilder,
+    plan: Arc<dyn ExecutionPlan>,
     schema: SchemaRef,
 }
 
 impl DeltaCdfTableProvider {
     /// Build a DeltaCDFTableProvider
-    pub fn try_new(cdf_builder: CdfLoadBuilder) -> DeltaResult<Self> {
-        let mut fields = cdf_builder.snapshot.input_schema()?.fields().to_vec();
-        for f in ADD_PARTITION_SCHEMA.clone() {
-            fields.push(f.into());
-        }
-        Ok(DeltaCdfTableProvider {
-            cdf_builder,
-            schema: Schema::new(fields).into(),
-        })
+    pub async fn try_new(cdf_builder: TableChangesBuilder) -> DeltaResult<Self> {
+        let plan: Arc<dyn ExecutionPlan> = cdf_builder.build().await?;
+        let schema = plan.schema();
+        Ok(DeltaCdfTableProvider { plan, schema })
     }
 }
 
@@ -70,18 +62,14 @@ impl TableProvider for DeltaCdfTableProvider {
         filters: &[Expr],
         limit: Option<usize>,
     ) -> DataFusionResult<Arc<dyn ExecutionPlan>> {
-        let session_state = session_state_from_session(session)?;
+        session_state_from_session(session)?;
         let schema: DFSchema = self.schema().try_into()?;
 
         let mut plan = if let Some(filter_expr) = conjunction(filters.iter().cloned()) {
             let physical_expr = session.create_physical_expr(filter_expr, &schema)?;
-            let plan = self
-                .cdf_builder
-                .build(session_state, Some(&physical_expr))
-                .await?;
-            Arc::new(FilterExec::try_new(physical_expr, plan)?)
+            Arc::new(FilterExec::try_new(physical_expr, self.plan.clone())?)
         } else {
-            self.cdf_builder.build(session_state, None).await?
+            self.plan.clone()
         };
 
         let df_schema: DFSchema = plan.schema().try_into()?;
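For orientation, a minimal usage sketch of the reworked provider, assuming the DeltaOps::table_changes entry point exercised in the tests below and that the crate re-exports DeltaCdfTableProvider at this path; the table URI and SQL are illustrative only:

use std::sync::Arc;

use datafusion::prelude::SessionContext;
use deltalake::delta_datafusion::cdf::scan::DeltaCdfTableProvider;
use deltalake::DeltaOps;

async fn show_changes() -> Result<(), Box<dyn std::error::Error>> {
    // Illustrative path; any table with CDF enabled would do.
    let table = deltalake::open_table("./data/example_table").await?;

    // try_new is now async: it builds the ExecutionPlan up front and
    // captures the plan's schema instead of assembling one by hand.
    let builder = DeltaOps(table).table_changes().with_starting_version(0);
    let provider = DeltaCdfTableProvider::try_new(builder).await?;

    let ctx = SessionContext::new();
    ctx.register_table("changes", Arc::new(provider))?;
    ctx.sql("SELECT * FROM changes ORDER BY _commit_version")
        .await?
        .show()
        .await?;
    Ok(())
}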
101 changes: 0 additions & 101 deletions crates/core/src/delta_datafusion/cdf/scan_utils.rs

This file was deleted.

2 changes: 1 addition & 1 deletion crates/core/src/operations/cdc.rs
@@ -10,7 +10,7 @@ use datafusion_common::ScalarValue;
 
 pub const CDC_COLUMN_NAME: &str = "_change_type";
 
-/// The CDCTracker is useful for hooking reads/writes in a manner nececessary to create CDC files
+/// The CDCTracker is useful for hooking reads/writes in a manner necessary to create CDC files
 /// associated with commits
 pub(crate) struct CDCTracker {
     pre_dataframe: DataFrame,
24 changes: 12 additions & 12 deletions crates/core/src/operations/delete.rs
@@ -972,9 +972,9 @@ mod tests {
 
         let ctx = SessionContext::new();
         let table = DeltaOps(table)
-            .load_cdf()
+            .table_changes()
             .with_starting_version(0)
-            .build(&ctx.state(), None)
+            .build()
             .await
             .expect("Failed to load CDF");
 
@@ -1056,9 +1056,9 @@ mod tests {
 
         let ctx = SessionContext::new();
         let table = DeltaOps(table)
-            .load_cdf()
+            .table_changes()
            .with_starting_version(0)
-            .build(&ctx.state(), None)
+            .build()
             .await
             .expect("Failed to load CDF");
 
@@ -1074,14 +1074,14 @@ mod tests {
         let _: Vec<_> = batches.iter_mut().map(|b| b.remove_column(4)).collect();
 
         assert_batches_sorted_eq! {[
-            "+-------+------+--------------+-----------------+",
-            "| value | year | _change_type | _commit_version |",
-            "+-------+------+--------------+-----------------+",
-            "| 1     | 2020 | insert       | 1               |",
-            "| 2     | 2020 | delete       | 2               |",
-            "| 2     | 2020 | insert       | 1               |",
-            "| 3     | 2024 | insert       | 1               |",
-            "+-------+------+--------------+-----------------+",
+            "+------+-------+--------------+-----------------+",
+            "| year | value | _change_type | _commit_version |",
+            "+------+-------+--------------+-----------------+",
+            "| 2020 | 1     | insert       | 1               |",
+            "| 2020 | 2     | delete       | 2               |",
+            "| 2020 | 2     | insert       | 1               |",
+            "| 2024 | 3     | insert       | 1               |",
+            "+------+-------+--------------+-----------------+",
         ], &batches }
     }
 