Skip to content

Commit e21b643

Browse files
committed
Merge branch 'main' into manndp/remove_domain_metadata
2 parents 2e51ceb + 5552d21 commit e21b643

File tree

14 files changed

+389
-141
lines changed

14 files changed

+389
-141
lines changed

ffi/src/transaction/mod.rs

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -157,10 +157,18 @@ mod tests {
157157

158158
use std::sync::Arc;
159159

160+
const ZERO_UUID: &str = "00000000-0000-0000-0000-000000000000";
161+
160162
use super::*;
161163

162164
use tempfile::tempdir;
163165

166+
fn check_txn_id_exists(commit_info: &serde_json::Value) {
167+
commit_info["txnId"]
168+
.as_str()
169+
.expect("txnId should be present in commitInfo");
170+
}
171+
164172
fn create_arrow_ffi_from_json(
165173
schema: ArrowSchema,
166174
json_string: &str,
@@ -341,9 +349,12 @@ mod tests {
341349
.into_iter::<serde_json::Value>()
342350
.try_collect()?;
343351

344-
// set timestamps to 0 and paths to known string values for comparison
345-
// (otherwise timestamps are non-deterministic and paths are random UUIDs)
352+
check_txn_id_exists(&parsed_commits[0]["commitInfo"]);
353+
354+
// set timestamps to 0, paths and txn_id to known string values for comparison
355+
// (otherwise timestamps are non-deterministic, paths and txn_id are random UUIDs)
346356
set_json_value(&mut parsed_commits[0], "commitInfo.timestamp", json!(0))?;
357+
set_json_value(&mut parsed_commits[0], "commitInfo.txnId", json!(ZERO_UUID))?;
347358
set_json_value(&mut parsed_commits[1], "add.modificationTime", json!(0))?;
348359
set_json_value(&mut parsed_commits[1], "add.size", json!(0))?;
349360

@@ -355,6 +366,7 @@ mod tests {
355366
"operation": "UNKNOWN",
356367
"kernelVersion": format!("v{}", env!("CARGO_PKG_VERSION")),
357368
"operationParameters": {},
369+
"txnId": ZERO_UUID
358370
}
359371
}),
360372
json!({
Lines changed: 290 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,290 @@
1+
//! # Action Reconciliation
2+
//! This module implements APIs related to action reconciliation.
3+
//! Please see the [Delta Lake Protocol](https://github.com/delta-io/delta/blob/master/PROTOCOL.md#action-reconciliation)
4+
//! for more details about action reconciliation.
5+
//!
6+
//! ## Retention and Cleanup
7+
//!
8+
//! This module provides utilities for calculating retention timestamps used during action reconciliation:
9+
//!
10+
//! - **Deleted File Retention**: Determines when `remove` actions can be excluded from checkpoints
11+
//! - **Transaction Retention**: Calculates when expired app ids can be cleaned up
12+
use std::time::Duration;
13+
14+
use crate::table_properties::TableProperties;
15+
use crate::{DeltaResult, Error};
16+
17+
// Unit conversion factors used to spell out the default retention period.
const SECONDS_PER_MINUTE: u64 = 60;
const MINUTES_PER_HOUR: u64 = 60;
const HOURS_PER_DAY: u64 = 24;

/// Length of the default deleted-file retention period, in days.
const DEFAULT_RETENTION_DAYS: u64 = 7;

/// The default retention period for deleted files in seconds.
/// This is set to 7 days, which is the default in delta-spark.
pub(crate) const DEFAULT_RETENTION_SECS: u64 =
    SECONDS_PER_MINUTE * MINUTES_PER_HOUR * HOURS_PER_DAY * DEFAULT_RETENTION_DAYS;
25+
26+
/// Provides common functionality for calculating file retention timestamps
27+
/// and transaction expiration timestamps.
28+
pub(crate) trait RetentionCalculator {
29+
/// Get the table properties for accessing retention durations
30+
fn table_properties(&self) -> &TableProperties;
31+
32+
/// Determines the minimum timestamp before which deleted files
33+
/// are eligible for permanent removal during VACUUM operations. It is used
34+
/// during checkpointing to decide whether to include `remove` actions.
35+
///
36+
/// If a deleted file's timestamp is older than this threshold (based on the
37+
/// table's `deleted_file_retention_duration`), the corresponding `remove` action
38+
/// is included in the checkpoint, allowing VACUUM operations to later identify
39+
/// and clean up those files.
40+
///
41+
/// # Returns:
42+
/// The cutoff timestamp in milliseconds since epoch, matching the remove action's
43+
/// `deletion_timestamp` field format for comparison.
44+
///
45+
/// Note: The default retention period is 7 days, matching delta-spark's behavior.
46+
fn deleted_file_retention_timestamp(&self) -> DeltaResult<i64> {
47+
let retention_duration = self.table_properties().deleted_file_retention_duration;
48+
49+
deleted_file_retention_timestamp_with_time(
50+
retention_duration,
51+
crate::utils::current_time_duration()?,
52+
)
53+
}
54+
55+
/// Calculate the transaction expiration timestamp
56+
///
57+
/// Calculates the timestamp threshold for transaction expiration based on
58+
/// the table's `set_transaction_retention_duration` property. Transactions that expired
59+
/// before this timestamp can be cleaned up.
60+
///
61+
/// # Returns
62+
/// The timestamp in milliseconds since epoch before which transactions are considered expired,
63+
/// or `None` if transaction retention is not configured.
64+
///
65+
/// # Errors
66+
/// Returns an error if the current system time cannot be obtained or if the retention
67+
/// duration exceeds the maximum representable value for i64.
68+
fn get_transaction_expiration_timestamp(&self) -> DeltaResult<Option<i64>> {
69+
calculate_transaction_expiration_timestamp(self.table_properties())
70+
}
71+
}
72+
73+
/// Calculates the timestamp threshold for deleted file retention based on the provided duration.
74+
/// This is factored out to allow testing with an injectable time and duration parameter.
75+
///
76+
/// # Parameters
77+
/// - `retention_duration`: The duration to retain deleted files. The table property
78+
/// `deleted_file_retention_duration` is passed here. If `None`, defaults to 7 days.
79+
/// - `now_duration`: The current time as a [`Duration`]. This allows for testing with
80+
/// a specific time instead of using `SystemTime::now()`.
81+
///
82+
/// # Returns: The timestamp in milliseconds since epoch
83+
pub(crate) fn deleted_file_retention_timestamp_with_time(
84+
retention_duration: Option<Duration>,
85+
now_duration: Duration,
86+
) -> DeltaResult<i64> {
87+
// Use provided retention duration or default (7 days)
88+
let retention_duration =
89+
retention_duration.unwrap_or_else(|| Duration::from_secs(DEFAULT_RETENTION_SECS));
90+
91+
// Convert to milliseconds for remove action deletion_timestamp comparison
92+
let now_ms = i64::try_from(now_duration.as_millis())
93+
.map_err(|_| Error::checkpoint_write("Current timestamp exceeds i64 millisecond range"))?;
94+
95+
let retention_ms = i64::try_from(retention_duration.as_millis())
96+
.map_err(|_| Error::checkpoint_write("Retention duration exceeds i64 millisecond range"))?;
97+
98+
// Simple subtraction - will produce negative values if retention > now
99+
Ok(now_ms - retention_ms)
100+
}
101+
102+
/// Calculates the transaction expiration timestamp based on table properties.
103+
/// Returns None if set_transaction_retention_duration is not set.
104+
pub(crate) fn calculate_transaction_expiration_timestamp(
105+
table_properties: &TableProperties,
106+
) -> DeltaResult<Option<i64>> {
107+
table_properties
108+
.set_transaction_retention_duration
109+
.map(|duration| -> DeltaResult<i64> {
110+
let now_ms = crate::utils::current_time_ms()?;
111+
112+
let expiration_ms = i64::try_from(duration.as_millis())
113+
.map_err(|_| Error::generic("Retention duration exceeds i64 millisecond range"))?;
114+
115+
Ok(now_ms - expiration_ms)
116+
})
117+
.transpose()
118+
}
119+
120+
#[cfg(test)]
mod tests {
    use super::*;
    use std::time::Duration;

    /// Fixed "now" used by the deterministic tests: 1_000_000_000 seconds since epoch.
    const REFERENCE_SECS: u64 = 1_000_000_000;
    /// The same instant expressed in milliseconds.
    const REFERENCE_MS: i64 = 1_000_000_000_000;

    /// Asserts that `cutoff` lies strictly before `now_ms` and no further back than
    /// `retention_ms` plus a one second allowance for test execution time.
    fn assert_cutoff_in_window(cutoff: i64, now_ms: i64, retention_ms: i64) {
        assert!(cutoff < now_ms);
        assert!(cutoff > now_ms - retention_ms - 1000); // Allow 1 second buffer
    }

    #[test]
    fn test_deleted_file_retention_timestamp_with_time() -> DeltaResult<()> {
        let reference_time = Duration::from_secs(REFERENCE_SECS);

        // (retention passed in, expected distance of the cutoff from the reference time in ms)
        let cases: [(Option<Duration>, i64); 3] = [
            // Default retention (7 days)
            (None, 7 * 24 * 60 * 60 * 1000),
            // Custom retention (1 day)
            (Some(Duration::from_secs(24 * 60 * 60)), 24 * 60 * 60 * 1000),
            // Zero retention: the cutoff equals the reference time
            (Some(Duration::from_secs(0)), 0),
        ];

        for (retention, retention_ms) in cases {
            let cutoff = deleted_file_retention_timestamp_with_time(retention, reference_time)?;
            assert_eq!(cutoff, REFERENCE_MS - retention_ms);
        }

        Ok(())
    }

    #[test]
    fn test_deleted_file_retention_timestamp_edge_cases() {
        // A retention duration whose millisecond count cannot fit in i64
        let reference_time = Duration::from_secs(REFERENCE_SECS);
        let huge_retention = Duration::from_secs(u64::MAX);

        let err = deleted_file_retention_timestamp_with_time(Some(huge_retention), reference_time)
            .expect_err("an oversized retention duration should be rejected");
        assert!(err
            .to_string()
            .contains("Retention duration exceeds i64 millisecond range"));
    }

    #[test]
    fn test_deleted_file_retention_timestamp_with_large_now_time() {
        // A current time whose millisecond count cannot fit in i64
        let huge_now = Duration::from_secs(u64::MAX);
        let retention = Duration::from_secs(1);

        let err = deleted_file_retention_timestamp_with_time(Some(retention), huge_now)
            .expect_err("an oversized current time should be rejected");
        assert!(err
            .to_string()
            .contains("Current timestamp exceeds i64 millisecond range"));
    }

    #[test]
    fn test_calculate_transaction_expiration_timestamp() -> DeltaResult<()> {
        // Without set_transaction_retention_duration there is no cutoff
        let unset = TableProperties::default();
        assert_eq!(calculate_transaction_expiration_timestamp(&unset)?, None);

        // With a 1 hour set_transaction_retention_duration
        let configured = TableProperties {
            set_transaction_retention_duration: Some(Duration::from_secs(3600)),
            ..Default::default()
        };
        let cutoff = calculate_transaction_expiration_timestamp(&configured)?;
        assert!(cutoff.is_some());

        // The exact value depends on the clock, so only check that it falls roughly
        // one hour before the current time.
        let cutoff = cutoff.unwrap();
        let now_ms = crate::utils::current_time_ms().unwrap();
        assert_cutoff_in_window(cutoff, now_ms, 3600 * 1000);

        Ok(())
    }

    #[test]
    fn test_calculate_transaction_expiration_timestamp_edge_cases() {
        // A retention duration whose millisecond count cannot fit in i64
        let properties = TableProperties {
            set_transaction_retention_duration: Some(Duration::from_secs(u64::MAX)),
            ..Default::default()
        };

        let err = calculate_transaction_expiration_timestamp(&properties)
            .expect_err("an oversized retention duration should be rejected");
        assert!(err
            .to_string()
            .contains("Retention duration exceeds i64 millisecond range"));
    }

    /// Minimal [`RetentionCalculator`] implementor that exercises the provided trait methods.
    struct MockRetentionCalculator {
        properties: TableProperties,
    }

    impl MockRetentionCalculator {
        fn new(properties: TableProperties) -> Self {
            Self { properties }
        }
    }

    impl RetentionCalculator for MockRetentionCalculator {
        fn table_properties(&self) -> &TableProperties {
            &self.properties
        }
    }

    #[test]
    fn test_retention_calculator_trait_deleted_file_retention_timestamp() -> DeltaResult<()> {
        // Default retention: roughly current time minus 7 days
        let default_calculator = MockRetentionCalculator::new(TableProperties::default());
        let default_cutoff = default_calculator.deleted_file_retention_timestamp()?;

        let now_ms = crate::utils::current_time_ms().unwrap();
        assert_cutoff_in_window(default_cutoff, now_ms, 7 * 24 * 60 * 60 * 1000);

        // Custom retention of 30 minutes, checked against the same `now_ms` reading
        let custom_calculator = MockRetentionCalculator::new(TableProperties {
            deleted_file_retention_duration: Some(Duration::from_secs(1800)),
            ..Default::default()
        });
        let custom_cutoff = custom_calculator.deleted_file_retention_timestamp()?;
        assert_cutoff_in_window(custom_cutoff, now_ms, 30 * 60 * 1000);

        Ok(())
    }

    #[test]
    fn test_retention_calculator_trait_get_transaction_expiration_timestamp() -> DeltaResult<()> {
        // No transaction retention configured
        let unset_calculator = MockRetentionCalculator::new(TableProperties::default());
        assert_eq!(unset_calculator.get_transaction_expiration_timestamp()?, None);

        // Transaction retention of 2 hours
        let configured_calculator = MockRetentionCalculator::new(TableProperties {
            set_transaction_retention_duration: Some(Duration::from_secs(7200)),
            ..Default::default()
        });
        let cutoff = configured_calculator.get_transaction_expiration_timestamp()?;
        assert!(cutoff.is_some());

        let cutoff = cutoff.unwrap();
        let now_ms = crate::utils::current_time_ms().unwrap();
        assert_cutoff_in_window(cutoff, now_ms, 2 * 60 * 60 * 1000);

        Ok(())
    }
}

kernel/src/actions/mod.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -584,8 +584,7 @@ pub(crate) struct CommitInfo {
584584
pub(crate) kernel_version: Option<String>,
585585
/// A place for the engine to store additional metadata associated with this commit
586586
pub(crate) engine_info: Option<String>,
587-
/// A unique transaction identified for this commit. When the `catalogManaged` table feature is
588-
/// enabled (not yet implemented), this field will be required. Otherwise, it is optional.
587+
/// A unique transaction identifier for this commit.
589588
pub(crate) txn_id: Option<String>,
590589
}
591590

@@ -602,7 +601,7 @@ impl CommitInfo {
602601
operation_parameters: None,
603602
kernel_version: Some(format!("v{KERNEL_VERSION}")),
604603
engine_info,
605-
txn_id: None,
604+
txn_id: Some(uuid::Uuid::new_v4().to_string()),
606605
}
607606
}
608607
}
@@ -1405,6 +1404,7 @@ mod tests {
14051404
let engine = ExprEngine::new();
14061405

14071406
let commit_info = CommitInfo::new(0, None, None);
1407+
let commit_info_txn_id = commit_info.txn_id.clone();
14081408

14091409
let engine_data = commit_info.into_engine_data(CommitInfo::to_schema().into(), &engine);
14101410

@@ -1428,7 +1428,7 @@ mod tests {
14281428
operation_parameters,
14291429
Arc::new(StringArray::from(vec![Some(format!("v{KERNEL_VERSION}"))])),
14301430
Arc::new(StringArray::from(vec![None::<String>])),
1431-
Arc::new(StringArray::from(vec![None::<String>])),
1431+
Arc::new(StringArray::from(vec![commit_info_txn_id])),
14321432
],
14331433
)
14341434
.unwrap();

0 commit comments

Comments
 (0)