Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,8 @@ protected void doExecute(Task task, OpenPointInTimeRequest request, ActionListen
.source(new SearchSourceBuilder().query(request.indexFilter()));
searchRequest.setMaxConcurrentShardRequests(request.maxConcurrentShardRequests());
searchRequest.setCcsMinimizeRoundtrips(false);
transportSearchAction.executeRequest((SearchTask) task, searchRequest, listener.map(r -> {

transportSearchAction.executeOpenPit((SearchTask) task, searchRequest, listener.map(r -> {
assert r.pointInTimeId() != null : r;
return new OpenPointInTimeResponse(
r.pointInTimeId(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ public class TransportSearchAction extends HandledTransportAction<SearchRequest,
private final SearchResponseMetrics searchResponseMetrics;
private final Client client;
private final UsageService usageService;
private final boolean collectTelemetry;
private final boolean collectCCSTelemetry;
private final TimeValue forceConnectTimeoutSecs;

@Inject
Expand Down Expand Up @@ -213,7 +213,7 @@ public TransportSearchAction(
var settings = clusterService.getSettings();
this.defaultPreFilterShardSize = DEFAULT_PRE_FILTER_SHARD_SIZE.get(settings);
this.ccsCheckCompatibility = SearchService.CCS_VERSION_CHECK_SETTING.get(settings);
this.collectTelemetry = SearchService.CCS_COLLECT_TELEMETRY.get(settings);
this.collectCCSTelemetry = SearchService.CCS_COLLECT_TELEMETRY.get(settings);
this.searchResponseMetrics = searchResponseMetrics;
this.client = client;
this.usageService = usageService;
Expand Down Expand Up @@ -333,14 +333,24 @@ public long buildTookInMillis() {

@Override
protected void doExecute(Task task, SearchRequest searchRequest, ActionListener<SearchResponse> listener) {
executeRequest((SearchTask) task, searchRequest, new SearchResponseActionListener(listener), AsyncSearchActionProvider::new);
executeRequest((SearchTask) task, searchRequest, listener, AsyncSearchActionProvider::new, true);
}

void executeRequest(
void executeOpenPit(
SearchTask task,
SearchRequest original,
ActionListener<SearchResponse> listener,
ActionListener<SearchResponse> originalListener,
Function<ActionListener<SearchResponse>, SearchPhaseProvider> searchPhaseProvider
) {
executeRequest(task, original, originalListener, searchPhaseProvider, false);
}

private void executeRequest(
SearchTask task,
SearchRequest original,
ActionListener<SearchResponse> originalListener,
Function<ActionListener<SearchResponse>, SearchPhaseProvider> searchPhaseProvider,
boolean collectSearchTelemetry
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This isn't perfect, but it gets the job done. Ideally, open PIT would have its own transport action, but I am not even sure that's a change worth making given its complexity.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Before this change, this flag was not needed because the listener had all the telemetry logic encapsulated in it. The downside was that we needed to do some instanceof checks on it. Another issue I found with that approach is that it does not allow storing resolved indices (without carrying a reference to the search request in the listener).

) {
final long relativeStartNanos = System.nanoTime();
final SearchTimeProvider timeProvider = new SearchTimeProvider(
Expand Down Expand Up @@ -372,11 +382,16 @@ void executeRequest(
frozenIndexCheck(resolvedIndices);
}

ActionListener<SearchRequest> rewriteListener = listener.delegateFailureAndWrap((delegate, rewritten) -> {
ActionListener<SearchRequest> rewriteListener = originalListener.delegateFailureAndWrap((delegate, rewritten) -> {
if (ccsCheckCompatibility) {
checkCCSVersionCompatibility(rewritten);
}

final CCSUsage.Builder ccsUsageBuilder = new CCSUsage.Builder();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It looks like we always create and update ccsUsageBuilder but only use it if collectSearchTelemetry is true?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I was a bit lazy here; I will iterate over this once more.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This was not so different from how things already were: a CCS usage builder always got created even though we may not be collecting CCS telemetry. I did not like that my change made things even harder to read. I tied the creation of the listener to the telemetry collection now, so that we only create the usage builder when necessary, and the code should be easier to follow.

final ActionListener<SearchResponse> searchResponseActionListener = collectSearchTelemetry
? new SearchTelemetryListener(delegate, ccsUsageBuilder, searchResponseMetrics, usageService, collectCCSTelemetry)
: delegate;

if (resolvedIndices.getRemoteClusterIndices().isEmpty()) {
executeLocalSearch(
task,
Expand All @@ -385,35 +400,33 @@ void executeRequest(
resolvedIndices,
projectState,
SearchResponse.Clusters.EMPTY,
searchPhaseProvider.apply(delegate)
searchPhaseProvider.apply(searchResponseActionListener)
);
} else {
if (delegate instanceof TelemetryListener tl) {
tl.setRemotes(resolvedIndices.getRemoteClusterIndices().size());
if (task.isAsync()) {
tl.setFeature(CCSUsageTelemetry.ASYNC_FEATURE);
}
if (original.pointInTimeBuilder() != null) {
tl.setFeature(CCSUsageTelemetry.PIT_FEATURE);
}
tl.setClient(task);
// Check if any of the index patterns are wildcard patterns
var localIndices = resolvedIndices.getLocalIndices();
if (localIndices != null && Arrays.stream(localIndices.indices()).anyMatch(Regex::isSimpleMatchPattern)) {
tl.setFeature(CCSUsageTelemetry.WILDCARD_FEATURE);
}
if (resolvedIndices.getRemoteClusterIndices()
.values()
.stream()
.anyMatch(indices -> Arrays.stream(indices.indices()).anyMatch(Regex::isSimpleMatchPattern))) {
tl.setFeature(CCSUsageTelemetry.WILDCARD_FEATURE);
}
ccsUsageBuilder.setRemotesCount(resolvedIndices.getRemoteClusterIndices().size());
if (task.isAsync()) {
ccsUsageBuilder.setFeature(CCSUsageTelemetry.ASYNC_FEATURE);
}
if (original.pointInTimeBuilder() != null) {
ccsUsageBuilder.setFeature(CCSUsageTelemetry.PIT_FEATURE);
}
ccsUsageBuilder.setClientFromTask(task);
// Check if any of the index patterns are wildcard patterns
var localIndices = resolvedIndices.getLocalIndices();
if (localIndices != null && Arrays.stream(localIndices.indices()).anyMatch(Regex::isSimpleMatchPattern)) {
ccsUsageBuilder.setFeature(CCSUsageTelemetry.WILDCARD_FEATURE);
}
if (resolvedIndices.getRemoteClusterIndices()
.values()
.stream()
.anyMatch(indices -> Arrays.stream(indices.indices()).anyMatch(Regex::isSimpleMatchPattern))) {
ccsUsageBuilder.setFeature(CCSUsageTelemetry.WILDCARD_FEATURE);
}

final TaskId parentTaskId = task.taskInfo(clusterService.localNode().getId(), false).taskId();
if (shouldMinimizeRoundtrips(rewritten)) {
if (delegate instanceof TelemetryListener tl) {
tl.setFeature(CCSUsageTelemetry.MRT_FEATURE);
}
ccsUsageBuilder.setFeature(CCSUsageTelemetry.MRT_FEATURE);

final AggregationReduceContext.Builder aggregationReduceContextBuilder = rewritten.source() != null
&& rewritten.source().aggregations() != null
? searchService.aggReduceContextBuilder(task::isCancelled, rewritten.source().aggregations())
Expand All @@ -439,7 +452,7 @@ void executeRequest(
aggregationReduceContextBuilder,
remoteClusterService,
threadPool,
delegate,
searchResponseActionListener,
(r, l) -> executeLocalSearch(
task,
timeProvider,
Expand Down Expand Up @@ -473,7 +486,7 @@ void executeRequest(
clusters,
timeProvider,
transportService,
delegate.delegateFailureAndWrap((finalDelegate, searchShardsResponses) -> {
searchResponseActionListener.delegateFailureAndWrap((finalDelegate, searchShardsResponses) -> {
final BiFunction<String, String, DiscoveryNode> clusterNodeLookup = getRemoteClusterNodeLookup(
searchShardsResponses
);
Expand Down Expand Up @@ -522,29 +535,34 @@ void executeRequest(
if (shouldOpenPIT(source)) {
// disabling shard reordering for request
original.setPreFilterShardSize(Integer.MAX_VALUE);
openPIT(client, original, searchService.getDefaultKeepAliveInMillis(), listener.delegateFailureAndWrap((delegate, resp) -> {
// We set the keep alive to -1 to indicate that we don't need the pit id in the response.
// This is needed since we delete the pit prior to sending the response so the id doesn't exist anymore.
source.pointInTimeBuilder(new PointInTimeBuilder(resp.getPointInTimeId()).setKeepAlive(TimeValue.MINUS_ONE));
var pitListener = new SearchResponseActionListener(delegate) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

One thing I notice here is that SearchResponseActionListener also collects SearchResponseMetrics. But if we convert this to just ActionListener, this doesn't happen anymore? Is that intended?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's a good observation. Yet onResponse is entirely rewritten here, hence SearchResponseActionListener in this case does not do anything anyway. Using a plain ActionListener does not cause any change in behaviour. I added a test around this though, which was missing: this is triggered e.g. when a compound retriever is used, which opens a PIT under the hood and performs a search against it (executed as an async action).

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I also moved up the shouldOpenPIT conditional branch, as it can quickly shortcut execution as opposed to creating the rewriteListener for nothing when a PIT needs to be opened. I think this improved readability, let me know.

@Override
public void onResponse(SearchResponse response) {
// we need to close the PIT first so we delay the release of the response to after the closing
response.incRef();
closePIT(
client,
original.source().pointInTimeBuilder(),
() -> ActionListener.respondAndRelease(delegate, response)
);
}
openPIT(
client,
original,
searchService.getDefaultKeepAliveInMillis(),
originalListener.delegateFailureAndWrap((delegate, resp) -> {
// We set the keep alive to -1 to indicate that we don't need the pit id in the response.
// This is needed since we delete the pit prior to sending the response so the id doesn't exist anymore.
source.pointInTimeBuilder(new PointInTimeBuilder(resp.getPointInTimeId()).setKeepAlive(TimeValue.MINUS_ONE));
var pitListener = new ActionListener<SearchResponse>() {
@Override
public void onResponse(SearchResponse response) {
// we need to close the PIT first so we delay the release of the response to after the closing
response.incRef();
closePIT(
client,
original.source().pointInTimeBuilder(),
() -> ActionListener.respondAndRelease(delegate, response)
);
}

@Override
public void onFailure(Exception e) {
closePIT(client, original.source().pointInTimeBuilder(), () -> delegate.onFailure(e));
}
};
executeRequest(task, original, pitListener, searchPhaseProvider);
}));
@Override
public void onFailure(Exception e) {
closePIT(client, original.source().pointInTimeBuilder(), () -> delegate.onFailure(e));
}
};
executeRequest(task, original, pitListener, searchPhaseProvider, true);
})
);
} else {
Rewriteable.rewriteAndFetch(
original,
Expand Down Expand Up @@ -2001,49 +2019,34 @@ static String[] ignoreBlockedIndices(ProjectState projectState, String[] concret
.toArray(String[]::new);
}
return concreteIndices;
}

private interface TelemetryListener {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Given I moved the wrapping inside of executeRequest, this interface is no longer required. It was there for open PIT only, as it provides a different type of listener.

void setRemotes(int count);

void setFeature(String feature);

void setClient(Task task);
}

private class SearchResponseActionListener extends DelegatingActionListener<SearchResponse, SearchResponse>
implements
TelemetryListener {
private static class SearchTelemetryListener extends DelegatingActionListener<SearchResponse, SearchResponse> {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I made this static; it feels much better to be able to reason about its dependencies and not have it be tied to TransportSearchAction's lifecycle.

private final CCSUsage.Builder usageBuilder;

SearchResponseActionListener(ActionListener<SearchResponse> listener) {
private final SearchResponseMetrics searchResponseMetrics;
private final UsageService usageService;
private final boolean collectCCSTelemetry;

SearchTelemetryListener(
ActionListener<SearchResponse> listener,
CCSUsage.Builder usageBuilder,
SearchResponseMetrics searchResponseMetrics,
UsageService usageService,
boolean collectCCSTelemetry
) {
super(listener);
if (listener instanceof SearchResponseActionListener srListener) {
usageBuilder = srListener.usageBuilder;
} else {
usageBuilder = new CCSUsage.Builder();
}
this.usageBuilder = usageBuilder;
this.searchResponseMetrics = searchResponseMetrics;
this.usageService = usageService;
this.collectCCSTelemetry = collectCCSTelemetry;
}

/**
* Should we collect telemetry for this search?
*/
private boolean collectTelemetry() {
return collectTelemetry && usageBuilder.getRemotesCount() > 0;
}

public void setRemotes(int count) {
usageBuilder.setRemotesCount(count);
}

@Override
public void setFeature(String feature) {
usageBuilder.setFeature(feature);
}

@Override
public void setClient(Task task) {
usageBuilder.setClientFromTask(task);
private boolean collectCCSTelemetry() {
return collectCCSTelemetry && usageBuilder.getRemotesCount() > 0;
}

@Override
Expand All @@ -2069,7 +2072,7 @@ public void onResponse(SearchResponse searchResponse) {
}
searchResponseMetrics.incrementResponseCount(responseCountTotalStatus);

if (collectTelemetry()) {
if (collectCCSTelemetry()) {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I renamed this to avoid confusion, as this flag controls only whether we record CCS specific telemetry.

extractCCSTelemetry(searchResponse);
recordTelemetry();
}
Expand All @@ -2084,7 +2087,7 @@ public void onResponse(SearchResponse searchResponse) {
@Override
public void onFailure(Exception e) {
searchResponseMetrics.incrementResponseCount(SearchResponseMetrics.ResponseCountTotalStatus.FAILURE);
if (collectTelemetry()) {
if (collectCCSTelemetry()) {
usageBuilder.setFailure(e);
recordTelemetry();
}
Expand All @@ -2109,8 +2112,6 @@ private void extractCCSTelemetry(SearchResponse searchResponse) {
usageBuilder.perClusterUsage(clusterAlias, cluster.getTook());
}
}

}

}
}