Skip to content

Conversation

jerryyummy
Copy link

@jerryyummy jerryyummy commented Sep 15, 2025

Fixes #5208

Motivation

Explain the content here.
Explain why you want to make the changes and what problem you're trying to solve.

The motivation of this PR is to improve the MCP protocol integration in EventMesh.
Previously, the implementation relied on SSE-based streaming and custom queue management, which led to:
Inefficient multi-thread handling in SinkConnector, with difficulties in graceful shutdown.
Limitations of SSE for long-lived connections and client discovery.
Inconsistent position/offset management, causing unstable message delivery and retries.
Redundant code in MCP server implementation, since Netty-based server duplicated existing HTTP server capabilities.

This PR aims to address those issues by refactoring the architecture to:
Adopt Streamable HTTP instead of SSE, making it easier for clients (e.g., Cursor) to discover and connect.
Replace self-managed queue with standard ExecutorService, improving throughput and supporting graceful exit.
Enhance position management to ensure at-least-once semantics and reliable offset commit.
Reuse existing HTTP server for MCP server, avoiding duplicate logic and improving maintainability.

Modifications

Describe the modifications you've done.
Implemented ProtocolFactory and McpStandardProtocol to unify MCP protocol handling.
Added McpSourceConnector, McpRequest, McpResponse, and MultiMcpRequestContext to manage request/response lifecycle.
Introduced McpSinkConnector and McpSinkHandler abstraction, with CommonMcpSinkHandler and AbstractMcpSinkHandler supporting Round-Robin and Broadcast strategies.
Refactored threading model: replaced custom queue with ExecutorService in McpSinkConnector for task submission and graceful shutdown.
Added retry mechanism with McpAttemptEvent and DLQ support for failed records.
Defined export data structures: McpConnectRecord, McpExportRecord, McpExportRecordPage, and McpExportMetadata.
Migrated MCP server from a Netty-only server to reuse Vert.x HTTP server, simplifying request handling and routing.
Prepared support for Streamable HTTP transport, aligning with the new MCP specification.

Documentation

  • Does this pull request introduce a new feature? (yes / no) yes
  • If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented) docs
  • If a feature is not applicable for documentation, explain why?
  • If a feature is not documented yet in this PR, please create a followup issue for adding the documentation

Copy link
Contributor

@github-actions github-actions bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Welcome to the Apache EventMesh community!!
This is your first PR in our project. We're very excited to have you onboard contributing. Your contributions are greatly appreciated!

Please make sure that the changes are covered by tests.
We will be here shortly.
Let us know if you need any help!

Want to get closer to the community?

WeChat Assistant WeChat Public Account Slack
Join Slack Chat

Mailing Lists:

Name Description Subscribe Unsubscribe Archive
Users User support and questions mailing list Subscribe Unsubscribe Mail Archives
Development Development related discussions Subscribe Unsubscribe Mail Archives
Commits All commits to repositories Subscribe Unsubscribe Mail Archives
Issues Issues or PRs comments and reviews Subscribe Unsubscribe Mail Archives

@qqeasonchen
Copy link
Contributor

@jerryyummy please create a issue and give your proposal first, let's have some discussion

settings.gradle Outdated
include 'eventmesh-protocol-plugin:eventmesh-protocol-http'
include 'eventmesh-protocol-plugin:eventmesh-protocol-grpc'
include 'eventmesh-protocol-plugin:eventmesh-protocol-grpcmessage'
include 'eventmesh-protocol-plugin:eventmesh-protocol-mcp'
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why you add this module, this module didn't used in your pr

import lombok.extern.slf4j.Slf4j;

@Slf4j
public abstract class AbstractMCPServer extends AbstractRemotingServer{
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are there any differences between the implementation of AbstractMCPServer\EventMeshMCPServer\EventMeshMcpBootstrap and HTTP server except for protocol parsing and distribution? Can HTTP server be reused?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

actually no difference, so i may reuse http server instead

} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
this.sinkHandler.stop();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

stop the ThreadPoolExecutor first


@Override
public void onException(ConnectRecord record) {

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggestion:
exception handling or error logging need


@Override
public void commit(ConnectRecord record) {

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Question:
why do nothing?

Thread.currentThread().interrupt();
}
this.sinkHandler.stop();
log.info("All tasks completed, start shut down mcp sink connector");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Question:
typo? "start shut down" or "finish"?

@EqualsAndHashCode(callSuper = true)
public class McpSourceConfig extends SourceConfig {

public SourceConnectorConfig connectorConfig;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggestion:
can merge SourceConnectorConfig with McpSourceConfig ?

log.warn("ConnectRecord data is null, ignore.");
continue;
}
queue.put(sinkRecord);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggestion:
Use the executor's built-in queue instead of a custom queue. Prefer executor.submit() over queue.put().

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

[Feature] develop mcp protocol

4 participants