Skip to content

Commit 5552d21

Browse files
authored
refactor: retention calculation into a new trait (#1264)
This is a pure refactoring change that:

- Extracts the retention calculation logic into a new module with a `RetentionCalculator` trait
- Updates `CheckpointWriter` to use the `RetentionCalculator` trait

The trait will be reused in the upcoming log compaction writer.

## How was this change tested?

Added new unit tests and ran the existing tests.
1 parent 67aad5c commit 5552d21

File tree

6 files changed

+307
-89
lines changed

6 files changed

+307
-89
lines changed
Lines changed: 290 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,290 @@
1+
//! # Action Reconciliation
2+
//! This module implements APIs related to action reconciliation.
3+
//! Please see the [Delta Lake Protocol](https://github.com/delta-io/delta/blob/master/PROTOCOL.md#action-reconciliation)
4+
//! for more details about action reconciliation.
5+
//!
6+
//! ## Retention and Cleanup
7+
//!
8+
//! This module provides utilities for calculating retention timestamps used during action reconciliation:
9+
//!
10+
//! - **Deleted File Retention**: Determines when `remove` actions can be excluded from checkpoints
11+
//! - **Transaction Retention**: Calculates when expired app ids can be cleaned up
12+
use std::time::Duration;
13+
14+
use crate::table_properties::TableProperties;
15+
use crate::{DeltaResult, Error};
16+
17+
// Unit-conversion factors used to spell out the default retention period below.
const SECONDS_PER_MINUTE: u64 = 60;
const MINUTES_PER_HOUR: u64 = 60;
const HOURS_PER_DAY: u64 = 24;

/// Number of days deleted files are retained when no duration is configured.
const DEFAULT_RETENTION_DAYS: u64 = 7;

/// The default retention period for deleted files in seconds.
/// This is set to 7 days (604,800 seconds), which is the default in delta-spark.
pub(crate) const DEFAULT_RETENTION_SECS: u64 =
    DEFAULT_RETENTION_DAYS * HOURS_PER_DAY * MINUTES_PER_HOUR * SECONDS_PER_MINUTE;
25+
26+
/// Provides common functionality for calculating file retention timestamps
27+
/// and transaction expiration timestamps.
28+
pub(crate) trait RetentionCalculator {
29+
/// Get the table properties for accessing retention durations
30+
fn table_properties(&self) -> &TableProperties;
31+
32+
/// Determines the minimum timestamp before which deleted files
33+
/// are eligible for permanent removal during VACUUM operations. It is used
34+
/// during checkpointing to decide whether to include `remove` actions.
35+
///
36+
/// If a deleted file's timestamp is older than this threshold (based on the
37+
/// table's `deleted_file_retention_duration`), the corresponding `remove` action
38+
/// is included in the checkpoint, allowing VACUUM operations to later identify
39+
/// and clean up those files.
40+
///
41+
/// # Returns:
42+
/// The cutoff timestamp in milliseconds since epoch, matching the remove action's
43+
/// `deletion_timestamp` field format for comparison.
44+
///
45+
/// Note: The default retention period is 7 days, matching delta-spark's behavior.
46+
fn deleted_file_retention_timestamp(&self) -> DeltaResult<i64> {
47+
let retention_duration = self.table_properties().deleted_file_retention_duration;
48+
49+
deleted_file_retention_timestamp_with_time(
50+
retention_duration,
51+
crate::utils::current_time_duration()?,
52+
)
53+
}
54+
55+
/// Calculate the transaction expiration timestamp
56+
///
57+
/// Calculates the timestamp threshold for transaction expiration based on
58+
/// the table's `set_transaction_retention_duration` property. Transactions that expired
59+
/// before this timestamp can be cleaned up.
60+
///
61+
/// # Returns
62+
/// The timestamp in milliseconds since epoch before which transactions are considered expired,
63+
/// or `None` if transaction retention is not configured.
64+
///
65+
/// # Errors
66+
/// Returns an error if the current system time cannot be obtained or if the retention
67+
/// duration exceeds the maximum representable value for i64.
68+
fn get_transaction_expiration_timestamp(&self) -> DeltaResult<Option<i64>> {
69+
calculate_transaction_expiration_timestamp(self.table_properties())
70+
}
71+
}
72+
73+
/// Calculates the timestamp threshold for deleted file retention based on the provided duration.
74+
/// This is factored out to allow testing with an injectable time and duration parameter.
75+
///
76+
/// # Parameters
77+
/// - `retention_duration`: The duration to retain deleted files. The table property
78+
/// `deleted_file_retention_duration` is passed here. If `None`, defaults to 7 days.
79+
/// - `now_duration`: The current time as a [`Duration`]. This allows for testing with
80+
/// a specific time instead of using `SystemTime::now()`.
81+
///
82+
/// # Returns: The timestamp in milliseconds since epoch
83+
pub(crate) fn deleted_file_retention_timestamp_with_time(
84+
retention_duration: Option<Duration>,
85+
now_duration: Duration,
86+
) -> DeltaResult<i64> {
87+
// Use provided retention duration or default (7 days)
88+
let retention_duration =
89+
retention_duration.unwrap_or_else(|| Duration::from_secs(DEFAULT_RETENTION_SECS));
90+
91+
// Convert to milliseconds for remove action deletion_timestamp comparison
92+
let now_ms = i64::try_from(now_duration.as_millis())
93+
.map_err(|_| Error::checkpoint_write("Current timestamp exceeds i64 millisecond range"))?;
94+
95+
let retention_ms = i64::try_from(retention_duration.as_millis())
96+
.map_err(|_| Error::checkpoint_write("Retention duration exceeds i64 millisecond range"))?;
97+
98+
// Simple subtraction - will produce negative values if retention > now
99+
Ok(now_ms - retention_ms)
100+
}
101+
102+
/// Calculates the transaction expiration timestamp based on table properties.
103+
/// Returns None if set_transaction_retention_duration is not set.
104+
pub(crate) fn calculate_transaction_expiration_timestamp(
105+
table_properties: &TableProperties,
106+
) -> DeltaResult<Option<i64>> {
107+
table_properties
108+
.set_transaction_retention_duration
109+
.map(|duration| -> DeltaResult<i64> {
110+
let now_ms = crate::utils::current_time_ms()?;
111+
112+
let expiration_ms = i64::try_from(duration.as_millis())
113+
.map_err(|_| Error::generic("Retention duration exceeds i64 millisecond range"))?;
114+
115+
Ok(now_ms - expiration_ms)
116+
})
117+
.transpose()
118+
}
119+
120+
#[cfg(test)]
mod tests {
    use super::*;
    use std::time::Duration;

    /// Fixed "now" used by the deterministic tests, in seconds since epoch.
    const REFERENCE_SECS: u64 = 1_000_000_000;
    /// The same instant as `REFERENCE_SECS`, in milliseconds since epoch.
    const REFERENCE_MS: i64 = 1_000_000_000_000;

    /// Asserts that `result` is an error whose message contains `needle`.
    fn assert_err_contains<T: std::fmt::Debug>(result: DeltaResult<T>, needle: &str) {
        match result {
            Ok(value) => panic!(
                "expected an error containing {:?}, got Ok({:?})",
                needle, value
            ),
            Err(err) => {
                let message = err.to_string();
                assert!(
                    message.contains(needle),
                    "unexpected error message: {}",
                    message
                );
            }
        }
    }

    /// Runs `compute` between two clock reads and returns `(before_ms, value, after_ms)`.
    ///
    /// Bracketing the call lets wall-clock tests assert exact bounds instead of relying
    /// on an arbitrary slack that fails when the test thread is descheduled.
    fn bracketed<T>(compute: impl FnOnce() -> DeltaResult<T>) -> DeltaResult<(i64, T, i64)> {
        let before_ms = crate::utils::current_time_ms()?;
        let value = compute()?;
        let after_ms = crate::utils::current_time_ms()?;
        Ok((before_ms, value, after_ms))
    }

    /// Asserts that `actual == now - retention_ms` for some `now` in `[before_ms, after_ms]`.
    ///
    /// Assumes the code under test and `crate::utils::current_time_ms` read the same
    /// clock with the same millisecond truncation - TODO confirm in `crate::utils`.
    fn assert_cutoff_within(actual: i64, retention_ms: i64, before_ms: i64, after_ms: i64) {
        let earliest = before_ms - retention_ms;
        let latest = after_ms - retention_ms;
        assert!(
            (earliest..=latest).contains(&actual),
            "cutoff {} is outside the expected range [{}, {}]",
            actual,
            earliest,
            latest
        );
    }

    #[test]
    fn test_deleted_file_retention_timestamp_with_time() -> DeltaResult<()> {
        let reference_time = Duration::from_secs(REFERENCE_SECS);

        // (configured retention, expected cutoff in ms)
        let cases = [
            // Default retention (7 days)
            (None, REFERENCE_MS - 7 * 24 * 60 * 60 * 1000),
            // Custom retention (1 day)
            (
                Some(Duration::from_secs(24 * 60 * 60)),
                REFERENCE_MS - 24 * 60 * 60 * 1000,
            ),
            // Zero retention: the cutoff is the reference time itself
            (Some(Duration::from_secs(0)), REFERENCE_MS),
        ];

        for &(retention, expected) in &cases {
            let cutoff = deleted_file_retention_timestamp_with_time(retention, reference_time)?;
            assert_eq!(cutoff, expected, "retention = {:?}", retention);
        }

        Ok(())
    }

    #[test]
    fn test_deleted_file_retention_timestamp_retention_exceeds_now() -> DeltaResult<()> {
        // A retention longer than the time since epoch yields a negative cutoff
        // rather than an error.
        let now = Duration::from_secs(1);
        let retention = Duration::from_secs(2);
        let cutoff = deleted_file_retention_timestamp_with_time(Some(retention), now)?;
        assert_eq!(cutoff, -1000);

        Ok(())
    }

    #[test]
    fn test_deleted_file_retention_timestamp_edge_cases() {
        // Test with very large retention duration
        let reference_time = Duration::from_secs(REFERENCE_SECS);
        let large_retention = Duration::from_secs(u64::MAX);
        let result =
            deleted_file_retention_timestamp_with_time(Some(large_retention), reference_time);
        assert_err_contains(result, "Retention duration exceeds i64 millisecond range");
    }

    #[test]
    fn test_deleted_file_retention_timestamp_with_large_now_time() {
        // Test with very large current time that would overflow i64 milliseconds
        let reference_time = Duration::from_secs(u64::MAX);
        let retention = Duration::from_secs(1);
        let result = deleted_file_retention_timestamp_with_time(Some(retention), reference_time);
        assert_err_contains(result, "Current timestamp exceeds i64 millisecond range");
    }

    #[test]
    fn test_calculate_transaction_expiration_timestamp() -> DeltaResult<()> {
        // No set_transaction_retention_duration
        let properties = TableProperties::default();
        assert_eq!(calculate_transaction_expiration_timestamp(&properties)?, None);

        // Test with set_transaction_retention_duration
        let properties = TableProperties {
            set_transaction_retention_duration: Some(Duration::from_secs(3600)), // 1 hour
            ..Default::default()
        };
        let (before_ms, result, after_ms) =
            bracketed(|| calculate_transaction_expiration_timestamp(&properties))?;

        // The result must be exactly one hour before some instant inside the bracket.
        let timestamp = result.expect("retention is configured, so a cutoff is expected");
        let one_hour_ms = 3600 * 1000;
        assert_cutoff_within(timestamp, one_hour_ms, before_ms, after_ms);

        Ok(())
    }

    #[test]
    fn test_calculate_transaction_expiration_timestamp_edge_cases() {
        // Test with very large retention duration that would overflow
        let properties = TableProperties {
            set_transaction_retention_duration: Some(Duration::from_secs(u64::MAX)),
            ..Default::default()
        };
        let result = calculate_transaction_expiration_timestamp(&properties);
        assert_err_contains(result, "Retention duration exceeds i64 millisecond range");
    }

    // Mock implementation of RetentionCalculator for testing trait methods
    struct MockRetentionCalculator {
        properties: TableProperties,
    }

    impl MockRetentionCalculator {
        fn new(properties: TableProperties) -> Self {
            Self { properties }
        }
    }

    impl RetentionCalculator for MockRetentionCalculator {
        fn table_properties(&self) -> &TableProperties {
            &self.properties
        }
    }

    #[test]
    fn test_retention_calculator_trait_deleted_file_retention_timestamp() -> DeltaResult<()> {
        // Test with default retention: current time minus 7 days
        let calculator = MockRetentionCalculator::new(TableProperties::default());
        let (before_ms, cutoff, after_ms) =
            bracketed(|| calculator.deleted_file_retention_timestamp())?;
        let seven_days_ms = 7 * 24 * 60 * 60 * 1000;
        assert_cutoff_within(cutoff, seven_days_ms, before_ms, after_ms);

        // Test with custom retention
        let properties = TableProperties {
            deleted_file_retention_duration: Some(Duration::from_secs(1800)), // 30 minutes
            ..Default::default()
        };
        let calculator = MockRetentionCalculator::new(properties);
        let (before_ms, cutoff, after_ms) =
            bracketed(|| calculator.deleted_file_retention_timestamp())?;
        let thirty_minutes_ms = 30 * 60 * 1000;
        assert_cutoff_within(cutoff, thirty_minutes_ms, before_ms, after_ms);

        Ok(())
    }

    #[test]
    fn test_retention_calculator_trait_get_transaction_expiration_timestamp() -> DeltaResult<()> {
        // Test with no transaction retention
        let calculator = MockRetentionCalculator::new(TableProperties::default());
        assert_eq!(calculator.get_transaction_expiration_timestamp()?, None);

        // Test with transaction retention
        let properties = TableProperties {
            set_transaction_retention_duration: Some(Duration::from_secs(7200)), // 2 hours
            ..Default::default()
        };
        let calculator = MockRetentionCalculator::new(properties);
        let (before_ms, result, after_ms) =
            bracketed(|| calculator.get_transaction_expiration_timestamp())?;

        let timestamp = result.expect("retention is configured, so a cutoff is expected");
        let two_hours_ms = 2 * 60 * 60 * 1000;
        assert_cutoff_within(timestamp, two_hours_ms, before_ms, after_ms);

        Ok(())
    }
}

0 commit comments

Comments
 (0)