A Step-by-Step Guide to Implementing Streamable HTTP Type MCP Protocol for Server and Client

Introduction

The Model Context Protocol (MCP) is an open protocol for connecting AI applications to external tools and data, and its Streamable HTTP transport is a significant update to how clients and servers exchange messages. This article examines Streamable HTTP from session negotiation to formal data transmission, covering its technical architecture, protocol content, and a sample implementation of a server, a client, and a frontend page.

Technical Architecture

The MCP protocol adopts a client-server architecture, with core components including:

  1. MCP Host: The application that contains the MCP client, such as Claude Desktop, Cursor IDE, and other AI tools.
  2. MCP Client: The protocol implementation that maintains a 1:1 connection with the server within the host.
  3. MCP Server: A lightweight program that exposes specific functionalities through standardized protocols, which can be a local Node.js/Python program or a remote service.

The MCP server provides three categories of standard capabilities:

  • Resources: Such as file reading and API data retrieval.
  • Tools: Functions or third-party services that the model can invoke, such as Git operations and browser control.
  • Prompts: Predefined task templates that enhance model performance in specific scenarios.

Protocol Content of Streamable HTTP

Streamable HTTP, as a significant update to the MCP protocol, aims to address the limitations of traditional HTTP+SSE solutions while retaining their advantages. Its core content includes the following aspects:

  1. Unified Message Entry: All messages from the client to the server are sent through the /message endpoint, eliminating the need for a dedicated SSE endpoint.
  2. Dynamic Upgrade of SSE Stream: The server can answer a request sent to /message with an SSE stream when it needs to push notifications or requests, instead of returning a single JSON response.
  3. Session Management Mechanism: The server assigns a session ID during initialization, and the client sends it back in the Mcp-Session-Id request header on every later request. The server can choose whether to maintain session state.
  4. Support for Stateless Servers: The server can choose to operate completely stateless, no longer needing to maintain long-term connections.
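
As a rough illustration of the session rules above, the sketch below maps an incoming request's Mcp-Session-Id to the HTTP status a session-aware server might return. The function name check_session, the in-memory known_sessions set, and the status choices (400 for a missing ID, 404 for an unknown one) are assumptions made for this example, not the exact behavior of the server shown later in this article.

```python
from typing import Optional, Set


def check_session(method: Optional[str], session_id: Optional[str], known_sessions: Set[str]) -> int:
    """Return the HTTP status a session-aware server could answer with.

    `method` is the JSON-RPC method of the incoming message. This is a
    hypothetical helper, not part of any MCP SDK.
    """
    if method == "initialize":
        return 200  # initialization creates the session, so no ID is expected yet
    if session_id is None:
        return 400  # assumed: a missing session ID is treated as a bad request
    if session_id not in known_sessions:
        return 404  # assumed: an unknown or expired session must be re-initialized
    return 200


sessions = {"12345"}
print(check_session("initialize", None, sessions))
print(check_session("get_file", None, sessions))
print(check_session("get_file", "12345", sessions))
```

A stateless server would skip this check entirely and never issue a session ID.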

Implementation Method

From Session Negotiation to Formal Data Transmission

1. Session Negotiation

Session negotiation is the initial phase of Streamable HTTP communication, where the client and server establish a session through the following steps:

  1. Client Sends Initialization Request: The client sends an InitializeRequest message via HTTP POST to the MCP server’s /message endpoint, carrying protocol version and client capability information.
  2. Server Responds to Initialization: After receiving the request, the server returns an InitializeResult message containing the supported protocol version and server capabilities, with the session ID in the Mcp-Session-Id response header.
  3. Client Sends Initialized Notification: After receiving the server’s response, the client sends a notifications/initialized notification to inform the server that initialization is complete.

Example: Client Sends Initialization Request

POST /message HTTP/1.1
Host: mcp.example.com
Content-Type: application/json
Accept: text/event-stream, application/json

{"jsonrpc": "2.0", "id": 1, "method": "initialize", "params": {"protocolVersion": "2025-03-26", "clientInfo": {"name": "MCP Client", "version": "1.0"}, "capabilities": {}}}

Example: Server Responds to Initialization

HTTP/1.1 200 OK
Content-Type: application/json
Mcp-Session-Id: 12345

{"jsonrpc": "2.0", "id": 1, "result": {"protocolVersion": "2025-03-26", "serverInfo": {"name": "MCP Server", "version": "1.0"}, "capabilities": {}}}

Example: Client Sends Initialized Notification

POST /message HTTP/1.1
Host: mcp.example.com
Content-Type: application/json
Accept: text/event-stream, application/json
Mcp-Session-Id: 12345

{"jsonrpc": "2.0", "method": "notifications/initialized", "params": {}}
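
The three messages above differ in one detail that is easy to miss: a JSON-RPC request carries an id so the response can be matched to it, while a notification has no id and expects no response. A minimal sketch of two builders follows. The helper names make_request and make_notification are hypothetical, and the notifications/initialized method name is taken from the MCP specification.

```python
import itertools
from typing import Any, Dict, Optional

_ids = itertools.count(1)  # monotonically increasing request IDs


def make_request(method: str, params: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
    """Build a JSON-RPC 2.0 request; the id lets the response be matched to it."""
    return {"jsonrpc": "2.0", "id": next(_ids), "method": method, "params": params or {}}


def make_notification(method: str, params: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
    """Build a JSON-RPC 2.0 notification; it has no id and expects no response."""
    return {"jsonrpc": "2.0", "method": method, "params": params or {}}


init = make_request("initialize", {"clientInfo": {"name": "MCP Client", "version": "1.0"}, "capabilities": {}})
done = make_notification("notifications/initialized")
print(init)
print(done)
```

Generating ids from a counter also avoids reusing the same id for every request, which matters once several requests are in flight at the same time.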

2. Formal Data Transmission

Once the session is established, the client and server can proceed with formal communication through the following steps:

  1. Client Sends Message: The client sends a JSON-RPC message via HTTP POST to the MCP server’s /message endpoint, carrying the session identifier in the Mcp-Session-Id header.
  2. Server Processes Request and Responds: The server processes the message based on the request content and returns a response via SSE stream or JSON object.
  3. Dynamic Upgrade to SSE Stream: If real-time message push is needed, the server can upgrade the connection to an SSE stream.
  4. Reconnect and Data Recovery: If network fluctuations interrupt the connection, the client can reconnect with the Last-Event-ID header so that the server can replay the messages sent after that event ID.
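
Because the server may answer with either a JSON object or an SSE stream (step 2), a client has to branch on the response's Content-Type. The sketch below shows one way to do that for a fully received body. The function names parse_sse and decode_response are hypothetical, and the parser is deliberately simplified: it handles only the id and data fields and ignores comment lines.

```python
import json
from typing import Any, Dict, List


def parse_sse(body: str) -> List[Dict[str, Any]]:
    """Parse a text/event-stream body into a list of {"id", "data"} dicts."""
    events = []
    for block in body.replace("\r\n", "\n").split("\n\n"):
        event_id, data_lines = None, []
        for line in block.split("\n"):
            if line.startswith(":"):
                continue  # comment line, often used as a heartbeat
            field, _, value = line.partition(":")
            value = value[1:] if value.startswith(" ") else value
            if field == "id":
                event_id = value
            elif field == "data":
                data_lines.append(value)
        payload = "\n".join(data_lines)
        if payload.strip():  # skip blocks that carry no data
            events.append({"id": event_id, "data": json.loads(payload)})
    return events


def decode_response(content_type: str, body: str) -> List[Any]:
    """Return the JSON-RPC messages carried by either response style."""
    if content_type.startswith("text/event-stream"):
        return [event["data"] for event in parse_sse(body)]
    return [json.loads(body)]


print(decode_response("application/json", '{"jsonrpc": "2.0", "id": 1, "result": "ok"}'))
print(decode_response("text/event-stream", 'id: 1\ndata: {"jsonrpc": "2.0", "id": 1, "result": "ok"}\n\n'))
```

A real client would parse the stream incrementally as lines arrive rather than waiting for the whole body.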

Example: Client Sends Message

POST /message HTTP/1.1
Host: mcp.example.com
Content-Type: application/json
Accept: text/event-stream, application/json
Mcp-Session-Id: 12345

{"jsonrpc": "2.0", "id": 1, "method": "get_file", "params": {"path": "/example.txt"}}

Example: Server Responds and Upgrades to SSE Stream

HTTP/1.1 200 OK
Content-Type: text/event-stream
Cache-Control: no-cache
Connection: keep-alive
Mcp-Session-Id: 12345

id: 1
data: {"jsonrpc": "2.0", "id": 1, "result": "File content here"}

Example: Client Reconnects

GET /message HTTP/1.1
Host: mcp.example.com
Accept: text/event-stream
Last-Event-ID: 1
Mcp-Session-Id: 12345

Example: Server Replays Unsent Messages

id: 2
data: {"jsonrpc": "2.0", "id": 2, "result": "Continued content here"}
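
Replay only works if the server tags each SSE event with an id and remembers what it has sent. The following is a minimal sketch of such a per-session buffer. The EventLog class and its method names are hypothetical, the integer event IDs are an assumption made for this example, and the buffer is unbounded here for brevity.

```python
import json
from typing import Any, Dict, List, Optional, Tuple


class EventLog:
    """Per-session buffer of SSE events so a reconnecting client can catch up."""

    def __init__(self) -> None:
        self._events: List[Tuple[int, Dict[str, Any]]] = []
        self._next_id = 1

    @staticmethod
    def format(event_id: int, message: Dict[str, Any]) -> str:
        """Serialize one message as an SSE frame with an id field."""
        return f"id: {event_id}\ndata: {json.dumps(message)}\n\n"

    def append(self, message: Dict[str, Any]) -> str:
        """Store a message and return the frame to send to the client."""
        event_id = self._next_id
        self._next_id += 1
        self._events.append((event_id, message))
        return self.format(event_id, message)

    def replay_after(self, last_event_id: Optional[str]) -> List[str]:
        """Return frames for every event newer than the client's Last-Event-ID."""
        last = int(last_event_id) if last_event_id and last_event_id.isdigit() else 0
        return [self.format(i, m) for i, m in self._events if i > last]


log = EventLog()
log.append({"jsonrpc": "2.0", "id": 1, "result": "File content here"})
log.append({"jsonrpc": "2.0", "id": 2, "result": "Continued content here"})
for frame in log.replay_after("1"):
    print(frame, end="")
```

A production server would cap the buffer by size or age and discard it when the session ends.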

Server Code Implementation

from datetime import datetime
from starlette.applications import Starlette
from starlette.requests import Request
from starlette.routing import Route
from starlette.responses import JSONResponse, StreamingResponse
import json
import uuid
from starlette.middleware.cors import CORSMiddleware
import asyncio
from typing import Dict, Any
import aiofiles
import random

# Store session IDs and corresponding task queues
sessions: Dict[str, Dict[str, Any]] = {}

# Add CORS support
app = Starlette()
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
    expose_headers=["Mcp-Session-Id"],
)

@app.route('/message', methods=["POST", "GET"])
async def handle_message(request: Request):
    """Handle POST and GET requests."""
    session_id = request.headers.get("Mcp-Session-Id") or request.query_params.get("Mcp-Session-Id")

    if request.method == "POST":
        try:
            data = await request.json()

            if data.get("method") == "initialize":
                # Initialize session
                session_id = str(uuid.uuid4())
                sessions[session_id] = {
                    "initialized": True,
                    "task_queue": asyncio.Queue()
                }
                response = JSONResponse(
                    content={
                        "jsonrpc": "2.0",
                        "id": data.get("id"),
                        "result": {
                            "serverInfo": {"name": "MCP Server", "version": "1.0"},
                            "capabilities": {},
                        },
                    }
                )
                response.headers["Mcp-Session-Id"] = session_id
                return response
            elif session_id and sessions.get(session_id, {}).get("initialized"):
                # Handle initialized requests
                if data.get("method") == "get_file":
                    try:
                        # Asynchronously read file content
                        content = await async_read_file(data.get("params", {}).get("path", ""))
                        return JSONResponse(
                            content={
                                "jsonrpc": "2.0",
                                "id": data.get("id"),
                                "result": content,
                            }
                        )
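                    except Exception as e:
                        # JSON-RPC errors are objects with a numeric code and a message
                        return JSONResponse(
                            content={
                                "jsonrpc": "2.0",
                                "id": data.get("id"),
                                "error": {"code": -32603, "message": str(e)},
                            }
                        )
                else:
                    # -32601 is the JSON-RPC "Method not found" error code
                    return JSONResponse(
                        content={
                            "jsonrpc": "2.0",
                            "id": data.get("id"),
                            "error": {"code": -32601, "message": "Unknown method"},
                        }
                    )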
            else:
                return JSONResponse(content={"error": "Session not initialized"}, status_code=400)
        except Exception as e:
            return JSONResponse(content={"error": f"Internal server error: {str(e)}"}, status_code=500)
    elif request.method == "GET":
        # Handle SSE stream requests
        if not session_id or session_id not in sessions:
            return JSONResponse(content={"error": "Session not found"}, status_code=404)

        async def event_generator(session_id):
            while True:
                try:
                    message = await asyncio.wait_for(sessions[session_id]["task_queue"].get(), timeout=10)  # Timeout of 10 seconds
                    yield f"data: {json.dumps(message)}\n\n"
                except asyncio.TimeoutError:
                    yield ": heartbeat\n\n"  # Send an SSE comment as a heartbeat to keep the idle connection open

        return StreamingResponse(event_generator(session_id), media_type="text/event-stream")

async def async_read_file(path: str) -> str:
    """Asynchronously read file content."""
    try:
        async with aiofiles.open(path, "r") as file:
            content = await file.read()
        return content
    except Exception as e:
        raise Exception(f"Error reading file: {str(e)}")

async def background_task(session_id: str, task: Dict[str, Any]):
    """Background task processing."""
    # Simulate time-consuming operation
    await asyncio.sleep(1)
    # Put the result into the task queue
    sessions[session_id]["task_queue"].put_nowait(task)

@app.on_event("startup")
async def startup_event():
    async def push_test_messages():
        while True:
            sp = random.randint(1, 3)
            await asyncio.sleep(sp)  # Push a message every 1 to 3 seconds
            for session_id in list(sessions):  # Iterate over a snapshot of the session IDs
                if sessions[session_id]["initialized"]:
                    sessions[session_id]["task_queue"].put_nowait({
                        "message": "Hello from server!",
                        "sleep": sp,
                        "datetime": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
                    })

    asyncio.create_task(push_test_messages())  # Create background task

if __name__ == "__main__":
    import uvicorn

    uvicorn.run(app, host="0.0.0.0", port=8000)
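
Note that the get_file handler above opens whatever path the client sends. If the server is reachable by anything other than a trusted local client, the path should be confined to a known directory first. The sketch below shows one possible check. The function name resolve_safe_path and the idea of a single base directory are assumptions made for this example and are not part of the server code above.

```python
from pathlib import Path


def resolve_safe_path(base_dir: str, requested: str) -> Path:
    """Resolve `requested` inside `base_dir`, rejecting paths that escape it."""
    base = Path(base_dir).resolve()
    # Joining with an absolute `requested` discards `base`, which the check below catches
    candidate = (base / requested).resolve()
    if candidate != base and base not in candidate.parents:
        raise PermissionError(f"Path escapes the allowed directory: {requested}")
    return candidate


print(resolve_safe_path(".", "example.txt"))
```

The handler could call this before async_read_file and return a JSON-RPC error when PermissionError is raised.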

Client Code Implementation

import httpx
import asyncio

class MCPClient:
    def __init__(self, server_url: str):
        self.server_url = server_url
        self.session_id = None
        self.headers = {
            "Content-Type": "application/json",
            "Accept": "text/event-stream, application/json"
        }

    async def initialize(self):
        """Initialize session."""
        async with httpx.AsyncClient() as client:
            try:
                response = await client.post(
                    f"{self.server_url}/message",
                    headers=self.headers,
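                    json={
                        "jsonrpc": "2.0",
                        "id": 1,  # A request needs an id; without one it is a notification
                        "method": "initialize",
                        "params": {
                            "protocolVersion": "2025-03-26",
                            "clientInfo": {"name": "MCP Client", "version": "1.0"},
                            "capabilities": {},
                        },
                    },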
                )
                response.raise_for_status()
                self.session_id = response.headers.get("Mcp-Session-Id")
                print(f"Session ID: {self.session_id}")
                return self.session_id
            except Exception as e:
                print(f"Failed to initialize session: {e}")
                return None

    async def send_message(self, method: str, params: dict = None):
        """Send message."""
        if not self.session_id:
            await self.initialize()

        async with httpx.AsyncClient() as client:
            try:
                response = await client.post(
                    f"{self.server_url}/message",
                    headers={"Mcp-Session-Id": self.session_id, **self.headers},
                    json={
                        "jsonrpc": "2.0",
                        "id": 1,
                        "method": method,
                        "params": params or {},
                    },
                )
                response.raise_for_status()
                return response.json()
            except Exception as e:
                print(f"Failed to send message: {e}")
                return None

    async def listen_sse(self):
        """Listen for server-pushed SSE messages, reconnecting whenever the stream ends."""
        if not self.session_id:
            await self.initialize()

        async with httpx.AsyncClient(timeout=None) as client:  # Disable timeouts for the long-lived stream
            while True:  # Loop instead of recursing so repeated reconnects cannot exhaust the stack
                try:
                    async with client.stream(
                            "GET",
                            f"{self.server_url}/message",
                            headers={"Mcp-Session-Id": self.session_id, "Accept": "text/event-stream"},
                    ) as response:
                        response.raise_for_status()
                        async for line in response.aiter_lines():
                            # Skip blank separator lines and SSE comment lines (heartbeats)
                            if line.strip() and not line.startswith(":"):
                                print(f"SSE Message: {line}")
                except Exception as e:
                    print(f"Failed to listen SSE: {e}")
                await self.reconnect()

    async def reconnect(self):
        """Wait, then start a new session; listen_sse reopens the stream afterwards."""
        print("Attempting to reconnect...")
        await asyncio.sleep(5)  # Wait 5 seconds before retrying
        await self.initialize()

async def main():
    client = MCPClient("http://localhost:8000")
    await client.initialize()
    response = await client.send_message("get_file", {"path": "/Users/houjie/PycharmProjects/python-sdk/examples/mcp-server/example.txt"})
    print(f"Response: {response}")
    await client.listen_sse()

if __name__ == "__main__":
    asyncio.run(main())
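
The client above waits a fixed 5 seconds before each reconnect attempt. A common alternative is exponential backoff, which retries quickly at first and then slows down if the server stays unreachable. The generator below is a minimal sketch of that idea. The name backoff_delays and its default values are assumptions made for this example.

```python
import itertools
from typing import Iterator


def backoff_delays(base: float = 1.0, factor: float = 2.0, cap: float = 30.0) -> Iterator[float]:
    """Yield an endless sequence of retry delays that grows by `factor` up to `cap`."""
    delay = base
    while True:
        yield min(delay, cap)
        delay = min(delay * factor, cap)


print(list(itertools.islice(backoff_delays(), 7)))  # [1.0, 2.0, 4.0, 8.0, 16.0, 30.0, 30.0]
```

In reconnect, the fixed sleep could then become await asyncio.sleep(next(delays)), with a fresh generator created after each successful connection.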

Frontend Page Code Implementation

<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <meta name="viewport" content="width=device-width, initial-scale=1.0">
    <title>MCP Streamable HTTP Demo</title>
    <style>
        body {
            font-family: Arial, sans-serif;
            margin: 0;
            padding: 20px;
            background-color: #f5f5f5;
        }
        .container {
            max-width: 800px;
            margin: 0 auto;
            background-color: white;
            padding: 20px;
            border-radius: 8px;
            box-shadow: 0 2px 4px rgba(0, 0, 0, 0.1);
        }
        h1 {
            text-align: center;
            color: #333;
        }
        .message-area {
            margin-top: 20px;
        }
        .message {
            padding: 10px;
            margin-bottom: 10px;
            border-radius: 4px;
            background-color: #e9f7fe;
            border-left: 4px solid #0099cc;
        }
        .sse-message {
            padding: 10px;
            margin-bottom: 10px;
            border-radius: 4px;
            background-color: #f0f9ff;
            border-left: 4px solid #0077cc;
        }
        button {
            background-color: #0099cc;
            color: white;
            border: none;
            padding: 10px 15px;
            border-radius: 4px;
            cursor: pointer;
            font-size: 14px;
        }
        button:hover {
            background-color: #0077cc;
        }
        input[type="text"] {
            width: 100%;
            padding: 10px;
            margin-bottom: 10px;
            border: 1px solid #ddd;
            border-radius: 4px;
            font-size: 14px;
        }
    </style>
</head>
<body>
    <div class="container">
        <h1>MCP Streamable HTTP Demo</h1>
        <div>
            <input type="text" id="serverUrl" placeholder="Enter server URL" value="http://localhost:8000">
            <button id="initBtn">Initialize Session</button>
        </div>
        <div id="sessionId"></div>
        <div>
            <input type="text" id="filePath" placeholder="Enter file path">
            <button id="sendBtn">Send Message</button>
        </div>
        <div class="message-area" id="messages"></div>
    </div>

    <script>
        let client = null;
        let sessionInitialized = false;

        document.getElementById('initBtn').addEventListener('click', async () => {
            const serverUrl = document.getElementById('serverUrl').value;
            client = new MCPClient(serverUrl);
            await client.initialize();
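            sessionInitialized = client.session_id !== null;  // initialize() leaves session_id null on failure
            document.getElementById('sessionId').textContent = sessionInitialized
                ? `Session ID: ${client.session_id}`
                : 'Session initialization failed.';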
        });

        document.getElementById('sendBtn').addEventListener('click', async () => {
            if (!sessionInitialized) {
                alert('Please initialize the session first.');
                return;
            }
            const filePath = document.getElementById('filePath').value;
            const response = await client.send_message('get_file', { path: filePath });
            addMessage(`Response: ${JSON.stringify(response)}`);
        });

        class MCPClient {
            constructor(serverUrl) {
                this.serverUrl = serverUrl;
                this.session_id = null;
                this.headers = {
                    'Content-Type': 'application/json',
                    'Accept': 'text/event-stream, application/json'
                };
            }

            async initialize() {
                try {
                    const response = await fetch(`${this.serverUrl}/message`, {
                        method: 'POST',
                        headers: this.headers,
                        body: JSON.stringify({
                            jsonrpc: '2.0',
                            id: 1,  // A request needs an id; without one it is a notification
                            method: 'initialize',
                            params: {
                                protocolVersion: '2025-03-26',
                                clientInfo: { name: 'MCP Client', version: '1.0' },
                                capabilities: {}
                            }
                        })
                    });
                    if (!response.ok) {
                        throw new Error(`HTTP error! status: ${response.status}`);
                    }
                    this.session_id = response.headers.get('Mcp-Session-Id');
                    addMessage(`Session ID: ${this.session_id}`);
                    this.listen_sse();
                } catch (error) {
                    addMessage(`Failed to initialize session: ${error}`);
                }
            }

            async send_message(method, params) {
                if (!this.session_id) {
                    await this.initialize();
                }
                try {
                    const response = await fetch(`${this.serverUrl}/message`, {
                        method: 'POST',
                        headers: { 'Mcp-Session-Id': this.session_id, ...this.headers },
                        body: JSON.stringify({
                            jsonrpc: '2.0',
                            id: 1,
                            method: method,
                            params: params || {}
                        })
                    });
                    if (!response.ok) {
                        throw new Error(`HTTP error! status: ${response.status}`);
                    }
                    return await response.json();
                } catch (error) {
                    addMessage(`Failed to send message: ${error}`);
                    return null;
                }
            }

            listen_sse() {
                if (!this.session_id) {
                    return;
                }
                // EventSource cannot send custom headers, so the session ID travels in the query string
                const url = `${this.serverUrl}/message?Mcp-Session-Id=${encodeURIComponent(this.session_id)}`;
                const eventSource = new EventSource(url);
                eventSource.onmessage = (event) => {
                    addSSEMessage(event.data);
                };
                eventSource.onerror = () => {
                    addMessage('SSE connection lost.');
                    eventSource.close();  // Stop the browser's automatic retry; reconnect() opens a fresh stream
                    this.reconnect();
                };
            }

            async reconnect() {
                addMessage('Attempting to reconnect...');
                await new Promise(resolve => setTimeout(resolve, 5000));
                await this.initialize();  // initialize() starts a new SSE listener on success
            }
        }

        function addMessage(message) {
            const messagesDiv = document.getElementById('messages');
            const messageDiv = document.createElement('div');
            messageDiv.className = 'message';
            messageDiv.textContent = message;
            messagesDiv.appendChild(messageDiv);
            messagesDiv.scrollTop = messagesDiv.scrollHeight;
        }

        function addSSEMessage(message) {
            const messagesDiv = document.getElementById('messages');
            const messageDiv = document.createElement('div');
            messageDiv.className = 'sse-message';
            messageDiv.textContent = `SSE Message: ${message}`;
            messagesDiv.appendChild(messageDiv);
            messagesDiv.scrollTop = messagesDiv.scrollHeight;
        }
    </script>
</body>
</html>

Running Steps

  1. Install Dependencies: Ensure the required libraries are installed:
 pip install starlette uvicorn httpx aiofiles
  2. Start the Server: Save the server code as server.py, then run the following command to start the server:
 uvicorn server:app --reload
  3. Run the Client: Save the client code as client.py, then run the following command to start the client:
 python client.py
  4. Open the Frontend Page: Save the frontend page code as index.html, then open this file in a browser.

Example Output

Client Output

Session ID: 587bb6ad-08f5-4102-8b27-4c276e9d7815
Response: {'jsonrpc': '2.0', 'id': 1, 'result': 'File content here'}
SSE Message: data: {"message": "Hello from server!", "sleep": 1, "datetime": "2024-01-01 12:00:00"}
SSE Message: data: {"message": "Hello from server!", "sleep": 2, "datetime": "2024-01-01 12:00:02"}
...

Server Output

INFO:     Started server process [12345]
INFO:     Waiting for application startup.
INFO:     Application startup complete.
INFO:     Uvicorn running on http://0.0.0.0:8000 (Press CTRL+C to quit)
INFO:     127.0.0.1:51487 - "POST /message HTTP/1.1" 200 OK

Frontend Page Output

The frontend page will display the session ID, sent messages, and received SSE stream messages.


Debugging

  1. Check Server Logs: Review the server logs to confirm whether the Mcp-Session-Id was generated and returned to the client.
  2. Check Network Requests: Use browser developer tools (F12) to inspect the response headers of network requests to confirm whether they include the Mcp-Session-Id.
  3. Check CORS Issues: Ensure the server is correctly configured for CORS, allowing the domain and port of the frontend page.
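
One CORS detail is easy to overlook when debugging: even if the server sends Mcp-Session-Id, browser JavaScript on another origin can read it only when the header is listed in Access-Control-Expose-Headers, which is what the expose_headers setting in the server code provides. The sketch below checks a set of response headers copied from the developer tools. The function name session_header_readable is hypothetical, and the check is simplified: it treats a wildcard as exposing the header, which browsers honor only for requests without credentials.

```python
from typing import Mapping


def session_header_readable(response_headers: Mapping[str, str], header: str = "Mcp-Session-Id") -> bool:
    """Return True if cross-origin browser JavaScript could read `header`."""
    lowered = {k.lower(): v for k, v in response_headers.items()}
    if header.lower() not in lowered:
        return False  # the server never sent the header
    exposed = [h.strip().lower() for h in lowered.get("access-control-expose-headers", "").split(",")]
    return header.lower() in exposed or "*" in exposed


print(session_header_readable({
    "Mcp-Session-Id": "12345",
    "Access-Control-Expose-Headers": "Mcp-Session-Id",
}))
```

If this returns False while the header is visible in the network tab, response.headers.get('Mcp-Session-Id') in the frontend will return null.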

We hope this information helps you successfully implement the Streamable HTTP server, client, and frontend page based on the MCP protocol.
