33
44use criterion:: { criterion_group, criterion_main, Criterion } ;
55
6- use event_manager:: utilities:: subscribers:: {
7- CounterInnerMutSubscriber , CounterSubscriber , CounterSubscriberWithData ,
8- } ;
9- use event_manager:: { EventManager , EventSubscriber , MutEventSubscriber , SubscriberOps } ;
6+ use event_manager:: { BufferedEventManager , EventManager } ;
7+ use std:: os:: fd:: AsFd ;
8+ use std:: os:: fd:: FromRawFd ;
9+ use std:: os:: fd:: OwnedFd ;
10+ use std:: sync:: atomic:: AtomicU64 ;
11+ use std:: sync:: atomic:: Ordering ;
1012use std:: sync:: { Arc , Mutex } ;
13+ use vmm_sys_util:: epoll:: EventSet ;
1114
1215// Test the performance of event manager when it manages a single subscriber type.
1316// The performance is assessed under stress, all added subscribers have active events.
1417fn run_basic_subscriber ( c : & mut Criterion ) {
15- let no_of_subscribers = 200 ;
18+ let no_of_subscribers = 200i32 ;
1619
17- let mut event_manager = EventManager :: < CounterSubscriber > :: new ( ) . unwrap ( ) ;
18- for _ in 0 ..no_of_subscribers {
19- let mut counter_subscriber = CounterSubscriber :: default ( ) ;
20- counter_subscriber. trigger_event ( ) ;
21- event_manager. add_subscriber ( counter_subscriber) ;
22- }
20+ let mut event_manager =
21+ BufferedEventManager :: with_capacity ( false , no_of_subscribers as usize ) . unwrap ( ) ;
22+
23+ let subscribers = ( 0 ..no_of_subscribers) . map ( |_| {
24+ // Create an eventfd that is initialized with 1 waiting event.
25+ let event_fd = unsafe {
26+ let raw_fd = libc:: eventfd ( 1 , 0 ) ;
27+ assert_ne ! ( raw_fd, -1 ) ;
28+ OwnedFd :: from_raw_fd ( raw_fd)
29+ } ;
30+
31+ event_manager. add ( event_fd. as_fd ( ) , EventSet :: IN | EventSet :: ERROR | EventSet :: HANG_UP , Box :: new ( move |_: & mut EventManager , event_set : EventSet | {
32+ match event_set {
33+ EventSet :: IN => ( ) ,
34+ EventSet :: ERROR => {
35+ eprintln ! ( "Got error on the monitored event." ) ;
36+ } ,
37+ EventSet :: HANG_UP => {
38+ // TODO Do this: https://github.com/rust-vmm/event-manager/blob/main/src/utilities/subscribers.rs#L116:L118
39+ panic ! ( "Cannot continue execution. Associated fd was closed." ) ;
40+ } ,
41+ _ => {
42+ eprintln ! ( "Received spurious event from the event manager {event_set:#?}." ) ;
43+ }
44+ }
45+ } ) ) . unwrap ( ) ;
46+
47+ event_fd
48+ } ) . collect :: < Vec < _ > > ( ) ;
2349
2450 c. bench_function ( "process_basic" , |b| {
2551 b. iter ( || {
26- let ev_count = event_manager. run ( ) . unwrap ( ) ;
27- assert_eq ! ( ev_count, no_of_subscribers)
52+ assert_eq ! ( event_manager. wait( Some ( 0 ) ) , Ok ( no_of_subscribers) ) ;
2853 } )
2954 } ) ;
55+
56+ drop ( subscribers) ;
3057}
3158
3259// Test the performance of event manager when the subscribers are wrapped in an Arc<Mutex>.
3360// The performance is assessed under stress, all added subscribers have active events.
3461fn run_arc_mutex_subscriber ( c : & mut Criterion ) {
35- let no_of_subscribers = 200 ;
62+ let no_of_subscribers = 200i32 ;
63+
64+ let mut event_manager =
65+ BufferedEventManager :: with_capacity ( false , no_of_subscribers as usize ) . unwrap ( ) ;
66+
67+ let subscribers = ( 0 ..no_of_subscribers) . map ( |_| {
68+ // Create an eventfd that is initialized with 1 waiting event.
69+ let event_fd = unsafe {
70+ let raw_fd = libc:: eventfd ( 1 , 0 ) ;
71+ assert_ne ! ( raw_fd, -1 ) ;
72+ OwnedFd :: from_raw_fd ( raw_fd)
73+ } ;
74+ let counter = Arc :: new ( Mutex :: new ( 0u64 ) ) ;
75+ let counter_clone = counter. clone ( ) ;
76+
77+ event_manager. add ( event_fd. as_fd ( ) , EventSet :: IN | EventSet :: ERROR | EventSet :: HANG_UP , Box :: new ( move |_: & mut EventManager , event_set : EventSet | {
78+ match event_set {
79+ EventSet :: IN => {
80+ * counter_clone. lock ( ) . unwrap ( ) += 1 ;
81+ } ,
82+ EventSet :: ERROR => {
83+ eprintln ! ( "Got error on the monitored event." ) ;
84+ } ,
85+ EventSet :: HANG_UP => {
86+ // TODO Do this: https://github.com/rust-vmm/event-manager/blob/main/src/utilities/subscribers.rs#L116:L118
87+ panic ! ( "Cannot continue execution. Associated fd was closed." ) ;
88+ } ,
89+ _ => {
90+ eprintln ! ( "Received spurious event from the event manager {event_set:#?}." ) ;
91+ }
92+ }
93+ } ) ) . unwrap ( ) ;
3694
37- let mut event_manager = EventManager :: < Arc < Mutex < CounterSubscriber > > > :: new ( ) . unwrap ( ) ;
38- for _ in 0 ..no_of_subscribers {
39- let counter_subscriber = Arc :: new ( Mutex :: new ( CounterSubscriber :: default ( ) ) ) ;
40- counter_subscriber. lock ( ) . unwrap ( ) . trigger_event ( ) ;
41- event_manager. add_subscriber ( counter_subscriber) ;
42- }
95+ ( event_fd, counter)
96+ } ) . collect :: < Vec < _ > > ( ) ;
4397
4498 c. bench_function ( "process_with_arc_mutex" , |b| {
4599 b. iter ( || {
46- let ev_count = event_manager. run ( ) . unwrap ( ) ;
47- assert_eq ! ( ev_count, no_of_subscribers)
100+ assert_eq ! ( event_manager. wait( Some ( 0 ) ) , Ok ( no_of_subscribers) ) ;
48101 } )
49102 } ) ;
103+
104+ drop ( subscribers) ;
50105}
51106
52107// Test the performance of event manager when the subscribers are wrapped in an Arc, and they
53108// leverage inner mutability to update their internal state.
54109// The performance is assessed under stress, all added subscribers have active events.
55110fn run_subscriber_with_inner_mut ( c : & mut Criterion ) {
56- let no_of_subscribers = 200 ;
111+ let no_of_subscribers = 200i32 ;
112+
113+ let mut event_manager =
114+ BufferedEventManager :: with_capacity ( false , no_of_subscribers as usize ) . unwrap ( ) ;
115+
116+ let subscribers = ( 0 ..no_of_subscribers) . map ( |_| {
117+ // Create an eventfd that is initialized with 1 waiting event.
118+ let event_fd = unsafe {
119+ let raw_fd = libc:: eventfd ( 1 , 0 ) ;
120+ assert_ne ! ( raw_fd, -1 ) ;
121+ OwnedFd :: from_raw_fd ( raw_fd)
122+ } ;
123+ let counter = Arc :: new ( AtomicU64 :: new ( 0 ) ) ;
124+ let counter_clone = counter. clone ( ) ;
125+
126+ event_manager. add ( event_fd. as_fd ( ) , EventSet :: IN | EventSet :: ERROR | EventSet :: HANG_UP , Box :: new ( move |_: & mut EventManager , event_set : EventSet | {
127+ match event_set {
128+ EventSet :: IN => {
129+ counter_clone. fetch_add ( 1 , Ordering :: SeqCst ) ;
130+ } ,
131+ EventSet :: ERROR => {
132+ eprintln ! ( "Got error on the monitored event." ) ;
133+ } ,
134+ EventSet :: HANG_UP => {
135+ // TODO Do this: https://github.com/rust-vmm/event-manager/blob/main/src/utilities/subscribers.rs#L116:L118
136+ panic ! ( "Cannot continue execution. Associated fd was closed." ) ;
137+ } ,
138+ _ => {
139+ eprintln ! ( "Received spurious event from the event manager {event_set:#?}." ) ;
140+ }
141+ }
142+ } ) ) . unwrap ( ) ;
57143
58- let mut event_manager = EventManager :: < Arc < dyn EventSubscriber + Send + Sync > > :: new ( ) . unwrap ( ) ;
59- for _ in 0 ..no_of_subscribers {
60- let counter_subscriber = CounterInnerMutSubscriber :: default ( ) ;
61- counter_subscriber. trigger_event ( ) ;
62- event_manager. add_subscriber ( Arc :: new ( counter_subscriber) ) ;
63- }
144+ ( event_fd, counter)
145+ } ) . collect :: < Vec < _ > > ( ) ;
64146
65147 c. bench_function ( "process_with_inner_mut" , |b| {
66148 b. iter ( || {
67- let ev_count = event_manager. run ( ) . unwrap ( ) ;
68- assert_eq ! ( ev_count, no_of_subscribers)
149+ assert_eq ! ( event_manager. wait( Some ( 0 ) ) , Ok ( no_of_subscribers) ) ;
69150 } )
70151 } ) ;
152+
153+ drop ( subscribers) ;
71154}
72155
73156// Test the performance of event manager when it manages subscribers of different types, that are
@@ -76,63 +159,151 @@ fn run_subscriber_with_inner_mut(c: &mut Criterion) {
76159// The performance is assessed under stress, all added subscribers have active events, and the
77160// CounterSubscriberWithData subscribers have multiple active events.
78161fn run_multiple_subscriber_types ( c : & mut Criterion ) {
79- let no_of_subscribers = 100 ;
162+ let no_of_subscribers = 100i32 ;
163+
164+ let total = no_of_subscribers + ( no_of_subscribers * i32:: try_from ( EVENTS ) . unwrap ( ) ) ;
165+
166+ let mut event_manager =
167+ BufferedEventManager :: with_capacity ( false , usize:: try_from ( total) . unwrap ( ) ) . unwrap ( ) ;
80168
81- let mut event_manager = EventManager :: < Arc < Mutex < dyn MutEventSubscriber > > > :: new ( )
82- . expect ( "Cannot create event manager." ) ;
169+ let subscribers = ( 0 ..no_of_subscribers) . map ( |_| {
170+ // Create an eventfd that is initialized with 1 waiting event.
171+ let event_fd = unsafe {
172+ let raw_fd = libc:: eventfd ( 1 , 0 ) ;
173+ assert_ne ! ( raw_fd, -1 ) ;
174+ OwnedFd :: from_raw_fd ( raw_fd)
175+ } ;
176+ let counter = Arc :: new ( AtomicU64 :: new ( 0 ) ) ;
177+ let counter_clone = counter. clone ( ) ;
83178
84- for i in 0 ..no_of_subscribers {
85- // The `CounterSubscriberWithData` expects to receive a number as a parameter that
86- // represents the number it can use as its inner Events data.
87- let mut data_subscriber = CounterSubscriberWithData :: new ( i * no_of_subscribers) ;
88- data_subscriber. trigger_all_counters ( ) ;
89- event_manager. add_subscriber ( Arc :: new ( Mutex :: new ( data_subscriber) ) ) ;
179+ event_manager. add ( event_fd. as_fd ( ) , EventSet :: IN | EventSet :: ERROR | EventSet :: HANG_UP , Box :: new ( move |_: & mut EventManager , event_set : EventSet | {
180+ match event_set {
181+ EventSet :: IN => {
182+ counter_clone. fetch_add ( 1 , Ordering :: SeqCst ) ;
183+ } ,
184+ EventSet :: ERROR => {
185+ eprintln ! ( "Got error on the monitored event." ) ;
186+ } ,
187+ EventSet :: HANG_UP => {
188+ // TODO Do this: https://github.com/rust-vmm/event-manager/blob/main/src/utilities/subscribers.rs#L116:L118
189+ panic ! ( "Cannot continue execution. Associated fd was closed." ) ;
190+ } ,
191+ _ => {
192+ eprintln ! ( "Received spurious event from the event manager {event_set:#?}." ) ;
193+ }
194+ }
195+ } ) ) . unwrap ( ) ;
90196
91- let mut counter_subscriber = CounterSubscriber :: default ( ) ;
92- counter_subscriber. trigger_event ( ) ;
93- event_manager. add_subscriber ( Arc :: new ( Mutex :: new ( counter_subscriber) ) ) ;
94- }
197+ ( event_fd, counter)
198+ } ) . collect :: < Vec < _ > > ( ) ;
199+
200+ const EVENTS : usize = 3 ;
201+
202+ let subscribers_with_data = ( 0 ..no_of_subscribers)
203+ . map ( |_| {
204+ let data = Arc :: new ( [ AtomicU64 :: new ( 0 ) , AtomicU64 :: new ( 0 ) , AtomicU64 :: new ( 0 ) ] ) ;
205+ assert_eq ! ( data. len( ) , EVENTS ) ;
206+
207+ // Create eventfd's that are initialized with 1 waiting event.
208+ let inner_subscribers = ( 0 ..EVENTS )
209+ . map ( |_| unsafe {
210+ let raw_fd = libc:: eventfd ( 1 , 0 ) ;
211+ assert_ne ! ( raw_fd, -1 ) ;
212+ OwnedFd :: from_raw_fd ( raw_fd)
213+ } )
214+ . collect :: < Vec < _ > > ( ) ;
215+
216+ for i in 0 ..EVENTS {
217+ let data_clone = data. clone ( ) ;
218+
219+ event_manager
220+ . add (
221+ inner_subscribers[ i] . as_fd ( ) ,
222+ EventSet :: IN | EventSet :: ERROR | EventSet :: HANG_UP ,
223+ Box :: new ( move |_: & mut EventManager , event_set : EventSet | {
224+ match event_set {
225+ EventSet :: IN => {
226+ data_clone[ i] . fetch_add ( 1 , Ordering :: SeqCst ) ;
227+ }
228+ EventSet :: ERROR => {
229+ eprintln ! ( "Got error on the monitored event." ) ;
230+ }
231+ EventSet :: HANG_UP => {
232+ // TODO Do this: https://github.com/rust-vmm/event-manager/blob/main/src/utilities/subscribers.rs#L116:L118
233+ panic ! ( "Cannot continue execution. Associated fd was closed." ) ;
234+ }
235+ _ => { }
236+ }
237+ } ) ,
238+ )
239+ . unwrap ( ) ;
240+ }
241+
242+ ( inner_subscribers, data)
243+ } )
244+ . collect :: < Vec < _ > > ( ) ;
95245
96246 c. bench_function ( "process_dynamic_dispatch" , |b| {
97247 b. iter ( || {
98- let _ = event_manager. run ( ) . unwrap ( ) ;
248+ assert_eq ! ( event_manager. wait ( Some ( 0 ) ) , Ok ( total ) ) ;
99249 } )
100250 } ) ;
251+
252+ drop ( subscribers) ;
253+ drop ( subscribers_with_data) ;
101254}
102255
103256// Test the performance of event manager when it manages a single subscriber type.
104257// Just a few of the events are active in this test scenario.
105258fn run_with_few_active_events ( c : & mut Criterion ) {
106- let no_of_subscribers = 200 ;
259+ let no_of_subscribers = 200i32 ;
260+ let active = 1 + no_of_subscribers / 23 ;
261+
262+ let mut event_manager =
263+ BufferedEventManager :: with_capacity ( false , no_of_subscribers as usize ) . unwrap ( ) ;
107264
108- let mut event_manager = EventManager :: < CounterSubscriber > :: new ( ) . unwrap ( ) ;
265+ let subscribers = ( 0 ..no_of_subscribers) . map ( |i| {
266+ // Create an eventfd that is initialized with 1 waiting event.
267+ let event_fd = unsafe {
268+ let raw_fd = libc:: eventfd ( ( i % 23 == 0 ) as u8 as u32 , 0 ) ;
269+ assert_ne ! ( raw_fd, -1 ) ;
270+ OwnedFd :: from_raw_fd ( raw_fd)
271+ } ;
109272
110- for i in 0 ..no_of_subscribers {
111- let mut counter_subscriber = CounterSubscriber :: default ( ) ;
112- // Let's activate the events for a few subscribers (i.e. only the ones that are
113- // divisible by 23). 23 is a random number that I just happen to like.
114- if i % 23 == 0 {
115- counter_subscriber. trigger_event ( ) ;
116- }
117- event_manager. add_subscriber ( counter_subscriber) ;
118- }
273+ event_manager. add ( event_fd. as_fd ( ) , EventSet :: IN | EventSet :: ERROR | EventSet :: HANG_UP , Box :: new ( move |_: & mut EventManager , event_set : EventSet | {
274+ match event_set {
275+ EventSet :: IN => ( ) ,
276+ EventSet :: ERROR => {
277+ eprintln ! ( "Got error on the monitored event." ) ;
278+ } ,
279+ EventSet :: HANG_UP => {
280+ // TODO Do this: https://github.com/rust-vmm/event-manager/blob/main/src/utilities/subscribers.rs#L116:L118
281+ panic ! ( "Cannot continue execution. Associated fd was closed." ) ;
282+ } ,
283+ _ => {
284+ eprintln ! ( "Received spurious event from the event manager {event_set:#?}." ) ;
285+ }
286+ }
287+ } ) ) . unwrap ( ) ;
288+
289+ event_fd
290+ } ) . collect :: < Vec < _ > > ( ) ;
119291
120292 c. bench_function ( "process_dispatch_few_events" , |b| {
121293 b. iter ( || {
122- let _ = event_manager. run ( ) . unwrap ( ) ;
294+ assert_eq ! ( event_manager. wait ( Some ( 0 ) ) , Ok ( active ) ) ;
123295 } )
124296 } ) ;
297+
298+ drop ( subscribers) ;
125299}
126300
127- criterion_group ! {
301+ criterion_group ! (
128302 name = benches;
129303 config = Criterion :: default ( )
130304 . sample_size( 200 )
131305 . measurement_time( std:: time:: Duration :: from_secs( 40 ) ) ;
132306 targets = run_basic_subscriber, run_arc_mutex_subscriber, run_subscriber_with_inner_mut,
133- run_multiple_subscriber_types, run_with_few_active_events
134- }
135-
136- criterion_main ! {
137- benches
138- }
307+ run_multiple_subscriber_types, run_with_few_active_events
308+ ) ;
309+ criterion_main ! ( benches) ;
0 commit comments