From 2925777ddddc5972f81c6482876b1f1d7a9507cc Mon Sep 17 00:00:00 2001 From: Ben Chaplin Date: Fri, 12 Sep 2025 10:10:24 -0400 Subject: [PATCH] Don't set doc count error to 0 when batched reduction occurred --- .../aggregations/bucket/TermsDocCountErrorIT.java | 15 --------------- .../action/search/QueryPhaseResultConsumer.java | 10 ++++++---- .../aggregations/AggregationReduceContext.java | 13 ++++++++++++- .../bucket/terms/AbstractInternalTerms.java | 5 ++++- 4 files changed, 22 insertions(+), 21 deletions(-) diff --git a/server/src/internalClusterTest/java/org/elasticsearch/search/aggregations/bucket/TermsDocCountErrorIT.java b/server/src/internalClusterTest/java/org/elasticsearch/search/aggregations/bucket/TermsDocCountErrorIT.java index a180674ba2378..a6c01852e2f16 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/search/aggregations/bucket/TermsDocCountErrorIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/search/aggregations/bucket/TermsDocCountErrorIT.java @@ -13,15 +13,12 @@ import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.cluster.metadata.IndexMetadata; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.search.SearchService; import org.elasticsearch.search.aggregations.Aggregator.SubAggCollectionMode; import org.elasticsearch.search.aggregations.BucketOrder; import org.elasticsearch.search.aggregations.bucket.terms.Terms; import org.elasticsearch.search.aggregations.bucket.terms.Terms.Bucket; import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregatorFactory.ExecutionMode; import org.elasticsearch.test.ESIntegTestCase; -import org.junit.After; -import org.junit.Before; import java.io.IOException; import java.util.ArrayList; @@ -53,18 +50,6 @@ public static String randomExecutionHint() { private static int numRoutingValues; - @Before - public void disableBatchedExecution() { - // TODO: it's practically impossible to get a 100% deterministic test with batched execution unfortunately, adjust this test to - // still do something useful with batched execution (i.e. use somewhat relaxed assertions) - updateClusterSettings(Settings.builder().put(SearchService.BATCHED_QUERY_PHASE.getKey(), false)); - } - - @After - public void resetSettings() { - updateClusterSettings(Settings.builder().putNull(SearchService.BATCHED_QUERY_PHASE.getKey())); - } - @Override public void setupSuiteScopeCluster() throws Exception { assertAcked(indicesAdmin().prepareCreate("idx").setMapping(STRING_FIELD_NAME, "type=keyword").get()); diff --git a/server/src/main/java/org/elasticsearch/action/search/QueryPhaseResultConsumer.java b/server/src/main/java/org/elasticsearch/action/search/QueryPhaseResultConsumer.java index ec63d38616153..fb12fc0b560bc 100644 --- a/server/src/main/java/org/elasticsearch/action/search/QueryPhaseResultConsumer.java +++ b/server/src/main/java/org/elasticsearch/action/search/QueryPhaseResultConsumer.java @@ -220,6 +220,7 @@ public SearchPhaseController.ReducedQueryPhase reduce() throws Exception { batchedResults = this.batchedResults; } final int resultSize = buffer.size() + (mergeResult == null ? 0 : 1) + batchedResults.size(); + final boolean hasBatchedResults = batchedResults.isEmpty() == false; final List topDocsList = hasTopDocs ? new ArrayList<>(resultSize) : null; final Deque> aggsList = hasAggs ? new ArrayDeque<>(resultSize) : null; // consume partial merge result from the un-batched execution path that is used for BwC, shard-level retries, and shard level @@ -247,6 +248,10 @@ public SearchPhaseController.ReducedQueryPhase reduce() throws Exception { if (aggsList != null) { // Add an estimate of the final reduce size breakerSize = addEstimateAndMaybeBreak(estimateRamBytesUsedForReduce(breakerSize)); + AggregationReduceContext aggReduceContext = performFinalReduce + ? aggReduceContextBuilder.forFinalReduction() + : aggReduceContextBuilder.forPartialReduction(); + aggReduceContext.setFinalReduceHasBatchedResult(hasBatchedResults); aggs = aggregate(buffer.iterator(), new Iterator<>() { @Override public boolean hasNext() { @@ -257,10 +262,7 @@ public boolean hasNext() { public DelayableWriteable next() { return aggsList.pollFirst(); } - }, - resultSize, - performFinalReduce ? aggReduceContextBuilder.forFinalReduction() : aggReduceContextBuilder.forPartialReduction() - ); + }, resultSize, aggReduceContext); } else { aggs = null; } diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/AggregationReduceContext.java b/server/src/main/java/org/elasticsearch/search/aggregations/AggregationReduceContext.java index fbfffd21fef93..39ad713fea7eb 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/AggregationReduceContext.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/AggregationReduceContext.java @@ -47,6 +47,7 @@ public interface Builder { @Nullable private final AggregationBuilder builder; private final AggregatorFactories.Builder subBuilders; + private boolean finalReduceHasBatchedResult; private AggregationReduceContext( BigArrays bigArrays, @@ -136,6 +137,14 @@ public final AggregationReduceContext forAgg(String name) { protected abstract AggregationReduceContext forSubAgg(AggregationBuilder sub); + public boolean doesFinalReduceHaveBatchedResult() { + return finalReduceHasBatchedResult; + } + + public void setFinalReduceHasBatchedResult(boolean finalReduceHasBatchedResult) { + this.finalReduceHasBatchedResult = finalReduceHasBatchedResult; + } + /** * A {@linkplain AggregationReduceContext} to perform a partial reduction. */ @@ -234,7 +243,9 @@ public PipelineTree pipelineTreeRoot() { @Override protected AggregationReduceContext forSubAgg(AggregationBuilder sub) { - return new ForFinal(bigArrays(), scriptService(), isCanceled(), sub, multiBucketConsumer, pipelineTreeRoot); + ForFinal subContext = new ForFinal(bigArrays(), scriptService(), isCanceled(), sub, multiBucketConsumer, pipelineTreeRoot); + subContext.setFinalReduceHasBatchedResult(doesFinalReduceHaveBatchedResult()); + return subContext; } } } diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/AbstractInternalTerms.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/AbstractInternalTerms.java index 1892cc8859639..09ecbb4098dc3 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/AbstractInternalTerms.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/AbstractInternalTerms.java @@ -332,7 +332,10 @@ public InternalAggregation get() { } long docCountError = -1; if (sumDocCountError != -1) { - docCountError = size == 1 ? 0 : sumDocCountError; + // If we are reducing only one aggregation (size == 1), the doc count error should be 0. + // However, the presence of a batched query result implies this is a final reduction and a partial reduction with size > 1 + // has already occurred on a data node. The doc count error should not be 0 in this case. + docCountError = size == 1 && reduceContext.doesFinalReduceHaveBatchedResult() == false ? 0 : sumDocCountError; } return create(name, result, reduceContext.isFinalReduce() ? getOrder() : thisReduceOrder, docCountError, otherDocCount); }