1616//
1717
1818use std:: collections:: HashMap ;
19+ use std:: time:: Duration ;
1920
2021use tokio:: sync:: { broadcast, mpsc} ;
2122use tracing:: { info, warn} ;
@@ -44,6 +45,30 @@ pub enum TlsContextChange {
4445 Updated ( ( String , TransportSecret ) ) ,
4546}
4647
48+ #[ derive( Debug , Clone ) ]
49+ pub struct ListenerManagerConfig {
50+ pub max_versions_per_listener : usize ,
51+ pub cleanup_policy : CleanupPolicy ,
52+ pub cleanup_interval : Duration ,
53+ }
54+
55+ #[ derive( Debug , Clone ) ]
56+ pub enum CleanupPolicy {
57+ CountBasedOnly ( usize ) ,
58+ TimeBasedOnly ( Duration ) ,
59+ Hybrid { timeout : Duration , max_count : usize } ,
60+ }
61+
62+ impl Default for ListenerManagerConfig {
63+ fn default ( ) -> Self {
64+ Self {
65+ max_versions_per_listener : 2 ,
66+ cleanup_policy : CleanupPolicy :: CountBasedOnly ( 2 ) ,
67+ cleanup_interval : Duration :: from_secs ( 60 ) ,
68+ }
69+ }
70+ }
71+
4772struct ListenerInfo {
4873 handle : abort_on_drop:: ChildTask < ( ) > ,
4974 listener_conf : ListenerConfig ,
@@ -60,18 +85,21 @@ pub struct ListenersManager {
6085 route_configuration_channel : mpsc:: Receiver < RouteConfigurationChange > ,
6186 listener_handles : HashMap < String , Vec < ListenerInfo > > ,
6287 version_counter : u64 ,
88+ config : ListenerManagerConfig ,
6389}
6490
6591impl ListenersManager {
6692 pub fn new (
6793 listener_configuration_channel : mpsc:: Receiver < ListenerConfigurationChange > ,
6894 route_configuration_channel : mpsc:: Receiver < RouteConfigurationChange > ,
95+ config : ListenerManagerConfig ,
6996 ) -> Self {
7097 ListenersManager {
7198 listener_configuration_channel,
7299 route_configuration_channel,
73100 listener_handles : HashMap :: new ( ) ,
74101 version_counter : 0 ,
102+ config,
75103 }
76104 }
77105
@@ -149,6 +177,8 @@ impl ListenersManager {
149177 versions. push ( listener_info) ;
150178 info ! ( "Listener {} now has {} active version(s)" , listener_name, versions. len( ) ) ;
151179
180+ self . cleanup_old_versions ( & listener_name) ;
181+
152182 Ok ( ( ) )
153183 }
154184
@@ -165,6 +195,53 @@ impl ListenersManager {
165195
166196 Ok ( ( ) )
167197 }
198+
199+ fn cleanup_old_versions ( & mut self , listener_name : & str ) {
200+ if let Some ( versions) = self . listener_handles . get_mut ( listener_name) {
201+ let original_count = versions. len ( ) ;
202+
203+ match & self . config . cleanup_policy {
204+ CleanupPolicy :: CountBasedOnly ( max_count) => {
205+ if versions. len ( ) > * max_count {
206+ let to_remove = versions. len ( ) - max_count;
207+ for _ in 0 ..to_remove {
208+ let old = versions. remove ( 0 ) ;
209+ info ! ( "Cleaning up old listener {} version {} (count limit)" ,
210+ listener_name, old. version) ;
211+ }
212+ }
213+ }
214+ CleanupPolicy :: TimeBasedOnly ( _timeout) => {
215+ // TODO: Implement time-based cleanup when we have connection tracking
216+ // For now, behave like count-based with default limit
217+ if versions. len ( ) > self . config . max_versions_per_listener {
218+ let to_remove = versions. len ( ) - self . config . max_versions_per_listener ;
219+ for _ in 0 ..to_remove {
220+ let old = versions. remove ( 0 ) ;
221+ info ! ( "Cleaning up old listener {} version {} (time limit)" ,
222+ listener_name, old. version) ;
223+ }
224+ }
225+ }
226+ CleanupPolicy :: Hybrid { max_count, .. } => {
227+ if versions. len ( ) > * max_count {
228+ let to_remove = versions. len ( ) - max_count;
229+ for _ in 0 ..to_remove {
230+ let old = versions. remove ( 0 ) ;
231+ info ! ( "Cleaning up old listener {} version {} (hybrid limit)" ,
232+ listener_name, old. version) ;
233+ }
234+ }
235+ }
236+ }
237+
238+ let cleaned_count = original_count - versions. len ( ) ;
239+ if cleaned_count > 0 {
240+ info ! ( "Cleaned up {} old versions of listener {}, {} versions remaining" ,
241+ cleaned_count, listener_name, versions. len( ) ) ;
242+ }
243+ }
244+ }
168245}
169246
170247#[ cfg( test) ]
@@ -199,7 +276,8 @@ mod tests {
199276
200277 let ( _conf_tx, conf_rx) = mpsc:: channel ( chan) ;
201278 let ( _route_tx, route_rx) = mpsc:: channel ( chan) ;
202- let mut man = ListenersManager :: new ( conf_rx, route_rx) ;
279+ let config = ListenerManagerConfig :: default ( ) ;
280+ let mut man = ListenersManager :: new ( conf_rx, route_rx, config) ;
203281
204282 let ( routeb_tx1, routeb_rx) = broadcast:: channel ( chan) ;
205283 let ( _secb_tx1, secb_rx) = broadcast:: channel ( chan) ;
@@ -242,7 +320,8 @@ mod tests {
242320
243321 let ( _conf_tx, conf_rx) = mpsc:: channel ( chan) ;
244322 let ( _route_tx, route_rx) = mpsc:: channel ( chan) ;
245- let mut man = ListenersManager :: new ( conf_rx, route_rx) ;
323+ let config = ListenerManagerConfig :: default ( ) ;
324+ let mut man = ListenersManager :: new ( conf_rx, route_rx, config) ;
246325
247326 let ( routeb_tx1, routeb_rx) = broadcast:: channel ( chan) ;
248327 let ( secb_tx1, secb_rx) = broadcast:: channel ( chan) ;
@@ -284,7 +363,12 @@ mod tests {
284363
285364 let ( _conf_tx, conf_rx) = mpsc:: channel ( chan) ;
286365 let ( _route_tx, route_rx) = mpsc:: channel ( chan) ;
287- let mut man = ListenersManager :: new ( conf_rx, route_rx) ;
366+ let config = ListenerManagerConfig {
367+ max_versions_per_listener : 2 ,
368+ cleanup_policy : CleanupPolicy :: CountBasedOnly ( 2 ) ,
369+ cleanup_interval : Duration :: from_secs ( 60 ) ,
370+ } ;
371+ let mut man = ListenersManager :: new ( conf_rx, route_rx, config) ;
288372
289373 let ( routeb_tx1, routeb_rx) = broadcast:: channel ( chan) ;
290374 let ( _secb_tx1, secb_rx) = broadcast:: channel ( chan) ;
@@ -310,16 +394,53 @@ mod tests {
310394 assert ! ( routeb_tx3. send( RouteConfigurationChange :: Removed ( "n/a" . into( ) ) ) . is_ok( ) ) ;
311395 tokio:: task:: yield_now ( ) . await ;
312396
313- assert ! ( routeb_tx1. send( RouteConfigurationChange :: Removed ( "n/a" . into( ) ) ) . is_ok( ) ) ;
397+ // After adding 3rd listener, first should be cleaned up due to max_versions_per_listener = 2
398+ // So routeb_tx1 should be closed (is_err), but routeb_tx2 and routeb_tx3 should work
399+ assert ! ( routeb_tx1. send( RouteConfigurationChange :: Removed ( "n/a" . into( ) ) ) . is_err( ) ) ;
314400 assert ! ( routeb_tx2. send( RouteConfigurationChange :: Removed ( "n/a" . into( ) ) ) . is_ok( ) ) ;
315401 assert ! ( routeb_tx3. send( RouteConfigurationChange :: Removed ( "n/a" . into( ) ) ) . is_ok( ) ) ;
316402
317- assert_eq ! ( man. listener_handles. get( name) . unwrap( ) . len( ) , 3 ) ;
403+ // Should only have 2 versions due to cleanup policy (max_count: 2)
404+ assert_eq ! ( man. listener_handles. get( name) . unwrap( ) . len( ) , 2 ) ;
318405
319406 man. stop_listener ( name) . unwrap ( ) ;
320407
321408 assert ! ( man. listener_handles. get( name) . is_none( ) ) ;
322409
323410 tokio:: task:: yield_now ( ) . await ;
324411 }
412+
413+ #[ traced_test]
414+ #[ tokio:: test]
415+ async fn test_cleanup_policy_enforcement ( ) {
416+ let chan = 10 ;
417+ let name = "cleanup-test-listener" ;
418+
419+ let ( _conf_tx, conf_rx) = mpsc:: channel ( chan) ;
420+ let ( _route_tx, route_rx) = mpsc:: channel ( chan) ;
421+ let config = ListenerManagerConfig {
422+ max_versions_per_listener : 3 ,
423+ cleanup_policy : CleanupPolicy :: CountBasedOnly ( 3 ) ,
424+ cleanup_interval : Duration :: from_secs ( 60 ) ,
425+ } ;
426+ let mut man = ListenersManager :: new ( conf_rx, route_rx, config) ;
427+
428+ // Add 5 listeners, should only keep 3 due to cleanup policy
429+ for i in 1 ..=5 {
430+ let ( _routeb_tx, routeb_rx) = broadcast:: channel ( chan) ;
431+ let ( _secb_tx, secb_rx) = broadcast:: channel ( chan) ;
432+ let listener = Listener :: test_listener ( name, routeb_rx, secb_rx) ;
433+ let listener_info = create_test_listener_config ( name, 1230 + i) ;
434+ man. start_listener ( listener, listener_info) . unwrap ( ) ;
435+ tokio:: task:: yield_now ( ) . await ;
436+ }
437+
438+ // Should only have 3 versions due to cleanup policy
439+ assert_eq ! ( man. listener_handles. get( name) . unwrap( ) . len( ) , 3 ) ;
440+
441+ man. stop_listener ( name) . unwrap ( ) ;
442+ assert ! ( man. listener_handles. get( name) . is_none( ) ) ;
443+
444+ tokio:: task:: yield_now ( ) . await ;
445+ }
325446}
0 commit comments