网络编程基础
文章目录
计算机网络基础
为什么要了解网络?
在 MCP Server 项目中,服务器需要与外部客户端(如 Claude Desktop、Claude Code、浏览器等)进行通信。这个通信过程本质上就是网络通信。项目提供了三种传输模式:
- Stdio:通过标准输入输出通信(不涉及网络)
- SSE:基于 HTTP 协议的长连接推送
- HTTP Stream:基于 HTTP 协议的请求-响应 + 服务器推送
后两种模式需要你理解网络基础知识。
OSI模型的简化理解
OSI 七层模型是网络通信的理论框架,但对于本项目,你只需要理解三层:
通俗理解:
- 网络层决定数据包"送到哪台机器",靠 IP 地址寻址(就像快递的省市区地址)
- 传输层决定数据"怎么送",靠 TCP 保证可靠到达(就像快递的签收确认)
- 应用层决定数据"说的是什么",靠 HTTP/MCP 这样的协议约定格式(就像收件人拆开信封读信)
在代码中,你不需要手动操作网络层和传输层,cpp-httplib 库已经封装好了这些细节。你只需要关注应用层:定义什么样的 HTTP 路由,处理什么样的 JSON 请求,返回什么样的 JSON 响应。
IP地址与端口
IP 地址
IP 地址是网络中每台计算机的唯一标识。本项目使用的地址:
127.0.0.1
这是回环地址(Loopback Address),俗称 localhost。它永远指向本机。
为什么项目用 127.0.0.1?因为 MCP Server 和客户端通常运行在同一台机器上:
// SseTransport.h 第51行 - SSE 传输的构造函数默认值
explicit SSE(int port = 8080, std::string host = "127.0.0.1");
// HttpStreamTransport.hpp 第42行 - HTTP Stream 传输的构造函数默认值
explicit HttpStream(int port = 8080, std::string host = "127.0.0.1");
端口
一台计算机可以同时运行多个网络程序。端口就是用来区分不同程序的。它是一个 0~65535 的整数。
127.0.0.1:8080 的含义是:
127.0.0.1:本机8080:这台机器上的第 8080 号端口- 合起来:本机上的 8080 端口
本项目默认使用 8080 端口。为什么不是 80?因为 80 端口是 HTTP 的默认端口,在 Linux/Mac 上监听 1024 以下的端口需要 root 权限。8080 是 HTTP 开发中最常用的替代端口。
// SseTransport.cpp 第117行 - 实际监听
server_->listen(host_.c_str(), port_);
// 展开就是:server_->listen("127.0.0.1", 8080);
端口冲突
如果 8080 端口已经被其他程序占用(比如另一个开发服务器),listen() 会失败:
// SseTransport.cpp 第119-122行
if (!server_->listen(host_.c_str(), port_)) {
LOG(ERROR) << "Failed to start SSE server on " << host_ << ":" << port_ << std::endl;
server_running_.store(false);
}
这就是为什么 Start() 方法返回 bool 值——它告诉调用者启动是否成功。
TCP vs UDP
| 特性 | TCP | UDP |
|---|---|---|
| 连接方式 | 需要建立连接(三次握手) | 不需要连接,直接发送 |
| 可靠性 | 保证数据不丢失、不乱序 | 不保证,可能丢包 |
| 速度 | 较慢 | 较快 |
| 适用场景 | 网页、文件传输、API 调用 | 视频直播、在线游戏 |
| 本项目使用 | 是 | 否 |
本项目使用 TCP。HTTP 协议底层就是 TCP。为什么?
- MCP 协议中的 JSON-RPC 消息必须完整、有序地到达
- cpp-httplib 库底层使用 TCP socket
- SSE 和 HTTP Stream 都建立在 HTTP 之上,HTTP 建立在 TCP 之上
在 CMakeLists.txt 中可以看到链接了 socket 库:
# CMakeLists.txt 第46-48行
if(WIN32)
# Windows Sockets library is required on Windows
target_link_libraries(${PROJECT_NAME} PRIVATE ws2_32)
ws2_32 是 Windows 上的 socket 库,cpp-httplib 通过它使用 TCP。
客户端-服务器模型
本项目遵循经典的客户端-服务器(Client-Server)模型:
在 MCP 的语境中,这是双向的:
客户端 → 服务器(Request / Notification):
{"jsonrpc":"2.0","id":1,"method":"tools/call","params":{"name":"weather","arguments":{"city":"Beijing"}}}
服务器 → 客户端(Response):
{"jsonrpc":"2.0","id":1,"result":{"content":[{"type":"text","text":"Beijing: 25°C, sunny"}]}}
服务器 → 客户端(Notification,服务器主动推送):
{"jsonrpc":"2.0","method":"notifications/tools/list_changed"}
在项目代码中,服务器的核心循环就在 Server::Connect() 中:
// Server.cpp 第127-154行
while (!isStopping_) {
auto [length, json_string] = transport->Read(); // 等待客户端请求
// ...
json request = json::parse(json_string); // 解析 JSON
json response = HandleRequest(request); // 处理请求
transport_->Write(response.dump()); // 返回响应
}
这是一个典型的请求-响应循环:读取 → 处理 → 写入 → 读取 → …
阻塞I/O vs 非阻塞I/O
阻塞I/O(Blocking I/O)
当一个线程执行 Read() 时,如果还没有数据到达,这个线程会被操作系统挂起,CPU 会去执行其他线程。直到数据到达后,操作系统再唤醒这个线程。
就像你在餐厅点餐:
- 阻塞:你站在柜台前,菜没做好就一直等,不做别的事
- 非阻塞:你拿了号,去做别的事,等叫号了再回来
本项目中的所有 Read() 调用都是阻塞的:
// StdioTransport.cpp 第31-45行
std::pair<size_t, std::string> Stdio::Read() {
std::string json_data;
int c;
while ((c = std::getc(stdin)) != EOF && c != '\n') {
json_data += static_cast<char>(c);
}
return {json_data.length(), json_data};
}
std::getc(stdin) 是阻塞的——标准输入没有数据时,函数不会返回,整个线程停下来等待。
// SseTransport.cpp 第55-79行
std::pair<size_t, std::string> SSE::Read() {
std::unique_lock<std::mutex> lock(incoming_mutex_);
incoming_cv_.wait(lock, [this]() {
return !incoming_messages_.empty() || !server_running_.load();
});
// ...
}
incoming_cv_.wait() 也是阻塞的——队列为空时,线程在条件变量上等待,CPU 不消耗。
为什么使用阻塞I/O?
- 简单:代码逻辑是线性的,读 → 处理 → 写,清晰易懂
- 适合请求量不大的场景:MCP Server 通常只有一个客户端(如 Claude Desktop),不需要处理几千个并发连接
- 配合多线程使用:阻塞的只是当前线程,其他线程(如 Writer 线程、Watcher 线程)不受影响
项目在信号处理中也利用了阻塞的性质:
// main.cpp 第43-46行 - 关于 SIGINT 对阻塞 read() 的影响
// 对于 stdio 传输,SIGINT 会中断阻塞的 read() 系统调用使 Connect 循环退出。
// 对于 HTTP/SSE 传输,Connect 循环会在下次迭代检测到 isStopping_ 后退出。
同步I/O vs 异步I/O
本项目既使用了同步I/O,也提供了异步I/O接口。
同步I/O
调用 Read() 后,当前线程阻塞,直到数据到达才返回:
auto [length, json_string] = transport->Read();
// 这行代码执行完时,数据一定已经拿到了
异步I/O
调用 ReadAsync() 后,立即返回一个 future 对象。你可以做其他事情,之后再通过 .get() 获取结果(此时如果数据还没到,.get() 会阻塞):
// Server.cpp 第181-182行
auto future = transport_->ReadAsync();
auto [length, json_string] = future.get(); // 这里才真正等待
项目中的异步实现非常简单——就是把同步版本包了一层:
// StdioTransport.cpp 第47-56行
std::future<std::pair<size_t, std::string>> Stdio::ReadAsync() {
return std::async(std::launch::async, []() {
std::string json_data;
int c;
while ((c = std::getc(stdin)) != EOF && c != '\n') {
json_data += static_cast<char>(c);
}
return std::make_pair(json_data.length(), json_data);
});
}
std::async(std::launch::async, ...) 会在另一个线程中执行阻塞的 getc,调用者的线程可以继续执行。
关键区别
| 概念 | 含义 | 项目中的对应 |
|---|---|---|
| 阻塞 vs 非阻塞 | 描述了函数调用的行为 | Read() 会阻塞调用线程 |
| 同步 vs 异步 | 描述了获取结果的方式 | ReadAsync() 返回 future,延迟获取结果 |
注意:项目的当前实现中,异步只是把阻塞操作放到了另一个线程。真正的"非阻塞异步I/O"(如 Linux 的 epoll、Windows 的 IOCP)项目中没有使用,因为 MCP Server 的连接数很少,阻塞+多线程已经完全够用。
HTTP协议深入
本项目使用 HTTP 作为 SSE 和 HTTP Stream 两种传输方式的底层协议。理解 HTTP 协议是理解这两个 Transport 的前提。
HTTP请求格式
一个 HTTP 请求由四部分组成:
方法 路径 HTTP/1.1 ← 请求行
Host: 127.0.0.1:8080 ← 请求头部(Headers)
Content-Type: application/json
Mcp-Session-Id: abc123
{"method":"tools/call",...} ← 请求正文(Body,可选)
HTTP方法
项目用到了四种方法:
| 方法 | 用途 | 项目中的路由 |
|---|---|---|
| GET | 获取资源/建立长连接 | /sse(SSE), /mcp(HTTP Stream 的 SSE 通道) |
| POST | 提交数据 | /messages(SSE), /mcp(HTTP Stream 请求) |
| DELETE | 删除资源/结束会话 | /mcp(HTTP Stream 会话终止) |
| OPTIONS | 预检请求 | 所有路径(CORS 预检) |
项目中的路由注册
// SseTransport.cpp 第150-165行
void SSE::SetupRoutes() {
server_->Options("/.*", [this](...) { HandleOptionsRequest(req, res); });
server_->Get("/health", [](...) { res.set_content(...); });
server_->Post("/messages", [this](...) { HandlePostMessage(req, res); });
server_->Get("/sse", [this](...) { HandleSSEConnection(req, res); });
}
// HttpStreamTransport.cpp 第163-183行
void HttpStream::SetupRoutes() {
server_->Options("/.*", [](...) { HandleOptionsRequest(req, res); });
server_->Get("/health", [](...) { res.set_content(...); });
server_->Post("/mcp", [this](...) { HandlePostMessage(req, res); });
server_->Get("/mcp", [this](...) { HandleGetSSE(req, res); });
server_->Delete("/mcp", [this](...) { HandleDeleteSession(req, res); });
}
注意:
/.*是一个正则表达式,匹配所有路径(用于 OPTIONS)- 同一个路径
/mcp同时注册了 GET、POST、DELETE 三个方法的不同处理函数
HTTP头部(Headers)
头部是键值对,用来传递元信息。项目中最关键的头部:
| 头部名称 | 作用 | 示例值 |
|---|---|---|
Content-Type |
告诉对方正文的格式 | application/json, text/event-stream |
Accept |
告诉对方自己能接受什么格式 | application/json, text/event-stream |
Access-Control-Allow-Origin |
CORS:允许哪些来源访问 | *(允许所有) |
Mcp-Session-Id |
MCP 自定义:会话标识 | 18f3a2b1c-4d5e6f7a |
Cache-Control |
缓存策略 | no-cache |
Connection |
连接管理 | keep-alive |
项目中的头部读取
// HttpStreamTransport.cpp 第189行
auto content_type = req.get_header_value("Content-Type");
// HttpStreamTransport.cpp 第197行
auto accept = req.get_header_value("Accept");
// HttpStreamTransport.cpp 第416-418行
auto client_session = req.get_header_value("Mcp-Session-Id");
get_header_value() 是 cpp-httplib 提供的便捷方法(见 httplib.h 第657行),如果头部不存在则返回空字符串。
HTTP正文(Body)
正文是请求/响应的实际数据。在项目中,正文永远是 JSON 字符串:
// SseTransport.cpp 第286行 - 读取 POST 请求的正文
std::string message = req.body;
// HttpStreamTransport.cpp 第204行
std::string message = req.body;
HTTP响应格式
响应也由四部分组成:
HTTP/1.1 200 OK ← 状态行
Content-Type: application/json
Mcp-Session-Id: abc123 ← 响应头部
Access-Control-Allow-Origin: *
{"jsonrpc":"2.0","id":1,...} ← 响应正文
状态码
项目中使用的状态码:
| 状态码 | 含义 | 项目中的使用场景 |
|---|---|---|
| 200 | OK,请求成功 | 正常返回响应 |
| 202 | Accepted,已接受但尚未处理 | 通知消息(不需要响应) |
| 400 | Bad Request,请求格式错误 | 空消息体、JSON 解析失败 |
| 404 | Not Found,资源不存在 | 会话 ID 无效 |
| 406 | Not Acceptable,不接受此格式 | Accept 头部不满足要求 |
| 415 | Unsupported Media Type | Content-Type 不是 application/json |
| 503 | Service Unavailable | SSE 连接未建立 |
| 504 | Gateway Timeout | 请求处理超时 |
项目中的使用示例:
// HttpStreamTransport.cpp 第191-193行
if (content_type.find("application/json") == std::string::npos) {
res.status = 415; // Unsupported Media Type
res.set_content("{\"error\":\"Unsupported Media Type. Expected application/json\"}", "application/json");
return;
}
// HttpStreamTransport.cpp 第291行 - 请求超时
res.status = 504; // Gateway Timeout
res.set_content("{\"error\":\"Request timed out\"}", "application/json");
// SseTransport.cpp 第281行 - SSE连接不存在时拒绝POST
if (!client_connected_.load()) {
res.status = 503; // Service Unavailable
res.set_content("{\"error\":\"No SSE connection\"}", "application/json");
return;
}
响应体设置
// 最简单的响应
res.set_content("{\"status\":\"ok\"}", "application/json");
// 第一个参数:响应正文
// 第二个参数:Content-Type
// 带自定义头部的响应
res.status = 200;
res.set_content(response_data, "application/json");
res.set_header("Mcp-Session-Id", session_id_);
Content-Type 协商
Content-Type 决定了数据的格式。客户端和服务器需要通过它来"协商"使用什么格式交换数据。
在 MCP Server 中,有两个关键的 Content-Type:
application/json
标准的 JSON 格式,用于普通的请求和响应:
Content-Type: application/json
{"jsonrpc":"2.0","id":1,"result":{"tools":[...]}}
项目在 HTTP Stream 中强制要求 POST 请求必须是 JSON:
// HttpStreamTransport.cpp 第189-194行
auto content_type = req.get_header_value("Content-Type");
if (content_type.find("application/json") == std::string::npos) {
res.status = 415;
res.set_content("{\"error\":\"Unsupported Media Type. Expected application/json\"}", "application/json");
return;
}
同时还检查 Accept 头部,确保客户端能接受 JSON 响应:
// HttpStreamTransport.cpp 第197-202行
auto accept = req.get_header_value("Accept");
if (!accept.empty() && accept.find("application/json") == std::string::npos) {
res.status = 406;
res.set_content("{\"error\":\"Not Acceptable. Must accept application/json\"}", "application/json");
return;
}
text/event-stream
SSE 专用的 MIME 类型,用于服务器向客户端的实时推送:
// SseTransport.cpp 第181行
res.set_header("Content-Type", "text/event-stream");
为什么需要区分 application/json 和 text/event-stream?因为浏览器/客户端需要知道如何解析数据:
application/json→ 直接JSON.parse(body)就行text/event-stream→ 需要按 SSE 协议解析(按行读取,解析event:、data:字段)
CORS 跨域资源共享
什么是跨域?
浏览器的同源策略(Same-Origin Policy)规定:一个网页只能请求同源(协议+域名+端口都相同)的服务器。
例如,https://claude.ai 的网页想请求 http://localhost:8080 的数据,浏览器会阻止。因为:
- 协议不同:
httpsvshttp - 域名不同:
claude.aivslocalhost - 端口不同:
443vs8080
CORS 如何解决?
CORS 通过在 HTTP 响应中添加特定的头部来告诉浏览器:“我允许来自其他来源的请求”。
本项目的 CORS 配置:
// SseTransport.cpp 第309-315行
void SSE::SetCORSHeaders(httplib::Response& res) {
res.set_header("Access-Control-Allow-Origin", "*"); // 允许所有来源
res.set_header("Access-Control-Allow-Methods", "GET, POST, OPTIONS"); // 允许的方法
res.set_header("Access-Control-Allow-Headers", "Content-Type, Authorization, x-api-key"); // 允许的请求头
res.set_header("Access-Control-Expose-Headers", "Content-Type, Authorization, x-api-key"); // 允许客户端读取的响应头
res.set_header("Access-Control-Max-Age", "86400"); // 预检结果缓存 24 小时
}
HTTP Stream 版本的 CORS 配置多了一个重要头部:
// HttpStreamTransport.cpp 第432-438行
void HttpStream::SetCORSHeaders(httplib::Response& res) {
res.set_header("Access-Control-Allow-Origin", "*");
res.set_header("Access-Control-Allow-Methods", "GET, POST, DELETE, OPTIONS"); // 多了 DELETE
res.set_header("Access-Control-Allow-Headers", "Content-Type, Authorization, Mcp-Session-Id"); // 多了 Mcp-Session-Id
res.set_header("Access-Control-Expose-Headers", "Content-Type, Mcp-Session-Id");
res.set_header("Access-Control-Max-Age", "86400");
}
关键区别:
- Allow-Methods 多了
DELETE(HTTP Stream 需要删除会话) - Allow-Headers 和 Expose-Headers 中多了
Mcp-Session-Id(HTTP Stream 的会话管理需要)
为什么 SSE 的 Allow-Headers 没有 Mcp-Session-Id?
SSE 模式下,会话 ID 是通过 URL 查询参数传递的(/messages?session_id=xxx),而不是通过 HTTP 头部。
// SseTransport.cpp 第209行
std::string event_endpoint = "event: endpoint\ndata: /messages?session_id=" + sessionId + "\n\n";
而 HTTP Stream 模式下,会话 ID 通过 Mcp-Session-Id 头部传递:
// HttpStreamTransport.cpp 第416行
auto client_session = req.get_header_value("Mcp-Session-Id");
预检请求 OPTIONS
对于某些"非简单"的跨域请求(如带自定义头部的 POST、DELETE 等),浏览器会在发送实际请求之前,先发送一个 OPTIONS 请求来询问服务器:“我可以发这个请求吗?”
流程如下:
项目中对 OPTIONS 的处理非常简单——直接返回 200 和 CORS 头部:
// SseTransport.cpp 第304-307行
void SSE::HandleOptionsRequest(const httplib::Request& req, httplib::Response& res) {
SetCORSHeaders(res);
res.status = 200;
}
不需要处理任何正文,因为 OPTIONS 请求只关心响应头部中的 CORS 信息。
路由注册使用了正则表达式来匹配所有路径:
// SseTransport.cpp 第151-153行
server_->Options("/.*", [this](const httplib::Request& req, httplib::Response& res) {
HandleOptionsRequest(req, res);
});
/.* 匹配所有路径,这样无论客户端 OPTIONS 哪个路径,服务器都能正确响应。
自定义头部 Mcp-Session-Id
Mcp-Session-Id 是本项目使用的一个自定义 HTTP 头部,用于 HTTP Stream 模式下管理会话。
标准的 HTTP 头部由 IANA 注册管理,但应用程序可以自定义任何以 X- 开头(传统习惯)的头部。Mcp-Session-Id 虽然没有 X- 前缀,但它是 MCP 规范约定的自定义头部。
在 HTTP Stream 中,每个客户端在初始化(initialize 方法)时获得一个唯一的会话 ID:
// HttpStreamTransport.cpp 第224-228行
if (is_initialize) {
// Generate session ID on initialize
session_id_ = vx::utils::SessionBuilder::GenerateUniqueSessionID();
session_initialized_ = true;
client_connected_.store(true);
LOG(INFO) << "Session initialized: " << session_id_ << std::endl;
}
之后,客户端的每个请求都必须带上这个会话 ID:
// HttpStreamTransport.cpp 第231-233行
} else if (session_initialized_) {
if (!ValidateSession(req, res)) {
return;
}
}
验证逻辑:
// HttpStreamTransport.cpp 第416-425行
bool HttpStream::ValidateSession(const httplib::Request& req, httplib::Response& res) const {
auto client_session = req.get_header_value("Mcp-Session-Id");
if (client_session.empty() || client_session != session_id_) {
LOG(ERROR) << "Invalid session ID: " << client_session << " (expected: " << session_id_ << ")" << std::endl;
res.status = 404;
res.set_content("{\"error\":\"Invalid or missing session ID\"}", "application/json");
return false;
}
return true;
}
服务器在响应中也返回这个会话 ID,让客户端知道会话已建立:
// HttpStreamTransport.cpp 第305-306行
res.set_header("Mcp-Session-Id", session_id_);
cpp-httplib库详解
本项目使用 cpp-httplib(include/httplib.h)作为 HTTP 服务器库。它是一个**仅头文件(header-only)**的 C++ 库——不需要编译链接,只需 #include "httplib.h" 就可以使用。
为什么选 cpp-httplib?
| 特性 | 说明 |
|---|---|
| Header-only | 不需要额外的编译步骤 |
| 跨平台 | Windows / Linux / macOS |
| 支持 HTTPS | 通过 OpenSSL(本项目未使用) |
| 轻量级 | 约 8000 行代码,无额外依赖 |
| 支持流式传输 | set_content_provider 是实现 SSE 的核心 |
Server 创建与路由注册
创建服务器
// SseTransport.cpp 第47行 - SSE Transport 的构造函数
SSE::SSE(const int port, std::string host)
: host_(std::move(host)), port_(port),
server_(std::make_unique<httplib::Server>()) // ← 创建 HTTP 服务器
{
SetupRoutes();
}
httplib::Server 是 cpp-httplib 的核心类。std::make_unique 说明每个 Transport 对象拥有一个独占的 Server 实例。
路由注册
cpp-httplib 使用流畅的 API 注册路由:
// 语法:server_->Method(path, handler);
server_->Get("/sse", handler); // 处理 GET 请求
server_->Post("/messages", handler); // 处理 POST 请求
server_->Delete("/mcp", handler); // 处理 DELETE 请求
server_->Options("/.*", handler); // 处理 OPTIONS 请求(支持正则)
handler 是一个 lambda 函数(或任何可调用对象),签名为:
void(const httplib::Request& req, httplib::Response& res)
请求处理
httplib::Request 对象包含了客户端发来的所有信息。
常用属性和方法
// httplib.h 第627-682行 - Request 结构体定义
struct Request {
std::string method; // "GET", "POST", "DELETE", etc.
std::string path; // "/sse", "/messages", "/mcp"
std::string body; // 请求正文
Headers headers; // 所有 HTTP 头部的 map
std::string remote_addr; // 客户端 IP 地址
int remote_port; // 客户端端口
std::string get_header_value(const std::string& key) const; // 获取指定头部
};
项目中的实际使用:
// 读取请求方法、路径、版本、地址
// SseTransport.cpp 第170-178行
LOG(DEBUG) << "Request method: " << req.method << std::endl;
LOG(DEBUG) << "Request path: " << req.path << std::endl;
LOG(DEBUG) << "Request version: " << req.version << std::endl;
LOG(DEBUG) << "Request remote address: " << req.remote_addr << std::endl;
// 读取请求正文
// SseTransport.cpp 第286行
std::string message = req.body;
// 读取特定头部
// HttpStreamTransport.cpp 第189行
auto content_type = req.get_header_value("Content-Type");
// HttpStreamTransport.cpp 第416行
auto client_session = req.get_header_value("Mcp-Session-Id");
// 遍历所有头部(调试用)
// SseTransport.cpp 第171-173行
for (const auto &header: req.headers) {
LOG(DEBUG) << " - " << header.first << ": " << header.second << std::endl;
}
响应设置
httplib::Response 对象用于构建返回给客户端的响应。
常用方法和属性
// httplib.h 第684-731行 - Response 结构体定义
struct Response {
int status; // HTTP 状态码
std::string body; // 响应正文
void set_header(const std::string& key, const std::string& val);
void set_content(const std::string& s, const std::string& content_type);
void set_content_provider(const std::string& content_type, ContentProviderWithoutLength provider);
};
项目中的实际使用:
最简单的情况:
// HttpStreamTransport.cpp 第169行
res.set_content("{\"status\":\"ok\"}", "application/json");
设置状态码 + 错误消息:
// HttpStreamTransport.cpp 第191-193行
res.status = 415;
res.set_content("{\"error\":\"Unsupported Media Type. Expected application/json\"}", "application/json");
设置多个头部(SSE 连接的建立):
// SseTransport.cpp 第180-183行
SetCORSHeaders(res);
res.set_header("Content-Type", "text/event-stream");
res.set_header("Cache-Control", "no-cache");
res.set_header("Connection", "keep-alive");
set_content_provider — 流式传输的核心
这是理解 SSE 实现最关键的部分。
普通响应 vs 流式响应
普通响应(set_content):
res.set_content("done", "text/plain");
// 1. 设置 Content-Type
// 2. 把整个 body 写入响应
// 3. 连接结束
流式响应(set_content_provider):
res.set_content_provider("text/plain", [](size_t offset, DataSink& sink) -> bool {
sink.write("chunk1", 6);
sleep(1);
sink.write("chunk2", 6);
return false; // false 表示结束
});
// cpp-httplib 会反复调用这个 lambda 函数
// 每次调用时,通过 sink.write() 发送一批数据
// 返回 true 表示"还有数据,继续调我"
// 返回 false 表示"数据发完了,可以关闭连接了"
关键区别:
set_content:一次性写入全部数据,连接随即关闭set_content_provider:cpp-httplib 反复调用你的函数,你每次写入一小块。这实现了流式传输(streaming)
ContentProvider 的函数签名
// httplib.h 第578-579行
using ContentProviderWithoutLength =
std::function<bool(size_t offset, DataSink &sink)>;
参数:
offset:当前已写入的字节数(cpp-httplib 传入,你可以忽略)sink:数据输出接口,通过它写入数据- 返回值:
true= 继续调用,false= 结束流
在 SSE 传输中,set_content_provider 创建了一个持久连接——只要你的回调函数一直返回 true,连接就一直保持。这正是 SSE 需要的"长连接"。
DataSink 接口
DataSink 是你向客户端发送数据的"水槽"。
// httplib.h 第542-573行
class DataSink {
public:
std::function<bool(const char *data, size_t data_len)> write; // 写入数据
std::function<bool()> is_writable; // 检查连接是否可写
std::function<void()> done; // 标记完成
std::ostream os; // 流式输出(包装了 write)
};
项目中使用 sink.write() 发送数据:
// SseTransport.cpp 第210行 - 发送 SSE 事件
sink.write(event_endpoint.data(), event_endpoint.size());
// SseTransport.cpp 第222行 - 发送心跳
sink.write(ping, std::strlen(ping));
// SseTransport.cpp 第259行 - 发送消息
sink.write(sse_msg.data(), sse_msg.size());
项目中使用 sink.is_writable() 检测连接断开:
// SseTransport.cpp 第245-248行
#ifdef CPPHTTPLIB_HAS_SINK_IS_WRITABLE
if (!sink.is_writable()) {
LOG(DEBUG) << "Sink no longer writable; client likely disconnected" << std::endl;
return terminate();
}
#endif
sink.write() 的返回值:
true:写入成功false:写入失败(通常是客户端断开连接)
这就是断连检测的机制——当 sink.write() 返回 false,说明客户端已经离开,content_provider 应该返回 false 来终止流。
项目中的实际使用:每个 Transport 的 Start() 方法
三种 Transport 的 Start() 方法展示了不同的启动模式。
StdioTransport
// StdioTransport.h 第46行
bool Start() override { return true; }
Stdio 不需要启动任何网络服务,Start() 只是返回 true。
SseTransport
// SseTransport.cpp 第109-127行
bool SSE::Start() {
if (server_running_.load()) {
return false; // 防止重复启动
}
server_running_.store(true);
server_thread_ = std::thread([this]() {
LOG(INFO) << "Starting SSE server on " << host_ << ":" << port_ << std::endl;
if (!server_->listen(host_.c_str(), port_)) { // ← 阻塞调用
LOG(ERROR) << "Failed to start SSE server..." << std::endl;
server_running_.store(false);
}
});
std::this_thread::sleep_for(std::chrono::milliseconds(100)); // 等待启动完成
return server_running_.load();
}
关键点:
- 在一个独立线程中调用
server_->listen(),因为它是阻塞的 - 等待 100ms 让服务器有时间启动
- 返回实际的运行状态
HttpStreamTransport
// HttpStreamTransport.cpp 第41-59行
bool HttpStream::Start() {
if (server_running_.load()) {
return false;
}
server_running_.store(true);
server_thread_ = std::thread([this]() {
LOG(INFO) << "Starting HttpStream server on " << host_ << ":" << port_ << std::endl;
if (!server_->listen(host_.c_str(), port_)) { // ← 阻塞调用
LOG(ERROR) << "Failed to start HttpStream server..." << std::endl;
server_running_.store(false);
}
});
std::this_thread::sleep_for(std::chrono::milliseconds(100));
return server_running_.load();
}
与 SSE 几乎完全相同——都是在新线程中启动 HTTP 服务器。
Stop() 方法的共同模式
// SseTransport.cpp 第129-148行
void SSE::Stop() {
server_->stop(); // 停止 cpp-httplib 服务器
if (server_thread_.joinable()) {
server_thread_.join(); // 等待服务器线程退出
}
// 唤醒所有等待中的线程
incoming_cv_.notify_all();
outgoing_cv_.notify_all();
}
SSE(Server-Sent Events)协议
SSE 的本质
SSE (Server-Sent Events) 是一种让服务器主动向客户端推送数据的 Web 技术。它的本质是:
HTTP 长连接 + text/event-stream 格式 + 单向推送
在本项目中,SSE Transport 对应文件 src/transport/SseTransport.cpp。
SSE协议、SSE流、SSE传输模式辨析
这三个词只差几个字,但指的是三个不同层级的概念。理解它们的区别是理解整个项目通信架构的关键。
概念层级总览
通俗类比:
| 层级 | 概念 | 类比 |
|---|---|---|
| SSE 协议 | 数据格式规范 | “电报的编码规则”——规定了电文怎么开头、怎么分段 |
| SSE 流 | 协议的具体实例 | “一条正在通话的电话线”——用这套规则传输数据的实际连接 |
| SSE 传输模式 | 完整的通信方案 | “整个邮局系统”——既有收件窗口(POST),又有投递线路(SSE 流) |
第1层:SSE 协议 —— 只定义了"怎么说"
SSE 协议(Server-Sent Events)是 W3C/WHATWG 定义的 Web 标准。它的核心规定只有三条:
1. 服务器响应的 Content-Type 必须是 text/event-stream
2. 数据按 "field: value\n" 的文本格式组织,双换行 \n\n 分隔每条消息
3. 方向是单向的:服务器 → 客户端
协议本身不规定客户端怎么发请求,也不规定连接怎么建立。它只是一个数据格式规范。
SSE 协议规定的消息格式:
event: message ← 事件类型(可选,默认 "message")
data: {"key": "value"} ← 数据内容(必需)
id: 42 ← 事件序号(可选,用于断线重连)
← 空行表示一条消息结束
: 冒号开头的行是注释 ← 用于心跳保活
SSE 协议不关心的事情:
- 客户端怎么把请求发给服务器? → 不管
- 请求和响应怎么匹配? → 不管
- 连接断了怎么重连? → 只规定了
id:字段和Last-Event-Id头部,具体逻辑由客户端实现
第2层:SSE 流 —— SSE 协议的实例
SSE 流(SSE Stream)是一条具体的、使用 SSE 协议的 TCP 长连接。
在项目中,SSE 流是这样创建的:
// SseTransport.cpp 第180-183行
res.set_header("Content-Type", "text/event-stream"); // ← 声明使用 SSE 协议
res.set_header("Cache-Control", "no-cache");
res.set_header("Connection", "keep-alive"); // ← 长连接
res.set_content_provider( // ← 流式传输,连接不关闭
"text/event-stream",
[this](size_t offset, httplib::DataSink& sink) -> bool {
// 只要这个函数返回 true,连接就一直保持
// 每次通过 sink.write() 发送的数据都按 SSE 协议格式
}
);
关键认知:一条 SSE 流 = 一条 TCP 连接。它本质上是一根"水管",水管里流的水遵循 SSE 协议格式。
第3层:SSE 传输模式 —— 用双通道拼出双向通信
SSE 传输模式(项目中的 vx::transport::SSE 类)是一个完整的通信方案。
回忆上一层的结论:一条 SSE 流只能服务器→客户端单向推送。那客户端怎么发请求?答案很简单:再开一条短连接专门发请求。
所以 SSE 传输模式 = POST 短连接 + SSE 长连接:
POST /messages → 把客户端请求扔进 incoming_messages_ 队列 → 立刻返回 200(连接关闭)
GET /sse → 一直连着,从 outgoing_messages_ 队列取响应 → 包装成 SSE 格式推送
注意:POST 返回的 200 只表示"消息投递成功",不代表"请求已处理完毕"。真正的处理结果要等 Server 处理完后,通过 SSE 流推送回来。
三种层级的关系总结
图中用 emoji 标注了三个层级的对应关系:
- 🟠 标注的节点:属于 SSE 传输模式的 POST 通道(短连接)
- 🟢 标注的节点:属于 SSE 流 + SSE 协议(长连接 + SSE 格式推送)
- 未标注的节点:属于 Server 公共逻辑(与传输模式无关)
连接生命周期对比
关键结论:
- POST /messages:每次请求都是一条新连接,发完即关(就像寄信,每封信独立投递)
- GET /sse:建立后一直保持,所有响应都通过这一条连接推送(就像一条专属快递传送带)
- SSE 传输模式 = 无数条短命的 POST 连接 + 一条长寿的 SSE 长连接
SSE 协议的建立
客户端发起一个普通的 HTTP GET 请求,服务器返回特殊的响应头:
// SseTransport.cpp 第168-186行
void SSE::HandleSSEConnection(const httplib::Request& req, httplib::Response& res) {
// 设置 CORS 头部
SetCORSHeaders(res);
// SSE 的关键头部
res.set_header("Content-Type", "text/event-stream"); // ← 告诉浏览器这是 SSE
res.set_header("Cache-Control", "no-cache"); // ← 禁止缓存
res.set_header("Connection", "keep-alive"); // ← 保持连接
client_connected_.store(true);
sse_active_.store(true);
// 注册内容提供者(流式传输)
res.set_content_provider(
"text/event-stream",
[this](size_t offset, httplib::DataSink& sink) -> bool {
// ... 推送逻辑
}
);
}
三个关键头部缺一不可:
Content-Type: text/event-stream— 告诉客户端按 SSE 格式解析Cache-Control: no-cache— 代理服务器不应缓存 SSE 流Connection: keep-alive— HTTP 连接保持打开
SSE 事件格式
SSE 数据流的格式非常简单,就是纯文本,每条消息由两个换行符分隔:
event: endpoint\ndata: /messages?session_id=abc123\n\n
格式规则:
event:— 事件类型(可选,默认为 “message”)data:— 数据内容(可以有多个data:行,组成多行数据)id:— 事件 ID(可选,用于断线重连):开头 — 注释行(用于心跳保活)\n\n— 双换行符表示一条消息结束
项目中的事件构建:
// SseTransport.cpp 第209行 - 发送 endpoint 事件
std::string event_endpoint = "event: endpoint\ndata: /messages?session_id=" + sessionId + "\n\n";
// SseTransport.cpp 第256行 - 发送消息事件
std::string sse_msg = "data: " + message + "\n\n";
注意 SSE 模式中,消息事件的格式是没有 event: 行的(默认为 message 类型),只有 data: 行。
而在 HTTP Stream 中,消息明确声明了事件类型:
// HttpStreamTransport.cpp 第368行
std::string sse_msg = "event: message\ndata: " + message + "\n\n";
Keep-Alive 心跳
SSE 是长连接,如果长时间没有数据传输,中间的网络设备(路由器、代理、负载均衡器)可能会关闭这个"闲置"的连接。
心跳机制就是定期发送一条无意义的数据来"告诉"沿途设备"我还活着"。
在 SSE 协议中,以冒号 : 开头的行是注释行,客户端会忽略它们,非常适合做心跳:
: ping\n\n
项目中的心跳实现:
// SseTransport.cpp 第193-227行
const auto ping_interval = std::chrono::seconds(15); // 每 15 秒一次
// ...
if (clock::now() - last_ping >= ping_interval) {
// SSE comment line as keep-alive
const char* ping = ": ping\n\n";
// If write fails, client disconnected
if (!sink.write(ping, std::strlen(ping))) {
LOG(ERROR) << "Keep-alive write failed; client likely disconnected" << std::endl;
return terminate(); // 连接断开,退出
}
last_ping = clock::now();
}
心跳间隔 15 秒是一个合理的值——大多数网络设备的空闲超时通常在 30 秒到 2 分钟之间,15 秒可以安全地在超时前刷新连接状态。
断连检测与重连
服务器端检测断连
// SseTransport.cpp 第196-201行
auto terminate = [this]() -> bool {
sse_active_.store(false);
client_connected_.store(false);
outgoing_cv_.notify_all(); // 唤醒可能阻塞的发送线程
incoming_cv_.notify_all(); // 唤醒可能阻塞的读取线程
return false; // 返回 false 表示 content_provider 结束
};
三种检测方式:
- 写入失败:
// SseTransport.cpp 第222行
if (!sink.write(ping, std::strlen(ping))) {
return terminate();
}
- is_writable() 检查:
// SseTransport.cpp 第245-248行
if (!sink.is_writable()) {
return terminate();
}
- 异常捕获:
// SseTransport.cpp 第266-272行
} catch (const std::exception& ex) {
LOG(ERROR) << "Exception in SSE content provider: " << ex.what() << std::endl;
return terminate();
} catch (...) {
LOG(ERROR) << "Unknown exception in SSE content provider" << std::endl;
return terminate();
}
客户端重连
SSE 协议内置了重连支持。如果连接断开,浏览器(使用 EventSource API)会自动重连。服务器可以通过 id: 字段告诉客户端最后一个事件 ID,客户端重连时会在请求中带上 Last-Event-Id 头部,服务器可以从中断的地方继续推送。
本项目目前没有实现 id: 和 Last-Event-Id 的重连机制,断连后客户端需要重新建立完整的会话。
多客户端并发推送
SSE Transport 的 write() 方法通过消息队列和条件变量实现线程安全的消息推送:
// SseTransport.cpp 第81-94行
void SSE::Write(const std::string& json_data) {
if (!client_connected_.load()) {
return; // 没有客户端连接,忽略
}
{
std::lock_guard<std::mutex> lock(outgoing_mutex_);
outgoing_messages_.push(json_data); // 加入队列
}
outgoing_cv_.notify_one(); // 唤醒 content_provider 线程
}
content_provider 中的处理:
// SseTransport.cpp 第230-263行
std::unique_lock<std::mutex> lock(outgoing_mutex_);
outgoing_cv_.wait_for(lock, std::chrono::milliseconds(200), [this]() {
return !outgoing_messages_.empty() || !sse_active_.load();
});
if (!outgoing_messages_.empty()) {
std::string message = outgoing_messages_.front();
outgoing_messages_.pop();
lock.unlock();
std::string sse_msg = "data: " + message + "\n\n";
sink.write(sse_msg.data(), sse_msg.size());
}
这段代码做了三件事:
- 等待队列中出现消息(最多等 200ms)
- 取出消息
- 包装成 SSE 格式并发送
注意:当前 SSE 实现只支持单个客户端(只有一个 client_connected_ 标志和一个 outgoing_messages_ 队列)。如果需要支持多个同时连接,需要维护每个连接的独立队列。
项目代码逐行分析:SseTransport.cpp
让我们从头到尾分析 SSE Transport 的关键代码。
构造函数(第 47-49 行)
SSE::SSE(const int port, std::string host)
: host_(std::move(host)), port_(port),
server_(std::make_unique<httplib::Server>()) {
SetupRoutes(); // 注册 HTTP 路由
}
创建一个 cpp-httplib 服务器实例,然后立即注册路由。
析构函数(第 51-53 行)
SSE::~SSE() {
SSE::Stop(); // 确保析构时停止服务器
}
SetupRoutes(第 150-166 行)
void SSE::SetupRoutes() {
server_->Options("/.*", ...); // CORS 预检
server_->Get("/health", ...); // 健康检查
server_->Post("/messages", ...); // 接收客户端消息
server_->Get("/sse", ...); // SSE 长连接
}
注册四个路由。注意注册顺序无关紧要,cpp-httplib 内部会维护一个路由表。
Read()(第 55-79 行)
std::pair<size_t, std::string> SSE::Read() {
std::unique_lock<std::mutex> lock(incoming_mutex_);
incoming_cv_.wait(lock, [this]() {
return !incoming_messages_.empty() || !server_running_.load();
});
// 取出队列中的消息
if (!incoming_messages_.empty()) {
std::string message = incoming_messages_.front();
incoming_messages_.pop();
return {message.length(), message};
}
return {0, ""};
}
这是 ITransport 接口的实现。它阻塞等待来自 incoming_messages_ 队列的消息。消息如何进入队列?通过 HandlePostMessage()。
HandlePostMessage(第 277-302 行)
void SSE::HandlePostMessage(const httplib::Request& req, httplib::Response& res) {
SetCORSHeaders(res);
if (!client_connected_.load()) {
res.status = 503; // SSE 连接不存在
return;
}
std::string message = req.body;
if (message.empty()) {
res.status = 400; // 空消息
return;
}
// 将消息放入 incoming 队列
{
std::lock_guard<std::mutex> lock(incoming_mutex_);
incoming_messages_.push(message);
}
incoming_cv_.notify_one(); // 唤醒 Read() 中等待的线程
res.status = 200;
}
客户端通过 POST 发送消息,服务器将其放入队列,然后阻塞在 Read() 上的主线程被唤醒,取出消息并处理。
Write()(第 81-94 行)
void SSE::Write(const std::string& json_data) {
if (!client_connected_.load()) {
return; // 忽略——没有客户端
}
{
std::lock_guard<std::mutex> lock(outgoing_mutex_);
outgoing_messages_.push(json_data); // 加入发送队列
}
outgoing_cv_.notify_one(); // 唤醒 content_provider 线程
}
服务器处理完请求后调用 Write() 返回响应。消息进入 outgoing_messages_ 队列,由 SSE content_provider 取出并以 SSE 格式发送。
数据流总结
SSE 通信中消息格式的实际例子
客户端通过 POST 发送请求:
POST /messages HTTP/1.1
Content-Type: application/json
{"jsonrpc":"2.0","id":1,"method":"tools/call","params":{"name":"weather","arguments":{"city":"Tokyo"}}}
服务器通过 SSE 长连接返回响应:
data: {"jsonrpc":"2.0","id":1,"result":{"content":[{"type":"text","text":"Tokyo: 20°C, cloudy"}]}}
服务器主动推送通知:
data: {"jsonrpc":"2.0","method":"notifications/tools/list_changed"}
HTTP Stream / Streamable HTTP
与 SSE 的区别
HTTP Stream Transport 是本项目中对 MCP 规范中 “Streamable HTTP” 传输的实现。它与 SSE 的核心区别:
| 特性 | SSE | HTTP Stream |
|---|---|---|
| 通信方向 | 单向(服务器→客户端推送) | 双向(客户端请求 + 服务器推送) |
| 请求发送 | POST /messages | POST /mcp(同一个端点) |
| 响应接收 | GET /sse 长连接 | GET /mcp 长连接 + POST 直接返回 |
| 会话管理 | URL 参数 ?session_id= |
HTTP 头部 Mcp-Session-Id |
| 断开方式 | 关闭 SSE 连接 | DELETE /mcp |
| 请求-响应匹配 | 不匹配(客户端自己关联) | 基于 jsonrpc.id 匹配 |
在 SSE 模式中,请求和响应走不同的通道:
- 请求走 POST /messages
- 响应走 GET /sse 长连接
在 HTTP Stream 模式中,所有通信都通过 /mcp 这一个端点:
- 请求通过 POST /mcp 发送
- 响应可以直接从 POST 响应返回,也可以通过 GET /mcp 的 SSE 流推送服务器通知
会话管理
HTTP Stream 的最大特点是基于会话。每个客户端首先发送 initialize 请求建立会话,之后的所有请求都要携带会话 ID。
会话建立(initialize 方法触发):
// HttpStreamTransport.cpp 第220-228行
bool is_initialize = parsed.contains("method") && parsed["method"] == "initialize";
if (is_initialize) {
session_id_ = vx::utils::SessionBuilder::GenerateUniqueSessionID();
session_initialized_ = true;
client_connected_.store(true);
LOG(INFO) << "Session initialized: " << session_id_ << std::endl;
}
会话 ID 的生成(时间戳 + 随机数):
// SessionBuilder.h 第37-49行
static std::string GenerateUniqueSessionID() {
auto now = std::chrono::high_resolution_clock::now();
auto timestamp = std::chrono::duration_cast<std::chrono::microseconds>(
now.time_since_epoch()).count();
static std::random_device rd;
static std::mt19937 gen(rd());
static std::uniform_int_distribution<uint32_t> dis;
std::stringstream ss;
ss << std::hex << timestamp << "-" << dis(gen);
return ss.str();
}
生成的 ID 格式如 18f3a2b1c-4d5e6f7a(微秒级时间戳 + 随机数,十六进制)。
会话验证:
// HttpStreamTransport.cpp 第416-425行
bool HttpStream::ValidateSession(const httplib::Request& req, httplib::Response& res) const {
auto client_session = req.get_header_value("Mcp-Session-Id");
if (client_session.empty() || client_session != session_id_) {
res.status = 404;
res.set_content("{\"error\":\"Invalid or missing session ID\"}", "application/json");
return false;
}
return true;
}
会话终止(客户端主动关闭):
// HttpStreamTransport.cpp 第389-414行
void HttpStream::HandleDeleteSession(const httplib::Request& req, httplib::Response& res) {
SetCORSHeaders(res);
if (!session_initialized_) {
res.status = 404;
return;
}
if (!ValidateSession(req, res)) {
return;
}
session_initialized_ = false;
client_connected_.store(false);
sse_stream_active_.store(false);
incoming_cv_.notify_all(); // 唤醒等待中的线程
sse_cv_.notify_all();
res.status = 200;
res.set_content("{\"status\":\"session terminated\"}", "application/json");
}
会话的生命周期:
请求-响应匹配(基于 jsonrpc.id)
在 HTTP Stream 中,请求的响应不再通过 SSE 流返回,而是直接从 POST 请求的响应中返回。这是通过 promise/future 机制实现的。
PendingRequest 结构
// HttpStreamTransport.hpp 第97-99行
struct PendingRequest {
std::promise<std::string> promise; // 用于传递响应的 promise
};
std::unordered_map<std::string, std::shared_ptr<PendingRequest>> pending_requests_;
pending_requests_ 是一个 map,key 是请求的 id,value 是一个 promise。
完整流程
步骤 1:收到 POST 请求,创建 pending request
// HttpStreamTransport.cpp 第257-275行
// 提取请求 ID
std::string id_str;
if (parsed["id"].is_number()) {
id_str = std::to_string(parsed["id"].get<int>());
} else {
id_str = parsed["id"].get<std::string>();
}
// 创建 PendingRequest(包含 promise/future 对)
auto pending = std::make_shared<PendingRequest>();
std::future<std::string> response_future = pending->promise.get_future();
{
std::lock_guard<std::mutex> lock(pending_mutex_);
pending_requests_[id_str] = pending; // 注册到 map
}
// 将消息放入 incoming 队列
{
std::lock_guard<std::mutex> lock(incoming_mutex_);
incoming_messages_.push(message);
}
incoming_cv_.notify_one();
步骤 2:Server 处理请求,调用 Write() 返回响应
// HttpStreamTransport.cpp 第113-149行
void HttpStream::Write(const std::string& json_data) {
auto parsed = nlohmann::json::parse(json_data);
// 检查是否是请求的响应(有 "id" 和 "result"/"error")
if (parsed.contains("id") && (parsed.contains("result") || parsed.contains("error"))) {
std::string id_str = /* 提取 ID */;
std::lock_guard<std::mutex> lock(pending_mutex_);
auto it = pending_requests_.find(id_str);
if (it != pending_requests_.end()) {
// 找到了!通过 promise 传递响应
it->second->promise.set_value(json_data);
pending_requests_.erase(it); // 清理
return; // 不需要走 SSE 推送
}
}
// 不是请求响应,是服务器通知 → 放入 SSE 队列
if (sse_stream_active_.load()) {
std::lock_guard<std::mutex> lock(sse_mutex_);
sse_notifications_.push(json_data);
sse_cv_.notify_one();
}
}
步骤 3:POST 处理函数等待 promise
// HttpStreamTransport.cpp 第283-307行
// 等待 Server 处理并调用 Write() 返回结果
auto status = response_future.wait_for(std::chrono::seconds(30));
if (status == std::future_status::timeout) {
// 超时:清理并返回 504
{
std::lock_guard<std::mutex> lock(pending_mutex_);
pending_requests_.erase(id_str);
}
res.status = 504;
res.set_content("{\"error\":\"Request timed out\"}", "application/json");
return;
}
std::string response_data = response_future.get();
// 将响应直接返回给客户端
res.status = 200;
res.set_content(response_data, "application/json");
res.set_header("Mcp-Session-Id", session_id_);
数据流总结
通知(Notifications)
在 HTTP Stream 中,通知消息(没有 id 字段的 JSON-RPC 消息)不走 promise/future 匹配,而是直接确认并返回 202:
// HttpStreamTransport.cpp 第237-253行
bool is_notification = !parsed.contains("id");
if (is_notification) {
// 将通知放入队列供 Server 处理
{
std::lock_guard<std::mutex> lock(incoming_mutex_);
incoming_messages_.push(message);
}
incoming_cv_.notify_one();
// 通知只返回 202 Accepted
res.status = 202;
res.set_header("Mcp-Session-Id", session_id_);
return;
}
服务器的主动通知(如 tools/list_changed)通过 GET /mcp 的 SSE 流推送:
// HttpStreamTransport.cpp 第385-387行(HandleGetSSE 中的内容提供者)
if (!sse_notifications_.empty()) {
std::string message = sse_notifications_.front();
sse_notifications_.pop();
std::string sse_msg = "event: message\ndata: " + message + "\n\n";
sink.write(sse_msg.data(), sse_msg.size());
}
项目代码逐行分析:HttpStreamTransport.cpp
构造函数(第 32-35 行)
HttpStream::HttpStream(int port, std::string host)
: port_(port), host_(std::move(host)),
server_(std::make_unique<httplib::Server>()) {
SetupRoutes();
}
与 SSE Transport 构造函数几乎相同。
SetupRoutes(第 163-183 行)
void HttpStream::SetupRoutes() {
server_->Options("/.*", ...); // CORS 预检
server_->Get("/health", ...); // 健康检查
server_->Post("/mcp", ...); // 处理客户端请求
server_->Get("/mcp", ...); // SSE 流(接收服务器通知)
server_->Delete("/mcp", ...); // 终止会话
}
对比 SSE 的路由:SSE 用 /messages 和 /sse 两个端点,HTTP Stream 统一用 /mcp 一个端点。
Read()(第 93-111 行)
std::pair<size_t, std::string> HttpStream::Read() {
std::unique_lock<std::mutex> lock(incoming_mutex_);
incoming_cv_.wait(lock, [this]() {
return !incoming_messages_.empty() || !server_running_.load();
});
// 取出消息...
}
与 SSE 的 Read() 逻辑完全相同——都是从 incoming_messages_ 队列中取消息。
Write()(第 113-149 行)
这是 HTTP Stream 最复杂的部分。与前两个 Transport 不同,Write() 需要区分请求响应和服务器通知:
void HttpStream::Write(const std::string& json_data) {
if (!client_connected_.load()) return;
auto parsed = nlohmann::json::parse(json_data);
// 情况1:请求的响应 → 通过 promise 返回
if (parsed.contains("id") && (parsed.contains("result") || parsed.contains("error"))) {
// 匹配 pending_requests_,通过 promise.set_value() 传递
}
// 情况2:服务器通知 → 通过 SSE 流推送
if (sse_stream_active_.load()) {
sse_notifications_.push(json_data);
sse_cv_.notify_one();
}
}
Stop()(第 61-91 行)
void HttpStream::Stop() {
server_running_.store(false);
server_->stop();
incoming_cv_.notify_all();
sse_cv_.notify_all();
// 清理所有 pending requests,防止内存泄漏
{
std::lock_guard<std::mutex> lock(pending_mutex_);
for (auto& [id, pending] : pending_requests_) {
try {
pending->promise.set_value(""); // 唤醒等待中的线程
} catch (...) {}
}
pending_requests_.clear();
}
}
三种传输模式完整对比
总览
| 维度 | Stdio | SSE | HTTP Stream |
|---|---|---|---|
| C++ 类 | vx::transport::Stdio |
vx::transport::SSE |
vx::transport::HttpStream |
| 源文件 | StdioTransport.cpp |
SseTransport.cpp |
HttpStreamTransport.cpp |
| 传输介质 | stdin/stdout | HTTP (TCP port) | HTTP (TCP port) |
| 默认端口 | 无 | 8080 | 8080 |
| 通信方向 | 双向(请求-响应) | 半双向(POST请求 + SSE推送) | 全双向(POST + SSE 通知) |
| 连接数量 | 1对1(单客户端) | 1对1(当前实现单客户端) | 1对1(当前实现单会话) |
| 启动方式 | ./mcp_server |
./mcp_server -s |
./mcp_server -t |
| GetName() | "stdio" |
"sse" |
"httpstream" |
| GetVersion() | "0.2" |
"0.4" |
"0.1" |
| Start() | 空操作(返回 true) | 在独立线程中启动 HTTP 服务器 | 在独立线程中启动 HTTP 服务器 |
架构对比
数据流对比
Stdio 的数据流
最简单直接。没有队列,没有线程间通信。一切都在主线程中顺序执行。
SSE 的数据流
两个队列将网络 I/O 与业务逻辑分离:
incoming_messages_:HTTP 处理线程写入 → 主线程 Read() 读取outgoing_messages_:主线程 Write() 写入 → content_provider 线程读取并通过 SSE 发送
HTTP Stream 的数据流
三步走:
- 消息进入
incoming_messages_,同时创建PendingRequest - Server 处理完后调用
Write(),Write()识别是否匹配到pending_requests_ - POST 处理函数从
future获取结果并返回
各自的优缺点
Stdio
:::color1
优点:
- 零网络开销(不经过网络协议栈)
- 零依赖(不需要 HTTP 库)
- 代码最简单(
StdioTransport.cpp只有 68 行) - 天然单客户端(stdin/stdout 只有一个来源/目标)
- 无需端口管理
:::
:::color4
缺点:
- 只能本地使用(无法远程访问)
- 客户端必须是能启动子进程的程序(如 Claude Desktop)
- 无法在浏览器中使用
- 不支持多客户端
:::
SSE
:::color1
优点:
- 浏览器原生支持(
EventSourceAPI) - 适合服务器主动推送
- 实现相对简单
- HTTP 协议通用性好(可通过代理、防火墙)
:::
:::color4
缺点:
- 请求和响应走不同通道,实现复杂
- 当前实现只支持单客户端
- POST 请求和 SSE 连接之间的会话管理通过 URL 参数,不够优雅
- SSE 只支持文本数据(在 MCP 场景中足够,因为所有消息都是 JSON)
- 没有内置的请求-响应匹配,客户端需要自行关联
:::
HTTP Stream
:::color1
优点:
- 请求响应走同一通道(POST 直接返回响应),逻辑更清晰
- 基于 HTTP 头部的会话管理更规范
- 支持显式的会话终止(DELETE)
- Content-Type 协商更严格
- 更符合 RESTful 风格
:::
:::color4
缺点:
- 实现最复杂(
HttpStreamTransport.cpp约 440 行) - 当前实现只支持单会话
- 使用
promise/future增加了实现复杂度 - 超时处理增加了边界情况
:::
何时使用哪种传输
在 MCP 规范中:
- Stdio 是基础传输,所有 MCP 服务器都应该支持
- SSE 是早期规范中的 Web 传输方式
- Streamable HTTP(本项目中的 HTTP Stream)是较新的 MCP 规范推荐传输方式,正在逐步取代 SSE
ITransport 接口的统一
三种 Transport 都实现了 ITransport 接口:
// ITransport.h - 第33-48行
class ITransport {
public:
virtual bool Start() = 0;
virtual void Stop() = 0;
virtual bool IsRunning() = 0;
virtual std::pair<size_t, std::string> Read() = 0;
virtual void Write(const std::string& json_data) = 0;
virtual std::future<std::pair<size_t, std::string>> ReadAsync() = 0;
virtual std::future<void> WriteAsync(const std::string& json_data) = 0;
virtual std::string GetName() = 0;
virtual std::string GetVersion() = 0;
virtual int GetPort() = 0;
};
这正是面向接口编程的体现。Server 类不需要知道它使用的是哪种传输方式,只需要调用 transport->Read() 和 transport->Write()。
// Server.cpp 第107-159行 - Server::Connect()
bool Server::Connect(const std::shared_ptr<ITransport>& transport) {
transport_ = transport;
transport_->Start();
while (!isStopping_) {
auto [length, json_string] = transport->Read();
json response = HandleRequest(request);
transport_->Write(response.dump());
}
transport_->Stop();
}
无论是 Stdio、SSE 还是 HTTP Stream,这段代码的逻辑都不需要改变。
主函数中的传输选择
// main.cpp 第120-126行
if (use_sse_server->count() > 0) {
transport = std::make_shared<vx::transport::SSE>();
} else if (use_httpstream_server->count() > 0) {
transport = std::make_shared<vx::transport::HttpStream>();
} else {
transport = std::make_shared<vx::transport::Stdio>();
}
然后用统一的接口使用:
// main.cpp 第339行
server->Connect(transport);
这就是策略模式在项目中的实际应用。
更多推荐


所有评论(0)