From 1dd48c4dacaf2df477d6c773b53517f552e9f4db Mon Sep 17 00:00:00 2001 From: Ze'ev Maor Date: Fri, 14 Feb 2025 19:27:12 +0200 Subject: [PATCH 1/3] Add UUID_CHECKPOINT_FILE_PATTERN --- crates/core/src/kernel/snapshot/log_segment.rs | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/crates/core/src/kernel/snapshot/log_segment.rs b/crates/core/src/kernel/snapshot/log_segment.rs index ffd95045dc..95dd8dd7b7 100644 --- a/crates/core/src/kernel/snapshot/log_segment.rs +++ b/crates/core/src/kernel/snapshot/log_segment.rs @@ -25,6 +25,9 @@ const LAST_CHECKPOINT_FILE_NAME: &str = "_last_checkpoint"; static CHECKPOINT_FILE_PATTERN: LazyLock = LazyLock::new(|| Regex::new(r"\d+\.checkpoint(\.\d+\.\d+)?\.parquet").unwrap()); +static UUID_CHECKPOINT_FILE_PATTERN: LazyLock = LazyLock::new(|| { + Regex::new(r"\d+\.checkpoint([0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{12})\.(parquet|json)").unwrap() +}); static DELTA_FILE_PATTERN: LazyLock = LazyLock::new(|| Regex::new(r"^\d+\.json$").unwrap()); static CRC_FILE_PATTERN: LazyLock = LazyLock::new(|| Regex::new(r"^(\.\d+(\.crc|\.json)|\d+)\.crc$").unwrap()); @@ -50,7 +53,10 @@ pub(crate) trait PathExt { /// Returns true if the file is a checkpoint parquet file fn is_checkpoint_file(&self) -> bool { self.filename() - .map(|name| CHECKPOINT_FILE_PATTERN.captures(name).is_some()) + .map(|name| { + CHECKPOINT_FILE_PATTERN.captures(name).is_some() + || UUID_CHECKPOINT_FILE_PATTERN.captures(name).is_some() + }) .unwrap_or(false) } From 43cb51cf7840b931cdb69f17b49fcba6fe5e1604 Mon Sep 17 00:00:00 2001 From: Ze'ev Maor Date: Sun, 16 Feb 2025 16:02:10 +0200 Subject: [PATCH 2/3] Read JSON checkpoint file --- .../core/src/kernel/snapshot/log_segment.rs | 36 ++++++++++++++++++- 1 file changed, 35 insertions(+), 1 deletion(-) diff --git a/crates/core/src/kernel/snapshot/log_segment.rs b/crates/core/src/kernel/snapshot/log_segment.rs index 95dd8dd7b7..237b71ef33 100644 --- a/crates/core/src/kernel/snapshot/log_segment.rs +++ b/crates/core/src/kernel/snapshot/log_segment.rs @@ -4,6 +4,7 @@ use std::sync::{Arc, LazyLock}; use arrow_array::RecordBatch; use chrono::Utc; +use futures::Stream; use futures::{stream::BoxStream, StreamExt, TryStreamExt}; use itertools::Itertools; use object_store::path::Path; @@ -50,7 +51,7 @@ pub(crate) trait PathExt { .and_then(|(name, _)| name.parse().ok()) } - /// Returns true if the file is a checkpoint parquet file + /// Returns true if the file is a checkpoint file fn is_checkpoint_file(&self) -> bool { self.filename() .map(|name| { @@ -259,6 +260,39 @@ impl LogSegment { store: Arc, read_schema: &Schema, config: &DeltaTableConfig, + ) -> BoxStream<'_, DeltaResult> { + if let Some(file) = self.checkpoint_files.iter().next() { + match file.location.extension() { + Some("parquet") => self.checkpoint_stream_parquet(store, read_schema, config), + Some("json") => self.checkpoint_stream_json(store, read_schema, config), + _ => futures::stream::empty().boxed(), + } + } else { + futures::stream::empty().boxed() + } + } + + fn checkpoint_stream_json( + &self, + store: Arc, + read_schema: &Schema, + config: &DeltaTableConfig, + ) -> BoxStream<'_, DeltaResult> { + let decoder = json::get_decoder(Arc::new(read_schema.try_into().unwrap()), config).unwrap(); + let stream = futures::stream::iter(self.checkpoint_files.iter()) + .map(move |meta| { + let store = store.clone(); + async move { store.get(&meta.location).await.unwrap().bytes().await } + }) + .buffered(config.log_buffer_size); + json::decode_stream(decoder, stream).boxed() + } + + fn checkpoint_stream_parquet( + &self, + store: Arc, + read_schema: &Schema, + config: &DeltaTableConfig, ) -> BoxStream<'_, DeltaResult> { let batch_size = config.log_batch_size; let read_schema = Arc::new(read_schema.clone()); From ff94051005e60b4ad051b937f2b5d6aca55fc856 Mon Sep 17 00:00:00 2001 From: Ze'ev Maor Date: Sun, 16 Feb 2025 16:38:56 +0200 Subject: [PATCH 3/3] update regex --- crates/core/src/kernel/snapshot/log_segment.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/core/src/kernel/snapshot/log_segment.rs b/crates/core/src/kernel/snapshot/log_segment.rs index 237b71ef33..50249e66e4 100644 --- a/crates/core/src/kernel/snapshot/log_segment.rs +++ b/crates/core/src/kernel/snapshot/log_segment.rs @@ -27,7 +27,7 @@ const LAST_CHECKPOINT_FILE_NAME: &str = "_last_checkpoint"; static CHECKPOINT_FILE_PATTERN: LazyLock = LazyLock::new(|| Regex::new(r"\d+\.checkpoint(\.\d+\.\d+)?\.parquet").unwrap()); static UUID_CHECKPOINT_FILE_PATTERN: LazyLock = LazyLock::new(|| { - Regex::new(r"\d+\.checkpoint([0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{12})\.(parquet|json)").unwrap() + Regex::new(r"\d+\.checkpoint\.([0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{12})\.(parquet|json)").unwrap() }); static DELTA_FILE_PATTERN: LazyLock = LazyLock::new(|| Regex::new(r"^\d+\.json$").unwrap()); static CRC_FILE_PATTERN: LazyLock =