Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,8 @@ protected void doExecute(Task task, OpenPointInTimeRequest request, ActionListen
.source(new SearchSourceBuilder().query(request.indexFilter()));
searchRequest.setMaxConcurrentShardRequests(request.maxConcurrentShardRequests());
searchRequest.setCcsMinimizeRoundtrips(false);
transportSearchAction.executeRequest((SearchTask) task, searchRequest, listener.map(r -> {

transportSearchAction.executeOpenPit((SearchTask) task, searchRequest, listener.map(r -> {
assert r.pointInTimeId() != null : r;
return new OpenPointInTimeResponse(
r.pointInTimeId(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ public class TransportSearchAction extends HandledTransportAction<SearchRequest,
private final SearchResponseMetrics searchResponseMetrics;
private final Client client;
private final UsageService usageService;
private final boolean collectTelemetry;
private final boolean collectCCSTelemetry;
private final TimeValue forceConnectTimeoutSecs;

@Inject
Expand Down Expand Up @@ -213,7 +213,7 @@ public TransportSearchAction(
var settings = clusterService.getSettings();
this.defaultPreFilterShardSize = DEFAULT_PRE_FILTER_SHARD_SIZE.get(settings);
this.ccsCheckCompatibility = SearchService.CCS_VERSION_CHECK_SETTING.get(settings);
this.collectTelemetry = SearchService.CCS_COLLECT_TELEMETRY.get(settings);
this.collectCCSTelemetry = SearchService.CCS_COLLECT_TELEMETRY.get(settings);
this.searchResponseMetrics = searchResponseMetrics;
this.client = client;
this.usageService = usageService;
Expand Down Expand Up @@ -333,14 +333,24 @@ public long buildTookInMillis() {

@Override
protected void doExecute(Task task, SearchRequest searchRequest, ActionListener<SearchResponse> listener) {
executeRequest((SearchTask) task, searchRequest, new SearchResponseActionListener(listener), AsyncSearchActionProvider::new);
executeRequest((SearchTask) task, searchRequest, listener, AsyncSearchActionProvider::new, true);
}

void executeRequest(
void executeOpenPit(
SearchTask task,
SearchRequest original,
ActionListener<SearchResponse> listener,
ActionListener<SearchResponse> originalListener,
Function<ActionListener<SearchResponse>, SearchPhaseProvider> searchPhaseProvider
) {
executeRequest(task, original, originalListener, searchPhaseProvider, false);
}

private void executeRequest(
SearchTask task,
SearchRequest original,
ActionListener<SearchResponse> originalListener,
Function<ActionListener<SearchResponse>, SearchPhaseProvider> searchPhaseProvider,
boolean collectSearchTelemetry
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This isn't perfect, but it gets the job done. Ideally, open PIT would have its own transport action, but I am not even sure that's a change worth making given its complexity.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Before this change, this flag was not needed because the listener had all the telemetry logic encapsulated in it. The downside was that we needed to do some instanceof checks on it. Another issue I found with that approach is that it does not allow storing the resolved indices (without carrying a reference to the search request in the listener).

) {
final long relativeStartNanos = System.nanoTime();
final SearchTimeProvider timeProvider = new SearchTimeProvider(
Expand Down Expand Up @@ -372,48 +382,93 @@ void executeRequest(
frozenIndexCheck(resolvedIndices);
}

ActionListener<SearchRequest> rewriteListener = listener.delegateFailureAndWrap((delegate, rewritten) -> {
final SearchSourceBuilder source = original.source();
if (shouldOpenPIT(source)) {
// disabling shard reordering for request
original.setPreFilterShardSize(Integer.MAX_VALUE);
openPIT(
client,
original,
searchService.getDefaultKeepAliveInMillis(),
originalListener.delegateFailureAndWrap((delegate, resp) -> {
// We set the keep alive to -1 to indicate that we don't need the pit id in the response.
// This is needed since we delete the pit prior to sending the response so the id doesn't exist anymore.
source.pointInTimeBuilder(new PointInTimeBuilder(resp.getPointInTimeId()).setKeepAlive(TimeValue.MINUS_ONE));
var pitListener = new ActionListener<SearchResponse>() {
@Override
public void onResponse(SearchResponse response) {
// we need to close the PIT first so we delay the release of the response to after the closing
response.incRef();
closePIT(
client,
original.source().pointInTimeBuilder(),
() -> ActionListener.respondAndRelease(delegate, response)
);
}

@Override
public void onFailure(Exception e) {
closePIT(client, original.source().pointInTimeBuilder(), () -> delegate.onFailure(e));
}
};
executeRequest(task, original, pitListener, searchPhaseProvider, true);
})
);
return;
}
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The only change in this block is that it no longer uses SearchResponseActionListener, which was not necessary (even aside from my changes - I added a test to verify that). This block can shortcut execution, hence I moved it up.


ActionListener<SearchRequest> rewriteListener = originalListener.delegateFailureAndWrap((delegate, rewritten) -> {
if (ccsCheckCompatibility) {
checkCCSVersionCompatibility(rewritten);
}

if (resolvedIndices.getRemoteClusterIndices().isEmpty()) {
executeLocalSearch(
task,
timeProvider,
rewritten,
resolvedIndices,
projectState,
SearchResponse.Clusters.EMPTY,
searchPhaseProvider.apply(delegate)
);
} else {
if (delegate instanceof TelemetryListener tl) {
tl.setRemotes(resolvedIndices.getRemoteClusterIndices().size());
final ActionListener<SearchResponse> searchResponseActionListener;
if (collectSearchTelemetry) {
if (collectCCSTelemetry == false || resolvedIndices.getRemoteClusterIndices().isEmpty()) {
searchResponseActionListener = new SearchTelemetryListener(delegate, searchResponseMetrics);
} else {
CCSUsage.Builder usageBuilder = new CCSUsage.Builder();
usageBuilder.setRemotesCount(resolvedIndices.getRemoteClusterIndices().size());
usageBuilder.setClientFromTask(task);
if (task.isAsync()) {
tl.setFeature(CCSUsageTelemetry.ASYNC_FEATURE);
usageBuilder.setFeature(CCSUsageTelemetry.ASYNC_FEATURE);
}
if (original.pointInTimeBuilder() != null) {
tl.setFeature(CCSUsageTelemetry.PIT_FEATURE);
usageBuilder.setFeature(CCSUsageTelemetry.PIT_FEATURE);
}
tl.setClient(task);
// Check if any of the index patterns are wildcard patterns
var localIndices = resolvedIndices.getLocalIndices();
if (localIndices != null && Arrays.stream(localIndices.indices()).anyMatch(Regex::isSimpleMatchPattern)) {
tl.setFeature(CCSUsageTelemetry.WILDCARD_FEATURE);
usageBuilder.setFeature(CCSUsageTelemetry.WILDCARD_FEATURE);
}
if (resolvedIndices.getRemoteClusterIndices()
.values()
.stream()
.anyMatch(indices -> Arrays.stream(indices.indices()).anyMatch(Regex::isSimpleMatchPattern))) {
tl.setFeature(CCSUsageTelemetry.WILDCARD_FEATURE);
usageBuilder.setFeature(CCSUsageTelemetry.WILDCARD_FEATURE);
}
if (shouldMinimizeRoundtrips(rewritten)) {
usageBuilder.setFeature(CCSUsageTelemetry.MRT_FEATURE);
}
searchResponseActionListener = new SearchTelemetryListener(delegate, searchResponseMetrics, usageService, usageBuilder);
}
} else {
searchResponseActionListener = delegate;
}

if (resolvedIndices.getRemoteClusterIndices().isEmpty()) {
executeLocalSearch(
task,
timeProvider,
rewritten,
resolvedIndices,
projectState,
SearchResponse.Clusters.EMPTY,
searchPhaseProvider.apply(searchResponseActionListener)
);
} else {
final TaskId parentTaskId = task.taskInfo(clusterService.localNode().getId(), false).taskId();
if (shouldMinimizeRoundtrips(rewritten)) {
if (delegate instanceof TelemetryListener tl) {
tl.setFeature(CCSUsageTelemetry.MRT_FEATURE);
}
final AggregationReduceContext.Builder aggregationReduceContextBuilder = rewritten.source() != null
&& rewritten.source().aggregations() != null
? searchService.aggReduceContextBuilder(task::isCancelled, rewritten.source().aggregations())
Expand All @@ -439,7 +494,7 @@ void executeRequest(
aggregationReduceContextBuilder,
remoteClusterService,
threadPool,
delegate,
searchResponseActionListener,
(r, l) -> executeLocalSearch(
task,
timeProvider,
Expand Down Expand Up @@ -473,7 +528,7 @@ void executeRequest(
clusters,
timeProvider,
transportService,
delegate.delegateFailureAndWrap((finalDelegate, searchShardsResponses) -> {
searchResponseActionListener.delegateFailureAndWrap((finalDelegate, searchShardsResponses) -> {
final BiFunction<String, String, DiscoveryNode> clusterNodeLookup = getRemoteClusterNodeLookup(
searchShardsResponses
);
Expand Down Expand Up @@ -517,49 +572,20 @@ void executeRequest(
}
});

final SearchSourceBuilder source = original.source();
final boolean isExplain = source != null && source.explain() != null && source.explain();
if (shouldOpenPIT(source)) {
// disabling shard reordering for request
original.setPreFilterShardSize(Integer.MAX_VALUE);
openPIT(client, original, searchService.getDefaultKeepAliveInMillis(), listener.delegateFailureAndWrap((delegate, resp) -> {
// We set the keep alive to -1 to indicate that we don't need the pit id in the response.
// This is needed since we delete the pit prior to sending the response so the id doesn't exist anymore.
source.pointInTimeBuilder(new PointInTimeBuilder(resp.getPointInTimeId()).setKeepAlive(TimeValue.MINUS_ONE));
var pitListener = new SearchResponseActionListener(delegate) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

One thing I notice here is that SearchResponseActionListener also collects SearchResponseMetrics. But if we convert this to just an ActionListener, this doesn't happen anymore? Is that intended?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's a good observation. Yet onResponse is entirely overridden here, hence SearchResponseActionListener in this case does not do anything anyway. Using a plain ActionListener does not cause any change in behaviour. I added a test around this though, which was missing: this is triggered e.g. when a compound retriever is used, which opens a PIT under the hood and performs a search against it (executed as an async action).

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I also moved up the shouldOpenPIT conditional branch, as it can quickly shortcut execution as opposed to creating the rewriteListener for nothing when a PIT needs to be opened. I think this improved readability, let me know.

@Override
public void onResponse(SearchResponse response) {
// we need to close the PIT first so we delay the release of the response to after the closing
response.incRef();
closePIT(
client,
original.source().pointInTimeBuilder(),
() -> ActionListener.respondAndRelease(delegate, response)
);
}

@Override
public void onFailure(Exception e) {
closePIT(client, original.source().pointInTimeBuilder(), () -> delegate.onFailure(e));
}
};
executeRequest(task, original, pitListener, searchPhaseProvider);
}));
} else {
Rewriteable.rewriteAndFetch(
original,
searchService.getRewriteContext(
timeProvider::absoluteStartMillis,
clusterState.getMinTransportVersion(),
original.getLocalClusterAlias(),
resolvedIndices,
original.pointInTimeBuilder(),
shouldMinimizeRoundtrips(original),
isExplain
),
rewriteListener
);
}
Rewriteable.rewriteAndFetch(
original,
searchService.getRewriteContext(
timeProvider::absoluteStartMillis,
clusterState.getMinTransportVersion(),
original.getLocalClusterAlias(),
resolvedIndices,
original.pointInTimeBuilder(),
shouldMinimizeRoundtrips(original),
isExplain
),
rewriteListener
);
}

/**
Expand Down Expand Up @@ -2001,49 +2027,34 @@ static String[] ignoreBlockedIndices(ProjectState projectState, String[] concret
.toArray(String[]::new);
}
return concreteIndices;
}

private interface TelemetryListener {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Given I moved the wrapping inside of executeRequest, this interface is no longer required. It was there for open PIT only, as it provides a different type of listener.

void setRemotes(int count);

void setFeature(String feature);

void setClient(Task task);
}

private class SearchResponseActionListener extends DelegatingActionListener<SearchResponse, SearchResponse>
implements
TelemetryListener {
private static class SearchTelemetryListener extends DelegatingActionListener<SearchResponse, SearchResponse> {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I made this static; it feels much better to be able to reason about its dependencies and not have it tied to TransportSearchAction's lifecycle.

private final CCSUsage.Builder usageBuilder;

SearchResponseActionListener(ActionListener<SearchResponse> listener) {
private final SearchResponseMetrics searchResponseMetrics;
private final UsageService usageService;
private final boolean collectCCSTelemetry;

SearchTelemetryListener(
ActionListener<SearchResponse> listener,
SearchResponseMetrics searchResponseMetrics,
UsageService usageService,
CCSUsage.Builder usageBuilder
) {
super(listener);
if (listener instanceof SearchResponseActionListener srListener) {
usageBuilder = srListener.usageBuilder;
} else {
usageBuilder = new CCSUsage.Builder();
}
}

/**
* Should we collect telemetry for this search?
*/
private boolean collectTelemetry() {
return collectTelemetry && usageBuilder.getRemotesCount() > 0;
}

public void setRemotes(int count) {
usageBuilder.setRemotesCount(count);
}

@Override
public void setFeature(String feature) {
usageBuilder.setFeature(feature);
this.searchResponseMetrics = searchResponseMetrics;
this.collectCCSTelemetry = true;
this.usageService = usageService;
this.usageBuilder = usageBuilder;
}

@Override
public void setClient(Task task) {
usageBuilder.setClientFromTask(task);
SearchTelemetryListener(ActionListener<SearchResponse> listener, SearchResponseMetrics searchResponseMetrics) {
super(listener);
this.searchResponseMetrics = searchResponseMetrics;
this.collectCCSTelemetry = false;
this.usageService = null;
this.usageBuilder = null;
}

@Override
Expand All @@ -2069,7 +2080,7 @@ public void onResponse(SearchResponse searchResponse) {
}
searchResponseMetrics.incrementResponseCount(responseCountTotalStatus);

if (collectTelemetry()) {
if (collectCCSTelemetry) {
extractCCSTelemetry(searchResponse);
recordTelemetry();
}
Expand All @@ -2084,7 +2095,7 @@ public void onResponse(SearchResponse searchResponse) {
@Override
public void onFailure(Exception e) {
searchResponseMetrics.incrementResponseCount(SearchResponseMetrics.ResponseCountTotalStatus.FAILURE);
if (collectTelemetry()) {
if (collectCCSTelemetry) {
usageBuilder.setFailure(e);
recordTelemetry();
}
Expand All @@ -2109,8 +2120,6 @@ private void extractCCSTelemetry(SearchResponse searchResponse) {
usageBuilder.perClusterUsage(clusterAlias, cluster.getTook());
}
}

}

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,17 @@
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.index.query.MatchAllQueryBuilder;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.plugins.PluginsService;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.rescore.QueryRescorerBuilder;
import org.elasticsearch.search.retriever.RescorerRetrieverBuilder;
import org.elasticsearch.search.retriever.StandardRetrieverBuilder;
import org.elasticsearch.telemetry.Measurement;
import org.elasticsearch.telemetry.TestTelemetryPlugin;
import org.elasticsearch.test.ESSingleNodeTestCase;
import org.hamcrest.Matchers;
import org.junit.After;
import org.junit.Before;

Expand Down Expand Up @@ -85,6 +90,29 @@ public void testSimpleQuery() {
assertEquals(searchResponse.getTook().millis(), measurements.getFirst().getLong());
}

public void testCompoundRetriever() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

very nice !

SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
searchSourceBuilder.retriever(
new RescorerRetrieverBuilder(
new StandardRetrieverBuilder(new MatchAllQueryBuilder()),
List.of(new QueryRescorerBuilder(new MatchAllQueryBuilder()))
)
);
SearchResponse searchResponse = client().prepareSearch(indexName).setSource(searchSourceBuilder).get();
try {
assertNoFailures(searchResponse);
assertSearchHits(searchResponse, "1", "2");
} finally {
searchResponse.decRef();
}

List<Measurement> measurements = getTestTelemetryPlugin().getLongHistogramMeasurement(TOOK_DURATION_TOTAL_HISTOGRAM_NAME);
// compound retriever does its own search as an async action, whose took time is recorded separately
assertEquals(2, measurements.size());
assertThat(measurements.getFirst().getLong(), Matchers.lessThan(searchResponse.getTook().millis()));
assertEquals(searchResponse.getTook().millis(), measurements.getLast().getLong());
}

public void testMultiSearch() {
MultiSearchRequestBuilder multiSearchRequestBuilder = client().prepareMultiSearch();
int numSearchRequests = randomIntBetween(3, 10);
Expand Down