Skip to content

Commit dece4cd

Browse files
committed
okay commit on top
1 parent a252ed0 commit dece4cd

File tree

2 files changed

+214
-0
lines changed

2 files changed

+214
-0
lines changed

kernel/src/transaction/mod.rs

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -317,6 +317,19 @@ impl Transaction {
317317
.push(DomainMetadata::new(domain.clone(), configuration, false));
318318
self
319319
}
320+
321+
/// Remove domain metadata from the Delta log.
322+
/// This creates a tombstone action to logically delete the specified domain. We don't check
323+
/// if the domain metadata exists in the log - the remove action is simply a tombstone record.
324+
/// Note that each domain can only appear once per transaction. That is, multiple operations
325+
/// on the same domain are disallowed in a single transaction, as well as setting and removing
326+
/// the same domain in a single transaction. If a duplicate domain is included, the `commit` will
327+
/// fail (that is, we don't eagerly check domain validity here).
328+
pub fn with_domain_metadata_removed(mut self, domain: String) -> Self {
329+
self.domain_metadatas
330+
.push(DomainMetadata::new(domain.clone(), String::new(), true));
331+
self
332+
}
320333
}
321334

322335
/// Convert file metadata provided by the engine into protocol-compliant add actions.

kernel/tests/write.rs

Lines changed: 201 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1348,3 +1348,204 @@ async fn test_set_domain_metadata_unsupported_writer_feature(
13481348

13491349
Ok(())
13501350
}
1351+
1352+
#[tokio::test]
1353+
async fn test_remove_domain_metadata_basic() -> Result<(), Box<dyn std::error::Error>> {
1354+
let _ = tracing_subscriber::fmt::try_init();
1355+
1356+
let schema = Arc::new(StructType::new(vec![StructField::nullable(
1357+
"number",
1358+
DataType::INTEGER,
1359+
)]));
1360+
1361+
let table_name = "test_domain_metadata_unsupported";
1362+
1363+
let (store, engine, table_location) = engine_store_setup(table_name, None);
1364+
let table_url = create_table(
1365+
store.clone(),
1366+
table_location,
1367+
schema.clone(),
1368+
&[],
1369+
true,
1370+
false,
1371+
false,
1372+
false,
1373+
true,
1374+
)
1375+
.await?;
1376+
1377+
let snapshot = Arc::new(Snapshot::builder(table_url.clone()).build(&engine)?);
1378+
let txn = snapshot.transaction()?;
1379+
1380+
let domain = "app.deprecated";
1381+
1382+
// removing domain metadata that is not present is allowed; the log simply contains a tombstone action
1383+
txn.with_domain_metadata_removed(domain.to_string())
1384+
.commit(&engine)?;
1385+
1386+
let commit_data = store
1387+
.get(&Path::from(format!(
1388+
"/{table_name}/_delta_log/00000000000000000001.json"
1389+
)))
1390+
.await?
1391+
.bytes()
1392+
.await?;
1393+
let actions: Vec<serde_json::Value> = Deserializer::from_slice(&commit_data)
1394+
.into_iter()
1395+
.try_collect()?;
1396+
1397+
let domain_action = actions
1398+
.iter()
1399+
.find(|v| v.get("domainMetadata").is_some())
1400+
.unwrap();
1401+
assert_eq!(domain_action["domainMetadata"]["domain"], "app.deprecated");
1402+
assert_eq!(domain_action["domainMetadata"]["configuration"], "");
1403+
assert_eq!(domain_action["domainMetadata"]["removed"], true);
1404+
1405+
let final_snapshot = Arc::new(Snapshot::builder(table_url.clone()).build(&engine)?);
1406+
let config = final_snapshot.get_domain_metadata(domain, &engine)?;
1407+
assert_eq!(config, None);
1408+
1409+
Ok(())
1410+
}
1411+
1412+
#[tokio::test]
1413+
async fn test_domain_metadata_set_remove_conflicts() -> Result<(), Box<dyn std::error::Error>> {
1414+
let _ = tracing_subscriber::fmt::try_init();
1415+
1416+
let schema = Arc::new(StructType::new(vec![StructField::nullable(
1417+
"number",
1418+
DataType::INTEGER,
1419+
)]));
1420+
1421+
let table_name = "test_domain_metadata_unsupported";
1422+
1423+
let (store, engine, table_location) = engine_store_setup(table_name, None);
1424+
let table_url = create_table(
1425+
store.clone(),
1426+
table_location,
1427+
schema.clone(),
1428+
&[],
1429+
true,
1430+
false,
1431+
false,
1432+
false,
1433+
true,
1434+
)
1435+
.await?;
1436+
1437+
let snapshot = Arc::new(Snapshot::builder(table_url.clone()).build(&engine)?);
1438+
1439+
// set then remove same domain
1440+
let txn = snapshot.clone().transaction()?;
1441+
let err = txn
1442+
.with_domain_metadata("app.config".to_string(), "v1".to_string())
1443+
.with_domain_metadata_removed("app.config".to_string())
1444+
.commit(&engine)
1445+
.unwrap_err();
1446+
assert!(err
1447+
.to_string()
1448+
.contains("already specified in this transaction"));
1449+
1450+
// remove then set same domain
1451+
let txn2 = snapshot.clone().transaction()?;
1452+
let err = txn2
1453+
.with_domain_metadata_removed("test.domain".to_string())
1454+
.with_domain_metadata("test.domain".to_string(), "v1".to_string())
1455+
.commit(&engine)
1456+
.unwrap_err();
1457+
assert!(err
1458+
.to_string()
1459+
.contains("already specified in this transaction"));
1460+
1461+
// remove same domain twice
1462+
let txn3 = snapshot.clone().transaction()?;
1463+
let err = txn3
1464+
.with_domain_metadata_removed("another.domain".to_string())
1465+
.with_domain_metadata_removed("another.domain".to_string())
1466+
.commit(&engine)
1467+
.unwrap_err();
1468+
assert!(err
1469+
.to_string()
1470+
.contains("already specified in this transaction"));
1471+
1472+
// remove system domain
1473+
let txn4 = snapshot.clone().transaction()?;
1474+
let err = txn4
1475+
.with_domain_metadata_removed("delta.system".to_string())
1476+
.commit(&engine)
1477+
.unwrap_err();
1478+
assert!(err
1479+
.to_string()
1480+
.contains("Users cannot modify system controlled metadata domains"));
1481+
1482+
Ok(())
1483+
}
1484+
1485+
#[tokio::test]
1486+
async fn test_domain_metadata_set_then_remove_across_transactions(
1487+
) -> Result<(), Box<dyn std::error::Error>> {
1488+
let _ = tracing_subscriber::fmt::try_init();
1489+
1490+
let schema = Arc::new(StructType::new(vec![StructField::nullable(
1491+
"number",
1492+
DataType::INTEGER,
1493+
)]));
1494+
1495+
let table_name = "test_domain_metadata_unsupported";
1496+
1497+
let (store, engine, table_location) = engine_store_setup(table_name, None);
1498+
let table_url = create_table(
1499+
store.clone(),
1500+
table_location,
1501+
schema.clone(),
1502+
&[],
1503+
true,
1504+
false,
1505+
false,
1506+
false,
1507+
true,
1508+
)
1509+
.await?;
1510+
1511+
let domain = "app.config";
1512+
1513+
// txn 1: Set domain metadata
1514+
let snapshot = Arc::new(Snapshot::builder(table_url.clone()).build(&engine)?);
1515+
let txn = snapshot.transaction()?;
1516+
txn.with_domain_metadata(domain.to_string(), r#"{"version": 1}"#.to_string())
1517+
.commit(&engine)?;
1518+
1519+
// txn 2: Remove the same domain metadata
1520+
let snapshot = Arc::new(Snapshot::builder(table_url.clone()).build(&engine)?);
1521+
let txn = snapshot.transaction()?;
1522+
txn.with_domain_metadata_removed(domain.to_string())
1523+
.commit(&engine)?;
1524+
1525+
// verify removal commit
1526+
let commit_data = store
1527+
.get(&Path::from(format!(
1528+
"/{table_name}/_delta_log/00000000000000000002.json"
1529+
)))
1530+
.await?
1531+
.bytes()
1532+
.await?;
1533+
let actions: Vec<serde_json::Value> = Deserializer::from_slice(&commit_data)
1534+
.into_iter()
1535+
.try_collect()?;
1536+
1537+
let domain_action = actions
1538+
.iter()
1539+
.find(|v| v.get("domainMetadata").is_some())
1540+
.unwrap();
1541+
assert_eq!(domain_action["domainMetadata"]["domain"], domain);
1542+
assert_eq!(domain_action["domainMetadata"]["configuration"], "");
1543+
assert_eq!(domain_action["domainMetadata"]["removed"], true);
1544+
1545+
// verify reads see the metadata removal
1546+
let final_snapshot = Arc::new(Snapshot::builder(table_url.clone()).build(&engine)?);
1547+
let domain_config = final_snapshot.get_domain_metadata(domain, &engine)?;
1548+
assert_eq!(domain_config, None);
1549+
1550+
Ok(())
1551+
}

0 commit comments

Comments
 (0)