1515//
1616//
1717
18- use std:: collections:: HashMap ;
19-
18+ use multimap:: MultiMap ;
2019use tokio:: sync:: { broadcast, mpsc} ;
2120use tracing:: { info, warn} ;
2221
@@ -58,7 +57,7 @@ impl ListenerInfo {
5857pub struct ListenersManager {
5958 listener_configuration_channel : mpsc:: Receiver < ListenerConfigurationChange > ,
6059 route_configuration_channel : mpsc:: Receiver < RouteConfigurationChange > ,
61- listener_handles : HashMap < String , Vec < ListenerInfo > > ,
60+ listener_handles : MultiMap < String , ListenerInfo > ,
6261 version_counter : u64 ,
6362}
6463
@@ -70,7 +69,7 @@ impl ListenersManager {
7069 ListenersManager {
7170 listener_configuration_channel,
7271 route_configuration_channel,
73- listener_handles : HashMap :: new ( ) ,
72+ listener_handles : MultiMap :: new ( ) ,
7473 version_counter : 0 ,
7574 }
7675 }
@@ -103,9 +102,8 @@ impl ListenersManager {
103102 } ,
104103 ListenerConfigurationChange :: GetConfiguration ( config_dump_tx) => {
105104 let listeners: Vec <ListenerConfig > = self . listener_handles
106- . values( )
107- . flatten( )
108- . map( |info| info. listener_conf. clone( ) )
105+ . iter( )
106+ . map( |( _, info) | info. listener_conf. clone( ) )
109107 . collect( ) ;
110108 config_dump_tx. send( ConfigDump { listeners: Some ( listeners) , ..Default :: default ( ) } ) . await ?;
111109 } ,
@@ -134,30 +132,24 @@ impl ListenersManager {
134132 self . version_counter += 1 ;
135133 let version = self . version_counter ;
136134
137- info ! ( "Starting new version {} of listener {}" , version, listener_name) ;
138-
139- let listener_name_clone = listener_name. clone ( ) ;
135+ let listener_name_for_async = listener_name. clone ( ) ;
140136
141137 let join_handle = tokio:: spawn ( async move {
142138 let error = listener. start ( ) . await ;
143- warn ! ( "Listener {} version {} exited: {}" , listener_name_clone , version, error) ;
139+ info ! ( "Listener {} version {} exited: {}" , listener_name_for_async , version, error) ;
144140 } ) ;
145141
146142 let listener_info = ListenerInfo :: new ( join_handle, listener_conf, version) ;
143+ self . listener_handles . insert ( listener_name. clone ( ) , listener_info) ;
147144
148- self . listener_handles . entry ( listener_name. clone ( ) ) . or_insert_with ( Vec :: new) . push ( listener_info) ;
149-
150- info ! (
151- "Listener {} now has {} active version(s)" ,
152- listener_name,
153- self . listener_handles. get( & listener_name) . unwrap( ) . len( )
154- ) ;
145+ let version_count = self . listener_handles . get_vec ( & listener_name) . map ( |v| v. len ( ) ) . unwrap_or ( 0 ) ;
146+ info ! ( "Started version {} of listener {} ({} total active version(s))" , version, listener_name, version_count) ;
155147
156148 Ok ( ( ) )
157149 }
158150
159151 pub fn stop_listener ( & mut self , listener_name : & str ) -> Result < ( ) > {
160- if let Some ( listeners) = self . listener_handles . get_mut ( listener_name) {
152+ if let Some ( listeners) = self . listener_handles . get_vec_mut ( listener_name) {
161153 info ! ( "Stopping all {} version(s) of listener {}" , listeners. len( ) , listener_name) ;
162154 for listener_info in listeners. drain ( ..) {
163155 info ! ( "Stopping listener {} version {}" , listener_name, listener_info. version) ;
@@ -222,7 +214,7 @@ mod tests {
222214 assert ! ( routeb_tx1. send( RouteConfigurationChange :: Removed ( "n/a" . into( ) ) ) . is_ok( ) ) ;
223215 assert ! ( routeb_tx2. send( RouteConfigurationChange :: Removed ( "n/a" . into( ) ) ) . is_ok( ) ) ;
224216
225- assert_eq ! ( man. listener_handles. get ( name) . unwrap( ) . len( ) , 2 ) ;
217+ assert_eq ! ( man. listener_handles. get_vec ( name) . unwrap( ) . len( ) , 2 ) ;
226218 tokio:: task:: yield_now ( ) . await ;
227219 }
228220
@@ -333,11 +325,11 @@ mod tests {
333325 assert ! ( routeb_tx2. send( RouteConfigurationChange :: Removed ( "n/a" . into( ) ) ) . is_ok( ) ) ;
334326 assert ! ( routeb_tx3. send( RouteConfigurationChange :: Removed ( "n/a" . into( ) ) ) . is_ok( ) ) ;
335327
336- assert_eq ! ( man. listener_handles. get ( name) . unwrap( ) . len( ) , 3 ) ;
328+ assert_eq ! ( man. listener_handles. get_vec ( name) . unwrap( ) . len( ) , 3 ) ;
337329
338330 man. stop_listener ( name) . unwrap ( ) ;
339331
340- assert ! ( man. listener_handles. get ( name) . is_none( ) ) ;
332+ assert ! ( man. listener_handles. get_vec ( name) . is_none( ) ) ;
341333
342334 tokio:: task:: yield_now ( ) . await ;
343335 }
0 commit comments