Goodbye SSE, Embrace Streamable HTTP

With the rapid development of artificial intelligence (AI), efficient communication between AI assistants and applications has become increasingly important. The Model Context Protocol (MCP) emerged to provide a standardized interface for large language models (LLMs) to interact with external data sources and tools. Among MCP's many features, the Streamable HTTP transport mechanism is gradually replacing the traditional Server-Sent Events (SSE) transport, becoming the new standard for AI communication.


Overview of the MCP Protocol

MCP is an open standard introduced by Anthropic that addresses LLMs' dependency on external information when performing tasks. By defining a set of universal communication rules and data formats, MCP enables LLMs to dynamically acquire the contextual information they need, enhancing both their capabilities and their range of use. MCP consists of the following core components:

  • MCP Client: runs on the LLM side (or its service infrastructure), constructing requests and sending them to the MCP server.
  • MCP Server: runs on the external system side, receiving requests from the MCP client, interacting with the actual data sources or tools, and returning the acquired data to the client in the format defined by the MCP protocol.
  • Context Information Exchange: facilitates bidirectional exchange of contextual information between LLMs and external systems.

With MCP, developers can more easily integrate AI assistants with various applications and data sources, achieving more efficient AI communication.

Limitations of Traditional SSE

Server-Sent Events (SSE) is a unidirectional communication protocol based on HTTP that allows servers to push real-time updates to clients. However, SSE has limitations in the following areas:

  • Limited Communication Direction: SSE only supports unidirectional communication from server to client, failing to meet the needs for bidirectional interaction.
  • Lack of Session Management: SSE lacks a built-in session management mechanism, making it difficult to maintain complex states.
  • Poor Connection Recovery Capability: after a network interruption, SSE has limited connection recovery capabilities, which may lead to data loss.
  • Limited Data Format Support: SSE primarily supports UTF-8 text and cannot handle multiple data formats.

These limitations make SSE increasingly inadequate for modern AI applications.

Innovations of Streamable HTTP

Streamable HTTP is the recommended transport mechanism within the MCP framework, designed to achieve efficient, bidirectional data-stream communication over standard HTTP. Its main features include:

  • Single Endpoint Communication: uses a single HTTP endpoint to handle all MCP communications, simplifying network architecture.
  • Multiple Response Modes: supports batch (JSON) and streaming (SSE) responses to meet different communication needs.
  • Built-in Session Management: simplifies state maintenance through the Mcp-Session-Id header.
  • Connection Recoverability: supports recovery of SSE connections after network interruptions, improving communication stability.
  • Flexible Authentication: supports various authentication methods, enhancing security.
  • Cross-Origin Resource Sharing (CORS) Configuration: provides flexible CORS configuration for easy integration with web applications.

These features give Streamable HTTP significant advantages in modern AI communication.

Technical Comparison: SSE vs. Streamable HTTP

| Feature | Traditional SSE Transport | Streamable HTTP (MCP) |
| --- | --- | --- |
| Communication direction | Unidirectional (server → client) | Bidirectional (client ↔ server) |
| Session management | No built-in mechanism | Based on the Mcp-Session-Id header |
| Data format support | UTF-8 text only | Multiple data formats |
| Automatic reconnection | Supported | Supported |
| Compatibility with existing infrastructure | High | High |

From the above comparison, it is evident that Streamable HTTP outperforms traditional SSE transport in terms of communication flexibility, session management, and data format support.

Implementation and Demonstration

MCP Server Based on Streamable HTTP

In previous articles, we wrote a weather-query MCP Server based on the SSE protocol and successfully deployed it online.

Project address: https://gitee.com/ming_log/mcp-server

Switching the MCP Server from the SSE protocol to the Streamable HTTP protocol is very simple: just change the transport passed to the MCP Server's run method to streamable-http.

[Screenshot: image-20250513175712008]

Note: you first need to update mcp[cli] to version 1.8.0 or later.

The complete code is as follows:

from typing import Any
import httpx
from mcp.server.fastmcp import FastMCP

# Initialize FastMCP server
mcp = FastMCP(
    name="weather",
    host="0.0.0.0",
    port=8002,
    description="Get weather information by city name (pinyin) or latitude and longitude",
    # With the streamable-http transport the endpoint defaults to /mcp
    # (the streamable_http_path setting); sse_path only applies to the
    # SSE transport and is kept here for backward compatibility.
    sse_path="/mcp"
)

# Constants
OWM_API_BASE = "https://api.openweathermap.org/data/2.5/weather"  # OpenWeatherMap current-weather endpoint
OWM_API_KEY = "24ecadbe4bb3d55cb1f06ea48a41ac51"
USER_AGENT = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/135.0.0.0 Safari/537.36"

# Temperature unit conversion, converting Kelvin to Celsius
def kelvin_to_celsius(kelvin: float) -> float:
    return kelvin - 273.15

async def get_weather_from_cityname(cityname: str) -> dict[str, Any] | None:
    """Query OpenWeatherMap by city name; return None on any request error."""
    headers = {
        "User-Agent": USER_AGENT,
        "Accept": "application/json"
    }
    params = {
        "q": cityname,
        "appid": OWM_API_KEY
    }
    async with httpx.AsyncClient() as client:
        try:
            response = await client.get(OWM_API_BASE, headers=headers, params=params)
            response.raise_for_status()
            return response.json()
        except Exception:
            return None

async def get_weather_from_latitude_longitude(latitude: float, longitude: float) -> dict[str, Any] | None:
    """Query OpenWeatherMap by coordinates; return None on any request error."""
    headers = {
        "User-Agent": USER_AGENT,
        "Accept": "application/json"
    }
    params = {
        "lat": latitude,
        "lon": longitude,
        "appid": OWM_API_KEY
    }
    async with httpx.AsyncClient() as client:
        try:
            response = await client.get(OWM_API_BASE, headers=headers, params=params)
            response.raise_for_status()
            return response.json()
        except Exception:
            return None


def format_alert(feature: dict | None) -> str:
    """Format the weather information returned by the API for text output."""
    if feature is None:
        return "Request failed, please try again later."
    # OpenWeatherMap returns cod as an int on success but a string on errors
    cod = int(feature.get("cod", 0))
    if cod == 404:
        return "Parameter error, please confirm whether the city name is correct."
    elif cod == 401:
        return "API key error, please confirm whether the API key is correct."
    elif cod == 200:
        return f"""
        City: {feature.get('name', 'Unknown')}
        Weather: {feature.get('weather', [{}])[0].get('description', 'Unknown')}
        Temperature: {kelvin_to_celsius(feature.get('main', {}).get('temp', 0)):.2f}°C
        Humidity: {feature.get('main', {}).get('humidity', 0)}%
        Wind Speed: {feature.get('wind', {}).get('speed', 0):.2f} m/s
        """
    else:
        return "Unknown error, please try again later."

@mcp.tool()
async def get_weather_from_cityname_tool(city: str) -> str:
    """Get weather information for a city.

    Args:
        city: City name (e.g., "wuhan"). For Chinese cities, please use pinyin
    """
    data = await get_weather_from_cityname(city)
    return format_alert(data)

@mcp.tool()
async def get_weather_from_latitude_longitude_tool(latitude: float, longitude: float) -> str:
    """Get weather information for a location.

    Args:
        latitude: Latitude of the location
        longitude: Longitude of the location
    """
    data = await get_weather_from_latitude_longitude(latitude, longitude)
    return format_alert(data)

if __name__ == "__main__":
    # Initialize and run the server
    # mcp.run(transport='stdio')
    print("Starting server...")
    mcp.run(transport='streamable-http')

Start the service

[Screenshot: image-20250513175829712]

Next, test whether the service can be used normally.
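Before wiring up a GUI client, you can also smoke-test the endpoint by hand with a raw JSON-RPC `initialize` request. The payload below is a minimal illustration of the shape the MCP spec defines (the protocol version string and capability fields are assumptions; an SDK normally builds this for you):

```python
# Hand-rolled JSON-RPC "initialize" request for smoke-testing the endpoint.
# The protocolVersion and capability fields are a minimal illustration of
# the shape the MCP spec defines; an SDK normally constructs this for you.
import json

def initialize_payload(protocol_version: str = "2025-03-26") -> dict:
    return {
        "jsonrpc": "2.0",
        "id": 1,
        "method": "initialize",
        "params": {
            "protocolVersion": protocol_version,
            "capabilities": {},
            "clientInfo": {"name": "smoke-test", "version": "0.0.1"},
        },
    }

# Networked usage (requires the server above to be running):
# import httpx
# r = httpx.post("http://127.0.0.1:8002/mcp",
#                json=initialize_payload(),
#                headers={"Accept": "application/json, text/event-stream"})
# print(r.status_code, r.headers.get("mcp-session-id"))
```

A successful response should include an `Mcp-Session-Id` header, which subsequent requests must echo back.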

Since Cursor does not currently support the Streamable HTTP protocol (as of this writing), we will use Cherry Studio for testing; you can download it from the official website: https://www.cherry-ai.com/

After downloading and logging in, click on the settings in the lower left corner.


Then click on MCP Server.


Next, click to add a server and configure the MCP Server we just started.


Fill in the content as shown in the figure below.

[Screenshot: image-20250513180301435]

Then click save in the upper right corner.

[Screenshot: image-20250513180332988]

If there are no issues, you should see the server update successfully as shown in the figure. If there are issues, an error will appear.

To test the MCP Server, you also need an LLM; configure one in the settings according to your situation. I choose Moonshot.


Next, return to the chat assistant page and create a chat assistant. Select the MCP Server you want to use below the chat input box.


At this point, our preparation and configuration work is complete, and we can now ask the chat assistant about the weather to test the MCP Server.

For example: I ask, “How is the weather in Wuhan and Beijing?”


The Streamable HTTP protocol handles requests to the MCP Server concurrently: as the animation below shows, the Beijing weather request completes first.

[Animation: o303p-9167f]

MCP Client Based on Streamable HTTP

In previous articles, we hand-wrote the MCP Client code, which can also load the corresponding MCP Server services from an mcp.json configuration file. At that time it only supported the stdio and sse protocols; we now add the Streamable HTTP protocol.

Project address: https://gitee.com/ming_log/mcp_client

The specific code is as follows:

import asyncio
from typing import Optional
from contextlib import AsyncExitStack
import json

from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import stdio_client
from mcp.client.sse import sse_client
from mcp.client.streamable_http import streamablehttp_client

from dotenv import load_dotenv
import os, re
from openai import OpenAI
from lxml import etree

load_dotenv()  # load environment variables from .env

class MCPClient:
    def __init__(self):
        # Initialize session and client objects
        self.session: Optional[ClientSession] = None
        self.exit_stack = AsyncExitStack()
        # Environment variables need to be set in the .env file
        self.API_KEY = os.getenv("API_KEY")
        self.BASE_URL = os.getenv("BASE_URL")
        self.MODEL = os.getenv("MODEL")
        self.client = OpenAI(api_key=self.API_KEY, base_url=self.BASE_URL)
        self.sessions = {}
        self.messages = []
        with open("./MCP_Prompt.txt", "r", encoding="utf-8") as file:
            self.system_prompt = file.read()

    async def mcp_json_config(self, mcp_json_file):
        try:
            with open(mcp_json_file, 'r') as f:
                mcp_config: dict = json.load(f)
        except json.JSONDecodeError:
            raise ValueError("Invalid MCP config")
        servers_config: dict = mcp_config.get('mcpServers', {})
        for k, v in servers_config.items():
            try:
                if v.get('isActive', False) == False:
                    continue
                print('-'*50)
                mcp_name = v.get('name', k)
                mcp_type: str = v.get('type', 'stdio')
                if mcp_type.lower() == 'stdio':
                    command = v.get('command', None)
                    args = v.get('args', [])
                    env = v.get('env', {})
                    if command is None:
                        raise ValueError(f'{mcp_name} command is empty.')
                    if args == []:
                        raise ValueError(f'{mcp_name} args is empty.')
                    await self.connect_to_stdio_server(mcp_name, command, args, env)
                elif mcp_type.lower() == 'sse':
                    server_url = v.get('url', None)
                    if server_url is None:
                        raise ValueError(f'{mcp_name} server_url is empty.')
                    await self.connect_to_sse_server(mcp_name, server_url)
                elif mcp_type.lower() == 'streamable_http':
                    server_url = v.get('url', None)
                    if server_url is None:
                        raise ValueError(f'{mcp_name} server_url is empty.')
                    await self.connect_to_streamable_http_server(mcp_name, server_url)
                else:
                    raise ValueError(f'{mcp_name} mcp type must be in [stdio, sse, streamable_http].')
            except Exception as e:
                print(f"Error connecting to {mcp_name}: {e}")

    async def connect_to_stdio_server(self, mcp_name, command: str, args: list[str], env: dict[str, str]={}):
        server_params = StdioServerParameters(
            command=command,
            args=args,
            env=env
        )

        stdio_transport = await self.exit_stack.enter_async_context(stdio_client(server_params))
        self.stdio, self.write = stdio_transport
        self.session = await self.exit_stack.enter_async_context(ClientSession(self.stdio, self.write))
        self.sessions[mcp_name] = self.session

        await self.session.initialize()
        # Add MCP information to system_prompt
        response = await self.session.list_tools()
        available_tools = ['##' + mcp_name + '\n### Available Tools\n- ' + tool.name + "\n" + tool.description + "\n" + json.dumps(tool.inputSchema) for tool in response.tools]
        self.system_prompt = self.system_prompt.replace("<$MCP_INFO$>", "\n".join(available_tools)+"\n<$MCP_INFO$>")
        tools = response.tools
        print(f"Successfully connected to {mcp_name} server with tools:", [tool.name for tool in tools])

    async def connect_to_sse_server(self, mcp_name, server_url: str):
        """Connect to an MCP server over SSE.

        Args:
            mcp_name: Name used to register the session
            server_url: URL of the server's SSE endpoint
        """
        sse_transport = await self.exit_stack.enter_async_context(sse_client(server_url))
        self.sse, self.write = sse_transport
        self.session = await self.exit_stack.enter_async_context(ClientSession(self.sse, self.write))
        self.sessions[mcp_name] = self.session

        await self.session.initialize()
        # List available tools
        response = await self.session.list_tools()
        available_tools = ['##' + mcp_name + '\n### Available Tools\n- ' + tool.name + "\n" + tool.description + "\n" + json.dumps(tool.inputSchema) for tool in response.tools]
        self.system_prompt = self.system_prompt.replace("<$MCP_INFO$>", "\n".join(available_tools)+"\n<$MCP_INFO$>\n")
        tools = response.tools
        print(f"Successfully connected to {mcp_name} server with tools:", [tool.name for tool in tools])

    async def connect_to_streamable_http_server(self, mcp_name, server_url: str):
        """Connect to an MCP server over Streamable HTTP.

        Args:
            mcp_name: Name used to register the session
            server_url: URL of the server's /mcp endpoint
        """
        # streamablehttp_client also yields a get_session_id callback, ignored here
        http_transport = await self.exit_stack.enter_async_context(streamablehttp_client(server_url))
        self.streamable_http, self.write, _ = http_transport
        self.session = await self.exit_stack.enter_async_context(ClientSession(self.streamable_http, self.write))
        self.sessions[mcp_name] = self.session

        await self.session.initialize()
        # List available tools
        response = await self.session.list_tools()
        available_tools = ['##' + mcp_name + '\n### Available Tools\n- ' + tool.name + "\n" + tool.description + "\n" + json.dumps(tool.inputSchema) for tool in response.tools]
        self.system_prompt = self.system_prompt.replace("<$MCP_INFO$>", "\n".join(available_tools)+"\n<$MCP_INFO$>\n")
        tools = response.tools
        print(f"Successfully connected to {mcp_name} server with tools:", [tool.name for tool in tools])

    async def process_query(self, query: str) -> str:
        """Process a query using the configured LLM and available tools"""
        self.messages.append(
            {
                "role": "system",
                "content": self.system_prompt
            }
        )
        self.messages.append(
            {
                "role": "user",
                "content": query
            }
        )

        # Initial LLM API call
        response = self.client.chat.completions.create(
            model=self.MODEL,
            max_tokens=1024,
            messages=self.messages
        )

        # Process response and handle tool calls
        final_text = []
        content = response.choices[0].message.content
        if '<use_mcp_tool>' not in content:
            final_text.append(content)
        else:
            # Parse tool_string
            server_name, tool_name, tool_args = self.parse_tool_string(content)

            # Execute tool call
            result = await self.sessions[server_name].call_tool(tool_name, tool_args)
            print(f"[Calling tool {tool_name} with args {tool_args}]")
            print("-"*40)
            print("Server:", server_name)
            print("Tool:", tool_name)
            print("Args:", tool_args)
            print("-"*40)
            print("Result:", result.content[0].text)
            print("-"*40)
            self.messages.append({
                "role": "assistant",
                "content": content
            })
            self.messages.append({
                "role": "user",
                "content": f"[Tool {tool_name} \n returned: {result}]"
            })

            response = self.client.chat.completions.create(
                model=self.MODEL,
                max_tokens=1024,
                messages=self.messages
            )
            final_text.append(response.choices[0].message.content)
        return "\n".join(final_text)
    
    def parse_tool_string(self, tool_string: str) -> tuple[str, str, dict]:
        matches = re.findall("(<use_mcp_tool>.*?</use_mcp_tool>)", tool_string, re.S)
        if not matches:
            raise ValueError("No <use_mcp_tool> block found in model output")
        root = etree.fromstring(matches[0])
        server_name = root.find('server_name').text
        tool_name = root.find('tool_name').text
        try:
            tool_args = json.loads(root.find('arguments').text)
        except json.JSONDecodeError:
            raise ValueError("Invalid tool arguments")
        return server_name, tool_name, tool_args

    async def chat_loop(self):
        """Run an interactive chat loop"""
        print("\nMCP Client Started!")
        print("Type your queries or 'quit' to exit.")
        self.messages = []
        while True:
            try:
                query = input("\nQuery: ").strip()

                if query.lower() == 'quit':
                    break
                if query.strip() == '':
                    print("Please enter a query.")
                    continue
                response = await self.process_query(query)
                print(response)

            except Exception as e:
                print(f"\nError: {str(e)}")

    async def cleanup(self):
        """Clean up resources"""
        await self.exit_stack.aclose()

async def main():
    client = MCPClient()
    try:
        # await client.connect_to_sse_server('amap', 'https://mcp.amap.com/sse?key=d769f05385fe314e9b3ae548ba7d86b1')
        mcp_config_file = './mcp.json'
        await client.mcp_json_config(mcp_config_file)
        await client.chat_loop()
    finally:
        await client.cleanup()

if __name__ == "__main__":
    asyncio.run(main())

Next, we use the MCP Client we wrote ourselves to connect to the MCP Server above.

Modify the mcp.json file content as follows:

{
    "mcpServers": {
      "weather-http": {
        "isActive": true,
        "type": "streamable_http",
        "url": "http://127.0.0.1:8002/mcp",
        "name": "weather-http"
      }
    }
}

It can also successfully call the MCP Server service.


To test multiple MCP Servers, modify the mcp.json file content as follows:

{
    "mcpServers": {
      "time-http": {
        "isActive": true,
        "type": "streamable_http",
        "url": "https://time.mcp.minglog.cn/mcp",
        "name": "time-http"
      },
      "weather-http": {
        "isActive": true,
        "type": "streamable_http",
        "url": "http://127.0.0.1:8002/mcp",
        "name": "weather-http"
      }
    }
}

Here I used an MCP Server deployed on my own server, which also uses the streamable_http protocol.


All tools can be successfully called.

Conclusion

Streamable HTTP, as the recommended transport mechanism in the MCP protocol, combines the broad compatibility of HTTP with the real-time push capabilities of SSE, providing an efficient and flexible communication method. As the demand for communication between AI assistants and applications grows, Streamable HTTP is expected to become the new standard for AI communication.

If you wish to learn more about the MCP protocol and the implementation details of Streamable HTTP, you can visit the following resources:

  • MCP Official Documentation: https://modelcontextprotocol.io/
  • MCP Framework Documentation: https://mcp-framework.com/docs/Transports/http-stream-transport/
  • MCP GitHub Repository: https://github.com/modelcontextprotocol/python-sdk

Through these resources, you can gain a deeper understanding of the design philosophy and implementation of the MCP protocol and explore its potential in practical applications.
