Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/light-readers-jam.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"agents": patch
---

fix: Oauth2 client flow
196 changes: 166 additions & 30 deletions packages/agents/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -493,25 +493,11 @@ export class Agent<
return agentContext.run(
{ agent: this, connection: undefined, request, email: undefined },
async () => {
if (this.mcp.isCallbackRequest(request)) {
const result = await this.mcp.handleCallbackRequest(request);
this.broadcastMcpServers();

if (result.authSuccess) {
// Start background connection if auth was successful
this.mcp
.establishConnection(result.serverId)
.catch((error) => {
console.error("Background connection failed:", error);
})
.finally(() => {
// Broadcast after background connection resolves (success/failure)
this.broadcastMcpServers();
});
}

// Handle OAuth callback response using MCPClientManager configuration
return this.handleOAuthCallbackResponse(result, request);
// Check if this is an OAuth callback and restore state if needed
const callbackResult =
await this._handlePotentialOAuthCallback(request);
if (callbackResult) {
return callbackResult;
}

return this._tryCatch(() => _onRequest(request));
Expand Down Expand Up @@ -1458,32 +1444,182 @@ export class Agent<

const callbackUrl = `${resolvedCallbackHost}/${agentsPrefix}/${camelCaseToKebabCase(this._ParentClass.name)}/${this.name}/callback`;

const result = await this._connectToMcpServerInternal(
serverName,
url,
callbackUrl,
options
);
// Generate a serverId upfront
const serverId = nanoid(8);

// Persist to database BEFORE starting OAuth flow to survive DO hibernation
this.sql`
INSERT
OR REPLACE INTO cf_agents_mcp_servers (id, name, server_url, client_id, auth_url, callback_url, server_options)
INSERT OR REPLACE INTO cf_agents_mcp_servers (id, name, server_url, client_id, auth_url, callback_url, server_options)
VALUES (
${result.id},
${serverId},
${serverName},
${url},
${result.clientId ?? null},
${result.authUrl ?? null},
${null},
${null},
${callbackUrl},
${options ? JSON.stringify(options) : null}
);
`;

// _connectToMcpServerInternal will call mcp.connect which registers the callback URL
const result = await this._connectToMcpServerInternal(
serverName,
url,
callbackUrl,
options,
{ id: serverId }
);

// Update database with OAuth client info if auth is required
if (result.clientId || result.authUrl) {
this.sql`
UPDATE cf_agents_mcp_servers
SET client_id = ${result.clientId ?? null}, auth_url = ${result.authUrl ?? null}
WHERE id = ${serverId}
`;
}

this.broadcastMcpServers();

return result;
}

/**
* Handle potential OAuth callback requests after DO hibernation.
* Detects OAuth callbacks, restores state from database, and processes the callback.
* Returns a Response if this was an OAuth callback, otherwise returns undefined.
*/
private async _handlePotentialOAuthCallback(
request: Request
): Promise<Response | undefined> {
// Quick check: must be GET with callback pattern and code parameter
if (request.method !== "GET") {
return undefined;
}

const url = new URL(request.url);
const hasCallbackPattern =
url.pathname.includes("/callback/") && url.searchParams.has("code");

if (!hasCallbackPattern) {
return undefined;
}

// Extract serverId from callback URL
const pathParts = url.pathname.split("/");
const callbackIndex = pathParts.indexOf("callback");
const serverId = callbackIndex !== -1 ? pathParts[callbackIndex + 1] : null;

if (!serverId) {
return new Response("Invalid callback URL: missing serverId", {
status: 400
});
}

// Check if callback is already registered AND connection exists (not hibernated)
if (
this.mcp.isCallbackRequest(request) &&
this.mcp.mcpConnections[serverId]
) {
// State already restored, handle normally
return this._processOAuthCallback(request);
}

// Need to restore from database after hibernation
try {
const server = this.sql<MCPServerRow>`
SELECT id, name, server_url, client_id, auth_url, callback_url, server_options
FROM cf_agents_mcp_servers
WHERE id = ${serverId}
`.find((s) => s.id === serverId);

if (!server) {
return new Response(
`OAuth callback failed: Server ${serverId} not found in database`,
{ status: 404 }
);
}

// Register callback URL (restores it after hibernation)
if (!server.callback_url) {
return new Response(
`OAuth callback failed: No callback URL stored for server ${serverId}`,
{ status: 500 }
);
}

this.mcp.registerCallbackUrl(`${server.callback_url}/${server.id}`);

// Restore connection if not in memory
if (!this.mcp.mcpConnections[serverId]) {
let parsedOptions:
| {
client?: ConstructorParameters<typeof Client>[1];
transport?: {
headers?: HeadersInit;
type?: TransportType;
};
}
| undefined;
try {
parsedOptions = server.server_options
? JSON.parse(server.server_options)
: undefined;
} catch {
return new Response(
`OAuth callback failed: Invalid server options in database for ${serverId}`,
{ status: 500 }
);
}

await this._connectToMcpServerInternal(
server.name,
server.server_url,
server.callback_url,
parsedOptions,
{
id: server.id,
oauthClientId: server.client_id ?? undefined
}
);
}

// Now process the OAuth callback
return this._processOAuthCallback(request);
} catch (error) {
const errorMsg = error instanceof Error ? error.message : "Unknown error";
console.error(`Failed to restore MCP state for ${serverId}:`, error);
return new Response(
`OAuth callback failed during state restoration: ${errorMsg}`,
{ status: 500 }
);
}
}

/**
* Process an OAuth callback request (assumes state is already restored)
*/
private async _processOAuthCallback(request: Request): Promise<Response> {
const result = await this.mcp.handleCallbackRequest(request);
this.broadcastMcpServers();

if (result.authSuccess) {
// Start background connection if auth was successful
this.mcp
.establishConnection(result.serverId)
.catch((error) => {
console.error("Background connection failed:", error);
})
.finally(() => {
// Broadcast after background connection resolves (success/failure)
this.broadcastMcpServers();
});
}

// Handle OAuth callback response using MCPClientManager configuration
return this.handleOAuthCallbackResponse(result, request);
}

private async _connectToMcpServerInternal(
_serverName: string,
url: string,
Expand Down
Loading
Loading