Skip to content

Commit 0b1c615

Browse files
committed
refactor: use LogStore in Snapshot / LogSegment APIs
Signed-off-by: Robert Pack <[email protected]>
1 parent 9bc3b1b commit 0b1c615

File tree

19 files changed

+249
-227
lines changed

19 files changed

+249
-227
lines changed

crates/core/src/kernel/snapshot/log_segment.rs

Lines changed: 57 additions & 69 deletions
Original file line numberDiff line numberDiff line change
@@ -109,21 +109,19 @@ impl LogSegment {
109109
/// Try to create a new [`LogSegment`]
110110
///
111111
/// This will list the entire log directory and find all relevant files for the given table version.
112-
pub async fn try_new(
113-
table_root: &Path,
114-
version: Option<i64>,
115-
store: &dyn ObjectStore,
116-
) -> DeltaResult<Self> {
117-
let log_url = table_root.child("_delta_log");
118-
let maybe_cp = read_last_checkpoint(store, &log_url).await?;
112+
pub async fn try_new(log_store: &dyn LogStore, version: Option<i64>) -> DeltaResult<Self> {
113+
let store = log_store.object_store(None);
114+
115+
let log_url = Path::from("_delta_log");
116+
let maybe_cp = read_last_checkpoint(&store, &log_url).await?;
119117

120118
// List relevant files from log
121119
let (mut commit_files, checkpoint_files) = match (maybe_cp, version) {
122-
(Some(cp), None) => list_log_files_with_checkpoint(&cp, store, &log_url).await?,
120+
(Some(cp), None) => list_log_files_with_checkpoint(&cp, &store, &log_url).await?,
123121
(Some(cp), Some(v)) if cp.version <= v => {
124-
list_log_files_with_checkpoint(&cp, store, &log_url).await?
122+
list_log_files_with_checkpoint(&cp, &store, &log_url).await?
125123
}
126-
_ => list_log_files(store, &log_url, version, None).await?,
124+
_ => list_log_files(&store, &log_url, version, None).await?,
127125
};
128126

129127
// remove all files above requested version
@@ -273,10 +271,12 @@ impl LogSegment {
273271

274272
pub(super) fn commit_stream(
275273
&self,
276-
store: Arc<dyn ObjectStore>,
274+
log_store: &dyn LogStore,
277275
read_schema: &Schema,
278276
config: &DeltaTableConfig,
279277
) -> DeltaResult<BoxStream<'_, DeltaResult<RecordBatch>>> {
278+
let store = log_store.object_store(None);
279+
280280
let decoder = json::get_decoder(Arc::new(read_schema.try_into()?), config)?;
281281
let stream = futures::stream::iter(self.commit_files.iter())
282282
.map(move |meta| {
@@ -289,10 +289,12 @@ impl LogSegment {
289289

290290
pub(super) fn checkpoint_stream(
291291
&self,
292-
store: Arc<dyn ObjectStore>,
292+
log_store: &dyn LogStore,
293293
read_schema: &Schema,
294294
config: &DeltaTableConfig,
295295
) -> BoxStream<'_, DeltaResult<RecordBatch>> {
296+
let store = log_store.object_store(None);
297+
296298
let batch_size = config.log_batch_size;
297299
let read_schema = Arc::new(read_schema.clone());
298300
futures::stream::iter(self.checkpoint_files.clone())
@@ -341,7 +343,7 @@ impl LogSegment {
341343
/// Read [`Protocol`] and [`Metadata`] actions
342344
pub(super) async fn read_metadata(
343345
&self,
344-
store: Arc<dyn ObjectStore>,
346+
log_store: &dyn LogStore,
345347
config: &DeltaTableConfig,
346348
) -> DeltaResult<(Option<Protocol>, Option<Metadata>)> {
347349
static READ_SCHEMA: LazyLock<StructType> = LazyLock::new(|| {
@@ -354,7 +356,7 @@ impl LogSegment {
354356
let mut maybe_protocol = None;
355357
let mut maybe_metadata = None;
356358

357-
let mut commit_stream = self.commit_stream(store.clone(), &READ_SCHEMA, config)?;
359+
let mut commit_stream = self.commit_stream(log_store, &READ_SCHEMA, config)?;
358360
while let Some(batch) = commit_stream.next().await {
359361
let batch = batch?;
360362
if maybe_protocol.is_none() {
@@ -372,7 +374,7 @@ impl LogSegment {
372374
}
373375
}
374376

375-
let mut checkpoint_stream = self.checkpoint_stream(store.clone(), &READ_SCHEMA, config);
377+
let mut checkpoint_stream = self.checkpoint_stream(log_store, &READ_SCHEMA, config);
376378
while let Some(batch) = checkpoint_stream.next().await {
377379
let batch = batch?;
378380
if maybe_protocol.is_none() {
@@ -602,12 +604,9 @@ pub(super) mod tests {
602604
}
603605

604606
async fn log_segment_serde() -> TestResult {
605-
let store = TestTables::Simple
606-
.table_builder()
607-
.build_storage()?
608-
.object_store(None);
607+
let log_store = TestTables::Simple.table_builder().build_storage()?;
609608

610-
let segment = LogSegment::try_new(&Path::default(), None, store.as_ref()).await?;
609+
let segment = LogSegment::try_new(&log_store, None).await?;
611610
let bytes = serde_json::to_vec(&segment).unwrap();
612611
let actual: LogSegment = serde_json::from_slice(&bytes).unwrap();
613612
assert_eq!(actual.version(), segment.version());
@@ -621,63 +620,56 @@ pub(super) mod tests {
621620
}
622621

623622
async fn read_log_files() -> TestResult {
624-
let store = TestTables::SimpleWithCheckpoint
623+
let log_store = TestTables::SimpleWithCheckpoint
625624
.table_builder()
626-
.build_storage()?
627-
.object_store(None);
625+
.build_storage()?;
626+
let store = log_store.object_store(None);
628627

629628
let log_path = Path::from("_delta_log");
630-
let cp = read_last_checkpoint(store.as_ref(), &log_path)
631-
.await?
632-
.unwrap();
629+
let cp = read_last_checkpoint(&store, &log_path).await?.unwrap();
633630
assert_eq!(cp.version, 10);
634631

635-
let (log, check) = list_log_files_with_checkpoint(&cp, store.as_ref(), &log_path).await?;
632+
let (log, check) = list_log_files_with_checkpoint(&cp, &store, &log_path).await?;
636633
assert_eq!(log.len(), 0);
637634
assert_eq!(check.len(), 1);
638635

639-
let (log, check) = list_log_files(store.as_ref(), &log_path, None, None).await?;
636+
let (log, check) = list_log_files(&store, &log_path, None, None).await?;
640637
assert_eq!(log.len(), 0);
641638
assert_eq!(check.len(), 1);
642639

643-
let (log, check) = list_log_files(store.as_ref(), &log_path, Some(8), None).await?;
640+
let (log, check) = list_log_files(&store, &log_path, Some(8), None).await?;
644641
assert_eq!(log.len(), 9);
645642
assert_eq!(check.len(), 0);
646643

647-
let segment = LogSegment::try_new(&Path::default(), None, store.as_ref()).await?;
644+
let segment = LogSegment::try_new(&log_store, None).await?;
648645
assert_eq!(segment.version, 10);
649646
assert_eq!(segment.commit_files.len(), 0);
650647
assert_eq!(segment.checkpoint_files.len(), 1);
651648

652-
let segment = LogSegment::try_new(&Path::default(), Some(8), store.as_ref()).await?;
649+
let segment = LogSegment::try_new(&log_store, Some(8)).await?;
653650
assert_eq!(segment.version, 8);
654651
assert_eq!(segment.commit_files.len(), 9);
655652
assert_eq!(segment.checkpoint_files.len(), 0);
656653

657-
let store = TestTables::Simple
658-
.table_builder()
659-
.build_storage()?
660-
.object_store(None);
654+
let log_store = TestTables::Simple.table_builder().build_storage()?;
655+
let store = log_store.object_store(None);
661656

662-
let (log, check) = list_log_files(store.as_ref(), &log_path, None, None).await?;
657+
let (log, check) = list_log_files(&store, &log_path, None, None).await?;
663658
assert_eq!(log.len(), 5);
664659
assert_eq!(check.len(), 0);
665660

666-
let (log, check) = list_log_files(store.as_ref(), &log_path, Some(2), None).await?;
661+
let (log, check) = list_log_files(&store, &log_path, Some(2), None).await?;
667662
assert_eq!(log.len(), 3);
668663
assert_eq!(check.len(), 0);
669664

670665
Ok(())
671666
}
672667

673668
async fn read_metadata() -> TestResult {
674-
let store = TestTables::WithDvSmall
675-
.table_builder()
676-
.build_storage()?
677-
.object_store(None);
678-
let segment = LogSegment::try_new(&Path::default(), None, store.as_ref()).await?;
669+
let log_store = TestTables::WithDvSmall.table_builder().build_storage()?;
670+
let segment = LogSegment::try_new(&log_store, None).await?;
679671
let (protocol, _metadata) = segment
680-
.read_metadata(store.clone(), &Default::default())
672+
.read_metadata(&log_store, &Default::default())
681673
.await?;
682674
let protocol = protocol.unwrap();
683675

@@ -697,17 +689,17 @@ pub(super) mod tests {
697689
.table_builder()
698690
.load()
699691
.await?;
700-
let store = TestTables::LatestNotCheckpointed
692+
693+
let base_store = table_to_checkpoint.log_store().root_object_store(None);
694+
let slow_list_store = Arc::new(slow_store::SlowListStore { store: base_store });
695+
let slow_log_store = TestTables::LatestNotCheckpointed
701696
.table_builder()
702-
.build_storage()?
703-
.object_store(None);
704-
let slow_list_store = Arc::new(slow_store::SlowListStore { store });
697+
.with_storage_backend(slow_list_store, url::Url::parse("dummy:///").unwrap())
698+
.build_storage()?;
705699

706700
let version = table_to_checkpoint.version();
707701
let load_task: JoinHandle<Result<LogSegment, DeltaTableError>> = tokio::spawn(async move {
708-
let segment =
709-
LogSegment::try_new(&Path::default(), Some(version), slow_list_store.as_ref())
710-
.await?;
702+
let segment = LogSegment::try_new(&slow_log_store, Some(version)).await?;
711703
Ok(segment)
712704
});
713705

@@ -870,25 +862,21 @@ pub(super) mod tests {
870862
assert_eq!(commit.metrics.num_log_files_cleaned_up, 0);
871863
assert!(!commit.metrics.new_checkpoint_created);
872864

873-
let batches = LogSegment::try_new(
874-
&Path::default(),
875-
Some(commit.version),
876-
log_store.object_store(None).as_ref(),
877-
)
878-
.await
879-
.unwrap()
880-
.checkpoint_stream(
881-
log_store.object_store(None),
882-
&StructType::new(vec![
883-
ActionType::Metadata.schema_field().clone(),
884-
ActionType::Protocol.schema_field().clone(),
885-
ActionType::Add.schema_field().clone(),
886-
]),
887-
&Default::default(),
888-
)
889-
.try_collect::<Vec<_>>()
890-
.await
891-
.unwrap();
865+
let batches = LogSegment::try_new(&log_store, Some(commit.version))
866+
.await
867+
.unwrap()
868+
.checkpoint_stream(
869+
&log_store,
870+
&StructType::new(vec![
871+
ActionType::Metadata.schema_field().clone(),
872+
ActionType::Protocol.schema_field().clone(),
873+
ActionType::Add.schema_field().clone(),
874+
]),
875+
&Default::default(),
876+
)
877+
.try_collect::<Vec<_>>()
878+
.await
879+
.unwrap();
892880

893881
let batch = arrow::compute::concat_batches(&batches[0].schema(), batches.iter()).unwrap();
894882

0 commit comments

Comments
 (0)