Skip to content

Commit b6e14d6

Browse files
committed
feat: add tracing spans for queries
1 parent 3db51fa commit b6e14d6

File tree

7 files changed

+208
-59
lines changed

7 files changed

+208
-59
lines changed

Cargo.toml

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -126,7 +126,8 @@ clickhouse-macros = { version = "0.3.0", path = "macros" }
126126
clickhouse-types = { version = "0.1.0", path = "types" }
127127

128128
thiserror = "2.0"
129-
serde = "1.0.106"
129+
serde = { version = "1.0.106", features = ["derive"] }
130+
serde_json = "1"
130131
bytes = "1.5.0"
131132
tokio = { version = "1.0.1", features = ["rt", "macros"] }
132133
http-body-util = "0.1.2"
@@ -153,18 +154,17 @@ chrono = { version = "0.4", optional = true, features = ["serde"] }
153154
bstr = { version = "1.11.0", default-features = false }
154155
quanta = { version = "0.12", optional = true }
155156
replace_with = { version = "0.1.7" }
157+
tracing = "0.1"
156158

157159
[dev-dependencies]
158160
clickhouse-macros = { version = "0.3.0", path = "macros" }
159161
criterion = "0.6"
160-
serde = { version = "1.0.106", features = ["derive"] }
161162
tokio = { version = "1.0.1", features = ["full", "test-util"] }
162163
hyper = { version = "1.1", features = ["server"] }
163164
indexmap = { version = "2.10.0", features = ["serde"] }
164165
linked-hash-map = { version = "0.5.6", features = ["serde_impl"] }
165166
fxhash = { version = "0.2.1" }
166167
serde_bytes = "0.11.4"
167-
serde_json = "1"
168168
serde_repr = "0.1.7"
169169
uuid = { version = "1", features = ["v4", "serde"] }
170170
time = { version = "0.3.17", features = ["macros", "rand", "parsing"] }

src/insert.rs

Lines changed: 21 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -354,6 +354,21 @@ impl<T> Insert<T> {
354354
debug_assert!(matches!(self.state, InsertState::NotStarted { .. }));
355355
let (client, sql) = self.state.client_with_sql().unwrap(); // checked above
356356

357+
let span = tracing::info_span!(
358+
"clickhouse.insert",
359+
status = tracing::field::Empty,
360+
otel.status_code = tracing::field::Empty,
361+
otel.kind = "CLIENT",
362+
db.system.name = "clickhouse",
363+
db.query.text = sql,
364+
db.response.returned_rows = tracing::field::Empty,
365+
db.response.read_bytes = tracing::field::Empty,
366+
db.response.read_rows = tracing::field::Empty,
367+
db.response.written_bytes = tracing::field::Empty,
368+
db.response.written_rows = tracing::field::Empty,
369+
)
370+
.entered();
371+
357372
let mut url = Url::parse(&client.url).map_err(|err| Error::InvalidParams(err.into()))?;
358373
let mut pairs = url.query_pairs_mut();
359374
pairs.clear();
@@ -385,9 +400,13 @@ impl<T> Insert<T> {
385400
.map_err(|err| Error::InvalidParams(Box::new(err)))?;
386401

387402
let future = client.http.request(request);
403+
let span = span.exit();
388404
// TODO: introduce `Executor` to allow bookkeeping of spawned tasks.
389-
let handle =
390-
tokio::spawn(async move { Response::new(future, Compression::None).finish().await });
405+
let handle = tokio::spawn(async move {
406+
Response::new(future, Compression::None, span)
407+
.finish()
408+
.await
409+
});
391410

392411
match self.row_metadata {
393412
None => (), // RowBinary is used, no header is required.

src/lib.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ mod response;
3939
mod row;
4040
mod row_metadata;
4141
mod rowbinary;
42+
mod summary_header;
4243
#[cfg(feature = "inserter")]
4344
mod ticks;
4445

@@ -385,6 +386,12 @@ impl Client {
385386
query::Query::new(self, query)
386387
}
387388

389+
/// Starts a new SELECT/DDL query, with the `wait_end_of_query` setting enabled
390+
/// to buffer the full query results on the server
391+
pub fn query_buffered(&self, query: &str) -> query::Query {
392+
query::Query::new_buffered(self, query)
393+
}
394+
388395
/// Enables or disables [`Row`] data types validation against the database schema
389396
/// at the cost of performance. Validation is enabled by default, and in this mode,
390397
/// the client will use `RowBinaryWithNamesAndTypes` format.

src/query.rs

Lines changed: 36 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,13 +24,23 @@ use crate::headers::with_authentication;
2424
pub struct Query {
2525
client: Client,
2626
sql: SqlBuilder,
27+
wait_end_of_query: bool,
2728
}
2829

2930
impl Query {
3031
pub(crate) fn new(client: &Client, template: &str) -> Self {
3132
Self {
3233
client: client.clone(),
3334
sql: SqlBuilder::new(template),
35+
wait_end_of_query: false,
36+
}
37+
}
38+
39+
pub(crate) fn new_buffered(client: &Client, template: &str) -> Self {
40+
Self {
41+
client: client.clone(),
42+
sql: SqlBuilder::new(template),
43+
wait_end_of_query: true,
3444
}
3545
}
3646

@@ -154,8 +164,25 @@ impl Query {
154164
read_only: bool,
155165
default_format: Option<&str>,
156166
) -> Result<Response> {
167+
let query_formatted = format!("{}", self.sql_display());
157168
let query = self.sql.finish()?;
158169

170+
let execution_span = tracing::info_span!(
171+
"clickhouse.query",
172+
status = tracing::field::Empty,
173+
otel.status_code = tracing::field::Empty,
174+
otel.kind = "CLIENT",
175+
db.system.name = "clickhouse",
176+
db.query.text = query_formatted,
177+
db.response.returned_rows = tracing::field::Empty,
178+
db.response.read_bytes = tracing::field::Empty,
179+
db.response.read_rows = tracing::field::Empty,
180+
db.response.written_bytes = tracing::field::Empty,
181+
db.response.written_rows = tracing::field::Empty,
182+
clickhouse.wait_end_of_query = self.wait_end_of_query,
183+
)
184+
.entered();
185+
159186
let mut url =
160187
Url::parse(&self.client.url).map_err(|err| Error::InvalidParams(Box::new(err)))?;
161188
let mut pairs = url.query_pairs_mut();
@@ -186,6 +213,10 @@ impl Query {
186213
pairs.append_pair("compress", "1");
187214
}
188215

216+
if self.wait_end_of_query {
217+
pairs.append_pair("wait_end_of_query", "1");
218+
}
219+
189220
for (name, value) in &self.client.options {
190221
pairs.append_pair(name, value);
191222
}
@@ -206,7 +237,11 @@ impl Query {
206237
.map_err(|err| Error::InvalidParams(Box::new(err)))?;
207238

208239
let future = self.client.http.request(request);
209-
Ok(Response::new(future, self.client.compression))
240+
Ok(Response::new(
241+
future,
242+
self.client.compression,
243+
execution_span.exit(),
244+
))
210245
}
211246

212247
/// Similar to [`Client::with_option`], but for this particular query only.

src/response.rs

Lines changed: 98 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -19,63 +19,111 @@ use crate::compression::lz4::Lz4Decoder;
1919
use crate::{
2020
compression::Compression,
2121
error::{Error, Result},
22+
summary_header::Summary,
2223
};
2324

25+
use tracing::{Instrument, Span};
26+
2427
// === Response ===
2528

2629
pub(crate) enum Response {
2730
// Headers haven't been received yet.
2831
// `Box<_>` improves performance by reducing the size of the whole future.
29-
Waiting(ResponseFuture),
32+
Waiting(ResponseFuture, Span),
3033
// Headers have been received, streaming the body.
31-
Loading(Chunks),
34+
Loading(Chunks, Span),
3235
}
3336

3437
pub(crate) type ResponseFuture = Pin<Box<dyn Future<Output = Result<Chunks>> + Send>>;
3538

3639
impl Response {
37-
pub(crate) fn new(response: HyperResponseFuture, compression: Compression) -> Self {
38-
Self::Waiting(Box::pin(async move {
39-
let response = response.await?;
40-
41-
let status = response.status();
42-
let exception_code = response.headers().get("X-ClickHouse-Exception-Code");
43-
44-
if status == StatusCode::OK && exception_code.is_none() {
45-
// More likely to be successful, start streaming.
46-
// It still can fail, but we'll handle it in `DetectDbException`.
47-
Ok(Chunks::new(response.into_body(), compression))
48-
} else {
49-
// An instantly failed request.
50-
Err(collect_bad_response(
51-
status,
52-
exception_code
53-
.and_then(|value| value.to_str().ok())
54-
.map(|code| format!("Code: {code}")),
55-
response.into_body(),
56-
compression,
57-
)
58-
.await)
40+
pub(crate) fn new(response: HyperResponseFuture, compression: Compression, span: Span) -> Self {
41+
let inner_span = span.clone();
42+
Self::Waiting(
43+
Box::pin(async move {
44+
let response = response.await?;
45+
if let Some(summary_header) = response.headers().get("x-clickhouse-summary") {
46+
match serde_json::from_slice::<Summary>(summary_header.as_bytes()) {
47+
Ok(summary_header) => {
48+
if let Some(rows) = summary_header.result_rows {
49+
inner_span.record("db.response.returned_rows", rows);
50+
}
51+
if let Some(rows) = summary_header.read_rows {
52+
inner_span.record("db.response.read_rows", rows);
53+
}
54+
if let Some(rows) = summary_header.written_rows {
55+
inner_span.record("db.response.written_rows", rows);
56+
}
57+
if let Some(bytes) = summary_header.read_bytes {
58+
inner_span.record("db.response.read_bytes", bytes);
59+
}
60+
if let Some(bytes) = summary_header.written_bytes {
61+
inner_span.record("db.response.written_bytes", bytes);
62+
}
63+
tracing::debug!(
64+
read_rows = summary_header.read_rows,
65+
read_bytes = summary_header.read_bytes,
66+
written_rows = summary_header.written_bytes,
67+
written_bytes = summary_header.written_rows,
68+
total_rows_to_read = summary_header.total_rows_to_read,
69+
result_rows = summary_header.result_rows,
70+
result_bytes = summary_header.result_bytes,
71+
elapsed_ns = summary_header.elapsed_ns,
72+
"finished processing query"
73+
)
74+
}
75+
Err(e) => {
76+
tracing::warn!(
77+
error = &e as &dyn std::error::Error,
78+
?summary_header,
79+
"invalid x-clickhouse-summary header returned",
80+
);
81+
}
82+
}
83+
}
84+
85+
let status = response.status();
86+
let exception_code = response.headers().get("X-ClickHouse-Exception-Code");
87+
88+
if status == StatusCode::OK && exception_code.is_none() {
89+
inner_span.record("otel.status_code", "OK");
90+
// More likely to be successful, start streaming.
91+
// It still can fail, but we'll handle it in `DetectDbException`.
92+
Ok(Chunks::new(response.into_body(), compression, inner_span))
93+
} else {
94+
inner_span.record("otel.status_code", "ERROR");
95+
// An instantly failed request.
96+
Err(collect_bad_response(
97+
status,
98+
exception_code
99+
.and_then(|value| value.to_str().ok())
100+
.map(|code| format!("Code: {code}")),
101+
response.into_body(),
102+
compression,
103+
)
104+
.await)
59105
}
60-
}))
106+
}), span)
61107
}
62108

63109
pub(crate) fn into_future(self) -> ResponseFuture {
64110
match self {
65-
Self::Waiting(future) => future,
66-
Self::Loading(_) => panic!("response is already streaming"),
111+
Self::Waiting(future, span) => Box::pin(future.instrument(span)),
112+
Self::Loading(_, _) => panic!("response is already streaming"),
67113
}
68114
}
69115

70116
pub(crate) async fn finish(&mut self) -> Result<()> {
71-
let chunks = loop {
117+
let (chunks, span) = loop {
72118
match self {
73-
Self::Waiting(future) => *self = Self::Loading(future.await?),
74-
Self::Loading(chunks) => break chunks,
119+
Self::Waiting(future, span) => {
120+
*self = Self::Loading(future.instrument(span.clone()).await?, span.clone())
121+
}
122+
Self::Loading(chunks, span) => break (chunks, span),
75123
}
76124
};
77125

78-
while chunks.try_next().await?.is_some() {}
126+
while chunks.try_next().instrument(span.clone()).await?.is_some() {}
79127
Ok(())
80128
}
81129
}
@@ -153,40 +201,52 @@ pub(crate) struct Chunk {
153201

154202
// * Uses `Option<_>` to make this stream fused.
155203
// * Uses `Box<_>` in order to reduce the size of cursors.
156-
pub(crate) struct Chunks(Option<Box<DetectDbException<Decompress<IncomingStream>>>>);
204+
pub(crate) struct Chunks {
205+
stream: Option<Box<DetectDbException<Decompress<IncomingStream>>>>,
206+
span: Option<Span>,
207+
}
157208

158209
impl Chunks {
159-
fn new(stream: Incoming, compression: Compression) -> Self {
210+
fn new(stream: Incoming, compression: Compression, span: Span) -> Self {
160211
let stream = IncomingStream(stream);
161212
let stream = Decompress::new(stream, compression);
162213
let stream = DetectDbException(stream);
163-
Self(Some(Box::new(stream)))
214+
Self {
215+
stream: Some(Box::new(stream)),
216+
span: Some(span),
217+
}
164218
}
165219

166220
pub(crate) fn empty() -> Self {
167-
Self(None)
221+
Self {
222+
stream: None,
223+
span: None,
224+
}
168225
}
169226

170227
#[cfg(feature = "futures03")]
171228
pub(crate) fn is_terminated(&self) -> bool {
172-
self.0.is_none()
229+
self.stream.is_none()
173230
}
174231
}
175232

176233
impl Stream for Chunks {
177234
type Item = Result<Chunk>;
178235

179236
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
237+
let guard = self.span.take().map(|s| s.entered());
180238
// We use `take()` to make the stream fused, including the case of panics.
181-
if let Some(mut stream) = self.0.take() {
239+
if let Some(mut stream) = self.stream.take() {
182240
let res = Pin::new(&mut stream).poll_next(cx);
183241

184242
if matches!(res, Poll::Pending | Poll::Ready(Some(Ok(_)))) {
185-
self.0 = Some(stream);
243+
self.stream = Some(stream);
244+
self.span = guard.map(|g| g.exit());
186245
}
187246

188247
res
189248
} else {
249+
self.span = guard.map(|g| g.exit());
190250
Poll::Ready(None)
191251
}
192252
}

0 commit comments

Comments
 (0)