Skip to content

Commit 450e5cf

Browse files
authored
[INFERENCE] Remove node pinning logic (#134630)
Developed for EIS and no longer required
1 parent 56fea57 commit 450e5cf

23 files changed

+43
-909
lines changed

server/src/main/java/org/elasticsearch/TransportVersions.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -328,6 +328,7 @@ static TransportVersion def(int id) {
328328
public static final TransportVersion INFERENCE_API_OPENAI_HEADERS = def(9_161_0_00);
329329
public static final TransportVersion NEW_SEMANTIC_QUERY_INTERCEPTORS = def(9_162_0_00);
330330
public static final TransportVersion ESQL_LOOKUP_JOIN_ON_EXPRESSION = def(9_163_0_00);
331+
public static final TransportVersion INFERENCE_REQUEST_ADAPTIVE_RATE_LIMITING_REMOVED = def(9_164_0_00);
331332

332333
/*
333334
* STOP! READ THIS FIRST! No, really,

server/src/main/java/org/elasticsearch/inference/telemetry/InferenceStats.java

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -61,10 +61,6 @@ public static Map<String, Object> modelAttributes(Model model) {
6161
return modelAttributesMap;
6262
}
6363

64-
public static Map<String, Object> routingAttributes(boolean hasBeenRerouted, String nodeIdHandlingRequest) {
65-
return Map.of("rerouted", hasBeenRerouted, "node_id", nodeIdHandlingRequest);
66-
}
67-
6864
public static Map<String, Object> modelAttributes(UnparsedModel model) {
6965
return Map.of("service", model.service(), "task_type", model.taskType().toString());
7066
}

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/inference/action/BaseInferenceActionRequest.java

Lines changed: 8 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -23,8 +23,6 @@
2323
*/
2424
public abstract class BaseInferenceActionRequest extends LegacyActionRequest {
2525

26-
private boolean hasBeenRerouted;
27-
2826
private final InferenceContext context;
2927

3028
public BaseInferenceActionRequest(InferenceContext context) {
@@ -34,12 +32,9 @@ public BaseInferenceActionRequest(InferenceContext context) {
3432

3533
public BaseInferenceActionRequest(StreamInput in) throws IOException {
3634
super(in);
37-
if (in.getTransportVersion().onOrAfter(TransportVersions.INFERENCE_REQUEST_ADAPTIVE_RATE_LIMITING)) {
38-
this.hasBeenRerouted = in.readBoolean();
39-
} else {
40-
// For backwards compatibility, we treat all inference requests coming from ES nodes having
41-
// a version pre-node-local-rate-limiting as already rerouted to maintain pre-node-local-rate-limiting behavior.
42-
this.hasBeenRerouted = true;
35+
if (in.getTransportVersion().onOrAfter(TransportVersions.INFERENCE_REQUEST_ADAPTIVE_RATE_LIMITING)
36+
&& in.getTransportVersion().before(TransportVersions.INFERENCE_REQUEST_ADAPTIVE_RATE_LIMITING_REMOVED)) {
37+
in.readBoolean();
4338
}
4439

4540
if (in.getTransportVersion().onOrAfter(TransportVersions.INFERENCE_CONTEXT)
@@ -56,23 +51,16 @@ public BaseInferenceActionRequest(StreamInput in) throws IOException {
5651

5752
public abstract String getInferenceEntityId();
5853

59-
public void setHasBeenRerouted(boolean hasBeenRerouted) {
60-
this.hasBeenRerouted = hasBeenRerouted;
61-
}
62-
63-
public boolean hasBeenRerouted() {
64-
return hasBeenRerouted;
65-
}
66-
6754
public InferenceContext getContext() {
6855
return context;
6956
}
7057

7158
@Override
7259
public void writeTo(StreamOutput out) throws IOException {
7360
super.writeTo(out);
74-
if (out.getTransportVersion().onOrAfter(TransportVersions.INFERENCE_REQUEST_ADAPTIVE_RATE_LIMITING)) {
75-
out.writeBoolean(hasBeenRerouted);
61+
if (out.getTransportVersion().onOrAfter(TransportVersions.INFERENCE_REQUEST_ADAPTIVE_RATE_LIMITING)
62+
&& out.getTransportVersion().before(TransportVersions.INFERENCE_REQUEST_ADAPTIVE_RATE_LIMITING_REMOVED)) {
63+
out.writeBoolean(true);
7664
}
7765

7866
if (out.getTransportVersion().onOrAfter(TransportVersions.INFERENCE_CONTEXT)
@@ -86,11 +74,11 @@ public boolean equals(Object o) {
8674
if (this == o) return true;
8775
if (o == null || getClass() != o.getClass()) return false;
8876
BaseInferenceActionRequest that = (BaseInferenceActionRequest) o;
89-
return hasBeenRerouted == that.hasBeenRerouted && Objects.equals(context, that.context);
77+
return Objects.equals(context, that.context);
9078
}
9179

9280
@Override
9381
public int hashCode() {
94-
return Objects.hash(hasBeenRerouted, context);
82+
return Objects.hash(context);
9583
}
9684
}

x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/inference/action/InferenceActionRequestTests.java

Lines changed: 33 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -730,13 +730,6 @@ protected InferenceAction.Request mutateInstanceForVersion(InferenceAction.Reque
730730
mutated = instance;
731731
}
732732

733-
// We always assume that a request has been rerouted, if it came from a node without adaptive rate limiting
734-
if (version.before(TransportVersions.INFERENCE_REQUEST_ADAPTIVE_RATE_LIMITING)) {
735-
mutated.setHasBeenRerouted(true);
736-
} else {
737-
mutated.setHasBeenRerouted(instance.hasBeenRerouted());
738-
}
739-
740733
return mutated;
741734
}
742735

@@ -788,7 +781,7 @@ public void testWriteTo_WhenVersionIsBeforeInputTypeAdded_ShouldSetInputTypeToUn
788781
assertThat(deserializedInstance.getInputType(), is(InputType.UNSPECIFIED));
789782
}
790783

791-
public void testWriteTo_WhenVersionIsBeforeAdaptiveRateLimiting_ShouldSetHasBeenReroutedToTrue() throws IOException {
784+
public void testWriteTo_ForHasBeenReroutedChanges() throws IOException {
792785
var instance = new InferenceAction.Request(
793786
TaskType.TEXT_EMBEDDING,
794787
"model",
@@ -802,15 +795,39 @@ public void testWriteTo_WhenVersionIsBeforeAdaptiveRateLimiting_ShouldSetHasBeen
802795
false
803796
);
804797

805-
InferenceAction.Request deserializedInstance = copyWriteable(
806-
instance,
807-
getNamedWriteableRegistry(),
808-
instanceReader(),
809-
TransportVersions.V_8_13_0
810-
);
798+
{
799+
// From a version before the rerouting logic was added
800+
InferenceAction.Request deserializedInstance = copyWriteable(
801+
instance,
802+
getNamedWriteableRegistry(),
803+
instanceReader(),
804+
TransportVersions.V_8_17_0
805+
);
806+
807+
assertEquals(instance, deserializedInstance);
808+
}
809+
{
810+
// From a version with rerouting
811+
InferenceAction.Request deserializedInstance = copyWriteable(
812+
instance,
813+
getNamedWriteableRegistry(),
814+
instanceReader(),
815+
TransportVersions.INFERENCE_REQUEST_ADAPTIVE_RATE_LIMITING
816+
);
811817

812-
// Verify that hasBeenRerouted is true after deserializing a request coming from an older transport version
813-
assertTrue(deserializedInstance.hasBeenRerouted());
818+
assertEquals(instance, deserializedInstance);
819+
}
820+
{
821+
// From a version with rerouting removed
822+
InferenceAction.Request deserializedInstance = copyWriteable(
823+
instance,
824+
getNamedWriteableRegistry(),
825+
instanceReader(),
826+
TransportVersions.INFERENCE_REQUEST_ADAPTIVE_RATE_LIMITING_REMOVED
827+
);
828+
829+
assertEquals(instance, deserializedInstance);
830+
}
814831
}
815832

816833
public void testWriteTo_WhenVersionIsBeforeInferenceContext_ShouldSetContextToEmptyContext() throws IOException {

x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/inference/action/UnifiedCompletionActionRequestTests.java

Lines changed: 0 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -68,25 +68,6 @@ public void testValidation_ReturnsNull_When_TaskType_IsAny() {
6868
assertNull(request.validate());
6969
}
7070

71-
public void testWriteTo_WhenVersionIsBeforeAdaptiveRateLimiting_ShouldSetHasBeenReroutedToTrue() throws IOException {
72-
var instance = new UnifiedCompletionAction.Request(
73-
"model",
74-
TaskType.ANY,
75-
UnifiedCompletionRequest.of(List.of(UnifiedCompletionRequestTests.randomMessage())),
76-
TimeValue.timeValueSeconds(10)
77-
);
78-
79-
UnifiedCompletionAction.Request deserializedInstance = copyWriteable(
80-
instance,
81-
getNamedWriteableRegistry(),
82-
instanceReader(),
83-
TransportVersions.ELASTIC_INFERENCE_SERVICE_UNIFIED_CHAT_COMPLETIONS_INTEGRATION
84-
);
85-
86-
// Verify that hasBeenRerouted is true after deserializing a request coming from an older transport version
87-
assertTrue(deserializedInstance.hasBeenRerouted());
88-
}
89-
9071
public void testWriteTo_WhenVersionIsBeforeInferenceContext_ShouldSetContextToEmptyContext() throws IOException {
9172
var instance = new UnifiedCompletionAction.Request(
9273
"model",

x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/InferencePlugin.java

Lines changed: 0 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -82,9 +82,6 @@
8282
import org.elasticsearch.xpack.inference.action.TransportUnifiedCompletionInferenceAction;
8383
import org.elasticsearch.xpack.inference.action.TransportUpdateInferenceModelAction;
8484
import org.elasticsearch.xpack.inference.action.filter.ShardBulkInferenceActionFilter;
85-
import org.elasticsearch.xpack.inference.common.InferenceServiceNodeLocalRateLimitCalculator;
86-
import org.elasticsearch.xpack.inference.common.InferenceServiceRateLimitCalculator;
87-
import org.elasticsearch.xpack.inference.common.NoopNodeLocalRateLimitCalculator;
8885
import org.elasticsearch.xpack.inference.common.Truncator;
8986
import org.elasticsearch.xpack.inference.external.http.HttpClientManager;
9087
import org.elasticsearch.xpack.inference.external.http.HttpSettings;
@@ -164,7 +161,6 @@
164161

165162
import static java.util.Collections.singletonList;
166163
import static org.elasticsearch.xpack.inference.action.filter.ShardBulkInferenceActionFilter.INDICES_INFERENCE_BATCH_SIZE;
167-
import static org.elasticsearch.xpack.inference.common.InferenceAPIClusterAwareRateLimitingFeature.INFERENCE_API_CLUSTER_AWARE_RATE_LIMITING_FEATURE_FLAG;
168164

169165
public class InferencePlugin extends Plugin
170166
implements
@@ -386,19 +382,6 @@ public Collection<?> createComponents(PluginServices services) {
386382
new TransportGetInferenceDiagnosticsAction.ClientManagers(httpClientManager, elasticInferenceServiceHttpClientManager)
387383
);
388384
components.add(inferenceStatsBinding);
389-
390-
// Only add InferenceServiceNodeLocalRateLimitCalculator (which is a ClusterStateListener) for cluster aware rate limiting,
391-
// if the rate limiting feature flags are enabled, otherwise provide noop implementation
392-
InferenceServiceRateLimitCalculator calculator;
393-
if (INFERENCE_API_CLUSTER_AWARE_RATE_LIMITING_FEATURE_FLAG) {
394-
calculator = new InferenceServiceNodeLocalRateLimitCalculator(services.clusterService(), serviceRegistry);
395-
} else {
396-
calculator = new NoopNodeLocalRateLimitCalculator();
397-
}
398-
399-
// Add binding for interface -> implementation
400-
components.add(new PluginComponentBinding<>(InferenceServiceRateLimitCalculator.class, calculator));
401-
402385
components.add(
403386
new InferenceEndpointRegistry(
404387
services.clusterService(),

0 commit comments

Comments
 (0)