Skip to content

Commit e0c5d2a

Browse files
committed
feat: allow multiple versions of the same listener
- Change storage from BTreeMap to HashMap<String, Vec<ListenerInfo>> - Add version tracking to prevent breaking existing connections - Update start_listener to create new versions instead of stopping existing ones - Modify stop_listener to handle multiple versions - Update tests to verify multiple version functionality Signed-off-by: Eeshu-Yadav <[email protected]>
1 parent 59adc36 commit e0c5d2a

File tree

1 file changed

+118
-26
lines changed

1 file changed

+118
-26
lines changed

orion-lib/src/listeners/listeners_manager.rs

Lines changed: 118 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -15,11 +15,9 @@
1515
//
1616
//
1717

18-
use std::collections::BTreeMap;
18+
use std::collections::HashMap;
1919

2020
use tokio::sync::{broadcast, mpsc};
21-
#[cfg(debug_assertions)]
22-
use tracing::debug;
2321
use tracing::{info, warn};
2422

2523
use orion_configuration::config::{
@@ -49,17 +47,19 @@ pub enum TlsContextChange {
4947
struct ListenerInfo {
5048
handle: abort_on_drop::ChildTask<()>,
5149
listener_conf: ListenerConfig,
50+
version: u64,
5251
}
5352
impl ListenerInfo {
54-
fn new(handle: tokio::task::JoinHandle<()>, listener_conf: ListenerConfig) -> Self {
55-
Self { handle: handle.into(), listener_conf }
53+
fn new(handle: tokio::task::JoinHandle<()>, listener_conf: ListenerConfig, version: u64) -> Self {
54+
Self { handle: handle.into(), listener_conf, version }
5655
}
5756
}
5857

5958
pub struct ListenersManager {
6059
listener_configuration_channel: mpsc::Receiver<ListenerConfigurationChange>,
6160
route_configuration_channel: mpsc::Receiver<RouteConfigurationChange>,
62-
listener_handles: BTreeMap<&'static str, ListenerInfo>,
61+
listener_handles: HashMap<String, Vec<ListenerInfo>>,
62+
version_counter: u64,
6363
}
6464

6565
impl ListenersManager {
@@ -70,7 +70,8 @@ impl ListenersManager {
7070
ListenersManager {
7171
listener_configuration_channel,
7272
route_configuration_channel,
73-
listener_handles: BTreeMap::new(),
73+
listener_handles: HashMap::new(),
74+
version_counter: 0,
7475
}
7576
}
7677

@@ -103,6 +104,7 @@ impl ListenersManager {
103104
ListenerConfigurationChange::GetConfiguration(config_dump_tx) => {
104105
let listeners: Vec<ListenerConfig> = self.listener_handles
105106
.values()
107+
.flatten()
106108
.map(|info| info.listener_conf.clone())
107109
.collect();
108110
config_dump_tx.send(ConfigDump { listeners: Some(listeners), ..Default::default() }).await?;
@@ -125,31 +127,45 @@ impl ListenersManager {
125127
}
126128

127129
pub fn start_listener(&mut self, listener: Listener, listener_conf: ListenerConfig) -> Result<()> {
128-
let listener_name = listener.get_name();
130+
let listener_name = listener.get_name().to_string();
129131
let (addr, dev) = listener.get_socket();
130132
info!("Listener {} at {addr} (device bind:{})", listener_name, dev.is_some());
131-
// spawn the task for this listener address, this will spawn additional task per connection
133+
134+
self.version_counter += 1;
135+
let version = self.version_counter;
136+
137+
info!("Starting new version {} of listener {}", version, listener_name);
138+
139+
let listener_name_clone = listener_name.clone();
140+
132141
let join_handle = tokio::spawn(async move {
133142
let error = listener.start().await;
134-
warn!("Listener {listener_name} exited: {error}");
143+
warn!("Listener {} version {} exited: {}", listener_name_clone, version, error);
135144
});
136-
#[cfg(debug_assertions)]
137-
if self.listener_handles.contains_key(&listener_name) {
138-
debug!("Listener {listener_name} already exists, replacing it");
139-
}
140-
// note: join handle gets overwritten here if it already exists.
141-
// handles are abort on drop so will be aborted, closing the socket
142-
// but the any tasks spawned within this task, which happens on a per-connection basis,
143-
// will survive past this point and only get dropped when their session ends
144-
self.listener_handles.insert(listener_name, ListenerInfo::new(join_handle, listener_conf));
145+
146+
let listener_info = ListenerInfo::new(join_handle, listener_conf, version);
147+
148+
self.listener_handles.entry(listener_name.clone()).or_insert_with(Vec::new).push(listener_info);
149+
150+
info!(
151+
"Listener {} now has {} active version(s)",
152+
listener_name,
153+
self.listener_handles.get(&listener_name).unwrap().len()
154+
);
145155

146156
Ok(())
147157
}
148158

149159
pub fn stop_listener(&mut self, listener_name: &str) -> Result<()> {
150-
if let Some(abort_handler) = self.listener_handles.remove(listener_name) {
151-
info!("{listener_name} : Stopped");
152-
abort_handler.handle.abort();
160+
if let Some(listeners) = self.listener_handles.get_mut(listener_name) {
161+
info!("Stopping all {} version(s) of listener {}", listeners.len(), listener_name);
162+
for listener_info in listeners.drain(..) {
163+
info!("Stopping listener {} version {}", listener_name, listener_info.version);
164+
listener_info.handle.abort();
165+
}
166+
self.listener_handles.remove(listener_name);
167+
} else {
168+
info!("No listeners found with name {}", listener_name);
153169
}
154170

155171
Ok(())
@@ -202,9 +218,11 @@ mod tests {
202218
assert!(routeb_tx2.send(RouteConfigurationChange::Removed("n/a".into())).is_ok());
203219
tokio::task::yield_now().await;
204220

205-
// This should fail because the old listener exited already dropping the rx
206-
assert!(routeb_tx1.send(RouteConfigurationChange::Removed("n/a".into())).is_err());
207-
// Yield once more just in case more logs can be seen
221+
// Both listeners should still be active (multiple versions allowed)
222+
assert!(routeb_tx1.send(RouteConfigurationChange::Removed("n/a".into())).is_ok());
223+
assert!(routeb_tx2.send(RouteConfigurationChange::Removed("n/a".into())).is_ok());
224+
225+
assert_eq!(man.listener_handles.get(name).unwrap().len(), 2);
208226
tokio::task::yield_now().await;
209227
}
210228

@@ -239,7 +257,7 @@ mod tests {
239257

240258
// See .start_listener() - in the case all channels are dropped the task there
241259
// should exit with this warning msg
242-
let expected = format!("Listener {name} exited: channel closed");
260+
let expected = format!("Listener {name} version 1 exited: channel closed");
243261
logs_assert(|lines: &[&str]| {
244262
let logs: Vec<_> = lines.iter().filter(|ln| ln.contains(&expected)).collect();
245263
if logs.len() == 1 {
@@ -249,4 +267,78 @@ mod tests {
249267
}
250268
});
251269
}
270+
271+
#[traced_test]
272+
#[tokio::test]
273+
async fn start_multiple_listener_versions() {
274+
let chan = 10;
275+
let name = "multi-version-listener";
276+
277+
let (_conf_tx, conf_rx) = mpsc::channel(chan);
278+
let (_route_tx, route_rx) = mpsc::channel(chan);
279+
let mut man = ListenersManager::new(conf_rx, route_rx);
280+
281+
let (routeb_tx1, routeb_rx) = broadcast::channel(chan);
282+
let (_secb_tx1, secb_rx) = broadcast::channel(chan);
283+
let l1 = Listener::test_listener(name, routeb_rx, secb_rx);
284+
let l1_info = ListenerConfig {
285+
name: name.into(),
286+
address: SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 1234),
287+
filter_chains: HashMap::default(),
288+
bind_device: None,
289+
with_tls_inspector: false,
290+
proxy_protocol_config: None,
291+
with_tlv_listener_filter: false,
292+
tlv_listener_filter_config: None,
293+
};
294+
man.start_listener(l1, l1_info).unwrap();
295+
assert!(routeb_tx1.send(RouteConfigurationChange::Removed("n/a".into())).is_ok());
296+
tokio::task::yield_now().await;
297+
298+
let (routeb_tx2, routeb_rx) = broadcast::channel(chan);
299+
let (_secb_tx2, secb_rx) = broadcast::channel(chan);
300+
let l2 = Listener::test_listener(name, routeb_rx, secb_rx);
301+
let l2_info = ListenerConfig {
302+
name: name.into(),
303+
address: SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 1235), // Different port
304+
filter_chains: HashMap::default(),
305+
bind_device: None,
306+
with_tls_inspector: false,
307+
proxy_protocol_config: None,
308+
with_tlv_listener_filter: false,
309+
tlv_listener_filter_config: None,
310+
};
311+
man.start_listener(l2, l2_info).unwrap();
312+
assert!(routeb_tx2.send(RouteConfigurationChange::Removed("n/a".into())).is_ok());
313+
tokio::task::yield_now().await;
314+
315+
let (routeb_tx3, routeb_rx) = broadcast::channel(chan);
316+
let (_secb_tx3, secb_rx) = broadcast::channel(chan);
317+
let l3 = Listener::test_listener(name, routeb_rx, secb_rx);
318+
let l3_info = ListenerConfig {
319+
name: name.into(),
320+
address: SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 1236), // Different port
321+
filter_chains: HashMap::default(),
322+
bind_device: None,
323+
with_tls_inspector: false,
324+
proxy_protocol_config: None,
325+
with_tlv_listener_filter: false,
326+
tlv_listener_filter_config: None,
327+
};
328+
man.start_listener(l3, l3_info).unwrap();
329+
assert!(routeb_tx3.send(RouteConfigurationChange::Removed("n/a".into())).is_ok());
330+
tokio::task::yield_now().await;
331+
332+
assert!(routeb_tx1.send(RouteConfigurationChange::Removed("n/a".into())).is_ok());
333+
assert!(routeb_tx2.send(RouteConfigurationChange::Removed("n/a".into())).is_ok());
334+
assert!(routeb_tx3.send(RouteConfigurationChange::Removed("n/a".into())).is_ok());
335+
336+
assert_eq!(man.listener_handles.get(name).unwrap().len(), 3);
337+
338+
man.stop_listener(name).unwrap();
339+
340+
assert!(man.listener_handles.get(name).is_none());
341+
342+
tokio::task::yield_now().await;
343+
}
252344
}

0 commit comments

Comments
 (0)