FastAPI 流式响应实战:用 SSE 实现 ChatGPT 式逐字输出
前言
在 ChatGPT 的带动下,用户已经习惯了"逐字蹦出来"的流式体验。对于企业知识库问答系统来说,检索 + 生成的过程可能耗时 3-10 秒,如果没有流式输出,用户只能干等。本文介绍如何用 FastAPI 的 StreamingResponse 实现 Server-Sent Events(SSE)流式输出。
为什么选 SSE 而不是 WebSocket?
| 特性 | SSE | WebSocket |
|---|---|---|
| 方向 | 单向(服务端→客户端) | 双向 |
| 协议 | HTTP | 独立协议(需要升级) |
| 实现复杂度 | 低(标准 EventSource API) | 中(需要心跳、重连逻辑) |
| 适用场景 | 日志流、AI 生成、通知 | 聊天、游戏、协作编辑 |
聊天问答场景中,客户端只需要接收答案,不需要向服务端推送数据。SSE 完全够用,且实现简单。
整体数据流
LLM token → AsyncGenerator → StreamingResponse → EventSource → 前端渲染
全链路异步、零缓冲。每一步都是"生成一个、发送一个"。
后端实现
API 路由
# api/main.py
@app.get("/api/chat/stream")
async def chat_stream(question: str, session_id: str):
async def generate_stream():
try:
async for chunk in get_response(question, session_id):
# 将 Python 字典序列化为 JSON,符合 SSE 格式
yield f"data: {json.dumps(chunk)}\n\n"
except Exception as e:
yield f"data: {json.dumps({'type': 'error', 'data': {'message': str(e)}})}\n\n"
return StreamingResponse(
generate_stream(),
media_type="text/event-stream"
)
关键点:
-
路由是
GET方法(SSE 标准用 GET,因为 EventSource API 只支持 GET) -
generate_stream()是异步生成器函数,每个yield对应一个 SSE 事件 -
StreamingResponse的media_type必须是"text/event-stream" -
每个事件格式:
data: <json>\n\n(SSE 协议要求双换行结尾)
生成器输出的三种事件
# 下游 processor 输出的统一格式
yield {"type": "metadata", "data": {"intent": "knowledge_base", ...}}
yield {"type": "content", "data": "请假"}
yield {"type": "content", "data": "流程"}
yield {"type": "content", "data": "是"}
# ... 更多 token
yield {"type": "end", "data": {"full_answer": "...", "metadata": {...}}}
| type | 含义 | 前端处理 |
|---|---|---|
| metadata | 流开始,携带意图等元信息 | 仅日志 |
| content | 文本 token | 追加到消息末尾 |
| error | 异常 | 显示错误提示 |
| end | 流结束 | 关闭连接,停止 loading 动画 |
为什么不直接用 POST?
SSE 标准使用 GET,且 EventSource API 不支持自定义请求头、请求体。参数只能放在 URL 查询字符串中。虽然不是最优雅的方案,但在这种场景下足够实用。
如果你确实需要用 POST + 流式,可以在前端用 fetch + ReadableStream 替代 EventSource,但代码会复杂很多。
前端实现
流式请求
// ui/src/components/ChatWindow.vue
const sendStreamRequest = (message, lastMsg) => {
// 构建 SSE 连接 URL(GET 方式,参数在 URL 中)
const url = `${API_BASE}/api/chat/stream?question=${encodeURIComponent(message)}&session_id=${uuid.value}`;
const eventSource = new EventSource(url);
eventSource.onmessage = (event) => {
try {
const data = JSON.parse(event.data);
if (data.type === 'content') {
// 追加 token 到消息内容
lastMsg.content += data.data;
scrollToBottom();
}
else if (data.type === 'end') {
// 流结束,关闭连接
lastMsg.isTyping = false;
isSending.value = false;
eventSource.close();
}
else if (data.type === 'error') {
lastMsg.content = '请求出错: ' + (data.data.message || '未知错误');
lastMsg.isTyping = false;
eventSource.close();
}
} catch (error) {
// 非 JSON 格式,可能是纯文本
if (event.data.trim()) {
lastMsg.content += event.data;
}
}
};
eventSource.onerror = (error) => {
// SSE 连接失败,降级到普通请求
console.log('流式请求失败,尝试普通请求');
lastMsg.isTyping = false;
eventSource.close();
sendNormalRequest(message, lastMsg);
};
};
关键细节
1. encodeURIComponent 编码
问题文本可能包含特殊字符(中文标点、空格等),放在 URL 中必须编码,否则 URL 解析错误。
2. 错误回退
SSE 连接可能因网络问题失败(代理不兼容、防火墙等),onerror 中自动降级为普通 POST 请求,确保功能可用:
// 普通请求作为 fallback
const sendNormalRequest = (message, lastMsg) => {
axios.post(`${API_BASE}/api/chat`, {
question: message,
session_id: uuid.value
}).then(response => {
lastMsg.content = response.data.answer;
lastMsg.isTyping = false;
});
};
3. scrollToBottom
每次收到 token 都自动滚到底部,确保用户始终看到最新内容:
const scrollToBottom = () => {
if (messaggListRef.value) {
messaggListRef.value.scrollTop = messaggListRef.value.scrollHeight;
}
};
4. Loading 动画
机器人消息的 isTyping 属性控制 loading 点动画的显示:
<span class="loading-dots" v-if="message.isTyping"> <span class="dot"></span> <span class="dot"></span> <span class="dot"></span> </span>
收到第一个 token 时 content 就有值了,loading 动画自动被内容覆盖,无需手动关闭。end 事件只负责将 isTyping 置为 false。
HTML 转义
由于前端用 v-html 渲染消息内容(用于支持换行 <br> 等),需要防止 XSS:
const convertStreamOutput = (output) => {
return output
.replace(/&/g, '&')
.replace(/</g, '<')
.replace(/>/g, '>')
.replace(/\n/g, '<br>')
.replace(/\t/g, ' ');
};
在模板中调用:
<span v-html="convertStreamOutput(message.content)"></span>
注意 & 要先转义,否则后面 < 中的 & 会被二次转义。
完整请求-响应时间线
[0ms] 用户点击发送 [1ms] 前端创建 EventSource,发出 GET 请求 [5ms] FastAPI 接收,异步生成器开始执行 [10ms] 加载 MongoDB 历史 → 意图识别 [50ms] 发送 metadata 事件 [100ms] 查询增强 → RAGFlow 检索 [500ms] LLM 开始生成第一个 token [500ms] 前端收到第一个 content 事件,显示第一个字 [500ms] LLM 生成第二个 token [501ms] 前端收到第二个 content 事件 ... [3500ms] LLM 生成最后一个 token [3501ms] 发送 end 事件 [3502ms] 前端关闭 EventSource,停止 loading
用户从 ~100ms 后就开始看到内容"逐字蹦出",而不是干等 3.5 秒看完整答案。
小结
-
SSE 比 WebSocket 更适合单向流式场景,实现复杂度低
-
AsyncGenerator 全链路透传,从 LLM token 到 HTTP 事件零缓冲
-
前端 EventSource 自动处理重连,比手动
fetch+ReadableStream更简单 -
错误回退到普通请求,保证极端网络环境下的可用性
更多推荐



所有评论(0)