diff --git a/quickwit/quickwit-search/src/collector.rs b/quickwit/quickwit-search/src/collector.rs index 0e6b3517963..8e3b4925101 100644 --- a/quickwit/quickwit-search/src/collector.rs +++ b/quickwit/quickwit-search/src/collector.rs @@ -860,8 +860,10 @@ impl Collector for QuickwitCollector { } } -fn map_error(err: postcard::Error) -> TantivyError { - TantivyError::InternalError(format!("merge result Postcard error: {err}")) +fn map_error(error: postcard::Error) -> TantivyError { + TantivyError::InternalError(format!( + "failed to merge intermediate aggregation results: Postcard error: {error}" + )) } /// Merges a set of Leaf Results. @@ -883,24 +885,24 @@ fn merge_intermediate_aggregation_result<'a>( Some(serialized) } Some(QuickwitAggregations::TantivyAggregations(_)) => { - let fruits: Vec = intermediate_aggregation_results - .map(|intermediate_aggregation_result| { - postcard::from_bytes(intermediate_aggregation_result).map_err(map_error) - }) - .collect::>()?; - - let mut fruit_iter = fruits.into_iter(); - if let Some(first_fruit) = fruit_iter.next() { - let mut merged_fruit = first_fruit; - for fruit in fruit_iter { - merged_fruit.merge_fruits(fruit)?; - } - let serialized = postcard::to_allocvec(&merged_fruit).map_err(map_error)?; - - Some(serialized) - } else { - None - } + let merged_opt = intermediate_aggregation_results + .map(|bytes| postcard::from_bytes(bytes).map_err(map_error)) + .try_fold::<_, _, Result<_, TantivyError>>( + None, + |acc: Option, fruits_res| { + let fruits = fruits_res?; + match acc { + Some(mut merged_fruits) => { + merged_fruits.merge_fruits(fruits)?; + Ok(Some(merged_fruits)) + } + None => Ok(Some(fruits)), + } + }, + )?; + let serialized = + postcard::to_allocvec(&merged_opt.unwrap_or_default()).map_err(map_error)?; + Some(serialized) } None => None, }; @@ -1293,10 +1295,13 @@ mod tests { SortOrder, SortValue, SplitSearchError, }; use tantivy::TantivyDocument; + use tantivy::aggregation::agg_req::Aggregations; + use tantivy::aggregation::intermediate_agg_result::IntermediateAggregationResults; use tantivy::collector::Collector; use super::{IncrementalCollector, make_merge_collector}; - use crate::collector::top_k_partial_hits; + use crate::QuickwitAggregations; + use crate::collector::{merge_intermediate_aggregation_result, top_k_partial_hits}; #[test] fn test_merge_partial_hits_no_tie() { @@ -2002,4 +2007,22 @@ mod tests { ); // TODO would be nice to test aggregation too. } + + #[test] + fn test_merge_empty_intermediate_aggregation_result() { + let merged = merge_intermediate_aggregation_result(&None, std::iter::empty()).unwrap(); + assert!(merged.is_none()); + + let aggregations_json = r#"{ + "avg_price": { "avg": { "field": "price" } } + }"#; + let ttv_aggregations: Aggregations = serde_json::from_str(aggregations_json).unwrap(); + let qw_aggregations = QuickwitAggregations::TantivyAggregations(ttv_aggregations); + let serialized = + merge_intermediate_aggregation_result(&Some(qw_aggregations), std::iter::empty()) + .unwrap() + .unwrap(); + let _merged: IntermediateAggregationResults = postcard::from_bytes(&serialized).unwrap(); + // Hopefully `_merged` is empty but the API does not allow us to assert that. + } }