Skip to content

Commit eaac1df

Browse files
committed
feat: Add write support for row tracking
1 parent 43ab6e3 commit eaac1df

File tree

13 files changed

+1201
-183
lines changed

13 files changed

+1201
-183
lines changed

ffi/src/transaction/mod.rs

Lines changed: 5 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -130,13 +130,15 @@ mod tests {
130130
use delta_kernel::schema::{DataType, StructField, StructType};
131131

132132
use delta_kernel::arrow::array::{Array, ArrayRef, Int32Array, StringArray, StructArray};
133-
use delta_kernel::arrow::datatypes::{DataType as ArrowDataType, Field, Schema as ArrowSchema};
133+
use delta_kernel::arrow::datatypes::Schema as ArrowSchema;
134134
use delta_kernel::arrow::ffi::to_ffi;
135135
use delta_kernel::arrow::json::reader::ReaderBuilder;
136136
use delta_kernel::arrow::record_batch::RecordBatch;
137+
use delta_kernel::engine::arrow_conversion::TryIntoArrow as _;
137138
use delta_kernel::engine::arrow_data::ArrowEngineData;
138139
use delta_kernel::parquet::arrow::arrow_writer::ArrowWriter;
139140
use delta_kernel::parquet::file::properties::WriterProperties;
141+
use delta_kernel::transaction::add_files_schema;
140142

141143
use delta_kernel_ffi::engine_data::get_engine_data;
142144
use delta_kernel_ffi::engine_data::ArrowFFIData;
@@ -183,39 +185,15 @@ mod tests {
183185
path: &str,
184186
num_rows: i64,
185187
) -> Result<ArrowFFIData, Box<dyn std::error::Error>> {
186-
let schema = ArrowSchema::new(vec![
187-
Field::new("path", ArrowDataType::Utf8, false),
188-
Field::new(
189-
"partitionValues",
190-
ArrowDataType::Map(
191-
Arc::new(Field::new(
192-
"entries",
193-
ArrowDataType::Struct(
194-
vec![
195-
Field::new("key", ArrowDataType::Utf8, false),
196-
Field::new("value", ArrowDataType::Utf8, true),
197-
]
198-
.into(),
199-
),
200-
false,
201-
)),
202-
false,
203-
),
204-
false,
205-
),
206-
Field::new("size", ArrowDataType::Int64, false),
207-
Field::new("modificationTime", ArrowDataType::Int64, false),
208-
Field::new("dataChange", ArrowDataType::Boolean, false),
209-
Field::new("numRecords", ArrowDataType::Int64, true),
210-
]);
188+
let schema: ArrowSchema = add_files_schema().as_ref().try_into_arrow()?;
211189

212190
let current_time: i64 = std::time::SystemTime::now()
213191
.duration_since(std::time::UNIX_EPOCH)
214192
.unwrap()
215193
.as_millis() as i64;
216194

217195
let file_metadata = format!(
218-
r#"{{"path":"{path}", "partitionValues": {{}}, "size": {num_rows}, "modificationTime": {current_time}, "dataChange": true, "numRecords": {num_rows}}}"#,
196+
r#"{{"path":"{path}", "partitionValues": {{}}, "size": {num_rows}, "modificationTime": {current_time}, "dataChange": true, "stats": {{"numRecords": {num_rows}}}}}"#,
219197
);
220198

221199
create_arrow_ffi_from_json(schema, file_metadata.as_str())

kernel/src/actions/mod.rs

Lines changed: 21 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -132,6 +132,14 @@ pub(crate) fn get_log_domain_metadata_schema() -> &'static SchemaRef {
132132
&LOG_DOMAIN_METADATA_SCHEMA
133133
}
134134

135+
/// Nest an existing add action schema in an additional [`ADD_NAME`] struct.
136+
///
137+
/// This is useful for JSON conversion, as it allows us to wrap a dynamically maintained add action
138+
/// schema in a top-level "add" struct.
139+
pub(crate) fn as_log_add_schema(schema: SchemaRef) -> SchemaRef {
140+
Arc::new(StructType::new([StructField::nullable(ADD_NAME, schema)]))
141+
}
142+
135143
#[derive(Debug, Clone, PartialEq, Eq, ToSchema)]
136144
#[cfg_attr(
137145
any(test, feature = "internal-api"),
@@ -675,7 +683,7 @@ pub(crate) struct Add {
675683

676684
/// Default generated Row ID of the first row in the file. The default generated Row IDs
677685
/// of the other rows in the file can be reconstructed by adding the physical index of the
678-
/// row within the file to the base Row ID
686+
/// row within the file to the base Row ID.
679687
#[cfg_attr(test, serde(skip_serializing_if = "Option::is_none"))]
680688
pub base_row_id: Option<i64>,
681689

@@ -883,6 +891,17 @@ pub(crate) struct DomainMetadata {
883891
}
884892

885893
impl DomainMetadata {
894+
/// Create a new DomainMetadata action.
895+
// TODO: Discuss if we should remove `removed` from this method and introduce a dedicated
896+
// method for removed domain metadata.
897+
pub(crate) fn new(domain: String, configuration: String, removed: bool) -> Self {
898+
DomainMetadata {
899+
domain,
900+
configuration,
901+
removed,
902+
}
903+
}
904+
886905
// returns true if the domain metadata is a system-controlled domain (all domains that start
887906
// with "delta.")
888907
#[allow(unused)]
@@ -904,7 +923,6 @@ mod tests {
904923
engine::arrow_data::ArrowEngineData,
905924
engine::arrow_expression::ArrowEvaluationHandler,
906925
schema::{ArrayType, DataType, MapType, StructField},
907-
utils::test_utils::assert_result_error_with_message,
908926
Engine, EvaluationHandler, JsonHandler, ParquetHandler, StorageHandler,
909927
};
910928
use serde_json::json;
@@ -1298,22 +1316,11 @@ mod tests {
12981316
WriterFeature::AppendOnly,
12991317
WriterFeature::DeletionVectors,
13001318
WriterFeature::Invariants,
1319+
WriterFeature::RowTracking,
13011320
]),
13021321
)
13031322
.unwrap();
13041323
assert!(protocol.ensure_write_supported().is_ok());
1305-
1306-
let protocol = Protocol::try_new(
1307-
3,
1308-
7,
1309-
Some([ReaderFeature::DeletionVectors]),
1310-
Some([WriterFeature::RowTracking]),
1311-
)
1312-
.unwrap();
1313-
assert_result_error_with_message(
1314-
protocol.ensure_write_supported(),
1315-
r#"Unsupported: Unknown WriterFeatures: "rowTracking". Supported WriterFeatures: "appendOnly", "deletionVectors", "invariants", "timestampNtz", "variantType", "variantType-preview", "variantShredding-preview""#,
1316-
);
13171324
}
13181325

13191326
#[test]

kernel/src/engine/default/parquet.rs

Lines changed: 24 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,8 @@ use std::ops::Range;
55
use std::sync::Arc;
66

77
use crate::arrow::array::builder::{MapBuilder, MapFieldNames, StringBuilder};
8-
use crate::arrow::array::{BooleanArray, Int64Array, RecordBatch, StringArray};
8+
use crate::arrow::array::{BooleanArray, Int64Array, RecordBatch, StringArray, StructArray};
9+
use crate::arrow::datatypes::{DataType, Field};
910
use crate::parquet::arrow::arrow_reader::{
1011
ArrowReaderMetadata, ArrowReaderOptions, ParquetRecordBatchReaderBuilder,
1112
};
@@ -24,6 +25,7 @@ use crate::engine::arrow_utils::{fixup_parquet_read, generate_mask, get_requeste
2425
use crate::engine::default::executor::TaskExecutor;
2526
use crate::engine::parquet_row_group_skipping::ParquetRowGroupSkipping;
2627
use crate::schema::SchemaRef;
28+
use crate::transaction::add_files_schema;
2729
use crate::{
2830
DeltaResult, EngineData, Error, FileDataReadResultIterator, FileMeta, ParquetHandler,
2931
PredicateRef,
@@ -55,7 +57,10 @@ impl DataFileMetadata {
5557
}
5658
}
5759

58-
// convert DataFileMetadata into a record batch which matches the 'add_files_schema' schema
60+
/// Convert DataFileMetadata into a record batch which matches the schema returned by
61+
/// [`add_files_schema`].
62+
///
63+
/// [`add_files_schema`]: crate::transaction::add_files_schema
5964
fn as_record_batch(
6065
&self,
6166
partition_values: &HashMap<String, String>,
@@ -70,8 +75,6 @@ impl DataFileMetadata {
7075
},
7176
num_records,
7277
} = self;
73-
let add_files_schema = crate::transaction::add_files_schema();
74-
7578
// create the record batch of the write metadata
7679
let path = Arc::new(StringArray::from(vec![location.to_string()]));
7780
let key_builder = StringBuilder::new();
@@ -95,17 +98,22 @@ impl DataFileMetadata {
9598
let size = Arc::new(Int64Array::from(vec![size]));
9699
let data_change = Arc::new(BooleanArray::from(vec![data_change]));
97100
let modification_time = Arc::new(Int64Array::from(vec![*last_modified]));
98-
let num_records = Arc::new(Int64Array::from(vec![*num_records as i64]));
101+
let stats = Arc::new(StructArray::try_new_with_length(
102+
vec![Field::new("numRecords", DataType::Int64, true)].into(),
103+
vec![Arc::new(Int64Array::from(vec![*num_records as i64]))],
104+
None,
105+
1,
106+
)?);
99107

100108
Ok(Box::new(ArrowEngineData::new(RecordBatch::try_new(
101-
Arc::new(add_files_schema.as_ref().try_into_arrow()?),
109+
Arc::new(add_files_schema().as_ref().try_into_arrow()?),
102110
vec![
103111
path,
104112
partitions,
105113
size,
106114
modification_time,
107115
data_change,
108-
num_records,
116+
stats,
109117
],
110118
)?)))
111119
}
@@ -502,6 +510,14 @@ mod tests {
502510
partition_values_builder.values().append_value("a");
503511
partition_values_builder.append(true).unwrap();
504512
let partition_values = partition_values_builder.finish();
513+
let stats_struct = StructArray::try_new_with_length(
514+
vec![Field::new("numRecords", DataType::Int64, true)].into(),
515+
vec![Arc::new(Int64Array::from(vec![num_records as i64]))],
516+
None,
517+
1,
518+
)
519+
.unwrap();
520+
505521
let expected = RecordBatch::try_new(
506522
schema,
507523
vec![
@@ -510,7 +526,7 @@ mod tests {
510526
Arc::new(Int64Array::from(vec![size as i64])),
511527
Arc::new(Int64Array::from(vec![last_modified])),
512528
Arc::new(BooleanArray::from(vec![data_change])),
513-
Arc::new(Int64Array::from(vec![num_records as i64])),
529+
Arc::new(stats_struct),
514530
],
515531
)
516532
.unwrap();

kernel/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,7 @@ pub mod checkpoint;
8989
pub mod engine_data;
9090
pub mod error;
9191
pub mod expressions;
92+
pub mod row_tracking;
9293
pub mod scan;
9394
pub mod schema;
9495
pub mod snapshot;

kernel/src/row_tracking.rs

Lines changed: 94 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,94 @@
1+
use std::sync::LazyLock;
2+
3+
use serde::{Deserialize, Serialize};
4+
5+
use delta_kernel_derive::{internal_api, ToSchema};
6+
7+
use crate::engine_data::{GetData, RowVisitor, TypedGetData as _};
8+
use crate::schema::{ColumnName, ColumnNamesAndTypes, DataType};
9+
use crate::transaction::add_files_schema;
10+
use crate::utils::require;
11+
use crate::{DeltaResult, Error};
12+
13+
pub(crate) const ROW_TRACKING_DOMAIN_NAME: &str = "delta.rowTracking";
14+
15+
#[derive(Debug, Clone, PartialEq, Eq, ToSchema, Deserialize, Serialize)]
16+
#[serde(rename_all = "camelCase")]
17+
pub(crate) struct RowTrackingDomainMetadata {
18+
// The Delta spec does not define unsigned integers, so we use i64 here
19+
#[serde(skip_serializing_if = "Option::is_none")]
20+
pub(crate) row_id_high_water_mark: Option<i64>,
21+
}
22+
23+
/// A row visitor that iterates over preliminary [`Add`] actions as returned by the engine and
24+
/// computes a base row ID for each action.
25+
/// It expects to visit engine data conforming to the schema returned by [`add_files_schema()`].
26+
///
27+
/// This visitor is only required for the row tracking write path. The read path will be completely
28+
/// implemented via expressions.
29+
///
30+
/// [`Add`]: delta_kernel::actions::Add
31+
#[internal_api]
32+
pub(crate) struct RowTrackingVisitor {
33+
/// High water mark for row IDs
34+
pub(crate) row_id_high_water_mark: i64,
35+
36+
/// Computed base row IDs of the visited actions
37+
pub(crate) base_row_ids: Vec<i64>,
38+
}
39+
40+
impl RowTrackingVisitor {
41+
/// Field index for "numRecords" in [`add_files_schema()`]
42+
const NUM_RECORDS_FIELD_INDEX: usize = 5;
43+
44+
/// Default value for an absent high water mark
45+
const DEFAULT_HWM: i64 = -1;
46+
47+
#[internal_api]
48+
pub(crate) fn new(row_id_high_water_mark: Option<i64>) -> Self {
49+
// A table might not have a row ID high water mark yet, so we model the input as an Option<i64>
50+
Self {
51+
row_id_high_water_mark: row_id_high_water_mark.unwrap_or(Self::DEFAULT_HWM),
52+
base_row_ids: vec![],
53+
}
54+
}
55+
}
56+
57+
impl RowVisitor for RowTrackingVisitor {
58+
fn selected_column_names_and_types(&self) -> (&'static [ColumnName], &'static [DataType]) {
59+
static NAMES_AND_TYPES: LazyLock<ColumnNamesAndTypes> =
60+
LazyLock::new(|| add_files_schema().leaves(None));
61+
NAMES_AND_TYPES.as_ref()
62+
}
63+
64+
fn visit<'a>(&mut self, row_count: usize, getters: &[&'a dyn GetData<'a>]) -> DeltaResult<()> {
65+
require!(
66+
getters.len() == add_files_schema().fields_len(),
67+
Error::InternalError(format!(
68+
"Wrong number of RowTrackingVisitor getters: {}",
69+
getters.len()
70+
))
71+
);
72+
73+
// Reset base row ID vector and allocate the necessary capacity
74+
self.base_row_ids.clear();
75+
self.base_row_ids.reserve(row_count);
76+
77+
let mut current_hwm = self.row_id_high_water_mark;
78+
for i in 0..row_count {
79+
let num_records: i64 = getters[Self::NUM_RECORDS_FIELD_INDEX]
80+
.get_opt(i, "numRecords")?
81+
.ok_or_else(|| {
82+
Error::InternalError(
83+
"numRecords must be present in Add actions when row tracking is enabled."
84+
.to_string(),
85+
)
86+
})?;
87+
self.base_row_ids.push(current_hwm + 1);
88+
current_hwm += num_records;
89+
}
90+
91+
self.row_id_high_water_mark = current_hwm;
92+
Ok(())
93+
}
94+
}

kernel/src/table_configuration.rs

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -342,6 +342,61 @@ impl TableConfiguration {
342342
)),
343343
}
344344
}
345+
346+
/// Returns `true` if the table supports writing domain metadata.
347+
///
348+
/// To support this feature the table must:
349+
/// - Have a min_writer_version of 7.
350+
/// - Have the [`WriterFeature::DomainMetadata`] writer feature.
351+
#[allow(unused)]
352+
pub(crate) fn is_domain_metadata_supported(&self) -> bool {
353+
self.protocol().min_writer_version() == 7
354+
&& self
355+
.protocol()
356+
.has_writer_feature(&WriterFeature::DomainMetadata)
357+
}
358+
359+
/// Returns `true` if the table supports writing row tracking metadata.
360+
///
361+
/// To support this feature the table must:
362+
/// - Have a min_writer_version of 7.
363+
/// - Have the [`WriterFeature::RowTracking`] writer feature.
364+
#[allow(unused)]
365+
pub(crate) fn is_row_tracking_supported(&self) -> bool {
366+
self.protocol().min_writer_version() == 7
367+
&& self
368+
.protocol()
369+
.has_writer_feature(&WriterFeature::RowTracking)
370+
}
371+
372+
/// Returns `true` if row tracking is enabled for this table.
373+
///
374+
/// In order to enable row tracking the table must:
375+
/// - Support row tracking (see [`Self::is_row_tracking_supported`]).
376+
/// - Have the `delta.enableRowTracking` table property set to `true`.
377+
#[allow(unused)]
378+
pub(crate) fn is_row_tracking_enabled(&self) -> bool {
379+
self.is_row_tracking_supported()
380+
&& self.table_properties().enable_row_tracking.unwrap_or(false)
381+
}
382+
383+
/// Returns `true` if row tracking is suspended for this table.
384+
///
385+
/// Row tracking is suspended when the `delta.rowTrackingSuspended` table property is set to `true`.
386+
/// Note that:
387+
/// - If row tracking is not supported, it is implicitly suspended.
388+
/// - Row tracking can be _supported_ but _suspended_ at the same time.
389+
/// - Row tracking cannot be _enabled_ while _suspended_.
390+
#[allow(unused)]
391+
pub(crate) fn is_row_tracking_suspended(&self) -> bool {
392+
// We do not check whether row tracking is enabled here but rely on the table properties to
393+
// be in a valid state.
394+
!self.is_row_tracking_supported()
395+
|| self
396+
.table_properties()
397+
.row_tracking_suspended
398+
.unwrap_or(false)
399+
}
345400
}
346401

347402
#[cfg(test)]

0 commit comments

Comments
 (0)