Skip to content

Commit 52102eb

Browse files
committed
refactor: Row tracking write cleanup
1 parent c6c3c2c commit 52102eb

File tree

3 files changed

+74
-35
lines changed

3 files changed

+74
-35
lines changed

kernel/src/actions/mod.rs

Lines changed: 5 additions & 7 deletions
Original file line number | Diff line number | Diff line change
@@ -926,14 +926,12 @@ impl DomainMetadata {
926926
mod tests {
927927
use super::*;
928928
use crate::{
929-
arrow::{
930-
array::{
931-
Array, BooleanArray, Int32Array, Int64Array, ListArray, ListBuilder, MapBuilder,
932-
MapFieldNames, RecordBatch, StringArray, StringBuilder, StructArray,
933-
},
934-
datatypes::{DataType as ArrowDataType, Field, Schema},
935-
json::ReaderBuilder,
929+
arrow::array::{
930+
Array, BooleanArray, Int32Array, Int64Array, ListArray, ListBuilder, MapBuilder,
931+
MapFieldNames, RecordBatch, StringArray, StringBuilder, StructArray,
936932
},
933+
arrow::datatypes::{DataType as ArrowDataType, Field, Schema},
934+
arrow::json::ReaderBuilder,
937935
engine::{arrow_data::ArrowEngineData, arrow_expression::ArrowEvaluationHandler},
938936
schema::{ArrayType, DataType, MapType, StructField},
939937
utils::test_utils::assert_result_error_with_message,

kernel/src/row_tracking.rs

Lines changed: 53 additions & 19 deletions
Original file line number | Diff line number | Diff line change
@@ -81,8 +81,8 @@ pub(crate) struct RowTrackingVisitor {
8181
/// High water mark for row IDs
8282
pub(crate) row_id_high_water_mark: i64,
8383

84-
/// Computed base row IDs of the visited actions
85-
pub(crate) base_row_ids: Vec<i64>,
84+
/// Computed base row IDs of the visited actions, organized by batch
85+
pub(crate) base_row_id_batches: Vec<Vec<i64>>,
8686
}
8787

8888
impl RowTrackingVisitor {
@@ -94,11 +94,11 @@ impl RowTrackingVisitor {
9494
/// We verify this hard-coded index in a test.
9595
const NUM_RECORDS_FIELD_INDEX: usize = 5;
9696

97-
pub(crate) fn new(row_id_high_water_mark: Option<i64>) -> Self {
97+
pub(crate) fn new(row_id_high_water_mark: Option<i64>, num_batches: Option<usize>) -> Self {
9898
// A table might not have a row ID high water mark yet, so we model the input as an Option<i64>
9999
Self {
100100
row_id_high_water_mark: row_id_high_water_mark.unwrap_or(Self::DEFAULT_HIGH_WATER_MARK),
101-
base_row_ids: vec![],
101+
base_row_id_batches: Vec::with_capacity(num_batches.unwrap_or(0)),
102102
}
103103
}
104104
}
@@ -119,9 +119,8 @@ impl RowVisitor for RowTrackingVisitor {
119119
))
120120
);
121121

122-
// Reset base row ID vector and allocate the necessary capacity
123-
self.base_row_ids.clear();
124-
self.base_row_ids.reserve(row_count);
122+
// Create a new batch for this visit
123+
let mut batch_base_row_ids = Vec::with_capacity(row_count);
125124

126125
let mut current_hwm = self.row_id_high_water_mark;
127126
for i in 0..row_count {
@@ -133,10 +132,11 @@ impl RowVisitor for RowTrackingVisitor {
133132
.to_string(),
134133
)
135134
})?;
136-
self.base_row_ids.push(current_hwm + 1);
135+
batch_base_row_ids.push(current_hwm + 1);
137136
current_hwm += num_records;
138137
}
139138

139+
self.base_row_id_batches.push(batch_base_row_ids);
140140
self.row_id_high_water_mark = current_hwm;
141141
Ok(())
142142
}
@@ -205,15 +205,16 @@ mod tests {
205205

206206
#[test]
207207
fn test_visit_basic_functionality() -> DeltaResult<()> {
208-
let mut visitor = RowTrackingVisitor::new(None);
208+
let mut visitor = RowTrackingVisitor::new(None, Some(1));
209209
let num_records_mock = MockGetData::new(vec![Some(10), Some(5), Some(20)]);
210210
let unit_mock = ();
211211
let getters = create_getters(&num_records_mock, &unit_mock);
212212

213213
visitor.visit(3, &getters)?;
214214

215215
// Check that base row IDs are calculated correctly
216-
assert_eq!(visitor.base_row_ids, vec![0, 10, 15]);
216+
assert_eq!(visitor.base_row_id_batches.len(), 1);
217+
assert_eq!(visitor.base_row_id_batches[0], vec![0, 10, 15]);
217218

218219
// Check that high water mark is updated correctly
219220
assert_eq!(visitor.row_id_high_water_mark, 34); // -1 + 10 + 5 + 20
@@ -223,15 +224,16 @@ mod tests {
223224

224225
#[test]
225226
fn test_visit_with_negative_high_water_mark() -> DeltaResult<()> {
226-
let mut visitor = RowTrackingVisitor::new(Some(-5));
227+
let mut visitor = RowTrackingVisitor::new(Some(-5), Some(1));
227228
let num_records_mock = MockGetData::new(vec![Some(3), Some(2)]);
228229
let unit_mock = ();
229230
let getters = create_getters(&num_records_mock, &unit_mock);
230231

231232
visitor.visit(2, &getters)?;
232233

233234
// Base row IDs should start from high_water_mark + 1
234-
assert_eq!(visitor.base_row_ids, vec![-4, -1]); // -5+1=-4, then -4+3=-1
235+
assert_eq!(visitor.base_row_id_batches.len(), 1);
236+
assert_eq!(visitor.base_row_id_batches[0], vec![-4, -1]); // -5+1=-4, then -4+3=-1
235237

236238
// High water mark should be updated
237239
assert_eq!(visitor.row_id_high_water_mark, 0); // -5 + 3 + 2 = 0
@@ -241,15 +243,16 @@ mod tests {
241243

242244
#[test]
243245
fn test_visit_with_zero_records() -> DeltaResult<()> {
244-
let mut visitor = RowTrackingVisitor::new(Some(10));
246+
let mut visitor = RowTrackingVisitor::new(Some(10), Some(1));
245247
let num_records_mock = MockGetData::new(vec![Some(0), Some(0), Some(5)]);
246248
let unit_mock = ();
247249
let getters = create_getters(&num_records_mock, &unit_mock);
248250

249251
visitor.visit(3, &getters)?;
250252

251253
// Base row IDs should still be assigned even for zero-record files
252-
assert_eq!(visitor.base_row_ids, vec![11, 11, 11]);
254+
assert_eq!(visitor.base_row_id_batches.len(), 1);
255+
assert_eq!(visitor.base_row_id_batches[0], vec![11, 11, 11]);
253256

254257
// High water mark should only increase by non-zero records
255258
assert_eq!(visitor.row_id_high_water_mark, 15); // 10 + 0 + 0 + 5
@@ -259,23 +262,54 @@ mod tests {
259262

260263
#[test]
261264
fn test_visit_empty_batch() -> DeltaResult<()> {
262-
let mut visitor = RowTrackingVisitor::new(Some(42));
265+
let mut visitor = RowTrackingVisitor::new(Some(42), None);
263266
let num_records_mock = MockGetData::new(vec![]);
264267
let unit_mock = ();
265268
let getters = create_getters(&num_records_mock, &unit_mock);
266269

267270
visitor.visit(0, &getters)?;
268271

269272
// Should handle empty batch gracefully
270-
assert!(visitor.base_row_ids.is_empty());
273+
assert_eq!(visitor.base_row_id_batches.len(), 1);
274+
assert!(visitor.base_row_id_batches[0].is_empty());
271275
assert_eq!(visitor.row_id_high_water_mark, 42); // Should remain unchanged
272276

273277
Ok(())
274278
}
275279

280+
#[test]
281+
fn test_visit_multiple_batches() -> DeltaResult<()> {
282+
let mut visitor = RowTrackingVisitor::new(Some(0), Some(2));
283+
let unit_mock = ();
284+
285+
// First batch
286+
let num_records_mock1 = MockGetData::new(vec![Some(10), Some(5)]);
287+
let getters1 = create_getters(&num_records_mock1, &unit_mock);
288+
visitor.visit(2, &getters1)?;
289+
290+
// Second batch
291+
let num_records_mock2 = MockGetData::new(vec![Some(3), Some(7), Some(2)]);
292+
let getters2 = create_getters(&num_records_mock2, &unit_mock);
293+
visitor.visit(3, &getters2)?;
294+
295+
// Check that we have two batches
296+
assert_eq!(visitor.base_row_id_batches.len(), 2);
297+
298+
// Check first batch: starts at 1, then 11
299+
assert_eq!(visitor.base_row_id_batches[0], vec![1, 11]);
300+
301+
// Check second batch: starts at 16, then 19, then 26
302+
assert_eq!(visitor.base_row_id_batches[1], vec![16, 19, 26]);
303+
304+
// Check final high water mark: 0 + 10 + 5 + 3 + 7 + 2 = 27
305+
assert_eq!(visitor.row_id_high_water_mark, 27);
306+
307+
Ok(())
308+
}
309+
276310
#[test]
277311
fn test_visit_wrong_getter_count() -> DeltaResult<()> {
278-
let mut visitor = RowTrackingVisitor::new(Some(0));
312+
let mut visitor = RowTrackingVisitor::new(Some(0), None);
279313
let unit_mock = ();
280314
let wrong_getters: Vec<&dyn GetData<'_>> = vec![&unit_mock]; // Only one getter instead of expected count
281315

@@ -287,7 +321,7 @@ mod tests {
287321

288322
#[test]
289323
fn test_visit_missing_num_records() -> DeltaResult<()> {
290-
let mut visitor = RowTrackingVisitor::new(Some(0));
324+
let mut visitor = RowTrackingVisitor::new(Some(0), None);
291325
let num_records_mock = MockGetData::new(vec![None]); // Missing numRecords
292326
let unit_mock = ();
293327
let getters = create_getters(&num_records_mock, &unit_mock);
@@ -303,7 +337,7 @@ mod tests {
303337

304338
#[test]
305339
fn test_selected_column_names_and_types() {
306-
let visitor = RowTrackingVisitor::new(Some(0));
340+
let visitor = RowTrackingVisitor::new(Some(0), None);
307341
let (names, types) = visitor.selected_column_names_and_types();
308342

309343
// Should return the same as add_files_schema().leaves(None)

kernel/src/transaction/mod.rs

Lines changed: 16 additions & 9 deletions
Original file line number | Diff line number | Diff line change
@@ -221,8 +221,8 @@ impl Transaction {
221221
let commit_path =
222222
ParsedLogPath::new_commit(self.read_snapshot.table_root(), commit_version)?;
223223
let actions = iter::once(commit_info_action)
224-
.chain(set_transaction_actions)
225-
.chain(add_actions);
224+
.chain(add_actions)
225+
.chain(set_transaction_actions);
226226

227227
let json_handler = engine.json_handler();
228228
match json_handler.write_json_file(&commit_path.location, Box::new(actions), false) {
@@ -357,28 +357,35 @@ impl Transaction {
357357
RowTrackingDomainMetadata::get_high_water_mark(&self.read_snapshot, engine)?;
358358

359359
// Create a row tracking visitor and visit all files to collect row tracking information
360-
let mut row_tracking_visitor = RowTrackingVisitor::new(row_id_high_water_mark);
361-
let mut base_row_id_batches = Vec::with_capacity(self.add_files_metadata.len());
360+
let mut row_tracking_visitor =
361+
RowTrackingVisitor::new(row_id_high_water_mark, Some(self.add_files_metadata.len()));
362362

363363
// We visit all files with the row visitor before creating the add action iterator
364364
// because we need to know the final row ID high water mark to create the domain metadata action
365365
for add_files_batch in &self.add_files_metadata {
366366
row_tracking_visitor.visit_rows_of(add_files_batch.deref())?;
367-
base_row_id_batches.push(row_tracking_visitor.base_row_ids.clone());
368367
}
369368

369+
// Destructure the row tracking visitor into its fields to avoid borrowing issues
370+
let RowTrackingVisitor {
371+
base_row_id_batches,
372+
row_id_high_water_mark,
373+
} = row_tracking_visitor;
374+
370375
// Generate a domain metadata action based on the final high water mark
371-
let domain_metadata = DomainMetadata::try_from(RowTrackingDomainMetadata::new(
372-
row_tracking_visitor.row_id_high_water_mark,
373-
))?;
376+
let domain_metadata =
377+
DomainMetadata::try_from(RowTrackingDomainMetadata::new(row_id_high_water_mark))?;
374378
let domain_metadata_action =
375379
domain_metadata.into_engine_data(get_log_domain_metadata_schema().clone(), engine);
376380

381+
let commit_version = i64::try_from(commit_version)
382+
.map_err(|_| Error::generic("Commit version is too large to fit in i64"))?;
383+
377384
// Create an iterator that pairs each add action with its row tracking metadata
378385
let extended_add_files_metadata =
379386
self.add_files_metadata.iter().zip(base_row_id_batches).map(
380387
move |(add_files_batch, base_row_ids)| {
381-
let commit_versions = vec![commit_version as i64; base_row_ids.len()];
388+
let commit_versions = vec![commit_version; base_row_ids.len()];
382389
let base_row_ids =
383390
ArrayData::try_new(ArrayType::new(DataType::LONG, true), base_row_ids)?;
384391
let row_commit_versions =

0 commit comments

Comments
 (0)