From 78976911736489d0b43be1b289c7eb8de3a75f2d Mon Sep 17 00:00:00 2001 From: MannDP Date: Wed, 17 Sep 2025 16:13:50 +0000 Subject: [PATCH 1/3] feat: support removing domain metadata --- kernel/src/actions/mod.rs | 9 ++ kernel/src/transaction/mod.rs | 14 +++ kernel/tests/write.rs | 195 ++++++++++++++++++++++++++++++++++ 3 files changed, 218 insertions(+) diff --git a/kernel/src/actions/mod.rs b/kernel/src/actions/mod.rs index 58dff9bac..119097330 100644 --- a/kernel/src/actions/mod.rs +++ b/kernel/src/actions/mod.rs @@ -939,6 +939,15 @@ impl DomainMetadata { } } + // Create a new DomainMetadata action to remove a domain. + pub(crate) fn remove(domain: String) -> Self { + Self { + domain, + configuration: String::new(), + removed: true, + } + } + // returns true if the domain metadata is an system-controlled domain (all domains that start // with "delta.") #[allow(unused)] diff --git a/kernel/src/transaction/mod.rs b/kernel/src/transaction/mod.rs index 5ac14e513..a73b843e9 100644 --- a/kernel/src/transaction/mod.rs +++ b/kernel/src/transaction/mod.rs @@ -277,6 +277,20 @@ impl Transaction { self } + /// Remove domain metadata from the Delta log. + /// This creates a tombstone to logically delete the specified domain. We don't check + /// if the domain metadata exists in the log, the remove action is simply a tombstone record. + /// Note that each domain can only appear once per transaction. That is, multiple operations + /// on the same domain are disallowed in a single transaction, as well as setting and removing + /// the same domain in a single transaction. If a duplicate domain is included, the `commit` will + /// fail (that is, we don't eagerly check domain validity here). + /// Removing metadata for multiple distinct domains is allowed. + pub fn with_domain_metadata_removed(mut self, domain: String) -> Self { + self.domain_metadatas + .push(DomainMetadata::remove(domain.clone())); + self + } + /// Generate domain metadata actions with validation. Handle both user and system domains. fn generate_domain_metadata_actions<'a>( &'a self, diff --git a/kernel/tests/write.rs b/kernel/tests/write.rs index a46dc5f1b..018f33adc 100644 --- a/kernel/tests/write.rs +++ b/kernel/tests/write.rs @@ -1382,3 +1382,198 @@ async fn test_set_domain_metadata_unsupported_writer_feature( Ok(()) } + +#[tokio::test] +async fn test_remove_domain_metadata_basic() -> Result<(), Box> { + let _ = tracing_subscriber::fmt::try_init(); + + let schema = Arc::new(StructType::try_new(vec![StructField::nullable( + "number", + DataType::INTEGER, + )])?); + + let table_name = "test_domain_metadata_unsupported"; + + let (store, engine, table_location) = engine_store_setup(table_name, None); + let table_url = create_table( + store.clone(), + table_location, + schema.clone(), + &[], + true, + vec![], + vec!["domainMetadata"], + ) + .await?; + + let snapshot = Snapshot::builder_for(table_url.clone()).build(&engine)?; + let txn = snapshot.transaction()?; + + let domain = "app.deprecated"; + + // removing domain metadata that is not present is allowed; the log simply contains a tombstone action + txn.with_domain_metadata_removed(domain.to_string()) + .commit(&engine)?; + + let commit_data = store + .get(&Path::from(format!( + "/{table_name}/_delta_log/00000000000000000001.json" + ))) + .await? + .bytes() + .await?; + let actions: Vec = Deserializer::from_slice(&commit_data) + .into_iter() + .try_collect()?; + + let domain_action = actions + .iter() + .find(|v| v.get("domainMetadata").is_some()) + .unwrap(); + assert_eq!(domain_action["domainMetadata"]["domain"], "app.deprecated"); + assert_eq!(domain_action["domainMetadata"]["configuration"], ""); + assert_eq!(domain_action["domainMetadata"]["removed"], true); + + let final_snapshot = Snapshot::builder_for(table_url.clone()).build(&engine)?; + let config = final_snapshot.get_domain_metadata(domain, &engine)?; + assert_eq!(config, None); + + Ok(()) +} + +#[tokio::test] +async fn test_domain_metadata_set_remove_conflicts() -> Result<(), Box> { + let _ = tracing_subscriber::fmt::try_init(); + + let schema = Arc::new(StructType::try_new(vec![StructField::nullable( + "number", + DataType::INTEGER, + )])?); + + let table_name = "test_domain_metadata_unsupported"; + + let (store, engine, table_location) = engine_store_setup(table_name, None); + let table_url = create_table( + store.clone(), + table_location, + schema.clone(), + &[], + true, + vec![], + vec!["domainMetadata"], + ) + .await?; + + let snapshot = Snapshot::builder_for(table_url.clone()).build(&engine)?; + + // set then remove same domain + let txn = snapshot.clone().transaction()?; + let err = txn + .with_domain_metadata("app.config".to_string(), "v1".to_string()) + .with_domain_metadata_removed("app.config".to_string()) + .commit(&engine) + .unwrap_err(); + assert!(err + .to_string() + .contains("already specified in this transaction")); + + // remove then set same domain + let txn2 = snapshot.clone().transaction()?; + let err = txn2 + .with_domain_metadata_removed("test.domain".to_string()) + .with_domain_metadata("test.domain".to_string(), "v1".to_string()) + .commit(&engine) + .unwrap_err(); + assert!(err + .to_string() + .contains("already specified in this transaction")); + + // remove same domain twice + let txn3 = snapshot.clone().transaction()?; + let err = txn3 + .with_domain_metadata_removed("another.domain".to_string()) + .with_domain_metadata_removed("another.domain".to_string()) + .commit(&engine) + .unwrap_err(); + assert!(err + .to_string() + .contains("already specified in this transaction")); + + // remove system domain + let txn4 = snapshot.clone().transaction()?; + let err = txn4 + .with_domain_metadata_removed("delta.system".to_string()) + .commit(&engine) + .unwrap_err(); + assert!(err + .to_string() + .contains("Cannot modify domains that start with 'delta.' as those are system controlled")); + + Ok(()) +} + +#[tokio::test] +async fn test_domain_metadata_set_then_remove_across_transactions( +) -> Result<(), Box> { + let _ = tracing_subscriber::fmt::try_init(); + + let schema = Arc::new(StructType::try_new(vec![StructField::nullable( + "number", + DataType::INTEGER, + )])?); + + let table_name = "test_domain_metadata_unsupported"; + + let (store, engine, table_location) = engine_store_setup(table_name, None); + let table_url = create_table( + store.clone(), + table_location, + schema.clone(), + &[], + true, + vec![], + vec!["domainMetadata"], + ) + .await?; + + let domain = "app.config"; + + // txn 1: Set domain metadata + let snapshot = Snapshot::builder_for(table_url.clone()).build(&engine)?; + let txn = snapshot.transaction()?; + txn.with_domain_metadata(domain.to_string(), r#"{"version": 1}"#.to_string()) + .commit(&engine)?; + + // txn 2: Remove the same domain metadata + let snapshot = Snapshot::builder_for(table_url.clone()).build(&engine)?; + let txn = snapshot.transaction()?; + txn.with_domain_metadata_removed(domain.to_string()) + .commit(&engine)?; + + // verify removal commit + let commit_data = store + .get(&Path::from(format!( + "/{table_name}/_delta_log/00000000000000000002.json" + ))) + .await? + .bytes() + .await?; + let actions: Vec = Deserializer::from_slice(&commit_data) + .into_iter() + .try_collect()?; + + let domain_action = actions + .iter() + .find(|v| v.get("domainMetadata").is_some()) + .unwrap(); + assert_eq!(domain_action["domainMetadata"]["domain"], domain); + assert_eq!(domain_action["domainMetadata"]["configuration"], ""); + assert_eq!(domain_action["domainMetadata"]["removed"], true); + + // verify reads see the metadata removal + let final_snapshot = Snapshot::builder_for(table_url.clone()).build(&engine)?; + let domain_config = final_snapshot.get_domain_metadata(domain, &engine)?; + assert_eq!(domain_config, None); + + Ok(()) +} From 27f59c6857ff9142cc24d9af968a030506130e33 Mon Sep 17 00:00:00 2001 From: Mann Patel <36581058+MannDP@users.noreply.github.com> Date: Wed, 24 Sep 2025 13:52:17 -0700 Subject: [PATCH 2/3] Apply suggestion from @zachschuermann Co-authored-by: Zach Schuermann --- kernel/src/transaction/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/kernel/src/transaction/mod.rs b/kernel/src/transaction/mod.rs index a73b843e9..2091b7c2a 100644 --- a/kernel/src/transaction/mod.rs +++ b/kernel/src/transaction/mod.rs @@ -287,7 +287,7 @@ impl Transaction { /// Removing metadata for multiple distinct domains is allowed. pub fn with_domain_metadata_removed(mut self, domain: String) -> Self { self.domain_metadatas - .push(DomainMetadata::remove(domain.clone())); + .push(DomainMetadata::remove(domain)); self } From eae43940f11f309c4526baf26ef200ae1171a249 Mon Sep 17 00:00:00 2001 From: MannDP Date: Thu, 25 Sep 2025 16:41:11 +0000 Subject: [PATCH 3/3] impl pr review comments --- kernel/src/actions/domain_metadata.rs | 2 +- kernel/src/actions/mod.rs | 12 ++++- kernel/src/transaction/mod.rs | 64 ++++++++++++++++++++------- kernel/tests/write.rs | 34 +++++++------- 4 files changed, 77 insertions(+), 35 deletions(-) diff --git a/kernel/src/actions/domain_metadata.rs b/kernel/src/actions/domain_metadata.rs index 8ef5e89bd..aa6f2f23d 100644 --- a/kernel/src/actions/domain_metadata.rs +++ b/kernel/src/actions/domain_metadata.rs @@ -38,7 +38,7 @@ pub(crate) fn domain_metadata_configuration( /// Scan the entire log for all domain metadata actions but terminate early if a specific domain /// is provided. Note that this returns the latest domain metadata for each domain, accounting for /// tombstones (removed=true) - that is, removed domain metadatas will _never_ be returned. -fn scan_domain_metadatas( +pub(crate) fn scan_domain_metadatas( log_segment: &LogSegment, domain: Option<&str>, engine: &dyn Engine, diff --git a/kernel/src/actions/mod.rs b/kernel/src/actions/mod.rs index 119097330..d072221b5 100644 --- a/kernel/src/actions/mod.rs +++ b/kernel/src/actions/mod.rs @@ -940,10 +940,10 @@ impl DomainMetadata { } // Create a new DomainMetadata action to remove a domain. - pub(crate) fn remove(domain: String) -> Self { + pub(crate) fn remove(domain: String, configuration: String) -> Self { Self { domain, - configuration: String::new(), + configuration, removed: true, } } @@ -958,6 +958,14 @@ impl DomainMetadata { pub(crate) fn domain(&self) -> &str { &self.domain } + + pub(crate) fn configuration(&self) -> &str { + &self.configuration + } + + pub(crate) fn is_removed(&self) -> bool { + self.removed + } } #[cfg(test)] diff --git a/kernel/src/transaction/mod.rs b/kernel/src/transaction/mod.rs index 2091b7c2a..2998c5200 100644 --- a/kernel/src/transaction/mod.rs +++ b/kernel/src/transaction/mod.rs @@ -1,4 +1,4 @@ -use std::collections::HashSet; +use std::collections::{HashMap, HashSet}; use std::iter; use std::ops::Deref; use std::sync::{Arc, LazyLock}; @@ -6,8 +6,8 @@ use std::sync::{Arc, LazyLock}; use url::Url; use crate::actions::{ - as_log_add_schema, get_log_commit_info_schema, get_log_domain_metadata_schema, - get_log_txn_schema, CommitInfo, DomainMetadata, SetTransaction, + as_log_add_schema, domain_metadata::scan_domain_metadatas, get_log_commit_info_schema, + get_log_domain_metadata_schema, get_log_txn_schema, CommitInfo, DomainMetadata, SetTransaction, }; use crate::error::Error; use crate::expressions::{ArrayData, Transform, UnaryExpressionOp::ToJson}; @@ -278,20 +278,26 @@ impl Transaction { } /// Remove domain metadata from the Delta log. - /// This creates a tombstone to logically delete the specified domain. We don't check - /// if the domain metadata exists in the log, the remove action is simply a tombstone record. + /// If the domain exists in the Delta log, this creates a tombstone to logically delete + /// the domain. The tombstone preserves the previous configuration value. + /// If the domain does not exist in the Delta log, this is a no-op. /// Note that each domain can only appear once per transaction. That is, multiple operations /// on the same domain are disallowed in a single transaction, as well as setting and removing /// the same domain in a single transaction. If a duplicate domain is included, the `commit` will /// fail (that is, we don't eagerly check domain validity here). /// Removing metadata for multiple distinct domains is allowed. pub fn with_domain_metadata_removed(mut self, domain: String) -> Self { + // actual configuration value determined during commit self.domain_metadatas - .push(DomainMetadata::remove(domain)); + .push(DomainMetadata::remove(domain, String::new())); self } /// Generate domain metadata actions with validation. Handle both user and system domains. + /// + /// This function may perform an expensive log replay operation if there are any domain removals. + /// The log replay is required to fetch the previous configuration value for the domain to preserve + /// in removal tombstones. fn generate_domain_metadata_actions<'a>( &'a self, engine: &'a dyn Engine, @@ -309,32 +315,58 @@ impl Transaction { )); } - // validate domain metadata - let mut domains = HashSet::new(); - for domain_metadata in &self.domain_metadatas { - if domain_metadata.is_internal() { + // validate user domain metadata and check if we have removals + let mut seen_domains = HashSet::new(); + let mut has_removals = false; + for dm in &self.domain_metadatas { + if dm.is_internal() { return Err(Error::Generic( "Cannot modify domains that start with 'delta.' as those are system controlled" .to_string(), )); } - if !domains.insert(domain_metadata.domain()) { + + if !seen_domains.insert(dm.domain()) { return Err(Error::Generic(format!( "Metadata for domain {} already specified in this transaction", - domain_metadata.domain() + dm.domain() ))); } + + if dm.is_removed() { + has_removals = true; + } } + // fetch previous configuration values (requires log replay) + let existing_domains = if has_removals { + scan_domain_metadatas(self.read_snapshot.log_segment(), None, engine)? + } else { + HashMap::new() + }; + + let user_domains = self + .domain_metadatas + .iter() + .filter_map(move |dm: &DomainMetadata| { + if dm.is_removed() { + existing_domains.get(dm.domain()).map(|existing| { + DomainMetadata::remove( + dm.domain().to_string(), + existing.configuration().to_string(), + ) + }) + } else { + Some(dm.clone()) + } + }); + let system_domains = row_tracking_high_watermark .map(DomainMetadata::try_from) .transpose()? .into_iter(); - Ok(self - .domain_metadatas - .iter() - .cloned() + Ok(user_domains .chain(system_domains) .map(|dm| dm.into_engine_data(get_log_domain_metadata_schema().clone(), engine))) } diff --git a/kernel/tests/write.rs b/kernel/tests/write.rs index 018f33adc..85b2d1f6d 100644 --- a/kernel/tests/write.rs +++ b/kernel/tests/write.rs @@ -1384,7 +1384,8 @@ async fn test_set_domain_metadata_unsupported_writer_feature( } #[tokio::test] -async fn test_remove_domain_metadata_basic() -> Result<(), Box> { +async fn test_remove_domain_metadata_non_existent_domain() -> Result<(), Box> +{ let _ = tracing_subscriber::fmt::try_init(); let schema = Arc::new(StructType::try_new(vec![StructField::nullable( @@ -1411,7 +1412,7 @@ async fn test_remove_domain_metadata_basic() -> Result<(), Box Result<(), Box Result<(), Box Result<(), Box> { +async fn test_domain_metadata_set_then_remove() -> Result<(), Box> { let _ = tracing_subscriber::fmt::try_init(); let schema = Arc::new(StructType::try_new(vec![StructField::nullable( @@ -1537,20 +1535,21 @@ async fn test_domain_metadata_set_then_remove_across_transactions( .await?; let domain = "app.config"; + let configuration = r#"{"version": 1}"#; - // txn 1: Set domain metadata + // txn 1: set domain metadata let snapshot = Snapshot::builder_for(table_url.clone()).build(&engine)?; let txn = snapshot.transaction()?; - txn.with_domain_metadata(domain.to_string(), r#"{"version": 1}"#.to_string()) + txn.with_domain_metadata(domain.to_string(), configuration.to_string()) .commit(&engine)?; - // txn 2: Remove the same domain metadata + // txn 2: remove the same domain metadata let snapshot = Snapshot::builder_for(table_url.clone()).build(&engine)?; let txn = snapshot.transaction()?; txn.with_domain_metadata_removed(domain.to_string()) .commit(&engine)?; - // verify removal commit + // verify removal commit preserves the previous configuration let commit_data = store .get(&Path::from(format!( "/{table_name}/_delta_log/00000000000000000002.json" @@ -1567,7 +1566,10 @@ async fn test_domain_metadata_set_then_remove_across_transactions( .find(|v| v.get("domainMetadata").is_some()) .unwrap(); assert_eq!(domain_action["domainMetadata"]["domain"], domain); - assert_eq!(domain_action["domainMetadata"]["configuration"], ""); + assert_eq!( + domain_action["domainMetadata"]["configuration"], + configuration + ); assert_eq!(domain_action["domainMetadata"]["removed"], true); // verify reads see the metadata removal