@@ -202,22 +202,9 @@ impl Transaction {
202
202
let commit_info_action =
203
203
commit_info. into_engine_data ( get_log_commit_info_schema ( ) . clone ( ) , engine) ;
204
204
205
- // Step 3: Generate add actions with or without row tracking metadata
205
+ // Step 3: Generate add actions (handles row tracking internally)
206
206
let commit_version = self . read_snapshot . version ( ) + 1 ;
207
- let add_actions = if self
208
- . read_snapshot
209
- . table_configuration ( )
210
- . should_write_row_tracking ( )
211
- {
212
- self . generate_adds_with_row_tracking ( engine, commit_version) ?
213
- } else {
214
- self . generate_adds (
215
- engine,
216
- self . add_files_metadata . iter ( ) . map ( |a| Ok ( a. deref ( ) ) ) ,
217
- add_files_schema ( ) . clone ( ) ,
218
- as_log_add_schema ( with_stats_col ( mandatory_add_file_schema ( ) ) ) ,
219
- )
220
- } ;
207
+ let add_actions = self . generate_adds ( engine, commit_version) ?;
221
208
222
209
let domain_metadata_actions = Self :: generate_domain_metadata_actions (
223
210
engine,
@@ -368,22 +355,105 @@ impl Transaction {
368
355
self . add_files_metadata . push ( add_metadata) ;
369
356
}
370
357
371
- /// Convert file metadata provided by the engine into protocol-compliant add actions.
372
- fn generate_adds < ' a , I , T > (
358
+ /// Generate add actions, handling row tracking internally if needed
359
+ fn generate_adds < ' a > (
360
+ & ' a self ,
361
+ engine : & dyn Engine ,
362
+ commit_version : u64 ,
363
+ ) -> DeltaResult < EngineDataResultIterator < ' a > > {
364
+ if self . add_files_metadata . is_empty ( ) {
365
+ return Ok ( Box :: new ( iter:: empty ( ) ) ) ;
366
+ }
367
+
368
+ let needs_row_tracking = self
369
+ . read_snapshot
370
+ . table_configuration ( )
371
+ . should_write_row_tracking ( ) ;
372
+
373
+ // Set up variables based on row tracking needs, then make one clean call
374
+ let ( input_schema, output_schema, domain_metadata_action) : (
375
+ SchemaRef ,
376
+ SchemaRef ,
377
+ Option < DeltaResult < Box < dyn EngineData > > > ,
378
+ ) = if needs_row_tracking {
379
+ let row_id_high_water_mark =
380
+ RowTrackingDomainMetadata :: get_high_water_mark ( & self . read_snapshot , engine) ?;
381
+ let mut row_tracking_visitor = RowTrackingVisitor :: new ( row_id_high_water_mark) ;
382
+ let mut base_row_id_batches = Vec :: with_capacity ( self . add_files_metadata . len ( ) ) ;
383
+
384
+ // Visit all files to collect row tracking info
385
+ for add_files_batch in & self . add_files_metadata {
386
+ row_tracking_visitor. visit_rows_of ( add_files_batch. deref ( ) ) ?;
387
+ base_row_id_batches. push ( row_tracking_visitor. base_row_ids . clone ( ) ) ;
388
+ }
389
+
390
+ // Create domain metadata action
391
+ let domain_metadata = DomainMetadata :: try_from ( RowTrackingDomainMetadata :: new (
392
+ row_tracking_visitor. row_id_high_water_mark ,
393
+ ) ) ?;
394
+ let domain_metadata_action =
395
+ domain_metadata. into_engine_data ( get_log_domain_metadata_schema ( ) . clone ( ) , engine) ;
396
+
397
+ // Extend add files with row tracking columns (mutate in place)
398
+ for ( add_files_batch, base_row_ids) in
399
+ self . add_files_metadata . iter ( ) . zip ( base_row_id_batches)
400
+ {
401
+ let commit_versions = vec ! [ commit_version as i64 ; base_row_ids. len( ) ] ;
402
+ let base_row_ids_array =
403
+ ArrayData :: try_new ( ArrayType :: new ( DataType :: LONG , true ) , base_row_ids) ?;
404
+ let commit_versions_array =
405
+ ArrayData :: try_new ( ArrayType :: new ( DataType :: LONG , true ) , commit_versions) ?;
406
+
407
+ add_files_batch. append_columns (
408
+ with_row_tracking_cols ( & Arc :: new ( StructType :: new ( vec ! [ ] ) ) ) ,
409
+ vec ! [ base_row_ids_array, commit_versions_array] ,
410
+ ) ?;
411
+ }
412
+
413
+ (
414
+ with_row_tracking_cols ( add_files_schema ( ) ) ,
415
+ as_log_add_schema ( with_row_tracking_cols ( & with_stats_col (
416
+ mandatory_add_file_schema ( ) ,
417
+ ) ) ) ,
418
+ Some ( domain_metadata_action) ,
419
+ )
420
+ } else {
421
+ (
422
+ add_files_schema ( ) . clone ( ) ,
423
+ as_log_add_schema ( with_stats_col ( mandatory_add_file_schema ( ) ) ) ,
424
+ None ,
425
+ )
426
+ } ;
427
+
428
+ // Single call to create add actions with the same iterator each time
429
+ let add_actions = self . create_add_actions (
430
+ engine,
431
+ self . add_files_metadata . iter ( ) . map ( |a| Ok ( a. deref ( ) ) ) ,
432
+ input_schema,
433
+ output_schema,
434
+ ) ;
435
+
436
+ // Chain with domain metadata if present
437
+ Ok ( Box :: new (
438
+ add_actions. chain ( domain_metadata_action. into_iter ( ) ) ,
439
+ ) )
440
+ }
441
+
442
+ /// Convert file metadata into protocol-compliant add actions
443
+ fn create_add_actions < ' a , I , T > (
373
444
& ' a self ,
374
445
engine : & dyn Engine ,
375
446
add_files_metadata : I ,
376
447
input_schema : SchemaRef ,
377
448
output_schema : SchemaRef ,
378
- ) -> EngineDataResultIterator < ' a >
449
+ ) -> impl Iterator < Item = DeltaResult < Box < dyn EngineData > > > + Send + ' a
379
450
where
380
451
I : Iterator < Item = DeltaResult < T > > + Send + ' a ,
381
452
T : Deref < Target = dyn EngineData > + Send + ' a ,
382
453
{
383
454
let evaluation_handler = engine. evaluation_handler ( ) ;
384
455
385
- Box :: new ( add_files_metadata. map ( move |add_files_batch| {
386
- // Convert stats to a JSON string and nest the add action in a top-level struct
456
+ add_files_metadata. map ( move |add_files_batch| {
387
457
let adds_expr = Expression :: struct_from ( [ Expression :: transform (
388
458
Transform :: new_top_level ( ) . with_replaced_field (
389
459
"stats" ,
@@ -396,74 +466,7 @@ impl Transaction {
396
466
output_schema. clone ( ) . into ( ) ,
397
467
) ;
398
468
adds_evaluator. evaluate ( add_files_batch?. deref ( ) )
399
- } ) )
400
- }
401
-
402
- /// Extend file metadata provided by the engine with row tracking information and convert them into
403
- /// protocol-compliant add actions.
404
- fn generate_adds_with_row_tracking < ' a > (
405
- & ' a self ,
406
- engine : & dyn Engine ,
407
- commit_version : u64 ,
408
- ) -> DeltaResult < EngineDataResultIterator < ' a > > {
409
- // Return early if we have nothing to add
410
- if self . add_files_metadata . is_empty ( ) {
411
- return Ok ( Box :: new ( iter:: empty ( ) ) ) ;
412
- }
413
-
414
- // Read the current rowIdHighWaterMark from the snapshot's row tracking domain metadata
415
- let row_id_high_water_mark =
416
- RowTrackingDomainMetadata :: get_high_water_mark ( & self . read_snapshot , engine) ?;
417
-
418
- // Create a row tracking visitor and visit all files to collect row tracking information
419
- let mut row_tracking_visitor = RowTrackingVisitor :: new ( row_id_high_water_mark) ;
420
- let mut base_row_id_batches = Vec :: with_capacity ( self . add_files_metadata . len ( ) ) ;
421
-
422
- // We visit all files with the row visitor before creating the add action iterator
423
- // because we need to know the final row ID high water mark to create the domain metadata action
424
- for add_files_batch in & self . add_files_metadata {
425
- row_tracking_visitor. visit_rows_of ( add_files_batch. deref ( ) ) ?;
426
- base_row_id_batches. push ( row_tracking_visitor. base_row_ids . clone ( ) ) ;
427
- }
428
-
429
- // Generate a domain metadata action based on the final high water mark
430
- let domain_metadata = DomainMetadata :: try_from ( RowTrackingDomainMetadata :: new (
431
- row_tracking_visitor. row_id_high_water_mark ,
432
- ) ) ?;
433
- let domain_metadata_action =
434
- domain_metadata. into_engine_data ( get_log_domain_metadata_schema ( ) . clone ( ) , engine) ;
435
-
436
- // Create an iterator that pairs each add action with its row tracking metadata
437
- let extended_add_files_metadata =
438
- self . add_files_metadata . iter ( ) . zip ( base_row_id_batches) . map (
439
- move |( add_files_batch, base_row_ids) | {
440
- let commit_versions = vec ! [ commit_version as i64 ; base_row_ids. len( ) ] ;
441
- let base_row_ids =
442
- ArrayData :: try_new ( ArrayType :: new ( DataType :: LONG , true ) , base_row_ids) ?;
443
- let row_commit_versions =
444
- ArrayData :: try_new ( ArrayType :: new ( DataType :: LONG , true ) , commit_versions) ?;
445
-
446
- add_files_batch. append_columns (
447
- with_row_tracking_cols ( & Arc :: new ( StructType :: new ( vec ! [ ] ) ) ) ,
448
- vec ! [ base_row_ids, row_commit_versions] ,
449
- )
450
- } ,
451
- ) ;
452
-
453
- // Generate add actions including row tracking metadata
454
- let add_actions = self . generate_adds (
455
- engine,
456
- extended_add_files_metadata,
457
- with_row_tracking_cols ( add_files_schema ( ) ) ,
458
- as_log_add_schema ( with_row_tracking_cols ( & with_stats_col (
459
- mandatory_add_file_schema ( ) ,
460
- ) ) ) ,
461
- ) ;
462
-
463
- // Return a chained iterator with add and domain metadata actions
464
- Ok ( Box :: new (
465
- add_actions. chain ( iter:: once ( domain_metadata_action) ) ,
466
- ) )
469
+ } )
467
470
}
468
471
}
469
472
0 commit comments