Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions acceptance/src/data.rs
Original file line number Diff line number Diff line change
Expand Up @@ -115,8 +115,8 @@ pub async fn assert_scan_metadata(
test_case: &TestCaseInfo,
) -> TestResult<()> {
let table_root = test_case.table_root()?;
let snapshot = Snapshot::builder(table_root).build(engine.as_ref())?;
let scan = snapshot.into_scan_builder().build()?;
let snapshot = Snapshot::builder_for(table_root).build(engine.as_ref())?;
let scan = snapshot.scan_builder().build()?;
let mut schema = None;
let batches: Vec<RecordBatch> = scan
.execute(engine)?
Expand Down
4 changes: 2 additions & 2 deletions acceptance/src/meta.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,11 +103,11 @@ impl TestCaseInfo {
let engine = engine.as_ref();
let (latest, versions) = self.versions().await?;

let snapshot = Snapshot::builder(self.table_root()?).build(engine)?;
let snapshot = Snapshot::builder_for(self.table_root()?).build(engine)?;
self.assert_snapshot_meta(&latest, &snapshot)?;

for table_version in versions {
let snapshot = Snapshot::builder(self.table_root()?)
let snapshot = Snapshot::builder_for(self.table_root()?)
.at_version(table_version.version)
.build(engine)?;
self.assert_snapshot_meta(&table_version, &snapshot)?;
Expand Down
4 changes: 2 additions & 2 deletions ffi/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -601,15 +601,15 @@ fn snapshot_impl(
extern_engine: &dyn ExternEngine,
version: Option<Version>,
) -> DeltaResult<Handle<SharedSnapshot>> {
let builder = Snapshot::builder(url?);
let builder = Snapshot::builder_for(url?);
let builder = if let Some(v) = version {
// TODO: should we include a `with_version_opt` method for the builder?
builder.at_version(v)
} else {
builder
};
let snapshot = builder.build(extern_engine.engine().as_ref())?;
Ok(Arc::new(snapshot).into())
Ok(snapshot.into())
}

/// # Safety
Expand Down
3 changes: 1 addition & 2 deletions ffi/src/transaction/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ use crate::{DeltaResult, ExternEngine, Snapshot, Url};
use crate::{ExclusiveEngineData, SharedExternEngine};
use delta_kernel::transaction::{CommitResult, Transaction};
use delta_kernel_ffi_macros::handle_descriptor;
use std::sync::Arc;

/// A handle representing an exclusive transaction on a Delta table. (Similar to a Box<_>)
///
Expand Down Expand Up @@ -38,7 +37,7 @@ fn transaction_impl(
url: DeltaResult<Url>,
extern_engine: &dyn ExternEngine,
) -> DeltaResult<Handle<ExclusiveTransaction>> {
let snapshot = Arc::new(Snapshot::builder(url?).build(extern_engine.engine().as_ref())?);
let snapshot = Snapshot::builder_for(url?).build(extern_engine.engine().as_ref())?;
let transaction = snapshot.transaction();
Ok(Box::new(transaction?).into())
}
Expand Down
10 changes: 4 additions & 6 deletions kernel/benches/metadata_bench.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ fn create_snapshot_benchmark(c: &mut Criterion) {

c.bench_function("create_snapshot", |b| {
b.iter(|| {
Snapshot::builder(url.clone())
Snapshot::builder_for(url.clone())
.build(engine.as_ref())
.expect("Failed to create snapshot")
})
Expand All @@ -63,11 +63,9 @@ fn create_snapshot_benchmark(c: &mut Criterion) {
fn scan_metadata_benchmark(c: &mut Criterion) {
let (_tempdir, url, engine) = setup();

let snapshot = Arc::new(
Snapshot::builder(url.clone())
.build(engine.as_ref())
.expect("Failed to create snapshot"),
);
let snapshot = Snapshot::builder_for(url.clone())
.build(engine.as_ref())
.expect("Failed to create snapshot");

let mut group = c.benchmark_group("scan_metadata");
group.sample_size(SCAN_METADATA_BENCH_SAMPLE_SIZE);
Expand Down
4 changes: 2 additions & 2 deletions kernel/examples/common/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ pub fn get_engine(

/// Construct a scan at the latest snapshot. This is over the specified table and using the passed
/// engine. Parameters of the scan are controlled by the specified `ScanArgs`
pub fn get_scan(snapshot: Snapshot, args: &ScanArgs) -> DeltaResult<Option<Scan>> {
pub fn get_scan(snapshot: Arc<Snapshot>, args: &ScanArgs) -> DeltaResult<Option<Scan>> {
if args.schema_only {
println!("{:#?}", snapshot.schema());
return Ok(None);
Expand All @@ -86,7 +86,7 @@ pub fn get_scan(snapshot: Snapshot, args: &ScanArgs) -> DeltaResult<Option<Scan>
.transpose()?;
Ok(Some(
snapshot
.into_scan_builder()
.scan_builder()
.with_schema_opt(read_schema_opt)
.build()?,
))
Expand Down
2 changes: 1 addition & 1 deletion kernel/examples/inspect-table/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,7 @@ fn try_main() -> DeltaResult<()> {

let url = delta_kernel::try_parse_uri(&cli.location_args.path)?;
let engine = common::get_engine(&url, &cli.location_args)?;
let snapshot = Snapshot::builder(url).build(&engine)?;
let snapshot = Snapshot::builder_for(url).build(&engine)?;

match cli.command {
Commands::TableVersion => {
Expand Down
2 changes: 1 addition & 1 deletion kernel/examples/read-table-multi-threaded/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ fn try_main() -> DeltaResult<()> {
let url = delta_kernel::try_parse_uri(&cli.location_args.path)?;
println!("Reading {url}");
let engine = common::get_engine(&url, &cli.location_args)?;
let snapshot = Snapshot::builder(url).build(&engine)?;
let snapshot = Snapshot::builder_for(url).build(&engine)?;
let Some(scan) = common::get_scan(snapshot, &cli.scan_args)? else {
return Ok(());
};
Expand Down
2 changes: 1 addition & 1 deletion kernel/examples/read-table-single-threaded/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ fn try_main() -> DeltaResult<()> {
let url = delta_kernel::try_parse_uri(&cli.location_args.path)?;
println!("Reading {url}");
let engine = common::get_engine(&url, &cli.location_args)?;
let snapshot = Snapshot::builder(url).build(&engine)?;
let snapshot = Snapshot::builder_for(url).build(&engine)?;
let Some(scan) = common::get_scan(snapshot, &cli.scan_args)? else {
return Ok(());
};
Expand Down
12 changes: 6 additions & 6 deletions kernel/examples/write-table/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ async fn try_main() -> DeltaResult<()> {
)?;

// Create or get the table
let snapshot = Arc::new(create_or_get_base_snapshot(&url, &engine, &cli.schema).await?);
let snapshot = create_or_get_base_snapshot(&url, &engine, &cli.schema).await?;

// Create sample data based on the schema
let sample_data = create_sample_data(&snapshot.schema(), cli.num_rows)?;
Expand Down Expand Up @@ -123,9 +123,9 @@ async fn create_or_get_base_snapshot(
url: &Url,
engine: &dyn Engine,
schema_str: &str,
) -> DeltaResult<Snapshot> {
) -> DeltaResult<Arc<Snapshot>> {
// Check if table already exists
match Snapshot::builder(url.clone()).build(engine) {
match Snapshot::builder_for(url.clone()).build(engine) {
Ok(snapshot) => {
println!("✓ Found existing table at version {}", snapshot.version());
Ok(snapshot)
Expand All @@ -135,7 +135,7 @@ async fn create_or_get_base_snapshot(
println!("Creating new Delta table...");
let schema = parse_schema(schema_str)?;
create_table(url, &schema).await?;
Snapshot::builder(url.clone()).build(engine)
Snapshot::builder_for(url.clone()).build(engine)
}
}
}
Expand Down Expand Up @@ -294,8 +294,8 @@ async fn read_and_display_data(
table_url: &Url,
engine: DefaultEngine<TokioBackgroundExecutor>,
) -> DeltaResult<()> {
let snapshot = Snapshot::builder(table_url.clone()).build(&engine)?;
let scan = snapshot.into_scan_builder().build()?;
let snapshot = Snapshot::builder_for(table_url.clone()).build(&engine)?;
let scan = snapshot.scan_builder().build()?;

let batches: Vec<RecordBatch> = scan
.execute(Arc::new(engine))?
Expand Down
6 changes: 3 additions & 3 deletions kernel/src/actions/set_transaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ mod tests {
let url = url::Url::from_directory_path(path).unwrap();
let engine = SyncEngine::new();

let snapshot = Snapshot::builder(url).build(&engine).unwrap();
let snapshot = Snapshot::builder_for(url).build(&engine).unwrap();
let log_segment = snapshot.log_segment();

(
Expand Down Expand Up @@ -163,7 +163,7 @@ mod tests {
let url = url::Url::from_directory_path(path.unwrap()).unwrap();
let engine = SyncEngine::new();

let snapshot = Snapshot::builder(url).build(&engine).unwrap();
let snapshot = Snapshot::builder_for(url).build(&engine).unwrap();
let log_segment = snapshot.log_segment();

// The checkpoint has five parts, each containing one action. There are two app ids.
Expand All @@ -180,7 +180,7 @@ mod tests {
let url = url::Url::from_directory_path(path.unwrap()).unwrap();
let engine = SyncEngine::new();

let snapshot = Snapshot::builder(url).build(&engine).unwrap();
let snapshot = Snapshot::builder_for(url).build(&engine).unwrap();
let log_segment = snapshot.log_segment();

// Test with no retention (should get all transactions)
Expand Down
2 changes: 1 addition & 1 deletion kernel/src/checkpoint/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@
//!
//! // Create a snapshot for the table at the version you want to checkpoint
//! let url = delta_kernel::try_parse_uri("./tests/data/app-txn-no-checkpoint")?;
//! let snapshot = Arc::new(Snapshot::builder(url).build(engine)?);
//! let snapshot = Snapshot::builder_for(url).build(engine)?;
//!
//! // Create a checkpoint writer from the snapshot
//! let mut writer = snapshot.checkpoint()?;
Expand Down
16 changes: 10 additions & 6 deletions kernel/src/checkpoint/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,8 @@ fn test_create_checkpoint_metadata_batch() -> DeltaResult<()> {
)?;

let table_root = Url::parse("memory:///")?;
let snapshot = Snapshot::builder(table_root).build(&engine)?;
let writer = Arc::new(snapshot).checkpoint()?;
let snapshot = Snapshot::builder_for(table_root).build(&engine)?;
let writer = snapshot.checkpoint()?;

let checkpoint_batch = writer.create_checkpoint_metadata_batch(&engine)?;

Expand Down Expand Up @@ -295,7 +295,7 @@ fn test_v1_checkpoint_latest_version_by_default() -> DeltaResult<()> {
)?;

let table_root = Url::parse("memory:///")?;
let snapshot = Arc::new(Snapshot::builder(table_root).build(&engine)?);
let snapshot = Snapshot::builder_for(table_root).build(&engine)?;
let writer = snapshot.checkpoint()?;

// Verify the checkpoint file path is the latest version by default.
Expand Down Expand Up @@ -363,7 +363,9 @@ fn test_v1_checkpoint_specific_version() -> DeltaResult<()> {

let table_root = Url::parse("memory:///")?;
// Specify version 0 for checkpoint
let snapshot = Arc::new(Snapshot::builder(table_root).at_version(0).build(&engine)?);
let snapshot = Snapshot::builder_for(table_root)
.at_version(0)
.build(&engine)?;
let writer = snapshot.checkpoint()?;

// Verify the checkpoint file path is the specified version.
Expand Down Expand Up @@ -411,7 +413,9 @@ fn test_finalize_errors_if_checkpoint_data_iterator_is_not_exhausted() -> DeltaR
)?;

let table_root = Url::parse("memory:///")?;
let snapshot = Arc::new(Snapshot::builder(table_root).at_version(0).build(&engine)?);
let snapshot = Snapshot::builder_for(table_root)
.at_version(0)
.build(&engine)?;
let writer = snapshot.checkpoint()?;
let data_iter = writer.checkpoint_data(&engine)?;

Expand Down Expand Up @@ -465,7 +469,7 @@ fn test_v2_checkpoint_supported_table() -> DeltaResult<()> {
)?;

let table_root = Url::parse("memory:///")?;
let snapshot = Arc::new(Snapshot::builder(table_root).build(&engine)?);
let snapshot = Snapshot::builder_for(table_root).build(&engine)?;
let writer = snapshot.checkpoint()?;

// Verify the checkpoint file path is the latest version by default.
Expand Down
2 changes: 1 addition & 1 deletion kernel/src/log_compaction/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@
//! # fn example(engine: &dyn Engine) -> DeltaResult<()> {
//! // Create a snapshot for the table
//! let table_root = Url::parse("file:///path/to/table")?;
//! let snapshot = Arc::new(Snapshot::builder(table_root).build(engine)?);
//! let snapshot = Snapshot::builder_for(table_root).build(engine)?;
//!
//! // Create a log compaction writer for versions 10-20
//! let mut writer = snapshot.get_log_compaction_writer(10, 20)?;
Expand Down
4 changes: 2 additions & 2 deletions kernel/src/log_compaction/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,15 +12,15 @@ fn create_mock_snapshot() -> Arc<Snapshot> {
.unwrap();
let url = url::Url::from_directory_path(path).unwrap();
let engine = SyncEngine::new();
Arc::new(Snapshot::builder(url).build(&engine).unwrap())
Snapshot::builder_for(url).build(&engine).unwrap()
}

fn create_multi_version_snapshot() -> Arc<Snapshot> {
let path =
std::fs::canonicalize(std::path::PathBuf::from("./tests/data/basic_partitioned/")).unwrap();
let url = url::Url::from_directory_path(path).unwrap();
let engine = SyncEngine::new();
Arc::new(Snapshot::builder(url).build(&engine).unwrap())
Snapshot::builder_for(url).build(&engine).unwrap()
}

#[test]
Expand Down
2 changes: 1 addition & 1 deletion kernel/src/log_segment/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ fn test_replay_for_metadata() {
let url = url::Url::from_directory_path(path.unwrap()).unwrap();
let engine = SyncEngine::new();

let snapshot = Snapshot::builder(url).build(&engine).unwrap();
let snapshot = Snapshot::builder_for(url).build(&engine).unwrap();
let data: Vec<_> = snapshot
.log_segment()
.replay_for_metadata(&engine)
Expand Down
32 changes: 16 additions & 16 deletions kernel/src/scan/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1256,8 +1256,8 @@ mod tests {
let url = url::Url::from_directory_path(path).unwrap();
let engine = SyncEngine::new();

let snapshot = Snapshot::builder(url).build(&engine).unwrap();
let scan = snapshot.into_scan_builder().build().unwrap();
let snapshot = Snapshot::builder_for(url).build(&engine).unwrap();
let scan = snapshot.scan_builder().build().unwrap();
let files = get_files_for_scan(scan, &engine).unwrap();
assert_eq!(files.len(), 1);
assert_eq!(
Expand All @@ -1273,8 +1273,8 @@ mod tests {
let url = url::Url::from_directory_path(path).unwrap();
let engine = Arc::new(SyncEngine::new());

let snapshot = Snapshot::builder(url).build(engine.as_ref()).unwrap();
let scan = snapshot.into_scan_builder().build().unwrap();
let snapshot = Snapshot::builder_for(url).build(engine.as_ref()).unwrap();
let scan = snapshot.scan_builder().build().unwrap();
let files: Vec<ScanResult> = scan.execute(engine).unwrap().try_collect().unwrap();

assert_eq!(files.len(), 1);
Expand All @@ -1289,9 +1289,9 @@ mod tests {
let url = url::Url::from_directory_path(path).unwrap();
let engine = Arc::new(SyncEngine::new());

let snapshot = Snapshot::builder(url).build(engine.as_ref()).unwrap();
let snapshot = Snapshot::builder_for(url).build(engine.as_ref()).unwrap();
let version = snapshot.version();
let scan = snapshot.into_scan_builder().build().unwrap();
let scan = snapshot.scan_builder().build().unwrap();
let files: Vec<_> = scan
.scan_metadata(engine.as_ref())
.unwrap()
Expand Down Expand Up @@ -1323,11 +1323,11 @@ mod tests {
let url = url::Url::from_directory_path(path).unwrap();
let engine = Arc::new(SyncEngine::new());

let snapshot = Snapshot::builder(url.clone())
let snapshot = Snapshot::builder_for(url.clone())
.at_version(0)
.build(engine.as_ref())
.unwrap();
let scan = snapshot.into_scan_builder().build().unwrap();
let scan = snapshot.scan_builder().build().unwrap();
let files: Vec<_> = scan
.scan_metadata(engine.as_ref())
.unwrap()
Expand All @@ -1347,11 +1347,11 @@ mod tests {
.into_iter()
.map(|b| Box::new(ArrowEngineData::from(b)) as Box<dyn EngineData>)
.collect();
let snapshot = Snapshot::builder(url)
let snapshot = Snapshot::builder_for(url)
.at_version(1)
.build(engine.as_ref())
.unwrap();
let scan = snapshot.into_scan_builder().build().unwrap();
let scan = snapshot.scan_builder().build().unwrap();
let new_files: Vec<_> = scan
.scan_metadata_from(engine.as_ref(), 0, files, None)
.unwrap()
Expand Down Expand Up @@ -1419,8 +1419,8 @@ mod tests {
let url = url::Url::from_directory_path(path.unwrap()).unwrap();
let engine = SyncEngine::new();

let snapshot = Snapshot::builder(url).build(&engine).unwrap();
let scan = snapshot.into_scan_builder().build().unwrap();
let snapshot = Snapshot::builder_for(url).build(&engine).unwrap();
let scan = snapshot.scan_builder().build().unwrap();
let data: Vec<_> = scan
.replay_for_scan_metadata(&engine)
.unwrap()
Expand All @@ -1439,7 +1439,7 @@ mod tests {
let url = url::Url::from_directory_path(path.unwrap()).unwrap();
let engine = Arc::new(SyncEngine::new());

let snapshot = Arc::new(Snapshot::builder(url).build(engine.as_ref()).unwrap());
let snapshot = Snapshot::builder_for(url).build(engine.as_ref()).unwrap();

// No predicate pushdown attempted, so the one data file should be returned.
//
Expand Down Expand Up @@ -1482,7 +1482,7 @@ mod tests {
let url = url::Url::from_directory_path(path.unwrap()).unwrap();
let engine = Arc::new(SyncEngine::new());

let snapshot = Arc::new(Snapshot::builder(url).build(engine.as_ref()).unwrap());
let snapshot = Snapshot::builder_for(url).build(engine.as_ref()).unwrap();

// Predicate over a logically valid but physically missing column. No data files should be
// returned because the column is inferred to be all-null.
Expand Down Expand Up @@ -1517,8 +1517,8 @@ mod tests {
let url = url::Url::from_directory_path(path).unwrap();
let engine = SyncEngine::new();

let snapshot = Snapshot::builder(url).build(&engine).unwrap();
let scan = snapshot.into_scan_builder().build()?;
let snapshot = Snapshot::builder_for(url).build(&engine).unwrap();
let scan = snapshot.scan_builder().build()?;
let files = get_files_for_scan(scan, &engine)?;
// test case:
//
Expand Down
Loading
Loading