Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ spnego = ["curl-sys/spnego"]
static-curl = ["curl/static-curl"]
static-ssl = ["curl/static-ssl"]
text-decoding = ["encoding_rs", "mime"]
tracing-01 = ["tracing", "tracing-futures"]
unstable-interceptors = []

[dependencies]
Expand Down Expand Up @@ -103,13 +104,15 @@ optional = true
# tracing ecosystem compatibility
[dependencies.tracing]
version = "0.1.17"
features = ["log"]
default-features = false
optional = true

# tracing ecosystem compatibility
[dependencies.tracing-futures]
version = "0.2"
default-features = false
features = ["std", "std-future"]
optional = true

[dev-dependencies]
env_logger = "0.9"
Expand Down
66 changes: 42 additions & 24 deletions src/agent/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,14 +90,15 @@ impl AgentBuilder {

// Create a span for the agent thread that outlives this method call,
// but rather was caused by it.
let agent_span = tracing::debug_span!("agent_thread", id);
agent_span.follows_from(tracing::Span::current());
let agent_span = span!(DEBUG, "agent_thread", id);
span_follows_from_current!(agent_span);

let waker = selector.waker();
let message_tx_clone = message_tx.clone();

let thread_main = move || {
let _enter = agent_span.enter();
enter_span!(agent_span);

let mut multi = Multi::new();

if max_connections > 0 {
Expand All @@ -123,12 +124,12 @@ impl AgentBuilder {

drop(wait_group_thread);

tracing::debug!("agent took {:?} to start up", create_start.elapsed());
debug!("agent took {:?} to start up", create_start.elapsed());

let result = agent.run();

if let Err(e) = &result {
tracing::error!("agent shut down with error: {:?}", e);
error!("agent shut down with error: {:?}", e);
}

result
Expand Down Expand Up @@ -270,14 +271,14 @@ impl Drop for Handle {
fn drop(&mut self) {
// Request the agent thread to shut down.
if self.send_message(Message::Close).is_err() {
tracing::error!("agent thread terminated prematurely");
error!("agent thread terminated prematurely");
}

// Wait for the agent thread to shut down before continuing.
match self.try_join() {
JoinResult::Ok => tracing::trace!("agent thread joined cleanly"),
JoinResult::Err(e) => tracing::error!("agent thread terminated with error: {}", e),
JoinResult::Panic => tracing::error!("agent thread panicked"),
JoinResult::Ok => trace!("agent thread joined cleanly"),
JoinResult::Err(e) => error!("agent thread terminated with error: {}", e),
JoinResult::Panic => error!("agent thread panicked"),
_ => {}
}
}
Expand Down Expand Up @@ -329,10 +330,15 @@ impl AgentContext {
})
}

#[tracing::instrument(level = "trace", skip(self))]
fn begin_request(&mut self, mut request: EasyHandle) -> Result<(), Error> {
let span = span!(TRACE, "begin_request", ?request);
enter_span!(span);

let id = request.get_ref().id();

let request_span = request.get_ref().span().clone();
let response_span = request.get_ref().span().clone();

// Initialize the handler.
request.get_mut().init(
{
Expand All @@ -342,7 +348,8 @@ impl AgentContext {
.chain(move |inner| match tx.try_send(Message::UnpauseRead(id)) {
Ok(()) => inner.wake_by_ref(),
Err(_) => {
tracing::warn!(id, "agent went away while resuming read for request")
enter_span!(request_span);
warn!("agent went away while resuming read for request")
}
})
},
Expand All @@ -353,7 +360,8 @@ impl AgentContext {
.chain(move |inner| match tx.try_send(Message::UnpauseWrite(id)) {
Ok(()) => inner.wake_by_ref(),
Err(_) => {
tracing::warn!(id, "agent went away while resuming write for request")
enter_span!(response_span);
warn!("agent went away while resuming write for request")
}
})
},
Expand All @@ -370,12 +378,14 @@ impl AgentContext {
Ok(())
}

#[tracing::instrument(level = "trace", skip(self))]
fn complete_request(
&mut self,
token: usize,
result: Result<(), curl::Error>,
) -> Result<(), Error> {
let span = span!(TRACE, "complete_request", token, ?result);
enter_span!(span);

let handle = self.requests.remove(&token).unwrap();
let mut handle = self.multi.remove2(handle).map_err(Error::from_any)?;

Expand All @@ -388,14 +398,16 @@ impl AgentContext {
///
/// If there are no active requests right now, this function will block
/// until a message is received.
#[tracing::instrument(level = "trace", skip(self))]
fn poll_messages(&mut self) -> Result<(), Error> {
let span = span!(TRACE, "poll_messages");
enter_span!(span);

while !self.close_requested {
if self.requests.is_empty() {
match block_on(self.message_rx.recv()) {
Ok(message) => self.handle_message(message)?,
_ => {
tracing::warn!("agent handle disconnected without close message");
warn!("agent handle disconnected without close message");
self.close_requested = true;
break;
}
Expand All @@ -405,7 +417,7 @@ impl AgentContext {
Ok(message) => self.handle_message(message)?,
Err(async_channel::TryRecvError::Empty) => break,
Err(async_channel::TryRecvError::Closed) => {
tracing::warn!("agent handle disconnected without close message");
warn!("agent handle disconnected without close message");
self.close_requested = true;
break;
}
Expand All @@ -416,15 +428,18 @@ impl AgentContext {
Ok(())
}

#[tracing::instrument(level = "trace", skip(self))]
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess we can use #[cfg_attr(feature="tracing", tracing::instrument)] instead.

fn handle_message(&mut self, message: Message) -> Result<(), Error> {
tracing::trace!("received message from agent handle");
let span = span!(TRACE, "handle_message", ?message);
enter_span!(span);
trace!("received message from agent handle");

match message {
Message::Close => self.close_requested = true,
Message::Execute(request) => self.begin_request(request)?,
Message::UnpauseRead(token) => {
if let Some(request) = self.requests.get(&token) {
enter_span!(request.get_ref().span());

if let Err(e) = request.unpause_read() {
// If unpausing returned an error, it is likely because
// curl called our callback inline and the callback
Expand All @@ -433,17 +448,19 @@ impl AgentContext {
// the transfer alive until it errors through the normal
// means, which is likely to happen this turn of the
// event loop anyway.
tracing::debug!(id = token, "error unpausing read for request: {:?}", e);
debug!("error unpausing read for request: {:?}", e);
}
} else {
tracing::warn!(
warn!(
"received unpause request for unknown request token: {}",
token
);
}
}
Message::UnpauseWrite(token) => {
if let Some(request) = self.requests.get(&token) {
enter_span!(request.get_ref().span());

if let Err(e) = request.unpause_write() {
// If unpausing returned an error, it is likely because
// curl called our callback inline and the callback
Expand All @@ -452,10 +469,10 @@ impl AgentContext {
// the transfer alive until it errors through the normal
// means, which is likely to happen this turn of the
// event loop anyway.
tracing::debug!(id = token, "error unpausing write for request: {:?}", e);
debug!("error unpausing write for request: {:?}", e);
}
} else {
tracing::warn!(
warn!(
"received unpause request for unknown request token: {}",
token
);
Expand Down Expand Up @@ -496,7 +513,7 @@ impl AgentContext {
}
}

tracing::debug!("agent shutting down");
debug!("agent shutting down");

self.requests.clear();

Expand All @@ -517,7 +534,8 @@ impl AgentContext {
if self.selector.poll(poll_timeout)? {
// At least one I/O event occurred, handle them.
for (socket, readable, writable) in self.selector.events() {
tracing::trace!(socket, readable, writable, "socket event");
#[cfg(feature = "tracing")]
trace!(socket, readable, writable, "socket event");
let mut events = Events::new();
events.input(readable);
events.output(writable);
Expand Down
10 changes: 7 additions & 3 deletions src/agent/selector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,11 @@ impl Selector {
// hold onto this currently invalid socket for later. Whenever
// `poll` is called, we retry registering these sockets in the
// hope that they will eventually become valid.
tracing::debug!(socket, error = ?e, "bad socket registered, will try again later");
#[cfg(feature = "tracing")]
debug!(socket, error = ?e, "bad socket registered, will try again later");
#[cfg(not(feature = "tracing"))]
debug!("bad socket registered, will try again later: {:?}", e);

self.bad_sockets.insert(socket);
Ok(())
}
Expand Down Expand Up @@ -216,7 +220,7 @@ fn poller_add(poller: &Poller, socket: Socket, readable: bool, writable: bool) -
readable,
writable,
}) {
tracing::debug!(
debug!(
"failed to add interest for socket {}, retrying as a modify: {}",
socket,
e
Expand Down Expand Up @@ -244,7 +248,7 @@ fn poller_modify(
readable,
writable,
}) {
tracing::debug!(
debug!(
"failed to modify interest for socket {}, retrying as an add: {}",
socket,
e
Expand Down
15 changes: 7 additions & 8 deletions src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ use std::{
task::{Context, Poll},
time::Duration,
};
use tracing_futures::Instrument;

static USER_AGENT: Lazy<String> = Lazy::new(|| {
format!(
Expand Down Expand Up @@ -910,7 +909,8 @@ impl HttpClient {
where
B: Into<Body>,
{
let span = tracing::debug_span!(
let span = span!(
DEBUG,
"send",
method = ?request.method(),
uri = ?request.uri(),
Expand All @@ -925,7 +925,7 @@ impl HttpClient {
});

let response = block_on(
async move {
instrument_span!(span, async move {
// Instead of simply blocking the current thread until the response
// is received, we can use the current thread to read from the
// request body synchronously while concurrently waiting for the
Expand All @@ -943,8 +943,7 @@ impl HttpClient {
} else {
self.send_async_inner(request).await
}
}
.instrument(span),
}),
)?;

Ok(response.map(|body| body.into_sync()))
Expand Down Expand Up @@ -1001,15 +1000,15 @@ impl HttpClient {
where
B: Into<AsyncBody>,
{
let span = tracing::debug_span!(
let span = span!(
DEBUG,
"send_async",
method = ?request.method(),
uri = ?request.uri(),
);

ResponseFuture::new(
self.send_async_inner(request.map(Into::into))
.instrument(span),
instrument_span!(span, self.send_async_inner(request.map(Into::into)))
)
}

Expand Down
4 changes: 2 additions & 2 deletions src/cookies/interceptor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,13 +85,13 @@ impl Interceptor for CookieInterceptor {
.into_iter()
.filter_map(|header| {
header.to_str().ok().or_else(|| {
tracing::warn!("invalid encoding in Set-Cookie header");
warn!("invalid encoding in Set-Cookie header");
None
})
})
.filter_map(|header| {
Cookie::parse(header).ok().or_else(|| {
tracing::warn!("could not parse Set-Cookie header");
warn!("could not parse Set-Cookie header");
None
})
});
Expand Down
8 changes: 4 additions & 4 deletions src/cookies/jar.rs
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ impl CookieJar {
let request_host = if let Some(host) = request_uri.host() {
host
} else {
tracing::warn!(
warn!(
"cookie '{}' dropped, no domain specified in request URI",
cookie.name()
);
Expand All @@ -149,7 +149,7 @@ impl CookieJar {
// The given domain must domain-match the origin.
// https://tools.ietf.org/html/rfc6265#section-5.3.6
if !domain_matches(request_host, domain) {
tracing::warn!(
warn!(
"cookie '{}' dropped, domain '{}' not allowed to set cookies for '{}'",
cookie.name(),
request_host,
Expand All @@ -163,7 +163,7 @@ impl CookieJar {

// Drop cookies for top-level domains.
if !domain.contains('.') {
tracing::warn!(
warn!(
"cookie '{}' dropped, setting cookies for domain '{}' is not allowed",
cookie.name(),
domain
Expand All @@ -179,7 +179,7 @@ impl CookieJar {
#[cfg(feature = "psl")]
{
if super::psl::is_public_suffix(domain) {
tracing::warn!(
warn!(
"cookie '{}' dropped, setting cookies for domain '{}' is not allowed",
cookie.name(),
domain
Expand Down
6 changes: 3 additions & 3 deletions src/cookies/psl/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -111,15 +111,15 @@ impl ListCache {
// Parse the suffix list.
self.list = response.text()?.parse()?;
self.last_updated = Some(SystemTime::now());
tracing::debug!("public suffix list updated");
debug!("public suffix list updated");
}

http::StatusCode::NOT_MODIFIED => {
// List hasn't changed and is still new.
self.last_updated = Some(SystemTime::now());
}

status => tracing::warn!(
status => warn!(
"could not update public suffix list, got status code {}",
status,
),
Expand Down Expand Up @@ -157,7 +157,7 @@ fn refresh_in_background() {
let mut cache = CACHE.write().unwrap();

if let Err(e) = cache.refresh() {
tracing::warn!("could not refresh public suffix list: {}", e);
warn!("could not refresh public suffix list: {}", e);
}

IS_REFRESHING.store(false, Ordering::SeqCst);
Expand Down
Loading