Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 12 additions & 1 deletion prosa/src/core/error.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
use std::time::Duration;

use super::msg::InternalMsg;
use prosa_utils::msg::tvf::{Tvf, TvfError};
use prosa_utils::{
config::ConfigError,
msg::tvf::{Tvf, TvfError},
};
use tokio::sync::mpsc;

/// Processor error
Expand All @@ -26,6 +29,14 @@ impl<'a, E: ProcError + Send + Sync + 'a> From<E> for Box<dyn ProcError + Send +
}
}

/// For a ProSA `ConfigError`, no recovery is possible.
/// The configuration need to be valid
impl ProcError for ConfigError {
fn recoverable(&self) -> bool {
false
}
}

impl<M> ProcError for tokio::sync::mpsc::error::SendError<InternalMsg<M>>
where
M: Sized + Clone + Tvf,
Expand Down
2 changes: 1 addition & 1 deletion prosa/src/event/speed.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use tokio::time::{Instant, sleep};

/// Structure to define a transaction flow speed
///
/// ```
/// ```no_run
/// use tokio::time::Instant;
/// use std::time::Duration;
/// use prosa::event::speed::Speed;
Expand Down
176 changes: 148 additions & 28 deletions prosa/src/io/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use std::{
path::Path,
pin::Pin,
task::{Context, Poll},
time::Duration,
};

use prosa_utils::config::ssl::SslConfig;
Expand All @@ -17,6 +18,7 @@ use serde::{Deserialize, Serialize};
use tokio::{
io::{AsyncRead, AsyncWrite, ReadBuf},
net::{TcpStream, ToSocketAddrs},
time::timeout,
};
use url::Url;

Expand Down Expand Up @@ -751,11 +753,11 @@ pub struct TargetSetting {
#[serde(skip_serializing)]
#[serde(default = "TargetSetting::get_default_connect_timeout")]
/// Timeout for socket connection in milliseconds
pub connect_timeout: u32,
pub connect_timeout: u64,
}

impl TargetSetting {
fn get_default_connect_timeout() -> u32 {
fn get_default_connect_timeout() -> u64 {
5000
}

Expand Down Expand Up @@ -784,6 +786,19 @@ impl TargetSetting {
self.ssl.is_some() || url_is_ssl(&self.url)
}

/// Getter of the URL with masked inner credential
pub fn get_safe_url(&self) -> Url {
let mut url = self.url.clone();
if !url.username().is_empty() {
let _ = url.set_username("***");
}
if url.password().is_some() {
let _ = url.set_password(Some("***"));
}

url
}

/// Method to get authentication value out of URL username/password
///
/// - If user password is provided, it return *Basic* authentication with base64 encoded username:password
Expand Down Expand Up @@ -830,7 +845,17 @@ impl TargetSetting {
pub async fn connect(&self) -> Result<Stream, io::Error> {
#[cfg(target_family = "unix")]
if self.url.scheme() == "unix" || self.url.scheme() == "file" {
return Stream::connect_unix(self.url.path()).await;
return timeout(
Duration::from_millis(self.connect_timeout),
Stream::connect_unix(self.url.path()),
)
.await
.map_err(|e| {
io::Error::new(
io::ErrorKind::TimedOut,
format!("unix timeout after {e} for {}", self.get_safe_url()),
)
})?;
}

#[cfg(feature = "openssl")]
Expand All @@ -855,21 +880,45 @@ impl TargetSetting {
{
#[cfg(feature = "openssl")]
if let Some(ssl_cx) = openssl_context {
return Stream::connect_openssl_with_http_proxy(
self.url.host_str().unwrap_or_default(),
self.url.port_or_known_default().unwrap_or_default(),
&ssl_cx,
proxy_url,
return timeout(
Duration::from_millis(self.connect_timeout),
Stream::connect_openssl_with_http_proxy(
self.url.host_str().unwrap_or_default(),
self.url.port_or_known_default().unwrap_or_default(),
&ssl_cx,
proxy_url,
),
)
.await;
.await
.map_err(|e| {
io::Error::new(
io::ErrorKind::TimedOut,
format!(
"openssl with proxy timeout after {e} for {}",
self.get_safe_url()
),
)
})?;
}

return Stream::connect_tcp_with_http_proxy(
self.url.host_str().unwrap_or_default(),
self.url.port_or_known_default().unwrap_or_default(),
proxy_url,
return timeout(
Duration::from_millis(self.connect_timeout),
Stream::connect_tcp_with_http_proxy(
self.url.host_str().unwrap_or_default(),
self.url.port_or_known_default().unwrap_or_default(),
proxy_url,
),
)
.await;
.await
.map_err(|e| {
io::Error::new(
io::ErrorKind::TimedOut,
format!(
"tcp with proxy timeout after {e} for {}",
self.get_safe_url()
),
)
})?;
}

#[cfg(not(feature = "http-proxy"))]
Expand All @@ -887,11 +936,31 @@ impl TargetSetting {

#[cfg(feature = "openssl")]
if let Some(ssl_cx) = openssl_context {
return Stream::connect_openssl(&self.url, &ssl_cx).await;
return timeout(
Duration::from_millis(self.connect_timeout),
Stream::connect_openssl(&self.url, &ssl_cx),
)
.await
.map_err(|e| {
io::Error::new(
io::ErrorKind::TimedOut,
format!("openssl timeout after {e} for {}", self.get_safe_url()),
)
})?;
}

let addrs = self.url.socket_addrs(|| self.url.port_or_known_default())?;
Stream::connect_tcp(&*addrs).await
timeout(
Duration::from_millis(self.connect_timeout),
Stream::connect_tcp(&*addrs),
)
.await
.map_err(|e| {
io::Error::new(
io::ErrorKind::TimedOut,
format!("tcp timeout after {e} for {}", self.get_safe_url()),
)
})?
}
}

Expand All @@ -912,7 +981,7 @@ impl fmt::Debug for TargetSetting {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
let mut binding = f.debug_struct("TargetSetting");
binding
.field("url", &self.url)
.field("url", &self.get_safe_url())
.field("ssl", &self.ssl)
.field("connect_timeout", &self.connect_timeout);
if let Some(proxy_url) = &self.proxy {
Expand All @@ -924,7 +993,7 @@ impl fmt::Debug for TargetSetting {

impl fmt::Display for TargetSetting {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
let mut url = self.url.clone();
let mut url = self.get_safe_url();
if self.ssl.is_some() {
let url_scheme = url.scheme();
if url_scheme.is_empty() {
Expand All @@ -938,18 +1007,69 @@ impl fmt::Display for TargetSetting {
}
}

// Mask username and password to avoid data leak in logs
if !url.username().is_empty() {
let _ = url.set_username("***");
}
if url.password().is_some() {
let _ = url.set_password(Some("***"));
}

if let Some(proxy_url) = &self.proxy {
write!(f, "{url} -proxy {proxy_url}")
if f.alternate() {
if let Some(proxy_url) = &self.proxy {
write!(f, "{url} -proxy {proxy_url}")
} else {
write!(f, "{url}")
}
} else {
// Remove username and password for more visibility
let _ = url.set_username("");
let _ = url.set_password(None);
write!(f, "{url}")
}
}
}

#[cfg(test)]
mod tests {
use super::*;

#[test]
fn target_settings_test() {
let target_without_credential =
TargetSetting::new(Url::parse("https://localhost:4443/v1").unwrap(), None, None);
assert_eq!(
"https://localhost:4443/v1",
target_without_credential.to_string()
);
assert_eq!(
"https://localhost:4443/v1",
format!("{target_without_credential:#}")
);

let target_with_user_password = TargetSetting::new(
Url::parse("https://admin:admin@localhost:4443/v1").unwrap(),
None,
None,
);
assert_eq!(
"https://localhost:4443/v1",
target_with_user_password.to_string()
);
assert_eq!(
"https://***:***@localhost:4443/v1",
format!("{target_with_user_password:#}")
);
assert_eq!(
"TargetSetting { url: Url { scheme: \"https\", cannot_be_a_base: false, username: \"***\", password: Some(\"***\"), host: Some(Domain(\"localhost\")), port: Some(4443), path: \"/v1\", query: None, fragment: None }, ssl: None, connect_timeout: 5000 }",
format!("{target_with_user_password:?}")
);

let target_with_token = TargetSetting::new(
Url::parse("https://:token@localhost:4443/v1").unwrap(),
None,
None,
);
assert_eq!("https://localhost:4443/v1", target_with_token.to_string());
assert_eq!(
"https://:***@localhost:4443/v1",
format!("{target_with_token:#}")
);
assert_eq!(
"TargetSetting { url: Url { scheme: \"https\", cannot_be_a_base: false, username: \"\", password: Some(\"***\"), host: Some(Domain(\"localhost\")), port: Some(4443), path: \"/v1\", query: None, fragment: None }, ssl: None, connect_timeout: 5000 }",
format!("{target_with_token:?}")
);
}
}