use std::sync::Arc;
use std::time::{SystemTime, UNIX_EPOCH};

use delta_kernel::engine::to_json_bytes;
use delta_kernel::schema::{DataType, StructField, StructType};
use delta_kernel::Snapshot;
use test_utils::{create_table, engine_store_setup};

use object_store::path::Path;
use object_store::ObjectStore;
use url::Url;

/// Convert a URL into an `object_store::Path`, dropping the leading path segment
fn url_to_object_store_path(url: &Url) -> Result<Path, Box<dyn std::error::Error>> {
    let path_segments = url
        .path_segments()
        .ok_or_else(|| format!("URL has no path segments: {}", url))?;

    let path_string = path_segments.skip(1).collect::<Vec<_>>().join("/");

    Ok(Path::from(path_string))
}
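// Illustrative sketch of the mapping this helper performs. The exact URL layout
// depends on how `engine_store_setup` roots the store, so the scheme and segments
// shown here are an assumption, not output from a real run:
//   memory:///<store-root>/test_compaction_table/_delta_log/00000000000000000001.json
//     -> Path("test_compaction_table/_delta_log/00000000000000000001.json")
// i.e. the leading segment is dropped and the remainder becomes a store-relative path.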

#[tokio::test]
async fn action_reconciliation_round_trip() -> Result<(), Box<dyn std::error::Error>> {
    let _ = tracing_subscriber::fmt::try_init();

    // Create a simple table schema: one int column named 'id'
    let schema = Arc::new(StructType::try_new(vec![StructField::nullable(
        "id",
        DataType::INTEGER,
    )])?);

    // Set up the engine and object store backing a temporary test table
    let (store, engine, table_location) = engine_store_setup("test_compaction_table", None);

    // Create the table (this will be commit 0)
    let table_url = create_table(
        store.clone(),
        table_location,
        schema.clone(),
        &[],
        false,
        vec![],
        vec![],
    )
    .await?;
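    // Commit 0, written by `create_table`, is what contributes the protocol and
    // metaData actions that the compacted file is expected to carry forward below.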

    // Commit 1: Add two files
    let commit1_content = r#"{"commitInfo":{"timestamp":1587968586000,"operation":"WRITE","operationParameters":{"mode":"Append"},"isBlindAppend":true}}
{"add":{"path":"part-00000-file1.parquet","partitionValues":{},"size":1024,"modificationTime":1587968586000,"dataChange":true, "stats":"{\"numRecords\":10,\"nullCount\":{\"id\":0},\"minValues\":{\"id\": 1},\"maxValues\":{\"id\":10}}"}}
{"add":{"path":"part-00001-file2.parquet","partitionValues":{},"size":2048,"modificationTime":1587968586000,"dataChange":true, "stats":"{\"numRecords\":20,\"nullCount\":{\"id\":0},\"minValues\":{\"id\": 11},\"maxValues\":{\"id\":30}}"}}
"#;
    store
        .put(
            &Path::from("test_compaction_table/_delta_log/00000000000000000001.json"),
            commit1_content.as_bytes().into(),
        )
        .await?;
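    // The commits are written as raw JSON directly into `_delta_log/` so the test
    // controls exactly which actions exist at each version, independent of any
    // transaction/commit API.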

    // Commit 2: Remove only the first file with a recent deletionTimestamp; keep the second file
    let current_timestamp_millis = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .unwrap()
        .as_millis() as i64;
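    // Using the current time keeps the tombstone within the retention window, so the
    // reconciliation below must preserve the remove action rather than expiring it.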
    let commit2_content = format!(
        r#"{{"commitInfo":{{"timestamp":{},"operation":"DELETE","operationParameters":{{"predicate":"id <= 10"}},"isBlindAppend":false}}}}
{{"remove":{{"path":"part-00000-file1.parquet","partitionValues":{{}},"size":1024,"modificationTime":1587968586000,"dataChange":true,"deletionTimestamp":{}}}}}
"#,
        current_timestamp_millis, current_timestamp_millis
    );
    store
        .put(
            &Path::from("test_compaction_table/_delta_log/00000000000000000002.json"),
            commit2_content.clone().into_bytes().into(),
        )
        .await?;

    // Create snapshot and log compaction writer
    let snapshot = Snapshot::builder_for(table_url.clone()).build(&engine)?;
    let mut writer = snapshot.log_compaction_writer(0, 2)?;
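    // The arguments are the start and end versions of the range being compacted
    // (versions 0 through 2 here); they are echoed in the compacted file name
    // asserted below.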

    // Get compaction data iterator
    let mut compaction_data = writer.compaction_data(&engine)?;
    let compaction_path = writer.compaction_path().clone();

    // Verify the compaction file name
    let expected_filename = "00000000000000000000.00000000000000000002.compacted.json";
    assert!(compaction_path.to_string().ends_with(expected_filename));
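    // Compacted log files follow the `<start-version>.<end-version>.compacted.json`
    // naming convention, with both versions zero-padded to 20 digits like regular
    // commit files.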

    // Process compaction data batches and collect the actual compacted data
    let mut batch_count = 0;
    let mut compacted_data_batches = Vec::new();

    // Log compaction should produce reconciled actions from the version range:
    // - Protocol + metadata from table creation
    // - Add action for file1 (first/newest add for this file path)
    // - Add action for file2 (first/newest add for this file path)
    // - Remove action for file1 (first/newest remove for this file path, non-expired tombstone)
    // - CommitInfo actions should be excluded from compaction
    //
    // Note: Actions are processed in reverse chronological order (newest to oldest).
    // The reconciliation keeps the first (newest) occurrence of each action type
    // for each unique file path, so both add and remove actions for file1 are kept.
    for batch_result in compaction_data.by_ref() {
        let batch = batch_result?;
        compacted_data_batches.push(batch);
        batch_count += 1;
    }

    assert!(
        batch_count > 0,
        "Should have processed at least one compaction batch"
    );

    // Exercise the end-to-end flow of writing the JSON. We go beyond the public log
    // compaction APIs here since the test writes the compacted JSON and verifies it,
    // but this is intentional: most engines would implement something similar.
    let compaction_data_iter = compacted_data_batches
        .into_iter()
        .map(|batch| Ok(batch.data));
    let json_bytes = to_json_bytes(compaction_data_iter)?;
    let final_content = String::from_utf8(json_bytes)?;
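    // `to_json_bytes` is expected to produce newline-delimited JSON (one action per
    // line), which is the format the line-based assertions below rely on.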

    let compaction_file_path = url_to_object_store_path(&compaction_path)?;

    store
        .put(&compaction_file_path, final_content.clone().into())
        .await?;
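    // Note: a production engine would likely want an atomic "create if absent" write
    // here to avoid racing with another writer producing the same compaction file; a
    // plain put is sufficient for this single-writer test.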

    // Verify the compacted file content that we just wrote
    let compacted_content = store.get(&compaction_file_path).await?;
    let compacted_bytes = compacted_content.bytes().await?;
    let compacted_str = std::str::from_utf8(&compacted_bytes)?;

    // Parse and verify the actions
    let compacted_lines: Vec<&str> = compacted_str.trim().lines().collect();
    assert!(
        !compacted_lines.is_empty(),
        "Compacted file should not be empty"
    );

    // Check for expected actions
    let has_protocol = compacted_lines.iter().any(|line| line.contains("protocol"));
    let has_metadata = compacted_lines.iter().any(|line| line.contains("metaData"));
    let has_remove = compacted_lines.iter().any(|line| line.contains("remove"));
    let has_add_file1 = compacted_lines
        .iter()
        .any(|line| line.contains("part-00000-file1.parquet") && line.contains("add"));
    let has_add_file2 = compacted_lines
        .iter()
        .any(|line| line.contains("part-00001-file2.parquet") && line.contains("add"));
    let has_commit_info = compacted_lines
        .iter()
        .any(|line| line.contains("commitInfo"));

    assert!(
        has_protocol,
        "Compacted file should contain protocol action"
    );
    assert!(
        has_metadata,
        "Compacted file should contain metadata action"
    );
    assert!(
        has_remove,
        "Compacted file should contain remove action (non-expired tombstone)"
    );
    assert!(
        has_add_file1,
        "Compacted file should contain add action for file1 (first/newest add action for this file path)"
    );
    assert!(
        has_add_file2,
        "Compacted file should contain add action for file2 (it was not removed)"
    );
    assert!(
        !has_commit_info,
        "Compacted file should NOT contain commitInfo actions (they should be excluded)"
    );

    // Verify the remove action has the current timestamp
    let remove_line = compacted_lines
        .iter()
        .find(|line| line.contains("remove"))
        .ok_or("Remove action should be present in compacted content")?;
    let parsed_remove: serde_json::Value = serde_json::from_str(remove_line)?;

    let actual_deletion_timestamp = parsed_remove["remove"]["deletionTimestamp"]
        .as_i64()
        .ok_or_else(|| {
            format!(
                "deletionTimestamp should be present in remove action: {}",
                remove_line
            )
        })?;
    assert_eq!(actual_deletion_timestamp, current_timestamp_millis);

    Ok(())
}