diff --git a/crates/core/src/kernel/snapshot/log_segment.rs b/crates/core/src/kernel/snapshot/log_segment.rs
index ffd95045dc..50249e66e4 100644
--- a/crates/core/src/kernel/snapshot/log_segment.rs
+++ b/crates/core/src/kernel/snapshot/log_segment.rs
@@ -4,6 +4,7 @@ use std::sync::{Arc, LazyLock};
 
 use arrow_array::RecordBatch;
 use chrono::Utc;
+use futures::Stream;
 use futures::{stream::BoxStream, StreamExt, TryStreamExt};
 use itertools::Itertools;
 use object_store::path::Path;
@@ -25,6 +26,9 @@ const LAST_CHECKPOINT_FILE_NAME: &str = "_last_checkpoint";
 
 static CHECKPOINT_FILE_PATTERN: LazyLock<Regex> =
     LazyLock::new(|| Regex::new(r"\d+\.checkpoint(\.\d+\.\d+)?\.parquet").unwrap());
+static UUID_CHECKPOINT_FILE_PATTERN: LazyLock<Regex> = LazyLock::new(|| {
+    Regex::new(r"\d+\.checkpoint\.([0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{12})\.(parquet|json)").unwrap()
+});
 static DELTA_FILE_PATTERN: LazyLock<Regex> = LazyLock::new(|| Regex::new(r"^\d+\.json$").unwrap());
 static CRC_FILE_PATTERN: LazyLock<Regex> =
     LazyLock::new(|| Regex::new(r"^(\.\d+(\.crc|\.json)|\d+)\.crc$").unwrap());
@@ -47,10 +51,13 @@ pub(crate) trait PathExt {
             .and_then(|(name, _)| name.parse().ok())
     }
 
-    /// Returns true if the file is a checkpoint parquet file
+    /// Returns true if the file is a checkpoint file
    fn is_checkpoint_file(&self) -> bool {
         self.filename()
-            .map(|name| CHECKPOINT_FILE_PATTERN.captures(name).is_some())
+            .map(|name| {
+                CHECKPOINT_FILE_PATTERN.captures(name).is_some()
+                    || UUID_CHECKPOINT_FILE_PATTERN.captures(name).is_some()
+            })
             .unwrap_or(false)
     }
 
@@ -253,6 +260,39 @@ impl LogSegment {
         store: Arc<dyn ObjectStore>,
         read_schema: &Schema,
         config: &DeltaTableConfig,
+    ) -> BoxStream<'_, DeltaResult<RecordBatch>> {
+        if let Some(file) = self.checkpoint_files.iter().next() {
+            match file.location.extension() {
+                Some("parquet") => self.checkpoint_stream_parquet(store, read_schema, config),
+                Some("json") => self.checkpoint_stream_json(store, read_schema, config),
+                _ => futures::stream::empty().boxed(),
+            }
+        } else {
+            futures::stream::empty().boxed()
+        }
+    }
+
+    fn checkpoint_stream_json(
+        &self,
+        store: Arc<dyn ObjectStore>,
+        read_schema: &Schema,
+        config: &DeltaTableConfig,
+    ) -> BoxStream<'_, DeltaResult<RecordBatch>> {
+        let decoder = json::get_decoder(Arc::new(read_schema.try_into().unwrap()), config).unwrap();
+        let stream = futures::stream::iter(self.checkpoint_files.iter())
+            .map(move |meta| {
+                let store = store.clone();
+                async move { store.get(&meta.location).await.unwrap().bytes().await }
+            })
+            .buffered(config.log_buffer_size);
+        json::decode_stream(decoder, stream).boxed()
+    }
+
+    fn checkpoint_stream_parquet(
+        &self,
+        store: Arc<dyn ObjectStore>,
+        read_schema: &Schema,
+        config: &DeltaTableConfig,
     ) -> BoxStream<'_, DeltaResult<RecordBatch>> {
         let batch_size = config.log_batch_size;
         let read_schema = Arc::new(read_schema.clone());
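
For context on the new filename handling, here is a minimal standalone sketch (not part of the patch) of how the two checkpoint filename patterns behave. It assumes only the `regex` crate; the filenames are illustrative rather than taken from a real table.

use regex::Regex;

fn main() {
    // Classic checkpoint names, as matched by CHECKPOINT_FILE_PATTERN above.
    let classic = Regex::new(r"\d+\.checkpoint(\.\d+\.\d+)?\.parquet").unwrap();
    // UUID-named checkpoints, as matched by the new UUID_CHECKPOINT_FILE_PATTERN.
    let uuid = Regex::new(
        r"\d+\.checkpoint\.([0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{12})\.(parquet|json)",
    )
    .unwrap();

    // Single-file and multi-part classic checkpoints match the existing pattern.
    assert!(classic.is_match("00000000000000000010.checkpoint.parquet"));
    assert!(classic.is_match("00000000000000000010.checkpoint.0000000001.0000000003.parquet"));

    // UUID-named checkpoints match the new pattern, in both parquet and json form.
    assert!(uuid.is_match("00000000000000000010.checkpoint.80a083e8-7026-4e79-81be-64bd76c43a11.parquet"));
    assert!(uuid.is_match("00000000000000000010.checkpoint.80a083e8-7026-4e79-81be-64bd76c43a11.json"));

    // Plain commit files match neither pattern, so is_checkpoint_file stays false for them.
    assert!(!classic.is_match("00000000000000000010.json"));
    assert!(!uuid.is_match("00000000000000000010.json"));
}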