Skip to content

Commit a52ec16

Browse files
committed
add UT and move out remove in separate PR
1 parent 5220a12 commit a52ec16

File tree

3 files changed

+123
-25
lines changed

3 files changed

+123
-25
lines changed

kernel/src/actions/mod.rs

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -877,9 +877,9 @@ pub(crate) struct CheckpointMetadata {
877877
#[derive(Debug, Clone, PartialEq, Eq, ToSchema, IntoEngineData)]
878878
#[internal_api]
879879
pub(crate) struct DomainMetadata {
880-
pub(crate) domain: String,
881-
pub(crate) configuration: String,
882-
pub(crate) removed: bool,
880+
domain: String,
881+
configuration: String,
882+
removed: bool,
883883
}
884884

885885
impl DomainMetadata {
@@ -889,6 +889,18 @@ impl DomainMetadata {
889889
pub(crate) fn is_internal(&self) -> bool {
890890
self.domain.starts_with(INTERNAL_DOMAIN_PREFIX)
891891
}
892+
893+
pub fn domain(&self) -> &str {
894+
&self.domain
895+
}
896+
897+
pub(crate) fn new(domain: String, configuration: String, removed: bool) -> Self {
898+
Self {
899+
domain,
900+
configuration,
901+
removed,
902+
}
903+
}
892904
}
893905

894906
#[cfg(test)]

kernel/src/transaction/mod.rs

Lines changed: 21 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,13 @@
1-
use std::collections::{HashSet, HashMap};
1+
use std::collections::{HashMap, HashSet};
22
use std::iter;
33
use std::sync::{Arc, LazyLock};
44
use std::time::{SystemTime, UNIX_EPOCH};
55

66
use crate::actions::domain_metadata::DomainMetadataMap;
7-
use crate::actions::{DomainMetadata, get_log_add_schema, get_log_commit_info_schema, get_log_domain_metadata_schema, get_log_txn_schema};
7+
use crate::actions::{
8+
get_log_add_schema, get_log_commit_info_schema, get_log_domain_metadata_schema,
9+
get_log_txn_schema, DomainMetadata,
10+
};
811
use crate::actions::{CommitInfo, SetTransaction};
912
use crate::error::Error;
1013
use crate::expressions::UnaryExpressionOp;
@@ -282,38 +285,34 @@ impl Transaction {
282285
self.add_files_metadata.push(add_metadata);
283286
}
284287

285-
pub fn add_domain_metadata(&mut self, domain: String, configuration: String) -> DeltaResult<()> {
286-
let domain_metadata = DomainMetadata {
287-
domain: domain.clone(),
288-
configuration: configuration.to_string(),
289-
removed: false,
290-
};
288+
/// Set domain metadata to be written to the Delta log. Each domain can only be modified once per
289+
/// transaction. System-controlled domains (those starting with `delta.`) cannot be modified.
290+
pub fn set_domain_metadata(
291+
&mut self,
292+
domain: String,
293+
configuration: String,
294+
) -> DeltaResult<()> {
295+
let domain_metadata = DomainMetadata::new(domain.clone(), configuration, false);
291296
self.validate_domain_metadata(&domain_metadata)?;
292297
self.domain_metadata.insert(domain, domain_metadata);
293298
Ok(())
294299
}
295300

296301
fn validate_domain_metadata(&self, domain_metadata: &DomainMetadata) -> DeltaResult<()> {
297302
if domain_metadata.is_internal() {
298-
return Err(Error::Generic("User metadata cannot be added to system-controlled 'delta.*' domain".to_string()));
303+
return Err(Error::Generic(
304+
"User metadata cannot be added to system-controlled 'delta.*' domain".to_string(),
305+
));
299306
}
300307

301-
if self.domain_metadata.contains_key(&domain_metadata.domain) {
302-
return Err(Error::Generic(format!("Metadata for domain {} already specified in this transaction", domain_metadata.domain)));
308+
if self.domain_metadata.contains_key(domain_metadata.domain()) {
309+
return Err(Error::Generic(format!(
310+
"Metadata for domain {} already specified in this transaction",
311+
domain_metadata.domain()
312+
)));
303313
}
304314
Ok(())
305315
}
306-
307-
pub fn remove_domain_metadata(&mut self, domain: String) -> DeltaResult<()> {
308-
let domain_metadata = DomainMetadata {
309-
domain: domain.clone(),
310-
configuration: "".to_string(),
311-
removed: true,
312-
};
313-
self.validate_domain_metadata(&domain_metadata)?;
314-
self.domain_metadata.insert(domain, domain_metadata);
315-
Ok(())
316-
}
317316
}
318317

319318
/// Convert file metadata provided by the engine into protocol-compliant add actions.

kernel/tests/write.rs

Lines changed: 87 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1152,3 +1152,90 @@ async fn test_shredded_variant_read_rejection() -> Result<(), Box<dyn std::error
11521152

11531153
Ok(())
11541154
}
1155+
1156+
#[tokio::test]
1157+
async fn test_set_domain_metadata_basic() -> Result<(), Box<dyn std::error::Error>> {
1158+
let _ = tracing_subscriber::fmt::try_init();
1159+
1160+
let schema = Arc::new(StructType::new(vec![StructField::nullable(
1161+
"number",
1162+
DataType::INTEGER,
1163+
)]));
1164+
1165+
for (table_url, engine, store, table_name) in
1166+
setup_test_tables(schema.clone(), &[], None, "test_table").await?
1167+
{
1168+
let snapshot = Arc::new(Snapshot::builder(table_url.clone()).build(&engine)?);
1169+
let mut txn = snapshot.transaction()?;
1170+
1171+
// Set multiple domain metadata
1172+
let domain1 = "app.config";
1173+
let config1 = r#"{"version": 1}"#;
1174+
let domain2 = "spark.settings";
1175+
let config2 = r#"{"cores": 4}"#;
1176+
1177+
txn.set_domain_metadata(domain1.into(), config1.into())?;
1178+
txn.set_domain_metadata(domain2.into(), config2.into())?;
1179+
txn.commit(&engine)?;
1180+
1181+
let commit_data = store
1182+
.get(&Path::from(format!(
1183+
"/{table_name}/_delta_log/00000000000000000001.json"
1184+
)))
1185+
.await?
1186+
.bytes()
1187+
.await?;
1188+
let actions: Vec<serde_json::Value> = Deserializer::from_slice(&commit_data)
1189+
.into_iter()
1190+
.try_collect()?;
1191+
1192+
let domain_actions: Vec<_> = actions
1193+
.iter()
1194+
.filter(|v| v.get("domainMetadata").is_some())
1195+
.collect();
1196+
assert_eq!(domain_actions.len(), 2);
1197+
1198+
// Check domains and their configurations
1199+
for action in &domain_actions {
1200+
let domain = action["domainMetadata"]["domain"].as_str().unwrap();
1201+
let config = action["domainMetadata"]["configuration"].as_str().unwrap();
1202+
assert!(!action["domainMetadata"]["removed"].as_bool().unwrap());
1203+
1204+
match domain {
1205+
d if d == domain1 => assert_eq!(config, config1),
1206+
d if d == domain2 => assert_eq!(config, config2),
1207+
_ => panic!("Unexpected domain: {}", domain),
1208+
}
1209+
}
1210+
}
1211+
Ok(())
1212+
}
1213+
1214+
#[tokio::test]
1215+
async fn test_set_domain_metadata_errors() -> Result<(), Box<dyn std::error::Error>> {
1216+
let _ = tracing_subscriber::fmt::try_init();
1217+
1218+
let schema = Arc::new(StructType::new(vec![StructField::nullable(
1219+
"number",
1220+
DataType::INTEGER,
1221+
)]));
1222+
1223+
for (table_url, engine, _, _) in
1224+
setup_test_tables(schema.clone(), &[], None, "test_table").await?
1225+
{
1226+
let snapshot = Arc::new(Snapshot::builder(table_url.clone()).build(&engine)?);
1227+
let mut txn = snapshot.transaction()?;
1228+
1229+
// System domain rejection
1230+
assert!(txn
1231+
.set_domain_metadata("delta.system".into(), "config".into())
1232+
.is_err());
1233+
1234+
// Duplicate domain rejection
1235+
txn.set_domain_metadata("app.config".into(), "v1".into())?;
1236+
assert!(txn
1237+
.set_domain_metadata("app.config".into(), "v2".into())
1238+
.is_err());
1239+
}
1240+
Ok(())
1241+
}

0 commit comments

Comments
 (0)