Skip to content

Commit 22b5b3c

Browse files
committed
impl pr feedback
1 parent c475873 commit 22b5b3c

File tree

3 files changed

+72
-57
lines changed

3 files changed

+72
-57
lines changed

kernel/src/transaction/mod.rs

Lines changed: 41 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -285,7 +285,7 @@ impl Transaction {
285285
engine: &'a dyn Engine,
286286
domain_metadatas: &'a [DomainMetadata],
287287
read_snapshot: &Snapshot,
288-
row_tracking_high_watermark: Option<i64>,
288+
row_tracking_high_watermark: Option<RowTrackingDomainMetadata>,
289289
) -> DeltaResult<impl Iterator<Item = DeltaResult<Box<dyn EngineData>>> + 'a> {
290290
// if there are domain metadata actions, the table must support it
291291
if !domain_metadatas.is_empty()
@@ -315,7 +315,7 @@ impl Transaction {
315315
}
316316

317317
let system_domains = row_tracking_high_watermark
318-
.map(|hwm| DomainMetadata::try_from(RowTrackingDomainMetadata::new(hwm)))
318+
.map(|hwm| DomainMetadata::try_from(hwm))
319319
.transpose()?
320320
.into_iter();
321321

@@ -370,7 +370,39 @@ impl Transaction {
370370
&'a self,
371371
engine: &dyn Engine,
372372
commit_version: u64,
373-
) -> DeltaResult<(EngineDataResultIterator<'a>, Option<i64>)> {
373+
) -> DeltaResult<(
374+
EngineDataResultIterator<'a>,
375+
Option<RowTrackingDomainMetadata>,
376+
)> {
377+
fn build_add_actions<'a, I, T>(
378+
engine: &dyn Engine,
379+
add_files_metadata: I,
380+
input_schema: SchemaRef,
381+
output_schema: SchemaRef,
382+
) -> impl Iterator<Item = DeltaResult<Box<dyn EngineData>>> + 'a
383+
where
384+
I: Iterator<Item = DeltaResult<T>> + Send + 'a,
385+
T: Deref<Target = dyn EngineData> + Send + 'a,
386+
{
387+
let evaluation_handler = engine.evaluation_handler();
388+
389+
add_files_metadata.map(move |add_files_batch| {
390+
// Convert stats to a JSON string and nest the add action in a top-level struct
391+
let adds_expr = Expression::struct_from([Expression::transform(
392+
Transform::new_top_level().with_replaced_field(
393+
"stats",
394+
Expression::unary(ToJson, Expression::column(["stats"])).into(),
395+
),
396+
)]);
397+
let adds_evaluator = evaluation_handler.new_expression_evaluator(
398+
input_schema.clone(),
399+
Arc::new(adds_expr),
400+
output_schema.clone().into(),
401+
);
402+
adds_evaluator.evaluate(add_files_batch?.deref())
403+
})
404+
}
405+
374406
if self.add_files_metadata.is_empty() {
375407
return Ok((Box::new(iter::empty()), None));
376408
}
@@ -411,7 +443,7 @@ impl Transaction {
411443
},
412444
);
413445

414-
let add_actions = self.create_add_actions(
446+
let add_actions = build_add_actions(
415447
engine,
416448
extended_add_files,
417449
with_row_tracking_cols(add_files_schema()),
@@ -420,13 +452,13 @@ impl Transaction {
420452
))),
421453
);
422454

423-
Ok((
424-
Box::new(add_actions),
425-
Some(row_tracking_visitor.row_id_high_water_mark),
426-
))
455+
let row_tracking_domain_metadata =
456+
RowTrackingDomainMetadata::new(row_tracking_visitor.row_id_high_water_mark);
457+
458+
Ok((Box::new(add_actions), Some(row_tracking_domain_metadata)))
427459
} else {
428460
// Simple case without row tracking
429-
let add_actions = self.create_add_actions(
461+
let add_actions = build_add_actions(
430462
engine,
431463
self.add_files_metadata.iter().map(|a| Ok(a.deref())),
432464
add_files_schema().clone(),
@@ -436,37 +468,6 @@ impl Transaction {
436468
Ok((Box::new(add_actions), None))
437469
}
438470
}
439-
440-
/// Convert file metadata provided by the engine into protocol-compliant add actions.
441-
fn create_add_actions<'a, I, T>(
442-
&'a self,
443-
engine: &dyn Engine,
444-
add_files_metadata: I,
445-
input_schema: SchemaRef,
446-
output_schema: SchemaRef,
447-
) -> impl Iterator<Item = DeltaResult<Box<dyn EngineData>>> + 'a
448-
where
449-
I: Iterator<Item = DeltaResult<T>> + Send + 'a,
450-
T: Deref<Target = dyn EngineData> + Send + 'a,
451-
{
452-
let evaluation_handler = engine.evaluation_handler();
453-
454-
add_files_metadata.map(move |add_files_batch| {
455-
// Convert stats to a JSON string and nest the add action in a top-level struct
456-
let adds_expr = Expression::struct_from([Expression::transform(
457-
Transform::new_top_level().with_replaced_field(
458-
"stats",
459-
Expression::unary(ToJson, Expression::column(["stats"])).into(),
460-
),
461-
)]);
462-
let adds_evaluator = evaluation_handler.new_expression_evaluator(
463-
input_schema.clone(),
464-
Arc::new(adds_expr),
465-
output_schema.clone().into(),
466-
);
467-
adds_evaluator.evaluate(add_files_batch?.deref())
468-
})
469-
}
470471
}
471472

472473
/// WriteContext is data derived from a [`Transaction`] that can be provided to writers in order to

kernel/tests/write.rs

Lines changed: 18 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,10 @@ use tempfile::tempdir;
3131

3232
use delta_kernel::schema::{DataType, SchemaRef, StructField, StructType};
3333

34-
use test_utils::{create_table, engine_store_setup, setup_test_tables, test_read};
34+
use test_utils::{
35+
assert_result_error_with_message, create_table, engine_store_setup, setup_test_tables,
36+
test_read,
37+
};
3538

3639
mod common;
3740
use url::Url;
@@ -1283,24 +1286,24 @@ async fn test_set_domain_metadata_errors() -> Result<(), Box<dyn std::error::Err
12831286

12841287
// System domain rejection
12851288
let txn = snapshot.clone().transaction()?;
1286-
let err = txn
1289+
let res = txn
12871290
.with_domain_metadata("delta.system".to_string(), "config".to_string())
1288-
.commit(&engine)
1289-
.unwrap_err();
1290-
assert!(err
1291-
.to_string()
1292-
.contains("Users cannot modify system controlled metadata domains"));
1291+
.commit(&engine);
1292+
assert_result_error_with_message(
1293+
res,
1294+
"Users cannot modify system controlled metadata domains",
1295+
);
12931296

12941297
// Duplicate domain rejection
12951298
let txn2 = snapshot.clone().transaction()?;
1296-
let err = txn2
1299+
let res = txn2
12971300
.with_domain_metadata("app.config".to_string(), "v1".to_string())
12981301
.with_domain_metadata("app.config".to_string(), "v2".to_string())
1299-
.commit(&engine)
1300-
.unwrap_err();
1301-
assert!(err
1302-
.to_string()
1303-
.contains("Metadata for domain app.config already specified in this transaction"));
1302+
.commit(&engine);
1303+
assert_result_error_with_message(
1304+
res,
1305+
"Metadata for domain app.config already specified in this transaction",
1306+
);
13041307

13051308
Ok(())
13061309
}
@@ -1331,14 +1334,12 @@ async fn test_set_domain_metadata_unsupported_writer_feature(
13311334
.await?;
13321335

13331336
let snapshot = Arc::new(Snapshot::builder(table_url.clone()).build(&engine)?);
1334-
let result = snapshot
1337+
let res = snapshot
13351338
.transaction()?
13361339
.with_domain_metadata("app.config".to_string(), "test_config".to_string())
13371340
.commit(&engine);
13381341

1339-
assert!(result.is_err());
1340-
let err = result.unwrap_err();
1341-
assert!(err.to_string().contains("Domain metadata operations require writer version 7 and the 'domainMetadata' writer feature"));
1342+
assert_result_error_with_message(res, "Domain metadata operations require writer version 7 and the 'domainMetadata' writer feature");
13421343

13431344
Ok(())
13441345
}

test-utils/src/lib.rs

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -426,3 +426,16 @@ pub fn set_json_value(
426426
*v = new_value;
427427
Ok(())
428428
}
429+
430+
pub fn assert_result_error_with_message<T, E: ToString>(res: Result<T, E>, message: &str) {
431+
match res {
432+
Ok(_) => panic!("Expected error, but got Ok result"),
433+
Err(error) => {
434+
let error_str = error.to_string();
435+
assert!(
436+
error_str.contains(message),
437+
"Error message does not contain the expected message.\nExpected message:\t{message}\nActual message:\t\t{error_str}"
438+
);
439+
}
440+
}
441+
}

0 commit comments

Comments
 (0)