1
- use std:: collections:: HashSet ;
1
+ use std:: collections:: { HashSet , HashMap } ;
2
2
use std:: iter;
3
3
use std:: sync:: { Arc , LazyLock } ;
4
4
use std:: time:: { SystemTime , UNIX_EPOCH } ;
5
5
6
- use crate :: actions:: { get_log_add_schema, get_log_commit_info_schema, get_log_txn_schema} ;
6
+ use crate :: actions:: domain_metadata:: DomainMetadataMap ;
7
+ use crate :: actions:: { DomainMetadata , get_log_add_schema, get_log_commit_info_schema, get_log_domain_metadata_schema, get_log_txn_schema} ;
7
8
use crate :: actions:: { CommitInfo , SetTransaction } ;
8
9
use crate :: error:: Error ;
9
10
use crate :: expressions:: UnaryExpressionOp ;
@@ -97,6 +98,7 @@ pub struct Transaction {
97
98
// commit-wide timestamp (in milliseconds since epoch) - used in ICT, `txn` action, etc. to
98
99
// keep all timestamps within the same commit consistent.
99
100
commit_timestamp : i64 ,
101
+ domain_metadata : DomainMetadataMap ,
100
102
}
101
103
102
104
impl std:: fmt:: Debug for Transaction {
@@ -138,6 +140,7 @@ impl Transaction {
138
140
add_files_metadata : vec ! [ ] ,
139
141
set_transactions : vec ! [ ] ,
140
142
commit_timestamp,
143
+ domain_metadata : HashMap :: new ( ) ,
141
144
} )
142
145
}
143
146
@@ -165,6 +168,12 @@ impl Transaction {
165
168
. into_iter ( )
166
169
. map ( |txn| txn. into_engine_data ( get_log_txn_schema ( ) . clone ( ) , engine) ) ;
167
170
171
+ let set_domain_metadata_actions = self
172
+ . domain_metadata
173
+ . clone ( )
174
+ . into_values ( )
175
+ . map ( |dm| dm. into_engine_data ( get_log_domain_metadata_schema ( ) . clone ( ) , engine) ) ;
176
+
168
177
// step one: construct the iterator of commit info + file actions we want to commit
169
178
let commit_info = CommitInfo :: new (
170
179
self . commit_timestamp ,
@@ -179,7 +188,8 @@ impl Transaction {
179
188
180
189
let actions = iter:: once ( commit_info_action)
181
190
. chain ( add_actions)
182
- . chain ( set_transaction_actions) ;
191
+ . chain ( set_transaction_actions)
192
+ . chain ( set_domain_metadata_actions) ;
183
193
184
194
// step two: set new commit version (current_version + 1) and path to write
185
195
let commit_version = self . read_snapshot . version ( ) + 1 ;
@@ -271,6 +281,39 @@ impl Transaction {
271
281
pub fn add_files ( & mut self , add_metadata : Box < dyn EngineData > ) {
272
282
self . add_files_metadata . push ( add_metadata) ;
273
283
}
284
+
285
+ pub fn add_domain_metadata ( & mut self , domain : String , configuration : String ) -> DeltaResult < ( ) > {
286
+ let domain_metadata = DomainMetadata {
287
+ domain : domain. clone ( ) ,
288
+ configuration : configuration. to_string ( ) ,
289
+ removed : false ,
290
+ } ;
291
+ self . validate_domain_metadata ( & domain_metadata) ?;
292
+ self . domain_metadata . insert ( domain, domain_metadata) ;
293
+ Ok ( ( ) )
294
+ }
295
+
296
+ fn validate_domain_metadata ( & self , domain_metadata : & DomainMetadata ) -> DeltaResult < ( ) > {
297
+ if domain_metadata. is_internal ( ) {
298
+ return Err ( Error :: Generic ( "User metadata cannot be added to system-controlled 'delta.*' domain" . to_string ( ) ) ) ;
299
+ }
300
+
301
+ if self . domain_metadata . contains_key ( & domain_metadata. domain ) {
302
+ return Err ( Error :: Generic ( format ! ( "Metadata for domain {} already specified in this transaction" , domain_metadata. domain) ) ) ;
303
+ }
304
+ Ok ( ( ) )
305
+ }
306
+
307
+ pub fn remove_domain_metadata ( & mut self , domain : String ) -> DeltaResult < ( ) > {
308
+ let domain_metadata = DomainMetadata {
309
+ domain : domain. clone ( ) ,
310
+ configuration : "" . to_string ( ) ,
311
+ removed : true ,
312
+ } ;
313
+ self . validate_domain_metadata ( & domain_metadata) ?;
314
+ self . domain_metadata . insert ( domain, domain_metadata) ;
315
+ Ok ( ( ) )
316
+ }
274
317
}
275
318
276
319
/// Convert file metadata provided by the engine into protocol-compliant add actions.
0 commit comments