Skip to content

Commit 360c688

Browse files
committed
feat: add HTTP Connection Manager drain_timeout support
- Add drain_timeout field with 5000ms default (Envoy compliant) - Integrate with drain signaling and timeout enforcement - Support protocol-specific draining (HTTP/1.1, HTTP/2, TCP) - Add comprehensive test coverage for drain behaviors Signed-off-by: Eeshu-Yadav <[email protected]>
1 parent 8dce552 commit 360c688

File tree

10 files changed

+2759
-94
lines changed

10 files changed

+2759
-94
lines changed

Cargo.lock

Lines changed: 2 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

orion-configuration/src/config/listener.rs

Lines changed: 29 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -37,10 +37,20 @@ use std::{
3737

3838
use orion_interner::StringInterner;
3939

40+
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq, Default)]
41+
#[serde(rename_all = "SCREAMING_SNAKE_CASE")]
42+
pub enum DrainType {
43+
#[default]
44+
Default,
45+
ModifyOnly,
46+
}
47+
4048
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
4149
pub struct Listener {
4250
pub name: CompactString,
4351
pub address: SocketAddr,
52+
#[serde(skip_serializing_if = "Option::is_none", default)]
53+
pub version_info: Option<String>,
4454
#[serde(with = "serde_filterchains")]
4555
pub filter_chains: HashMap<FilterChainMatch, FilterChain>,
4656
#[serde(skip_serializing_if = "Option::is_none", default = "Default::default")]
@@ -53,6 +63,8 @@ pub struct Listener {
5363
pub with_tlv_listener_filter: bool,
5464
#[serde(skip_serializing_if = "Option::is_none", default = "Default::default")]
5565
pub tlv_listener_filter_config: Option<super::listener_filters::TlvListenerFilterConfig>,
66+
#[serde(default)]
67+
pub drain_type: DrainType,
5668
}
5769

5870
impl Listener {
@@ -320,7 +332,7 @@ mod envoy_conversions {
320332
use std::hash::{DefaultHasher, Hash, Hasher};
321333
use std::str::FromStr;
322334

323-
use super::{FilterChain, FilterChainMatch, Listener, MainFilter, ServerNameMatch, TlsConfig};
335+
use super::{DrainType, FilterChain, FilterChainMatch, Listener, MainFilter, ServerNameMatch, TlsConfig};
324336
use crate::config::{
325337
common::*,
326338
core::{Address, CidrRange},
@@ -401,7 +413,7 @@ mod envoy_conversions {
401413
per_connection_buffer_limit_bytes,
402414
metadata,
403415
deprecated_v1,
404-
drain_type,
416+
// drain_type,
405417
// listener_filters,
406418
listener_filters_timeout,
407419
continue_on_listener_filters_timeout,
@@ -476,6 +488,7 @@ mod envoy_conversions {
476488
.with_node("socket_options");
477489
}
478490
let bind_device = bind_device.into_iter().next();
491+
let drain_type = DrainType::try_from(drain_type).unwrap_or_default();
479492
Ok(Self {
480493
name,
481494
address,
@@ -485,12 +498,26 @@ mod envoy_conversions {
485498
proxy_protocol_config,
486499
with_tlv_listener_filter,
487500
tlv_listener_filter_config,
501+
drain_type,
502+
version_info: None,
488503
})
489504
}())
490505
.with_name(name)
491506
}
492507
}
493508

509+
impl TryFrom<i32> for DrainType {
510+
type Error = GenericError;
511+
512+
fn try_from(value: i32) -> Result<Self, Self::Error> {
513+
match value {
514+
0 => Ok(DrainType::Default),
515+
1 => Ok(DrainType::ModifyOnly),
516+
_ => Err(GenericError::from_msg(format!("Unknown drain type: {}", value))),
517+
}
518+
}
519+
}
520+
494521
struct FilterChainWrapper((FilterChainMatch, FilterChain));
495522

496523
impl TryFrom<EnvoyFilterChain> for FilterChainWrapper {

orion-configuration/src/config/network_filters/http_connection_manager.rs

Lines changed: 65 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,9 @@ pub struct HttpConnectionManager {
5151
#[serde(with = "humantime_serde")]
5252
#[serde(skip_serializing_if = "Option::is_none", default)]
5353
pub request_timeout: Option<Duration>,
54+
#[serde(with = "humantime_serde")]
55+
#[serde(skip_serializing_if = "Option::is_none", default)]
56+
pub drain_timeout: Option<Duration>,
5457
#[serde(skip_serializing_if = "Vec::is_empty", default)]
5558
pub http_filters: Vec<HttpFilter>,
5659
#[serde(skip_serializing_if = "Vec::is_empty", default)]
@@ -564,6 +567,61 @@ mod tests {
564567
assert!(MatchHostScoreLPM::Wildcard < MatchHostScoreLPM::Suffix("foo.bar.test.com".len()));
565568
assert!(MatchHostScoreLPM::Wildcard == MatchHostScoreLPM::Wildcard);
566569
}
570+
571+
#[test]
572+
fn test_drain_timeout_configuration() {
573+
let config = HttpConnectionManager {
574+
codec_type: CodecType::Auto,
575+
route_specifier: RouteSpecifier::RouteConfig(RouteConfiguration {
576+
name: "test_route".into(),
577+
most_specific_header_mutations_wins: false,
578+
response_header_modifier: Default::default(),
579+
request_headers_to_add: vec![],
580+
request_headers_to_remove: vec![],
581+
virtual_hosts: vec![],
582+
}),
583+
http_filters: vec![],
584+
enabled_upgrades: vec![],
585+
access_log: vec![],
586+
xff_settings: Default::default(),
587+
generate_request_id: false,
588+
preserve_external_request_id: false,
589+
always_set_request_id_in_response: false,
590+
tracing: None,
591+
request_timeout: Some(Duration::from_secs(30)),
592+
drain_timeout: Some(Duration::from_secs(10)),
593+
};
594+
595+
assert_eq!(config.drain_timeout, Some(Duration::from_secs(10)));
596+
assert_eq!(config.request_timeout, Some(Duration::from_secs(30)));
597+
}
598+
599+
#[test]
600+
fn test_drain_timeout_default() {
601+
let config = HttpConnectionManager {
602+
codec_type: CodecType::Http1,
603+
route_specifier: RouteSpecifier::RouteConfig(RouteConfiguration {
604+
name: "test_route_default".into(),
605+
most_specific_header_mutations_wins: false,
606+
response_header_modifier: Default::default(),
607+
request_headers_to_add: vec![],
608+
request_headers_to_remove: vec![],
609+
virtual_hosts: vec![],
610+
}),
611+
http_filters: vec![],
612+
enabled_upgrades: vec![],
613+
access_log: vec![],
614+
xff_settings: Default::default(),
615+
generate_request_id: false,
616+
preserve_external_request_id: false,
617+
always_set_request_id_in_response: false,
618+
tracing: None,
619+
request_timeout: None,
620+
drain_timeout: None,
621+
};
622+
623+
assert_eq!(config.drain_timeout, None);
624+
}
567625
}
568626

569627
#[cfg(feature = "envoy-conversions")]
@@ -702,7 +760,7 @@ mod envoy_conversions {
702760
stream_idle_timeout,
703761
// request_timeout,
704762
request_headers_timeout,
705-
drain_timeout,
763+
// drain_timeout,
706764
delayed_close_timeout,
707765
// access_log,
708766
access_log_flush_interval,
@@ -753,6 +811,11 @@ mod envoy_conversions {
753811
.transpose()
754812
.map_err(|_| GenericError::from_msg("failed to convert into Duration"))
755813
.with_node("request_timeout")?;
814+
let drain_timeout = drain_timeout
815+
.map(duration_from_envoy)
816+
.transpose()
817+
.map_err(|_| GenericError::from_msg("failed to convert into Duration"))
818+
.with_node("drain_timeout")?;
756819
let enabled_upgrades = upgrade_configs
757820
.iter()
758821
.filter(|upgrade_config| upgrade_config.enabled.map(|enabled| enabled.value).unwrap_or(true))
@@ -819,6 +882,7 @@ mod envoy_conversions {
819882
enabled_upgrades,
820883
route_specifier,
821884
request_timeout,
885+
drain_timeout,
822886
access_log,
823887
xff_settings,
824888
generate_request_id: generate_request_id.map(|v| v.value).unwrap_or(true),

orion-lib/Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,9 +11,11 @@ ahash = "0.8.11"
1111
arc-swap = "1.7.1"
1212
arrayvec = "0.7.6"
1313
async-stream = "0.3"
14+
async-trait = "0.1.77"
1415
atomic-time = "0.1.4"
1516
bytes.workspace = true
1617
compact_str.workspace = true
18+
dashmap = "6.0"
1719
enum_dispatch = "0.3.13"
1820
exponential-backoff.workspace = true
1921
futures.workspace = true

0 commit comments

Comments
 (0)