Skip to content

Commit 1373bc4

Browse files
committed
feat!: Introduce metadata column API
1 parent 3ea5d34 commit 1373bc4

File tree

14 files changed

+720
-66
lines changed

14 files changed

+720
-66
lines changed

ffi/src/schema.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -222,7 +222,7 @@ pub unsafe extern "C" fn visit_schema(
222222
fn visit_schema_impl(schema: &StructType, visitor: &mut EngineSchemaVisitor) -> usize {
223223
// Visit all the fields of a struct and return the list of children
224224
fn visit_struct_fields(visitor: &EngineSchemaVisitor, s: &StructType) -> usize {
225-
let child_list_id = (visitor.make_field_list)(visitor.data, s.fields.len());
225+
let child_list_id = (visitor.make_field_list)(visitor.data, s.num_fields());
226226
for field in s.fields() {
227227
visit_schema_item(
228228
field.name(),

ffi/src/transaction/mod.rs

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -274,11 +274,17 @@ mod tests {
274274
// Ensure we get the correct schema
275275
let write_schema = unsafe { get_write_schema(write_context.shallow_copy()) };
276276
let write_schema_ref = unsafe { write_schema.as_ref() };
277-
assert_eq!(write_schema_ref.fields.len(), 2);
278-
assert_eq!(write_schema_ref.fields[0].name, "number");
279-
assert_eq!(write_schema_ref.fields[0].data_type, DataType::INTEGER);
280-
assert_eq!(write_schema_ref.fields[1].name, "string");
281-
assert_eq!(write_schema_ref.fields[1].data_type, DataType::STRING);
277+
assert_eq!(write_schema_ref.num_fields(), 2);
278+
assert_eq!(write_schema_ref.field_at_index(0).unwrap().name, "number");
279+
assert_eq!(
280+
write_schema_ref.field_at_index(0).unwrap().data_type,
281+
DataType::INTEGER
282+
);
283+
assert_eq!(write_schema_ref.field_at_index(1).unwrap().name, "string");
284+
assert_eq!(
285+
write_schema_ref.field_at_index(1).unwrap().data_type,
286+
DataType::STRING
287+
);
282288

283289
// Ensure the ffi returns the correct table path
284290
let write_path = unsafe { get_write_path(write_context.shallow_copy(), allocate_str) };

kernel/src/actions/mod.rs

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -176,7 +176,7 @@ impl TryFrom<Format> for Scalar {
176176
)
177177
.map(Scalar::Map)?;
178178
Ok(Scalar::Struct(StructData::try_new(
179-
Format::to_schema().fields().cloned().collect(),
179+
Format::to_schema().into_fields().collect(),
180180
vec![provider, options],
181181
)?))
182182
}
@@ -190,6 +190,7 @@ impl TryFrom<Format> for Scalar {
190190
)]
191191
#[internal_api]
192192
pub(crate) struct Metadata {
193+
// TODO: Make the struct fields private to force using the try_new function.
193194
/// Unique identifier for this table
194195
pub(crate) id: String,
195196
/// User-provided identifier for this table
@@ -219,6 +220,16 @@ impl Metadata {
219220
created_time: i64,
220221
configuration: HashMap<String, String>,
221222
) -> DeltaResult<Self> {
223+
// Validate that the schema does not contain metadata columns
224+
// Note: We don't have to look for nested metadata columns because that is already validated
225+
// when creating a StructType.
226+
if let Some(metadata_field) = schema.fields().find(|field| field.is_metadata_column()) {
227+
return Err(Error::Schema(format!(
228+
"Table schema must not contain metadata columns. Found metadata column: '{}'",
229+
metadata_field.name
230+
)));
231+
}
232+
222233
Ok(Self {
223234
id: uuid::Uuid::new_v4().to_string(),
224235
name,

kernel/src/engine/arrow_expression/evaluate_expression.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -179,11 +179,11 @@ fn evaluate_transform_expression(
179179
}
180180

181181
// Verify that the lengths match before attempting to zip them below.
182-
if output_cols.len() != output_schema.fields_len() {
182+
if output_cols.len() != output_schema.num_fields() {
183183
return Err(Error::generic(format!(
184184
"Expression count ({}) doesn't match output schema field count ({})",
185185
output_cols.len(),
186-
output_schema.fields_len()
186+
output_schema.num_fields()
187187
)));
188188
}
189189

kernel/src/engine/arrow_expression/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -182,7 +182,7 @@ impl Scalar {
182182
// insert an entry for each child builder, even when we're inserting NULL.
183183
let builder = builder_as!(array::StructBuilder);
184184
require!(
185-
builder.num_fields() == stype.fields_len(),
185+
builder.num_fields() == stype.num_fields(),
186186
Error::generic("Struct builder has wrong number of fields")
187187
);
188188
for _ in 0..num_rows {

kernel/src/engine/arrow_utils.rs

Lines changed: 51 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -330,8 +330,8 @@ fn get_indices(
330330
fields: &ArrowFields,
331331
mask_indices: &mut Vec<usize>,
332332
) -> DeltaResult<(usize, Vec<ReorderIndex>)> {
333-
let mut found_fields = HashSet::with_capacity(requested_schema.fields.len());
334-
let mut reorder_indices = Vec::with_capacity(requested_schema.fields.len());
333+
let mut found_fields = HashSet::with_capacity(requested_schema.num_fields());
334+
let mut reorder_indices = Vec::with_capacity(requested_schema.num_fields());
335335
let mut parquet_offset = start_parquet_offset;
336336
// for each field, get its position in the parquet (via enumerate), a reference to the arrow
337337
// field, and info about where it appears in the requested_schema, or None if the field is not
@@ -507,10 +507,17 @@ fn get_indices(
507507
}
508508
}
509509

510-
if found_fields.len() != requested_schema.fields.len() {
510+
if found_fields.len() != requested_schema.num_fields() {
511511
// some fields are missing, but they might be nullable, need to insert them into the reorder_indices
512512
for (requested_position, field) in requested_schema.fields().enumerate() {
513513
if !found_fields.contains(field.name()) {
514+
if let Some(metadata_spec) = field.get_metadata_column_spec() {
515+
// We don't support reading any metadata columns yet
516+
// TODO: Implement row index support for the Parquet reader
517+
return Err(Error::Generic(format!(
518+
"Metadata column {metadata_spec:?} is not supported by the default parquet reader"
519+
)));
520+
}
514521
if field.nullable {
515522
debug!("Inserting missing and nullable field: {}", field.name());
516523
reorder_indices.push(ReorderIndex::missing(
@@ -582,11 +589,12 @@ fn match_parquet_fields<'k, 'p>(
582589
// Map the parquet ArrowField to the matching kernel KernelFieldInfo if present.
583590
let kernel_field_info =
584591
kernel_schema
585-
.fields
586-
.get_full(field_name)
587-
.map(|(idx, _name, field)| KernelFieldInfo {
588-
parquet_index: idx,
589-
field,
592+
.field_with_index(field_name)
593+
.and_then(|(idx, field)| {
594+
(!field.is_metadata_column()).then_some(KernelFieldInfo {
595+
parquet_index: idx,
596+
field,
597+
})
590598
});
591599

592600
MatchedParquetField {
@@ -1533,6 +1541,41 @@ mod tests {
15331541
]))
15341542
}
15351543

1544+
#[test]
1545+
fn test_match_parquet_fields_filters_metadata_columns() {
1546+
use crate::schema::MetadataColumnSpec;
1547+
1548+
let kernel_schema = StructType::new_unchecked([
1549+
StructField::not_null("regular_field", DataType::INTEGER),
1550+
StructField::create_metadata_column("row_index", MetadataColumnSpec::RowIndex),
1551+
StructField::nullable("another_field", DataType::STRING),
1552+
]);
1553+
1554+
let parquet_fields: ArrowFields = vec![
1555+
ArrowField::new("regular_field", ArrowDataType::Int32, false),
1556+
ArrowField::new("row_index", ArrowDataType::Int64, false),
1557+
ArrowField::new("another_field", ArrowDataType::Utf8, true),
1558+
]
1559+
.into();
1560+
1561+
let matched_fields: Vec<_> =
1562+
match_parquet_fields(&kernel_schema, &parquet_fields).collect();
1563+
1564+
assert_eq!(matched_fields.len(), 3);
1565+
1566+
// First field (regular_field) should have kernel_field_info
1567+
assert!(matched_fields[0].kernel_field_info.is_some());
1568+
assert_eq!(matched_fields[0].parquet_field.name(), "regular_field");
1569+
1570+
// Second field (row_index metadata column) should have None for kernel_field_info
1571+
assert!(matched_fields[1].kernel_field_info.is_none());
1572+
assert_eq!(matched_fields[1].parquet_field.name(), "row_index");
1573+
1574+
// Third field (another_field) should have kernel_field_info
1575+
assert!(matched_fields[2].kernel_field_info.is_some());
1576+
assert_eq!(matched_fields[2].parquet_field.name(), "another_field");
1577+
}
1578+
15361579
#[test]
15371580
fn nested_indices() {
15381581
column_mapping_cases().into_iter().for_each(|mode| {

kernel/src/engine/ensure_data_types.rs

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -110,7 +110,7 @@ impl EnsureDataTypes {
110110
// build a list of kernel fields that matches the order of the arrow fields
111111
let mapped_fields = arrow_fields
112112
.iter()
113-
.filter_map(|f| kernel_fields.fields.get(f.name()));
113+
.filter_map(|f| kernel_fields.field(f.name()));
114114

115115
// keep track of how many fields we matched up
116116
let mut found_fields = 0;
@@ -122,12 +122,11 @@ impl EnsureDataTypes {
122122
}
123123

124124
// require that we found the number of fields that we requested.
125-
require!(kernel_fields.fields.len() == found_fields, {
125+
require!(kernel_fields.num_fields() == found_fields, {
126126
let arrow_field_map: HashSet<&String> =
127127
HashSet::from_iter(arrow_fields.iter().map(|f| f.name()));
128128
let missing_field_names = kernel_fields
129-
.fields
130-
.keys()
129+
.field_names()
131130
.filter(|kernel_field| !arrow_field_map.contains(kernel_field))
132131
.take(5)
133132
.join(", ");

kernel/src/row_tracking.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -112,7 +112,7 @@ impl RowVisitor for RowTrackingVisitor {
112112

113113
fn visit<'a>(&mut self, row_count: usize, getters: &[&'a dyn GetData<'a>]) -> DeltaResult<()> {
114114
require!(
115-
getters.len() == add_files_schema().fields_len(),
115+
getters.len() == add_files_schema().num_fields(),
116116
Error::generic(format!(
117117
"Wrong number of RowTrackingVisitor getters: {}",
118118
getters.len()
@@ -176,7 +176,7 @@ mod tests {
176176
let schema = add_files_schema();
177177
let mut getters: Vec<&'a dyn GetData<'a>> = Vec::new();
178178

179-
for i in 0..schema.fields_len() {
179+
for i in 0..schema.num_fields() {
180180
if i == RowTrackingVisitor::NUM_RECORDS_FIELD_INDEX {
181181
getters.push(num_records_mock);
182182
} else {

kernel/src/scan/log_replay.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -127,8 +127,8 @@ impl AddRemoveDedupVisitor<'_> {
127127
field_idx: usize,
128128
partition_values: &HashMap<String, String>,
129129
) -> DeltaResult<(usize, (String, Scalar))> {
130-
let field = self.logical_schema.fields.get_index(field_idx);
131-
let Some((_, field)) = field else {
130+
let field = self.logical_schema.field_at_index(field_idx);
131+
let Some(field) = field else {
132132
return Err(Error::InternalError(format!(
133133
"out of bounds partition column field index {field_idx}"
134134
)));

kernel/src/scan/mod.rs

Lines changed: 26 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -872,7 +872,9 @@ impl StateInfo {
872872
column_mapping_mode: ColumnMappingMode,
873873
) -> DeltaResult<Self> {
874874
let mut have_partition_cols = false;
875-
let mut read_fields = Vec::with_capacity(logical_schema.fields.len());
875+
let mut read_fields = Vec::with_capacity(logical_schema.num_fields());
876+
let mut read_field_names = HashSet::with_capacity(logical_schema.num_fields());
877+
876878
// Loop over all selected fields and note if they are columns that will be read from the
877879
// parquet file ([`ColumnType::Selected`]) or if they are partition columns and will need to
878880
// be filled in by evaluating an expression ([`ColumnType::Partition`])
@@ -881,6 +883,13 @@ impl StateInfo {
881883
.enumerate()
882884
.map(|(index, logical_field)| -> DeltaResult<_> {
883885
if partition_columns.contains(logical_field.name()) {
886+
if logical_field.is_metadata_column() {
887+
return Err(Error::Schema(format!(
888+
"Metadata column names must not match partition columns: {}",
889+
logical_field.name()
890+
)));
891+
}
892+
884893
// Store the index into the schema for this field. When we turn it into an
885894
// expression in the inner loop, we will index into the schema and get the name and
886895
// data type, which we need to properly materialize the column.
@@ -893,10 +902,26 @@ impl StateInfo {
893902
debug!("\n\n{logical_field:#?}\nAfter mapping: {physical_field:#?}\n\n");
894903
let physical_name = physical_field.name.clone();
895904
read_fields.push(physical_field);
905+
906+
if !logical_field.is_metadata_column() {
907+
read_field_names.insert(physical_name.clone());
908+
}
909+
896910
Ok(ColumnType::Selected(physical_name))
897911
}
898912
})
899913
.try_collect()?;
914+
915+
// This iteration runs in constant time: each kind of metadata column can appear at most once in the schema, so the loop is bounded by the fixed number of metadata column kinds
916+
for metadata_column in logical_schema.metadata_columns() {
917+
if read_field_names.contains(metadata_column.name()) {
918+
return Err(Error::Schema(format!(
919+
"Metadata column names must not match physical columns: {}",
920+
metadata_column.name()
921+
)));
922+
}
923+
}
924+
900925
Ok(StateInfo {
901926
all_fields,
902927
read_fields,

0 commit comments

Comments
 (0)