-
Notifications
You must be signed in to change notification settings - Fork 25.5k
Centralize telemetry listener wrapping in TransportSearchAction #134625
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 2 commits
c07deb2
a44bb23
e64a7b1
0b9936f
176d7bf
9b9dad8
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -167,7 +167,7 @@ public class TransportSearchAction extends HandledTransportAction<SearchRequest, | |
private final SearchResponseMetrics searchResponseMetrics; | ||
private final Client client; | ||
private final UsageService usageService; | ||
private final boolean collectTelemetry; | ||
private final boolean collectCCSTelemetry; | ||
private final TimeValue forceConnectTimeoutSecs; | ||
|
||
@Inject | ||
|
@@ -213,7 +213,7 @@ public TransportSearchAction( | |
var settings = clusterService.getSettings(); | ||
this.defaultPreFilterShardSize = DEFAULT_PRE_FILTER_SHARD_SIZE.get(settings); | ||
this.ccsCheckCompatibility = SearchService.CCS_VERSION_CHECK_SETTING.get(settings); | ||
this.collectTelemetry = SearchService.CCS_COLLECT_TELEMETRY.get(settings); | ||
this.collectCCSTelemetry = SearchService.CCS_COLLECT_TELEMETRY.get(settings); | ||
this.searchResponseMetrics = searchResponseMetrics; | ||
this.client = client; | ||
this.usageService = usageService; | ||
|
@@ -333,14 +333,24 @@ public long buildTookInMillis() { | |
|
||
@Override | ||
protected void doExecute(Task task, SearchRequest searchRequest, ActionListener<SearchResponse> listener) { | ||
executeRequest((SearchTask) task, searchRequest, new SearchResponseActionListener(listener), AsyncSearchActionProvider::new); | ||
executeRequest((SearchTask) task, searchRequest, listener, AsyncSearchActionProvider::new, true); | ||
} | ||
|
||
void executeRequest( | ||
void executeOpenPit( | ||
SearchTask task, | ||
SearchRequest original, | ||
ActionListener<SearchResponse> listener, | ||
ActionListener<SearchResponse> originalListener, | ||
Function<ActionListener<SearchResponse>, SearchPhaseProvider> searchPhaseProvider | ||
) { | ||
executeRequest(task, original, originalListener, searchPhaseProvider, false); | ||
} | ||
|
||
private void executeRequest( | ||
SearchTask task, | ||
SearchRequest original, | ||
ActionListener<SearchResponse> originalListener, | ||
Function<ActionListener<SearchResponse>, SearchPhaseProvider> searchPhaseProvider, | ||
boolean collectSearchTelemetry | ||
) { | ||
final long relativeStartNanos = System.nanoTime(); | ||
final SearchTimeProvider timeProvider = new SearchTimeProvider( | ||
|
@@ -372,11 +382,16 @@ void executeRequest( | |
frozenIndexCheck(resolvedIndices); | ||
} | ||
|
||
ActionListener<SearchRequest> rewriteListener = listener.delegateFailureAndWrap((delegate, rewritten) -> { | ||
ActionListener<SearchRequest> rewriteListener = originalListener.delegateFailureAndWrap((delegate, rewritten) -> { | ||
if (ccsCheckCompatibility) { | ||
checkCCSVersionCompatibility(rewritten); | ||
} | ||
|
||
final CCSUsage.Builder ccsUsageBuilder = new CCSUsage.Builder(); | ||
|
||
final ActionListener<SearchResponse> searchResponseActionListener = collectSearchTelemetry | ||
? new SearchTelemetryListener(delegate, ccsUsageBuilder, searchResponseMetrics, usageService, collectCCSTelemetry) | ||
: delegate; | ||
|
||
if (resolvedIndices.getRemoteClusterIndices().isEmpty()) { | ||
executeLocalSearch( | ||
task, | ||
|
@@ -385,35 +400,33 @@ void executeRequest( | |
resolvedIndices, | ||
projectState, | ||
SearchResponse.Clusters.EMPTY, | ||
searchPhaseProvider.apply(delegate) | ||
searchPhaseProvider.apply(searchResponseActionListener) | ||
); | ||
} else { | ||
if (delegate instanceof TelemetryListener tl) { | ||
tl.setRemotes(resolvedIndices.getRemoteClusterIndices().size()); | ||
if (task.isAsync()) { | ||
tl.setFeature(CCSUsageTelemetry.ASYNC_FEATURE); | ||
} | ||
if (original.pointInTimeBuilder() != null) { | ||
tl.setFeature(CCSUsageTelemetry.PIT_FEATURE); | ||
} | ||
tl.setClient(task); | ||
// Check if any of the index patterns are wildcard patterns | ||
var localIndices = resolvedIndices.getLocalIndices(); | ||
if (localIndices != null && Arrays.stream(localIndices.indices()).anyMatch(Regex::isSimpleMatchPattern)) { | ||
tl.setFeature(CCSUsageTelemetry.WILDCARD_FEATURE); | ||
} | ||
if (resolvedIndices.getRemoteClusterIndices() | ||
.values() | ||
.stream() | ||
.anyMatch(indices -> Arrays.stream(indices.indices()).anyMatch(Regex::isSimpleMatchPattern))) { | ||
tl.setFeature(CCSUsageTelemetry.WILDCARD_FEATURE); | ||
} | ||
ccsUsageBuilder.setRemotesCount(resolvedIndices.getRemoteClusterIndices().size()); | ||
javanna marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
if (task.isAsync()) { | ||
ccsUsageBuilder.setFeature(CCSUsageTelemetry.ASYNC_FEATURE); | ||
} | ||
if (original.pointInTimeBuilder() != null) { | ||
ccsUsageBuilder.setFeature(CCSUsageTelemetry.PIT_FEATURE); | ||
} | ||
ccsUsageBuilder.setClientFromTask(task); | ||
// Check if any of the index patterns are wildcard patterns | ||
var localIndices = resolvedIndices.getLocalIndices(); | ||
if (localIndices != null && Arrays.stream(localIndices.indices()).anyMatch(Regex::isSimpleMatchPattern)) { | ||
ccsUsageBuilder.setFeature(CCSUsageTelemetry.WILDCARD_FEATURE); | ||
} | ||
if (resolvedIndices.getRemoteClusterIndices() | ||
.values() | ||
.stream() | ||
.anyMatch(indices -> Arrays.stream(indices.indices()).anyMatch(Regex::isSimpleMatchPattern))) { | ||
ccsUsageBuilder.setFeature(CCSUsageTelemetry.WILDCARD_FEATURE); | ||
} | ||
|
||
final TaskId parentTaskId = task.taskInfo(clusterService.localNode().getId(), false).taskId(); | ||
if (shouldMinimizeRoundtrips(rewritten)) { | ||
if (delegate instanceof TelemetryListener tl) { | ||
tl.setFeature(CCSUsageTelemetry.MRT_FEATURE); | ||
} | ||
ccsUsageBuilder.setFeature(CCSUsageTelemetry.MRT_FEATURE); | ||
|
||
final AggregationReduceContext.Builder aggregationReduceContextBuilder = rewritten.source() != null | ||
&& rewritten.source().aggregations() != null | ||
? searchService.aggReduceContextBuilder(task::isCancelled, rewritten.source().aggregations()) | ||
|
@@ -439,7 +452,7 @@ void executeRequest( | |
aggregationReduceContextBuilder, | ||
remoteClusterService, | ||
threadPool, | ||
delegate, | ||
searchResponseActionListener, | ||
(r, l) -> executeLocalSearch( | ||
task, | ||
timeProvider, | ||
|
@@ -473,7 +486,7 @@ void executeRequest( | |
clusters, | ||
timeProvider, | ||
transportService, | ||
delegate.delegateFailureAndWrap((finalDelegate, searchShardsResponses) -> { | ||
searchResponseActionListener.delegateFailureAndWrap((finalDelegate, searchShardsResponses) -> { | ||
final BiFunction<String, String, DiscoveryNode> clusterNodeLookup = getRemoteClusterNodeLookup( | ||
searchShardsResponses | ||
); | ||
|
@@ -522,29 +535,34 @@ void executeRequest( | |
if (shouldOpenPIT(source)) { | ||
// disabling shard reordering for request | ||
original.setPreFilterShardSize(Integer.MAX_VALUE); | ||
openPIT(client, original, searchService.getDefaultKeepAliveInMillis(), listener.delegateFailureAndWrap((delegate, resp) -> { | ||
// We set the keep alive to -1 to indicate that we don't need the pit id in the response. | ||
// This is needed since we delete the pit prior to sending the response so the id doesn't exist anymore. | ||
source.pointInTimeBuilder(new PointInTimeBuilder(resp.getPointInTimeId()).setKeepAlive(TimeValue.MINUS_ONE)); | ||
var pitListener = new SearchResponseActionListener(delegate) { | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. One thing I notice here that […] There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. That's a good observation. Yet […] There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I also moved up the shouldOpenPIT conditional branch, as it can quickly short-circuit execution instead of creating the rewriteListener for nothing when a PIT needs to be opened. I think this improves readability; let me know. |
||
@Override | ||
public void onResponse(SearchResponse response) { | ||
// we need to close the PIT first so we delay the release of the response to after the closing | ||
response.incRef(); | ||
closePIT( | ||
client, | ||
original.source().pointInTimeBuilder(), | ||
() -> ActionListener.respondAndRelease(delegate, response) | ||
); | ||
} | ||
openPIT( | ||
client, | ||
original, | ||
searchService.getDefaultKeepAliveInMillis(), | ||
originalListener.delegateFailureAndWrap((delegate, resp) -> { | ||
// We set the keep alive to -1 to indicate that we don't need the pit id in the response. | ||
// This is needed since we delete the pit prior to sending the response so the id doesn't exist anymore. | ||
source.pointInTimeBuilder(new PointInTimeBuilder(resp.getPointInTimeId()).setKeepAlive(TimeValue.MINUS_ONE)); | ||
var pitListener = new ActionListener<SearchResponse>() { | ||
@Override | ||
public void onResponse(SearchResponse response) { | ||
// we need to close the PIT first so we delay the release of the response to after the closing | ||
response.incRef(); | ||
closePIT( | ||
client, | ||
original.source().pointInTimeBuilder(), | ||
() -> ActionListener.respondAndRelease(delegate, response) | ||
); | ||
} | ||
|
||
@Override | ||
public void onFailure(Exception e) { | ||
closePIT(client, original.source().pointInTimeBuilder(), () -> delegate.onFailure(e)); | ||
} | ||
}; | ||
executeRequest(task, original, pitListener, searchPhaseProvider); | ||
})); | ||
@Override | ||
public void onFailure(Exception e) { | ||
closePIT(client, original.source().pointInTimeBuilder(), () -> delegate.onFailure(e)); | ||
} | ||
}; | ||
executeRequest(task, original, pitListener, searchPhaseProvider, true); | ||
}) | ||
); | ||
} else { | ||
Rewriteable.rewriteAndFetch( | ||
original, | ||
|
@@ -2001,49 +2019,34 @@ static String[] ignoreBlockedIndices(ProjectState projectState, String[] concret | |
.toArray(String[]::new); | ||
} | ||
return concreteIndices; | ||
} | ||
|
||
private interface TelemetryListener { | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Given that I moved the wrapping inside of executeRequest, this interface is no longer required. It was there only for open PIT, as it provides a different type of listener. |
||
void setRemotes(int count); | ||
|
||
void setFeature(String feature); | ||
|
||
void setClient(Task task); | ||
} | ||
|
||
private class SearchResponseActionListener extends DelegatingActionListener<SearchResponse, SearchResponse> | ||
implements | ||
TelemetryListener { | ||
private static class SearchTelemetryListener extends DelegatingActionListener<SearchResponse, SearchResponse> { | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I made this static; it feels much better to be able to reason about its dependencies and not have it tied to TransportSearchAction's lifecycle. |
||
private final CCSUsage.Builder usageBuilder; | ||
|
||
SearchResponseActionListener(ActionListener<SearchResponse> listener) { | ||
private final SearchResponseMetrics searchResponseMetrics; | ||
private final UsageService usageService; | ||
private final boolean collectCCSTelemetry; | ||
|
||
SearchTelemetryListener( | ||
ActionListener<SearchResponse> listener, | ||
CCSUsage.Builder usageBuilder, | ||
SearchResponseMetrics searchResponseMetrics, | ||
UsageService usageService, | ||
boolean collectCCSTelemetry | ||
) { | ||
super(listener); | ||
if (listener instanceof SearchResponseActionListener srListener) { | ||
usageBuilder = srListener.usageBuilder; | ||
} else { | ||
usageBuilder = new CCSUsage.Builder(); | ||
} | ||
this.usageBuilder = usageBuilder; | ||
this.searchResponseMetrics = searchResponseMetrics; | ||
this.usageService = usageService; | ||
this.collectCCSTelemetry = collectCCSTelemetry; | ||
} | ||
|
||
/** | ||
* Should we collect telemetry for this search? | ||
*/ | ||
private boolean collectTelemetry() { | ||
return collectTelemetry && usageBuilder.getRemotesCount() > 0; | ||
} | ||
|
||
public void setRemotes(int count) { | ||
usageBuilder.setRemotesCount(count); | ||
} | ||
|
||
@Override | ||
public void setFeature(String feature) { | ||
usageBuilder.setFeature(feature); | ||
} | ||
|
||
@Override | ||
public void setClient(Task task) { | ||
usageBuilder.setClientFromTask(task); | ||
private boolean collectCCSTelemetry() { | ||
return collectCCSTelemetry && usageBuilder.getRemotesCount() > 0; | ||
} | ||
|
||
@Override | ||
|
@@ -2069,7 +2072,7 @@ public void onResponse(SearchResponse searchResponse) { | |
} | ||
searchResponseMetrics.incrementResponseCount(responseCountTotalStatus); | ||
|
||
if (collectTelemetry()) { | ||
if (collectCCSTelemetry()) { | ||
|
||
extractCCSTelemetry(searchResponse); | ||
recordTelemetry(); | ||
} | ||
|
@@ -2084,7 +2087,7 @@ public void onResponse(SearchResponse searchResponse) { | |
@Override | ||
public void onFailure(Exception e) { | ||
searchResponseMetrics.incrementResponseCount(SearchResponseMetrics.ResponseCountTotalStatus.FAILURE); | ||
if (collectTelemetry()) { | ||
if (collectCCSTelemetry()) { | ||
usageBuilder.setFailure(e); | ||
recordTelemetry(); | ||
} | ||
|
@@ -2109,8 +2112,6 @@ private void extractCCSTelemetry(SearchResponse searchResponse) { | |
usageBuilder.perClusterUsage(clusterAlias, cluster.getTook()); | ||
} | ||
} | ||
|
||
} | ||
|
||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This isn't perfect, but it gets the job done. Ideally, open PIT would have its own transport action, but I am not even sure that's a change worth making given its complexity.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Before this change, this flag was not needed because the listener had all the telemetry logic encapsulated in it. The downside was that we needed to do some instanceof checks on it. Another issue I found with that approach is that it does not allow storing the resolved indices (without carrying a reference to the search request in the listener).