Skip to content

Commit 1f4abac

Browse files
authored
Centralize telemetry listener wrapping in TransportSearchAction (#134625)
TransportSearchAction collects telemetry around CCS features used as well as took time of a search request. It does that via a action listener wrapping. The open PIT API uses TransportSearchAction for its execution, yet it does not wrap the listener in the same way, as search telemetry should not be affected by it. As a result, we have a TelemetryListener interface, with some instanceof checks in the code and a single implementation of such interface that is the action listener wrapper itself, which allows to record the telemetry on response. This can be simplified by moving the listener wrapping to the TransportSearchAction code, and specialize a little the open PIT path so that telemetry can still be disabled for it. Along with the simplifications, this change allows us to delay the wrapping of the listener, which will allow us to grab the resolved indices in #134232 .
1 parent 2483ef0 commit 1f4abac

File tree

3 files changed

+150
-112
lines changed

3 files changed

+150
-112
lines changed

server/src/main/java/org/elasticsearch/action/search/TransportOpenPointInTimeAction.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -121,7 +121,8 @@ protected void doExecute(Task task, OpenPointInTimeRequest request, ActionListen
121121
.source(new SearchSourceBuilder().query(request.indexFilter()));
122122
searchRequest.setMaxConcurrentShardRequests(request.maxConcurrentShardRequests());
123123
searchRequest.setCcsMinimizeRoundtrips(false);
124-
transportSearchAction.executeRequest((SearchTask) task, searchRequest, listener.map(r -> {
124+
125+
transportSearchAction.executeOpenPit((SearchTask) task, searchRequest, listener.map(r -> {
125126
assert r.pointInTimeId() != null : r;
126127
return new OpenPointInTimeResponse(
127128
r.pointInTimeId(),

server/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java

Lines changed: 120 additions & 111 deletions
Original file line numberDiff line numberDiff line change
@@ -167,7 +167,7 @@ public class TransportSearchAction extends HandledTransportAction<SearchRequest,
167167
private final SearchResponseMetrics searchResponseMetrics;
168168
private final Client client;
169169
private final UsageService usageService;
170-
private final boolean collectTelemetry;
170+
private final boolean collectCCSTelemetry;
171171
private final TimeValue forceConnectTimeoutSecs;
172172

173173
@Inject
@@ -213,7 +213,7 @@ public TransportSearchAction(
213213
var settings = clusterService.getSettings();
214214
this.defaultPreFilterShardSize = DEFAULT_PRE_FILTER_SHARD_SIZE.get(settings);
215215
this.ccsCheckCompatibility = SearchService.CCS_VERSION_CHECK_SETTING.get(settings);
216-
this.collectTelemetry = SearchService.CCS_COLLECT_TELEMETRY.get(settings);
216+
this.collectCCSTelemetry = SearchService.CCS_COLLECT_TELEMETRY.get(settings);
217217
this.searchResponseMetrics = searchResponseMetrics;
218218
this.client = client;
219219
this.usageService = usageService;
@@ -333,14 +333,24 @@ public long buildTookInMillis() {
333333

334334
@Override
335335
protected void doExecute(Task task, SearchRequest searchRequest, ActionListener<SearchResponse> listener) {
336-
executeRequest((SearchTask) task, searchRequest, new SearchResponseActionListener(listener), AsyncSearchActionProvider::new);
336+
executeRequest((SearchTask) task, searchRequest, listener, AsyncSearchActionProvider::new, true);
337337
}
338338

339-
void executeRequest(
339+
void executeOpenPit(
340340
SearchTask task,
341341
SearchRequest original,
342-
ActionListener<SearchResponse> listener,
342+
ActionListener<SearchResponse> originalListener,
343343
Function<ActionListener<SearchResponse>, SearchPhaseProvider> searchPhaseProvider
344+
) {
345+
executeRequest(task, original, originalListener, searchPhaseProvider, false);
346+
}
347+
348+
private void executeRequest(
349+
SearchTask task,
350+
SearchRequest original,
351+
ActionListener<SearchResponse> originalListener,
352+
Function<ActionListener<SearchResponse>, SearchPhaseProvider> searchPhaseProvider,
353+
boolean collectSearchTelemetry
344354
) {
345355
final long relativeStartNanos = System.nanoTime();
346356
final SearchTimeProvider timeProvider = new SearchTimeProvider(
@@ -372,48 +382,93 @@ void executeRequest(
372382
frozenIndexCheck(resolvedIndices);
373383
}
374384

375-
ActionListener<SearchRequest> rewriteListener = listener.delegateFailureAndWrap((delegate, rewritten) -> {
385+
final SearchSourceBuilder source = original.source();
386+
if (shouldOpenPIT(source)) {
387+
// disabling shard reordering for request
388+
original.setPreFilterShardSize(Integer.MAX_VALUE);
389+
openPIT(
390+
client,
391+
original,
392+
searchService.getDefaultKeepAliveInMillis(),
393+
originalListener.delegateFailureAndWrap((delegate, resp) -> {
394+
// We set the keep alive to -1 to indicate that we don't need the pit id in the response.
395+
// This is needed since we delete the pit prior to sending the response so the id doesn't exist anymore.
396+
source.pointInTimeBuilder(new PointInTimeBuilder(resp.getPointInTimeId()).setKeepAlive(TimeValue.MINUS_ONE));
397+
var pitListener = new ActionListener<SearchResponse>() {
398+
@Override
399+
public void onResponse(SearchResponse response) {
400+
// we need to close the PIT first so we delay the release of the response to after the closing
401+
response.incRef();
402+
closePIT(
403+
client,
404+
original.source().pointInTimeBuilder(),
405+
() -> ActionListener.respondAndRelease(delegate, response)
406+
);
407+
}
408+
409+
@Override
410+
public void onFailure(Exception e) {
411+
closePIT(client, original.source().pointInTimeBuilder(), () -> delegate.onFailure(e));
412+
}
413+
};
414+
executeRequest(task, original, pitListener, searchPhaseProvider, true);
415+
})
416+
);
417+
return;
418+
}
419+
420+
ActionListener<SearchRequest> rewriteListener = originalListener.delegateFailureAndWrap((delegate, rewritten) -> {
376421
if (ccsCheckCompatibility) {
377422
checkCCSVersionCompatibility(rewritten);
378423
}
379424

380-
if (resolvedIndices.getRemoteClusterIndices().isEmpty()) {
381-
executeLocalSearch(
382-
task,
383-
timeProvider,
384-
rewritten,
385-
resolvedIndices,
386-
projectState,
387-
SearchResponse.Clusters.EMPTY,
388-
searchPhaseProvider.apply(delegate)
389-
);
390-
} else {
391-
if (delegate instanceof TelemetryListener tl) {
392-
tl.setRemotes(resolvedIndices.getRemoteClusterIndices().size());
425+
final ActionListener<SearchResponse> searchResponseActionListener;
426+
if (collectSearchTelemetry) {
427+
if (collectCCSTelemetry == false || resolvedIndices.getRemoteClusterIndices().isEmpty()) {
428+
searchResponseActionListener = new SearchTelemetryListener(delegate, searchResponseMetrics);
429+
} else {
430+
CCSUsage.Builder usageBuilder = new CCSUsage.Builder();
431+
usageBuilder.setRemotesCount(resolvedIndices.getRemoteClusterIndices().size());
432+
usageBuilder.setClientFromTask(task);
393433
if (task.isAsync()) {
394-
tl.setFeature(CCSUsageTelemetry.ASYNC_FEATURE);
434+
usageBuilder.setFeature(CCSUsageTelemetry.ASYNC_FEATURE);
395435
}
396436
if (original.pointInTimeBuilder() != null) {
397-
tl.setFeature(CCSUsageTelemetry.PIT_FEATURE);
437+
usageBuilder.setFeature(CCSUsageTelemetry.PIT_FEATURE);
398438
}
399-
tl.setClient(task);
400439
// Check if any of the index patterns are wildcard patterns
401440
var localIndices = resolvedIndices.getLocalIndices();
402441
if (localIndices != null && Arrays.stream(localIndices.indices()).anyMatch(Regex::isSimpleMatchPattern)) {
403-
tl.setFeature(CCSUsageTelemetry.WILDCARD_FEATURE);
442+
usageBuilder.setFeature(CCSUsageTelemetry.WILDCARD_FEATURE);
404443
}
405444
if (resolvedIndices.getRemoteClusterIndices()
406445
.values()
407446
.stream()
408447
.anyMatch(indices -> Arrays.stream(indices.indices()).anyMatch(Regex::isSimpleMatchPattern))) {
409-
tl.setFeature(CCSUsageTelemetry.WILDCARD_FEATURE);
448+
usageBuilder.setFeature(CCSUsageTelemetry.WILDCARD_FEATURE);
449+
}
450+
if (shouldMinimizeRoundtrips(rewritten)) {
451+
usageBuilder.setFeature(CCSUsageTelemetry.MRT_FEATURE);
410452
}
453+
searchResponseActionListener = new SearchTelemetryListener(delegate, searchResponseMetrics, usageService, usageBuilder);
411454
}
455+
} else {
456+
searchResponseActionListener = delegate;
457+
}
458+
459+
if (resolvedIndices.getRemoteClusterIndices().isEmpty()) {
460+
executeLocalSearch(
461+
task,
462+
timeProvider,
463+
rewritten,
464+
resolvedIndices,
465+
projectState,
466+
SearchResponse.Clusters.EMPTY,
467+
searchPhaseProvider.apply(searchResponseActionListener)
468+
);
469+
} else {
412470
final TaskId parentTaskId = task.taskInfo(clusterService.localNode().getId(), false).taskId();
413471
if (shouldMinimizeRoundtrips(rewritten)) {
414-
if (delegate instanceof TelemetryListener tl) {
415-
tl.setFeature(CCSUsageTelemetry.MRT_FEATURE);
416-
}
417472
final AggregationReduceContext.Builder aggregationReduceContextBuilder = rewritten.source() != null
418473
&& rewritten.source().aggregations() != null
419474
? searchService.aggReduceContextBuilder(task::isCancelled, rewritten.source().aggregations())
@@ -439,7 +494,7 @@ void executeRequest(
439494
aggregationReduceContextBuilder,
440495
remoteClusterService,
441496
threadPool,
442-
delegate,
497+
searchResponseActionListener,
443498
(r, l) -> executeLocalSearch(
444499
task,
445500
timeProvider,
@@ -473,7 +528,7 @@ void executeRequest(
473528
clusters,
474529
timeProvider,
475530
transportService,
476-
delegate.delegateFailureAndWrap((finalDelegate, searchShardsResponses) -> {
531+
searchResponseActionListener.delegateFailureAndWrap((finalDelegate, searchShardsResponses) -> {
477532
final BiFunction<String, String, DiscoveryNode> clusterNodeLookup = getRemoteClusterNodeLookup(
478533
searchShardsResponses
479534
);
@@ -517,49 +572,20 @@ void executeRequest(
517572
}
518573
});
519574

520-
final SearchSourceBuilder source = original.source();
521575
final boolean isExplain = source != null && source.explain() != null && source.explain();
522-
if (shouldOpenPIT(source)) {
523-
// disabling shard reordering for request
524-
original.setPreFilterShardSize(Integer.MAX_VALUE);
525-
openPIT(client, original, searchService.getDefaultKeepAliveInMillis(), listener.delegateFailureAndWrap((delegate, resp) -> {
526-
// We set the keep alive to -1 to indicate that we don't need the pit id in the response.
527-
// This is needed since we delete the pit prior to sending the response so the id doesn't exist anymore.
528-
source.pointInTimeBuilder(new PointInTimeBuilder(resp.getPointInTimeId()).setKeepAlive(TimeValue.MINUS_ONE));
529-
var pitListener = new SearchResponseActionListener(delegate) {
530-
@Override
531-
public void onResponse(SearchResponse response) {
532-
// we need to close the PIT first so we delay the release of the response to after the closing
533-
response.incRef();
534-
closePIT(
535-
client,
536-
original.source().pointInTimeBuilder(),
537-
() -> ActionListener.respondAndRelease(delegate, response)
538-
);
539-
}
540-
541-
@Override
542-
public void onFailure(Exception e) {
543-
closePIT(client, original.source().pointInTimeBuilder(), () -> delegate.onFailure(e));
544-
}
545-
};
546-
executeRequest(task, original, pitListener, searchPhaseProvider);
547-
}));
548-
} else {
549-
Rewriteable.rewriteAndFetch(
550-
original,
551-
searchService.getRewriteContext(
552-
timeProvider::absoluteStartMillis,
553-
clusterState.getMinTransportVersion(),
554-
original.getLocalClusterAlias(),
555-
resolvedIndices,
556-
original.pointInTimeBuilder(),
557-
shouldMinimizeRoundtrips(original),
558-
isExplain
559-
),
560-
rewriteListener
561-
);
562-
}
576+
Rewriteable.rewriteAndFetch(
577+
original,
578+
searchService.getRewriteContext(
579+
timeProvider::absoluteStartMillis,
580+
clusterState.getMinTransportVersion(),
581+
original.getLocalClusterAlias(),
582+
resolvedIndices,
583+
original.pointInTimeBuilder(),
584+
shouldMinimizeRoundtrips(original),
585+
isExplain
586+
),
587+
rewriteListener
588+
);
563589
}
564590

565591
/**
@@ -2001,49 +2027,34 @@ static String[] ignoreBlockedIndices(ProjectState projectState, String[] concret
20012027
.toArray(String[]::new);
20022028
}
20032029
return concreteIndices;
2004-
}
2005-
2006-
private interface TelemetryListener {
2007-
void setRemotes(int count);
2008-
2009-
void setFeature(String feature);
20102030

2011-
void setClient(Task task);
20122031
}
20132032

2014-
private class SearchResponseActionListener extends DelegatingActionListener<SearchResponse, SearchResponse>
2015-
implements
2016-
TelemetryListener {
2033+
private static class SearchTelemetryListener extends DelegatingActionListener<SearchResponse, SearchResponse> {
20172034
private final CCSUsage.Builder usageBuilder;
2018-
2019-
SearchResponseActionListener(ActionListener<SearchResponse> listener) {
2035+
private final SearchResponseMetrics searchResponseMetrics;
2036+
private final UsageService usageService;
2037+
private final boolean collectCCSTelemetry;
2038+
2039+
SearchTelemetryListener(
2040+
ActionListener<SearchResponse> listener,
2041+
SearchResponseMetrics searchResponseMetrics,
2042+
UsageService usageService,
2043+
CCSUsage.Builder usageBuilder
2044+
) {
20202045
super(listener);
2021-
if (listener instanceof SearchResponseActionListener srListener) {
2022-
usageBuilder = srListener.usageBuilder;
2023-
} else {
2024-
usageBuilder = new CCSUsage.Builder();
2025-
}
2026-
}
2027-
2028-
/**
2029-
* Should we collect telemetry for this search?
2030-
*/
2031-
private boolean collectTelemetry() {
2032-
return collectTelemetry && usageBuilder.getRemotesCount() > 0;
2033-
}
2034-
2035-
public void setRemotes(int count) {
2036-
usageBuilder.setRemotesCount(count);
2037-
}
2038-
2039-
@Override
2040-
public void setFeature(String feature) {
2041-
usageBuilder.setFeature(feature);
2046+
this.searchResponseMetrics = searchResponseMetrics;
2047+
this.collectCCSTelemetry = true;
2048+
this.usageService = usageService;
2049+
this.usageBuilder = usageBuilder;
20422050
}
20432051

2044-
@Override
2045-
public void setClient(Task task) {
2046-
usageBuilder.setClientFromTask(task);
2052+
SearchTelemetryListener(ActionListener<SearchResponse> listener, SearchResponseMetrics searchResponseMetrics) {
2053+
super(listener);
2054+
this.searchResponseMetrics = searchResponseMetrics;
2055+
this.collectCCSTelemetry = false;
2056+
this.usageService = null;
2057+
this.usageBuilder = null;
20472058
}
20482059

20492060
@Override
@@ -2069,7 +2080,7 @@ public void onResponse(SearchResponse searchResponse) {
20692080
}
20702081
searchResponseMetrics.incrementResponseCount(responseCountTotalStatus);
20712082

2072-
if (collectTelemetry()) {
2083+
if (collectCCSTelemetry) {
20732084
extractCCSTelemetry(searchResponse);
20742085
recordTelemetry();
20752086
}
@@ -2084,7 +2095,7 @@ public void onResponse(SearchResponse searchResponse) {
20842095
@Override
20852096
public void onFailure(Exception e) {
20862097
searchResponseMetrics.incrementResponseCount(SearchResponseMetrics.ResponseCountTotalStatus.FAILURE);
2087-
if (collectTelemetry()) {
2098+
if (collectCCSTelemetry) {
20882099
usageBuilder.setFailure(e);
20892100
recordTelemetry();
20902101
}
@@ -2109,8 +2120,6 @@ private void extractCCSTelemetry(SearchResponse searchResponse) {
21092120
usageBuilder.perClusterUsage(clusterAlias, cluster.getTook());
21102121
}
21112122
}
2112-
21132123
}
2114-
21152124
}
21162125
}

server/src/test/java/org/elasticsearch/search/TelemetryMetrics/SearchTookTimeTelemetryTests.java

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,12 +16,17 @@
1616
import org.elasticsearch.cluster.metadata.IndexMetadata;
1717
import org.elasticsearch.common.settings.Settings;
1818
import org.elasticsearch.core.TimeValue;
19+
import org.elasticsearch.index.query.MatchAllQueryBuilder;
1920
import org.elasticsearch.plugins.Plugin;
2021
import org.elasticsearch.plugins.PluginsService;
2122
import org.elasticsearch.search.builder.SearchSourceBuilder;
23+
import org.elasticsearch.search.rescore.QueryRescorerBuilder;
24+
import org.elasticsearch.search.retriever.RescorerRetrieverBuilder;
25+
import org.elasticsearch.search.retriever.StandardRetrieverBuilder;
2226
import org.elasticsearch.telemetry.Measurement;
2327
import org.elasticsearch.telemetry.TestTelemetryPlugin;
2428
import org.elasticsearch.test.ESSingleNodeTestCase;
29+
import org.hamcrest.Matchers;
2530
import org.junit.After;
2631
import org.junit.Before;
2732

@@ -85,6 +90,29 @@ public void testSimpleQuery() {
8590
assertEquals(searchResponse.getTook().millis(), measurements.getFirst().getLong());
8691
}
8792

93+
public void testCompoundRetriever() {
94+
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
95+
searchSourceBuilder.retriever(
96+
new RescorerRetrieverBuilder(
97+
new StandardRetrieverBuilder(new MatchAllQueryBuilder()),
98+
List.of(new QueryRescorerBuilder(new MatchAllQueryBuilder()))
99+
)
100+
);
101+
SearchResponse searchResponse = client().prepareSearch(indexName).setSource(searchSourceBuilder).get();
102+
try {
103+
assertNoFailures(searchResponse);
104+
assertSearchHits(searchResponse, "1", "2");
105+
} finally {
106+
searchResponse.decRef();
107+
}
108+
109+
List<Measurement> measurements = getTestTelemetryPlugin().getLongHistogramMeasurement(TOOK_DURATION_TOTAL_HISTOGRAM_NAME);
110+
// compound retriever does its own search as an async action, whose took time is recorded separately
111+
assertEquals(2, measurements.size());
112+
assertThat(measurements.getFirst().getLong(), Matchers.lessThan(searchResponse.getTook().millis()));
113+
assertEquals(searchResponse.getTook().millis(), measurements.getLast().getLong());
114+
}
115+
88116
public void testMultiSearch() {
89117
MultiSearchRequestBuilder multiSearchRequestBuilder = client().prepareMultiSearch();
90118
int numSearchRequests = randomIntBetween(3, 10);

0 commit comments

Comments
 (0)