Skip to content

Commit 068e7a7

Browse files
authored
feat/fix: improve config (#53)
Signed-off-by: Jeremy HERGAULT <[email protected]>
1 parent c9dcad1 commit 068e7a7

File tree

2 files changed

+160
-29
lines changed

2 files changed

+160
-29
lines changed

prosa/src/core/error.rs

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,10 @@
11
use std::time::Duration;
22

33
use super::msg::InternalMsg;
4-
use prosa_utils::msg::tvf::{Tvf, TvfError};
4+
use prosa_utils::{
5+
config::ConfigError,
6+
msg::tvf::{Tvf, TvfError},
7+
};
58
use tokio::sync::mpsc;
69

710
/// Processor error
@@ -26,6 +29,14 @@ impl<'a, E: ProcError + Send + Sync + 'a> From<E> for Box<dyn ProcError + Send +
2629
}
2730
}
2831

32+
/// For a ProSA `ConfigError`, no recovery is possible.
33+
/// The configuration need to be valid
34+
impl ProcError for ConfigError {
35+
fn recoverable(&self) -> bool {
36+
false
37+
}
38+
}
39+
2940
impl<M> ProcError for tokio::sync::mpsc::error::SendError<InternalMsg<M>>
3041
where
3142
M: Sized + Clone + Tvf,

prosa/src/io/stream.rs

Lines changed: 148 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ use std::{
66
path::Path,
77
pin::Pin,
88
task::{Context, Poll},
9+
time::Duration,
910
};
1011

1112
use prosa_utils::config::ssl::SslConfig;
@@ -17,6 +18,7 @@ use serde::{Deserialize, Serialize};
1718
use tokio::{
1819
io::{AsyncRead, AsyncWrite, ReadBuf},
1920
net::{TcpStream, ToSocketAddrs},
21+
time::timeout,
2022
};
2123
use url::Url;
2224

@@ -751,11 +753,11 @@ pub struct TargetSetting {
751753
#[serde(skip_serializing)]
752754
#[serde(default = "TargetSetting::get_default_connect_timeout")]
753755
/// Timeout for socket connection in milliseconds
754-
pub connect_timeout: u32,
756+
pub connect_timeout: u64,
755757
}
756758

757759
impl TargetSetting {
758-
fn get_default_connect_timeout() -> u32 {
760+
fn get_default_connect_timeout() -> u64 {
759761
5000
760762
}
761763

@@ -784,6 +786,19 @@ impl TargetSetting {
784786
self.ssl.is_some() || url_is_ssl(&self.url)
785787
}
786788

789+
/// Getter of the URL with masked inner credential
790+
pub fn get_safe_url(&self) -> Url {
791+
let mut url = self.url.clone();
792+
if !url.username().is_empty() {
793+
let _ = url.set_username("***");
794+
}
795+
if url.password().is_some() {
796+
let _ = url.set_password(Some("***"));
797+
}
798+
799+
url
800+
}
801+
787802
/// Method to get authentication value out of URL username/password
788803
///
789804
/// - If user password is provided, it return *Basic* authentication with base64 encoded username:password
@@ -830,7 +845,17 @@ impl TargetSetting {
830845
pub async fn connect(&self) -> Result<Stream, io::Error> {
831846
#[cfg(target_family = "unix")]
832847
if self.url.scheme() == "unix" || self.url.scheme() == "file" {
833-
return Stream::connect_unix(self.url.path()).await;
848+
return timeout(
849+
Duration::from_millis(self.connect_timeout),
850+
Stream::connect_unix(self.url.path()),
851+
)
852+
.await
853+
.map_err(|e| {
854+
io::Error::new(
855+
io::ErrorKind::TimedOut,
856+
format!("unix timeout after {e} for {}", self.get_safe_url()),
857+
)
858+
})?;
834859
}
835860

836861
#[cfg(feature = "openssl")]
@@ -855,21 +880,45 @@ impl TargetSetting {
855880
{
856881
#[cfg(feature = "openssl")]
857882
if let Some(ssl_cx) = openssl_context {
858-
return Stream::connect_openssl_with_http_proxy(
859-
self.url.host_str().unwrap_or_default(),
860-
self.url.port_or_known_default().unwrap_or_default(),
861-
&ssl_cx,
862-
proxy_url,
883+
return timeout(
884+
Duration::from_millis(self.connect_timeout),
885+
Stream::connect_openssl_with_http_proxy(
886+
self.url.host_str().unwrap_or_default(),
887+
self.url.port_or_known_default().unwrap_or_default(),
888+
&ssl_cx,
889+
proxy_url,
890+
),
863891
)
864-
.await;
892+
.await
893+
.map_err(|e| {
894+
io::Error::new(
895+
io::ErrorKind::TimedOut,
896+
format!(
897+
"openssl with proxy timeout after {e} for {}",
898+
self.get_safe_url()
899+
),
900+
)
901+
})?;
865902
}
866903

867-
return Stream::connect_tcp_with_http_proxy(
868-
self.url.host_str().unwrap_or_default(),
869-
self.url.port_or_known_default().unwrap_or_default(),
870-
proxy_url,
904+
return timeout(
905+
Duration::from_millis(self.connect_timeout),
906+
Stream::connect_tcp_with_http_proxy(
907+
self.url.host_str().unwrap_or_default(),
908+
self.url.port_or_known_default().unwrap_or_default(),
909+
proxy_url,
910+
),
871911
)
872-
.await;
912+
.await
913+
.map_err(|e| {
914+
io::Error::new(
915+
io::ErrorKind::TimedOut,
916+
format!(
917+
"tcp with proxy timeout after {e} for {}",
918+
self.get_safe_url()
919+
),
920+
)
921+
})?;
873922
}
874923

875924
#[cfg(not(feature = "http-proxy"))]
@@ -887,11 +936,31 @@ impl TargetSetting {
887936

888937
#[cfg(feature = "openssl")]
889938
if let Some(ssl_cx) = openssl_context {
890-
return Stream::connect_openssl(&self.url, &ssl_cx).await;
939+
return timeout(
940+
Duration::from_millis(self.connect_timeout),
941+
Stream::connect_openssl(&self.url, &ssl_cx),
942+
)
943+
.await
944+
.map_err(|e| {
945+
io::Error::new(
946+
io::ErrorKind::TimedOut,
947+
format!("openssl timeout after {e} for {}", self.get_safe_url()),
948+
)
949+
})?;
891950
}
892951

893952
let addrs = self.url.socket_addrs(|| self.url.port_or_known_default())?;
894-
Stream::connect_tcp(&*addrs).await
953+
timeout(
954+
Duration::from_millis(self.connect_timeout),
955+
Stream::connect_tcp(&*addrs),
956+
)
957+
.await
958+
.map_err(|e| {
959+
io::Error::new(
960+
io::ErrorKind::TimedOut,
961+
format!("tcp timeout after {e} for {}", self.get_safe_url()),
962+
)
963+
})?
895964
}
896965
}
897966

@@ -912,7 +981,7 @@ impl fmt::Debug for TargetSetting {
912981
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
913982
let mut binding = f.debug_struct("TargetSetting");
914983
binding
915-
.field("url", &self.url)
984+
.field("url", &self.get_safe_url())
916985
.field("ssl", &self.ssl)
917986
.field("connect_timeout", &self.connect_timeout);
918987
if let Some(proxy_url) = &self.proxy {
@@ -924,7 +993,7 @@ impl fmt::Debug for TargetSetting {
924993

925994
impl fmt::Display for TargetSetting {
926995
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
927-
let mut url = self.url.clone();
996+
let mut url = self.get_safe_url();
928997
if self.ssl.is_some() {
929998
let url_scheme = url.scheme();
930999
if url_scheme.is_empty() {
@@ -938,18 +1007,69 @@ impl fmt::Display for TargetSetting {
9381007
}
9391008
}
9401009

941-
// Mask username and password to avoid data leak in logs
942-
if !url.username().is_empty() {
943-
let _ = url.set_username("***");
944-
}
945-
if url.password().is_some() {
946-
let _ = url.set_password(Some("***"));
947-
}
948-
949-
if let Some(proxy_url) = &self.proxy {
950-
write!(f, "{url} -proxy {proxy_url}")
1010+
if f.alternate() {
1011+
if let Some(proxy_url) = &self.proxy {
1012+
write!(f, "{url} -proxy {proxy_url}")
1013+
} else {
1014+
write!(f, "{url}")
1015+
}
9511016
} else {
1017+
// Remove username and password for more visibility
1018+
let _ = url.set_username("");
1019+
let _ = url.set_password(None);
9521020
write!(f, "{url}")
9531021
}
9541022
}
9551023
}
1024+
1025+
#[cfg(test)]
1026+
mod tests {
1027+
use super::*;
1028+
1029+
#[test]
1030+
fn target_settings_test() {
1031+
let target_without_credential =
1032+
TargetSetting::new(Url::parse("https://localhost:4443/v1").unwrap(), None, None);
1033+
assert_eq!(
1034+
"https://localhost:4443/v1",
1035+
target_without_credential.to_string()
1036+
);
1037+
assert_eq!(
1038+
"https://localhost:4443/v1",
1039+
format!("{target_without_credential:#}")
1040+
);
1041+
1042+
let target_with_user_password = TargetSetting::new(
1043+
Url::parse("https://admin:admin@localhost:4443/v1").unwrap(),
1044+
None,
1045+
None,
1046+
);
1047+
assert_eq!(
1048+
"https://localhost:4443/v1",
1049+
target_with_user_password.to_string()
1050+
);
1051+
assert_eq!(
1052+
"https://***:***@localhost:4443/v1",
1053+
format!("{target_with_user_password:#}")
1054+
);
1055+
assert_eq!(
1056+
"TargetSetting { url: Url { scheme: \"https\", cannot_be_a_base: false, username: \"***\", password: Some(\"***\"), host: Some(Domain(\"localhost\")), port: Some(4443), path: \"/v1\", query: None, fragment: None }, ssl: None, connect_timeout: 5000 }",
1057+
format!("{target_with_user_password:?}")
1058+
);
1059+
1060+
let target_with_token = TargetSetting::new(
1061+
Url::parse("https://:token@localhost:4443/v1").unwrap(),
1062+
None,
1063+
None,
1064+
);
1065+
assert_eq!("https://localhost:4443/v1", target_with_token.to_string());
1066+
assert_eq!(
1067+
"https://:***@localhost:4443/v1",
1068+
format!("{target_with_token:#}")
1069+
);
1070+
assert_eq!(
1071+
"TargetSetting { url: Url { scheme: \"https\", cannot_be_a_base: false, username: \"\", password: Some(\"***\"), host: Some(Domain(\"localhost\")), port: Some(4443), path: \"/v1\", query: None, fragment: None }, ssl: None, connect_timeout: 5000 }",
1072+
format!("{target_with_token:?}")
1073+
);
1074+
}
1075+
}

0 commit comments

Comments
 (0)