Skip to content

Commit 9a0500f

Browse files
zachschuermann authored and murali-db committed
refactor!: migrate Snapshot::try_new_from into SnapshotBuilder (delta-io#1289)
> [!IMPORTANT]
> This PR is split into two commits: (1) material changes, and (2) callsite updates.

This PR integrates `Snapshot::try_new_from` into `SnapshotBuilder`. We move `Snapshot::try_new_from` into `Snapshot::builder_from` and modify `SnapshotBuilder` to return an `Arc<Snapshot>` instead of a `Snapshot`. Also, `Snapshot::builder(table_root)` is renamed to `Snapshot::builder_for(table_root)`.

1. Replaces `Snapshot::try_new_from(existing)` with `Snapshot::builder_from(existing).build()`
2. `SnapshotBuilder::build()` now returns an `Arc<Snapshot>` instead of a `Snapshot`
3. Renames `Snapshot::builder` to `Snapshot::builder_for`

refactor, updated tests
1 parent baab3a7 commit 9a0500f

File tree

28 files changed

+226
-187
lines changed

28 files changed

+226
-187
lines changed

acceptance/src/data.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -115,8 +115,8 @@ pub async fn assert_scan_metadata(
115115
test_case: &TestCaseInfo,
116116
) -> TestResult<()> {
117117
let table_root = test_case.table_root()?;
118-
let snapshot = Snapshot::builder(table_root).build(engine.as_ref())?;
119-
let scan = snapshot.into_scan_builder().build()?;
118+
let snapshot = Snapshot::builder_for(table_root).build(engine.as_ref())?;
119+
let scan = snapshot.scan_builder().build()?;
120120
let mut schema = None;
121121
let batches: Vec<RecordBatch> = scan
122122
.execute(engine)?

acceptance/src/meta.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -103,11 +103,11 @@ impl TestCaseInfo {
103103
let engine = engine.as_ref();
104104
let (latest, versions) = self.versions().await?;
105105

106-
let snapshot = Snapshot::builder(self.table_root()?).build(engine)?;
106+
let snapshot = Snapshot::builder_for(self.table_root()?).build(engine)?;
107107
self.assert_snapshot_meta(&latest, &snapshot)?;
108108

109109
for table_version in versions {
110-
let snapshot = Snapshot::builder(self.table_root()?)
110+
let snapshot = Snapshot::builder_for(self.table_root()?)
111111
.at_version(table_version.version)
112112
.build(engine)?;
113113
self.assert_snapshot_meta(&table_version, &snapshot)?;

ffi/src/lib.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -601,15 +601,15 @@ fn snapshot_impl(
601601
extern_engine: &dyn ExternEngine,
602602
version: Option<Version>,
603603
) -> DeltaResult<Handle<SharedSnapshot>> {
604-
let builder = Snapshot::builder(url?);
604+
let builder = Snapshot::builder_for(url?);
605605
let builder = if let Some(v) = version {
606606
// TODO: should we include a `with_version_opt` method for the builder?
607607
builder.at_version(v)
608608
} else {
609609
builder
610610
};
611611
let snapshot = builder.build(extern_engine.engine().as_ref())?;
612-
Ok(Arc::new(snapshot).into())
612+
Ok(snapshot.into())
613613
}
614614

615615
/// # Safety

ffi/src/transaction/mod.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@ use crate::{DeltaResult, ExternEngine, Snapshot, Url};
99
use crate::{ExclusiveEngineData, SharedExternEngine};
1010
use delta_kernel::transaction::{CommitResult, Transaction};
1111
use delta_kernel_ffi_macros::handle_descriptor;
12-
use std::sync::Arc;
1312

1413
/// A handle representing an exclusive transaction on a Delta table. (Similar to a Box<_>)
1514
///
@@ -38,7 +37,7 @@ fn transaction_impl(
3837
url: DeltaResult<Url>,
3938
extern_engine: &dyn ExternEngine,
4039
) -> DeltaResult<Handle<ExclusiveTransaction>> {
41-
let snapshot = Arc::new(Snapshot::builder(url?).build(extern_engine.engine().as_ref())?);
40+
let snapshot = Snapshot::builder_for(url?).build(extern_engine.engine().as_ref())?;
4241
let transaction = snapshot.transaction();
4342
Ok(Box::new(transaction?).into())
4443
}

kernel/benches/metadata_bench.rs

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ fn create_snapshot_benchmark(c: &mut Criterion) {
5353

5454
c.bench_function("create_snapshot", |b| {
5555
b.iter(|| {
56-
Snapshot::builder(url.clone())
56+
Snapshot::builder_for(url.clone())
5757
.build(engine.as_ref())
5858
.expect("Failed to create snapshot")
5959
})
@@ -63,11 +63,9 @@ fn create_snapshot_benchmark(c: &mut Criterion) {
6363
fn scan_metadata_benchmark(c: &mut Criterion) {
6464
let (_tempdir, url, engine) = setup();
6565

66-
let snapshot = Arc::new(
67-
Snapshot::builder(url.clone())
68-
.build(engine.as_ref())
69-
.expect("Failed to create snapshot"),
70-
);
66+
let snapshot = Snapshot::builder_for(url.clone())
67+
.build(engine.as_ref())
68+
.expect("Failed to create snapshot");
7169

7270
let mut group = c.benchmark_group("scan_metadata");
7371
group.sample_size(SCAN_METADATA_BENCH_SAMPLE_SIZE);

kernel/examples/common/src/lib.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,7 @@ pub fn get_engine(
6262

6363
/// Construct a scan at the latest snapshot. This is over the specified table and using the passed
6464
/// engine. Parameters of the scan are controlled by the specified `ScanArgs`
65-
pub fn get_scan(snapshot: Snapshot, args: &ScanArgs) -> DeltaResult<Option<Scan>> {
65+
pub fn get_scan(snapshot: Arc<Snapshot>, args: &ScanArgs) -> DeltaResult<Option<Scan>> {
6666
if args.schema_only {
6767
println!("{:#?}", snapshot.schema());
6868
return Ok(None);
@@ -86,7 +86,7 @@ pub fn get_scan(snapshot: Snapshot, args: &ScanArgs) -> DeltaResult<Option<Scan>
8686
.transpose()?;
8787
Ok(Some(
8888
snapshot
89-
.into_scan_builder()
89+
.scan_builder()
9090
.with_schema_opt(read_schema_opt)
9191
.build()?,
9292
))

kernel/examples/inspect-table/src/main.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -182,7 +182,7 @@ fn try_main() -> DeltaResult<()> {
182182

183183
let url = delta_kernel::try_parse_uri(&cli.location_args.path)?;
184184
let engine = common::get_engine(&url, &cli.location_args)?;
185-
let snapshot = Snapshot::builder(url).build(&engine)?;
185+
let snapshot = Snapshot::builder_for(url).build(&engine)?;
186186

187187
match cli.command {
188188
Commands::TableVersion => {

kernel/examples/read-table-multi-threaded/src/main.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -99,7 +99,7 @@ fn try_main() -> DeltaResult<()> {
9999
let url = delta_kernel::try_parse_uri(&cli.location_args.path)?;
100100
println!("Reading {url}");
101101
let engine = common::get_engine(&url, &cli.location_args)?;
102-
let snapshot = Snapshot::builder(url).build(&engine)?;
102+
let snapshot = Snapshot::builder_for(url).build(&engine)?;
103103
let Some(scan) = common::get_scan(snapshot, &cli.scan_args)? else {
104104
return Ok(());
105105
};

kernel/examples/read-table-single-threaded/src/main.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ fn try_main() -> DeltaResult<()> {
4343
let url = delta_kernel::try_parse_uri(&cli.location_args.path)?;
4444
println!("Reading {url}");
4545
let engine = common::get_engine(&url, &cli.location_args)?;
46-
let snapshot = Snapshot::builder(url).build(&engine)?;
46+
let snapshot = Snapshot::builder_for(url).build(&engine)?;
4747
let Some(scan) = common::get_scan(snapshot, &cli.scan_args)? else {
4848
return Ok(());
4949
};

kernel/examples/write-table/src/main.rs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,7 @@ async fn try_main() -> DeltaResult<()> {
7979
)?;
8080

8181
// Create or get the table
82-
let snapshot = Arc::new(create_or_get_base_snapshot(&url, &engine, &cli.schema).await?);
82+
let snapshot = create_or_get_base_snapshot(&url, &engine, &cli.schema).await?;
8383

8484
// Create sample data based on the schema
8585
let sample_data = create_sample_data(&snapshot.schema(), cli.num_rows)?;
@@ -123,9 +123,9 @@ async fn create_or_get_base_snapshot(
123123
url: &Url,
124124
engine: &dyn Engine,
125125
schema_str: &str,
126-
) -> DeltaResult<Snapshot> {
126+
) -> DeltaResult<Arc<Snapshot>> {
127127
// Check if table already exists
128-
match Snapshot::builder(url.clone()).build(engine) {
128+
match Snapshot::builder_for(url.clone()).build(engine) {
129129
Ok(snapshot) => {
130130
println!("✓ Found existing table at version {}", snapshot.version());
131131
Ok(snapshot)
@@ -135,7 +135,7 @@ async fn create_or_get_base_snapshot(
135135
println!("Creating new Delta table...");
136136
let schema = parse_schema(schema_str)?;
137137
create_table(url, &schema).await?;
138-
Snapshot::builder(url.clone()).build(engine)
138+
Snapshot::builder_for(url.clone()).build(engine)
139139
}
140140
}
141141
}
@@ -294,8 +294,8 @@ async fn read_and_display_data(
294294
table_url: &Url,
295295
engine: DefaultEngine<TokioBackgroundExecutor>,
296296
) -> DeltaResult<()> {
297-
let snapshot = Snapshot::builder(table_url.clone()).build(&engine)?;
298-
let scan = snapshot.into_scan_builder().build()?;
297+
let snapshot = Snapshot::builder_for(table_url.clone()).build(&engine)?;
298+
let scan = snapshot.scan_builder().build()?;
299299

300300
let batches: Vec<RecordBatch> = scan
301301
.execute(Arc::new(engine))?

0 commit comments

Comments
 (0)