Skip to content

Commit 0609301

Browse files
authored
Merge branch 'main' into manndp/write_domain_metadata
2 parents 1aa7f66 + 3c3b650 commit 0609301

File tree

19 files changed

+1142
-65
lines changed

19 files changed

+1142
-65
lines changed

ffi/src/expressions/engine_visitor.rs

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,14 +10,16 @@ use delta_kernel::expressions::{
1010
ColumnName, Expression, ExpressionRef, JunctionPredicate, JunctionPredicateOp, MapData,
1111
OpaqueExpression, OpaqueExpressionOpRef, OpaquePredicate, OpaquePredicateOpRef, Predicate,
1212
Scalar, StructData, Transform, UnaryExpression, UnaryExpressionOp, UnaryPredicate,
13-
UnaryPredicateOp,
13+
UnaryPredicateOp, VariadicExpression, VariadicExpressionOp,
1414
};
1515

1616
use std::ffi::c_void;
1717

1818
type VisitLiteralFn<T> = extern "C" fn(data: *mut c_void, sibling_list_id: usize, value: T);
1919
type VisitUnaryFn = extern "C" fn(data: *mut c_void, sibling_list_id: usize, child_list_id: usize);
2020
type VisitBinaryFn = extern "C" fn(data: *mut c_void, sibling_list_id: usize, child_list_id: usize);
21+
type VisitVariadicFn =
22+
extern "C" fn(data: *mut c_void, sibling_list_id: usize, child_list_id: usize);
2123
type VisitJunctionFn =
2224
extern "C" fn(data: *mut c_void, sibling_list_id: usize, child_list_id: usize);
2325

@@ -163,6 +165,9 @@ pub struct EngineExpressionVisitor {
163165
/// Visits the `Divide` binary operator belonging to the list identified by `sibling_list_id`.
164166
/// The operands will be in a _two_ item list identified by `child_list_id`
165167
pub visit_divide: VisitBinaryFn,
168+
/// Visits the `Coalesce` variadic operator belonging to the list identified by `sibling_list_id`.
169+
/// The operands will be in a list identified by `child_list_id`
170+
pub visit_coalesce: VisitVariadicFn,
166171
/// Visits the `column` belonging to the list identified by `sibling_list_id`.
167172
pub visit_column:
168173
extern "C" fn(data: *mut c_void, sibling_list_id: usize, name: KernelStringSlice),
@@ -610,6 +615,16 @@ fn visit_expression_impl(
610615
};
611616
visit_fn(visitor.data, sibling_list_id, child_list_id);
612617
}
618+
Expression::Variadic(VariadicExpression { op, exprs }) => {
619+
let child_list_id = call!(visitor, make_field_list, exprs.len());
620+
for expr in exprs {
621+
visit_expression_impl(visitor, expr, child_list_id);
622+
}
623+
let visit_fn = match op {
624+
VariadicExpressionOp::Coalesce => visitor.visit_coalesce,
625+
};
626+
visit_fn(visitor.data, sibling_list_id, child_list_id);
627+
}
613628
Expression::Opaque(OpaqueExpression { op, exprs }) => {
614629
visit_expression_opaque(visitor, op, exprs, sibling_list_id)
615630
}

ffi/src/scan.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ use std::sync::{Arc, Mutex};
66

77
use delta_kernel::scan::state::DvInfo;
88
use delta_kernel::scan::{Scan, ScanMetadata};
9-
use delta_kernel::snapshot::Snapshot;
9+
use delta_kernel::snapshot::SnapshotRef;
1010
use delta_kernel::{DeltaResult, Error, Expression, ExpressionRef};
1111
use delta_kernel_ffi_macros::handle_descriptor;
1212
use tracing::debug;
@@ -103,7 +103,7 @@ pub unsafe extern "C" fn scan(
103103
}
104104

105105
fn scan_impl(
106-
snapshot: Arc<Snapshot>,
106+
snapshot: SnapshotRef,
107107
predicate: Option<&mut EnginePredicate>,
108108
) -> DeltaResult<Handle<SharedScan>> {
109109
let mut scan_builder = snapshot.scan_builder();

kernel/examples/common/src/lib.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ use delta_kernel::{
88
engine::default::{executor::tokio::TokioBackgroundExecutor, DefaultEngine},
99
scan::Scan,
1010
schema::Schema,
11-
DeltaResult, Snapshot,
11+
DeltaResult, SnapshotRef,
1212
};
1313

1414
use url::Url;
@@ -62,7 +62,7 @@ pub fn get_engine(
6262

6363
/// Construct a scan at the latest snapshot. This is over the specified table and using the passed
6464
/// engine. Parameters of the scan are controlled by the specified `ScanArgs`
65-
pub fn get_scan(snapshot: Arc<Snapshot>, args: &ScanArgs) -> DeltaResult<Option<Scan>> {
65+
pub fn get_scan(snapshot: SnapshotRef, args: &ScanArgs) -> DeltaResult<Option<Scan>> {
6666
if args.schema_only {
6767
println!("{:#?}", snapshot.schema());
6868
return Ok(None);

kernel/examples/write-table/src/main.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ use delta_kernel::engine::default::executor::tokio::TokioBackgroundExecutor;
1919
use delta_kernel::engine::default::DefaultEngine;
2020
use delta_kernel::schema::{DataType, SchemaRef, StructField, StructType};
2121
use delta_kernel::transaction::CommitResult;
22-
use delta_kernel::{DeltaResult, Engine, Error, Snapshot};
22+
use delta_kernel::{DeltaResult, Engine, Error, Snapshot, SnapshotRef};
2323

2424
/// An example program that writes to a Delta table and creates it if necessary.
2525
#[derive(Parser)]
@@ -123,7 +123,7 @@ async fn create_or_get_base_snapshot(
123123
url: &Url,
124124
engine: &dyn Engine,
125125
schema_str: &str,
126-
) -> DeltaResult<Arc<Snapshot>> {
126+
) -> DeltaResult<SnapshotRef> {
127127
// Check if table already exists
128128
match Snapshot::builder_for(url.clone()).build(engine) {
129129
Ok(snapshot) => {

kernel/src/checkpoint/mod.rs

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
//! # use delta_kernel::checkpoint::CheckpointWriter;
3636
//! # use delta_kernel::Engine;
3737
//! # use delta_kernel::Snapshot;
38+
//! # use delta_kernel::SnapshotRef;
3839
//! # use delta_kernel::DeltaResult;
3940
//! # use delta_kernel::Error;
4041
//! # use delta_kernel::FileMeta;
@@ -79,6 +80,7 @@
7980
//!
8081
//! [`CheckpointMetadata`]: crate::actions::CheckpointMetadata
8182
//! [`LastCheckpointHint`]: crate::last_checkpoint_hint::LastCheckpointHint
83+
//! [`Snapshot::checkpoint`]: crate::Snapshot::checkpoint
8284
// Future extensions:
8385
// - TODO(#837): Multi-file V2 checkpoints are not supported yet. The API is designed to be extensible for future
8486
// multi-file support, but the current implementation only supports single-file checkpoints.
@@ -98,7 +100,7 @@ use crate::last_checkpoint_hint::LastCheckpointHint;
98100
use crate::log_replay::LogReplayProcessor;
99101
use crate::path::ParsedLogPath;
100102
use crate::schema::{DataType, SchemaRef, StructField, StructType, ToSchema as _};
101-
use crate::snapshot::Snapshot;
103+
use crate::snapshot::SnapshotRef;
102104
use crate::table_properties::TableProperties;
103105
use crate::{DeltaResult, Engine, EngineData, Error, EvaluationHandlerExtension, FileMeta};
104106

@@ -194,7 +196,7 @@ impl Iterator for CheckpointDataIterator {
194196
/// See the [module-level documentation](self) for the complete checkpoint workflow
195197
pub struct CheckpointWriter {
196198
/// Reference to the snapshot (i.e. version) of the table being checkpointed
197-
pub(crate) snapshot: Arc<Snapshot>,
199+
pub(crate) snapshot: SnapshotRef,
198200

199201
/// The version of the snapshot being checkpointed.
200202
/// Note: Although the version is stored as a u64 in the snapshot, it is stored as an i64
@@ -210,7 +212,7 @@ impl RetentionCalculator for CheckpointWriter {
210212

211213
impl CheckpointWriter {
212214
/// Creates a new [`CheckpointWriter`] for the given snapshot.
213-
pub(crate) fn try_new(snapshot: Arc<Snapshot>) -> DeltaResult<Self> {
215+
pub(crate) fn try_new(snapshot: SnapshotRef) -> DeltaResult<Self> {
214216
let version = i64::try_from(snapshot.version()).map_err(|e| {
215217
Error::CheckpointWrite(format!(
216218
"Failed to convert checkpoint version from u64 {} to i64: {}",

0 commit comments

Comments
 (0)