Skip to content

Commit 67aad5c

Browse files
authored
feat: CommitInfo sets a txnId (#1262)
Generate a txnId stored inside the commitInfo metadata of a Delta commit. - resolves #1149
1 parent f8f7d52 commit 67aad5c

File tree

3 files changed

+58
-22
lines changed

3 files changed

+58
-22
lines changed

ffi/src/transaction/mod.rs

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -157,10 +157,18 @@ mod tests {
157157

158158
use std::sync::Arc;
159159

160+
const ZERO_UUID: &str = "00000000-0000-0000-0000-000000000000";
161+
160162
use super::*;
161163

162164
use tempfile::tempdir;
163165

166+
/// Test helper: asserts that the given `commitInfo` JSON object carries a `txnId` string.
///
/// Only the presence of a string value is checked; the format of the id is not validated.
///
/// # Panics
///
/// Panics with "txnId should be present in commitInfo" when `commit_info["txnId"]` is not a
/// JSON string.
fn check_txn_id_exists(commit_info: &serde_json::Value) {
167+
commit_info["txnId"]
168+
.as_str()
169+
.expect("txnId should be present in commitInfo");
170+
}
171+
164172
fn create_arrow_ffi_from_json(
165173
schema: ArrowSchema,
166174
json_string: &str,
@@ -341,9 +349,12 @@ mod tests {
341349
.into_iter::<serde_json::Value>()
342350
.try_collect()?;
343351

344-
// set timestamps to 0 and paths to known string values for comparison
345-
// (otherwise timestamps are non-deterministic and paths are random UUIDs)
352+
check_txn_id_exists(&parsed_commits[0]["commitInfo"]);
353+
354+
// set timestamps to 0, paths and txn_id to known string values for comparison
355+
// (otherwise timestamps are non-deterministic, paths and txn_id are random UUIDs)
346356
set_json_value(&mut parsed_commits[0], "commitInfo.timestamp", json!(0))?;
357+
set_json_value(&mut parsed_commits[0], "commitInfo.txnId", json!(ZERO_UUID))?;
347358
set_json_value(&mut parsed_commits[1], "add.modificationTime", json!(0))?;
348359
set_json_value(&mut parsed_commits[1], "add.size", json!(0))?;
349360

@@ -355,6 +366,7 @@ mod tests {
355366
"operation": "UNKNOWN",
356367
"kernelVersion": format!("v{}", env!("CARGO_PKG_VERSION")),
357368
"operationParameters": {},
369+
"txnId": ZERO_UUID
358370
}
359371
}),
360372
json!({

kernel/src/actions/mod.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -584,8 +584,7 @@ pub(crate) struct CommitInfo {
584584
pub(crate) kernel_version: Option<String>,
585585
/// A place for the engine to store additional metadata associated with this commit
586586
pub(crate) engine_info: Option<String>,
587-
/// A unique transaction identified for this commit. When the `catalogManaged` table feature is
588-
/// enabled (not yet implemented), this field will be required. Otherwise, it is optional.
587+
/// A unique transaction identifier for this commit.
589588
pub(crate) txn_id: Option<String>,
590589
}
591590

@@ -602,7 +601,7 @@ impl CommitInfo {
602601
operation_parameters: None,
603602
kernel_version: Some(format!("v{KERNEL_VERSION}")),
604603
engine_info,
605-
txn_id: None,
604+
txn_id: Some(uuid::Uuid::new_v4().to_string()),
606605
}
607606
}
608607
}
@@ -1393,6 +1392,7 @@ mod tests {
13931392
let engine = ExprEngine::new();
13941393

13951394
let commit_info = CommitInfo::new(0, None, None);
1395+
let commit_info_txn_id = commit_info.txn_id.clone();
13961396

13971397
let engine_data = commit_info.into_engine_data(CommitInfo::to_schema().into(), &engine);
13981398

@@ -1416,7 +1416,7 @@ mod tests {
14161416
operation_parameters,
14171417
Arc::new(StringArray::from(vec![Some(format!("v{KERNEL_VERSION}"))])),
14181418
Arc::new(StringArray::from(vec![None::<String>])),
1419-
Arc::new(StringArray::from(vec![None::<String>])),
1419+
Arc::new(StringArray::from(vec![commit_info_txn_id])),
14201420
],
14211421
)
14221422
.unwrap();

kernel/tests/write.rs

Lines changed: 40 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ use std::sync::Arc;
33

44
use delta_kernel::Error as KernelError;
55
use delta_kernel::{DeltaResult, Engine, Snapshot, Version};
6+
use uuid::Uuid;
67

78
use delta_kernel::arrow::array::{ArrayRef, BinaryArray, StructArray};
89
use delta_kernel::arrow::array::{Int32Array, StringArray, TimestampMicrosecondArray};
@@ -35,6 +36,15 @@ use test_utils::{create_table, engine_store_setup, setup_test_tables, test_read}
3536
mod common;
3637
use url::Url;
3738

39+
/// Test helper: asserts that the given `commitInfo` JSON object carries a `txnId` string that
/// parses as a UUID.
///
/// Intended to be called before the test overwrites `txnId` with a fixed value for comparison.
///
/// # Panics
///
/// Panics when `commit_info["txnId"]` is not a JSON string, or when `Uuid::parse_str` rejects
/// that string.
fn validate_txn_id(commit_info: &serde_json::Value) {
40+
let txn_id = commit_info["txnId"]
41+
.as_str()
42+
.expect("txnId should be present in commitInfo");
43+
// The parsed value is discarded; only the success of parsing matters here.
Uuid::parse_str(txn_id).expect("txnId should be valid UUID format");
44+
}
45+
46+
const ZERO_UUID: &str = "00000000-0000-0000-0000-000000000000";
47+
3848
#[tokio::test]
3949
async fn test_commit_info() -> Result<(), Box<dyn std::error::Error>> {
4050
// setup tracing
@@ -63,11 +73,11 @@ async fn test_commit_info() -> Result<(), Box<dyn std::error::Error>> {
6373
.await?;
6474

6575
let mut parsed_commit: serde_json::Value = serde_json::from_slice(&commit1.bytes().await?)?;
66-
*parsed_commit
67-
.get_mut("commitInfo")
68-
.unwrap()
69-
.get_mut("timestamp")
70-
.unwrap() = serde_json::Value::Number(0.into());
76+
77+
validate_txn_id(&parsed_commit["commitInfo"]);
78+
79+
set_json_value(&mut parsed_commit, "commitInfo.timestamp", json!(0))?;
80+
set_json_value(&mut parsed_commit, "commitInfo.txnId", json!(ZERO_UUID))?;
7181

7282
let expected_commit = json!({
7383
"commitInfo": {
@@ -76,6 +86,7 @@ async fn test_commit_info() -> Result<(), Box<dyn std::error::Error>> {
7686
"kernelVersion": format!("v{}", env!("CARGO_PKG_VERSION")),
7787
"operationParameters": {},
7888
"engineInfo": "default engine",
89+
"txnId": ZERO_UUID,
7990
}
8091
});
8192

@@ -217,9 +228,12 @@ async fn test_commit_info_action() -> Result<(), Box<dyn std::error::Error>> {
217228
.into_iter::<serde_json::Value>()
218229
.try_collect()?;
219230

220-
// set timestamps to 0 and paths to known string values for comparison
221-
// (otherwise timestamps are non-deterministic and paths are random UUIDs)
231+
validate_txn_id(&parsed_commits[0]["commitInfo"]);
232+
233+
// set timestamps to 0, paths and txn_id to known string values for comparison
234+
// (otherwise timestamps are non-deterministic, paths and txn_id are random UUIDs)
222235
set_json_value(&mut parsed_commits[0], "commitInfo.timestamp", json!(0))?;
236+
set_json_value(&mut parsed_commits[0], "commitInfo.txnId", json!(ZERO_UUID))?;
223237

224238
let expected_commit = vec![json!({
225239
"commitInfo": {
@@ -228,6 +242,7 @@ async fn test_commit_info_action() -> Result<(), Box<dyn std::error::Error>> {
228242
"kernelVersion": format!("v{}", env!("CARGO_PKG_VERSION")),
229243
"operationParameters": {},
230244
"engineInfo": "default engine",
245+
"txnId": ZERO_UUID
231246
}
232247
})];
233248

@@ -270,10 +285,13 @@ async fn test_append() -> Result<(), Box<dyn std::error::Error>> {
270285
// check that the timestamps in commit_info and add actions are within 10s of SystemTime::now()
271286
// before we clear them for comparison
272287
check_action_timestamps(parsed_commits.iter())?;
288+
// check that the txn_id is valid before we clear it for comparison
289+
validate_txn_id(&parsed_commits[0]["commitInfo"]);
273290

274-
// set timestamps to 0 and paths to known string values for comparison
275-
// (otherwise timestamps are non-deterministic and paths are random UUIDs)
291+
// set timestamps to 0, paths and txn_id to known string values for comparison
292+
// (otherwise timestamps are non-deterministic, paths and txn_id are random UUIDs)
276293
set_json_value(&mut parsed_commits[0], "commitInfo.timestamp", json!(0))?;
294+
set_json_value(&mut parsed_commits[0], "commitInfo.txnId", json!(ZERO_UUID))?;
277295
set_json_value(&mut parsed_commits[1], "add.modificationTime", json!(0))?;
278296
set_json_value(&mut parsed_commits[1], "add.path", json!("first.parquet"))?;
279297
set_json_value(&mut parsed_commits[2], "add.modificationTime", json!(0))?;
@@ -286,6 +304,7 @@ async fn test_append() -> Result<(), Box<dyn std::error::Error>> {
286304
"operation": "UNKNOWN",
287305
"kernelVersion": format!("v{}", env!("CARGO_PKG_VERSION")),
288306
"operationParameters": {},
307+
"txnId": ZERO_UUID
289308
}
290309
}),
291310
json!({
@@ -437,10 +456,13 @@ async fn test_append_partitioned() -> Result<(), Box<dyn std::error::Error>> {
437456
// check that the timestamps in commit_info and add actions are within 10s of SystemTime::now()
438457
// before we clear them for comparison
439458
check_action_timestamps(parsed_commits.iter())?;
459+
// check that the txn_id is valid before we clear it for comparison
460+
validate_txn_id(&parsed_commits[0]["commitInfo"]);
440461

441-
// set timestamps to 0 and paths to known string values for comparison
442-
// (otherwise timestamps are non-deterministic and paths are random UUIDs)
462+
// set timestamps to 0, paths and txn_id to known string values for comparison
463+
// (otherwise timestamps are non-deterministic, paths and txn_id are random UUIDs)
443464
set_json_value(&mut parsed_commits[0], "commitInfo.timestamp", json!(0))?;
465+
set_json_value(&mut parsed_commits[0], "commitInfo.txnId", json!(ZERO_UUID))?;
444466
set_json_value(&mut parsed_commits[1], "add.modificationTime", json!(0))?;
445467
set_json_value(&mut parsed_commits[1], "add.path", json!("first.parquet"))?;
446468
set_json_value(&mut parsed_commits[2], "add.modificationTime", json!(0))?;
@@ -454,6 +476,7 @@ async fn test_append_partitioned() -> Result<(), Box<dyn std::error::Error>> {
454476
"kernelVersion": format!("v{}", env!("CARGO_PKG_VERSION")),
455477
"operationParameters": {},
456478
"engineInfo": "default engine",
479+
"txnId": ZERO_UUID
457480
}
458481
}),
459482
json!({
@@ -620,11 +643,7 @@ async fn test_write_txn_actions() -> Result<(), Box<dyn std::error::Error>> {
620643
.into_iter::<serde_json::Value>()
621644
.try_collect()?;
622645

623-
*parsed_commits[0]
624-
.get_mut("commitInfo")
625-
.unwrap()
626-
.get_mut("timestamp")
627-
.unwrap() = serde_json::Value::Number(0.into());
646+
set_json_value(&mut parsed_commits[0], "commitInfo.timestamp", json!(0)).unwrap();
628647

629648
let time_ms: i64 = std::time::SystemTime::now()
630649
.duration_since(std::time::UNIX_EPOCH)?
@@ -663,6 +682,10 @@ async fn test_write_txn_actions() -> Result<(), Box<dyn std::error::Error>> {
663682
assert!((last_updated.as_i64().unwrap() - time_ms).abs() < 10_000);
664683
*last_updated = serde_json::Value::Number(2.into());
665684

685+
validate_txn_id(&parsed_commits[0]["commitInfo"]);
686+
687+
set_json_value(&mut parsed_commits[0], "commitInfo.txnId", json!(ZERO_UUID))?;
688+
666689
let expected_commit = vec![
667690
json!({
668691
"commitInfo": {
@@ -671,6 +694,7 @@ async fn test_write_txn_actions() -> Result<(), Box<dyn std::error::Error>> {
671694
"kernelVersion": format!("v{}", env!("CARGO_PKG_VERSION")),
672695
"operationParameters": {},
673696
"engineInfo": "default engine",
697+
"txnId": ZERO_UUID
674698
}
675699
}),
676700
json!({

0 commit comments

Comments
 (0)