Skip to content

Commit 93f3f30

Browse files
feat!(catalog-managed): add log_tail to SnapshotBuilder (#1290)
> [!NOTE] split into three commits (1 - material changes, 2 - callsites, 3 - test) ## What changes are proposed in this pull request? Two main things: (1) Adds `with_log_tail` API to `SnapshotBuilder` (only for feature = `catalog-managed`) and (2) new `LogPath` type (in a new module) to expose a public API for communicating delta log paths to kernel (currently leveraged in the new `with_log_tail` API). This allows users (catalogs) to pass in a recent log_tail during snapshot construction. Note that our other high-level read API, `TableChanges`, is not yet integrated for CCv2 (with log_tail during reads) and now explicitly checks that start/end snapshot are _not_ catalog-managed. Will track a follow up for CDF read support. ### This PR affects the following public APIs Removes the unnecessary `Snapshot::into_scan_builder` API - instead use `Snapshot::scan_builder` ## How was this change tested? New UT
1 parent 1ddc026 commit 93f3f30

File tree

12 files changed

+665
-44
lines changed

12 files changed

+665
-44
lines changed

kernel/src/actions/mod.rs

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -479,6 +479,11 @@ impl Protocol {
479479
/// Check if writing to a table with this protocol is supported. That is: does the kernel
480480
/// support the specified protocol writer version and all enabled writer features?
481481
pub(crate) fn ensure_write_supported(&self) -> DeltaResult<()> {
482+
#[cfg(feature = "catalog-managed")]
483+
require!(
484+
!self.is_catalog_managed(),
485+
Error::unsupported("Writes are not yet supported for catalog-managed tables")
486+
);
482487
match &self.writer_features {
483488
Some(writer_features) if self.min_writer_version == 7 => {
484489
// if we're on version 7, make sure we support all the specified features
@@ -513,6 +518,17 @@ impl Protocol {
513518
}
514519
}
515520
}
521+
522+
#[cfg(feature = "catalog-managed")]
523+
pub(crate) fn is_catalog_managed(&self) -> bool {
524+
self.reader_features.as_ref().is_some_and(|fs| {
525+
fs.contains(&ReaderFeature::CatalogManaged)
526+
|| fs.contains(&ReaderFeature::CatalogOwnedPreview)
527+
}) || self.writer_features.as_ref().is_some_and(|fs| {
528+
fs.contains(&WriterFeature::CatalogManaged)
529+
|| fs.contains(&WriterFeature::CatalogOwnedPreview)
530+
})
531+
}
516532
}
517533

518534
// TODO: implement Scalar::From<HashMap<K, V>> so we can derive IntoEngineData using a macro (issue#1083)
@@ -1426,6 +1442,26 @@ mod tests {
14261442
assert_eq!(parse_features::<ReaderFeature>(features), expected);
14271443
}
14281444

1445+
#[test]
1446+
fn test_no_catalog_managed_writes() {
1447+
let protocol = Protocol::try_new(
1448+
3,
1449+
7,
1450+
Some([ReaderFeature::CatalogManaged]),
1451+
Some([WriterFeature::CatalogManaged]),
1452+
)
1453+
.unwrap();
1454+
assert!(protocol.ensure_write_supported().is_err());
1455+
let protocol = Protocol::try_new(
1456+
3,
1457+
7,
1458+
Some([ReaderFeature::CatalogOwnedPreview]),
1459+
Some([WriterFeature::CatalogOwnedPreview]),
1460+
)
1461+
.unwrap();
1462+
assert!(protocol.ensure_write_supported().is_err());
1463+
}
1464+
14291465
#[test]
14301466
fn test_into_engine_data() {
14311467
let engine = ExprEngine::new();

kernel/src/lib.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,8 @@ use std::{cmp::Ordering, ops::Range};
8282
use bytes::Bytes;
8383
use url::Url;
8484

85+
use crate::path::ParsedLogPath;
86+
8587
use self::schema::{DataType, SchemaRef};
8688

8789
mod action_reconciliation;
@@ -91,6 +93,7 @@ pub mod engine_data;
9193
pub mod error;
9294
pub mod expressions;
9395
mod log_compaction;
96+
mod log_path;
9497
pub mod scan;
9598
pub mod schema;
9699
pub mod snapshot;
@@ -101,6 +104,8 @@ pub mod table_properties;
101104
pub mod transaction;
102105
pub(crate) mod transforms;
103106

107+
pub use log_path::LogPath;
108+
104109
mod row_tracking;
105110

106111
mod arrow_compat;

kernel/src/listed_log_files.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -245,11 +245,10 @@ impl ListedLogFiles {
245245
pub(crate) fn list(
246246
storage: &dyn StorageHandler,
247247
log_root: &Url,
248+
log_tail: Vec<ParsedLogPath>,
248249
start_version: Option<Version>,
249250
end_version: Option<Version>,
250251
) -> DeltaResult<Self> {
251-
// TODO: plumb through a log_tail provided by our caller
252-
let log_tail = vec![];
253252
let log_files = list_log_files(storage, log_root, log_tail, start_version, end_version)?;
254253

255254
log_files.process_results(|iter| {
@@ -318,11 +317,13 @@ impl ListedLogFiles {
318317
checkpoint_metadata: &LastCheckpointHint,
319318
storage: &dyn StorageHandler,
320319
log_root: &Url,
320+
log_tail: Vec<ParsedLogPath>,
321321
end_version: Option<Version>,
322322
) -> DeltaResult<Self> {
323323
let listed_files = Self::list(
324324
storage,
325325
log_root,
326+
log_tail,
326327
Some(checkpoint_metadata.version),
327328
end_version,
328329
)?;

kernel/src/log_path.rs

Lines changed: 119 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,119 @@
1+
//! Public-facing [`LogPath`] type for representing paths to delta log files.
2+
3+
use crate::path::ParsedLogPath;
4+
use crate::utils::require;
5+
use crate::{DeltaResult, Error, FileMeta, FileSize};
6+
7+
use url::Url;
8+
9+
/// A path to a valid delta log file. You can parse a given `FileMeta` into a `LogPath` using
10+
/// [`LogPath::try_new`].
11+
///
12+
/// Today, a `LogPath` is a file in the `_delta_log` directory of a Delta table; in the future,
13+
/// this will expand to support providing inline data in the log path itself.
14+
#[derive(Debug, Clone, PartialEq)]
15+
pub struct LogPath(ParsedLogPath);
16+
17+
impl From<LogPath> for ParsedLogPath {
18+
fn from(p: LogPath) -> Self {
19+
p.0
20+
}
21+
}
22+
23+
impl LogPath {
24+
/// Attempt to create a `LogPath` from `FileMeta`. This returns an error if the path isn't a
25+
/// valid log path.
26+
pub fn try_new(file_meta: FileMeta) -> DeltaResult<Self> {
27+
// TODO: we should avoid the clone
28+
let parsed = ParsedLogPath::try_from(file_meta.clone())?
29+
.ok_or_else(|| Error::invalid_log_path(&file_meta.location))?;
30+
31+
require!(
32+
!parsed.is_unknown(),
33+
Error::invalid_log_path(&file_meta.location)
34+
);
35+
36+
Ok(Self(parsed))
37+
}
38+
39+
/// Create a new staged commit log path given the table root and filename and metadata.
40+
pub fn staged_commit(
41+
table_root: Url,
42+
filename: &str,
43+
last_modified: i64,
44+
size: FileSize,
45+
) -> DeltaResult<LogPath> {
46+
// TODO: we should introduce TablePath/LogPath types which enforce checks like ending '/'
47+
48+
// require table_root ends with '/'
49+
require!(
50+
table_root.path().ends_with('/'),
51+
Error::generic("table root must be a directory-like URL ending with '/'")
52+
);
53+
let location = table_root
54+
.join("_delta_log/")?
55+
.join("_staged_commits/")?
56+
.join(filename)?;
57+
let file_meta = FileMeta {
58+
location,
59+
last_modified,
60+
size,
61+
};
62+
LogPath::try_new(file_meta)
63+
}
64+
}
65+
66+
#[cfg(test)]
67+
mod test {
68+
use super::*;
69+
70+
use std::str::FromStr;
71+
72+
#[test]
73+
fn test_staged_commit_path_creation() {
74+
let table_root = Url::from_str("s3://my-bucket/my-table/").unwrap();
75+
let filename = "00000000000000000010.3a0d65cd-4a56-49a8-937b-95f9e3ee90e5.json";
76+
let last_modified = 1234567890i64;
77+
let size = 1024u64;
78+
79+
let log_path = LogPath::staged_commit(table_root.clone(), filename, last_modified, size)
80+
.expect("Failed to create staged commit log path");
81+
82+
let expected_path =
83+
Url::from_str("s3://my-bucket/my-table/_delta_log/_staged_commits/00000000000000000010.3a0d65cd-4a56-49a8-937b-95f9e3ee90e5.json")
84+
.unwrap();
85+
let expected = FileMeta {
86+
location: expected_path,
87+
last_modified,
88+
size,
89+
};
90+
91+
let path = log_path.0;
92+
assert_eq!(path.location, expected);
93+
}
94+
95+
#[test]
96+
fn test_staged_commit_path_creation_failures() {
97+
let last_modified = 1234567890i64;
98+
let size = 1024u64;
99+
100+
// table root not ending with '/'
101+
let table_root = Url::from_str("s3://my-bucket/my-table").unwrap();
102+
let filename = "00000000000000000010.3a0d65cd-4a56-49a8-937b-95f9e3ee90e5.json";
103+
LogPath::staged_commit(table_root.clone(), filename, last_modified, size).unwrap_err();
104+
105+
// filename with path separators
106+
let table_root = Url::from_str("s3://my-bucket/my-table/").unwrap();
107+
let filename = "subdir/00000000000000000010.3a0d65cd-4a56-49a8-937b-95f9e3ee90e5.json";
108+
LogPath::staged_commit(table_root.clone(), filename, last_modified, size).unwrap_err();
109+
110+
// incorrect filenames
111+
let table_root = Url::from_str("s3://my-bucket/my-table/").unwrap();
112+
let filename = "00000000000000000010.not-a-uuid.json";
113+
LogPath::staged_commit(table_root.clone(), filename, last_modified, size).unwrap_err();
114+
let filename = "000000000000000000aa.3a0d65cd-4a56-49a8-937b-95f9e3ee90e5.json";
115+
LogPath::staged_commit(table_root.clone(), filename, last_modified, size).unwrap_err();
116+
let filename = "00000000000000000010.3a0d65cd-4a56-49a8-937b-95f9e3ee90e5.parquet";
117+
LogPath::staged_commit(table_root.clone(), filename, last_modified, size).unwrap_err();
118+
}
119+
}

kernel/src/log_segment.rs

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -144,33 +144,42 @@ impl LogSegment {
144144
pub(crate) fn for_snapshot(
145145
storage: &dyn StorageHandler,
146146
log_root: Url,
147+
log_tail: Vec<ParsedLogPath>,
147148
time_travel_version: impl Into<Option<Version>>,
148149
) -> DeltaResult<Self> {
149150
let time_travel_version = time_travel_version.into();
150151
let checkpoint_hint = LastCheckpointHint::try_read(storage, &log_root)?;
151-
Self::for_snapshot_impl(storage, log_root, checkpoint_hint, time_travel_version)
152+
Self::for_snapshot_impl(
153+
storage,
154+
log_root,
155+
log_tail,
156+
checkpoint_hint,
157+
time_travel_version,
158+
)
152159
}
153160

154161
// factored out for testing
155162
pub(crate) fn for_snapshot_impl(
156163
storage: &dyn StorageHandler,
157164
log_root: Url,
165+
log_tail: Vec<ParsedLogPath>,
158166
checkpoint_hint: Option<LastCheckpointHint>,
159167
time_travel_version: Option<Version>,
160168
) -> DeltaResult<Self> {
161169
let listed_files = match (checkpoint_hint, time_travel_version) {
162170
(Some(cp), None) => {
163-
ListedLogFiles::list_with_checkpoint_hint(&cp, storage, &log_root, None)?
171+
ListedLogFiles::list_with_checkpoint_hint(&cp, storage, &log_root, log_tail, None)?
164172
}
165173
(Some(cp), Some(end_version)) if cp.version <= end_version => {
166174
ListedLogFiles::list_with_checkpoint_hint(
167175
&cp,
168176
storage,
169177
&log_root,
178+
log_tail,
170179
Some(end_version),
171180
)?
172181
}
173-
_ => ListedLogFiles::list(storage, &log_root, None, time_travel_version)?,
182+
_ => ListedLogFiles::list(storage, &log_root, log_tail, None, time_travel_version)?,
174183
};
175184

176185
LogSegment::try_new(listed_files, log_root, time_travel_version)

0 commit comments

Comments
 (0)