-
Notifications
You must be signed in to change notification settings - Fork 641
Develop mcp #5203
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Develop mcp #5203
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Welcome to the Apache EventMesh community!!
This is your first PR in our project. We're very excited to have you onboard contributing. Your contributions are greatly appreciated!
Please make sure that the changes are covered by tests.
We will be here shortly.
Let us know if you need any help!
Want to get closer to the community?
WeChat Assistant | WeChat Public Account | Slack |
---|---|---|
![]() |
![]() |
Join Slack Chat |
Mailing Lists:
Name | Description | Subscribe | Unsubscribe | Archive |
---|---|---|---|---|
Users | User support and questions mailing list | Subscribe | Unsubscribe | Mail Archives |
Development | Development related discussions | Subscribe | Unsubscribe | Mail Archives |
Commits | All commits to repositories | Subscribe | Unsubscribe | Mail Archives |
Issues | Issues or PRs comments and reviews | Subscribe | Unsubscribe | Mail Archives |
@jerryyummy please create a issue and give your proposal first, let's have some discussion |
settings.gradle
Outdated
include 'eventmesh-protocol-plugin:eventmesh-protocol-http' | ||
include 'eventmesh-protocol-plugin:eventmesh-protocol-grpc' | ||
include 'eventmesh-protocol-plugin:eventmesh-protocol-grpcmessage' | ||
include 'eventmesh-protocol-plugin:eventmesh-protocol-mcp' |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why you add this module, this module didn't used in your pr
import lombok.extern.slf4j.Slf4j; | ||
|
||
@Slf4j | ||
public abstract class AbstractMCPServer extends AbstractRemotingServer{ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Are there any differences between the implementation of AbstractMCPServer\EventMeshMCPServer\EventMeshMcpBootstrap and HTTP server except for protocol parsing and distribution? Can HTTP server be reused?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
actually no difference, so i may reuse http server instead
} catch (InterruptedException e) { | ||
Thread.currentThread().interrupt(); | ||
} | ||
this.sinkHandler.stop(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
stop the ThreadPoolExecutor first
|
||
@Override | ||
public void onException(ConnectRecord record) { | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Suggestion:
exception handling or error logging need
|
||
@Override | ||
public void commit(ConnectRecord record) { | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Question:
why do nothing?
Thread.currentThread().interrupt(); | ||
} | ||
this.sinkHandler.stop(); | ||
log.info("All tasks completed, start shut down mcp sink connector"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Question:
typo? "start shut down" or "finish"?
@EqualsAndHashCode(callSuper = true) | ||
public class McpSourceConfig extends SourceConfig { | ||
|
||
public SourceConnectorConfig connectorConfig; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Suggestion:
can merge SourceConnectorConfig
with McpSourceConfig
?
log.warn("ConnectRecord data is null, ignore."); | ||
continue; | ||
} | ||
queue.put(sinkRecord); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Suggestion:
Use the executor's built-in queue instead of a custom queue. Prefer executor.submit()
over queue.put()
.
Fixes #5208
Motivation
Explain the content here.
Explain why you want to make the changes and what problem you're trying to solve.
The motivation of this PR is to improve the MCP protocol integration in EventMesh.
Previously, the implementation relied on SSE-based streaming and custom queue management, which led to:
Inefficient multi-thread handling in SinkConnector, with difficulties in graceful shutdown.
Limitations of SSE for long-lived connections and client discovery.
Inconsistent position/offset management, causing unstable message delivery and retries.
Redundant code in MCP server implementation, since Netty-based server duplicated existing HTTP server capabilities.
This PR aims to address those issues by refactoring the architecture to:
Adopt Streamable HTTP instead of SSE, making it easier for clients (e.g., Cursor) to discover and connect.
Replace self-managed queue with standard ExecutorService, improving throughput and supporting graceful exit.
Enhance position management to ensure at-least-once semantics and reliable offset commit.
Reuse existing HTTP server for MCP server, avoiding duplicate logic and improving maintainability.
Modifications
Describe the modifications you've done.
Implemented ProtocolFactory and McpStandardProtocol to unify MCP protocol handling.
Added McpSourceConnector, McpRequest, McpResponse, and MultiMcpRequestContext to manage request/response lifecycle.
Introduced McpSinkConnector and McpSinkHandler abstraction, with CommonMcpSinkHandler and AbstractMcpSinkHandler supporting Round-Robin and Broadcast strategies.
Refactored threading model: replaced custom queue with ExecutorService in McpSinkConnector for task submission and graceful shutdown.
Added retry mechanism with McpAttemptEvent and DLQ support for failed records.
Defined export data structures: McpConnectRecord, McpExportRecord, McpExportRecordPage, and McpExportMetadata.
Migrated MCP server from a Netty-only server to reuse Vert.x HTTP server, simplifying request handling and routing.
Prepared support for Streamable HTTP transport, aligning with the new MCP specification.
Documentation