Skip to content

Commit 7d90c15

Browse files
committed
impl feedback from 1239 (Lennart's PR)
1 parent 24dd4b0 commit 7d90c15

File tree

1 file changed

+59
-30
lines changed

1 file changed

+59
-30
lines changed

kernel/src/transaction/mod.rs

Lines changed: 59 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -21,10 +21,6 @@ use crate::{
2121
RowVisitor, Version,
2222
};
2323

24-
/// Type alias for an iterator of [`EngineData`] results.
25-
type EngineDataResultIterator<'a> =
26-
Box<dyn Iterator<Item = DeltaResult<Box<dyn EngineData>>> + Send + 'a>;
27-
2824
/// The minimal (i.e., mandatory) fields in an add action.
2925
pub(crate) static MANDATORY_ADD_FILE_SCHEMA: LazyLock<SchemaRef> = LazyLock::new(|| {
3026
Arc::new(StructType::new(vec![
@@ -99,6 +95,18 @@ fn with_row_tracking_cols(schema: &SchemaRef) -> SchemaRef {
9995
Arc::new(StructType::new(fields))
10096
}
10197

98+
/// Type alias for the complex return type of prepare_row_tracking_metadata:
99+
/// - Row tracking domain metadata actions (system metadata with "delta.rowTracking" domain)
100+
/// - Input schema for add actions (with row tracking columns)
101+
/// - Output schema for add actions (with row tracking columns)
102+
/// - Extended add files metadata (with row tracking columns appended)
103+
type RowTrackingMetadata<'a> = (
104+
Vec<DeltaResult<Box<dyn EngineData>>>,
105+
SchemaRef,
106+
SchemaRef,
107+
Box<dyn Iterator<Item = DeltaResult<Box<dyn EngineData>>> + Send + 'a>,
108+
);
109+
102110
/// A transaction represents an in-progress write to a table. After creating a transaction, changes
103111
/// to the table may be staged via the transaction methods before calling `commit` to commit the
104112
/// changes to the table.
@@ -204,22 +212,36 @@ impl Transaction {
204212

205213
// Step 3: Generate add actions with or without row tracking metadata
206214
let commit_version = self.read_snapshot.version() + 1;
207-
let add_actions = if self
215+
216+
let (row_tracking_domain_metadata_actions, add_actions): (
217+
Vec<_>,
218+
Box<dyn Iterator<Item = _> + Send>,
219+
) = if self
208220
.read_snapshot
209221
.table_configuration()
210222
.should_write_row_tracking()
211223
{
212-
self.generate_adds_with_row_tracking(engine, commit_version)?
224+
let (domain_metadata, input_schema, output_schema, extended_add_files_metadata) =
225+
self.prepare_row_tracking_metadata(engine, commit_version)?;
226+
227+
let add_actions = self.generate_adds(
228+
engine,
229+
extended_add_files_metadata,
230+
input_schema,
231+
output_schema,
232+
);
233+
(domain_metadata, Box::new(add_actions))
213234
} else {
214-
self.generate_adds(
235+
let add_actions = self.generate_adds(
215236
engine,
216-
self.add_files_metadata.iter().map(|a| Ok(a.deref())),
237+
self.add_files_metadata.iter().map(|a| Ok(a.as_ref())),
217238
add_files_schema().clone(),
218239
as_log_add_schema(with_stats_col(mandatory_add_file_schema())),
219-
)
240+
);
241+
(vec![], Box::new(add_actions))
220242
};
221243

222-
let domain_metadata_actions = Self::generate_domain_metadata_actions(
244+
let user_domain_metadata_actions = Self::generate_domain_metadata_actions(
223245
engine,
224246
&self.domain_metadatas,
225247
&self.read_snapshot,
@@ -228,6 +250,9 @@ impl Transaction {
228250
// Step 4: Commit the actions as a JSON file to the Delta log
229251
let commit_path =
230252
ParsedLogPath::new_commit(self.read_snapshot.table_root(), commit_version)?;
253+
let domain_metadata_actions =
254+
user_domain_metadata_actions.chain(row_tracking_domain_metadata_actions);
255+
231256
let actions = iter::once(commit_info_action)
232257
.chain(set_transaction_actions)
233258
.chain(add_actions)
@@ -375,14 +400,14 @@ impl Transaction {
375400
add_files_metadata: I,
376401
input_schema: SchemaRef,
377402
output_schema: SchemaRef,
378-
) -> EngineDataResultIterator<'a>
403+
) -> impl Iterator<Item = DeltaResult<Box<dyn EngineData>>> + Send + 'a
379404
where
380405
I: Iterator<Item = DeltaResult<T>> + Send + 'a,
381406
T: Deref<Target = dyn EngineData> + Send + 'a,
382407
{
383408
let evaluation_handler = engine.evaluation_handler();
384409

385-
Box::new(add_files_metadata.map(move |add_files_batch| {
410+
add_files_metadata.map(move |add_files_batch| {
386411
// Convert stats to a JSON string and nest the add action in a top-level struct
387412
let adds_expr = Expression::struct_from([Expression::transform(
388413
Transform::new_top_level().with_replaced_field(
@@ -396,19 +421,31 @@ impl Transaction {
396421
output_schema.clone().into(),
397422
);
398423
adds_evaluator.evaluate(add_files_batch?.deref())
399-
}))
424+
})
400425
}
401426

402-
/// Extend file metadata provided by the engine with row tracking information and convert them into
403-
/// protocol-compliant add actions.
404-
fn generate_adds_with_row_tracking<'a>(
427+
/// When row tracking is enabled, prepare row tracking domain metadata and add files metadata with row tracking columns.
428+
///
429+
/// Returns:
430+
/// - Row tracking domain metadata actions (system metadata with "delta.rowTracking" domain)
431+
/// - Input schema for add actions (with row tracking columns)
432+
/// - Output schema for add actions (with row tracking columns)
433+
/// - Extended add files metadata (with row tracking columns appended)
434+
fn prepare_row_tracking_metadata<'a>(
405435
&'a self,
406436
engine: &dyn Engine,
407437
commit_version: u64,
408-
) -> DeltaResult<EngineDataResultIterator<'a>> {
409-
// Return early if we have nothing to add
438+
) -> DeltaResult<RowTrackingMetadata<'a>> {
439+
// There is nothing to add
410440
if self.add_files_metadata.is_empty() {
411-
return Ok(Box::new(iter::empty()));
441+
return Ok((
442+
vec![],
443+
with_row_tracking_cols(add_files_schema()),
444+
as_log_add_schema(with_row_tracking_cols(&with_stats_col(
445+
mandatory_add_file_schema(),
446+
))),
447+
Box::new(iter::empty()),
448+
));
412449
}
413450

414451
// Read the current rowIdHighWaterMark from the snapshot's row tracking domain metadata
@@ -426,14 +463,12 @@ impl Transaction {
426463
base_row_id_batches.push(row_tracking_visitor.base_row_ids.clone());
427464
}
428465

429-
// Generate a domain metadata action based on the final high water mark
430466
let domain_metadata = DomainMetadata::try_from(RowTrackingDomainMetadata::new(
431467
row_tracking_visitor.row_id_high_water_mark,
432468
))?;
433469
let domain_metadata_action =
434470
domain_metadata.into_engine_data(get_log_domain_metadata_schema().clone(), engine);
435471

436-
// Create an iterator that pairs each add action with its row tracking metadata
437472
let extended_add_files_metadata =
438473
self.add_files_metadata.iter().zip(base_row_id_batches).map(
439474
move |(add_files_batch, base_row_ids)| {
@@ -450,19 +485,13 @@ impl Transaction {
450485
},
451486
);
452487

453-
// Generate add actions including row tracking metadata
454-
let add_actions = self.generate_adds(
455-
engine,
456-
extended_add_files_metadata,
488+
Ok((
489+
vec![domain_metadata_action],
457490
with_row_tracking_cols(add_files_schema()),
458491
as_log_add_schema(with_row_tracking_cols(&with_stats_col(
459492
mandatory_add_file_schema(),
460493
))),
461-
);
462-
463-
// Return a chained iterator with add and domain metadata actions
464-
Ok(Box::new(
465-
add_actions.chain(iter::once(domain_metadata_action)),
494+
Box::new(extended_add_files_metadata),
466495
))
467496
}
468497
}

0 commit comments

Comments
 (0)