diff --git a/acceptance/src/data.rs b/acceptance/src/data.rs index 00a9b6a1b..95e489228 100644 --- a/acceptance/src/data.rs +++ b/acceptance/src/data.rs @@ -115,8 +115,8 @@ pub async fn assert_scan_metadata( test_case: &TestCaseInfo, ) -> TestResult<()> { let table_root = test_case.table_root()?; - let snapshot = Snapshot::builder(table_root).build(engine.as_ref())?; - let scan = snapshot.into_scan_builder().build()?; + let snapshot = Snapshot::builder_for(table_root).build(engine.as_ref())?; + let scan = snapshot.scan_builder().build()?; let mut schema = None; let batches: Vec = scan .execute(engine)? diff --git a/acceptance/src/meta.rs b/acceptance/src/meta.rs index 7884c8f95..93144d76e 100644 --- a/acceptance/src/meta.rs +++ b/acceptance/src/meta.rs @@ -103,11 +103,11 @@ impl TestCaseInfo { let engine = engine.as_ref(); let (latest, versions) = self.versions().await?; - let snapshot = Snapshot::builder(self.table_root()?).build(engine)?; + let snapshot = Snapshot::builder_for(self.table_root()?).build(engine)?; self.assert_snapshot_meta(&latest, &snapshot)?; for table_version in versions { - let snapshot = Snapshot::builder(self.table_root()?) + let snapshot = Snapshot::builder_for(self.table_root()?) .at_version(table_version.version) .build(engine)?; self.assert_snapshot_meta(&table_version, &snapshot)?; diff --git a/ffi/src/lib.rs b/ffi/src/lib.rs index 868c12f18..a70b531f1 100644 --- a/ffi/src/lib.rs +++ b/ffi/src/lib.rs @@ -601,7 +601,7 @@ fn snapshot_impl( extern_engine: &dyn ExternEngine, version: Option, ) -> DeltaResult> { - let builder = Snapshot::builder(url?); + let builder = Snapshot::builder_for(url?); let builder = if let Some(v) = version { // TODO: should we include a `with_version_opt` method for the builder? builder.at_version(v) @@ -609,7 +609,7 @@ fn snapshot_impl( builder }; let snapshot = builder.build(extern_engine.engine().as_ref())?; - Ok(Arc::new(snapshot).into()) + Ok(snapshot.into()) } /// # Safety diff --git a/ffi/src/transaction/mod.rs b/ffi/src/transaction/mod.rs index fe8911111..2d445328f 100644 --- a/ffi/src/transaction/mod.rs +++ b/ffi/src/transaction/mod.rs @@ -9,7 +9,6 @@ use crate::{DeltaResult, ExternEngine, Snapshot, Url}; use crate::{ExclusiveEngineData, SharedExternEngine}; use delta_kernel::transaction::{CommitResult, Transaction}; use delta_kernel_ffi_macros::handle_descriptor; -use std::sync::Arc; /// A handle representing an exclusive transaction on a Delta table. (Similar to a Box<_>) /// @@ -38,7 +37,7 @@ fn transaction_impl( url: DeltaResult, extern_engine: &dyn ExternEngine, ) -> DeltaResult> { - let snapshot = Arc::new(Snapshot::builder(url?).build(extern_engine.engine().as_ref())?); + let snapshot = Snapshot::builder_for(url?).build(extern_engine.engine().as_ref())?; let transaction = snapshot.transaction(); Ok(Box::new(transaction?).into()) } diff --git a/kernel/benches/metadata_bench.rs b/kernel/benches/metadata_bench.rs index d4b067578..86168baf1 100644 --- a/kernel/benches/metadata_bench.rs +++ b/kernel/benches/metadata_bench.rs @@ -53,7 +53,7 @@ fn create_snapshot_benchmark(c: &mut Criterion) { c.bench_function("create_snapshot", |b| { b.iter(|| { - Snapshot::builder(url.clone()) + Snapshot::builder_for(url.clone()) .build(engine.as_ref()) .expect("Failed to create snapshot") }) @@ -63,11 +63,9 @@ fn create_snapshot_benchmark(c: &mut Criterion) { fn scan_metadata_benchmark(c: &mut Criterion) { let (_tempdir, url, engine) = setup(); - let snapshot = Arc::new( - Snapshot::builder(url.clone()) - .build(engine.as_ref()) - .expect("Failed to create snapshot"), - ); + let snapshot = Snapshot::builder_for(url.clone()) + .build(engine.as_ref()) + .expect("Failed to create snapshot"); let mut group = c.benchmark_group("scan_metadata"); group.sample_size(SCAN_METADATA_BENCH_SAMPLE_SIZE); diff --git a/kernel/examples/common/src/lib.rs b/kernel/examples/common/src/lib.rs index 437bb849c..998a155d0 100644 --- a/kernel/examples/common/src/lib.rs +++ b/kernel/examples/common/src/lib.rs @@ -62,7 +62,7 @@ pub fn get_engine( /// Construct a scan at the latest snapshot. This is over the specified table and using the passed /// engine. Parameters of the scan are controlled by the specified `ScanArgs` -pub fn get_scan(snapshot: Snapshot, args: &ScanArgs) -> DeltaResult> { +pub fn get_scan(snapshot: Arc, args: &ScanArgs) -> DeltaResult> { if args.schema_only { println!("{:#?}", snapshot.schema()); return Ok(None); @@ -86,7 +86,7 @@ pub fn get_scan(snapshot: Snapshot, args: &ScanArgs) -> DeltaResult .transpose()?; Ok(Some( snapshot - .into_scan_builder() + .scan_builder() .with_schema_opt(read_schema_opt) .build()?, )) diff --git a/kernel/examples/inspect-table/src/main.rs b/kernel/examples/inspect-table/src/main.rs index a7b225dd0..67a651ae1 100644 --- a/kernel/examples/inspect-table/src/main.rs +++ b/kernel/examples/inspect-table/src/main.rs @@ -182,7 +182,7 @@ fn try_main() -> DeltaResult<()> { let url = delta_kernel::try_parse_uri(&cli.location_args.path)?; let engine = common::get_engine(&url, &cli.location_args)?; - let snapshot = Snapshot::builder(url).build(&engine)?; + let snapshot = Snapshot::builder_for(url).build(&engine)?; match cli.command { Commands::TableVersion => { diff --git a/kernel/examples/read-table-multi-threaded/src/main.rs b/kernel/examples/read-table-multi-threaded/src/main.rs index b5f0e6ae9..e5afea11c 100644 --- a/kernel/examples/read-table-multi-threaded/src/main.rs +++ b/kernel/examples/read-table-multi-threaded/src/main.rs @@ -99,7 +99,7 @@ fn try_main() -> DeltaResult<()> { let url = delta_kernel::try_parse_uri(&cli.location_args.path)?; println!("Reading {url}"); let engine = common::get_engine(&url, &cli.location_args)?; - let snapshot = Snapshot::builder(url).build(&engine)?; + let snapshot = Snapshot::builder_for(url).build(&engine)?; let Some(scan) = common::get_scan(snapshot, &cli.scan_args)? else { return Ok(()); }; diff --git a/kernel/examples/read-table-single-threaded/src/main.rs b/kernel/examples/read-table-single-threaded/src/main.rs index dca65d9a5..b9ba43eeb 100644 --- a/kernel/examples/read-table-single-threaded/src/main.rs +++ b/kernel/examples/read-table-single-threaded/src/main.rs @@ -43,7 +43,7 @@ fn try_main() -> DeltaResult<()> { let url = delta_kernel::try_parse_uri(&cli.location_args.path)?; println!("Reading {url}"); let engine = common::get_engine(&url, &cli.location_args)?; - let snapshot = Snapshot::builder(url).build(&engine)?; + let snapshot = Snapshot::builder_for(url).build(&engine)?; let Some(scan) = common::get_scan(snapshot, &cli.scan_args)? else { return Ok(()); }; diff --git a/kernel/examples/write-table/src/main.rs b/kernel/examples/write-table/src/main.rs index d0a4a924d..825f652cc 100644 --- a/kernel/examples/write-table/src/main.rs +++ b/kernel/examples/write-table/src/main.rs @@ -79,7 +79,7 @@ async fn try_main() -> DeltaResult<()> { )?; // Create or get the table - let snapshot = Arc::new(create_or_get_base_snapshot(&url, &engine, &cli.schema).await?); + let snapshot = create_or_get_base_snapshot(&url, &engine, &cli.schema).await?; // Create sample data based on the schema let sample_data = create_sample_data(&snapshot.schema(), cli.num_rows)?; @@ -123,9 +123,9 @@ async fn create_or_get_base_snapshot( url: &Url, engine: &dyn Engine, schema_str: &str, -) -> DeltaResult { +) -> DeltaResult> { // Check if table already exists - match Snapshot::builder(url.clone()).build(engine) { + match Snapshot::builder_for(url.clone()).build(engine) { Ok(snapshot) => { println!("✓ Found existing table at version {}", snapshot.version()); Ok(snapshot) @@ -135,7 +135,7 @@ async fn create_or_get_base_snapshot( println!("Creating new Delta table..."); let schema = parse_schema(schema_str)?; create_table(url, &schema).await?; - Snapshot::builder(url.clone()).build(engine) + Snapshot::builder_for(url.clone()).build(engine) } } } @@ -294,8 +294,8 @@ async fn read_and_display_data( table_url: &Url, engine: DefaultEngine, ) -> DeltaResult<()> { - let snapshot = Snapshot::builder(table_url.clone()).build(&engine)?; - let scan = snapshot.into_scan_builder().build()?; + let snapshot = Snapshot::builder_for(table_url.clone()).build(&engine)?; + let scan = snapshot.scan_builder().build()?; let batches: Vec = scan .execute(Arc::new(engine))? diff --git a/kernel/src/actions/set_transaction.rs b/kernel/src/actions/set_transaction.rs index 792dfc628..d263521f5 100644 --- a/kernel/src/actions/set_transaction.rs +++ b/kernel/src/actions/set_transaction.rs @@ -113,7 +113,7 @@ mod tests { let url = url::Url::from_directory_path(path).unwrap(); let engine = SyncEngine::new(); - let snapshot = Snapshot::builder(url).build(&engine).unwrap(); + let snapshot = Snapshot::builder_for(url).build(&engine).unwrap(); let log_segment = snapshot.log_segment(); ( @@ -163,7 +163,7 @@ mod tests { let url = url::Url::from_directory_path(path.unwrap()).unwrap(); let engine = SyncEngine::new(); - let snapshot = Snapshot::builder(url).build(&engine).unwrap(); + let snapshot = Snapshot::builder_for(url).build(&engine).unwrap(); let log_segment = snapshot.log_segment(); // The checkpoint has five parts, each containing one action. There are two app ids. @@ -180,7 +180,7 @@ mod tests { let url = url::Url::from_directory_path(path.unwrap()).unwrap(); let engine = SyncEngine::new(); - let snapshot = Snapshot::builder(url).build(&engine).unwrap(); + let snapshot = Snapshot::builder_for(url).build(&engine).unwrap(); let log_segment = snapshot.log_segment(); // Test with no retention (should get all transactions) diff --git a/kernel/src/checkpoint/mod.rs b/kernel/src/checkpoint/mod.rs index 58fb223b9..5b9d9cc6c 100644 --- a/kernel/src/checkpoint/mod.rs +++ b/kernel/src/checkpoint/mod.rs @@ -47,7 +47,7 @@ //! //! // Create a snapshot for the table at the version you want to checkpoint //! let url = delta_kernel::try_parse_uri("./tests/data/app-txn-no-checkpoint")?; -//! let snapshot = Arc::new(Snapshot::builder(url).build(engine)?); +//! let snapshot = Snapshot::builder_for(url).build(engine)?; //! //! // Create a checkpoint writer from the snapshot //! let mut writer = snapshot.checkpoint()?; diff --git a/kernel/src/checkpoint/tests.rs b/kernel/src/checkpoint/tests.rs index 20edff045..bcc4e96c0 100644 --- a/kernel/src/checkpoint/tests.rs +++ b/kernel/src/checkpoint/tests.rs @@ -72,8 +72,8 @@ fn test_create_checkpoint_metadata_batch() -> DeltaResult<()> { )?; let table_root = Url::parse("memory:///")?; - let snapshot = Snapshot::builder(table_root).build(&engine)?; - let writer = Arc::new(snapshot).checkpoint()?; + let snapshot = Snapshot::builder_for(table_root).build(&engine)?; + let writer = snapshot.checkpoint()?; let checkpoint_batch = writer.create_checkpoint_metadata_batch(&engine)?; @@ -295,7 +295,7 @@ fn test_v1_checkpoint_latest_version_by_default() -> DeltaResult<()> { )?; let table_root = Url::parse("memory:///")?; - let snapshot = Arc::new(Snapshot::builder(table_root).build(&engine)?); + let snapshot = Snapshot::builder_for(table_root).build(&engine)?; let writer = snapshot.checkpoint()?; // Verify the checkpoint file path is the latest version by default. @@ -363,7 +363,9 @@ fn test_v1_checkpoint_specific_version() -> DeltaResult<()> { let table_root = Url::parse("memory:///")?; // Specify version 0 for checkpoint - let snapshot = Arc::new(Snapshot::builder(table_root).at_version(0).build(&engine)?); + let snapshot = Snapshot::builder_for(table_root) + .at_version(0) + .build(&engine)?; let writer = snapshot.checkpoint()?; // Verify the checkpoint file path is the specified version. @@ -411,7 +413,9 @@ fn test_finalize_errors_if_checkpoint_data_iterator_is_not_exhausted() -> DeltaR )?; let table_root = Url::parse("memory:///")?; - let snapshot = Arc::new(Snapshot::builder(table_root).at_version(0).build(&engine)?); + let snapshot = Snapshot::builder_for(table_root) + .at_version(0) + .build(&engine)?; let writer = snapshot.checkpoint()?; let data_iter = writer.checkpoint_data(&engine)?; @@ -465,7 +469,7 @@ fn test_v2_checkpoint_supported_table() -> DeltaResult<()> { )?; let table_root = Url::parse("memory:///")?; - let snapshot = Arc::new(Snapshot::builder(table_root).build(&engine)?); + let snapshot = Snapshot::builder_for(table_root).build(&engine)?; let writer = snapshot.checkpoint()?; // Verify the checkpoint file path is the latest version by default. diff --git a/kernel/src/log_compaction/mod.rs b/kernel/src/log_compaction/mod.rs index 4fc79a0d3..c802847f4 100644 --- a/kernel/src/log_compaction/mod.rs +++ b/kernel/src/log_compaction/mod.rs @@ -39,7 +39,7 @@ //! # fn example(engine: &dyn Engine) -> DeltaResult<()> { //! // Create a snapshot for the table //! let table_root = Url::parse("file:///path/to/table")?; -//! let snapshot = Arc::new(Snapshot::builder(table_root).build(engine)?); +//! let snapshot = Snapshot::builder_for(table_root).build(engine)?; //! //! // Create a log compaction writer for versions 10-20 //! let mut writer = snapshot.get_log_compaction_writer(10, 20)?; diff --git a/kernel/src/log_compaction/tests.rs b/kernel/src/log_compaction/tests.rs index 8677da119..d3dd158d0 100644 --- a/kernel/src/log_compaction/tests.rs +++ b/kernel/src/log_compaction/tests.rs @@ -12,7 +12,7 @@ fn create_mock_snapshot() -> Arc { .unwrap(); let url = url::Url::from_directory_path(path).unwrap(); let engine = SyncEngine::new(); - Arc::new(Snapshot::builder(url).build(&engine).unwrap()) + Snapshot::builder_for(url).build(&engine).unwrap() } fn create_multi_version_snapshot() -> Arc { @@ -20,7 +20,7 @@ fn create_multi_version_snapshot() -> Arc { std::fs::canonicalize(std::path::PathBuf::from("./tests/data/basic_partitioned/")).unwrap(); let url = url::Url::from_directory_path(path).unwrap(); let engine = SyncEngine::new(); - Arc::new(Snapshot::builder(url).build(&engine).unwrap()) + Snapshot::builder_for(url).build(&engine).unwrap() } #[test] diff --git a/kernel/src/log_segment/tests.rs b/kernel/src/log_segment/tests.rs index 3661b09d9..0ddab72af 100644 --- a/kernel/src/log_segment/tests.rs +++ b/kernel/src/log_segment/tests.rs @@ -50,7 +50,7 @@ fn test_replay_for_metadata() { let url = url::Url::from_directory_path(path.unwrap()).unwrap(); let engine = SyncEngine::new(); - let snapshot = Snapshot::builder(url).build(&engine).unwrap(); + let snapshot = Snapshot::builder_for(url).build(&engine).unwrap(); let data: Vec<_> = snapshot .log_segment() .replay_for_metadata(&engine) diff --git a/kernel/src/scan/mod.rs b/kernel/src/scan/mod.rs index 4d6d3a9ce..fedd662b2 100644 --- a/kernel/src/scan/mod.rs +++ b/kernel/src/scan/mod.rs @@ -1256,8 +1256,8 @@ mod tests { let url = url::Url::from_directory_path(path).unwrap(); let engine = SyncEngine::new(); - let snapshot = Snapshot::builder(url).build(&engine).unwrap(); - let scan = snapshot.into_scan_builder().build().unwrap(); + let snapshot = Snapshot::builder_for(url).build(&engine).unwrap(); + let scan = snapshot.scan_builder().build().unwrap(); let files = get_files_for_scan(scan, &engine).unwrap(); assert_eq!(files.len(), 1); assert_eq!( @@ -1273,8 +1273,8 @@ mod tests { let url = url::Url::from_directory_path(path).unwrap(); let engine = Arc::new(SyncEngine::new()); - let snapshot = Snapshot::builder(url).build(engine.as_ref()).unwrap(); - let scan = snapshot.into_scan_builder().build().unwrap(); + let snapshot = Snapshot::builder_for(url).build(engine.as_ref()).unwrap(); + let scan = snapshot.scan_builder().build().unwrap(); let files: Vec = scan.execute(engine).unwrap().try_collect().unwrap(); assert_eq!(files.len(), 1); @@ -1289,9 +1289,9 @@ mod tests { let url = url::Url::from_directory_path(path).unwrap(); let engine = Arc::new(SyncEngine::new()); - let snapshot = Snapshot::builder(url).build(engine.as_ref()).unwrap(); + let snapshot = Snapshot::builder_for(url).build(engine.as_ref()).unwrap(); let version = snapshot.version(); - let scan = snapshot.into_scan_builder().build().unwrap(); + let scan = snapshot.scan_builder().build().unwrap(); let files: Vec<_> = scan .scan_metadata(engine.as_ref()) .unwrap() @@ -1323,11 +1323,11 @@ mod tests { let url = url::Url::from_directory_path(path).unwrap(); let engine = Arc::new(SyncEngine::new()); - let snapshot = Snapshot::builder(url.clone()) + let snapshot = Snapshot::builder_for(url.clone()) .at_version(0) .build(engine.as_ref()) .unwrap(); - let scan = snapshot.into_scan_builder().build().unwrap(); + let scan = snapshot.scan_builder().build().unwrap(); let files: Vec<_> = scan .scan_metadata(engine.as_ref()) .unwrap() @@ -1347,11 +1347,11 @@ mod tests { .into_iter() .map(|b| Box::new(ArrowEngineData::from(b)) as Box) .collect(); - let snapshot = Snapshot::builder(url) + let snapshot = Snapshot::builder_for(url) .at_version(1) .build(engine.as_ref()) .unwrap(); - let scan = snapshot.into_scan_builder().build().unwrap(); + let scan = snapshot.scan_builder().build().unwrap(); let new_files: Vec<_> = scan .scan_metadata_from(engine.as_ref(), 0, files, None) .unwrap() @@ -1419,8 +1419,8 @@ mod tests { let url = url::Url::from_directory_path(path.unwrap()).unwrap(); let engine = SyncEngine::new(); - let snapshot = Snapshot::builder(url).build(&engine).unwrap(); - let scan = snapshot.into_scan_builder().build().unwrap(); + let snapshot = Snapshot::builder_for(url).build(&engine).unwrap(); + let scan = snapshot.scan_builder().build().unwrap(); let data: Vec<_> = scan .replay_for_scan_metadata(&engine) .unwrap() @@ -1439,7 +1439,7 @@ mod tests { let url = url::Url::from_directory_path(path.unwrap()).unwrap(); let engine = Arc::new(SyncEngine::new()); - let snapshot = Arc::new(Snapshot::builder(url).build(engine.as_ref()).unwrap()); + let snapshot = Snapshot::builder_for(url).build(engine.as_ref()).unwrap(); // No predicate pushdown attempted, so the one data file should be returned. // @@ -1482,7 +1482,7 @@ mod tests { let url = url::Url::from_directory_path(path.unwrap()).unwrap(); let engine = Arc::new(SyncEngine::new()); - let snapshot = Arc::new(Snapshot::builder(url).build(engine.as_ref()).unwrap()); + let snapshot = Snapshot::builder_for(url).build(engine.as_ref()).unwrap(); // Predicate over a logically valid but physically missing column. No data files should be // returned because the column is inferred to be all-null. @@ -1517,8 +1517,8 @@ mod tests { let url = url::Url::from_directory_path(path).unwrap(); let engine = SyncEngine::new(); - let snapshot = Snapshot::builder(url).build(&engine).unwrap(); - let scan = snapshot.into_scan_builder().build()?; + let snapshot = Snapshot::builder_for(url).build(&engine).unwrap(); + let scan = snapshot.scan_builder().build()?; let files = get_files_for_scan(scan, &engine)?; // test case: // diff --git a/kernel/src/snapshot.rs b/kernel/src/snapshot.rs index c8eb2a1cf..99555d17f 100644 --- a/kernel/src/snapshot.rs +++ b/kernel/src/snapshot.rs @@ -26,6 +26,8 @@ pub use builder::SnapshotBuilder; use tracing::debug; use url::Url; +pub type SnapshotRef = Arc; + // TODO expose methods for accessing the files of a table (with file pruning). /// In-memory representation of a specific snapshot of a Delta table. While a `DeltaTable` exists /// throughout time, `Snapshot`s represent a view of a table at a specific point in time; they @@ -54,22 +56,15 @@ impl std::fmt::Debug for Snapshot { } impl Snapshot { - /// Create a new [`SnapshotBuilder`] to build a [`Snapshot`] for a given table root. - pub fn builder(table_root: Url) -> SnapshotBuilder { + /// Create a new [`SnapshotBuilder`] to build a new [`Snapshot`] for a given table root. If you + /// instead have an existing [`Snapshot`] you would like to do minimal work to update, consider + /// using + pub fn builder_for(table_root: Url) -> SnapshotBuilder { SnapshotBuilder::new(table_root) } - #[internal_api] - pub(crate) fn new(log_segment: LogSegment, table_configuration: TableConfiguration) -> Self { - Self { - log_segment, - table_configuration, - } - } - - /// Create a new [`Snapshot`] instance from an existing [`Snapshot`]. This is useful when you - /// already have a [`Snapshot`] lying around and want to do the minimal work to 'update' the - /// snapshot to a later version. + /// Create a new [`SnapshotBuilder`] to incrementally update a [`Snapshot`] to a more recent + /// version. /// /// We implement a simple heuristic: /// 1. if the new version == existing version, just return the existing snapshot @@ -87,7 +82,22 @@ impl Snapshot { /// - `engine`: Implementation of [`Engine`] apis. /// - `version`: target version of the [`Snapshot`]. None will create a snapshot at the latest /// version of the table. - pub fn try_new_from( + pub fn builder_from(existing_snapshot: Arc) -> SnapshotBuilder { + SnapshotBuilder::new_from(existing_snapshot) + } + + #[internal_api] + pub(crate) fn new(log_segment: LogSegment, table_configuration: TableConfiguration) -> Self { + Self { + log_segment, + table_configuration, + } + } + + /// Create a new [`Snapshot`] instance from an existing [`Snapshot`]. This is useful when you + /// already have a [`Snapshot`] lying around and want to do the minimal work to 'update' the + /// snapshot to a later version. + fn try_new_from( existing_snapshot: Arc, engine: &dyn Engine, version: impl Into>, @@ -411,7 +421,10 @@ mod tests { let url = url::Url::from_directory_path(path).unwrap(); let engine = SyncEngine::new(); - let snapshot = Snapshot::builder(url).at_version(1).build(&engine).unwrap(); + let snapshot = Snapshot::builder_for(url) + .at_version(1) + .build(&engine) + .unwrap(); let expected = Protocol::try_new(3, 7, Some(["deletionVectors"]), Some(["deletionVectors"])).unwrap(); @@ -429,7 +442,7 @@ mod tests { let url = url::Url::from_directory_path(path).unwrap(); let engine = SyncEngine::new(); - let snapshot = Snapshot::builder(url).build(&engine).unwrap(); + let snapshot = Snapshot::builder_for(url).build(&engine).unwrap(); let expected = Protocol::try_new(3, 7, Some(["deletionVectors"]), Some(["deletionVectors"])).unwrap(); @@ -468,21 +481,24 @@ mod tests { let url = url::Url::from_directory_path(path).unwrap(); let engine = SyncEngine::new(); - let old_snapshot = Arc::new( - Snapshot::builder(url.clone()) - .at_version(1) - .build(&engine) - .unwrap(), - ); + let old_snapshot = Snapshot::builder_for(url.clone()) + .at_version(1) + .build(&engine) + .unwrap(); // 1. new version < existing version: error - let snapshot_res = Snapshot::try_new_from(old_snapshot.clone(), &engine, Some(0)); + let snapshot_res = Snapshot::builder_from(old_snapshot.clone()) + .at_version(0) + .build(&engine); assert!(matches!( snapshot_res, Err(Error::Generic(msg)) if msg == "Requested snapshot version 0 is older than snapshot hint version 1" )); // 2. new version == existing version - let snapshot = Snapshot::try_new_from(old_snapshot.clone(), &engine, Some(1)).unwrap(); + let snapshot = Snapshot::builder_from(old_snapshot.clone()) + .at_version(1) + .build(&engine) + .unwrap(); let expected = old_snapshot.clone(); assert_eq!(snapshot, expected); @@ -498,16 +514,16 @@ mod tests { fn test_new_from(store: Arc) -> DeltaResult<()> { let url = Url::parse("memory:///")?; let engine = DefaultEngine::new(store, Arc::new(TokioBackgroundExecutor::new())); - let base_snapshot = Arc::new( - Snapshot::builder(url.clone()) - .at_version(0) - .build(&engine)?, - ); - let snapshot = Snapshot::try_new_from(base_snapshot.clone(), &engine, Some(1))?; - let expected = Snapshot::builder(url.clone()) + let base_snapshot = Snapshot::builder_for(url.clone()) + .at_version(0) + .build(&engine)?; + let snapshot = Snapshot::builder_from(base_snapshot.clone()) .at_version(1) .build(&engine)?; - assert_eq!(snapshot, expected.into()); + let expected = Snapshot::builder_for(url.clone()) + .at_version(1) + .build(&engine)?; + assert_eq!(snapshot, expected); Ok(()) } @@ -551,19 +567,17 @@ mod tests { Arc::new(store.fork()), Arc::new(TokioBackgroundExecutor::new()), ); - let base_snapshot = Arc::new( - Snapshot::builder(url.clone()) - .at_version(0) - .build(&engine)?, - ); - let snapshot = Snapshot::try_new_from(base_snapshot.clone(), &engine, None)?; - let expected = Snapshot::builder(url.clone()) + let base_snapshot = Snapshot::builder_for(url.clone()) .at_version(0) .build(&engine)?; - assert_eq!(snapshot, expected.into()); + let snapshot = Snapshot::builder_from(base_snapshot.clone()).build(&engine)?; + let expected = Snapshot::builder_for(url.clone()) + .at_version(0) + .build(&engine)?; + assert_eq!(snapshot, expected); // version exceeds latest version of the table = err assert!(matches!( - Snapshot::try_new_from(base_snapshot.clone(), &engine, Some(1)), + Snapshot::builder_from(base_snapshot.clone()).at_version(1).build(&engine), Err(Error::Generic(msg)) if msg == "Requested snapshot version 1 is newer than the latest version 0" )); @@ -626,13 +640,11 @@ mod tests { // new commits AND request version > end of log let url = Url::parse("memory:///")?; let engine = DefaultEngine::new(store_3c_i, Arc::new(TokioBackgroundExecutor::new())); - let base_snapshot = Arc::new( - Snapshot::builder(url.clone()) - .at_version(0) - .build(&engine)?, - ); + let base_snapshot = Snapshot::builder_for(url.clone()) + .at_version(0) + .build(&engine)?; assert!(matches!( - Snapshot::try_new_from(base_snapshot.clone(), &engine, Some(2)), + Snapshot::builder_from(base_snapshot.clone()).at_version(2).build(&engine), Err(Error::Generic(msg)) if msg == "LogSegment end version 1 not the same as the specified end version 2" )); @@ -737,18 +749,18 @@ mod tests { store.put(&path, crc.to_string().into()).await?; // base snapshot is at version 0 - let base_snapshot = Arc::new( - Snapshot::builder(url.clone()) - .at_version(0) - .build(&engine)?, - ); + let base_snapshot = Snapshot::builder_for(url.clone()) + .at_version(0) + .build(&engine)?; // first test: no new crc - let snapshot = Snapshot::try_new_from(base_snapshot.clone(), &engine, Some(1))?; - let expected = Snapshot::builder(url.clone()) + let snapshot = Snapshot::builder_from(base_snapshot.clone()) + .at_version(1) + .build(&engine)?; + let expected = Snapshot::builder_for(url.clone()) .at_version(1) .build(&engine)?; - assert_eq!(snapshot, expected.into()); + assert_eq!(snapshot, expected); assert_eq!( snapshot .log_segment @@ -771,11 +783,13 @@ mod tests { "protocol": protocol(1, 2), }); store.put(&path, crc.to_string().into()).await?; - let snapshot = Snapshot::try_new_from(base_snapshot.clone(), &engine, Some(1))?; - let expected = Snapshot::builder(url.clone()) + let snapshot = Snapshot::builder_from(base_snapshot.clone()) .at_version(1) .build(&engine)?; - assert_eq!(snapshot, expected.into()); + let expected = Snapshot::builder_for(url.clone()) + .at_version(1) + .build(&engine)?; + assert_eq!(snapshot, expected); assert_eq!( snapshot .log_segment @@ -885,7 +899,7 @@ mod tests { .unwrap(); let location = url::Url::from_directory_path(path).unwrap(); let engine = SyncEngine::new(); - let snapshot = Snapshot::builder(location).build(&engine).unwrap(); + let snapshot = Snapshot::builder_for(location).build(&engine).unwrap(); assert_eq!(snapshot.log_segment.checkpoint_parts.len(), 1); assert_eq!( @@ -992,7 +1006,7 @@ mod tests { .join("\n"); add_commit(store.as_ref(), 1, commit).await.unwrap(); - let snapshot = Arc::new(Snapshot::builder(url.clone()).build(&engine)?); + let snapshot = Snapshot::builder_for(url.clone()).build(&engine)?; assert_eq!(snapshot.get_domain_metadata("domain1", &engine)?, None); assert_eq!( @@ -1018,7 +1032,7 @@ mod tests { let url = url::Url::from_directory_path(path).unwrap(); let engine = SyncEngine::new(); - let snapshot = Arc::new(Snapshot::builder(url).build(&engine).unwrap()); + let snapshot = Snapshot::builder_for(url).build(&engine).unwrap(); // Test creating a log compaction writer let writer = snapshot.clone().get_log_compaction_writer(0, 1).unwrap(); diff --git a/kernel/src/snapshot/builder.rs b/kernel/src/snapshot/builder.rs index 171820a10..8f00c308f 100644 --- a/kernel/src/snapshot/builder.rs +++ b/kernel/src/snapshot/builder.rs @@ -1,7 +1,9 @@ //! Builder for creating [`Snapshot`] instances. +use std::sync::Arc; use crate::log_segment::LogSegment; -use crate::{DeltaResult, Engine, Snapshot, Version}; +use crate::snapshot::SnapshotRef; +use crate::{DeltaResult, Engine, Error, Snapshot, Version}; use url::Url; @@ -16,22 +18,37 @@ use url::Url; /// let table_root = Url::parse("file:///path/to/table")?; /// /// // Build a snapshot -/// let snapshot = Snapshot::builder(table_root.clone()) +/// let snapshot = Snapshot::builder_for(table_root.clone()) /// .at_version(5) // Optional: specify a time-travel version (default is latest version) /// .build(engine)?; /// /// # Ok(()) /// # } /// ``` +// +// Note the SnapshotBuilder must have either a table_root or an existing_snapshot (but not both). +// We enforce this in the constructors. We could improve this in the future with different +// types/add type state. +#[derive(Debug)] pub struct SnapshotBuilder { - table_root: Url, + table_root: Option, + existing_snapshot: Option, version: Option, } impl SnapshotBuilder { pub(crate) fn new(table_root: Url) -> Self { Self { - table_root, + table_root: Some(table_root), + existing_snapshot: None, + version: None, + } + } + + pub(crate) fn new_from(existing_snapshot: Arc) -> Self { + Self { + table_root: None, + existing_snapshot: Some(existing_snapshot), version: None, } } @@ -43,18 +60,29 @@ impl SnapshotBuilder { self } - /// Create a new [`Snapshot`] instance. + /// Create a new [`Snapshot`]. This returns a [`SnapshotRef`] (`Arc`), perhaps + /// returning a reference to an existing snapshot if the request to build a new snapshot + /// matches the version of an existing snapshot. /// /// # Parameters /// /// - `engine`: Implementation of [`Engine`] apis. - pub fn build(self, engine: &dyn Engine) -> DeltaResult { - let log_segment = LogSegment::for_snapshot( - engine.storage_handler().as_ref(), - self.table_root.join("_delta_log/")?, - self.version, - )?; - Snapshot::try_new_from_log_segment(self.table_root, log_segment, engine) + pub fn build(self, engine: &dyn Engine) -> DeltaResult> { + if let Some(table_root) = self.table_root { + let log_segment = LogSegment::for_snapshot( + engine.storage_handler().as_ref(), + table_root.join("_delta_log/")?, + self.version, + )?; + Ok(Snapshot::try_new_from_log_segment(table_root, log_segment, engine)?.into()) + } else { + let existing_snapshot = self.existing_snapshot.ok_or_else(|| { + Error::internal_error( + "SnapshotBuilder should have either table_root or existing_snapshot", + ) + })?; + Snapshot::try_new_from(existing_snapshot, engine, self.version) + } } } diff --git a/kernel/src/table_changes/mod.rs b/kernel/src/table_changes/mod.rs index ed7e1d255..4c6a0fafd 100644 --- a/kernel/src/table_changes/mod.rs +++ b/kernel/src/table_changes/mod.rs @@ -150,12 +150,15 @@ impl TableChanges { // Both snapshots ensure that reading is supported at the start and end version using // `ensure_read_supported`. Note that we must still verify that reading is // supported for every protocol action in the CDF range. - let start_snapshot = Arc::new( - Snapshot::builder(table_root.as_url().clone()) - .at_version(start_version) + let start_snapshot = Snapshot::builder_for(table_root.as_url().clone()) + .at_version(start_version) + .build(engine)?; + let end_snapshot = match end_version { + Some(version) => Snapshot::builder_from(start_snapshot.clone()) + .at_version(version) .build(engine)?, - ); - let end_snapshot = Snapshot::try_new_from(start_snapshot.clone(), engine, end_version)?; + None => Snapshot::builder_from(start_snapshot.clone()).build(engine)?, + }; // Verify CDF is enabled at the beginning and end of the interval using // [`check_cdf_table_properties`] to fail early. This also ensures that column mapping is diff --git a/kernel/tests/dv.rs b/kernel/tests/dv.rs index f70c0fa9b..a8def7b29 100644 --- a/kernel/tests/dv.rs +++ b/kernel/tests/dv.rs @@ -32,8 +32,8 @@ fn dv_table() -> Result<(), Box> { let url = url::Url::from_directory_path(path).unwrap(); let engine = DefaultEngine::new_local(); - let snapshot = Snapshot::builder(url).build(engine.as_ref())?; - let scan = snapshot.into_scan_builder().build()?; + let snapshot = Snapshot::builder_for(url).build(engine.as_ref())?; + let scan = snapshot.scan_builder().build()?; let stream = scan.execute(engine)?; let total_rows = count_total_scan_rows(stream)?; @@ -47,8 +47,8 @@ fn non_dv_table() -> Result<(), Box> { let url = url::Url::from_directory_path(path).unwrap(); let engine = DefaultEngine::new_local(); - let snapshot = Snapshot::builder(url).build(engine.as_ref())?; - let scan = snapshot.into_scan_builder().build()?; + let snapshot = Snapshot::builder_for(url).build(engine.as_ref())?; + let scan = snapshot.scan_builder().build()?; let stream = scan.execute(engine)?; let total_rows = count_total_scan_rows(stream)?; diff --git a/kernel/tests/golden_tables.rs b/kernel/tests/golden_tables.rs index 9e573a656..758902e7f 100644 --- a/kernel/tests/golden_tables.rs +++ b/kernel/tests/golden_tables.rs @@ -168,8 +168,8 @@ async fn latest_snapshot_test( url: Url, expected_path: Option, ) -> Result<(), Box> { - let snapshot = Snapshot::builder(url).build(&engine)?; - let scan = snapshot.into_scan_builder().build()?; + let snapshot = Snapshot::builder_for(url).build(&engine)?; + let scan = snapshot.scan_builder().build()?; let scan_res = scan.execute(Arc::new(engine))?; let batches: Vec = scan_res .map(|scan_result| -> DeltaResult<_> { @@ -271,12 +271,9 @@ async fn canonicalized_paths_test( _expected: Option, ) -> Result<(), Box> { // assert latest version is 1 and there are no files in the snapshot (add is removed) - let snapshot = Snapshot::builder(table_root).build(&engine).unwrap(); + let snapshot = Snapshot::builder_for(table_root).build(&engine).unwrap(); assert_eq!(snapshot.version(), 1); - let scan = snapshot - .into_scan_builder() - .build() - .expect("build the scan"); + let scan = snapshot.scan_builder().build().expect("build the scan"); let mut scan_metadata = scan.scan_metadata(&engine).expect("scan metadata"); assert!(scan_metadata.next().is_none()); Ok(()) @@ -287,12 +284,9 @@ async fn checkpoint_test( table_root: Url, _expected: Option, ) -> Result<(), Box> { - let snapshot = Snapshot::builder(table_root).build(&engine).unwrap(); + let snapshot = Snapshot::builder_for(table_root).build(&engine).unwrap(); let version = snapshot.version(); - let scan = snapshot - .into_scan_builder() - .build() - .expect("build the scan"); + let scan = snapshot.scan_builder().build().expect("build the scan"); let scan_metadata: Vec<_> = scan .scan_metadata(&engine) .expect("scan metadata") diff --git a/kernel/tests/hdfs.rs b/kernel/tests/hdfs.rs index fedfac09f..da39c9c77 100644 --- a/kernel/tests/hdfs.rs +++ b/kernel/tests/hdfs.rs @@ -73,7 +73,7 @@ async fn read_table_version_hdfs() -> Result<(), Box> { Arc::new(TokioBackgroundExecutor::new()), )?; - let snapshot = Snapshot::builder(url).build(&engine)?; + let snapshot = Snapshot::builder_for(url).build(&engine)?; assert_eq!(snapshot.version(), 1); Ok(()) diff --git a/kernel/tests/read.rs b/kernel/tests/read.rs index b2c8de5b8..85ddcf2cc 100644 --- a/kernel/tests/read.rs +++ b/kernel/tests/read.rs @@ -66,8 +66,8 @@ async fn single_commit_two_add_files() -> Result<(), Box> let expected_data = vec![batch.clone(), batch]; - let snapshot = Snapshot::builder(location).build(engine.as_ref())?; - let scan = snapshot.into_scan_builder().build()?; + let snapshot = Snapshot::builder_for(location).build(engine.as_ref())?; + let scan = snapshot.scan_builder().build()?; let mut files = 0; let stream = scan.execute(engine)?.zip(expected_data); @@ -118,8 +118,8 @@ async fn two_commits() -> Result<(), Box> { let expected_data = vec![batch.clone(), batch]; - let snapshot = Snapshot::builder(location).build(&engine)?; - let scan = snapshot.into_scan_builder().build()?; + let snapshot = Snapshot::builder_for(location).build(&engine)?; + let scan = snapshot.scan_builder().build()?; let mut files = 0; let stream = scan.execute(Arc::new(engine))?.zip(expected_data); @@ -171,8 +171,8 @@ async fn remove_action() -> Result<(), Box> { let expected_data = vec![batch]; - let snapshot = Snapshot::builder(location).build(&engine)?; - let scan = snapshot.into_scan_builder().build()?; + let snapshot = Snapshot::builder_for(location).build(&engine)?; + let scan = snapshot.scan_builder().build()?; let stream = scan.execute(Arc::new(engine))?.zip(expected_data); @@ -242,7 +242,7 @@ async fn stats() -> Result<(), Box> { storage.clone(), Arc::new(TokioBackgroundExecutor::new()), )); - let snapshot = Arc::new(Snapshot::builder(location).build(engine.as_ref())?); + let snapshot = Snapshot::builder_for(location).build(engine.as_ref())?; // The first file has id between 1 and 3; the second has id between 5 and 7. For each operator, // we validate the boundary values where we expect the set of matched files to change. @@ -433,7 +433,7 @@ fn read_table_data( Arc::new(TokioBackgroundExecutor::new()), )?); - let snapshot = Snapshot::builder(url.clone()).build(engine.as_ref())?; + let snapshot = Snapshot::builder_for(url.clone()).build(engine.as_ref())?; let read_schema = select_cols.map(|select_cols| { let table_schema = snapshot.schema(); @@ -444,7 +444,7 @@ fn read_table_data( }); println!("Read {url:?} with schema {read_schema:#?} and predicate {predicate:#?}"); let scan = snapshot - .into_scan_builder() + .scan_builder() .with_schema_opt(read_schema) .with_predicate(predicate.clone()) .build()?; @@ -1057,7 +1057,7 @@ async fn predicate_on_non_nullable_partition_column() -> Result<(), Box Result<(), Box>, data: Vec, ) -> DeltaResult { - let snapshot = Arc::new(Snapshot::builder(table_url.clone()).build(engine.as_ref())?); + let snapshot = Snapshot::builder_for(table_url.clone()).build(engine.as_ref())?; let mut txn = snapshot.transaction()?; // Write data out by spawning async tasks to simulate executors @@ -553,8 +553,8 @@ async fn test_row_tracking_with_empty_adds() -> DeltaResult<()> { .await?; // Verify that the table is empty - let snapshot = Snapshot::builder(table_url).build(engine.as_ref())?; - let scan = snapshot.into_scan_builder().build()?; + let snapshot = Snapshot::builder_for(table_url).build(engine.as_ref())?; + let scan = snapshot.scan_builder().build()?; let batches = read_scan(&scan, engine)?; assert!(batches.is_empty(), "Table should be empty"); @@ -575,7 +575,7 @@ async fn test_row_tracking_without_adds() -> DeltaResult<()> { let (table_url, engine, store) = create_row_tracking_table(&tmp_test_dir, "test_consecutive_commits", schema.clone()) .await?; - let snapshot = Arc::new(Snapshot::builder(table_url.clone()).build(engine.as_ref())?); + let snapshot = Snapshot::builder_for(table_url.clone()).build(engine.as_ref())?; let txn = snapshot.transaction()?; // Commit without adding any add files @@ -615,8 +615,8 @@ async fn test_row_tracking_parallel_transactions_conflict() -> DeltaResult<()> { let engine2 = engine; // Create two snapshots from the same initial state - let snapshot1 = Arc::new(Snapshot::builder(table_url.clone()).build(engine1.as_ref())?); - let snapshot2 = Arc::new(Snapshot::builder(table_url.clone()).build(engine2.as_ref())?); + let snapshot1 = Snapshot::builder_for(table_url.clone()).build(engine1.as_ref())?; + let snapshot2 = Snapshot::builder_for(table_url.clone()).build(engine2.as_ref())?; // Create two transactions from the same snapshot (simulating parallel transactions) let mut txn1 = snapshot1.transaction()?.with_engine_info("transaction 1"); diff --git a/kernel/tests/v2_checkpoints.rs b/kernel/tests/v2_checkpoints.rs index 93261d529..a3ae99b23 100644 --- a/kernel/tests/v2_checkpoints.rs +++ b/kernel/tests/v2_checkpoints.rs @@ -17,8 +17,8 @@ fn read_v2_checkpoint_table(test_name: impl AsRef) -> DeltaResult Result<(), Box> { setup_test_tables(schema, &[], None, "test_table").await? { // create a transaction - let snapshot = Arc::new(Snapshot::builder(table_url.clone()).build(&engine)?); + let snapshot = Snapshot::builder_for(table_url.clone()).build(&engine)?; let txn = snapshot.transaction()?.with_engine_info("default engine"); // commit! @@ -143,7 +143,7 @@ async fn write_data_and_check_result_and_stats( engine: Arc>, expected_since_commit: u64, ) -> Result<(), Box> { - let snapshot = Arc::new(Snapshot::builder(table_url.clone()).build(engine.as_ref())?); + let snapshot = Snapshot::builder_for(table_url.clone()).build(engine.as_ref())?; let mut txn = snapshot.transaction()?; // create two new arrow record batches to append @@ -213,7 +213,7 @@ async fn test_commit_info_action() -> Result<(), Box> { for (table_url, engine, store, table_name) in setup_test_tables(schema.clone(), &[], None, "test_table").await? { - let snapshot = Arc::new(Snapshot::builder(table_url.clone()).build(&engine)?); + let snapshot = Snapshot::builder_for(table_url.clone()).build(&engine)?; let txn = snapshot.transaction()?.with_engine_info("default engine"); txn.commit(&engine)?; @@ -397,7 +397,7 @@ async fn test_append_partitioned() -> Result<(), Box> { for (table_url, engine, store, table_name) in setup_test_tables(table_schema.clone(), &[partition_col], None, "test_table").await? { - let snapshot = Arc::new(Snapshot::builder(table_url.clone()).build(&engine)?); + let snapshot = Snapshot::builder_for(table_url.clone()).build(&engine)?; let mut txn = snapshot.transaction()?.with_engine_info("default engine"); // create two new arrow record batches to append @@ -540,7 +540,7 @@ async fn test_append_invalid_schema() -> Result<(), Box> for (table_url, engine, _store, _table_name) in setup_test_tables(table_schema, &[], None, "test_table").await? { - let snapshot = Arc::new(Snapshot::builder(table_url.clone()).build(&engine)?); + let snapshot = Snapshot::builder_for(table_url.clone()).build(&engine)?; let txn = snapshot.transaction()?.with_engine_info("default engine"); // create two new arrow record batches to append @@ -598,7 +598,7 @@ async fn test_write_txn_actions() -> Result<(), Box> { setup_test_tables(schema, &[], None, "test_table").await? { // can't have duplicate app_id in same transaction - let snapshot = Arc::new(Snapshot::builder(table_url.clone()).build(&engine)?); + let snapshot = Snapshot::builder_for(table_url.clone()).build(&engine)?; assert!(matches!( snapshot .transaction()? @@ -608,7 +608,7 @@ async fn test_write_txn_actions() -> Result<(), Box> { Err(KernelError::Generic(msg)) if msg == "app_id app_id1 already exists in transaction" )); - let snapshot = Arc::new(Snapshot::builder(table_url.clone()).build(&engine)?); + let snapshot = Snapshot::builder_for(table_url.clone()).build(&engine)?; let txn = snapshot .transaction()? .with_engine_info("default engine") @@ -618,11 +618,9 @@ async fn test_write_txn_actions() -> Result<(), Box> { // commit! txn.commit(&engine)?; - let snapshot = Arc::new( - Snapshot::builder(table_url.clone()) - .at_version(1) - .build(&engine)?, - ); + let snapshot = Snapshot::builder_for(table_url.clone()) + .at_version(1) + .build(&engine)?; assert_eq!( snapshot.clone().get_app_id_version("app_id1", &engine)?, Some(1) @@ -631,7 +629,10 @@ async fn test_write_txn_actions() -> Result<(), Box> { snapshot.clone().get_app_id_version("app_id2", &engine)?, Some(2) ); - assert_eq!(snapshot.get_app_id_version("app_id3", &engine)?, None); + assert_eq!( + snapshot.clone().get_app_id_version("app_id3", &engine)?, + None + ); let commit1 = store .get(&Path::from(format!( @@ -741,7 +742,7 @@ async fn test_append_timestamp_ntz() -> Result<(), Box> { ) .await?; - let snapshot = Arc::new(Snapshot::builder(table_url.clone()).build(&engine)?); + let snapshot = Snapshot::builder_for(table_url.clone()).build(&engine)?; let mut txn = snapshot.transaction()?.with_engine_info("default engine"); // Create Arrow data with TIMESTAMP_NTZ values including edge cases @@ -875,7 +876,7 @@ async fn test_append_variant() -> Result<(), Box> { ) .await?; - let snapshot = Arc::new(Snapshot::builder(table_url.clone()).build(&engine)?); + let snapshot = Snapshot::builder_for(table_url.clone()).build(&engine)?; let mut txn = snapshot.transaction()?; // First value corresponds to the variant value "1". Third value corresponds to the variant @@ -1084,7 +1085,7 @@ async fn test_shredded_variant_read_rejection() -> Result<(), Box, ) -> DeltaResult<()> { - let snapshot = Snapshot::builder(url.clone()).build(engine.as_ref())?; - let scan = snapshot.into_scan_builder().build()?; + let snapshot = Snapshot::builder_for(url.clone()).build(engine.as_ref())?; + let scan = snapshot.scan_builder().build()?; let batches = read_scan(&scan, engine)?; let formatted = pretty_format_batches(&batches).unwrap().to_string();