Skip to content

Commit 38d4963

Browse files
committed
Address review comments
1 parent 8c64146 commit 38d4963

File tree

2 files changed

+64
-89
lines changed

2 files changed

+64
-89
lines changed

kernel/src/engine/arrow_utils.rs

Lines changed: 29 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -568,32 +568,32 @@ fn get_indices(
568568
// some fields are missing, but they might be nullable or metadata columns, need to insert them into the reorder_indices
569569
for (requested_position, field) in requested_schema.fields().enumerate() {
570570
if !found_fields.contains(field.name()) {
571-
if let Some(metadata_spec) = field.get_metadata_column_spec() {
572-
match metadata_spec {
573-
MetadataColumnSpec::RowIndex => {
574-
debug!("Inserting a row index column: {}", field.name());
575-
reorder_indices.push(ReorderIndex::row_index(
576-
requested_position,
577-
Arc::new(field.try_into_arrow()?),
578-
));
579-
}
580-
_ => {
581-
return Err(Error::Generic(format!(
582-
"Metadata column {metadata_spec:?} is not supported by the default parquet reader"
583-
)));
584-
}
571+
match field.get_metadata_column_spec() {
572+
Some(MetadataColumnSpec::RowIndex) => {
573+
debug!("Inserting a row index column: {}", field.name());
574+
reorder_indices.push(ReorderIndex::row_index(
575+
requested_position,
576+
Arc::new(field.try_into_arrow()?),
577+
));
578+
}
579+
Some(metadata_spec) => {
580+
return Err(Error::Generic(format!(
581+
"Metadata column {metadata_spec:?} is not supported by the default parquet reader"
582+
)));
583+
}
584+
None if field.nullable => {
585+
debug!("Inserting missing and nullable field: {}", field.name());
586+
reorder_indices.push(ReorderIndex::missing(
587+
requested_position,
588+
Arc::new(field.try_into_arrow()?),
589+
));
590+
}
591+
None => {
592+
return Err(Error::Generic(format!(
593+
"Requested field not found in parquet schema, and field is not nullable: {}",
594+
field.name()
595+
)));
585596
}
586-
} else if field.nullable {
587-
debug!("Inserting missing and nullable field: {}", field.name());
588-
reorder_indices.push(ReorderIndex::missing(
589-
requested_position,
590-
Arc::new(field.try_into_arrow()?),
591-
));
592-
} else {
593-
return Err(Error::Generic(format!(
594-
"Requested field not found in parquet schema, and field is not nullable: {}",
595-
field.name()
596-
)));
597597
}
598598
}
599599
}
@@ -821,15 +821,12 @@ pub(crate) fn reorder_struct_array(
821821
row_indexes.take(num_rows).collect();
822822
require!(
823823
row_index_array.len() == num_rows,
824-
Error::internal_error(format!(
825-
"Row index iterator exhausted after only {} of {} rows",
826-
row_index_array.len(),
827-
num_rows
828-
))
824+
Error::internal_error(
825+
"Row index iterator exhausted before reaching the end of the file"
826+
)
829827
);
830-
let field = field.clone(); // cheap Arc clone
831828
final_fields_cols[reorder_index.index] =
832-
Some((field, Arc::new(row_index_array)));
829+
Some((Arc::clone(field), Arc::new(row_index_array)));
833830
}
834831
}
835832
}
@@ -856,9 +853,6 @@ fn reorder_list<O: OffsetSizeTrait>(
856853
let (list_field, offset_buffer, maybe_sa, null_buf) = list_array.into_parts();
857854
if let Some(struct_array) = maybe_sa.as_struct_opt() {
858855
let struct_array = struct_array.clone();
859-
// WARNING: We cannot naively plumb through our caller's row index iterator, because each
860-
// array element of a given row must replicate the row's index and empty arrays must drop
861-
// that row's index. For now, just don't support it (Delta doesn't need the capability).
862856
let result_array = Arc::new(reorder_struct_array(
863857
struct_array,
864858
children,

kernel/tests/read.rs

Lines changed: 35 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -1487,67 +1487,48 @@ async fn test_unsupported_metadata_columns() -> Result<(), Box<dyn std::error::E
14871487
Arc::new(TokioBackgroundExecutor::new()),
14881488
));
14891489

1490-
// Test reading the RowId metadata column (should fail)
1491-
let snapshot1 = Snapshot::builder(location.clone()).build(engine.as_ref())?;
1492-
let schema = Arc::new(StructType::try_new([
1493-
StructField::nullable("id", DataType::INTEGER),
1494-
StructField::create_metadata_column("row_id", MetadataColumnSpec::RowId),
1495-
])?);
1496-
let scan = snapshot1.into_scan_builder().with_schema(schema).build()?;
1497-
let stream = scan.execute(engine.clone())?;
1498-
1499-
let mut found_error = false;
1500-
for scan_result in stream {
1501-
match scan_result {
1502-
Err(e) => {
1503-
let error_msg = e.to_string();
1504-
println!("error_msg: {}", error_msg);
1505-
if error_msg.contains("RowId") && error_msg.contains("not supported") {
1506-
found_error = true;
1507-
break;
1508-
}
1509-
}
1510-
Ok(_) => {
1511-
panic!("Expected error for RowId metadata column, but scan succeeded");
1512-
}
1513-
}
1514-
}
1515-
assert!(
1516-
found_error,
1517-
"Expected error about RowId not being supported"
1518-
);
1519-
1520-
// Test reading the RowCommitVersion metadata column (should fail)
1521-
let snapshot2 = Snapshot::builder(location.clone()).build(engine.as_ref())?;
1522-
let schema = Arc::new(StructType::try_new([
1523-
StructField::nullable("id", DataType::INTEGER),
1524-
StructField::create_metadata_column(
1490+
// Test that unsupported metadata columns fail with appropriate errors
1491+
let test_cases = [
1492+
("row_id", MetadataColumnSpec::RowId, "RowId"),
1493+
(
15251494
"row_commit_version",
15261495
MetadataColumnSpec::RowCommitVersion,
1496+
"RowCommitVersion",
15271497
),
1528-
])?);
1529-
let scan = snapshot2.into_scan_builder().with_schema(schema).build()?;
1530-
let stream = scan.execute(engine)?;
1531-
1532-
let mut found_error = false;
1533-
for scan_result in stream {
1534-
match scan_result {
1535-
Err(e) => {
1536-
let error_msg = e.to_string();
1537-
if error_msg.contains("RowCommitVersion") && error_msg.contains("not supported") {
1538-
found_error = true;
1539-
break;
1498+
];
1499+
for (column_name, metadata_spec, error_text) in test_cases {
1500+
let snapshot = Snapshot::builder(location.clone()).build(engine.as_ref())?;
1501+
let schema = Arc::new(StructType::try_new([
1502+
StructField::nullable("id", DataType::INTEGER),
1503+
StructField::create_metadata_column(column_name, metadata_spec),
1504+
])?);
1505+
let scan = snapshot.into_scan_builder().with_schema(schema).build()?;
1506+
let stream = scan.execute(engine.clone())?;
1507+
1508+
let mut found_error = false;
1509+
for scan_result in stream {
1510+
match scan_result {
1511+
Err(e) => {
1512+
let error_msg = e.to_string();
1513+
if error_msg.contains(error_text) && error_msg.contains("not supported") {
1514+
found_error = true;
1515+
break;
1516+
}
1517+
}
1518+
Ok(_) => {
1519+
panic!(
1520+
"Expected error for {} metadata column, but scan succeeded",
1521+
error_text
1522+
);
15401523
}
1541-
}
1542-
Ok(_) => {
1543-
panic!("Expected error for RowCommitVersion metadata column, but scan succeeded");
15441524
}
15451525
}
1526+
assert!(
1527+
found_error,
1528+
"Expected error about {} not being supported",
1529+
error_text
1530+
);
15461531
}
1547-
assert!(
1548-
found_error,
1549-
"Expected error about RowCommitVersion not being supported"
1550-
);
15511532

15521533
Ok(())
15531534
}

0 commit comments

Comments (0)