Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions orion-lib/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ hyper = { version = "1", features = ["full"] }
hyper-rustls = { version = "0.27.1", features = ["default", "http2"] }
ipnet = "2.9"
lru_time_cache = "0.11.11"
multimap = "0.10.0"
once_cell = { version = "1.19" }
opentelemetry = "0.29.0"
hyper-util.workspace = true
Expand Down
142 changes: 113 additions & 29 deletions orion-lib/src/listeners/listeners_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,8 @@
//
//

use std::collections::BTreeMap;

use multimap::MultiMap;
use tokio::sync::{broadcast, mpsc};
#[cfg(debug_assertions)]
use tracing::debug;
use tracing::{info, warn};

use orion_configuration::config::{
Expand Down Expand Up @@ -49,17 +46,19 @@ pub enum TlsContextChange {
struct ListenerInfo {
handle: abort_on_drop::ChildTask<()>,
listener_conf: ListenerConfig,
version: u64,
}
impl ListenerInfo {
fn new(handle: tokio::task::JoinHandle<()>, listener_conf: ListenerConfig) -> Self {
Self { handle: handle.into(), listener_conf }
fn new(handle: tokio::task::JoinHandle<()>, listener_conf: ListenerConfig, version: u64) -> Self {
Self { handle: handle.into(), listener_conf, version }
}
}

pub struct ListenersManager {
listener_configuration_channel: mpsc::Receiver<ListenerConfigurationChange>,
route_configuration_channel: mpsc::Receiver<RouteConfigurationChange>,
listener_handles: BTreeMap<&'static str, ListenerInfo>,
listener_handles: MultiMap<String, ListenerInfo>,
version_counter: u64,
}

impl ListenersManager {
Expand All @@ -70,7 +69,8 @@ impl ListenersManager {
ListenersManager {
listener_configuration_channel,
route_configuration_channel,
listener_handles: BTreeMap::new(),
listener_handles: MultiMap::new(),
version_counter: 0,
}
}

Expand Down Expand Up @@ -102,8 +102,8 @@ impl ListenersManager {
},
ListenerConfigurationChange::GetConfiguration(config_dump_tx) => {
let listeners: Vec<ListenerConfig> = self.listener_handles
.values()
.map(|info| info.listener_conf.clone())
.iter()
.map(|(_, info)| info.listener_conf.clone())
.collect();
config_dump_tx.send(ConfigDump { listeners: Some(listeners), ..Default::default() }).await?;
},
Expand All @@ -125,31 +125,39 @@ impl ListenersManager {
}

pub fn start_listener(&mut self, listener: Listener, listener_conf: ListenerConfig) -> Result<()> {
let listener_name = listener.get_name();
let listener_name = listener.get_name().to_string();
let (addr, dev) = listener.get_socket();
info!("Listener {} at {addr} (device bind:{})", listener_name, dev.is_some());
// spawn the task for this listener address, this will spawn additional task per connection

self.version_counter += 1;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we should use the version from xds configuration

let version = self.version_counter;

let listener_name_for_async = listener_name.clone();

let join_handle = tokio::spawn(async move {
let error = listener.start().await;
warn!("Listener {listener_name} exited: {error}");
info!("Listener {} version {} exited: {}", listener_name_for_async, version, error);
});
#[cfg(debug_assertions)]
if self.listener_handles.contains_key(&listener_name) {
debug!("Listener {listener_name} already exists, replacing it");
}
// note: join handle gets overwritten here if it already exists.
// handles are abort on drop so will be aborted, closing the socket
// but the any tasks spawned within this task, which happens on a per-connection basis,
// will survive past this point and only get dropped when their session ends
self.listener_handles.insert(listener_name, ListenerInfo::new(join_handle, listener_conf));

let listener_info = ListenerInfo::new(join_handle, listener_conf, version);
self.listener_handles.insert(listener_name.clone(), listener_info);

let version_count = self.listener_handles.get_vec(&listener_name).map(|v| v.len()).unwrap_or(0);
info!("Started version {} of listener {} ({} total active version(s))", version, listener_name, version_count);

Ok(())
}

pub fn stop_listener(&mut self, listener_name: &str) -> Result<()> {
if let Some(abort_handler) = self.listener_handles.remove(listener_name) {
info!("{listener_name} : Stopped");
abort_handler.handle.abort();
if let Some(listeners) = self.listener_handles.get_vec_mut(listener_name) {
info!("Stopping all {} version(s) of listener {}", listeners.len(), listener_name);
for listener_info in listeners.drain(..) {
info!("Stopping listener {} version {}", listener_name, listener_info.version);
listener_info.handle.abort();
}
self.listener_handles.remove(listener_name);
} else {
info!("No listeners found with name {}", listener_name);
}

Ok(())
Expand Down Expand Up @@ -202,9 +210,11 @@ mod tests {
assert!(routeb_tx2.send(RouteConfigurationChange::Removed("n/a".into())).is_ok());
tokio::task::yield_now().await;

// This should fail because the old listener exited already dropping the rx
assert!(routeb_tx1.send(RouteConfigurationChange::Removed("n/a".into())).is_err());
// Yield once more just in case more logs can be seen
// Both listeners should still be active (multiple versions allowed)
assert!(routeb_tx1.send(RouteConfigurationChange::Removed("n/a".into())).is_ok());
assert!(routeb_tx2.send(RouteConfigurationChange::Removed("n/a".into())).is_ok());

assert_eq!(man.listener_handles.get_vec(name).unwrap().len(), 2);
tokio::task::yield_now().await;
}

Expand Down Expand Up @@ -239,7 +249,7 @@ mod tests {

// See .start_listener() - in the case all channels are dropped the task there
// should exit with this warning msg
let expected = format!("Listener {name} exited: channel closed");
let expected = format!("Listener {name} version 1 exited: channel closed");
logs_assert(|lines: &[&str]| {
let logs: Vec<_> = lines.iter().filter(|ln| ln.contains(&expected)).collect();
if logs.len() == 1 {
Expand All @@ -249,4 +259,78 @@ mod tests {
}
});
}

#[traced_test]
#[tokio::test]
async fn start_multiple_listener_versions() {
let chan = 10;
let name = "multi-version-listener";

let (_conf_tx, conf_rx) = mpsc::channel(chan);
let (_route_tx, route_rx) = mpsc::channel(chan);
let mut man = ListenersManager::new(conf_rx, route_rx);

let (routeb_tx1, routeb_rx) = broadcast::channel(chan);
let (_secb_tx1, secb_rx) = broadcast::channel(chan);
let l1 = Listener::test_listener(name, routeb_rx, secb_rx);
let l1_info = ListenerConfig {
name: name.into(),
address: SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 1234),
filter_chains: HashMap::default(),
bind_device: None,
with_tls_inspector: false,
proxy_protocol_config: None,
with_tlv_listener_filter: false,
tlv_listener_filter_config: None,
};
man.start_listener(l1, l1_info).unwrap();
assert!(routeb_tx1.send(RouteConfigurationChange::Removed("n/a".into())).is_ok());
tokio::task::yield_now().await;

let (routeb_tx2, routeb_rx) = broadcast::channel(chan);
let (_secb_tx2, secb_rx) = broadcast::channel(chan);
let l2 = Listener::test_listener(name, routeb_rx, secb_rx);
let l2_info = ListenerConfig {
name: name.into(),
address: SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 1235), // Different port
filter_chains: HashMap::default(),
bind_device: None,
with_tls_inspector: false,
proxy_protocol_config: None,
with_tlv_listener_filter: false,
tlv_listener_filter_config: None,
};
man.start_listener(l2, l2_info).unwrap();
assert!(routeb_tx2.send(RouteConfigurationChange::Removed("n/a".into())).is_ok());
tokio::task::yield_now().await;

let (routeb_tx3, routeb_rx) = broadcast::channel(chan);
let (_secb_tx3, secb_rx) = broadcast::channel(chan);
let l3 = Listener::test_listener(name, routeb_rx, secb_rx);
let l3_info = ListenerConfig {
name: name.into(),
address: SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 1236), // Different port
filter_chains: HashMap::default(),
bind_device: None,
with_tls_inspector: false,
proxy_protocol_config: None,
with_tlv_listener_filter: false,
tlv_listener_filter_config: None,
};
man.start_listener(l3, l3_info).unwrap();
assert!(routeb_tx3.send(RouteConfigurationChange::Removed("n/a".into())).is_ok());
tokio::task::yield_now().await;

assert!(routeb_tx1.send(RouteConfigurationChange::Removed("n/a".into())).is_ok());
assert!(routeb_tx2.send(RouteConfigurationChange::Removed("n/a".into())).is_ok());
assert!(routeb_tx3.send(RouteConfigurationChange::Removed("n/a".into())).is_ok());

assert_eq!(man.listener_handles.get_vec(name).unwrap().len(), 3);

man.stop_listener(name).unwrap();

assert!(man.listener_handles.get_vec(name).is_none());

tokio::task::yield_now().await;
}
}
Loading