10
10
//! they can be removed from the delta log.
11
11
12
12
use std:: {
13
- collections:: { HashMap , HashSet } ,
13
+ collections:: HashMap ,
14
14
pin:: Pin ,
15
- sync:: { Arc , Mutex } ,
15
+ sync:: Arc ,
16
16
task:: { Context , Poll } ,
17
17
} ;
18
18
19
- use arrow_array:: { builder:: UInt64Builder , ArrayRef , RecordBatch } ;
20
- use arrow_schema:: SchemaRef ;
19
+ use arrow:: array:: { builder:: UInt64Builder , ArrayRef , RecordBatch } ;
20
+ use arrow:: datatypes:: SchemaRef ;
21
+ use dashmap:: DashSet ;
21
22
use datafusion:: common:: { DataFusionError , Result as DataFusionResult } ;
22
23
use datafusion:: logical_expr:: { Expr , LogicalPlan , UserDefinedLogicalNodeCore } ;
23
24
use datafusion:: physical_expr:: { Distribution , PhysicalExpr } ;
@@ -32,7 +33,7 @@ use crate::{
32
33
DeltaTableError ,
33
34
} ;
34
35
35
- pub ( crate ) type BarrierSurvivorSet = Arc < Mutex < HashSet < String > > > ;
36
+ pub ( crate ) type BarrierSurvivorSet = Arc < DashSet < String > > ;
36
37
37
38
#[ derive( Debug ) ]
38
39
/// Physical Node for the MergeBarrier
@@ -55,7 +56,7 @@ impl MergeBarrierExec {
55
56
MergeBarrierExec {
56
57
input,
57
58
file_column,
58
- survivors : Arc :: new ( Mutex :: new ( HashSet :: new ( ) ) ) ,
59
+ survivors : Arc :: new ( DashSet :: new ( ) ) ,
59
60
expr,
60
61
}
61
62
}
@@ -359,17 +360,12 @@ impl Stream for MergeBarrierStream {
359
360
}
360
361
361
362
{
362
- let mut lock = self . survivors . lock ( ) . map_err ( |_| {
363
- DataFusionError :: External ( Box :: new ( DeltaTableError :: Generic (
364
- "MergeBarrier mutex is poisoned" . to_string ( ) ,
365
- ) ) )
366
- } ) ?;
367
363
for part in & self . file_partitions {
368
364
match part. state {
369
365
PartitionBarrierState :: Closed => { }
370
366
PartitionBarrierState :: Open => {
371
367
if let Some ( file_name) = & part. file_name {
372
- lock . insert ( file_name. to_owned ( ) ) ;
368
+ self . survivors . insert ( file_name. to_owned ( ) ) ;
373
369
}
374
370
}
375
371
}
@@ -532,11 +528,10 @@ mod tests {
532
528
] ;
533
529
assert_batches_sorted_eq ! ( & expected, & actual) ;
534
530
535
- let s = survivors. lock ( ) . unwrap ( ) ;
536
- assert ! ( !s. contains( & "file0" . to_string( ) ) ) ;
537
- assert ! ( s. contains( & "file1" . to_string( ) ) ) ;
538
- assert ! ( s. contains( & "file2" . to_string( ) ) ) ;
539
- assert_eq ! ( s. len( ) , 2 ) ;
531
+ assert ! ( !survivors. contains( & "file0" . to_string( ) ) ) ;
532
+ assert ! ( survivors. contains( & "file1" . to_string( ) ) ) ;
533
+ assert ! ( survivors. contains( & "file2" . to_string( ) ) ) ;
534
+ assert_eq ! ( survivors. len( ) , 2 ) ;
540
535
}
541
536
542
537
#[ tokio:: test]
0 commit comments