Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -466,7 +466,7 @@ public void createConnector() {
.name("test")
.description("description")
.version("testModelVersion")
.protocol("testProtocol")
.protocol("http")
.parameters(params)
.credential(credentials)
.actions(null)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1033,7 +1033,7 @@ public void createConnector() {
.name("test")
.description("description")
.version("testModelVersion")
.protocol("testProtocol")
.protocol("http")
.parameters(params)
.credential(credentials)
.actions(null)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import java.io.IOException;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Locale;
Expand Down Expand Up @@ -122,7 +123,7 @@ private void validate() {
for (MLToolSpec toolSpec : tools) {
String toolName = Optional.ofNullable(toolSpec.getName()).orElse(toolSpec.getType());
if (toolNames.contains(toolName)) {
throw new IllegalArgumentException("Duplicate tool defined: " + toolName);
throw new IllegalArgumentException("Duplicate tool defined in agent configuration");
} else {
toolNames.add(toolName);
}
Expand All @@ -138,7 +139,7 @@ private void validateMLAgentType(String agentType) {
MLAgentType.valueOf(agentType.toUpperCase(Locale.ROOT)); // Use toUpperCase() to allow case-insensitive matching
} catch (IllegalArgumentException e) {
// The typeStr does not match any MLAgentType, so throw a new exception with a clearer message.
throw new IllegalArgumentException(agentType + " is not a valid Agent Type");
throw new IllegalArgumentException("Invalid Agent Type, Please use one of " + Arrays.toString(MLAgentType.values()));
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.opensearch.ml.common.CommonValue;
import org.opensearch.ml.common.connector.ConnectorAction;
import org.opensearch.ml.common.connector.ConnectorClientConfig;
import org.opensearch.ml.common.connector.ConnectorProtocols;

import lombok.Builder;
import lombok.Data;
Expand Down Expand Up @@ -121,6 +122,7 @@ public MLCreateConnectorInput(
if ((url == null || url.isBlank()) && isMcpConnector) {
throw new IllegalArgumentException("MCP Connector url is null or blank");
}
ConnectorProtocols.validateProtocol(protocol);
}
this.name = name;
this.description = description;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

import java.io.IOException;
import java.time.Instant;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -114,7 +115,7 @@ public void constructor_NullLLMSpec() {
@Test
public void constructor_DuplicateTool() {
exceptionRule.expect(IllegalArgumentException.class);
exceptionRule.expectMessage("Duplicate tool defined: test");
exceptionRule.expectMessage("Duplicate tool defined in agent configuration");

MLAgent agent = new MLAgent(
"test_name",
Expand Down Expand Up @@ -353,7 +354,7 @@ public void fromStream() throws IOException {
@Test
public void constructor_InvalidAgentType() {
exceptionRule.expect(IllegalArgumentException.class);
exceptionRule.expectMessage(" is not a valid Agent Type");
exceptionRule.expectMessage("Invalid Agent Type, Please use one of " + Arrays.toString(MLAgentType.values()));

new MLAgent(
"test_name",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import static org.junit.Assert.assertTrue;
import static org.opensearch.ml.common.connector.ConnectorProtocols.MCP_SSE;
import static org.opensearch.ml.common.connector.ConnectorProtocols.MCP_STREAMABLE_HTTP;
import static org.opensearch.ml.common.connector.ConnectorProtocols.VALID_PROTOCOLS;

import java.io.IOException;
import java.util.Arrays;
Expand Down Expand Up @@ -169,6 +170,29 @@ public void constructorMLCreateConnectorInput_NullProtocol() {
assertEquals("Connector protocol is null", exception.getMessage());
}

@Test
public void constructorMLCreateConnectorInput_InvalidProtocol() {
Throwable exception = assertThrows(IllegalArgumentException.class, () -> {
MLCreateConnectorInput
.builder()
.name(TEST_CONNECTOR_NAME)
.description(TEST_CONNECTOR_DESCRIPTION)
.version(TEST_CONNECTOR_VERSION)
.protocol("dummy")
.parameters(Map.of(TEST_PARAM_KEY, TEST_PARAM_VALUE))
.credential(Map.of(TEST_CREDENTIAL_KEY, TEST_CREDENTIAL_VALUE))
.actions(List.of())
.access(AccessMode.PUBLIC)
.backendRoles(Arrays.asList(TEST_ROLE1, TEST_ROLE2))
.addAllBackendRoles(false)
.build();
});
assertEquals(
"Unsupported connector protocol. Please use one of " + Arrays.toString(VALID_PROTOCOLS.toArray(new String[0])),
exception.getMessage()
);
}

@Test
public void constructorMLCreateConnectorInput_NullCredential() {
Throwable exception = assertThrows(IllegalArgumentException.class, () -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -993,7 +993,7 @@ public static List<String> getToolNames(Map<String, Tool> tools) {

public static Tool createTool(Map<String, Tool.Factory> toolFactories, Map<String, String> executeParams, MLToolSpec toolSpec) {
if (!toolFactories.containsKey(toolSpec.getType())) {
throw new IllegalArgumentException("Tool not found: " + toolSpec.getType());
throw new IllegalArgumentException("Tool type not found");
}
Map<String, Object> toolParams = new HashMap<>();
toolParams.putAll(executeParams);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.security.PrivilegedExceptionAction;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
Expand Down Expand Up @@ -654,7 +655,7 @@ protected MLAgentRunner getAgentRunner(MLAgent mlAgent) {
encryptor
);
default:
throw new IllegalArgumentException("Unsupported agent type: " + mlAgent.getType());
throw new IllegalArgumentException("Unsupported agent type. Please use one of " + Arrays.toString(MLAgentType.values()));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,15 @@ public static FunctionCalling create(String llmInterface) {
case LLM_INTERFACE_BEDROCK_CONVERSE_DEEPSEEK_R1:
return new BedrockConverseDeepseekR1FunctionCalling();
default:
throw new IllegalArgumentException(String.format("Invalid _llm_interface: %s", llmInterface));
throw new IllegalArgumentException(
String
.format(
"Invalid _llm_interface. Supported values are %s,%s,%s",
LLM_INTERFACE_BEDROCK_CONVERSE_CLAUDE,
LLM_INTERFACE_OPENAI_V1_CHAT_COMPLETIONS,
LLM_INTERFACE_BEDROCK_CONVERSE_DEEPSEEK_R1
)
);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ private void registerAgent(MLAgent agent, ActionListener<MLRegisterAgentResponse
try {
FunctionCallingFactory.create(llmInterface);
} catch (Exception e) {
listener.onFailure(new IllegalArgumentException("Invalid _llm_interface: " + llmInterface));
listener.onFailure(new IllegalArgumentException("Invalid _llm_interface"));
return;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,8 +112,7 @@ protected void doExecute(Task task, ActionRequest request, ActionListener<MLMcpT
.map(McpToolRegisterInput::getName)
.filter(registerToolNames::contains)
.toList();
String exceptionMessage = String
.format(Locale.ROOT, "Unable to register tools: %s as they already exist", existingTools);
String exceptionMessage = "Unable to register tool: a tool with the same name already exists.";
log.warn(exceptionMessage);
restoreListener.onFailure(new IllegalArgumentException(exceptionMessage));
} else {
Expand Down Expand Up @@ -192,16 +191,18 @@ private void indexMcpTools(
errMsgBuilder.append("\n");
}
log.error(errMsgBuilder.toString());
StringBuilder respErrMsgBuilder = new StringBuilder(
String.format("Failed to persist %s mcp tool(s) into system index", indexFailedTools.get().size())
);
if (!indexSucceedTools.get().isEmpty()) {
registerMcpToolsOnNodes(
errMsgBuilder,
respErrMsgBuilder,
updateVersion(registerNodesRequest, bulkResponse),
indexSucceedTools.get(),
restoreListener
);
} else {
restoreListener
.onFailure(new OpenSearchException(errMsgBuilder.deleteCharAt(errMsgBuilder.length() - 1).toString()));
restoreListener.onFailure(new OpenSearchException(respErrMsgBuilder.toString()));
}
}
}, e -> {
Expand Down Expand Up @@ -248,6 +249,7 @@ private void registerMcpToolsOnNodes(
Set<String> indexSucceedTools,
ActionListener<MLMcpToolsRegisterNodesResponse> listener
) {
StringBuilder respErrBuilder = new StringBuilder(errMsgBuilder.toString());
try (ThreadContext.StoredContext context = client.threadPool().getThreadContext().stashContext()) {
ActionListener<MLMcpToolsRegisterNodesResponse> restoreListener = ActionListener.runBefore(listener, context::restore);
ActionListener<MLMcpToolsRegisterNodesResponse> addToMemoryResultListener = ActionListener.wrap(r -> {
Expand All @@ -267,13 +269,13 @@ private void registerMcpToolsOnNodes(
});
errMsgBuilder.deleteCharAt(errMsgBuilder.length() - 1);
log.error(errMsgBuilder.toString());
restoreListener.onFailure(new OpenSearchException(errMsgBuilder.toString()));
respErrBuilder.append("Tools are persisted successfully but failed to register to mcp server memory");
restoreListener.onFailure(new OpenSearchException(respErrBuilder.toString()));
} else {
if (errMsgBuilder.isEmpty()) {
restoreListener.onResponse(r);
} else {
restoreListener
.onFailure(new OpenSearchException(errMsgBuilder.deleteCharAt(errMsgBuilder.length() - 1).toString()));
restoreListener.onFailure(new OpenSearchException(respErrBuilder.toString()));
}
}
}, e -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,9 +135,8 @@ protected void doExecute(Task task, ActionRequest request, ActionListener<MLMcpT
});
// If any to update tool not found, return error.
if (!updateToolSet.isEmpty()) {
String errMsg = String.format("Failed to find tools: %s in system index", updateToolSet);
log.warn(errMsg);
restoreListener.onFailure(new OpenSearchException(errMsg));
log.warn("Failed to find tools: {} in system index", updateToolSet);
restoreListener.onFailure(new OpenSearchException("Failed to find one or more requested tools in system index"));
} else {
updateMcpTools(updateNodesRequest, searchedMcpToolWrappers, restoreListener);
}
Expand Down Expand Up @@ -210,16 +209,18 @@ private void updateMcpTools(
errMsgBuilder.append("\n");
}
log.error(errMsgBuilder.toString());
StringBuilder responseErrorBuilder = new StringBuilder(
String.format("Failed to update %d tool(s) in system index", updateFailedTools.get().size())
);
if (!updateSucceedTools.get().isEmpty()) {
updateMcpToolsOnNodes(
errMsgBuilder,
responseErrorBuilder,
mergeDocFields(updateNodesRequest, searchedMcpToolWrappers, bulkResponse),
updateSucceedTools.get(),
restoreListener
);
} else {
restoreListener
.onFailure(new OpenSearchException(errMsgBuilder.deleteCharAt(errMsgBuilder.length() - 1).toString()));
restoreListener.onFailure(new OpenSearchException(responseErrorBuilder.toString()));
}
}
}, e -> {
Expand Down Expand Up @@ -289,6 +290,7 @@ private void updateMcpToolsOnNodes(
Set<String> indexSucceedTools,
ActionListener<MLMcpToolsUpdateNodesResponse> listener
) {
StringBuilder respErrMsgBuilder = new StringBuilder(errMsgBuilder.toString());
try (ThreadContext.StoredContext context = client.threadPool().getThreadContext().stashContext()) {
ActionListener<MLMcpToolsUpdateNodesResponse> restoreListener = ActionListener.runBefore(listener, context::restore);
ActionListener<MLMcpToolsUpdateNodesResponse> addToMemoryResultListener = ActionListener.wrap(r -> {
Expand All @@ -308,13 +310,13 @@ private void updateMcpToolsOnNodes(
});
errMsgBuilder.deleteCharAt(errMsgBuilder.length() - 1);
log.error(errMsgBuilder.toString());
restoreListener.onFailure(new OpenSearchException(errMsgBuilder.toString()));
respErrMsgBuilder.append("Tools are updated successfully, but failed to update to mcp server memory");
restoreListener.onFailure(new OpenSearchException(respErrMsgBuilder.toString()));
} else {
if (errMsgBuilder.isEmpty()) {
restoreListener.onResponse(r);
} else {
restoreListener
.onFailure(new OpenSearchException(errMsgBuilder.deleteCharAt(errMsgBuilder.length() - 1).toString()));
restoreListener.onFailure(new OpenSearchException(respErrMsgBuilder.toString()));
}
}
}, e -> {
Expand All @@ -328,7 +330,8 @@ private void updateMcpToolsOnNodes(
)
);
log.error(errMsgBuilder.toString(), e);
restoreListener.onFailure(new OpenSearchException(errMsgBuilder.toString()));
respErrMsgBuilder.append("Tools are updated successfully, but failed to update to mcp server memory");
restoreListener.onFailure(new OpenSearchException(respErrMsgBuilder.toString()));
});
client.execute(MLMcpToolsUpdateOnNodesAction.INSTANCE, toolsUpdateNodesRequest, addToMemoryResultListener);
} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ public RestChannelConsumer prepareRequest(RestRequest request, NodeClient client
.filter(type -> !buildInToolNames.contains(type))
.collect(Collectors.toSet());
if (!unrecognizedTools.isEmpty()) {
exception.addValidationError(String.format(Locale.ROOT, "Unrecognized tool in request: %s", unrecognizedTools));
exception.addValidationError("Unrecognized tool in request");
throw exception;
}
return channel -> client.execute(MLMcpToolsRegisterAction.INSTANCE, registerNodesRequest, new RestToXContentListener<>(channel));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -541,7 +541,7 @@ public void test_execute_registerAgent_InvalidLlmInterface() {

ArgumentCaptor<IllegalArgumentException> argumentCaptor = ArgumentCaptor.forClass(IllegalArgumentException.class);
verify(actionListener).onFailure(argumentCaptor.capture());
assertTrue(argumentCaptor.getValue().getMessage().contains("Invalid _llm_interface: invalid_interface"));
assertTrue(argumentCaptor.getValue().getMessage().contains("Invalid _llm_interface"));
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -250,7 +250,7 @@ public void test_doExecute_toolExists() {
transportMcpToolsRegisterAction.doExecute(task, nodesRequest, listener);
ArgumentCaptor<Exception> argumentCaptor = ArgumentCaptor.forClass(Exception.class);
verify(listener).onFailure(argumentCaptor.capture());
assertEquals("Unable to register tools: [ListIndexTool] as they already exist", argumentCaptor.getValue().getMessage());
assertEquals("Unable to register tool: a tool with the same name already exists.", argumentCaptor.getValue().getMessage());
}

public void test_doExecute_bulkIndexAllFailed() {
Expand Down Expand Up @@ -280,10 +280,7 @@ public void test_doExecute_bulkIndexAllFailed() {
transportMcpToolsRegisterAction.doExecute(task, nodesRequest, listener);
ArgumentCaptor<Exception> argumentCaptor = ArgumentCaptor.forClass(Exception.class);
verify(listener).onFailure(argumentCaptor.capture());
assertEquals(
"Failed to persist mcp tool: ListIndexTool into system index with error: java.lang.RuntimeException: Network issue",
argumentCaptor.getValue().getMessage()
);
assertEquals("Failed to persist 1 mcp tool(s) into system index", argumentCaptor.getValue().getMessage());
}

public void test_doExecute_bulkIndexPartialFailed() {
Expand Down Expand Up @@ -323,10 +320,7 @@ public void test_doExecute_bulkIndexPartialFailed() {
transportMcpToolsRegisterAction.doExecute(task, nodesRequest, listener);
ArgumentCaptor<Exception> argumentCaptor = ArgumentCaptor.forClass(Exception.class);
verify(listener).onFailure(argumentCaptor.capture());
assertEquals(
"Failed to persist mcp tool: ListIndexTool into system index with error: java.lang.RuntimeException: Network issue",
argumentCaptor.getValue().getMessage()
);
assertEquals("Failed to persist 1 mcp tool(s) into system index", argumentCaptor.getValue().getMessage());
}

public void test_doExecute_registerOnNodeHasFailure() {
Expand All @@ -350,7 +344,7 @@ public void test_doExecute_registerOnNodeHasFailure() {
ArgumentCaptor<Exception> argumentCaptor = ArgumentCaptor.forClass(Exception.class);
verify(listener).onFailure(argumentCaptor.capture());
assertEquals(
"Tools: [ListIndexTool] are persisted successfully but failed to register to mcp server memory with error: Network issue",
"Tools are persisted successfully but failed to register to mcp server memory",
argumentCaptor.getValue().getMessage()
);
}
Expand Down
Loading
Loading