Skip to content

Commit 5f47ee8

Browse files
authored
Merge pull request #99 from Eeshu-Yadav/feat/multiple-listener-versions
feat: allow multiple versions of the same listener
2 parents 3e9820e + eb8cdb5 commit 5f47ee8

File tree

3 files changed

+118
-29
lines changed

3 files changed

+118
-29
lines changed

Cargo.lock

Lines changed: 4 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

orion-lib/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ hyper = { version = "1", features = ["full"] }
2727
hyper-rustls = { version = "0.27.1", features = ["default", "http2"] }
2828
ipnet = "2.9"
2929
lru_time_cache = "0.11.11"
30+
multimap = "0.10.0"
3031
once_cell = { version = "1.19" }
3132
opentelemetry = "0.29.0"
3233
hyper-util.workspace = true

orion-lib/src/listeners/listeners_manager.rs

Lines changed: 113 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -15,11 +15,8 @@
1515
//
1616
//
1717

18-
use std::collections::BTreeMap;
19-
18+
use multimap::MultiMap;
2019
use tokio::sync::{broadcast, mpsc};
21-
#[cfg(debug_assertions)]
22-
use tracing::debug;
2320
use tracing::{info, warn};
2421

2522
use orion_configuration::config::{
@@ -49,17 +46,19 @@ pub enum TlsContextChange {
4946
struct ListenerInfo {
5047
handle: abort_on_drop::ChildTask<()>,
5148
listener_conf: ListenerConfig,
49+
version: u64,
5250
}
5351
impl ListenerInfo {
54-
fn new(handle: tokio::task::JoinHandle<()>, listener_conf: ListenerConfig) -> Self {
55-
Self { handle: handle.into(), listener_conf }
52+
fn new(handle: tokio::task::JoinHandle<()>, listener_conf: ListenerConfig, version: u64) -> Self {
53+
Self { handle: handle.into(), listener_conf, version }
5654
}
5755
}
5856

5957
pub struct ListenersManager {
6058
listener_configuration_channel: mpsc::Receiver<ListenerConfigurationChange>,
6159
route_configuration_channel: mpsc::Receiver<RouteConfigurationChange>,
62-
listener_handles: BTreeMap<&'static str, ListenerInfo>,
60+
listener_handles: MultiMap<String, ListenerInfo>,
61+
version_counter: u64,
6362
}
6463

6564
impl ListenersManager {
@@ -70,7 +69,8 @@ impl ListenersManager {
7069
ListenersManager {
7170
listener_configuration_channel,
7271
route_configuration_channel,
73-
listener_handles: BTreeMap::new(),
72+
listener_handles: MultiMap::new(),
73+
version_counter: 0,
7474
}
7575
}
7676

@@ -102,8 +102,8 @@ impl ListenersManager {
102102
},
103103
ListenerConfigurationChange::GetConfiguration(config_dump_tx) => {
104104
let listeners: Vec<ListenerConfig> = self.listener_handles
105-
.values()
106-
.map(|info| info.listener_conf.clone())
105+
.iter()
106+
.map(|(_, info)| info.listener_conf.clone())
107107
.collect();
108108
config_dump_tx.send(ConfigDump { listeners: Some(listeners), ..Default::default() }).await?;
109109
},
@@ -125,31 +125,39 @@ impl ListenersManager {
125125
}
126126

127127
pub fn start_listener(&mut self, listener: Listener, listener_conf: ListenerConfig) -> Result<()> {
128-
let listener_name = listener.get_name();
128+
let listener_name = listener.get_name().to_string();
129129
let (addr, dev) = listener.get_socket();
130130
info!("Listener {} at {addr} (device bind:{})", listener_name, dev.is_some());
131-
// spawn the task for this listener address, this will spawn additional task per connection
131+
132+
self.version_counter += 1;
133+
let version = self.version_counter;
134+
135+
let listener_name_for_async = listener_name.clone();
136+
132137
let join_handle = tokio::spawn(async move {
133138
let error = listener.start().await;
134-
warn!("Listener {listener_name} exited: {error}");
139+
info!("Listener {} version {} exited: {}", listener_name_for_async, version, error);
135140
});
136-
#[cfg(debug_assertions)]
137-
if self.listener_handles.contains_key(&listener_name) {
138-
debug!("Listener {listener_name} already exists, replacing it");
139-
}
140-
// note: join handle gets overwritten here if it already exists.
141-
// handles are abort on drop so will be aborted, closing the socket
142-
// but the any tasks spawned within this task, which happens on a per-connection basis,
143-
// will survive past this point and only get dropped when their session ends
144-
self.listener_handles.insert(listener_name, ListenerInfo::new(join_handle, listener_conf));
141+
142+
let listener_info = ListenerInfo::new(join_handle, listener_conf, version);
143+
self.listener_handles.insert(listener_name.clone(), listener_info);
144+
145+
let version_count = self.listener_handles.get_vec(&listener_name).map(|v| v.len()).unwrap_or(0);
146+
info!("Started version {} of listener {} ({} total active version(s))", version, listener_name, version_count);
145147

146148
Ok(())
147149
}
148150

149151
pub fn stop_listener(&mut self, listener_name: &str) -> Result<()> {
150-
if let Some(abort_handler) = self.listener_handles.remove(listener_name) {
151-
info!("{listener_name} : Stopped");
152-
abort_handler.handle.abort();
152+
if let Some(listeners) = self.listener_handles.get_vec_mut(listener_name) {
153+
info!("Stopping all {} version(s) of listener {}", listeners.len(), listener_name);
154+
for listener_info in listeners.drain(..) {
155+
info!("Stopping listener {} version {}", listener_name, listener_info.version);
156+
listener_info.handle.abort();
157+
}
158+
self.listener_handles.remove(listener_name);
159+
} else {
160+
info!("No listeners found with name {}", listener_name);
153161
}
154162

155163
Ok(())
@@ -202,9 +210,11 @@ mod tests {
202210
assert!(routeb_tx2.send(RouteConfigurationChange::Removed("n/a".into())).is_ok());
203211
tokio::task::yield_now().await;
204212

205-
// This should fail because the old listener exited already dropping the rx
206-
assert!(routeb_tx1.send(RouteConfigurationChange::Removed("n/a".into())).is_err());
207-
// Yield once more just in case more logs can be seen
213+
// Both listeners should still be active (multiple versions allowed)
214+
assert!(routeb_tx1.send(RouteConfigurationChange::Removed("n/a".into())).is_ok());
215+
assert!(routeb_tx2.send(RouteConfigurationChange::Removed("n/a".into())).is_ok());
216+
217+
assert_eq!(man.listener_handles.get_vec(name).unwrap().len(), 2);
208218
tokio::task::yield_now().await;
209219
}
210220

@@ -239,7 +249,7 @@ mod tests {
239249

240250
// See .start_listener() - in the case all channels are dropped the task there
241251
// should exit with this warning msg
242-
let expected = format!("Listener {name} exited: channel closed");
252+
let expected = format!("Listener {name} version 1 exited: channel closed");
243253
logs_assert(|lines: &[&str]| {
244254
let logs: Vec<_> = lines.iter().filter(|ln| ln.contains(&expected)).collect();
245255
if logs.len() == 1 {
@@ -249,4 +259,78 @@ mod tests {
249259
}
250260
});
251261
}
262+
263+
#[traced_test]
264+
#[tokio::test]
265+
async fn start_multiple_listener_versions() {
266+
let chan = 10;
267+
let name = "multi-version-listener";
268+
269+
let (_conf_tx, conf_rx) = mpsc::channel(chan);
270+
let (_route_tx, route_rx) = mpsc::channel(chan);
271+
let mut man = ListenersManager::new(conf_rx, route_rx);
272+
273+
let (routeb_tx1, routeb_rx) = broadcast::channel(chan);
274+
let (_secb_tx1, secb_rx) = broadcast::channel(chan);
275+
let l1 = Listener::test_listener(name, routeb_rx, secb_rx);
276+
let l1_info = ListenerConfig {
277+
name: name.into(),
278+
address: SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 1234),
279+
filter_chains: HashMap::default(),
280+
bind_device: None,
281+
with_tls_inspector: false,
282+
proxy_protocol_config: None,
283+
with_tlv_listener_filter: false,
284+
tlv_listener_filter_config: None,
285+
};
286+
man.start_listener(l1, l1_info).unwrap();
287+
assert!(routeb_tx1.send(RouteConfigurationChange::Removed("n/a".into())).is_ok());
288+
tokio::task::yield_now().await;
289+
290+
let (routeb_tx2, routeb_rx) = broadcast::channel(chan);
291+
let (_secb_tx2, secb_rx) = broadcast::channel(chan);
292+
let l2 = Listener::test_listener(name, routeb_rx, secb_rx);
293+
let l2_info = ListenerConfig {
294+
name: name.into(),
295+
address: SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 1235), // Different port
296+
filter_chains: HashMap::default(),
297+
bind_device: None,
298+
with_tls_inspector: false,
299+
proxy_protocol_config: None,
300+
with_tlv_listener_filter: false,
301+
tlv_listener_filter_config: None,
302+
};
303+
man.start_listener(l2, l2_info).unwrap();
304+
assert!(routeb_tx2.send(RouteConfigurationChange::Removed("n/a".into())).is_ok());
305+
tokio::task::yield_now().await;
306+
307+
let (routeb_tx3, routeb_rx) = broadcast::channel(chan);
308+
let (_secb_tx3, secb_rx) = broadcast::channel(chan);
309+
let l3 = Listener::test_listener(name, routeb_rx, secb_rx);
310+
let l3_info = ListenerConfig {
311+
name: name.into(),
312+
address: SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 1236), // Different port
313+
filter_chains: HashMap::default(),
314+
bind_device: None,
315+
with_tls_inspector: false,
316+
proxy_protocol_config: None,
317+
with_tlv_listener_filter: false,
318+
tlv_listener_filter_config: None,
319+
};
320+
man.start_listener(l3, l3_info).unwrap();
321+
assert!(routeb_tx3.send(RouteConfigurationChange::Removed("n/a".into())).is_ok());
322+
tokio::task::yield_now().await;
323+
324+
assert!(routeb_tx1.send(RouteConfigurationChange::Removed("n/a".into())).is_ok());
325+
assert!(routeb_tx2.send(RouteConfigurationChange::Removed("n/a".into())).is_ok());
326+
assert!(routeb_tx3.send(RouteConfigurationChange::Removed("n/a".into())).is_ok());
327+
328+
assert_eq!(man.listener_handles.get_vec(name).unwrap().len(), 3);
329+
330+
man.stop_listener(name).unwrap();
331+
332+
assert!(man.listener_handles.get_vec(name).is_none());
333+
334+
tokio::task::yield_now().await;
335+
}
252336
}

0 commit comments

Comments
 (0)