Skip to content

Instantly share code, notes, and snippets.

@perfectbase
Last active June 17, 2025 15:52
Show Gist options
  • Select an option

  • Save perfectbase/d134c174e9e6c479727c160f6997f0e6 to your computer and use it in GitHub Desktop.

Select an option

Save perfectbase/d134c174e9e6c479727c160f6997f0e6 to your computer and use it in GitHub Desktop.

Revisions

  1. perfectbase revised this gist Apr 25, 2025. 2 changed files with 42 additions and 0 deletions.
    File renamed without changes.
    42 changes: 42 additions & 0 deletions mcp_route.ts
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,42 @@
    import { StreamableHTTPServerTransport } from "@modelcontextprotocol/sdk/server/streamableHttp.js";
    import { toFetchResponse, toReqRes } from "fetch-to-node";
    import { Hono } from "hono";
    import { handle } from "hono/vercel";
    import { buildMcpServer } from "./_server";

    const app = new Hono().basePath("/api/mcp");

    app.post("/", async (c) => {
    const { req, res } = toReqRes(c.req.raw);

    const server = buildMcpServer();
    try {
    const transport = new StreamableHTTPServerTransport({
    sessionIdGenerator: undefined,
    });
    await server.connect(transport);
    await transport.handleRequest(req, res, await c.req.json());
    res.on("close", () => {
    console.log("Request closed");
    void transport.close();
    void server.close();
    });
    return toFetchResponse(res);
    } catch (e) {
    console.error(e);
    return c.json(
    {
    jsonrpc: "2.0",
    error: {
    code: -32603,
    message: "Internal server error",
    },
    id: null,
    },
    { status: 500 },
    );
    }
    });

    export const GET = handle(app);
    export const POST = handle(app);
  2. perfectbase revised this gist Apr 25, 2025. 1 changed file with 11 additions and 0 deletions.
    11 changes: 11 additions & 0 deletions route.ts
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,11 @@
    //...
    const transport = new StreamableHTTPTransport(
    createStreamableHTTPTransportConfig("http://localhost:3000/mcp"),
    );

    mcpClient = await experimental_createMCPClient({
    transport,
    });

    const mcpTools = await mcpClient.tools();
    //...
  3. perfectbase created this gist Apr 25, 2025.
    438 changes: 438 additions & 0 deletions streamable-http-transport.ts
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,438 @@
    import { MCPClientError, type JSONRPCMessage, type MCPTransport } from "ai";

    export interface StreamableHTTPTransportConfig {
    type: "streamable-http";
    url: string;
    fetch?: typeof fetch; // Allow providing custom fetch implementation
    }

    /**
    * This is a temporary transport until the ai-sdk supports StreamableHTTP.
    */
    export class StreamableHTTPTransport implements MCPTransport {
    private url: string;
    private fetchImplementation: typeof fetch;
    private eventSource: EventSource | null = null;
    private sessionId: string | null = null;
    private isClosed = true;
    private abortController: AbortController | null = null;

    onclose?: () => void;
    onerror?: (error: Error) => void;
    onmessage?: (message: JSONRPCMessage) => void;

    constructor(config: StreamableHTTPTransportConfig) {
    this.url = config.url;
    this.fetchImplementation = config.fetch ?? fetch;
    }

    async start(): Promise<void> {
    if (!this.isClosed) {
    throw new MCPClientError({ message: "Transport already started" });
    }
    this.isClosed = false;
    this.abortController = new AbortController();

    // Streamable HTTP doesn't require an initial GET like SSE.
    // The initialization happens with the first POST request sent via send()
    // (typically the 'initialize' request from the MCPClient).
    // The server will then respond, potentially including the session ID.
    // The client *may* make a GET request later if it needs to receive server-initiated messages.
    // For now, we assume the primary flow is client request -> server response via POST.
    // A dedicated GET stream will be established if needed.
    console.log(
    "StreamableHTTPTransport started. Ready to send initial request.",
    );
    }

    private async establishReceiveStream(): Promise<void> {
    if (this.eventSource || this.isClosed) {
    console.log(
    `[GET Stream] Skipping establish: eventSource=${!!this.eventSource}, isClosed=${this.isClosed}`,
    );
    return; // Already established or closed
    }
    console.log("[GET Stream] Attempting to establish...");

    const headers: HeadersInit = {
    Accept: "text/event-stream",
    };
    if (this.sessionId) {
    headers["Mcp-Session-Id"] = this.sessionId;
    }

    try {
    // Use fetch for SSE - EventSource has limitations (e.g., custom headers on initial req)
    const response = await this.fetchImplementation(this.url, {
    method: "GET",
    headers: headers,
    signal: this.abortController?.signal,
    cache: "no-store",
    });

    console.log(`[GET Stream] Fetch response status: ${response.status}`);
    if (!response.ok) {
    const errorText = await response.text();
    throw new MCPClientError({
    message: `Failed to establish SSE stream: ${response.status} ${response.statusText} - ${errorText}`,
    });
    }
    if (!response.body) {
    throw new MCPClientError({
    message: "SSE stream response has no body",
    });
    }

    console.log("[GET Stream] Established successfully.");
    // Process the stream
    void this.processSSEStream(response.body, "GET");
    } catch (error) {
    console.error("[GET Stream] Error establishing stream:", error);
    this.handleError(
    error instanceof Error ? error : new Error(String(error)),
    );
    await this.close(); // Close transport if stream fails
    }
    }

    private async processSSEStream(
    stream: ReadableStream<Uint8Array>,
    streamType: "POST" | "GET",
    ): Promise<void> {
    const reader = stream.getReader();
    const decoder = new TextDecoder();
    let buffer = "";
    console.log(`[Process ${streamType} Stream] Starting reader loop...`);

    try {
    while (!this.isClosed) {
    const { done, value } = await reader.read();
    if (done) {
    console.log(
    `[Process ${streamType} Stream] Stream finished (done = true).`,
    );
    break;
    }

    buffer += decoder.decode(value, { stream: true });
    const lines = buffer.split("\n");

    // Keep the last potentially incomplete line in the buffer
    buffer = lines.pop() ?? "";

    for (const line of lines) {
    if (line.trim() === "") {
    // Empty line signifies end of an event
    if (buffer.startsWith("data:")) {
    // Check if we actually have data
    const data = buffer.substring(5).trim();
    try {
    const message = JSON.parse(data) as JSONRPCMessage;
    console.log(
    `[Process ${streamType} Stream] Received message via SSE:`,
    message,
    );
    this.onmessage?.(message);
    } catch (parseError) {
    this.handleError(
    new MCPClientError({
    message: `Failed to parse SSE message data: ${data}`,
    cause: parseError,
    }),
    );
    }
    }
    // Reset for next event
    buffer = "";
    continue; // Move to next line
    }

    if (line.startsWith("event:")) {
    const eventType = line.substring(6).trim();
    if (eventType !== "message") {
    console.warn(`Received unknown SSE event type: ${eventType}`);
    // If we get an unexpected event, we might need to reset buffer
    buffer = "";
    }
    } else if (line.startsWith("data:")) {
    // Append data line to buffer (strip 'data:' prefix)
    // If buffer already has data, add newline back (since split removed it)
    buffer += (buffer ? "\n" : "") + line.substring(5).trim();
    } else {
    // Ignore comments and other lines
    }
    }
    // After processing all complete lines, the remaining buffer (incomplete line)
    // is carried over to the next read() iteration.
    }
    } catch (error) {
    if (!this.isClosed) {
    // Avoid erroring if closed intentionally
    console.error(
    `[Process ${streamType} Stream] Error reading from stream:`,
    error,
    );
    this.handleError(
    error instanceof Error ? error : new Error(String(error)),
    );
    }
    } finally {
    // Do not close the transport here just because the stream ended.
    // Closing should be handled by explicit calls to this.close() or errors during reading.
    // if (!this.isClosed) {
    // await this.close(); // Ensure cleanup if stream ends unexpectedly
    // }
    reader.releaseLock();
    console.log(`[Process ${streamType} Stream] Reader released.`);
    }
    }

    async send(message: JSONRPCMessage): Promise<void> {
    if (this.isClosed) {
    throw new MCPClientError({ message: "Transport is closed" });
    }

    const headers: HeadersInit = {
    "Content-Type": "application/json",
    Accept: "application/json, text/event-stream", // Client must accept both
    };

    if (this.sessionId) {
    headers["Mcp-Session-Id"] = this.sessionId;
    }

    console.log("Sending POST message:", message);
    try {
    const response = await this.fetchImplementation(this.url, {
    method: "POST",
    headers: headers,
    body: JSON.stringify(message),
    signal: this.abortController?.signal,
    });

    // Store session ID from the first successful response (usually initialize)
    if (!this.sessionId) {
    const serverSessionId = response.headers.get("Mcp-Session-Id");
    if (serverSessionId) {
    this.sessionId = serverSessionId;
    console.log(`Session ID received: ${this.sessionId}`);
    // Now that we have a session ID, establish the GET stream for server messages
    await this.establishReceiveStream();
    } else if ("method" in message && message.method === "initialize") {
    // Initialize should return a session ID if server is stateful
    console.warn(
    "Initialize response did not contain Mcp-Session-Id header.",
    );
    }
    }

    if (!response.ok) {
    const errorText = await response.text();
    const errorResponse = this.tryParseErrorResponse(errorText);
    throw new MCPClientError({
    message: `Server responded with error: ${response.status} ${response.statusText}`,
    cause: errorResponse ?? errorText, // Include parsed error or raw text
    });
    }

    const contentType = response.headers.get("content-type");

    // Handle different response types based on Content-Type
    // Prioritize JSON, as it's the expected response for initialize in stateless mode.
    if (contentType?.includes("application/json")) {
    const responseBody = (await response.json()) as unknown; // Assign to unknown first
    console.log("Received JSON response:", responseBody);
    // Process single or batch JSON responses
    const messages = (
    Array.isArray(responseBody) ? responseBody : [responseBody]
    ) as unknown[];

    for (const msg of messages) {
    // Basic validation, MCPClient will do full parsing
    if (typeof msg === "object" && msg !== null && "jsonrpc" in msg) {
    this.onmessage?.(msg as JSONRPCMessage);
    } else {
    this.handleError(
    new MCPClientError({
    message: `Received invalid JSON message structure: ${JSON.stringify(msg)}`,
    }),
    );
    }
    }
    } else if (contentType?.includes("text/event-stream")) {
    // WORKAROUND: Server is incorrectly sending SSE for POST response even in stateless mode.
    // Assume the *first* data message contains the complete JSON response.
    console.log(
    "[WORKAROUND] Received unexpected SSE stream in POST response. Attempting to extract first message as JSON.",
    );
    if (!response.body) {
    throw new MCPClientError({
    message: "[WORKAROUND] SSE stream response has no body",
    });
    }

    // Extract only the first message
    const reader = response.body.getReader();
    const decoder = new TextDecoder();
    let buffer = "";
    try {
    const { done, value } = await reader.read();
    if (!done && value) {
    buffer += decoder.decode(value, { stream: false }); // Decode the single chunk
    const lines = buffer.split("\n");
    for (const line of lines) {
    if (line.startsWith("data:")) {
    const data = line.substring(5).trim();
    try {
    const jsonRpcMessage = JSON.parse(data) as JSONRPCMessage;
    console.log(
    "[WORKAROUND] Extracted JSON message from first SSE data:",
    jsonRpcMessage,
    );
    this.onmessage?.(jsonRpcMessage);
    break; // Got the first message, stop processing lines from this chunk
    } catch (parseError) {
    this.handleError(
    new MCPClientError({
    message: `[WORKAROUND] Failed to parse first SSE message data: ${data}`,
    cause: parseError,
    }),
    );
    break;
    }
    }
    }
    }
    } catch (readError) {
    this.handleError(
    new MCPClientError({
    message: `[WORKAROUND] Error reading first chunk of SSE response`,
    cause: readError,
    }),
    );
    } finally {
    // Ensure the reader is released even if we only read one chunk
    reader.releaseLock();
    console.log(
    "[WORKAROUND] Released reader after extracting first SSE message.",
    );
    }
    } else if (response.status === 202) {
    // Accepted - Notification likely processed, no content expected.
    console.log("Received 202 Accepted.");
    } else {
    // Handle other potential valid responses or unexpected content types
    const responseText = await response.text();
    console.warn(
    `Received unexpected Content-Type: ${contentType} or status ${response.status}. Body: ${responseText}`,
    );
    // If it looks like JSON-RPC, try parsing it anyway.
    const potentialMessage = this.tryParseJsonRpc(responseText);
    if (potentialMessage) {
    this.onmessage?.(potentialMessage);
    }
    }
    } catch (error) {
    console.error("Error sending message:", error);
    this.handleError(
    error instanceof Error ? error : new Error(String(error)),
    );
    // Optional: Decide if send errors should close the transport
    // await this.close();
    }
    }

    async close(): Promise<void> {
    if (this.isClosed) {
    return;
    }
    this.isClosed = true;
    console.log("Closing StreamableHTTPTransport...");

    this.abortController?.abort(); // Abort ongoing fetch requests/streams
    this.eventSource?.close(); // Close explicit EventSource if used (though fetch stream is preferred)
    this.eventSource = null;

    // Send DELETE request to terminate session on the server if we have a session ID
    if (this.sessionId) {
    console.log(`Sending DELETE to terminate session: ${this.sessionId}`);
    try {
    const headers: HeadersInit = { "Mcp-Session-Id": this.sessionId };
    const response = await this.fetchImplementation(this.url, {
    method: "DELETE",
    headers: headers,
    });
    if (!response.ok) {
    console.warn(
    `Failed to terminate session on server: ${response.status} ${response.statusText}`,
    );
    } else {
    console.log("Session terminated successfully on server.");
    }
    } catch (error) {
    console.warn("Error sending DELETE request:", error);
    // Don't propagate error during close, just warn
    }
    }

    this.sessionId = null;
    this.onclose?.();
    console.log("StreamableHTTPTransport closed.");
    }

    private handleError(error: Error): void {
    if (
    this.isClosed &&
    error instanceof DOMException &&
    error.name === "AbortError"
    ) {
    // Ignore abort errors during close
    return;
    }
    console.error("StreamableHTTPTransport Error:", error);
    this.onerror?.(error);
    }

    // Helper to attempt parsing error responses
    private tryParseErrorResponse(text: string): object | null {
    try {
    const json = JSON.parse(text) as unknown;
    if (
    typeof json === "object" &&
    json !== null &&
    "error" in json &&
    typeof json.error === "object"
    ) {
    return json.error;
    }
    } catch {
    /* ignore parse error */
    }
    return null;
    }

    private tryParseJsonRpc(text: string): JSONRPCMessage | null {
    try {
    const json = JSON.parse(text) as unknown;
    if (
    typeof json === "object" &&
    json !== null &&
    "jsonrpc" in json &&
    json.jsonrpc === "2.0"
    ) {
    // Basic check, client will validate schema fully
    return json as JSONRPCMessage;
    }
    } catch {
    /* ignore parse error */
    }
    return null;
    }
    }

    // Helper function to easily create the transport config
    export function createStreamableHTTPTransportConfig(
    url: string,
    ): StreamableHTTPTransportConfig {
    return { type: "streamable-http", url };
    }