-
Notifications
You must be signed in to change notification settings - Fork 25.5k
Centralize telemetry listener wrapping in TransportSearchAction #134625
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
c07deb2
a44bb23
e64a7b1
0b9936f
176d7bf
9b9dad8
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -167,7 +167,7 @@ public class TransportSearchAction extends HandledTransportAction<SearchRequest, | |
private final SearchResponseMetrics searchResponseMetrics; | ||
private final Client client; | ||
private final UsageService usageService; | ||
private final boolean collectTelemetry; | ||
private final boolean collectCCSTelemetry; | ||
private final TimeValue forceConnectTimeoutSecs; | ||
|
||
@Inject | ||
|
@@ -213,7 +213,7 @@ public TransportSearchAction( | |
var settings = clusterService.getSettings(); | ||
this.defaultPreFilterShardSize = DEFAULT_PRE_FILTER_SHARD_SIZE.get(settings); | ||
this.ccsCheckCompatibility = SearchService.CCS_VERSION_CHECK_SETTING.get(settings); | ||
this.collectTelemetry = SearchService.CCS_COLLECT_TELEMETRY.get(settings); | ||
this.collectCCSTelemetry = SearchService.CCS_COLLECT_TELEMETRY.get(settings); | ||
this.searchResponseMetrics = searchResponseMetrics; | ||
this.client = client; | ||
this.usageService = usageService; | ||
|
@@ -333,14 +333,24 @@ public long buildTookInMillis() { | |
|
||
@Override | ||
protected void doExecute(Task task, SearchRequest searchRequest, ActionListener<SearchResponse> listener) { | ||
executeRequest((SearchTask) task, searchRequest, new SearchResponseActionListener(listener), AsyncSearchActionProvider::new); | ||
executeRequest((SearchTask) task, searchRequest, listener, AsyncSearchActionProvider::new, true); | ||
} | ||
|
||
void executeRequest( | ||
void executeOpenPit( | ||
SearchTask task, | ||
SearchRequest original, | ||
ActionListener<SearchResponse> listener, | ||
ActionListener<SearchResponse> originalListener, | ||
Function<ActionListener<SearchResponse>, SearchPhaseProvider> searchPhaseProvider | ||
) { | ||
executeRequest(task, original, originalListener, searchPhaseProvider, false); | ||
} | ||
|
||
private void executeRequest( | ||
SearchTask task, | ||
SearchRequest original, | ||
ActionListener<SearchResponse> originalListener, | ||
Function<ActionListener<SearchResponse>, SearchPhaseProvider> searchPhaseProvider, | ||
boolean collectSearchTelemetry | ||
) { | ||
final long relativeStartNanos = System.nanoTime(); | ||
final SearchTimeProvider timeProvider = new SearchTimeProvider( | ||
|
@@ -372,48 +382,93 @@ void executeRequest( | |
frozenIndexCheck(resolvedIndices); | ||
} | ||
|
||
ActionListener<SearchRequest> rewriteListener = listener.delegateFailureAndWrap((delegate, rewritten) -> { | ||
final SearchSourceBuilder source = original.source(); | ||
if (shouldOpenPIT(source)) { | ||
// disabling shard reordering for request | ||
original.setPreFilterShardSize(Integer.MAX_VALUE); | ||
openPIT( | ||
client, | ||
original, | ||
searchService.getDefaultKeepAliveInMillis(), | ||
originalListener.delegateFailureAndWrap((delegate, resp) -> { | ||
// We set the keep alive to -1 to indicate that we don't need the pit id in the response. | ||
// This is needed since we delete the pit prior to sending the response so the id doesn't exist anymore. | ||
source.pointInTimeBuilder(new PointInTimeBuilder(resp.getPointInTimeId()).setKeepAlive(TimeValue.MINUS_ONE)); | ||
var pitListener = new ActionListener<SearchResponse>() { | ||
@Override | ||
public void onResponse(SearchResponse response) { | ||
// we need to close the PIT first so we delay the release of the response to after the closing | ||
response.incRef(); | ||
closePIT( | ||
client, | ||
original.source().pointInTimeBuilder(), | ||
() -> ActionListener.respondAndRelease(delegate, response) | ||
); | ||
} | ||
|
||
@Override | ||
public void onFailure(Exception e) { | ||
closePIT(client, original.source().pointInTimeBuilder(), () -> delegate.onFailure(e)); | ||
} | ||
}; | ||
executeRequest(task, original, pitListener, searchPhaseProvider, true); | ||
}) | ||
); | ||
return; | ||
} | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. the only change in this block is no longer using |
||
|
||
ActionListener<SearchRequest> rewriteListener = originalListener.delegateFailureAndWrap((delegate, rewritten) -> { | ||
if (ccsCheckCompatibility) { | ||
checkCCSVersionCompatibility(rewritten); | ||
} | ||
|
||
if (resolvedIndices.getRemoteClusterIndices().isEmpty()) { | ||
executeLocalSearch( | ||
task, | ||
timeProvider, | ||
rewritten, | ||
resolvedIndices, | ||
projectState, | ||
SearchResponse.Clusters.EMPTY, | ||
searchPhaseProvider.apply(delegate) | ||
); | ||
} else { | ||
if (delegate instanceof TelemetryListener tl) { | ||
tl.setRemotes(resolvedIndices.getRemoteClusterIndices().size()); | ||
final ActionListener<SearchResponse> searchResponseActionListener; | ||
if (collectSearchTelemetry) { | ||
if (collectCCSTelemetry == false || resolvedIndices.getRemoteClusterIndices().isEmpty()) { | ||
searchResponseActionListener = new SearchTelemetryListener(delegate, searchResponseMetrics); | ||
} else { | ||
CCSUsage.Builder usageBuilder = new CCSUsage.Builder(); | ||
usageBuilder.setRemotesCount(resolvedIndices.getRemoteClusterIndices().size()); | ||
usageBuilder.setClientFromTask(task); | ||
if (task.isAsync()) { | ||
tl.setFeature(CCSUsageTelemetry.ASYNC_FEATURE); | ||
usageBuilder.setFeature(CCSUsageTelemetry.ASYNC_FEATURE); | ||
} | ||
if (original.pointInTimeBuilder() != null) { | ||
tl.setFeature(CCSUsageTelemetry.PIT_FEATURE); | ||
usageBuilder.setFeature(CCSUsageTelemetry.PIT_FEATURE); | ||
} | ||
tl.setClient(task); | ||
// Check if any of the index patterns are wildcard patterns | ||
var localIndices = resolvedIndices.getLocalIndices(); | ||
if (localIndices != null && Arrays.stream(localIndices.indices()).anyMatch(Regex::isSimpleMatchPattern)) { | ||
tl.setFeature(CCSUsageTelemetry.WILDCARD_FEATURE); | ||
usageBuilder.setFeature(CCSUsageTelemetry.WILDCARD_FEATURE); | ||
} | ||
if (resolvedIndices.getRemoteClusterIndices() | ||
.values() | ||
.stream() | ||
.anyMatch(indices -> Arrays.stream(indices.indices()).anyMatch(Regex::isSimpleMatchPattern))) { | ||
tl.setFeature(CCSUsageTelemetry.WILDCARD_FEATURE); | ||
usageBuilder.setFeature(CCSUsageTelemetry.WILDCARD_FEATURE); | ||
} | ||
if (shouldMinimizeRoundtrips(rewritten)) { | ||
usageBuilder.setFeature(CCSUsageTelemetry.MRT_FEATURE); | ||
} | ||
searchResponseActionListener = new SearchTelemetryListener(delegate, searchResponseMetrics, usageService, usageBuilder); | ||
} | ||
} else { | ||
searchResponseActionListener = delegate; | ||
} | ||
|
||
if (resolvedIndices.getRemoteClusterIndices().isEmpty()) { | ||
executeLocalSearch( | ||
task, | ||
timeProvider, | ||
rewritten, | ||
resolvedIndices, | ||
projectState, | ||
SearchResponse.Clusters.EMPTY, | ||
searchPhaseProvider.apply(searchResponseActionListener) | ||
); | ||
} else { | ||
final TaskId parentTaskId = task.taskInfo(clusterService.localNode().getId(), false).taskId(); | ||
if (shouldMinimizeRoundtrips(rewritten)) { | ||
if (delegate instanceof TelemetryListener tl) { | ||
tl.setFeature(CCSUsageTelemetry.MRT_FEATURE); | ||
} | ||
final AggregationReduceContext.Builder aggregationReduceContextBuilder = rewritten.source() != null | ||
&& rewritten.source().aggregations() != null | ||
? searchService.aggReduceContextBuilder(task::isCancelled, rewritten.source().aggregations()) | ||
|
@@ -439,7 +494,7 @@ void executeRequest( | |
aggregationReduceContextBuilder, | ||
remoteClusterService, | ||
threadPool, | ||
delegate, | ||
searchResponseActionListener, | ||
(r, l) -> executeLocalSearch( | ||
task, | ||
timeProvider, | ||
|
@@ -473,7 +528,7 @@ void executeRequest( | |
clusters, | ||
timeProvider, | ||
transportService, | ||
delegate.delegateFailureAndWrap((finalDelegate, searchShardsResponses) -> { | ||
searchResponseActionListener.delegateFailureAndWrap((finalDelegate, searchShardsResponses) -> { | ||
final BiFunction<String, String, DiscoveryNode> clusterNodeLookup = getRemoteClusterNodeLookup( | ||
searchShardsResponses | ||
); | ||
|
@@ -517,49 +572,20 @@ void executeRequest( | |
} | ||
}); | ||
|
||
final SearchSourceBuilder source = original.source(); | ||
final boolean isExplain = source != null && source.explain() != null && source.explain(); | ||
if (shouldOpenPIT(source)) { | ||
// disabling shard reordering for request | ||
original.setPreFilterShardSize(Integer.MAX_VALUE); | ||
openPIT(client, original, searchService.getDefaultKeepAliveInMillis(), listener.delegateFailureAndWrap((delegate, resp) -> { | ||
// We set the keep alive to -1 to indicate that we don't need the pit id in the response. | ||
// This is needed since we delete the pit prior to sending the response so the id doesn't exist anymore. | ||
source.pointInTimeBuilder(new PointInTimeBuilder(resp.getPointInTimeId()).setKeepAlive(TimeValue.MINUS_ONE)); | ||
var pitListener = new SearchResponseActionListener(delegate) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. One thing I notice here that There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. that's a good observation. Yet There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I also moved up the shouldOpenPIT conditional branch, as it can quickly shortcut execution as opposed to creating the rewriteListener for nothing when a PIT needs to be opened. I think this improved readability, let me know. |
||
@Override | ||
public void onResponse(SearchResponse response) { | ||
// we need to close the PIT first so we delay the release of the response to after the closing | ||
response.incRef(); | ||
closePIT( | ||
client, | ||
original.source().pointInTimeBuilder(), | ||
() -> ActionListener.respondAndRelease(delegate, response) | ||
); | ||
} | ||
|
||
@Override | ||
public void onFailure(Exception e) { | ||
closePIT(client, original.source().pointInTimeBuilder(), () -> delegate.onFailure(e)); | ||
} | ||
}; | ||
executeRequest(task, original, pitListener, searchPhaseProvider); | ||
})); | ||
} else { | ||
Rewriteable.rewriteAndFetch( | ||
original, | ||
searchService.getRewriteContext( | ||
timeProvider::absoluteStartMillis, | ||
clusterState.getMinTransportVersion(), | ||
original.getLocalClusterAlias(), | ||
resolvedIndices, | ||
original.pointInTimeBuilder(), | ||
shouldMinimizeRoundtrips(original), | ||
isExplain | ||
), | ||
rewriteListener | ||
); | ||
} | ||
Rewriteable.rewriteAndFetch( | ||
original, | ||
searchService.getRewriteContext( | ||
timeProvider::absoluteStartMillis, | ||
clusterState.getMinTransportVersion(), | ||
original.getLocalClusterAlias(), | ||
resolvedIndices, | ||
original.pointInTimeBuilder(), | ||
shouldMinimizeRoundtrips(original), | ||
isExplain | ||
), | ||
rewriteListener | ||
); | ||
} | ||
|
||
/** | ||
|
@@ -2001,49 +2027,34 @@ static String[] ignoreBlockedIndices(ProjectState projectState, String[] concret | |
.toArray(String[]::new); | ||
} | ||
return concreteIndices; | ||
} | ||
|
||
private interface TelemetryListener { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Given I moved the wrapping inside of executeSearch, this interface is no longer required. It was there for open PIT only as it provides a different type of listener. |
||
void setRemotes(int count); | ||
|
||
void setFeature(String feature); | ||
|
||
void setClient(Task task); | ||
} | ||
|
||
private class SearchResponseActionListener extends DelegatingActionListener<SearchResponse, SearchResponse> | ||
implements | ||
TelemetryListener { | ||
private static class SearchTelemetryListener extends DelegatingActionListener<SearchResponse, SearchResponse> { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I made this static, it feels much better to be able to reason about its dependencies and not have it be tied to TransportSearchAction's lifecycle. |
||
private final CCSUsage.Builder usageBuilder; | ||
|
||
SearchResponseActionListener(ActionListener<SearchResponse> listener) { | ||
private final SearchResponseMetrics searchResponseMetrics; | ||
private final UsageService usageService; | ||
private final boolean collectCCSTelemetry; | ||
|
||
SearchTelemetryListener( | ||
ActionListener<SearchResponse> listener, | ||
SearchResponseMetrics searchResponseMetrics, | ||
UsageService usageService, | ||
CCSUsage.Builder usageBuilder | ||
) { | ||
super(listener); | ||
if (listener instanceof SearchResponseActionListener srListener) { | ||
usageBuilder = srListener.usageBuilder; | ||
} else { | ||
usageBuilder = new CCSUsage.Builder(); | ||
} | ||
} | ||
|
||
/** | ||
* Should we collect telemetry for this search? | ||
*/ | ||
private boolean collectTelemetry() { | ||
return collectTelemetry && usageBuilder.getRemotesCount() > 0; | ||
} | ||
|
||
public void setRemotes(int count) { | ||
usageBuilder.setRemotesCount(count); | ||
} | ||
|
||
@Override | ||
public void setFeature(String feature) { | ||
usageBuilder.setFeature(feature); | ||
this.searchResponseMetrics = searchResponseMetrics; | ||
this.collectCCSTelemetry = true; | ||
this.usageService = usageService; | ||
this.usageBuilder = usageBuilder; | ||
} | ||
|
||
@Override | ||
public void setClient(Task task) { | ||
usageBuilder.setClientFromTask(task); | ||
SearchTelemetryListener(ActionListener<SearchResponse> listener, SearchResponseMetrics searchResponseMetrics) { | ||
super(listener); | ||
this.searchResponseMetrics = searchResponseMetrics; | ||
this.collectCCSTelemetry = false; | ||
this.usageService = null; | ||
this.usageBuilder = null; | ||
} | ||
|
||
@Override | ||
|
@@ -2069,7 +2080,7 @@ public void onResponse(SearchResponse searchResponse) { | |
} | ||
searchResponseMetrics.incrementResponseCount(responseCountTotalStatus); | ||
|
||
if (collectTelemetry()) { | ||
if (collectCCSTelemetry) { | ||
extractCCSTelemetry(searchResponse); | ||
recordTelemetry(); | ||
} | ||
|
@@ -2084,7 +2095,7 @@ public void onResponse(SearchResponse searchResponse) { | |
@Override | ||
public void onFailure(Exception e) { | ||
searchResponseMetrics.incrementResponseCount(SearchResponseMetrics.ResponseCountTotalStatus.FAILURE); | ||
if (collectTelemetry()) { | ||
if (collectCCSTelemetry) { | ||
usageBuilder.setFailure(e); | ||
recordTelemetry(); | ||
} | ||
|
@@ -2109,8 +2120,6 @@ private void extractCCSTelemetry(SearchResponse searchResponse) { | |
usageBuilder.perClusterUsage(clusterAlias, cluster.getTook()); | ||
} | ||
} | ||
|
||
} | ||
|
||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -16,12 +16,17 @@ | |
import org.elasticsearch.cluster.metadata.IndexMetadata; | ||
import org.elasticsearch.common.settings.Settings; | ||
import org.elasticsearch.core.TimeValue; | ||
import org.elasticsearch.index.query.MatchAllQueryBuilder; | ||
import org.elasticsearch.plugins.Plugin; | ||
import org.elasticsearch.plugins.PluginsService; | ||
import org.elasticsearch.search.builder.SearchSourceBuilder; | ||
import org.elasticsearch.search.rescore.QueryRescorerBuilder; | ||
import org.elasticsearch.search.retriever.RescorerRetrieverBuilder; | ||
import org.elasticsearch.search.retriever.StandardRetrieverBuilder; | ||
import org.elasticsearch.telemetry.Measurement; | ||
import org.elasticsearch.telemetry.TestTelemetryPlugin; | ||
import org.elasticsearch.test.ESSingleNodeTestCase; | ||
import org.hamcrest.Matchers; | ||
import org.junit.After; | ||
import org.junit.Before; | ||
|
||
|
@@ -85,6 +90,29 @@ public void testSimpleQuery() { | |
assertEquals(searchResponse.getTook().millis(), measurements.getFirst().getLong()); | ||
} | ||
|
||
public void testCompoundRetriever() { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. very nice ! |
||
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder(); | ||
searchSourceBuilder.retriever( | ||
new RescorerRetrieverBuilder( | ||
new StandardRetrieverBuilder(new MatchAllQueryBuilder()), | ||
List.of(new QueryRescorerBuilder(new MatchAllQueryBuilder())) | ||
) | ||
); | ||
SearchResponse searchResponse = client().prepareSearch(indexName).setSource(searchSourceBuilder).get(); | ||
try { | ||
assertNoFailures(searchResponse); | ||
assertSearchHits(searchResponse, "1", "2"); | ||
} finally { | ||
searchResponse.decRef(); | ||
} | ||
|
||
List<Measurement> measurements = getTestTelemetryPlugin().getLongHistogramMeasurement(TOOK_DURATION_TOTAL_HISTOGRAM_NAME); | ||
// compound retriever does its own search as an async action, whose took time is recorded separately | ||
assertEquals(2, measurements.size()); | ||
assertThat(measurements.getFirst().getLong(), Matchers.lessThan(searchResponse.getTook().millis())); | ||
assertEquals(searchResponse.getTook().millis(), measurements.getLast().getLong()); | ||
} | ||
|
||
public void testMultiSearch() { | ||
MultiSearchRequestBuilder multiSearchRequestBuilder = client().prepareMultiSearch(); | ||
int numSearchRequests = randomIntBetween(3, 10); | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This isn't perfect, but it gets the job done. Ideally, open PIT would have its own transport action, but I am not even sure that's a change worth making given its complexity.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Before this change this flag was not needed because the listener had all the telemetry logic encapsulated in it. The downside was that we needed to do some instanceof checks on it. Another issue I found with that is that it does not allow to store resolved indices (without carrying a reference to the search request in the listener).