diff --git a/quickwit/quickwit-config/resources/tests/node_config/quickwit.json b/quickwit/quickwit-config/resources/tests/node_config/quickwit.json index 0b881f85a13..38af3b1589e 100644 --- a/quickwit/quickwit-config/resources/tests/node_config/quickwit.json +++ b/quickwit/quickwit-config/resources/tests/node_config/quickwit.json @@ -74,6 +74,7 @@ "jaeger": { "enable_endpoint": true, "lookback_period_hours": 24, + "lookback_period_traces_hours": 72, "max_trace_duration_secs": 600, "max_fetch_spans": 1000 } diff --git a/quickwit/quickwit-config/resources/tests/node_config/quickwit.toml b/quickwit/quickwit-config/resources/tests/node_config/quickwit.toml index 97b44376a75..4b19a80cbca 100644 --- a/quickwit/quickwit-config/resources/tests/node_config/quickwit.toml +++ b/quickwit/quickwit-config/resources/tests/node_config/quickwit.toml @@ -64,5 +64,6 @@ max_num_retries = 2 [jaeger] enable_endpoint = true lookback_period_hours = 24 +lookback_period_traces_hours = 72 max_trace_duration_secs = 600 max_fetch_spans = 1_000 diff --git a/quickwit/quickwit-config/resources/tests/node_config/quickwit.yaml b/quickwit/quickwit-config/resources/tests/node_config/quickwit.yaml index c23ec0c0e3f..94f592191e0 100644 --- a/quickwit/quickwit-config/resources/tests/node_config/quickwit.yaml +++ b/quickwit/quickwit-config/resources/tests/node_config/quickwit.yaml @@ -67,5 +67,6 @@ searcher: jaeger: enable_endpoint: true lookback_period_hours: 24 + lookback_period_traces_hours: 72 max_trace_duration_secs: 600 max_fetch_spans: 1000 diff --git a/quickwit/quickwit-config/src/node_config/mod.rs b/quickwit/quickwit-config/src/node_config/mod.rs index bb8a17daaeb..f03df23dae4 100644 --- a/quickwit/quickwit-config/src/node_config/mod.rs +++ b/quickwit/quickwit-config/src/node_config/mod.rs @@ -497,9 +497,14 @@ pub struct JaegerConfig { #[serde(default = "JaegerConfig::default_enable_endpoint")] pub enable_endpoint: bool, /// How far back in time we look for spans when queries at not time-bound (`get_services`, - /// `get_operations`, `get_trace` operations). + /// `get_operations` operations). #[serde(default = "JaegerConfig::default_lookback_period_hours")] lookback_period_hours: NonZeroU64, + + #[serde(default = "JaegerConfig::default_lookback_period_traces_hours")] + /// How far back in time we look for traces when queries at not time-bound (`get_trace` + /// operation). + lookback_period_traces_hours: NonZeroU64, /// The assumed maximum duration of a trace in seconds. /// /// Finding a trace happens in two phases: the first phase identifies at least one span that @@ -519,6 +524,10 @@ impl JaegerConfig { Duration::from_secs(self.lookback_period_hours.get() * 3600) } + pub fn lookback_period_traces(&self) -> Duration { + Duration::from_secs(self.lookback_period_traces_hours.get() * 3600) + } + pub fn max_trace_duration(&self) -> Duration { Duration::from_secs(self.max_trace_duration_secs.get()) } @@ -538,6 +547,10 @@ impl JaegerConfig { NonZeroU64::new(72).unwrap() // 3 days } + fn default_lookback_period_traces_hours() -> NonZeroU64 { + NonZeroU64::new(72).unwrap() // 3 days + } + fn default_max_trace_duration_secs() -> NonZeroU64 { NonZeroU64::new(3600).unwrap() // 1 hour } @@ -552,6 +565,7 @@ impl Default for JaegerConfig { Self { enable_endpoint: Self::default_enable_endpoint(), lookback_period_hours: Self::default_lookback_period_hours(), + lookback_period_traces_hours: Self::default_lookback_period_traces_hours(), max_trace_duration_secs: Self::default_max_trace_duration_secs(), max_fetch_spans: Self::default_max_fetch_spans(), } diff --git a/quickwit/quickwit-config/src/node_config/serialize.rs b/quickwit/quickwit-config/src/node_config/serialize.rs index b5f39ceb0ac..7bef46c60d2 100644 --- a/quickwit/quickwit-config/src/node_config/serialize.rs +++ b/quickwit/quickwit-config/src/node_config/serialize.rs @@ -17,11 +17,11 @@ use std::net::{IpAddr, SocketAddr}; use std::str::FromStr; use std::time::Duration; -use anyhow::{Context, bail}; +use anyhow::{bail, Context}; use bytesize::ByteSize; use http::HeaderMap; use quickwit_common::fs::get_disk_size; -use quickwit_common::net::{Host, find_private_ip, get_short_hostname}; +use quickwit_common::net::{find_private_ip, get_short_hostname, Host}; use quickwit_common::new_coolid; use quickwit_common::uri::Uri; use quickwit_proto::types::NodeId; @@ -35,8 +35,8 @@ use crate::service::QuickwitService; use crate::storage_config::StorageConfigs; use crate::templating::render_config; use crate::{ - ConfigFormat, IndexerConfig, IngestApiConfig, JaegerConfig, MetastoreConfigs, NodeConfig, - SearcherConfig, TlsConfig, validate_identifier, validate_node_id, + validate_identifier, validate_node_id, ConfigFormat, IndexerConfig, IngestApiConfig, + JaegerConfig, MetastoreConfigs, NodeConfig, SearcherConfig, TlsConfig, }; pub const DEFAULT_CLUSTER_ID: &str = "quickwit-default-cluster"; @@ -679,6 +679,7 @@ mod tests { JaegerConfig { enable_endpoint: true, lookback_period_hours: NonZeroU64::new(24).unwrap(), + lookback_period_traces_hours: NonZeroU64::new(24).unwrap(), max_trace_duration_secs: NonZeroU64::new(600).unwrap(), max_fetch_spans: NonZeroU64::new(1_000).unwrap(), } @@ -718,10 +719,8 @@ mod tests { ) .await .unwrap_err(); - assert!( - format!("{parsing_error:?}") - .contains("unknown field `max_num_concurrent_split_searches_with_typo`") - ); + assert!(format!("{parsing_error:?}") + .contains("unknown field `max_num_concurrent_split_searches_with_typo`")); } #[tokio::test] @@ -1058,15 +1057,13 @@ mod tests { node_id: 1 metastore_uri: '' "#; - assert!( - load_node_config_with_env( - ConfigFormat::Yaml, - config_yaml.as_bytes(), - &Default::default() - ) - .await - .is_err() - ); + assert!(load_node_config_with_env( + ConfigFormat::Yaml, + config_yaml.as_bytes(), + &Default::default() + ) + .await + .is_err()); } { let config_yaml = r#" @@ -1075,15 +1072,13 @@ mod tests { metastore_uri: postgres://username:password@host:port/db default_index_root_uri: '' "#; - assert!( - load_node_config_with_env( - ConfigFormat::Yaml, - config_yaml.as_bytes(), - &Default::default() - ) - .await - .is_err() - ); + assert!(load_node_config_with_env( + ConfigFormat::Yaml, + config_yaml.as_bytes(), + &Default::default() + ) + .await + .is_err()); } } @@ -1163,11 +1158,9 @@ mod tests { max_trace_duration_secs: 0 "#; let error = serde_yaml::from_str::(jaeger_config_yaml).unwrap_err(); - assert!( - error - .to_string() - .contains("max_trace_duration_secs: invalid value: integer `0`") - ) + assert!(error + .to_string() + .contains("max_trace_duration_secs: invalid value: integer `0`")) } #[tokio::test] diff --git a/quickwit/quickwit-jaeger/src/lib.rs b/quickwit/quickwit-jaeger/src/lib.rs index 5749842655a..78dd11c3331 100644 --- a/quickwit/quickwit-jaeger/src/lib.rs +++ b/quickwit/quickwit-jaeger/src/lib.rs @@ -24,9 +24,9 @@ use prost::Message; use prost_types::{Duration as WellKnownDuration, Timestamp as WellKnownTimestamp}; use quickwit_config::JaegerConfig; use quickwit_opentelemetry::otlp::{ - Event as QwEvent, Link as QwLink, OTEL_TRACES_INDEX_ID, Span as QwSpan, SpanFingerprint, - SpanId, SpanKind as QwSpanKind, SpanStatus as QwSpanStatus, TraceId, - extract_otel_traces_index_id_patterns_from_metadata, + extract_otel_traces_index_id_patterns_from_metadata, Event as QwEvent, Link as QwLink, + Span as QwSpan, SpanFingerprint, SpanId, SpanKind as QwSpanKind, SpanStatus as QwSpanStatus, + TraceId, OTEL_TRACES_INDEX_ID, }; use quickwit_proto::jaeger::api_v2::{ KeyValue as JaegerKeyValue, Log as JaegerLog, Process as JaegerProcess, Span as JaegerSpan, @@ -40,19 +40,19 @@ use quickwit_proto::jaeger::storage::v1::{ }; use quickwit_proto::opentelemetry::proto::trace::v1::status::StatusCode as OtlpStatusCode; use quickwit_proto::search::{CountHits, ListTermsRequest, SearchRequest}; -use quickwit_query::BooleanOperand; use quickwit_query::query_ast::{BoolQuery, QueryAst, RangeQuery, TermQuery, UserInputQuery}; +use quickwit_query::BooleanOperand; use quickwit_search::{FindTraceIdsCollector, SearchService}; use serde::Deserialize; use serde_json::Value as JsonValue; use tantivy::collector::Collector; -use time::OffsetDateTime; use time::format_description::well_known::Rfc3339; +use time::OffsetDateTime; use tokio::sync::mpsc; use tokio_stream::wrappers::ReceiverStream; use tonic::{Request, Response, Status}; use tracing::field::Empty; -use tracing::{Span as RuntimeSpan, debug, error, instrument, warn}; +use tracing::{debug, error, instrument, warn, Span as RuntimeSpan}; use crate::metrics::JAEGER_SERVICE_METRICS; @@ -70,6 +70,7 @@ type SpanStream = ReceiverStream>; #[derive(Clone)] pub struct JaegerService { search_service: Arc, + lookback_period_traces_secs: i64, lookback_period_secs: i64, max_trace_duration_secs: i64, max_fetch_spans: u64, @@ -80,6 +81,7 @@ impl JaegerService { Self { search_service, lookback_period_secs: config.lookback_period().as_secs() as i64, + lookback_period_traces_secs: config.lookback_period_traces().as_secs() as i64, max_trace_duration_secs: config.max_trace_duration().as_secs() as i64, max_fetch_spans: config.max_fetch_spans.get(), } @@ -226,7 +228,7 @@ impl JaegerService { let trace_id = TraceId::try_from(request.trace_id) .map_err(|error| Status::invalid_argument(error.to_string()))?; let end = OffsetDateTime::now_utc().unix_timestamp(); - let start = end - self.lookback_period_secs; + let start = end - self.lookback_period_traces_secs; let search_window = start..=end; let response = self .stream_spans( @@ -1113,7 +1115,9 @@ fn collect_trace_ids( #[allow(clippy::result_large_err)] fn json_deserialize<'a, T>(json: &'a str, label: &'static str) -> Result -where T: Deserialize<'a> { +where + T: Deserialize<'a>, +{ match serde_json::from_str(json) { Ok(deserialized) => Ok(deserialized), Err(error) => { @@ -1127,7 +1131,9 @@ where T: Deserialize<'a> { #[allow(clippy::result_large_err)] fn postcard_deserialize<'a, T>(json: &'a [u8], label: &'static str) -> Result -where T: Deserialize<'a> { +where + T: Deserialize<'a>, +{ match postcard::from_bytes(json) { Ok(deserialized) => Ok(deserialized), Err(error) => { @@ -1141,9 +1147,9 @@ where T: Deserialize<'a> { #[cfg(test)] mod tests { - use quickwit_opentelemetry::otlp::{OTEL_TRACES_INDEX_ID_PATTERN, OtelSignal}; + use quickwit_opentelemetry::otlp::{OtelSignal, OTEL_TRACES_INDEX_ID_PATTERN}; use quickwit_proto::jaeger::api_v2::ValueType; - use quickwit_search::{MockSearchService, QuickwitAggregations, encode_term_for_test}; + use quickwit_search::{encode_term_for_test, MockSearchService, QuickwitAggregations}; use serde_json::json; use super::*; @@ -1209,13 +1215,11 @@ mod tests { min_span_duration_secs, max_span_duration_secs )), - vec![ - TermQuery { - field: "service_name".to_string(), - value: service_name.to_string(), - } - .into() - ] + vec![TermQuery { + field: "service_name".to_string(), + value: service_name.to_string(), + } + .into()] ); } { @@ -1267,13 +1271,11 @@ mod tests { min_span_duration_secs, max_span_duration_secs )), - vec![ - TermQuery { - field: "span_kind".to_string(), - value: "3".to_string(), - } - .into() - ] + vec![TermQuery { + field: "span_kind".to_string(), + value: "3".to_string(), + } + .into()] ); } { @@ -1296,13 +1298,11 @@ mod tests { min_span_duration_secs, max_span_duration_secs )), - vec![ - TermQuery { - field: "span_name".to_string(), - value: span_name.to_string(), - } - .into() - ] + vec![TermQuery { + field: "span_name".to_string(), + value: span_name.to_string(), + } + .into()] ); } { @@ -1325,13 +1325,11 @@ mod tests { min_span_duration_secs, max_span_duration_secs )), - vec![ - TermQuery { - field: "span_status.code".to_string(), - value: "error".to_string(), - } - .into(), - ], + vec![TermQuery { + field: "span_status.code".to_string(), + value: "error".to_string(), + } + .into(),], ); } { @@ -1354,13 +1352,11 @@ mod tests { min_span_duration_secs, max_span_duration_secs )), - vec![ - TermQuery { - field: "span_status.code".to_string(), - value: "error".to_string(), - } - .into(), - ], + vec![TermQuery { + field: "span_status.code".to_string(), + value: "error".to_string(), + } + .into(),], ); } { @@ -1384,29 +1380,27 @@ mod tests { min_span_duration_secs, max_span_duration_secs )), - vec![ - BoolQuery { - should: vec![ - TermQuery { - field: "resource_attributes.foo".to_string(), - value: tag_value.to_string(), - } - .into(), - TermQuery { - field: "span_attributes.foo".to_string(), - value: tag_value.to_string(), - } - .into(), - TermQuery { - field: "events.event_attributes.foo".to_string(), - value: tag_value.to_string(), - } - .into(), - ], - ..Default::default() - } - .into() - ] + vec![BoolQuery { + should: vec![ + TermQuery { + field: "resource_attributes.foo".to_string(), + value: tag_value.to_string(), + } + .into(), + TermQuery { + field: "span_attributes.foo".to_string(), + value: tag_value.to_string(), + } + .into(), + TermQuery { + field: "events.event_attributes.foo".to_string(), + value: tag_value.to_string(), + } + .into(), + ], + ..Default::default() + } + .into()] ); } { @@ -1430,13 +1424,11 @@ mod tests { min_span_duration_secs, max_span_duration_secs )), - vec![ - TermQuery { - field: "events.event_name".to_string(), - value: event_name.to_string(), - } - .into() - ] + vec![TermQuery { + field: "events.event_name".to_string(), + value: event_name.to_string(), + } + .into()] ); } { @@ -1583,14 +1575,12 @@ mod tests { min_span_duration_secs, max_span_duration_secs )), - vec![ - RangeQuery { - field: "span_start_timestamp_nanos".to_string(), - lower_bound: Bound::Included("1970-01-01T00:00:03Z".to_string().into()), - upper_bound: Bound::Unbounded - } - .into() - ] + vec![RangeQuery { + field: "span_start_timestamp_nanos".to_string(), + lower_bound: Bound::Included("1970-01-01T00:00:03Z".to_string().into()), + upper_bound: Bound::Unbounded + } + .into()] ); } { @@ -1613,14 +1603,12 @@ mod tests { min_span_duration_secs, max_span_duration_secs )), - vec![ - RangeQuery { - field: "span_start_timestamp_nanos".to_string(), - lower_bound: Bound::Unbounded, - upper_bound: Bound::Included("1970-01-01T00:00:33Z".to_string().into()), - } - .into() - ] + vec![RangeQuery { + field: "span_start_timestamp_nanos".to_string(), + lower_bound: Bound::Unbounded, + upper_bound: Bound::Included("1970-01-01T00:00:33Z".to_string().into()), + } + .into()] ); } { @@ -1643,14 +1631,12 @@ mod tests { min_span_duration_secs, max_span_duration_secs )), - vec![ - RangeQuery { - field: "span_start_timestamp_nanos".to_string(), - lower_bound: Bound::Included("1970-01-01T00:00:03Z".to_string().into()), - upper_bound: Bound::Included("1970-01-01T00:00:33Z".to_string().into()), - } - .into() - ] + vec![RangeQuery { + field: "span_start_timestamp_nanos".to_string(), + lower_bound: Bound::Included("1970-01-01T00:00:03Z".to_string().into()), + upper_bound: Bound::Included("1970-01-01T00:00:33Z".to_string().into()), + } + .into()] ); } { @@ -1673,14 +1659,12 @@ mod tests { min_span_duration_secs, max_span_duration_secs )), - vec![ - RangeQuery { - field: "span_duration_millis".to_string(), - lower_bound: Bound::Included(7u64.into()), - upper_bound: Bound::Unbounded - } - .into() - ] + vec![RangeQuery { + field: "span_duration_millis".to_string(), + lower_bound: Bound::Included(7u64.into()), + upper_bound: Bound::Unbounded + } + .into()] ); } { @@ -1703,14 +1687,12 @@ mod tests { min_span_duration_secs, max_span_duration_secs )), - vec![ - RangeQuery { - field: "span_duration_millis".to_string(), - lower_bound: Bound::Unbounded, - upper_bound: Bound::Included(77u64.into()), - } - .into() - ] + vec![RangeQuery { + field: "span_duration_millis".to_string(), + lower_bound: Bound::Unbounded, + upper_bound: Bound::Included(77u64.into()), + } + .into()] ); } { @@ -1733,14 +1715,12 @@ mod tests { min_span_duration_secs, max_span_duration_secs )), - vec![ - RangeQuery { - field: "span_duration_millis".to_string(), - lower_bound: Bound::Included(7u64.into()), - upper_bound: Bound::Included(77u64.into()), - } - .into() - ] + vec![RangeQuery { + field: "span_duration_millis".to_string(), + lower_bound: Bound::Included(7u64.into()), + upper_bound: Bound::Included(77u64.into()), + } + .into()] ); } { diff --git a/quickwit/quickwit-opentelemetry/src/otlp/mod.rs b/quickwit/quickwit-opentelemetry/src/otlp/mod.rs index 95060232805..ae17695c352 100644 --- a/quickwit/quickwit-opentelemetry/src/otlp/mod.rs +++ b/quickwit/quickwit-opentelemetry/src/otlp/mod.rs @@ -15,12 +15,12 @@ use std::collections::HashMap; use quickwit_common::rate_limited_warn; -use quickwit_config::{INGEST_V2_SOURCE_ID, validate_identifier, validate_index_id_pattern}; +use quickwit_config::{validate_identifier, validate_index_id_pattern, INGEST_V2_SOURCE_ID}; use quickwit_ingest::{CommitType, IngestServiceError}; -use quickwit_proto::ingest::DocBatchV2; use quickwit_proto::ingest::router::{ IngestRequestV2, IngestRouterService, IngestRouterServiceClient, IngestSubrequest, }; +use quickwit_proto::ingest::DocBatchV2; use quickwit_proto::opentelemetry::proto::common::v1::any_value::Value as OtlpValue; use quickwit_proto::opentelemetry::proto::common::v1::{ AnyValue as OtlpAnyValue, ArrayValue as OtlpArrayValue, KeyValue as OtlpKeyValue, @@ -36,8 +36,8 @@ mod trace_id; mod traces; pub use logs::{ - JsonLogIterator, OTEL_LOGS_INDEX_ID, OtlpGrpcLogsService, OtlpLogsError, parse_otlp_logs_json, - parse_otlp_logs_protobuf, + parse_otlp_logs_json, parse_otlp_logs_protobuf, JsonLogIterator, OtlpGrpcLogsService, + OtlpLogsError, OTEL_LOGS_INDEX_ID, }; pub use span_id::{SpanId, TryFromSpanIdError}; #[cfg(any(test, feature = "testsuite"))] @@ -45,9 +45,9 @@ pub use test_utils::make_resource_spans_for_test; use tonic::Status; pub use trace_id::{TraceId, TryFromTraceIdError}; pub use traces::{ - Event, JsonSpanIterator, Link, OTEL_TRACES_INDEX_ID, OTEL_TRACES_INDEX_ID_PATTERN, + parse_otlp_spans_json, parse_otlp_spans_protobuf, Event, JsonSpanIterator, Link, OtlpGrpcTracesService, OtlpTracesError, Span, SpanFingerprint, SpanKind, SpanStatus, - parse_otlp_spans_json, parse_otlp_spans_protobuf, + OTEL_TRACES_INDEX_ID, OTEL_TRACES_INDEX_ID_PATTERN, }; #[derive(Debug, Clone, Copy)] @@ -267,7 +267,7 @@ mod tests { use quickwit_proto::opentelemetry::proto::common::v1::{ ArrayValue as OtlpArrayValue, KeyValueList as OtlpKeyValueList, }; - use serde_json::{Value as JsonValue, json}; + use serde_json::{json, Value as JsonValue}; use super::*; use crate::otlp::{extract_attributes, oltp_value_to_json_value, parse_log_record_body}; diff --git a/quickwit/quickwit-search/src/find_trace_ids_collector.rs b/quickwit/quickwit-search/src/find_trace_ids_collector.rs index 510cb8f81d4..3cab11949f4 100644 --- a/quickwit/quickwit-search/src/find_trace_ids_collector.rs +++ b/quickwit/quickwit-search/src/find_trace_ids_collector.rs @@ -344,12 +344,16 @@ mod serde_datetime { use tantivy::DateTime; pub(crate) fn serialize(datetime: &DateTime, serializer: S) -> Result - where S: Serializer { + where + S: Serializer, + { serializer.serialize_i64(datetime.into_timestamp_nanos()) } pub(crate) fn deserialize<'de, D>(deserializer: D) -> Result - where D: Deserializer<'de> { + where + D: Deserializer<'de>, + { let datetime_i64: i64 = Deserialize::deserialize(deserializer)?; Ok(DateTime::from_timestamp_nanos(datetime_i64)) } @@ -357,8 +361,8 @@ mod serde_datetime { #[cfg(test)] mod tests { - use tantivy::DateTime; use tantivy::time::OffsetDateTime; + use tantivy::DateTime; use super::*; use crate::collector::QuickwitAggregations;