Skip to content

Commit c475873

Browse files
committed
CODE MOVEMENT/REFACTOR ONLY! Implements the comments left on 1239. No behavior change.
1 parent c25fda4 commit c475873

File tree

1 file changed

+91
-89
lines changed

1 file changed

+91
-89
lines changed

kernel/src/transaction/mod.rs

Lines changed: 91 additions & 89 deletions
Original file line numberDiff line numberDiff line change
@@ -202,30 +202,20 @@ impl Transaction {
202202
let commit_info_action =
203203
commit_info.into_engine_data(get_log_commit_info_schema().clone(), engine);
204204

205-
// Step 3: Generate add actions with or without row tracking metadata
205+
// Step 3: Generate add actions and get data for domain metadata actions (e.g. row tracking high watermark)
206206
let commit_version = self.read_snapshot.version() + 1;
207-
let add_actions = if self
208-
.read_snapshot
209-
.table_configuration()
210-
.should_write_row_tracking()
211-
{
212-
self.generate_adds_with_row_tracking(engine, commit_version)?
213-
} else {
214-
self.generate_adds(
215-
engine,
216-
self.add_files_metadata.iter().map(|a| Ok(a.deref())),
217-
add_files_schema().clone(),
218-
as_log_add_schema(with_stats_col(mandatory_add_file_schema())),
219-
)
220-
};
207+
let (add_actions, row_tracking_high_watermark) =
208+
self.generate_adds(engine, commit_version)?;
221209

210+
// Step 4: Generate all domain metadata actions (user and system domains)
222211
let domain_metadata_actions = Self::generate_domain_metadata_actions(
223212
engine,
224213
&self.domain_metadatas,
225214
&self.read_snapshot,
215+
row_tracking_high_watermark,
226216
)?;
227217

228-
// Step 4: Commit the actions as a JSON file to the Delta log
218+
// Step 5: Commit the actions as a JSON file to the Delta log
229219
let commit_path =
230220
ParsedLogPath::new_commit(self.read_snapshot.table_root(), commit_version)?;
231221
let actions = iter::once(commit_info_action)
@@ -290,11 +280,12 @@ impl Transaction {
290280
self
291281
}
292282

293-
/// Generate domain metadata actions with validation.
283+
/// Generate domain metadata actions with validation. Handle both user and system domains.
294284
fn generate_domain_metadata_actions<'a>(
295285
engine: &'a dyn Engine,
296286
domain_metadatas: &'a [DomainMetadata],
297287
read_snapshot: &Snapshot,
288+
row_tracking_high_watermark: Option<i64>,
298289
) -> DeltaResult<impl Iterator<Item = DeltaResult<Box<dyn EngineData>>> + 'a> {
299290
// if there are domain metadata actions, the table must support it
300291
if !domain_metadatas.is_empty()
@@ -323,10 +314,16 @@ impl Transaction {
323314
}
324315
}
325316

317+
let system_domains = row_tracking_high_watermark
318+
.map(|hwm| DomainMetadata::try_from(RowTrackingDomainMetadata::new(hwm)))
319+
.transpose()?
320+
.into_iter();
321+
326322
Ok(domain_metadatas
327323
.iter()
328324
.cloned()
329-
.map(move |dm| dm.into_engine_data(get_log_domain_metadata_schema().clone(), engine)))
325+
.chain(system_domains)
326+
.map(|dm| dm.into_engine_data(get_log_domain_metadata_schema().clone(), engine)))
330327
}
331328

332329
// Generate the logical-to-physical transform expression which must be evaluated on every data
@@ -368,21 +365,93 @@ impl Transaction {
368365
self.add_files_metadata.push(add_metadata);
369366
}
370367

368+
/// Generate add actions, handling row tracking internally if needed
369+
fn generate_adds<'a>(
370+
&'a self,
371+
engine: &dyn Engine,
372+
commit_version: u64,
373+
) -> DeltaResult<(EngineDataResultIterator<'a>, Option<i64>)> {
374+
if self.add_files_metadata.is_empty() {
375+
return Ok((Box::new(iter::empty()), None));
376+
}
377+
378+
let commit_version = i64::try_from(commit_version)
379+
.map_err(|_| Error::generic("Commit version too large to fit in i64"))?;
380+
381+
let needs_row_tracking = self
382+
.read_snapshot
383+
.table_configuration()
384+
.should_write_row_tracking();
385+
386+
if needs_row_tracking {
387+
let row_id_high_water_mark =
388+
RowTrackingDomainMetadata::get_high_water_mark(&self.read_snapshot, engine)?;
389+
let mut row_tracking_visitor = RowTrackingVisitor::new(row_id_high_water_mark);
390+
let mut base_row_id_batches = Vec::with_capacity(self.add_files_metadata.len());
391+
392+
// Visit all files to collect row tracking info
393+
for add_files_batch in &self.add_files_metadata {
394+
row_tracking_visitor.visit_rows_of(add_files_batch.deref())?;
395+
base_row_id_batches.push(row_tracking_visitor.base_row_ids.clone());
396+
}
397+
398+
// Create extended add files with row tracking columns
399+
let extended_add_files = self.add_files_metadata.iter().zip(base_row_id_batches).map(
400+
move |(add_files_batch, base_row_ids)| {
401+
let commit_versions = vec![commit_version; base_row_ids.len()];
402+
let base_row_ids_array =
403+
ArrayData::try_new(ArrayType::new(DataType::LONG, true), base_row_ids)?;
404+
let commit_versions_array =
405+
ArrayData::try_new(ArrayType::new(DataType::LONG, true), commit_versions)?;
406+
407+
add_files_batch.append_columns(
408+
with_row_tracking_cols(&Arc::new(StructType::new(vec![]))),
409+
vec![base_row_ids_array, commit_versions_array],
410+
)
411+
},
412+
);
413+
414+
let add_actions = self.create_add_actions(
415+
engine,
416+
extended_add_files,
417+
with_row_tracking_cols(add_files_schema()),
418+
as_log_add_schema(with_row_tracking_cols(&with_stats_col(
419+
mandatory_add_file_schema(),
420+
))),
421+
);
422+
423+
Ok((
424+
Box::new(add_actions),
425+
Some(row_tracking_visitor.row_id_high_water_mark),
426+
))
427+
} else {
428+
// Simple case without row tracking
429+
let add_actions = self.create_add_actions(
430+
engine,
431+
self.add_files_metadata.iter().map(|a| Ok(a.deref())),
432+
add_files_schema().clone(),
433+
as_log_add_schema(with_stats_col(mandatory_add_file_schema())),
434+
);
435+
436+
Ok((Box::new(add_actions), None))
437+
}
438+
}
439+
371440
/// Convert file metadata provided by the engine into protocol-compliant add actions.
372-
fn generate_adds<'a, I, T>(
441+
fn create_add_actions<'a, I, T>(
373442
&'a self,
374443
engine: &dyn Engine,
375444
add_files_metadata: I,
376445
input_schema: SchemaRef,
377446
output_schema: SchemaRef,
378-
) -> EngineDataResultIterator<'a>
447+
) -> impl Iterator<Item = DeltaResult<Box<dyn EngineData>>> + 'a
379448
where
380449
I: Iterator<Item = DeltaResult<T>> + Send + 'a,
381450
T: Deref<Target = dyn EngineData> + Send + 'a,
382451
{
383452
let evaluation_handler = engine.evaluation_handler();
384453

385-
Box::new(add_files_metadata.map(move |add_files_batch| {
454+
add_files_metadata.map(move |add_files_batch| {
386455
// Convert stats to a JSON string and nest the add action in a top-level struct
387456
let adds_expr = Expression::struct_from([Expression::transform(
388457
Transform::new_top_level().with_replaced_field(
@@ -396,74 +465,7 @@ impl Transaction {
396465
output_schema.clone().into(),
397466
);
398467
adds_evaluator.evaluate(add_files_batch?.deref())
399-
}))
400-
}
401-
402-
/// Extend file metadata provided by the engine with row tracking information and convert them into
403-
/// protocol-compliant add actions.
404-
fn generate_adds_with_row_tracking<'a>(
405-
&'a self,
406-
engine: &dyn Engine,
407-
commit_version: u64,
408-
) -> DeltaResult<EngineDataResultIterator<'a>> {
409-
// Return early if we have nothing to add
410-
if self.add_files_metadata.is_empty() {
411-
return Ok(Box::new(iter::empty()));
412-
}
413-
414-
// Read the current rowIdHighWaterMark from the snapshot's row tracking domain metadata
415-
let row_id_high_water_mark =
416-
RowTrackingDomainMetadata::get_high_water_mark(&self.read_snapshot, engine)?;
417-
418-
// Create a row tracking visitor and visit all files to collect row tracking information
419-
let mut row_tracking_visitor = RowTrackingVisitor::new(row_id_high_water_mark);
420-
let mut base_row_id_batches = Vec::with_capacity(self.add_files_metadata.len());
421-
422-
// We visit all files with the row visitor before creating the add action iterator
423-
// because we need to know the final row ID high water mark to create the domain metadata action
424-
for add_files_batch in &self.add_files_metadata {
425-
row_tracking_visitor.visit_rows_of(add_files_batch.deref())?;
426-
base_row_id_batches.push(row_tracking_visitor.base_row_ids.clone());
427-
}
428-
429-
// Generate a domain metadata action based on the final high water mark
430-
let domain_metadata = DomainMetadata::try_from(RowTrackingDomainMetadata::new(
431-
row_tracking_visitor.row_id_high_water_mark,
432-
))?;
433-
let domain_metadata_action =
434-
domain_metadata.into_engine_data(get_log_domain_metadata_schema().clone(), engine);
435-
436-
// Create an iterator that pairs each add action with its row tracking metadata
437-
let extended_add_files_metadata =
438-
self.add_files_metadata.iter().zip(base_row_id_batches).map(
439-
move |(add_files_batch, base_row_ids)| {
440-
let commit_versions = vec![commit_version as i64; base_row_ids.len()];
441-
let base_row_ids =
442-
ArrayData::try_new(ArrayType::new(DataType::LONG, true), base_row_ids)?;
443-
let row_commit_versions =
444-
ArrayData::try_new(ArrayType::new(DataType::LONG, true), commit_versions)?;
445-
446-
add_files_batch.append_columns(
447-
with_row_tracking_cols(&Arc::new(StructType::new(vec![]))),
448-
vec![base_row_ids, row_commit_versions],
449-
)
450-
},
451-
);
452-
453-
// Generate add actions including row tracking metadata
454-
let add_actions = self.generate_adds(
455-
engine,
456-
extended_add_files_metadata,
457-
with_row_tracking_cols(add_files_schema()),
458-
as_log_add_schema(with_row_tracking_cols(&with_stats_col(
459-
mandatory_add_file_schema(),
460-
))),
461-
);
462-
463-
// Return a chained iterator with add and domain metadata actions
464-
Ok(Box::new(
465-
add_actions.chain(iter::once(domain_metadata_action)),
466-
))
468+
})
467469
}
468470
}
469471

0 commit comments

Comments
 (0)