Skip to content

Commit 05e6698

Browse files
authored
feat: support writing domain metadata (1/2) (#1274)
## What changes are proposed in this pull request? <!-- Please clarify what changes you are proposing and why the changes are needed. The purpose of this section is to outline the changes, why they are needed, and how this PR fixes the issue. If the reason for the change is already explained clearly in an issue, then it does not need to be restated here. 1. If you propose a new API or feature, clarify the use case for a new API or feature. 2. If you fix a bug, you can clarify why it is a bug. --> This PR is (1/2) to support writing domain metadata. 1. (this PR) support adding or updating a domain metadata configuration 2. (follow up) support _removing_ a domain metadata, #1275. - resolves #1270 Writing domain metadata, as per the [Delta protocol](https://arc.net/l/quote/fwjwtumy) requires: - The table must be on writer version 7, and a feature named `domainMetadata` must exist in the table's `writerFeatures`. We satisfy this via an explicit check in `Transaction::commit`. - Two overlapping transactions conflict if they both contain a domain metadata action for the same metadata domain. We satisfy this with Kernel's coarse grained conflict detection based on the fact that the `delta_log` dir already has a log file with the name this txn was going to write. No new logic needed in this PR. ## How was this change tested? <!-- Please make sure to add test cases that check the changes thoroughly including negative and positive cases if possible. If it was tested in a way different from regular unit tests, please clarify how you tested, ideally via a reproducible test documented in the PR description. --> Added new integration tests in `write.rs`.
1 parent 93f3f30 commit 05e6698

File tree

5 files changed

+348
-104
lines changed

5 files changed

+348
-104
lines changed

kernel/src/actions/mod.rs

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -931,22 +931,24 @@ pub(crate) struct DomainMetadata {
931931

932932
impl DomainMetadata {
933933
/// Create a new DomainMetadata action.
934-
// TODO: Discuss if we should remove `removed` from this method and introduce a dedicated
935-
// method for removed domain metadata.
936-
pub(crate) fn new(domain: String, configuration: String, removed: bool) -> Self {
937-
DomainMetadata {
934+
pub(crate) fn new(domain: String, configuration: String) -> Self {
935+
Self {
938936
domain,
939937
configuration,
940-
removed,
938+
removed: false,
941939
}
942940
}
943941

944942
// returns true if the domain metadata is an system-controlled domain (all domains that start
945943
// with "delta.")
946944
#[allow(unused)]
947-
fn is_internal(&self) -> bool {
945+
pub(crate) fn is_internal(&self) -> bool {
948946
self.domain.starts_with(INTERNAL_DOMAIN_PREFIX)
949947
}
948+
949+
pub(crate) fn domain(&self) -> &str {
950+
&self.domain
951+
}
950952
}
951953

952954
#[cfg(test)]

kernel/src/row_tracking.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,6 @@ impl TryFrom<RowTrackingDomainMetadata> for DomainMetadata {
6464
Ok(DomainMetadata::new(
6565
RowTrackingDomainMetadata::ROW_TRACKING_DOMAIN_NAME.to_string(),
6666
serde_json::to_string(&metadata)?,
67-
false,
6867
))
6968
}
7069
}

kernel/src/transaction/mod.rs

Lines changed: 162 additions & 96 deletions
Original file line numberDiff line numberDiff line change
@@ -127,6 +127,7 @@ pub struct Transaction {
127127
// commit-wide timestamp (in milliseconds since epoch) - used in ICT, `txn` action, etc. to
128128
// keep all timestamps within the same commit consistent.
129129
commit_timestamp: i64,
130+
domain_metadatas: Vec<DomainMetadata>,
130131
}
131132

132133
impl std::fmt::Debug for Transaction {
@@ -163,6 +164,7 @@ impl Transaction {
163164
add_files_metadata: vec![],
164165
set_transactions: vec![],
165166
commit_timestamp,
167+
domain_metadatas: vec![],
166168
})
167169
}
168170

@@ -200,29 +202,22 @@ impl Transaction {
200202
let commit_info_action =
201203
commit_info.into_engine_data(get_log_commit_info_schema().clone(), engine);
202204

203-
// Step 3: Generate add actions with or without row tracking metadata
205+
// Step 3: Generate add actions and get data for domain metadata actions (e.g. row tracking high watermark)
204206
let commit_version = self.read_snapshot.version() + 1;
205-
let add_actions = if self
206-
.read_snapshot
207-
.table_configuration()
208-
.should_write_row_tracking()
209-
{
210-
self.generate_adds_with_row_tracking(engine, commit_version)?
211-
} else {
212-
self.generate_adds(
213-
engine,
214-
self.add_files_metadata.iter().map(|a| Ok(a.deref())),
215-
add_files_schema().clone(),
216-
as_log_add_schema(with_stats_col(mandatory_add_file_schema())),
217-
)
218-
};
207+
let (add_actions, row_tracking_domain_metadata) =
208+
self.generate_adds(engine, commit_version)?;
209+
210+
// Step 4: Generate all domain metadata actions (user and system domains)
211+
let domain_metadata_actions =
212+
self.generate_domain_metadata_actions(engine, row_tracking_domain_metadata)?;
219213

220-
// Step 4: Commit the actions as a JSON file to the Delta log
214+
// Step 5: Commit the actions as a JSON file to the Delta log
221215
let commit_path =
222216
ParsedLogPath::new_commit(self.read_snapshot.table_root(), commit_version)?;
223217
let actions = iter::once(commit_info_action)
224218
.chain(add_actions)
225-
.chain(set_transaction_actions);
219+
.chain(set_transaction_actions)
220+
.chain(domain_metadata_actions);
226221

227222
let json_handler = engine.json_handler();
228223
match json_handler.write_json_file(&commit_path.location, Box::new(actions), false) {
@@ -270,6 +265,66 @@ impl Transaction {
270265
self
271266
}
272267

268+
/// Set domain metadata to be written to the Delta log.
269+
/// Note that each domain can only appear once per transaction. That is, multiple configurations
270+
/// of the same domain are disallowed in a single transaction, as well as setting and removing
271+
/// the same domain in a single transaction. If a duplicate domain is included, the commit will
272+
/// fail (that is, we don't eagerly check domain validity here).
273+
/// Setting metadata for multiple distinct domains is allowed.
274+
pub fn with_domain_metadata(mut self, domain: String, configuration: String) -> Self {
275+
self.domain_metadatas
276+
.push(DomainMetadata::new(domain, configuration));
277+
self
278+
}
279+
280+
/// Generate domain metadata actions with validation. Handle both user and system domains.
281+
fn generate_domain_metadata_actions<'a>(
282+
&'a self,
283+
engine: &'a dyn Engine,
284+
row_tracking_high_watermark: Option<RowTrackingDomainMetadata>,
285+
) -> DeltaResult<impl Iterator<Item = DeltaResult<Box<dyn EngineData>>> + 'a> {
286+
// if there are domain metadata actions, the table must support it
287+
if !self.domain_metadatas.is_empty()
288+
&& !self
289+
.read_snapshot
290+
.table_configuration()
291+
.is_domain_metadata_supported()
292+
{
293+
return Err(Error::unsupported(
294+
"Domain metadata operations require writer version 7 and the 'domainMetadata' writer feature"
295+
));
296+
}
297+
298+
// validate domain metadata
299+
let mut domains = HashSet::new();
300+
for domain_metadata in &self.domain_metadatas {
301+
if domain_metadata.is_internal() {
302+
return Err(Error::Generic(
303+
"Cannot modify domains that start with 'delta.' as those are system controlled"
304+
.to_string(),
305+
));
306+
}
307+
if !domains.insert(domain_metadata.domain()) {
308+
return Err(Error::Generic(format!(
309+
"Metadata for domain {} already specified in this transaction",
310+
domain_metadata.domain()
311+
)));
312+
}
313+
}
314+
315+
let system_domains = row_tracking_high_watermark
316+
.map(DomainMetadata::try_from)
317+
.transpose()?
318+
.into_iter();
319+
320+
Ok(self
321+
.domain_metadatas
322+
.iter()
323+
.cloned()
324+
.chain(system_domains)
325+
.map(|dm| dm.into_engine_data(get_log_domain_metadata_schema().clone(), engine)))
326+
}
327+
273328
// Generate the logical-to-physical transform expression which must be evaluated on every data
274329
// chunk before writing. At the moment, this is a transaction-wide expression.
275330
fn generate_logical_to_physical(&self) -> Expression {
@@ -309,109 +364,120 @@ impl Transaction {
309364
self.add_files_metadata.push(add_metadata);
310365
}
311366

312-
/// Convert file metadata provided by the engine into protocol-compliant add actions.
313-
fn generate_adds<'a, I, T>(
314-
&'a self,
315-
engine: &dyn Engine,
316-
add_files_metadata: I,
317-
input_schema: SchemaRef,
318-
output_schema: SchemaRef,
319-
) -> EngineDataResultIterator<'a>
320-
where
321-
I: Iterator<Item = DeltaResult<T>> + Send + 'a,
322-
T: Deref<Target = dyn EngineData> + Send + 'a,
323-
{
324-
let evaluation_handler = engine.evaluation_handler();
325-
326-
Box::new(add_files_metadata.map(move |add_files_batch| {
327-
// Convert stats to a JSON string and nest the add action in a top-level struct
328-
let adds_expr = Expression::struct_from([Expression::transform(
329-
Transform::new_top_level().with_replaced_field(
330-
"stats",
331-
Expression::unary(ToJson, Expression::column(["stats"])).into(),
332-
),
333-
)]);
334-
let adds_evaluator = evaluation_handler.new_expression_evaluator(
335-
input_schema.clone(),
336-
Arc::new(adds_expr),
337-
output_schema.clone().into(),
338-
);
339-
adds_evaluator.evaluate(add_files_batch?.deref())
340-
}))
341-
}
342-
343-
/// Extend file metadata provided by the engine with row tracking information and convert them into
344-
/// protocol-compliant add actions.
345-
fn generate_adds_with_row_tracking<'a>(
367+
/// Generate add actions, handling row tracking internally if needed
368+
fn generate_adds<'a>(
346369
&'a self,
347370
engine: &dyn Engine,
348371
commit_version: u64,
349-
) -> DeltaResult<EngineDataResultIterator<'a>> {
350-
// Return early if we have nothing to add
372+
) -> DeltaResult<(
373+
EngineDataResultIterator<'a>,
374+
Option<RowTrackingDomainMetadata>,
375+
)> {
376+
fn build_add_actions<'a, I, T>(
377+
engine: &dyn Engine,
378+
add_files_metadata: I,
379+
input_schema: SchemaRef,
380+
output_schema: SchemaRef,
381+
) -> impl Iterator<Item = DeltaResult<Box<dyn EngineData>>> + 'a
382+
where
383+
I: Iterator<Item = DeltaResult<T>> + Send + 'a,
384+
T: Deref<Target = dyn EngineData> + Send + 'a,
385+
{
386+
let evaluation_handler = engine.evaluation_handler();
387+
388+
add_files_metadata.map(move |add_files_batch| {
389+
// Convert stats to a JSON string and nest the add action in a top-level struct
390+
let adds_expr = Expression::struct_from([Expression::transform(
391+
Transform::new_top_level().with_replaced_field(
392+
"stats",
393+
Expression::unary(ToJson, Expression::column(["stats"])).into(),
394+
),
395+
)]);
396+
let adds_evaluator = evaluation_handler.new_expression_evaluator(
397+
input_schema.clone(),
398+
Arc::new(adds_expr),
399+
output_schema.clone().into(),
400+
);
401+
adds_evaluator.evaluate(add_files_batch?.deref())
402+
})
403+
}
404+
351405
if self.add_files_metadata.is_empty() {
352-
return Ok(Box::new(iter::empty()));
406+
return Ok((Box::new(iter::empty()), None));
353407
}
354408

355-
// Read the current rowIdHighWaterMark from the snapshot's row tracking domain metadata
356-
let row_id_high_water_mark =
357-
RowTrackingDomainMetadata::get_high_water_mark(&self.read_snapshot, engine)?;
409+
let commit_version = i64::try_from(commit_version)
410+
.map_err(|_| Error::generic("Commit version too large to fit in i64"))?;
358411

359-
// Create a row tracking visitor and visit all files to collect row tracking information
360-
let mut row_tracking_visitor =
361-
RowTrackingVisitor::new(row_id_high_water_mark, Some(self.add_files_metadata.len()));
412+
let needs_row_tracking = self
413+
.read_snapshot
414+
.table_configuration()
415+
.should_write_row_tracking();
362416

363-
// We visit all files with the row visitor before creating the add action iterator
364-
// because we need to know the final row ID high water mark to create the domain metadata action
365-
for add_files_batch in &self.add_files_metadata {
366-
row_tracking_visitor.visit_rows_of(add_files_batch.deref())?;
367-
}
417+
if needs_row_tracking {
418+
// Read the current rowIdHighWaterMark from the snapshot's row tracking domain metadata
419+
let row_id_high_water_mark =
420+
RowTrackingDomainMetadata::get_high_water_mark(&self.read_snapshot, engine)?;
368421

369-
// Deconstruct the row tracking visitor to avoid borrowing issues
370-
let RowTrackingVisitor {
371-
base_row_id_batches,
372-
row_id_high_water_mark,
373-
} = row_tracking_visitor;
422+
// Create a row tracking visitor and visit all files to collect row tracking information
423+
let mut row_tracking_visitor = RowTrackingVisitor::new(
424+
row_id_high_water_mark,
425+
Some(self.add_files_metadata.len()),
426+
);
374427

375-
// Generate a domain metadata action based on the final high water mark
376-
let domain_metadata =
377-
DomainMetadata::try_from(RowTrackingDomainMetadata::new(row_id_high_water_mark))?;
378-
let domain_metadata_action =
379-
domain_metadata.into_engine_data(get_log_domain_metadata_schema().clone(), engine);
428+
// We visit all files with the row visitor before creating the add action iterator
429+
// because we need to know the final row ID high water mark to create the domain metadata action
430+
for add_files_batch in &self.add_files_metadata {
431+
row_tracking_visitor.visit_rows_of(add_files_batch.deref())?;
432+
}
380433

381-
let commit_version = i64::try_from(commit_version)
382-
.map_err(|_| Error::generic("Commit version is too large to fit in i64"))?;
434+
// Deconstruct the row tracking visitor to avoid borrowing issues
435+
let RowTrackingVisitor {
436+
base_row_id_batches,
437+
row_id_high_water_mark,
438+
} = row_tracking_visitor;
383439

384-
// Create an iterator that pairs each add action with its row tracking metadata
385-
let extended_add_files_metadata =
386-
self.add_files_metadata.iter().zip(base_row_id_batches).map(
440+
// Create extended add files with row tracking columns
441+
let extended_add_files = self.add_files_metadata.iter().zip(base_row_id_batches).map(
387442
move |(add_files_batch, base_row_ids)| {
388443
let commit_versions = vec![commit_version; base_row_ids.len()];
389-
let base_row_ids =
444+
let base_row_ids_array =
390445
ArrayData::try_new(ArrayType::new(DataType::LONG, true), base_row_ids)?;
391-
let row_commit_versions =
446+
let commit_versions_array =
392447
ArrayData::try_new(ArrayType::new(DataType::LONG, true), commit_versions)?;
393448

394449
add_files_batch.append_columns(
395450
with_row_tracking_cols(&Arc::new(StructType::new_unchecked(vec![]))),
396-
vec![base_row_ids, row_commit_versions],
451+
vec![base_row_ids_array, commit_versions_array],
397452
)
398453
},
399454
);
400455

401-
// Generate add actions including row tracking metadata
402-
let add_actions = self.generate_adds(
403-
engine,
404-
extended_add_files_metadata,
405-
with_row_tracking_cols(add_files_schema()),
406-
as_log_add_schema(with_row_tracking_cols(&with_stats_col(
407-
mandatory_add_file_schema(),
408-
))),
409-
);
456+
let add_actions = build_add_actions(
457+
engine,
458+
extended_add_files,
459+
with_row_tracking_cols(add_files_schema()),
460+
as_log_add_schema(with_row_tracking_cols(&with_stats_col(
461+
mandatory_add_file_schema(),
462+
))),
463+
);
410464

411-
// Return a chained iterator with add and domain metadata actions
412-
Ok(Box::new(
413-
add_actions.chain(iter::once(domain_metadata_action)),
414-
))
465+
// Generate a row tracking domain metadata based on the final high water mark
466+
let row_tracking_domain_metadata: RowTrackingDomainMetadata =
467+
RowTrackingDomainMetadata::new(row_id_high_water_mark);
468+
469+
Ok((Box::new(add_actions), Some(row_tracking_domain_metadata)))
470+
} else {
471+
// Simple case without row tracking
472+
let add_actions = build_add_actions(
473+
engine,
474+
self.add_files_metadata.iter().map(|a| Ok(a.deref())),
475+
add_files_schema().clone(),
476+
as_log_add_schema(with_stats_col(mandatory_add_file_schema())),
477+
);
478+
479+
Ok((Box::new(add_actions), None))
480+
}
415481
}
416482
}
417483

0 commit comments

Comments
 (0)