Skip to content

Commit f4bfcd5

Browse files
committed
feat: support removing domain metadata
1 parent 05e6698 commit f4bfcd5

File tree

3 files changed

+216
-0
lines changed

3 files changed

+216
-0
lines changed

kernel/src/actions/mod.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -949,6 +949,14 @@ impl DomainMetadata {
949949
pub(crate) fn domain(&self) -> &str {
950950
&self.domain
951951
}
952+
953+
pub(crate) fn remove(domain: String) -> Self {
954+
Self {
955+
domain,
956+
configuration: String::new(),
957+
removed: true,
958+
}
959+
}
952960
}
953961

954962
#[cfg(test)]

kernel/src/transaction/mod.rs

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -277,6 +277,19 @@ impl Transaction {
277277
self
278278
}
279279

280+
/// Remove domain metadata from the Delta log.
281+
/// This creates a tombstone to logically delete the specified domain. We don't check
282+
/// if the domain metadata exists in the log, the remove action is simply a tombstone record.
283+
/// Note that each domain can only appear once per transaction. That is, multiple operations
284+
/// on the same domain are disallowed in a single transaction, as well as setting and removing
285+
/// the same domain in a single transaction. If a duplicate domain is included, the `commit` will
286+
/// fail (that is, we don't eagerly check domain validity here).
287+
pub fn with_domain_metadata_removed(mut self, domain: String) -> Self {
288+
self.domain_metadatas
289+
.push(DomainMetadata::remove(domain.clone()));
290+
self
291+
}
292+
280293
/// Generate domain metadata actions with validation. Handle both user and system domains.
281294
fn generate_domain_metadata_actions<'a>(
282295
&'a self,

kernel/tests/write.rs

Lines changed: 195 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1382,3 +1382,198 @@ async fn test_set_domain_metadata_unsupported_writer_feature(
13821382

13831383
Ok(())
13841384
}
1385+
1386+
#[tokio::test]
1387+
async fn test_remove_domain_metadata_basic() -> Result<(), Box<dyn std::error::Error>> {
1388+
let _ = tracing_subscriber::fmt::try_init();
1389+
1390+
let schema = Arc::new(StructType::try_new(vec![StructField::nullable(
1391+
"number",
1392+
DataType::INTEGER,
1393+
)])?);
1394+
1395+
let table_name = "test_domain_metadata_unsupported";
1396+
1397+
let (store, engine, table_location) = engine_store_setup(table_name, None);
1398+
let table_url = create_table(
1399+
store.clone(),
1400+
table_location,
1401+
schema.clone(),
1402+
&[],
1403+
true,
1404+
vec![],
1405+
vec!["domainMetadata"],
1406+
)
1407+
.await?;
1408+
1409+
let snapshot = Snapshot::builder_for(table_url.clone()).build(&engine)?;
1410+
let txn = snapshot.transaction()?;
1411+
1412+
let domain = "app.deprecated";
1413+
1414+
// removing domain metadata that is not present is allowed; the log simply contains a tombstone action
1415+
txn.with_domain_metadata_removed(domain.to_string())
1416+
.commit(&engine)?;
1417+
1418+
let commit_data = store
1419+
.get(&Path::from(format!(
1420+
"/{table_name}/_delta_log/00000000000000000001.json"
1421+
)))
1422+
.await?
1423+
.bytes()
1424+
.await?;
1425+
let actions: Vec<serde_json::Value> = Deserializer::from_slice(&commit_data)
1426+
.into_iter()
1427+
.try_collect()?;
1428+
1429+
let domain_action = actions
1430+
.iter()
1431+
.find(|v| v.get("domainMetadata").is_some())
1432+
.unwrap();
1433+
assert_eq!(domain_action["domainMetadata"]["domain"], "app.deprecated");
1434+
assert_eq!(domain_action["domainMetadata"]["configuration"], "");
1435+
assert_eq!(domain_action["domainMetadata"]["removed"], true);
1436+
1437+
let final_snapshot = Snapshot::builder_for(table_url.clone()).build(&engine)?;
1438+
let config = final_snapshot.get_domain_metadata(domain, &engine)?;
1439+
assert_eq!(config, None);
1440+
1441+
Ok(())
1442+
}
1443+
1444+
#[tokio::test]
1445+
async fn test_domain_metadata_set_remove_conflicts() -> Result<(), Box<dyn std::error::Error>> {
1446+
let _ = tracing_subscriber::fmt::try_init();
1447+
1448+
let schema = Arc::new(StructType::try_new(vec![StructField::nullable(
1449+
"number",
1450+
DataType::INTEGER,
1451+
)])?);
1452+
1453+
let table_name = "test_domain_metadata_unsupported";
1454+
1455+
let (store, engine, table_location) = engine_store_setup(table_name, None);
1456+
let table_url = create_table(
1457+
store.clone(),
1458+
table_location,
1459+
schema.clone(),
1460+
&[],
1461+
true,
1462+
vec![],
1463+
vec!["domainMetadata"],
1464+
)
1465+
.await?;
1466+
1467+
let snapshot = Snapshot::builder_for(table_url.clone()).build(&engine)?;
1468+
1469+
// set then remove same domain
1470+
let txn = snapshot.clone().transaction()?;
1471+
let err = txn
1472+
.with_domain_metadata("app.config".to_string(), "v1".to_string())
1473+
.with_domain_metadata_removed("app.config".to_string())
1474+
.commit(&engine)
1475+
.unwrap_err();
1476+
assert!(err
1477+
.to_string()
1478+
.contains("already specified in this transaction"));
1479+
1480+
// remove then set same domain
1481+
let txn2 = snapshot.clone().transaction()?;
1482+
let err = txn2
1483+
.with_domain_metadata_removed("test.domain".to_string())
1484+
.with_domain_metadata("test.domain".to_string(), "v1".to_string())
1485+
.commit(&engine)
1486+
.unwrap_err();
1487+
assert!(err
1488+
.to_string()
1489+
.contains("already specified in this transaction"));
1490+
1491+
// remove same domain twice
1492+
let txn3 = snapshot.clone().transaction()?;
1493+
let err = txn3
1494+
.with_domain_metadata_removed("another.domain".to_string())
1495+
.with_domain_metadata_removed("another.domain".to_string())
1496+
.commit(&engine)
1497+
.unwrap_err();
1498+
assert!(err
1499+
.to_string()
1500+
.contains("already specified in this transaction"));
1501+
1502+
// remove system domain
1503+
let txn4 = snapshot.clone().transaction()?;
1504+
let err = txn4
1505+
.with_domain_metadata_removed("delta.system".to_string())
1506+
.commit(&engine)
1507+
.unwrap_err();
1508+
assert!(err
1509+
.to_string()
1510+
.contains("Cannot modify domains that start with 'delta.' as those are system controlled"));
1511+
1512+
Ok(())
1513+
}
1514+
1515+
#[tokio::test]
1516+
async fn test_domain_metadata_set_then_remove_across_transactions(
1517+
) -> Result<(), Box<dyn std::error::Error>> {
1518+
let _ = tracing_subscriber::fmt::try_init();
1519+
1520+
let schema = Arc::new(StructType::try_new(vec![StructField::nullable(
1521+
"number",
1522+
DataType::INTEGER,
1523+
)])?);
1524+
1525+
let table_name = "test_domain_metadata_unsupported";
1526+
1527+
let (store, engine, table_location) = engine_store_setup(table_name, None);
1528+
let table_url = create_table(
1529+
store.clone(),
1530+
table_location,
1531+
schema.clone(),
1532+
&[],
1533+
true,
1534+
vec![],
1535+
vec!["domainMetadata"],
1536+
)
1537+
.await?;
1538+
1539+
let domain = "app.config";
1540+
1541+
// txn 1: Set domain metadata
1542+
let snapshot = Snapshot::builder_for(table_url.clone()).build(&engine)?;
1543+
let txn = snapshot.transaction()?;
1544+
txn.with_domain_metadata(domain.to_string(), r#"{"version": 1}"#.to_string())
1545+
.commit(&engine)?;
1546+
1547+
// txn 2: Remove the same domain metadata
1548+
let snapshot = Snapshot::builder_for(table_url.clone()).build(&engine)?;
1549+
let txn = snapshot.transaction()?;
1550+
txn.with_domain_metadata_removed(domain.to_string())
1551+
.commit(&engine)?;
1552+
1553+
// verify removal commit
1554+
let commit_data = store
1555+
.get(&Path::from(format!(
1556+
"/{table_name}/_delta_log/00000000000000000002.json"
1557+
)))
1558+
.await?
1559+
.bytes()
1560+
.await?;
1561+
let actions: Vec<serde_json::Value> = Deserializer::from_slice(&commit_data)
1562+
.into_iter()
1563+
.try_collect()?;
1564+
1565+
let domain_action = actions
1566+
.iter()
1567+
.find(|v| v.get("domainMetadata").is_some())
1568+
.unwrap();
1569+
assert_eq!(domain_action["domainMetadata"]["domain"], domain);
1570+
assert_eq!(domain_action["domainMetadata"]["configuration"], "");
1571+
assert_eq!(domain_action["domainMetadata"]["removed"], true);
1572+
1573+
// verify reads see the metadata removal
1574+
let final_snapshot = Snapshot::builder_for(table_url.clone()).build(&engine)?;
1575+
let domain_config = final_snapshot.get_domain_metadata(domain, &engine)?;
1576+
assert_eq!(domain_config, None);
1577+
1578+
Ok(())
1579+
}

0 commit comments

Comments
 (0)