From 5c56e5df1fea914b3bc55c73c03ec418595bb3c3 Mon Sep 17 00:00:00 2001 From: Zach Schuermann Date: Tue, 9 Sep 2025 18:02:59 -0700 Subject: [PATCH 1/4] log_root --- acceptance/src/data.rs | 2 +- acceptance/src/meta.rs | 4 +-- ffi/src/lib.rs | 4 +-- ffi/src/transaction/mod.rs | 3 +- kernel/benches/metadata_bench.rs | 4 +-- kernel/examples/common/src/lib.rs | 2 +- kernel/examples/inspect-table/src/main.rs | 2 +- .../read-table-multi-threaded/src/main.rs | 2 +- .../read-table-single-threaded/src/main.rs | 2 +- kernel/examples/write-table/src/main.rs | 10 +++---- kernel/src/actions/set_transaction.rs | 6 ++-- kernel/src/checkpoint/mod.rs | 2 +- kernel/src/checkpoint/tests.rs | 20 ++++++------- kernel/src/log_segment/tests.rs | 2 +- kernel/src/scan/mod.rs | 18 ++++++------ kernel/tests/dv.rs | 4 +-- kernel/tests/golden_tables.rs | 6 ++-- kernel/tests/hdfs.rs | 2 +- kernel/tests/read.rs | 14 +++++----- kernel/tests/v2_checkpoints.rs | 2 +- kernel/tests/write.rs | 28 +++++++++---------- test-utils/src/lib.rs | 2 +- 22 files changed, 69 insertions(+), 72 deletions(-) diff --git a/acceptance/src/data.rs b/acceptance/src/data.rs index 00a9b6a1b..a947aa666 100644 --- a/acceptance/src/data.rs +++ b/acceptance/src/data.rs @@ -115,7 +115,7 @@ pub async fn assert_scan_metadata( test_case: &TestCaseInfo, ) -> TestResult<()> { let table_root = test_case.table_root()?; - let snapshot = Snapshot::builder(table_root).build(engine.as_ref())?; + let snapshot = Snapshot::builder().with_table_root(table_root).build(engine.as_ref())?; let scan = snapshot.into_scan_builder().build()?; let mut schema = None; let batches: Vec = scan diff --git a/acceptance/src/meta.rs b/acceptance/src/meta.rs index 7884c8f95..72f33db8a 100644 --- a/acceptance/src/meta.rs +++ b/acceptance/src/meta.rs @@ -103,11 +103,11 @@ impl TestCaseInfo { let engine = engine.as_ref(); let (latest, versions) = self.versions().await?; - let snapshot = Snapshot::builder(self.table_root()?).build(engine)?; + let snapshot = 
Snapshot::builder().with_table_root(self.table_root()?).build(engine)?; self.assert_snapshot_meta(&latest, &snapshot)?; for table_version in versions { - let snapshot = Snapshot::builder(self.table_root()?) + let snapshot = Snapshot::builder().with_table_root(self.table_root()?) .at_version(table_version.version) .build(engine)?; self.assert_snapshot_meta(&table_version, &snapshot)?; diff --git a/ffi/src/lib.rs b/ffi/src/lib.rs index 868c12f18..3f344902b 100644 --- a/ffi/src/lib.rs +++ b/ffi/src/lib.rs @@ -601,7 +601,7 @@ fn snapshot_impl( extern_engine: &dyn ExternEngine, version: Option, ) -> DeltaResult> { - let builder = Snapshot::builder(url?); + let builder = Snapshot::builder().with_table_root(url?); let builder = if let Some(v) = version { // TODO: should we include a `with_version_opt` method for the builder? builder.at_version(v) @@ -609,7 +609,7 @@ fn snapshot_impl( builder }; let snapshot = builder.build(extern_engine.engine().as_ref())?; - Ok(Arc::new(snapshot).into()) + Ok(snapshot.into()) } /// # Safety diff --git a/ffi/src/transaction/mod.rs b/ffi/src/transaction/mod.rs index 56aab541d..5faa16020 100644 --- a/ffi/src/transaction/mod.rs +++ b/ffi/src/transaction/mod.rs @@ -9,7 +9,6 @@ use crate::{DeltaResult, ExternEngine, Snapshot, Url}; use crate::{ExclusiveEngineData, SharedExternEngine}; use delta_kernel::transaction::{CommitResult, Transaction}; use delta_kernel_ffi_macros::handle_descriptor; -use std::sync::Arc; /// A handle representing an exclusive transaction on a Delta table. 
(Similar to a Box<_>) /// @@ -38,7 +37,7 @@ fn transaction_impl( url: DeltaResult, extern_engine: &dyn ExternEngine, ) -> DeltaResult> { - let snapshot = Arc::new(Snapshot::builder(url?).build(extern_engine.engine().as_ref())?); + let snapshot = Snapshot::builder().with_table_root(url?).build(extern_engine.engine().as_ref())?; let transaction = snapshot.transaction(); Ok(Box::new(transaction?).into()) } diff --git a/kernel/benches/metadata_bench.rs b/kernel/benches/metadata_bench.rs index d4b067578..278776894 100644 --- a/kernel/benches/metadata_bench.rs +++ b/kernel/benches/metadata_bench.rs @@ -53,7 +53,7 @@ fn create_snapshot_benchmark(c: &mut Criterion) { c.bench_function("create_snapshot", |b| { b.iter(|| { - Snapshot::builder(url.clone()) + Snapshot::builder().with_table_root(url.clone()) .build(engine.as_ref()) .expect("Failed to create snapshot") }) @@ -64,7 +64,7 @@ fn scan_metadata_benchmark(c: &mut Criterion) { let (_tempdir, url, engine) = setup(); let snapshot = Arc::new( - Snapshot::builder(url.clone()) + Snapshot::builder().with_table_root(url.clone()) .build(engine.as_ref()) .expect("Failed to create snapshot"), ); diff --git a/kernel/examples/common/src/lib.rs b/kernel/examples/common/src/lib.rs index 437bb849c..e6ab98fcd 100644 --- a/kernel/examples/common/src/lib.rs +++ b/kernel/examples/common/src/lib.rs @@ -62,7 +62,7 @@ pub fn get_engine( /// Construct a scan at the latest snapshot. This is over the specified table and using the passed /// engine. 
Parameters of the scan are controlled by the specified `ScanArgs` -pub fn get_scan(snapshot: Snapshot, args: &ScanArgs) -> DeltaResult> { +pub fn get_scan(snapshot: Arc, args: &ScanArgs) -> DeltaResult> { if args.schema_only { println!("{:#?}", snapshot.schema()); return Ok(None); diff --git a/kernel/examples/inspect-table/src/main.rs b/kernel/examples/inspect-table/src/main.rs index a7b225dd0..b476fa5ec 100644 --- a/kernel/examples/inspect-table/src/main.rs +++ b/kernel/examples/inspect-table/src/main.rs @@ -182,7 +182,7 @@ fn try_main() -> DeltaResult<()> { let url = delta_kernel::try_parse_uri(&cli.location_args.path)?; let engine = common::get_engine(&url, &cli.location_args)?; - let snapshot = Snapshot::builder(url).build(&engine)?; + let snapshot = Snapshot::builder().with_table_root(url).build(&engine)?; match cli.command { Commands::TableVersion => { diff --git a/kernel/examples/read-table-multi-threaded/src/main.rs b/kernel/examples/read-table-multi-threaded/src/main.rs index b5f0e6ae9..7f096b9d0 100644 --- a/kernel/examples/read-table-multi-threaded/src/main.rs +++ b/kernel/examples/read-table-multi-threaded/src/main.rs @@ -99,7 +99,7 @@ fn try_main() -> DeltaResult<()> { let url = delta_kernel::try_parse_uri(&cli.location_args.path)?; println!("Reading {url}"); let engine = common::get_engine(&url, &cli.location_args)?; - let snapshot = Snapshot::builder(url).build(&engine)?; + let snapshot = Snapshot::builder().with_table_root(url).build(&engine)?; let Some(scan) = common::get_scan(snapshot, &cli.scan_args)? 
else { return Ok(()); }; diff --git a/kernel/examples/read-table-single-threaded/src/main.rs b/kernel/examples/read-table-single-threaded/src/main.rs index dca65d9a5..54dd5215d 100644 --- a/kernel/examples/read-table-single-threaded/src/main.rs +++ b/kernel/examples/read-table-single-threaded/src/main.rs @@ -43,7 +43,7 @@ fn try_main() -> DeltaResult<()> { let url = delta_kernel::try_parse_uri(&cli.location_args.path)?; println!("Reading {url}"); let engine = common::get_engine(&url, &cli.location_args)?; - let snapshot = Snapshot::builder(url).build(&engine)?; + let snapshot = Snapshot::builder().with_table_root(url).build(&engine)?; let Some(scan) = common::get_scan(snapshot, &cli.scan_args)? else { return Ok(()); }; diff --git a/kernel/examples/write-table/src/main.rs b/kernel/examples/write-table/src/main.rs index d0a4a924d..fb73c7459 100644 --- a/kernel/examples/write-table/src/main.rs +++ b/kernel/examples/write-table/src/main.rs @@ -79,7 +79,7 @@ async fn try_main() -> DeltaResult<()> { )?; // Create or get the table - let snapshot = Arc::new(create_or_get_base_snapshot(&url, &engine, &cli.schema).await?); + let snapshot = create_or_get_base_snapshot(&url, &engine, &cli.schema).await?; // Create sample data based on the schema let sample_data = create_sample_data(&snapshot.schema(), cli.num_rows)?; @@ -123,9 +123,9 @@ async fn create_or_get_base_snapshot( url: &Url, engine: &dyn Engine, schema_str: &str, -) -> DeltaResult { +) -> DeltaResult> { // Check if table already exists - match Snapshot::builder(url.clone()).build(engine) { + match Snapshot::builder().with_table_root(url.clone()).build(engine) { Ok(snapshot) => { println!("✓ Found existing table at version {}", snapshot.version()); Ok(snapshot) @@ -135,7 +135,7 @@ async fn create_or_get_base_snapshot( println!("Creating new Delta table..."); let schema = parse_schema(schema_str)?; create_table(url, &schema).await?; - Snapshot::builder(url.clone()).build(engine) + 
Snapshot::builder().with_table_root(url.clone()).build(engine) } } } @@ -294,7 +294,7 @@ async fn read_and_display_data( table_url: &Url, engine: DefaultEngine, ) -> DeltaResult<()> { - let snapshot = Snapshot::builder(table_url.clone()).build(&engine)?; + let snapshot = Snapshot::builder().with_table_root(table_url.clone()).build(&engine)?; let scan = snapshot.into_scan_builder().build()?; let batches: Vec = scan diff --git a/kernel/src/actions/set_transaction.rs b/kernel/src/actions/set_transaction.rs index 792dfc628..9d18ba9bf 100644 --- a/kernel/src/actions/set_transaction.rs +++ b/kernel/src/actions/set_transaction.rs @@ -113,7 +113,7 @@ mod tests { let url = url::Url::from_directory_path(path).unwrap(); let engine = SyncEngine::new(); - let snapshot = Snapshot::builder(url).build(&engine).unwrap(); + let snapshot = Snapshot::builder().with_table_root(url).build(&engine).unwrap(); let log_segment = snapshot.log_segment(); ( @@ -163,7 +163,7 @@ mod tests { let url = url::Url::from_directory_path(path.unwrap()).unwrap(); let engine = SyncEngine::new(); - let snapshot = Snapshot::builder(url).build(&engine).unwrap(); + let snapshot = Snapshot::builder().with_table_root(url).build(&engine).unwrap(); let log_segment = snapshot.log_segment(); // The checkpoint has five parts, each containing one action. There are two app ids. @@ -180,7 +180,7 @@ mod tests { let url = url::Url::from_directory_path(path.unwrap()).unwrap(); let engine = SyncEngine::new(); - let snapshot = Snapshot::builder(url).build(&engine).unwrap(); + let snapshot = Snapshot::builder().with_table_root(url).build(&engine).unwrap(); let log_segment = snapshot.log_segment(); // Test with no retention (should get all transactions) diff --git a/kernel/src/checkpoint/mod.rs b/kernel/src/checkpoint/mod.rs index b9afe04fe..af323d5c4 100644 --- a/kernel/src/checkpoint/mod.rs +++ b/kernel/src/checkpoint/mod.rs @@ -47,7 +47,7 @@ //! //! 
// Create a snapshot for the table at the version you want to checkpoint //! let url = delta_kernel::try_parse_uri("./tests/data/app-txn-no-checkpoint")?; -//! let snapshot = Arc::new(Snapshot::builder(url).build(engine)?); +//! let snapshot = Arc::new(Snapshot::builder().with_table_root(url).build(engine)?); //! //! // Create a checkpoint writer from the snapshot //! let mut writer = snapshot.checkpoint()?; diff --git a/kernel/src/checkpoint/tests.rs b/kernel/src/checkpoint/tests.rs index 20edff045..99fa94e20 100644 --- a/kernel/src/checkpoint/tests.rs +++ b/kernel/src/checkpoint/tests.rs @@ -72,8 +72,8 @@ fn test_create_checkpoint_metadata_batch() -> DeltaResult<()> { )?; let table_root = Url::parse("memory:///")?; - let snapshot = Snapshot::builder(table_root).build(&engine)?; - let writer = Arc::new(snapshot).checkpoint()?; + let snapshot = Snapshot::builder().with_table_root(table_root).build(&engine)?; + let writer = snapshot.clone().checkpoint()?; let checkpoint_batch = writer.create_checkpoint_metadata_batch(&engine)?; @@ -295,8 +295,8 @@ fn test_v1_checkpoint_latest_version_by_default() -> DeltaResult<()> { )?; let table_root = Url::parse("memory:///")?; - let snapshot = Arc::new(Snapshot::builder(table_root).build(&engine)?); - let writer = snapshot.checkpoint()?; + let snapshot = Snapshot::builder().with_table_root(table_root).build(&engine)?; + let writer = snapshot.clone().checkpoint()?; // Verify the checkpoint file path is the latest version by default. assert_eq!( @@ -363,8 +363,8 @@ fn test_v1_checkpoint_specific_version() -> DeltaResult<()> { let table_root = Url::parse("memory:///")?; // Specify version 0 for checkpoint - let snapshot = Arc::new(Snapshot::builder(table_root).at_version(0).build(&engine)?); - let writer = snapshot.checkpoint()?; + let snapshot = Snapshot::builder().with_table_root(table_root).at_version(0).build(&engine)?; + let writer = snapshot.clone().checkpoint()?; // Verify the checkpoint file path is the specified version. 
assert_eq!( @@ -411,8 +411,8 @@ fn test_finalize_errors_if_checkpoint_data_iterator_is_not_exhausted() -> DeltaR )?; let table_root = Url::parse("memory:///")?; - let snapshot = Arc::new(Snapshot::builder(table_root).at_version(0).build(&engine)?); - let writer = snapshot.checkpoint()?; + let snapshot = Snapshot::builder().with_table_root(table_root).at_version(0).build(&engine)?; + let writer = snapshot.clone().checkpoint()?; let data_iter = writer.checkpoint_data(&engine)?; /* The returned data iterator has batches that we do not consume */ @@ -465,8 +465,8 @@ fn test_v2_checkpoint_supported_table() -> DeltaResult<()> { )?; let table_root = Url::parse("memory:///")?; - let snapshot = Arc::new(Snapshot::builder(table_root).build(&engine)?); - let writer = snapshot.checkpoint()?; + let snapshot = Snapshot::builder().with_table_root(table_root).build(&engine)?; + let writer = snapshot.clone().checkpoint()?; // Verify the checkpoint file path is the latest version by default. assert_eq!( diff --git a/kernel/src/log_segment/tests.rs b/kernel/src/log_segment/tests.rs index 3661b09d9..639b31c43 100644 --- a/kernel/src/log_segment/tests.rs +++ b/kernel/src/log_segment/tests.rs @@ -50,7 +50,7 @@ fn test_replay_for_metadata() { let url = url::Url::from_directory_path(path.unwrap()).unwrap(); let engine = SyncEngine::new(); - let snapshot = Snapshot::builder(url).build(&engine).unwrap(); + let snapshot = Snapshot::builder().with_table_root(url).build(&engine).unwrap(); let data: Vec<_> = snapshot .log_segment() .replay_for_metadata(&engine) diff --git a/kernel/src/scan/mod.rs b/kernel/src/scan/mod.rs index 4d6d3a9ce..150752e9e 100644 --- a/kernel/src/scan/mod.rs +++ b/kernel/src/scan/mod.rs @@ -1256,7 +1256,7 @@ mod tests { let url = url::Url::from_directory_path(path).unwrap(); let engine = SyncEngine::new(); - let snapshot = Snapshot::builder(url).build(&engine).unwrap(); + let snapshot = Snapshot::builder().with_table_root(url).build(&engine).unwrap(); let scan = 
snapshot.into_scan_builder().build().unwrap(); let files = get_files_for_scan(scan, &engine).unwrap(); assert_eq!(files.len(), 1); @@ -1273,7 +1273,7 @@ mod tests { let url = url::Url::from_directory_path(path).unwrap(); let engine = Arc::new(SyncEngine::new()); - let snapshot = Snapshot::builder(url).build(engine.as_ref()).unwrap(); + let snapshot = Snapshot::builder().with_table_root(url).build(engine.as_ref()).unwrap(); let scan = snapshot.into_scan_builder().build().unwrap(); let files: Vec = scan.execute(engine).unwrap().try_collect().unwrap(); @@ -1289,7 +1289,7 @@ mod tests { let url = url::Url::from_directory_path(path).unwrap(); let engine = Arc::new(SyncEngine::new()); - let snapshot = Snapshot::builder(url).build(engine.as_ref()).unwrap(); + let snapshot = Snapshot::builder().with_table_root(url).build(engine.as_ref()).unwrap(); let version = snapshot.version(); let scan = snapshot.into_scan_builder().build().unwrap(); let files: Vec<_> = scan @@ -1323,7 +1323,7 @@ mod tests { let url = url::Url::from_directory_path(path).unwrap(); let engine = Arc::new(SyncEngine::new()); - let snapshot = Snapshot::builder(url.clone()) + let snapshot = Snapshot::builder().with_table_root(url.clone()) .at_version(0) .build(engine.as_ref()) .unwrap(); @@ -1347,7 +1347,7 @@ mod tests { .into_iter() .map(|b| Box::new(ArrowEngineData::from(b)) as Box) .collect(); - let snapshot = Snapshot::builder(url) + let snapshot = Snapshot::builder().with_table_root(url) .at_version(1) .build(engine.as_ref()) .unwrap(); @@ -1419,7 +1419,7 @@ mod tests { let url = url::Url::from_directory_path(path.unwrap()).unwrap(); let engine = SyncEngine::new(); - let snapshot = Snapshot::builder(url).build(&engine).unwrap(); + let snapshot = Snapshot::builder().with_table_root(url).build(&engine).unwrap(); let scan = snapshot.into_scan_builder().build().unwrap(); let data: Vec<_> = scan .replay_for_scan_metadata(&engine) @@ -1439,7 +1439,7 @@ mod tests { let url = 
url::Url::from_directory_path(path.unwrap()).unwrap(); let engine = Arc::new(SyncEngine::new()); - let snapshot = Arc::new(Snapshot::builder(url).build(engine.as_ref()).unwrap()); + let snapshot = Snapshot::builder().with_table_root(url).build(engine.as_ref()).unwrap(); // No predicate pushdown attempted, so the one data file should be returned. // @@ -1482,7 +1482,7 @@ mod tests { let url = url::Url::from_directory_path(path.unwrap()).unwrap(); let engine = Arc::new(SyncEngine::new()); - let snapshot = Arc::new(Snapshot::builder(url).build(engine.as_ref()).unwrap()); + let snapshot = Snapshot::builder().with_table_root(url).build(engine.as_ref()).unwrap(); // Predicate over a logically valid but physically missing column. No data files should be // returned because the column is inferred to be all-null. @@ -1517,7 +1517,7 @@ mod tests { let url = url::Url::from_directory_path(path).unwrap(); let engine = SyncEngine::new(); - let snapshot = Snapshot::builder(url).build(&engine).unwrap(); + let snapshot = Snapshot::builder().with_table_root(url).build(&engine).unwrap(); let scan = snapshot.into_scan_builder().build()?; let files = get_files_for_scan(scan, &engine)?; // test case: diff --git a/kernel/tests/dv.rs b/kernel/tests/dv.rs index f70c0fa9b..7c70717a1 100644 --- a/kernel/tests/dv.rs +++ b/kernel/tests/dv.rs @@ -32,7 +32,7 @@ fn dv_table() -> Result<(), Box> { let url = url::Url::from_directory_path(path).unwrap(); let engine = DefaultEngine::new_local(); - let snapshot = Snapshot::builder(url).build(engine.as_ref())?; + let snapshot = Snapshot::builder().with_table_root(url).build(engine.as_ref())?; let scan = snapshot.into_scan_builder().build()?; let stream = scan.execute(engine)?; @@ -47,7 +47,7 @@ fn non_dv_table() -> Result<(), Box> { let url = url::Url::from_directory_path(path).unwrap(); let engine = DefaultEngine::new_local(); - let snapshot = Snapshot::builder(url).build(engine.as_ref())?; + let snapshot = 
Snapshot::builder().with_table_root(url).build(engine.as_ref())?; let scan = snapshot.into_scan_builder().build()?; let stream = scan.execute(engine)?; diff --git a/kernel/tests/golden_tables.rs b/kernel/tests/golden_tables.rs index 9e573a656..99f83f0a2 100644 --- a/kernel/tests/golden_tables.rs +++ b/kernel/tests/golden_tables.rs @@ -168,7 +168,7 @@ async fn latest_snapshot_test( url: Url, expected_path: Option, ) -> Result<(), Box> { - let snapshot = Snapshot::builder(url).build(&engine)?; + let snapshot = Snapshot::builder().with_table_root(url).build(&engine)?; let scan = snapshot.into_scan_builder().build()?; let scan_res = scan.execute(Arc::new(engine))?; let batches: Vec = scan_res @@ -271,7 +271,7 @@ async fn canonicalized_paths_test( _expected: Option, ) -> Result<(), Box> { // assert latest version is 1 and there are no files in the snapshot (add is removed) - let snapshot = Snapshot::builder(table_root).build(&engine).unwrap(); + let snapshot = Snapshot::builder().with_table_root(table_root).build(&engine).unwrap(); assert_eq!(snapshot.version(), 1); let scan = snapshot .into_scan_builder() @@ -287,7 +287,7 @@ async fn checkpoint_test( table_root: Url, _expected: Option, ) -> Result<(), Box> { - let snapshot = Snapshot::builder(table_root).build(&engine).unwrap(); + let snapshot = Snapshot::builder().with_table_root(table_root).build(&engine).unwrap(); let version = snapshot.version(); let scan = snapshot .into_scan_builder() diff --git a/kernel/tests/hdfs.rs b/kernel/tests/hdfs.rs index fedfac09f..5dbd1d11a 100644 --- a/kernel/tests/hdfs.rs +++ b/kernel/tests/hdfs.rs @@ -73,7 +73,7 @@ async fn read_table_version_hdfs() -> Result<(), Box> { Arc::new(TokioBackgroundExecutor::new()), )?; - let snapshot = Snapshot::builder(url).build(&engine)?; + let snapshot = Snapshot::builder().with_table_root(url).build(&engine)?; assert_eq!(snapshot.version(), 1); Ok(()) diff --git a/kernel/tests/read.rs b/kernel/tests/read.rs index b2c8de5b8..961bf0ce3 100644 --- 
a/kernel/tests/read.rs +++ b/kernel/tests/read.rs @@ -66,7 +66,7 @@ async fn single_commit_two_add_files() -> Result<(), Box> let expected_data = vec![batch.clone(), batch]; - let snapshot = Snapshot::builder(location).build(engine.as_ref())?; + let snapshot = Snapshot::builder().with_table_root(location).build(engine.as_ref())?; let scan = snapshot.into_scan_builder().build()?; let mut files = 0; @@ -118,7 +118,7 @@ async fn two_commits() -> Result<(), Box> { let expected_data = vec![batch.clone(), batch]; - let snapshot = Snapshot::builder(location).build(&engine)?; + let snapshot = Snapshot::builder().with_table_root(location).build(&engine)?; let scan = snapshot.into_scan_builder().build()?; let mut files = 0; @@ -171,7 +171,7 @@ async fn remove_action() -> Result<(), Box> { let expected_data = vec![batch]; - let snapshot = Snapshot::builder(location).build(&engine)?; + let snapshot = Snapshot::builder().with_table_root(location).build(&engine)?; let scan = snapshot.into_scan_builder().build()?; let stream = scan.execute(Arc::new(engine))?.zip(expected_data); @@ -242,7 +242,7 @@ async fn stats() -> Result<(), Box> { storage.clone(), Arc::new(TokioBackgroundExecutor::new()), )); - let snapshot = Arc::new(Snapshot::builder(location).build(engine.as_ref())?); + let snapshot = Snapshot::builder().with_table_root(location).build(engine.as_ref())?; // The first file has id between 1 and 3; the second has id between 5 and 7. For each operator, // we validate the boundary values where we expect the set of matched files to change. 
@@ -433,7 +433,7 @@ fn read_table_data( Arc::new(TokioBackgroundExecutor::new()), )?); - let snapshot = Snapshot::builder(url.clone()).build(engine.as_ref())?; + let snapshot = Snapshot::builder().with_table_root(url.clone()).build(engine.as_ref())?; let read_schema = select_cols.map(|select_cols| { let table_schema = snapshot.schema(); @@ -1057,7 +1057,7 @@ async fn predicate_on_non_nullable_partition_column() -> Result<(), Box Result<(), Box) -> DeltaResult Result<(), Box> { setup_test_tables(schema, &[], None, "test_table").await? { // create a transaction - let snapshot = Arc::new(Snapshot::builder(table_url.clone()).build(&engine)?); + let snapshot = Snapshot::builder().with_table_root(table_url.clone()).build(&engine)?; let txn = snapshot.transaction()?.with_engine_info("default engine"); // commit! @@ -143,7 +143,7 @@ async fn write_data_and_check_result_and_stats( engine: Arc>, expected_since_commit: u64, ) -> Result<(), Box> { - let snapshot = Arc::new(Snapshot::builder(table_url.clone()).build(engine.as_ref())?); + let snapshot = Snapshot::builder().with_table_root(table_url.clone()).build(engine.as_ref())?; let mut txn = snapshot.transaction()?; // create two new arrow record batches to append @@ -213,7 +213,7 @@ async fn test_commit_info_action() -> Result<(), Box> { for (table_url, engine, store, table_name) in setup_test_tables(schema.clone(), &[], None, "test_table").await? { - let snapshot = Arc::new(Snapshot::builder(table_url.clone()).build(&engine)?); + let snapshot = Snapshot::builder().with_table_root(table_url.clone()).build(&engine)?; let txn = snapshot.transaction()?.with_engine_info("default engine"); txn.commit(&engine)?; @@ -397,7 +397,7 @@ async fn test_append_partitioned() -> Result<(), Box> { for (table_url, engine, store, table_name) in setup_test_tables(table_schema.clone(), &[partition_col], None, "test_table").await? 
{ - let snapshot = Arc::new(Snapshot::builder(table_url.clone()).build(&engine)?); + let snapshot = Snapshot::builder().with_table_root(table_url.clone()).build(&engine)?; let mut txn = snapshot.transaction()?.with_engine_info("default engine"); // create two new arrow record batches to append @@ -540,7 +540,7 @@ async fn test_append_invalid_schema() -> Result<(), Box> for (table_url, engine, _store, _table_name) in setup_test_tables(table_schema, &[], None, "test_table").await? { - let snapshot = Arc::new(Snapshot::builder(table_url.clone()).build(&engine)?); + let snapshot = Snapshot::builder().with_table_root(table_url.clone()).build(&engine)?; let txn = snapshot.transaction()?.with_engine_info("default engine"); // create two new arrow record batches to append @@ -598,7 +598,7 @@ async fn test_write_txn_actions() -> Result<(), Box> { setup_test_tables(schema, &[], None, "test_table").await? { // can't have duplicate app_id in same transaction - let snapshot = Arc::new(Snapshot::builder(table_url.clone()).build(&engine)?); + let snapshot = Snapshot::builder().with_table_root(table_url.clone()).build(&engine)?; assert!(matches!( snapshot .transaction()? @@ -608,7 +608,7 @@ async fn test_write_txn_actions() -> Result<(), Box> { Err(KernelError::Generic(msg)) if msg == "app_id app_id1 already exists in transaction" )); - let snapshot = Arc::new(Snapshot::builder(table_url.clone()).build(&engine)?); + let snapshot = Snapshot::builder().with_table_root(table_url.clone()).build(&engine)?; let txn = snapshot .transaction()? .with_engine_info("default engine") @@ -618,11 +618,9 @@ async fn test_write_txn_actions() -> Result<(), Box> { // commit! 
txn.commit(&engine)?; - let snapshot = Arc::new( - Snapshot::builder(table_url.clone()) - .at_version(1) - .build(&engine)?, - ); + let snapshot = Snapshot::builder().with_table_root(table_url.clone()) + .at_version(1) + .build(&engine)?; assert_eq!( snapshot.clone().get_app_id_version("app_id1", &engine)?, Some(1) @@ -742,7 +740,7 @@ async fn test_append_timestamp_ntz() -> Result<(), Box> { ) .await?; - let snapshot = Arc::new(Snapshot::builder(table_url.clone()).build(&engine)?); + let snapshot = Snapshot::builder().with_table_root(table_url.clone()).build(&engine)?; let mut txn = snapshot.transaction()?.with_engine_info("default engine"); // Create Arrow data with TIMESTAMP_NTZ values including edge cases @@ -871,7 +869,7 @@ async fn test_append_variant() -> Result<(), Box> { ) .await?; - let snapshot = Arc::new(Snapshot::builder(table_url.clone()).build(&engine)?); + let snapshot = Snapshot::builder().with_table_root(table_url.clone()).build(&engine)?; let mut txn = snapshot.transaction()?; // First value corresponds to the variant value "1". 
Third value corresponds to the variant @@ -1081,7 +1079,7 @@ async fn test_shredded_variant_read_rejection() -> Result<(), Box, ) -> Result<(), Box> { - let snapshot = Snapshot::builder(url.clone()).build(engine.as_ref())?; + let snapshot = Snapshot::builder().with_table_root(url.clone()).build(engine.as_ref())?; let scan = snapshot.into_scan_builder().build()?; let batches = read_scan(&scan, engine)?; let formatted = pretty_format_batches(&batches).unwrap().to_string(); From 6be4e46a1acd9008b2997317fde715f5b8afb5af Mon Sep 17 00:00:00 2001 From: Zach Schuermann Date: Tue, 9 Sep 2025 18:03:11 -0700 Subject: [PATCH 2/4] try_new_from --- kernel/src/snapshot.rs | 289 +++++++++----------------------- kernel/src/snapshot/builder.rs | 260 ++++++++++++++++++++++++++-- kernel/src/table_changes/mod.rs | 19 ++- 3 files changed, 339 insertions(+), 229 deletions(-) diff --git a/kernel/src/snapshot.rs b/kernel/src/snapshot.rs index 3178186f8..8bd130ea0 100644 --- a/kernel/src/snapshot.rs +++ b/kernel/src/snapshot.rs @@ -8,7 +8,6 @@ use crate::actions::domain_metadata::domain_metadata_configuration; use crate::actions::set_transaction::SetTransactionScanner; use crate::actions::{Metadata, Protocol, INTERNAL_DOMAIN_PREFIX}; use crate::checkpoint::CheckpointWriter; -use crate::listed_log_files::ListedLogFiles; use crate::log_segment::LogSegment; use crate::scan::ScanBuilder; use crate::schema::SchemaRef; @@ -54,8 +53,8 @@ impl std::fmt::Debug for Snapshot { impl Snapshot { /// Create a new [`SnapshotBuilder`] to build a [`Snapshot`] for a given table root. - pub fn builder(table_root: Url) -> SnapshotBuilder { - SnapshotBuilder::new(table_root) + pub fn builder() -> SnapshotBuilder { + SnapshotBuilder::default() } #[internal_api] @@ -66,167 +65,6 @@ impl Snapshot { } } - /// Create a new [`Snapshot`] instance from an existing [`Snapshot`]. 
This is useful when you - /// already have a [`Snapshot`] lying around and want to do the minimal work to 'update' the - /// snapshot to a later version. - /// - /// We implement a simple heuristic: - /// 1. if the new version == existing version, just return the existing snapshot - /// 2. if the new version < existing version, error: there is no optimization to do here - /// 3. list from (existing checkpoint version + 1) onward (or just existing snapshot version if - /// no checkpoint) - /// 4. a. if new checkpoint is found: just create a new snapshot from that checkpoint (and - /// commits after it) - /// b. if no new checkpoint is found: do lightweight P+M replay on the latest commits (after - /// ensuring we only retain commits > any checkpoints) - /// - /// # Parameters - /// - /// - `existing_snapshot`: reference to an existing [`Snapshot`] - /// - `engine`: Implementation of [`Engine`] apis. - /// - `version`: target version of the [`Snapshot`]. None will create a snapshot at the latest - /// version of the table. 
- pub fn try_new_from( - existing_snapshot: Arc, - engine: &dyn Engine, - version: impl Into>, - ) -> DeltaResult> { - let old_log_segment = &existing_snapshot.log_segment; - let old_version = existing_snapshot.version(); - let new_version = version.into(); - if let Some(new_version) = new_version { - if new_version == old_version { - // Re-requesting the same version - return Ok(existing_snapshot.clone()); - } - if new_version < old_version { - // Hint is too new: error since this is effectively an incorrect optimization - return Err(Error::Generic(format!( - "Requested snapshot version {new_version} is older than snapshot hint version {old_version}" - ))); - } - } - - let log_root = old_log_segment.log_root.clone(); - let storage = engine.storage_handler(); - - // Start listing just after the previous segment's checkpoint, if any - let listing_start = old_log_segment.checkpoint_version.unwrap_or(0) + 1; - - // Check for new commits (and CRC) - let new_listed_files = ListedLogFiles::list( - storage.as_ref(), - &log_root, - Some(listing_start), - new_version, - )?; - - // NB: we need to check both checkpoints and commits since we filter commits at and below - // the checkpoint version. Example: if we have a checkpoint + commit at version 1, the log - // listing above will only return the checkpoint and not the commit. 
- if new_listed_files.ascending_commit_files.is_empty() - && new_listed_files.checkpoint_parts.is_empty() - { - match new_version { - Some(new_version) if new_version != old_version => { - // No new commits, but we are looking for a new version - return Err(Error::Generic(format!( - "Requested snapshot version {new_version} is newer than the latest version {old_version}" - ))); - } - _ => { - // No new commits, just return the same snapshot - return Ok(existing_snapshot.clone()); - } - } - } - - // create a log segment just from existing_checkpoint.version -> new_version - // OR could be from 1 -> new_version - let mut new_log_segment = - LogSegment::try_new(new_listed_files, log_root.clone(), new_version)?; - - let new_end_version = new_log_segment.end_version; - if new_end_version < old_version { - // we should never see a new log segment with a version < the existing snapshot - // version, that would mean a commit was incorrectly deleted from the log - return Err(Error::Generic(format!( - "Unexpected state: The newest version in the log {new_end_version} is older than the old version {old_version}"))); - } - if new_end_version == old_version { - // No new commits, just return the same snapshot - return Ok(existing_snapshot.clone()); - } - - if new_log_segment.checkpoint_version.is_some() { - // we have a checkpoint in the new LogSegment, just construct a new snapshot from that - let snapshot = Self::try_new_from_log_segment( - existing_snapshot.table_root().clone(), - new_log_segment, - engine, - ); - return Ok(Arc::new(snapshot?)); - } - - // after this point, we incrementally update the snapshot with the new log segment. - // first we remove the 'overlap' in commits, example: - // - // old logsegment checkpoint1-commit1-commit2-commit3 - // 1. new logsegment commit1-commit2-commit3 - // 2. new logsegment commit1-commit2-commit3-commit4 - // 3. new logsegment checkpoint2+commit2-commit3-commit4 - // - // retain does - // 1. 
new logsegment [empty] -> caught above - // 2. new logsegment [commit4] - // 3. new logsegment [checkpoint2-commit3] -> caught above - new_log_segment - .ascending_commit_files - .retain(|log_path| old_version < log_path.version); - - // we have new commits and no new checkpoint: we replay new commits for P+M and then - // create a new snapshot by combining LogSegments and building a new TableConfiguration - let (new_metadata, new_protocol) = new_log_segment.protocol_and_metadata(engine)?; - let table_configuration = TableConfiguration::try_new_from( - existing_snapshot.table_configuration(), - new_metadata, - new_protocol, - new_log_segment.end_version, - )?; - - // NB: we must add the new log segment to the existing snapshot's log segment - let mut ascending_commit_files = old_log_segment.ascending_commit_files.clone(); - ascending_commit_files.extend(new_log_segment.ascending_commit_files); - let mut ascending_compaction_files = old_log_segment.ascending_compaction_files.clone(); - ascending_compaction_files.extend(new_log_segment.ascending_compaction_files); - - // Note that we _could_ go backwards if someone deletes a CRC: - // old listing: 1, 2, 2.crc, 3, 3.crc (latest is 3.crc) - // new listing: 1, 2, 2.crc, 3 (latest is 2.crc) - // and we would still pick the new listing's (older) CRC file since it ostensibly still - // exists - let latest_crc_file = new_log_segment - .latest_crc_file - .or_else(|| old_log_segment.latest_crc_file.clone()); - - // we can pass in just the old checkpoint parts since by the time we reach this line, we - // know there are no checkpoints in the new log segment. - let combined_log_segment = LogSegment::try_new( - ListedLogFiles { - ascending_commit_files, - ascending_compaction_files, - checkpoint_parts: old_log_segment.checkpoint_parts.clone(), - latest_crc_file, - }, - log_root, - new_version, - )?; - Ok(Arc::new(Snapshot::new( - combined_log_segment, - table_configuration, - ))) - } - /// Create a new [`Snapshot`] instance. 
pub(crate) fn try_new_from_log_segment( location: Url, @@ -309,7 +147,7 @@ impl Snapshot { } /// Consume this `Snapshot` to create a [`ScanBuilder`] - pub fn into_scan_builder(self) -> ScanBuilder { + pub fn into_scan_builder(self: Arc) -> ScanBuilder { ScanBuilder::new(self) } @@ -391,7 +229,11 @@ mod tests { let url = url::Url::from_directory_path(path).unwrap(); let engine = SyncEngine::new(); - let snapshot = Snapshot::builder(url).at_version(1).build(&engine).unwrap(); + let snapshot = Snapshot::builder() + .with_table_root(url) + .at_version(1) + .build(&engine) + .unwrap(); let expected = Protocol::try_new(3, 7, Some(["deletionVectors"]), Some(["deletionVectors"])).unwrap(); @@ -409,7 +251,10 @@ mod tests { let url = url::Url::from_directory_path(path).unwrap(); let engine = SyncEngine::new(); - let snapshot = Snapshot::builder(url).build(&engine).unwrap(); + let snapshot = Snapshot::builder() + .with_table_root(url) + .build(&engine) + .unwrap(); let expected = Protocol::try_new(3, 7, Some(["deletionVectors"]), Some(["deletionVectors"])).unwrap(); @@ -448,21 +293,27 @@ mod tests { let url = url::Url::from_directory_path(path).unwrap(); let engine = SyncEngine::new(); - let old_snapshot = Arc::new( - Snapshot::builder(url.clone()) - .at_version(1) - .build(&engine) - .unwrap(), - ); + let old_snapshot = Snapshot::builder() + .with_table_root(url.clone()) + .at_version(1) + .build(&engine) + .unwrap(); // 1. new version < existing version: error - let snapshot_res = Snapshot::try_new_from(old_snapshot.clone(), &engine, Some(0)); + let snapshot_res = Snapshot::builder() + .from_snapshot(old_snapshot.clone()) + .at_version(0) + .build(&engine); assert!(matches!( snapshot_res, Err(Error::Generic(msg)) if msg == "Requested snapshot version 0 is older than snapshot hint version 1" )); // 2. 
new version == existing version - let snapshot = Snapshot::try_new_from(old_snapshot.clone(), &engine, Some(1)).unwrap(); + let snapshot = Snapshot::builder() + .from_snapshot(old_snapshot.clone()) + .at_version(1) + .build(&engine) + .unwrap(); let expected = old_snapshot.clone(); assert_eq!(snapshot, expected); @@ -478,13 +329,16 @@ mod tests { fn test_new_from(store: Arc) -> DeltaResult<()> { let url = Url::parse("memory:///")?; let engine = DefaultEngine::new(store, Arc::new(TokioBackgroundExecutor::new())); - let base_snapshot = Arc::new( - Snapshot::builder(url.clone()) - .at_version(0) - .build(&engine)?, - ); - let snapshot = Snapshot::try_new_from(base_snapshot.clone(), &engine, Some(1))?; - let expected = Snapshot::builder(url.clone()) + let base_snapshot = Snapshot::builder() + .with_table_root(url.clone()) + .at_version(0) + .build(&engine)?; + let snapshot = Snapshot::builder() + .from_snapshot(base_snapshot.clone()) + .at_version(1) + .build(&engine)?; + let expected = Snapshot::builder() + .with_table_root(url.clone()) .at_version(1) .build(&engine)?; assert_eq!(snapshot, expected.into()); @@ -531,19 +385,24 @@ mod tests { Arc::new(store.fork()), Arc::new(TokioBackgroundExecutor::new()), ); - let base_snapshot = Arc::new( - Snapshot::builder(url.clone()) - .at_version(0) - .build(&engine)?, - ); - let snapshot = Snapshot::try_new_from(base_snapshot.clone(), &engine, None)?; - let expected = Snapshot::builder(url.clone()) + let base_snapshot = Snapshot::builder() + .with_table_root(url.clone()) + .at_version(0) + .build(&engine)?; + let snapshot = Snapshot::builder() + .from_snapshot(base_snapshot.clone()) + .build(&engine)?; + let expected = Snapshot::builder() + .with_table_root(url.clone()) .at_version(0) .build(&engine)?; assert_eq!(snapshot, expected.into()); // version exceeds latest version of the table = err assert!(matches!( - Snapshot::try_new_from(base_snapshot.clone(), &engine, Some(1)), + Snapshot::builder() + 
.from_snapshot(base_snapshot.clone()) + .at_version(1) + .build(&engine), Err(Error::Generic(msg)) if msg == "Requested snapshot version 1 is newer than the latest version 0" )); @@ -606,13 +465,15 @@ mod tests { // new commits AND request version > end of log let url = Url::parse("memory:///")?; let engine = DefaultEngine::new(store_3c_i, Arc::new(TokioBackgroundExecutor::new())); - let base_snapshot = Arc::new( - Snapshot::builder(url.clone()) - .at_version(0) - .build(&engine)?, - ); + let base_snapshot = Snapshot::builder() + .with_table_root(url.clone()) + .at_version(0) + .build(&engine)?; assert!(matches!( - Snapshot::try_new_from(base_snapshot.clone(), &engine, Some(2)), + Snapshot::builder() + .from_snapshot(base_snapshot.clone()) + .at_version(2) + .build(&engine), Err(Error::Generic(msg)) if msg == "LogSegment end version 1 not the same as the specified end version 2" )); @@ -717,15 +578,18 @@ mod tests { store.put(&path, crc.to_string().into()).await?; // base snapshot is at version 0 - let base_snapshot = Arc::new( - Snapshot::builder(url.clone()) - .at_version(0) - .build(&engine)?, - ); + let base_snapshot = Snapshot::builder() + .with_table_root(url.clone()) + .at_version(0) + .build(&engine)?; // first test: no new crc - let snapshot = Snapshot::try_new_from(base_snapshot.clone(), &engine, Some(1))?; - let expected = Snapshot::builder(url.clone()) + let snapshot = Snapshot::builder() + .from_snapshot(base_snapshot.clone()) + .at_version(1) + .build(&engine)?; + let expected = Snapshot::builder() + .with_table_root(url.clone()) .at_version(1) .build(&engine)?; assert_eq!(snapshot, expected.into()); @@ -751,8 +615,12 @@ mod tests { "protocol": protocol(1, 2), }); store.put(&path, crc.to_string().into()).await?; - let snapshot = Snapshot::try_new_from(base_snapshot.clone(), &engine, Some(1))?; - let expected = Snapshot::builder(url.clone()) + let snapshot = Snapshot::builder() + .from_snapshot(base_snapshot.clone()) + .at_version(1) + 
.build(&engine)?; + let expected = Snapshot::builder() + .with_table_root(url.clone()) .at_version(1) .build(&engine)?; assert_eq!(snapshot, expected.into()); @@ -865,7 +733,10 @@ mod tests { .unwrap(); let location = url::Url::from_directory_path(path).unwrap(); let engine = SyncEngine::new(); - let snapshot = Snapshot::builder(location).build(&engine).unwrap(); + let snapshot = Snapshot::builder() + .with_table_root(location) + .build(&engine) + .unwrap(); assert_eq!(snapshot.log_segment.checkpoint_parts.len(), 1); assert_eq!( @@ -972,7 +843,11 @@ mod tests { .join("\n"); add_commit(store.as_ref(), 1, commit).await.unwrap(); - let snapshot = Arc::new(Snapshot::builder(url.clone()).build(&engine)?); + let snapshot = Arc::new( + Snapshot::builder() + .with_table_root(url.clone()) + .build(&engine)?, + ); assert_eq!(snapshot.get_domain_metadata("domain1", &engine)?, None); assert_eq!( diff --git a/kernel/src/snapshot/builder.rs b/kernel/src/snapshot/builder.rs index 171820a10..4f73df85c 100644 --- a/kernel/src/snapshot/builder.rs +++ b/kernel/src/snapshot/builder.rs @@ -1,7 +1,12 @@ //! Builder for creating [`Snapshot`] instances. 
+use std::sync::Arc; + +use crate::listed_log_files::ListedLogFiles; use crate::log_segment::LogSegment; -use crate::{DeltaResult, Engine, Snapshot, Version}; +use crate::table_configuration::TableConfiguration; +use crate::utils::require; +use crate::{DeltaResult, Engine, Error, Snapshot, Version}; use url::Url; @@ -11,29 +16,38 @@ use url::Url; /// /// ```no_run /// # use delta_kernel::{Snapshot, Engine}; +/// # use std::sync::Arc; /// # use url::Url; /// # fn example(engine: &dyn Engine) -> delta_kernel::DeltaResult<()> { /// let table_root = Url::parse("file:///path/to/table")?; /// -/// // Build a snapshot +/// // Build a snapshot from scratch /// let snapshot = Snapshot::builder(table_root.clone()) /// .at_version(5) // Optional: specify a time-travel version (default is latest version) /// .build(engine)?; /// +/// // Build incrementally from an existing snapshot +/// let updated_snapshot = Snapshot::builder(table_root.clone()) +/// .from_snapshot(snapshot.clone()) +/// .at_version(10) +/// .build(engine)?; +/// /// # Ok(()) /// # } /// ``` +#[derive(Debug, Default)] pub struct SnapshotBuilder { - table_root: Url, + table_root: Option, version: Option, + existing_snapshot: Option>, } impl SnapshotBuilder { - pub(crate) fn new(table_root: Url) -> Self { - Self { - table_root, - version: None, - } + /// Set the table root URL. This is required when there is no existing snapshot hint (see + /// [from_snapshot]). + pub fn with_table_root(mut self, table_root: Url) -> Self { + self.table_root = Some(table_root); + self } /// Set the target version of the [`Snapshot`]. When omitted, the Snapshot is created at the @@ -43,18 +57,203 @@ impl SnapshotBuilder { self } + /// Create a new [`Snapshot`] instance from an existing [`Snapshot`]. This is useful when you + /// already have a [`Snapshot`] lying around and want to do the minimal work to 'update' the + /// snapshot to a later version. + /// + /// We implement a simple heuristic: + /// 1. 
if the new version == existing version, just return the existing snapshot + /// 2. if the new version < existing version, error: there is no optimization to do here + /// 3. list from (existing checkpoint version + 1) onward (or from version 1 if there is + /// no checkpoint) + /// 4. a. if new checkpoint is found: just create a new snapshot from that checkpoint (and + /// commits after it) + /// b. if no new checkpoint is found: do lightweight P+M replay on the latest commits (after + /// ensuring we only retain commits > any checkpoints) + /// + /// NOTE: if a table_root is already set, it must match the table_root of the existing + /// snapshot. + pub fn from_snapshot(mut self, snapshot: Arc) -> Self { + self.existing_snapshot = Some(snapshot); + self + } + /// Create a new [`Snapshot`] instance. /// + /// Returns an `Arc` to support efficient incremental updates. + /// When building from an existing snapshot and the version hasn't changed, + /// this returns the same `Arc` without creating a new snapshot. + /// /// # Parameters /// /// - `engine`: Implementation of [`Engine`] apis. - pub fn build(self, engine: &dyn Engine) -> DeltaResult { - let log_segment = LogSegment::for_snapshot( - engine.storage_handler().as_ref(), - self.table_root.join("_delta_log/")?, - self.version, + pub fn build(self, engine: &dyn Engine) -> DeltaResult> { + // Note: since both table_root AND existing_snapshot are optional but one is actually + // required, we check here that one is set and if both are set they must match.
+ // TODO: we could improve this + match (&self.table_root, &self.existing_snapshot) { + (Some(table_root), Some(existing)) => require!( + table_root == existing.table_root(), + Error::generic("table_root must match the table_root of existing snapshot") + ), + (None, None) => return Err(Error::generic( + "Either table_root or existing_snapshot must be provided to construct a Snapshot", + )), + _ => (), + } + + // If we don't have an existing snapshot, build from scratch. + let Some(existing_snapshot) = self.existing_snapshot else { + // if we don't have an existing snapshot the table_root is required + let table_root = self.table_root.ok_or_else(|| { + Error::generic( + "table_root is required to build a Snapshot without existing snapshot hint", + ) + })?; + let log_segment = LogSegment::for_snapshot( + engine.storage_handler().as_ref(), + table_root.join("_delta_log/")?, + self.version, + )?; + let snapshot = Snapshot::try_new_from_log_segment(table_root, log_segment, engine)?; + return Ok(Arc::new(snapshot)); + }; + + let old_log_segment = &existing_snapshot.log_segment; + let old_version = existing_snapshot.version(); + let new_version = self.version.into(); + if let Some(new_version) = new_version { + if new_version == old_version { + // Re-requesting the same version + return Ok(existing_snapshot.clone()); + } + if new_version < old_version { + // Hint is too new: error since this is effectively an incorrect optimization + return Err(Error::Generic(format!( + "Requested snapshot version {new_version} is older than snapshot hint version {old_version}" + ))); + } + } + + let log_root = old_log_segment.log_root.clone(); + let storage = engine.storage_handler(); + + // Start listing just after the previous segment's checkpoint, if any + let listing_start = old_log_segment.checkpoint_version.unwrap_or(0) + 1; + + // Check for new commits (and CRC) + let new_listed_files = ListedLogFiles::list( + storage.as_ref(), + &log_root, + Some(listing_start), + new_version, +
)?; + + // NB: we need to check both checkpoints and commits since we filter commits at and below + // the checkpoint version. Example: if we have a checkpoint + commit at version 1, the log + // listing above will only return the checkpoint and not the commit. + if new_listed_files.ascending_commit_files.is_empty() + && new_listed_files.checkpoint_parts.is_empty() + { + match new_version { + Some(new_version) if new_version != old_version => { + // No new commits, but we are looking for a new version + return Err(Error::Generic(format!( + "Requested snapshot version {new_version} is newer than the latest version {old_version}" + ))); + } + _ => { + // No new commits, just return the same snapshot + return Ok(existing_snapshot.clone()); + } + } + } + + // create a log segment just from existing_checkpoint.version -> new_version + // OR could be from 1 -> new_version + let mut new_log_segment = + LogSegment::try_new(new_listed_files, log_root.clone(), new_version)?; + + let new_end_version = new_log_segment.end_version; + if new_end_version < old_version { + // we should never see a new log segment with a version < the existing snapshot + // version, that would mean a commit was incorrectly deleted from the log + return Err(Error::Generic(format!( + "Unexpected state: The newest version in the log {new_end_version} is older than the + old version {old_version}"))); + } + if new_end_version == old_version { + // No new commits, just return the same snapshot + return Ok(existing_snapshot.clone()); + } + + if new_log_segment.checkpoint_version.is_some() { + // we have a checkpoint in the new LogSegment, just construct a new snapshot from that + let snapshot = Snapshot::try_new_from_log_segment( + existing_snapshot.table_root().clone(), + new_log_segment, + engine, + ); + return Ok(Arc::new(snapshot?)); + } + + // after this point, we incrementally update the snapshot with the new log segment. 
+ // first we remove the 'overlap' in commits, example: + // + // old logsegment checkpoint1-commit1-commit2-commit3 + // 1. new logsegment commit1-commit2-commit3 + // 2. new logsegment commit1-commit2-commit3-commit4 + // 3. new logsegment checkpoint2+commit2-commit3-commit4 + // + // retain does + // 1. new logsegment [empty] -> caught above + // 2. new logsegment [commit4] + // 3. new logsegment [checkpoint2-commit3] -> caught above + new_log_segment + .ascending_commit_files + .retain(|log_path| old_version < log_path.version); + + // we have new commits and no new checkpoint: we replay new commits for P+M and then + // create a new snapshot by combining LogSegments and building a new TableConfiguration + let (new_metadata, new_protocol) = new_log_segment.protocol_and_metadata(engine)?; + let table_configuration = TableConfiguration::try_new_from( + existing_snapshot.table_configuration(), + new_metadata, + new_protocol, + new_log_segment.end_version, )?; - Snapshot::try_new_from_log_segment(self.table_root, log_segment, engine) + + // NB: we must add the new log segment to the existing snapshot's log segment + let mut ascending_commit_files = old_log_segment.ascending_commit_files.clone(); + ascending_commit_files.extend(new_log_segment.ascending_commit_files); + let mut ascending_compaction_files = old_log_segment.ascending_compaction_files.clone(); + ascending_compaction_files.extend(new_log_segment.ascending_compaction_files); + + // Note that we _could_ go backwards if someone deletes a CRC: + // old listing: 1, 2, 2.crc, 3, 3.crc (latest is 3.crc) + // new listing: 1, 2, 2.crc, 3 (latest is 2.crc) + // and we would still pick the new listing's (older) CRC file since it ostensibly still + // exists + let latest_crc_file = new_log_segment + .latest_crc_file + .or_else(|| old_log_segment.latest_crc_file.clone()); + + // we can pass in just the old checkpoint parts since by the time we reach this line, we + // know there are no checkpoints in the new log 
segment. + let combined_log_segment = LogSegment::try_new( + ListedLogFiles { + ascending_commit_files, + ascending_compaction_files, + checkpoint_parts: old_log_segment.checkpoint_parts.clone(), + latest_crc_file, + }, + log_root, + new_version, + )?; + Ok(Arc::new(Snapshot::new( + combined_log_segment, + table_configuration, + ))) } } @@ -157,14 +356,43 @@ mod tests { let engine = engine.as_ref(); create_table(&store, &table_root)?; - let snapshot = SnapshotBuilder::new(table_root.clone()).build(engine)?; + let snapshot = SnapshotBuilder::default().with_table_root(table_root.clone()).build(engine)?; assert_eq!(snapshot.version(), 1); - let snapshot = SnapshotBuilder::new(table_root.clone()) + let snapshot = SnapshotBuilder::default().with_table_root(table_root.clone()) .at_version(0) .build(engine)?; assert_eq!(snapshot.version(), 0); Ok(()) } + + #[test] + fn test_snapshot_builder_incremental() -> Result<(), Box> { + let (engine, store, table_root) = setup_test(); + let engine = engine.as_ref(); + create_table(&store, &table_root)?; + + // Create initial snapshot at version 0 + let snapshot_v0 = SnapshotBuilder::default().with_table_root(table_root.clone()) + .at_version(0) + .build(engine)?; + assert_eq!(snapshot_v0.version(), 0); + + // Create incremental snapshot to version 1 + let snapshot_v1 = SnapshotBuilder::default().with_table_root(table_root.clone()) + .from_snapshot(snapshot_v0.clone()) + .at_version(1) + .build(engine)?; + assert_eq!(snapshot_v1.version(), 1); + + // Try to get the same version - should return the same Arc + let snapshot_v1_again = SnapshotBuilder::default().with_table_root(table_root.clone()) + .from_snapshot(snapshot_v1.clone()) + .at_version(1) + .build(engine)?; + assert!(Arc::ptr_eq(&snapshot_v1, &snapshot_v1_again)); + + Ok(()) + } } diff --git a/kernel/src/table_changes/mod.rs b/kernel/src/table_changes/mod.rs index ed7e1d255..57cc8fd1c 100644 --- a/kernel/src/table_changes/mod.rs +++ b/kernel/src/table_changes/mod.rs @@ 
-150,12 +150,19 @@ impl TableChanges { // Both snapshots ensure that reading is supported at the start and end version using // `ensure_read_supported`. Note that we must still verify that reading is // supported for every protocol action in the CDF range. - let start_snapshot = Arc::new( - Snapshot::builder(table_root.as_url().clone()) - .at_version(start_version) - .build(engine)?, - ); - let end_snapshot = Snapshot::try_new_from(start_snapshot.clone(), engine, end_version)?; + let start_snapshot = Snapshot::builder() + .with_table_root(table_root.as_url().clone()) + .at_version(start_version) + .build(engine)?; + let mut builder = Snapshot::builder() + .with_table_root(start_snapshot.table_root().clone()) + .from_snapshot(start_snapshot.clone()); + + if let Some(v) = end_version { + builder = builder.at_version(v); + } + + let end_snapshot = builder.build(engine)?; // Verify CDF is enabled at the beginning and end of the interval using // [`check_cdf_table_properties`] to fail early. 
This also ensures that column mapping is From 56a4c21a5d3d1c99eda5d187348e0296104c36fb Mon Sep 17 00:00:00 2001 From: Zach Schuermann Date: Tue, 9 Sep 2025 18:20:27 -0700 Subject: [PATCH 3/4] clean --- acceptance/src/data.rs | 4 ++- acceptance/src/meta.rs | 7 ++-- ffi/src/transaction/mod.rs | 4 ++- kernel/benches/metadata_bench.rs | 6 ++-- kernel/examples/write-table/src/main.rs | 13 ++++++-- kernel/src/actions/set_transaction.rs | 15 +++++++-- kernel/src/checkpoint/tests.rs | 22 ++++++++++--- kernel/src/log_segment/tests.rs | 5 ++- kernel/src/scan/mod.rs | 41 +++++++++++++++++------ kernel/src/snapshot/builder.rs | 20 ++++++++---- kernel/tests/dv.rs | 8 +++-- kernel/tests/golden_tables.rs | 10 ++++-- kernel/tests/read.rs | 28 ++++++++++++---- kernel/tests/v2_checkpoints.rs | 5 ++- kernel/tests/write.rs | 43 ++++++++++++++++++------- test-utils/src/lib.rs | 4 ++- 16 files changed, 177 insertions(+), 58 deletions(-) diff --git a/acceptance/src/data.rs b/acceptance/src/data.rs index a947aa666..c6f63b770 100644 --- a/acceptance/src/data.rs +++ b/acceptance/src/data.rs @@ -115,7 +115,9 @@ pub async fn assert_scan_metadata( test_case: &TestCaseInfo, ) -> TestResult<()> { let table_root = test_case.table_root()?; - let snapshot = Snapshot::builder().with_table_root(table_root).build(engine.as_ref())?; + let snapshot = Snapshot::builder() + .with_table_root(table_root) + .build(engine.as_ref())?; let scan = snapshot.into_scan_builder().build()?; let mut schema = None; let batches: Vec = scan diff --git a/acceptance/src/meta.rs b/acceptance/src/meta.rs index 72f33db8a..5f5b1ce30 100644 --- a/acceptance/src/meta.rs +++ b/acceptance/src/meta.rs @@ -103,11 +103,14 @@ impl TestCaseInfo { let engine = engine.as_ref(); let (latest, versions) = self.versions().await?; - let snapshot = Snapshot::builder().with_table_root(self.table_root()?).build(engine)?; + let snapshot = Snapshot::builder() + .with_table_root(self.table_root()?) 
+ .build(engine)?; self.assert_snapshot_meta(&latest, &snapshot)?; for table_version in versions { - let snapshot = Snapshot::builder().with_table_root(self.table_root()?) + let snapshot = Snapshot::builder() + .with_table_root(self.table_root()?) .at_version(table_version.version) .build(engine)?; self.assert_snapshot_meta(&table_version, &snapshot)?; diff --git a/ffi/src/transaction/mod.rs b/ffi/src/transaction/mod.rs index 5faa16020..8adddbc0b 100644 --- a/ffi/src/transaction/mod.rs +++ b/ffi/src/transaction/mod.rs @@ -37,7 +37,9 @@ fn transaction_impl( url: DeltaResult, extern_engine: &dyn ExternEngine, ) -> DeltaResult> { - let snapshot = Snapshot::builder().with_table_root(url?).build(extern_engine.engine().as_ref())?; + let snapshot = Snapshot::builder() + .with_table_root(url?) + .build(extern_engine.engine().as_ref())?; let transaction = snapshot.transaction(); Ok(Box::new(transaction?).into()) } diff --git a/kernel/benches/metadata_bench.rs b/kernel/benches/metadata_bench.rs index 278776894..8b2abd8e0 100644 --- a/kernel/benches/metadata_bench.rs +++ b/kernel/benches/metadata_bench.rs @@ -53,7 +53,8 @@ fn create_snapshot_benchmark(c: &mut Criterion) { c.bench_function("create_snapshot", |b| { b.iter(|| { - Snapshot::builder().with_table_root(url.clone()) + Snapshot::builder() + .with_table_root(url.clone()) .build(engine.as_ref()) .expect("Failed to create snapshot") }) @@ -64,7 +65,8 @@ fn scan_metadata_benchmark(c: &mut Criterion) { let (_tempdir, url, engine) = setup(); let snapshot = Arc::new( - Snapshot::builder().with_table_root(url.clone()) + Snapshot::builder() + .with_table_root(url.clone()) .build(engine.as_ref()) .expect("Failed to create snapshot"), ); diff --git a/kernel/examples/write-table/src/main.rs b/kernel/examples/write-table/src/main.rs index fb73c7459..b2305c96b 100644 --- a/kernel/examples/write-table/src/main.rs +++ b/kernel/examples/write-table/src/main.rs @@ -125,7 +125,10 @@ async fn create_or_get_base_snapshot( schema_str: 
&str, ) -> DeltaResult> { // Check if table already exists - match Snapshot::builder().with_table_root(url.clone()).build(engine) { + match Snapshot::builder() + .with_table_root(url.clone()) + .build(engine) + { Ok(snapshot) => { println!("✓ Found existing table at version {}", snapshot.version()); Ok(snapshot) @@ -135,7 +138,9 @@ async fn create_or_get_base_snapshot( println!("Creating new Delta table..."); let schema = parse_schema(schema_str)?; create_table(url, &schema).await?; - Snapshot::builder().with_table_root(url.clone()).build(engine) + Snapshot::builder() + .with_table_root(url.clone()) + .build(engine) } } } @@ -294,7 +299,9 @@ async fn read_and_display_data( table_url: &Url, engine: DefaultEngine, ) -> DeltaResult<()> { - let snapshot = Snapshot::builder().with_table_root(table_url.clone()).build(&engine)?; + let snapshot = Snapshot::builder() + .with_table_root(table_url.clone()) + .build(&engine)?; let scan = snapshot.into_scan_builder().build()?; let batches: Vec = scan diff --git a/kernel/src/actions/set_transaction.rs b/kernel/src/actions/set_transaction.rs index 9d18ba9bf..f8f12e397 100644 --- a/kernel/src/actions/set_transaction.rs +++ b/kernel/src/actions/set_transaction.rs @@ -113,7 +113,10 @@ mod tests { let url = url::Url::from_directory_path(path).unwrap(); let engine = SyncEngine::new(); - let snapshot = Snapshot::builder().with_table_root(url).build(&engine).unwrap(); + let snapshot = Snapshot::builder() + .with_table_root(url) + .build(&engine) + .unwrap(); let log_segment = snapshot.log_segment(); ( @@ -163,7 +166,10 @@ mod tests { let url = url::Url::from_directory_path(path.unwrap()).unwrap(); let engine = SyncEngine::new(); - let snapshot = Snapshot::builder().with_table_root(url).build(&engine).unwrap(); + let snapshot = Snapshot::builder() + .with_table_root(url) + .build(&engine) + .unwrap(); let log_segment = snapshot.log_segment(); // The checkpoint has five parts, each containing one action. There are two app ids. 
@@ -180,7 +186,10 @@ mod tests { let url = url::Url::from_directory_path(path.unwrap()).unwrap(); let engine = SyncEngine::new(); - let snapshot = Snapshot::builder().with_table_root(url).build(&engine).unwrap(); + let snapshot = Snapshot::builder() + .with_table_root(url) + .build(&engine) + .unwrap(); let log_segment = snapshot.log_segment(); // Test with no retention (should get all transactions) diff --git a/kernel/src/checkpoint/tests.rs b/kernel/src/checkpoint/tests.rs index 99fa94e20..ec781a21b 100644 --- a/kernel/src/checkpoint/tests.rs +++ b/kernel/src/checkpoint/tests.rs @@ -72,7 +72,9 @@ fn test_create_checkpoint_metadata_batch() -> DeltaResult<()> { )?; let table_root = Url::parse("memory:///")?; - let snapshot = Snapshot::builder().with_table_root(table_root).build(&engine)?; + let snapshot = Snapshot::builder() + .with_table_root(table_root) + .build(&engine)?; let writer = snapshot.clone().checkpoint()?; let checkpoint_batch = writer.create_checkpoint_metadata_batch(&engine)?; @@ -295,7 +297,9 @@ fn test_v1_checkpoint_latest_version_by_default() -> DeltaResult<()> { )?; let table_root = Url::parse("memory:///")?; - let snapshot = Snapshot::builder().with_table_root(table_root).build(&engine)?; + let snapshot = Snapshot::builder() + .with_table_root(table_root) + .build(&engine)?; let writer = snapshot.clone().checkpoint()?; // Verify the checkpoint file path is the latest version by default. @@ -363,7 +367,10 @@ fn test_v1_checkpoint_specific_version() -> DeltaResult<()> { let table_root = Url::parse("memory:///")?; // Specify version 0 for checkpoint - let snapshot = Snapshot::builder().with_table_root(table_root).at_version(0).build(&engine)?; + let snapshot = Snapshot::builder() + .with_table_root(table_root) + .at_version(0) + .build(&engine)?; let writer = snapshot.clone().checkpoint()?; // Verify the checkpoint file path is the specified version. 
@@ -411,7 +418,10 @@ fn test_finalize_errors_if_checkpoint_data_iterator_is_not_exhausted() -> DeltaR )?; let table_root = Url::parse("memory:///")?; - let snapshot = Snapshot::builder().with_table_root(table_root).at_version(0).build(&engine)?; + let snapshot = Snapshot::builder() + .with_table_root(table_root) + .at_version(0) + .build(&engine)?; let writer = snapshot.clone().checkpoint()?; let data_iter = writer.checkpoint_data(&engine)?; @@ -465,7 +475,9 @@ fn test_v2_checkpoint_supported_table() -> DeltaResult<()> { )?; let table_root = Url::parse("memory:///")?; - let snapshot = Snapshot::builder().with_table_root(table_root).build(&engine)?; + let snapshot = Snapshot::builder() + .with_table_root(table_root) + .build(&engine)?; let writer = snapshot.clone().checkpoint()?; // Verify the checkpoint file path is the latest version by default. diff --git a/kernel/src/log_segment/tests.rs b/kernel/src/log_segment/tests.rs index 639b31c43..b4c350b1b 100644 --- a/kernel/src/log_segment/tests.rs +++ b/kernel/src/log_segment/tests.rs @@ -50,7 +50,10 @@ fn test_replay_for_metadata() { let url = url::Url::from_directory_path(path.unwrap()).unwrap(); let engine = SyncEngine::new(); - let snapshot = Snapshot::builder().with_table_root(url).build(&engine).unwrap(); + let snapshot = Snapshot::builder() + .with_table_root(url) + .build(&engine) + .unwrap(); let data: Vec<_> = snapshot .log_segment() .replay_for_metadata(&engine) diff --git a/kernel/src/scan/mod.rs b/kernel/src/scan/mod.rs index 150752e9e..a0dac5bca 100644 --- a/kernel/src/scan/mod.rs +++ b/kernel/src/scan/mod.rs @@ -1256,7 +1256,10 @@ mod tests { let url = url::Url::from_directory_path(path).unwrap(); let engine = SyncEngine::new(); - let snapshot = Snapshot::builder().with_table_root(url).build(&engine).unwrap(); + let snapshot = Snapshot::builder() + .with_table_root(url) + .build(&engine) + .unwrap(); let scan = snapshot.into_scan_builder().build().unwrap(); let files = get_files_for_scan(scan, 
&engine).unwrap(); assert_eq!(files.len(), 1); @@ -1273,7 +1276,10 @@ mod tests { let url = url::Url::from_directory_path(path).unwrap(); let engine = Arc::new(SyncEngine::new()); - let snapshot = Snapshot::builder().with_table_root(url).build(engine.as_ref()).unwrap(); + let snapshot = Snapshot::builder() + .with_table_root(url) + .build(engine.as_ref()) + .unwrap(); let scan = snapshot.into_scan_builder().build().unwrap(); let files: Vec = scan.execute(engine).unwrap().try_collect().unwrap(); @@ -1289,7 +1295,10 @@ mod tests { let url = url::Url::from_directory_path(path).unwrap(); let engine = Arc::new(SyncEngine::new()); - let snapshot = Snapshot::builder().with_table_root(url).build(engine.as_ref()).unwrap(); + let snapshot = Snapshot::builder() + .with_table_root(url) + .build(engine.as_ref()) + .unwrap(); let version = snapshot.version(); let scan = snapshot.into_scan_builder().build().unwrap(); let files: Vec<_> = scan @@ -1323,7 +1332,8 @@ mod tests { let url = url::Url::from_directory_path(path).unwrap(); let engine = Arc::new(SyncEngine::new()); - let snapshot = Snapshot::builder().with_table_root(url.clone()) + let snapshot = Snapshot::builder() + .with_table_root(url.clone()) .at_version(0) .build(engine.as_ref()) .unwrap(); @@ -1347,7 +1357,8 @@ mod tests { .into_iter() .map(|b| Box::new(ArrowEngineData::from(b)) as Box) .collect(); - let snapshot = Snapshot::builder().with_table_root(url) + let snapshot = Snapshot::builder() + .with_table_root(url) .at_version(1) .build(engine.as_ref()) .unwrap(); @@ -1419,7 +1430,10 @@ mod tests { let url = url::Url::from_directory_path(path.unwrap()).unwrap(); let engine = SyncEngine::new(); - let snapshot = Snapshot::builder().with_table_root(url).build(&engine).unwrap(); + let snapshot = Snapshot::builder() + .with_table_root(url) + .build(&engine) + .unwrap(); let scan = snapshot.into_scan_builder().build().unwrap(); let data: Vec<_> = scan .replay_for_scan_metadata(&engine) @@ -1439,7 +1453,10 @@ mod tests { 
let url = url::Url::from_directory_path(path.unwrap()).unwrap(); let engine = Arc::new(SyncEngine::new()); - let snapshot = Snapshot::builder().with_table_root(url).build(engine.as_ref()).unwrap(); + let snapshot = Snapshot::builder() + .with_table_root(url) + .build(engine.as_ref()) + .unwrap(); // No predicate pushdown attempted, so the one data file should be returned. // @@ -1482,7 +1499,10 @@ mod tests { let url = url::Url::from_directory_path(path.unwrap()).unwrap(); let engine = Arc::new(SyncEngine::new()); - let snapshot = Snapshot::builder().with_table_root(url).build(engine.as_ref()).unwrap(); + let snapshot = Snapshot::builder() + .with_table_root(url) + .build(engine.as_ref()) + .unwrap(); // Predicate over a logically valid but physically missing column. No data files should be // returned because the column is inferred to be all-null. @@ -1517,7 +1537,10 @@ mod tests { let url = url::Url::from_directory_path(path).unwrap(); let engine = SyncEngine::new(); - let snapshot = Snapshot::builder().with_table_root(url).build(&engine).unwrap(); + let snapshot = Snapshot::builder() + .with_table_root(url) + .build(&engine) + .unwrap(); let scan = snapshot.into_scan_builder().build()?; let files = get_files_for_scan(scan, &engine)?; // test case: diff --git a/kernel/src/snapshot/builder.rs b/kernel/src/snapshot/builder.rs index 4f73df85c..6d7203b75 100644 --- a/kernel/src/snapshot/builder.rs +++ b/kernel/src/snapshot/builder.rs @@ -44,7 +44,7 @@ pub struct SnapshotBuilder { impl SnapshotBuilder { /// Set the table root URL. This is required when there is no existing snapshot hint (see - /// [from_snapshot]). + /// [Self::from_snapshot]). 
pub fn with_table_root(mut self, table_root: Url) -> Self { self.table_root = Some(table_root); self @@ -121,7 +121,7 @@ impl SnapshotBuilder { let old_log_segment = &existing_snapshot.log_segment; let old_version = existing_snapshot.version(); - let new_version = self.version.into(); + let new_version = self.version; if let Some(new_version) = new_version { if new_version == old_version { // Re-requesting the same version @@ -356,10 +356,13 @@ mod tests { let engine = engine.as_ref(); create_table(&store, &table_root)?; - let snapshot = SnapshotBuilder::default().with_table_root(table_root.clone()).build(engine)?; + let snapshot = SnapshotBuilder::default() + .with_table_root(table_root.clone()) + .build(engine)?; assert_eq!(snapshot.version(), 1); - let snapshot = SnapshotBuilder::default().with_table_root(table_root.clone()) + let snapshot = SnapshotBuilder::default() + .with_table_root(table_root.clone()) .at_version(0) .build(engine)?; assert_eq!(snapshot.version(), 0); @@ -374,20 +377,23 @@ mod tests { create_table(&store, &table_root)?; // Create initial snapshot at version 0 - let snapshot_v0 = SnapshotBuilder::default().with_table_root(table_root.clone()) + let snapshot_v0 = SnapshotBuilder::default() + .with_table_root(table_root.clone()) .at_version(0) .build(engine)?; assert_eq!(snapshot_v0.version(), 0); // Create incremental snapshot to version 1 - let snapshot_v1 = SnapshotBuilder::default().with_table_root(table_root.clone()) + let snapshot_v1 = SnapshotBuilder::default() + .with_table_root(table_root.clone()) .from_snapshot(snapshot_v0.clone()) .at_version(1) .build(engine)?; assert_eq!(snapshot_v1.version(), 1); // Try to get the same version - should return the same Arc - let snapshot_v1_again = SnapshotBuilder::default().with_table_root(table_root.clone()) + let snapshot_v1_again = SnapshotBuilder::default() + .with_table_root(table_root.clone()) .from_snapshot(snapshot_v1.clone()) .at_version(1) .build(engine)?; diff --git a/kernel/tests/dv.rs 
b/kernel/tests/dv.rs index 7c70717a1..ec7ea88dd 100644 --- a/kernel/tests/dv.rs +++ b/kernel/tests/dv.rs @@ -32,7 +32,9 @@ fn dv_table() -> Result<(), Box> { let url = url::Url::from_directory_path(path).unwrap(); let engine = DefaultEngine::new_local(); - let snapshot = Snapshot::builder().with_table_root(url).build(engine.as_ref())?; + let snapshot = Snapshot::builder() + .with_table_root(url) + .build(engine.as_ref())?; let scan = snapshot.into_scan_builder().build()?; let stream = scan.execute(engine)?; @@ -47,7 +49,9 @@ fn non_dv_table() -> Result<(), Box> { let url = url::Url::from_directory_path(path).unwrap(); let engine = DefaultEngine::new_local(); - let snapshot = Snapshot::builder().with_table_root(url).build(engine.as_ref())?; + let snapshot = Snapshot::builder() + .with_table_root(url) + .build(engine.as_ref())?; let scan = snapshot.into_scan_builder().build()?; let stream = scan.execute(engine)?; diff --git a/kernel/tests/golden_tables.rs b/kernel/tests/golden_tables.rs index 99f83f0a2..f17f64d07 100644 --- a/kernel/tests/golden_tables.rs +++ b/kernel/tests/golden_tables.rs @@ -271,7 +271,10 @@ async fn canonicalized_paths_test( _expected: Option, ) -> Result<(), Box> { // assert latest version is 1 and there are no files in the snapshot (add is removed) - let snapshot = Snapshot::builder().with_table_root(table_root).build(&engine).unwrap(); + let snapshot = Snapshot::builder() + .with_table_root(table_root) + .build(&engine) + .unwrap(); assert_eq!(snapshot.version(), 1); let scan = snapshot .into_scan_builder() @@ -287,7 +290,10 @@ async fn checkpoint_test( table_root: Url, _expected: Option, ) -> Result<(), Box> { - let snapshot = Snapshot::builder().with_table_root(table_root).build(&engine).unwrap(); + let snapshot = Snapshot::builder() + .with_table_root(table_root) + .build(&engine) + .unwrap(); let version = snapshot.version(); let scan = snapshot .into_scan_builder() diff --git a/kernel/tests/read.rs b/kernel/tests/read.rs index 
961bf0ce3..f15aaf6d8 100644 --- a/kernel/tests/read.rs +++ b/kernel/tests/read.rs @@ -66,7 +66,9 @@ async fn single_commit_two_add_files() -> Result<(), Box> let expected_data = vec![batch.clone(), batch]; - let snapshot = Snapshot::builder().with_table_root(location).build(engine.as_ref())?; + let snapshot = Snapshot::builder() + .with_table_root(location) + .build(engine.as_ref())?; let scan = snapshot.into_scan_builder().build()?; let mut files = 0; @@ -118,7 +120,9 @@ async fn two_commits() -> Result<(), Box> { let expected_data = vec![batch.clone(), batch]; - let snapshot = Snapshot::builder().with_table_root(location).build(&engine)?; + let snapshot = Snapshot::builder() + .with_table_root(location) + .build(&engine)?; let scan = snapshot.into_scan_builder().build()?; let mut files = 0; @@ -171,7 +175,9 @@ async fn remove_action() -> Result<(), Box> { let expected_data = vec![batch]; - let snapshot = Snapshot::builder().with_table_root(location).build(&engine)?; + let snapshot = Snapshot::builder() + .with_table_root(location) + .build(&engine)?; let scan = snapshot.into_scan_builder().build()?; let stream = scan.execute(Arc::new(engine))?.zip(expected_data); @@ -242,7 +248,9 @@ async fn stats() -> Result<(), Box> { storage.clone(), Arc::new(TokioBackgroundExecutor::new()), )); - let snapshot = Snapshot::builder().with_table_root(location).build(engine.as_ref())?; + let snapshot = Snapshot::builder() + .with_table_root(location) + .build(engine.as_ref())?; // The first file has id between 1 and 3; the second has id between 5 and 7. For each operator, // we validate the boundary values where we expect the set of matched files to change. 
@@ -433,7 +441,9 @@ fn read_table_data( Arc::new(TokioBackgroundExecutor::new()), )?); - let snapshot = Snapshot::builder().with_table_root(url.clone()).build(engine.as_ref())?; + let snapshot = Snapshot::builder() + .with_table_root(url.clone()) + .build(engine.as_ref())?; let read_schema = select_cols.map(|select_cols| { let table_schema = snapshot.schema(); @@ -1057,7 +1067,9 @@ async fn predicate_on_non_nullable_partition_column() -> Result<(), Box Result<(), Box) -> DeltaResult Result<(), Box> { setup_test_tables(schema, &[], None, "test_table").await? { // create a transaction - let snapshot = Snapshot::builder().with_table_root(table_url.clone()).build(&engine)?; + let snapshot = Snapshot::builder() + .with_table_root(table_url.clone()) + .build(&engine)?; let txn = snapshot.transaction()?.with_engine_info("default engine"); // commit! @@ -143,7 +145,9 @@ async fn write_data_and_check_result_and_stats( engine: Arc>, expected_since_commit: u64, ) -> Result<(), Box> { - let snapshot = Snapshot::builder().with_table_root(table_url.clone()).build(engine.as_ref())?; + let snapshot = Snapshot::builder() + .with_table_root(table_url.clone()) + .build(engine.as_ref())?; let mut txn = snapshot.transaction()?; // create two new arrow record batches to append @@ -213,7 +217,9 @@ async fn test_commit_info_action() -> Result<(), Box> { for (table_url, engine, store, table_name) in setup_test_tables(schema.clone(), &[], None, "test_table").await? { - let snapshot = Snapshot::builder().with_table_root(table_url.clone()).build(&engine)?; + let snapshot = Snapshot::builder() + .with_table_root(table_url.clone()) + .build(&engine)?; let txn = snapshot.transaction()?.with_engine_info("default engine"); txn.commit(&engine)?; @@ -397,7 +403,9 @@ async fn test_append_partitioned() -> Result<(), Box> { for (table_url, engine, store, table_name) in setup_test_tables(table_schema.clone(), &[partition_col], None, "test_table").await? 
{ - let snapshot = Snapshot::builder().with_table_root(table_url.clone()).build(&engine)?; + let snapshot = Snapshot::builder() + .with_table_root(table_url.clone()) + .build(&engine)?; let mut txn = snapshot.transaction()?.with_engine_info("default engine"); // create two new arrow record batches to append @@ -540,7 +548,9 @@ async fn test_append_invalid_schema() -> Result<(), Box> for (table_url, engine, _store, _table_name) in setup_test_tables(table_schema, &[], None, "test_table").await? { - let snapshot = Snapshot::builder().with_table_root(table_url.clone()).build(&engine)?; + let snapshot = Snapshot::builder() + .with_table_root(table_url.clone()) + .build(&engine)?; let txn = snapshot.transaction()?.with_engine_info("default engine"); // create two new arrow record batches to append @@ -598,7 +608,9 @@ async fn test_write_txn_actions() -> Result<(), Box> { setup_test_tables(schema, &[], None, "test_table").await? { // can't have duplicate app_id in same transaction - let snapshot = Snapshot::builder().with_table_root(table_url.clone()).build(&engine)?; + let snapshot = Snapshot::builder() + .with_table_root(table_url.clone()) + .build(&engine)?; assert!(matches!( snapshot .transaction()? @@ -608,7 +620,9 @@ async fn test_write_txn_actions() -> Result<(), Box> { Err(KernelError::Generic(msg)) if msg == "app_id app_id1 already exists in transaction" )); - let snapshot = Snapshot::builder().with_table_root(table_url.clone()).build(&engine)?; + let snapshot = Snapshot::builder() + .with_table_root(table_url.clone()) + .build(&engine)?; let txn = snapshot .transaction()? .with_engine_info("default engine") @@ -618,7 +632,8 @@ async fn test_write_txn_actions() -> Result<(), Box> { // commit! 
txn.commit(&engine)?; - let snapshot = Snapshot::builder().with_table_root(table_url.clone()) + let snapshot = Snapshot::builder() + .with_table_root(table_url.clone()) .at_version(1) .build(&engine)?; assert_eq!( @@ -740,7 +755,9 @@ async fn test_append_timestamp_ntz() -> Result<(), Box> { ) .await?; - let snapshot = Snapshot::builder().with_table_root(table_url.clone()).build(&engine)?; + let snapshot = Snapshot::builder() + .with_table_root(table_url.clone()) + .build(&engine)?; let mut txn = snapshot.transaction()?.with_engine_info("default engine"); // Create Arrow data with TIMESTAMP_NTZ values including edge cases @@ -869,7 +886,9 @@ async fn test_append_variant() -> Result<(), Box> { ) .await?; - let snapshot = Snapshot::builder().with_table_root(table_url.clone()).build(&engine)?; + let snapshot = Snapshot::builder() + .with_table_root(table_url.clone()) + .build(&engine)?; let mut txn = snapshot.transaction()?; // First value corresponds to the variant value "1". Third value corresponds to the variant @@ -1079,7 +1098,9 @@ async fn test_shredded_variant_read_rejection() -> Result<(), Box, ) -> Result<(), Box> { - let snapshot = Snapshot::builder().with_table_root(url.clone()).build(engine.as_ref())?; + let snapshot = Snapshot::builder() + .with_table_root(url.clone()) + .build(engine.as_ref())?; let scan = snapshot.into_scan_builder().build()?; let batches = read_scan(&scan, engine)?; let formatted = pretty_format_batches(&batches).unwrap().to_string(); From 018e45e8435924b5b1b7fd168b3dbbe8ecc44726 Mon Sep 17 00:00:00 2001 From: Zach Schuermann Date: Wed, 10 Sep 2025 10:49:38 -0700 Subject: [PATCH 4/4] fix --- kernel/benches/metadata_bench.rs | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/kernel/benches/metadata_bench.rs b/kernel/benches/metadata_bench.rs index 8b2abd8e0..2f0b9483c 100644 --- a/kernel/benches/metadata_bench.rs +++ b/kernel/benches/metadata_bench.rs @@ -64,12 +64,10 @@ fn create_snapshot_benchmark(c: &mut 
Criterion) { fn scan_metadata_benchmark(c: &mut Criterion) { let (_tempdir, url, engine) = setup(); - let snapshot = Arc::new( - Snapshot::builder() - .with_table_root(url.clone()) - .build(engine.as_ref()) - .expect("Failed to create snapshot"), - ); + let snapshot = Snapshot::builder() + .with_table_root(url.clone()) + .build(engine.as_ref()) + .expect("Failed to create snapshot"); let mut group = c.benchmark_group("scan_metadata"); group.sample_size(SCAN_METADATA_BENCH_SAMPLE_SIZE);