Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion acceptance/src/data.rs
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,9 @@ pub async fn assert_scan_metadata(
test_case: &TestCaseInfo,
) -> TestResult<()> {
let table_root = test_case.table_root()?;
let snapshot = Snapshot::builder(table_root).build(engine.as_ref())?;
let snapshot = Snapshot::builder()
.with_table_root(table_root)
.build(engine.as_ref())?;
let scan = snapshot.into_scan_builder().build()?;
let mut schema = None;
let batches: Vec<RecordBatch> = scan
Expand Down
7 changes: 5 additions & 2 deletions acceptance/src/meta.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,11 +103,14 @@ impl TestCaseInfo {
let engine = engine.as_ref();
let (latest, versions) = self.versions().await?;

let snapshot = Snapshot::builder(self.table_root()?).build(engine)?;
let snapshot = Snapshot::builder()
.with_table_root(self.table_root()?)
.build(engine)?;
self.assert_snapshot_meta(&latest, &snapshot)?;

for table_version in versions {
let snapshot = Snapshot::builder(self.table_root()?)
let snapshot = Snapshot::builder()
.with_table_root(self.table_root()?)
.at_version(table_version.version)
.build(engine)?;
self.assert_snapshot_meta(&table_version, &snapshot)?;
Expand Down
4 changes: 2 additions & 2 deletions ffi/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -601,15 +601,15 @@ fn snapshot_impl(
extern_engine: &dyn ExternEngine,
version: Option<Version>,
) -> DeltaResult<Handle<SharedSnapshot>> {
let builder = Snapshot::builder(url?);
let builder = Snapshot::builder().with_table_root(url?);
let builder = if let Some(v) = version {
// TODO: should we include a `with_version_opt` method for the builder?
builder.at_version(v)
} else {
builder
};
let snapshot = builder.build(extern_engine.engine().as_ref())?;
Ok(Arc::new(snapshot).into())
Ok(snapshot.into())
}

/// # Safety
Expand Down
5 changes: 3 additions & 2 deletions ffi/src/transaction/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ use crate::{DeltaResult, ExternEngine, Snapshot, Url};
use crate::{ExclusiveEngineData, SharedExternEngine};
use delta_kernel::transaction::{CommitResult, Transaction};
use delta_kernel_ffi_macros::handle_descriptor;
use std::sync::Arc;

/// A handle representing an exclusive transaction on a Delta table. (Similar to a Box<_>)
///
Expand Down Expand Up @@ -38,7 +37,9 @@ fn transaction_impl(
url: DeltaResult<Url>,
extern_engine: &dyn ExternEngine,
) -> DeltaResult<Handle<ExclusiveTransaction>> {
let snapshot = Arc::new(Snapshot::builder(url?).build(extern_engine.engine().as_ref())?);
let snapshot = Snapshot::builder()
.with_table_root(url?)
.build(extern_engine.engine().as_ref())?;
let transaction = snapshot.transaction();
Ok(Box::new(transaction?).into())
}
Expand Down
12 changes: 6 additions & 6 deletions kernel/benches/metadata_bench.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,8 @@ fn create_snapshot_benchmark(c: &mut Criterion) {

c.bench_function("create_snapshot", |b| {
b.iter(|| {
Snapshot::builder(url.clone())
Snapshot::builder()
.with_table_root(url.clone())
.build(engine.as_ref())
.expect("Failed to create snapshot")
})
Expand All @@ -63,11 +64,10 @@ fn create_snapshot_benchmark(c: &mut Criterion) {
fn scan_metadata_benchmark(c: &mut Criterion) {
let (_tempdir, url, engine) = setup();

let snapshot = Arc::new(
Snapshot::builder(url.clone())
.build(engine.as_ref())
.expect("Failed to create snapshot"),
);
let snapshot = Snapshot::builder()
.with_table_root(url.clone())
.build(engine.as_ref())
.expect("Failed to create snapshot");

let mut group = c.benchmark_group("scan_metadata");
group.sample_size(SCAN_METADATA_BENCH_SAMPLE_SIZE);
Expand Down
2 changes: 1 addition & 1 deletion kernel/examples/common/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ pub fn get_engine(

/// Construct a scan at the latest snapshot. This is over the specified table and using the passed
/// engine. Parameters of the scan are controlled by the specified `ScanArgs`
pub fn get_scan(snapshot: Snapshot, args: &ScanArgs) -> DeltaResult<Option<Scan>> {
pub fn get_scan(snapshot: Arc<Snapshot>, args: &ScanArgs) -> DeltaResult<Option<Scan>> {
if args.schema_only {
println!("{:#?}", snapshot.schema());
return Ok(None);
Expand Down
2 changes: 1 addition & 1 deletion kernel/examples/inspect-table/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,7 @@ fn try_main() -> DeltaResult<()> {

let url = delta_kernel::try_parse_uri(&cli.location_args.path)?;
let engine = common::get_engine(&url, &cli.location_args)?;
let snapshot = Snapshot::builder(url).build(&engine)?;
let snapshot = Snapshot::builder().with_table_root(url).build(&engine)?;

match cli.command {
Commands::TableVersion => {
Expand Down
2 changes: 1 addition & 1 deletion kernel/examples/read-table-multi-threaded/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ fn try_main() -> DeltaResult<()> {
let url = delta_kernel::try_parse_uri(&cli.location_args.path)?;
println!("Reading {url}");
let engine = common::get_engine(&url, &cli.location_args)?;
let snapshot = Snapshot::builder(url).build(&engine)?;
let snapshot = Snapshot::builder().with_table_root(url).build(&engine)?;
let Some(scan) = common::get_scan(snapshot, &cli.scan_args)? else {
return Ok(());
};
Expand Down
2 changes: 1 addition & 1 deletion kernel/examples/read-table-single-threaded/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ fn try_main() -> DeltaResult<()> {
let url = delta_kernel::try_parse_uri(&cli.location_args.path)?;
println!("Reading {url}");
let engine = common::get_engine(&url, &cli.location_args)?;
let snapshot = Snapshot::builder(url).build(&engine)?;
let snapshot = Snapshot::builder().with_table_root(url).build(&engine)?;
let Some(scan) = common::get_scan(snapshot, &cli.scan_args)? else {
return Ok(());
};
Expand Down
17 changes: 12 additions & 5 deletions kernel/examples/write-table/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ async fn try_main() -> DeltaResult<()> {
)?;

// Create or get the table
let snapshot = Arc::new(create_or_get_base_snapshot(&url, &engine, &cli.schema).await?);
let snapshot = create_or_get_base_snapshot(&url, &engine, &cli.schema).await?;

// Create sample data based on the schema
let sample_data = create_sample_data(&snapshot.schema(), cli.num_rows)?;
Expand Down Expand Up @@ -123,9 +123,12 @@ async fn create_or_get_base_snapshot(
url: &Url,
engine: &dyn Engine,
schema_str: &str,
) -> DeltaResult<Snapshot> {
) -> DeltaResult<Arc<Snapshot>> {
// Check if table already exists
match Snapshot::builder(url.clone()).build(engine) {
match Snapshot::builder()
.with_table_root(url.clone())
.build(engine)
{
Ok(snapshot) => {
println!("✓ Found existing table at version {}", snapshot.version());
Ok(snapshot)
Expand All @@ -135,7 +138,9 @@ async fn create_or_get_base_snapshot(
println!("Creating new Delta table...");
let schema = parse_schema(schema_str)?;
create_table(url, &schema).await?;
Snapshot::builder(url.clone()).build(engine)
Snapshot::builder()
.with_table_root(url.clone())
.build(engine)
}
}
}
Expand Down Expand Up @@ -294,7 +299,9 @@ async fn read_and_display_data(
table_url: &Url,
engine: DefaultEngine<TokioBackgroundExecutor>,
) -> DeltaResult<()> {
let snapshot = Snapshot::builder(table_url.clone()).build(&engine)?;
let snapshot = Snapshot::builder()
.with_table_root(table_url.clone())
.build(&engine)?;
let scan = snapshot.into_scan_builder().build()?;

let batches: Vec<RecordBatch> = scan
Expand Down
15 changes: 12 additions & 3 deletions kernel/src/actions/set_transaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,10 @@ mod tests {
let url = url::Url::from_directory_path(path).unwrap();
let engine = SyncEngine::new();

let snapshot = Snapshot::builder(url).build(&engine).unwrap();
let snapshot = Snapshot::builder()
.with_table_root(url)
.build(&engine)
.unwrap();
let log_segment = snapshot.log_segment();

(
Expand Down Expand Up @@ -163,7 +166,10 @@ mod tests {
let url = url::Url::from_directory_path(path.unwrap()).unwrap();
let engine = SyncEngine::new();

let snapshot = Snapshot::builder(url).build(&engine).unwrap();
let snapshot = Snapshot::builder()
.with_table_root(url)
.build(&engine)
.unwrap();
let log_segment = snapshot.log_segment();

// The checkpoint has five parts, each containing one action. There are two app ids.
Expand All @@ -180,7 +186,10 @@ mod tests {
let url = url::Url::from_directory_path(path.unwrap()).unwrap();
let engine = SyncEngine::new();

let snapshot = Snapshot::builder(url).build(&engine).unwrap();
let snapshot = Snapshot::builder()
.with_table_root(url)
.build(&engine)
.unwrap();
let log_segment = snapshot.log_segment();

// Test with no retention (should get all transactions)
Expand Down
2 changes: 1 addition & 1 deletion kernel/src/checkpoint/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@
//!
//! // Create a snapshot for the table at the version you want to checkpoint
//! let url = delta_kernel::try_parse_uri("./tests/data/app-txn-no-checkpoint")?;
//! let snapshot = Arc::new(Snapshot::builder(url).build(engine)?);
//! let snapshot = Arc::new(Snapshot::builder().with_table_root(url).build(engine)?);
//!
//! // Create a checkpoint writer from the snapshot
//! let mut writer = snapshot.checkpoint()?;
Expand Down
32 changes: 22 additions & 10 deletions kernel/src/checkpoint/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,10 @@ fn test_create_checkpoint_metadata_batch() -> DeltaResult<()> {
)?;

let table_root = Url::parse("memory:///")?;
let snapshot = Snapshot::builder(table_root).build(&engine)?;
let writer = Arc::new(snapshot).checkpoint()?;
let snapshot = Snapshot::builder()
.with_table_root(table_root)
.build(&engine)?;
let writer = snapshot.clone().checkpoint()?;

let checkpoint_batch = writer.create_checkpoint_metadata_batch(&engine)?;

Expand Down Expand Up @@ -295,8 +297,10 @@ fn test_v1_checkpoint_latest_version_by_default() -> DeltaResult<()> {
)?;

let table_root = Url::parse("memory:///")?;
let snapshot = Arc::new(Snapshot::builder(table_root).build(&engine)?);
let writer = snapshot.checkpoint()?;
let snapshot = Snapshot::builder()
.with_table_root(table_root)
.build(&engine)?;
let writer = snapshot.clone().checkpoint()?;

// Verify the checkpoint file path is the latest version by default.
assert_eq!(
Expand Down Expand Up @@ -363,8 +367,11 @@ fn test_v1_checkpoint_specific_version() -> DeltaResult<()> {

let table_root = Url::parse("memory:///")?;
// Specify version 0 for checkpoint
let snapshot = Arc::new(Snapshot::builder(table_root).at_version(0).build(&engine)?);
let writer = snapshot.checkpoint()?;
let snapshot = Snapshot::builder()
.with_table_root(table_root)
.at_version(0)
.build(&engine)?;
let writer = snapshot.clone().checkpoint()?;

// Verify the checkpoint file path is the specified version.
assert_eq!(
Expand Down Expand Up @@ -411,8 +418,11 @@ fn test_finalize_errors_if_checkpoint_data_iterator_is_not_exhausted() -> DeltaR
)?;

let table_root = Url::parse("memory:///")?;
let snapshot = Arc::new(Snapshot::builder(table_root).at_version(0).build(&engine)?);
let writer = snapshot.checkpoint()?;
let snapshot = Snapshot::builder()
.with_table_root(table_root)
.at_version(0)
.build(&engine)?;
let writer = snapshot.clone().checkpoint()?;
let data_iter = writer.checkpoint_data(&engine)?;

/* The returned data iterator has batches that we do not consume */
Expand Down Expand Up @@ -465,8 +475,10 @@ fn test_v2_checkpoint_supported_table() -> DeltaResult<()> {
)?;

let table_root = Url::parse("memory:///")?;
let snapshot = Arc::new(Snapshot::builder(table_root).build(&engine)?);
let writer = snapshot.checkpoint()?;
let snapshot = Snapshot::builder()
.with_table_root(table_root)
.build(&engine)?;
let writer = snapshot.clone().checkpoint()?;

// Verify the checkpoint file path is the latest version by default.
assert_eq!(
Expand Down
5 changes: 4 additions & 1 deletion kernel/src/log_segment/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,10 @@ fn test_replay_for_metadata() {
let url = url::Url::from_directory_path(path.unwrap()).unwrap();
let engine = SyncEngine::new();

let snapshot = Snapshot::builder(url).build(&engine).unwrap();
let snapshot = Snapshot::builder()
.with_table_root(url)
.build(&engine)
.unwrap();
let data: Vec<_> = snapshot
.log_segment()
.replay_for_metadata(&engine)
Expand Down
Loading
Loading