前言

在 ChatGPT 的带动下,用户已经习惯了"逐字蹦出来"的流式体验。对于企业知识库问答系统来说,检索 + 生成的过程可能耗时 3-10 秒,如果没有流式输出,用户只能干等。本文介绍如何用 FastAPI 的 StreamingResponse 实现 Server-Sent Events(SSE)流式输出。

为什么选 SSE 而不是 WebSocket?

特性 SSE WebSocket
方向 单向(服务端→客户端) 双向
协议 HTTP 独立协议(需要升级)
实现复杂度 低(标准 EventSource API) 中(需要心跳、重连逻辑)
适用场景 日志流、AI 生成、通知 聊天、游戏、协作编辑

聊天问答场景中,客户端只需要接收答案,不需要向服务端推送数据。SSE 完全够用,且实现简单。

整体数据流

LLM token → AsyncGenerator → StreamingResponse → EventSource → 前端渲染

全链路异步、零缓冲。每一步都是"生成一个、发送一个"。

后端实现

API 路由

# api/main.py
​
@app.get("/api/chat/stream")
async def chat_stream(question: str, session_id: str):
    async def generate_stream():
        try:
            async for chunk in get_response(question, session_id):
                # 将 Python 字典序列化为 JSON,符合 SSE 格式
                yield f"data: {json.dumps(chunk)}\n\n"
​
        except Exception as e:
            yield f"data: {json.dumps({'type': 'error', 'data': {'message': str(e)}})}\n\n"
​
    return StreamingResponse(
        generate_stream(),
        media_type="text/event-stream"
    )

关键点:

  • 路由是 GET 方法(SSE 标准用 GET,因为 EventSource API 只支持 GET)

  • generate_stream() 是异步生成器函数,每个 yield 对应一个 SSE 事件

  • StreamingResponsemedia_type 必须是 "text/event-stream"

  • 每个事件格式:data: <json>\n\n(SSE 协议要求双换行结尾)

生成器输出的三种事件

# 下游 processor 输出的统一格式
yield {"type": "metadata", "data": {"intent": "knowledge_base", ...}}
yield {"type": "content", "data": "请假"}
yield {"type": "content", "data": "流程"}
yield {"type": "content", "data": "是"}
# ... 更多 token
yield {"type": "end", "data": {"full_answer": "...", "metadata": {...}}}
type 含义 前端处理
metadata 流开始,携带意图等元信息 仅日志
content 文本 token 追加到消息末尾
error 异常 显示错误提示
end 流结束 关闭连接,停止 loading 动画

为什么不直接用 POST?

SSE 标准使用 GET,且 EventSource API 不支持自定义请求头、请求体。参数只能放在 URL 查询字符串中。虽然不是最优雅的方案,但在这种场景下足够实用。

如果你确实需要用 POST + 流式,可以在前端用 fetch + ReadableStream 替代 EventSource,但代码会复杂很多。

前端实现

流式请求

// ui/src/components/ChatWindow.vue
​
const sendStreamRequest = (message, lastMsg) => {
  // 构建 SSE 连接 URL(GET 方式,参数在 URL 中)
  const url = `${API_BASE}/api/chat/stream?question=${encodeURIComponent(message)}&session_id=${uuid.value}`;
​
  const eventSource = new EventSource(url);
​
  eventSource.onmessage = (event) => {
    try {
      const data = JSON.parse(event.data);
​
      if (data.type === 'content') {
        // 追加 token 到消息内容
        lastMsg.content += data.data;
        scrollToBottom();
      }
      else if (data.type === 'end') {
        // 流结束,关闭连接
        lastMsg.isTyping = false;
        isSending.value = false;
        eventSource.close();
      }
      else if (data.type === 'error') {
        lastMsg.content = '请求出错: ' + (data.data.message || '未知错误');
        lastMsg.isTyping = false;
        eventSource.close();
      }
    } catch (error) {
      // 非 JSON 格式,可能是纯文本
      if (event.data.trim()) {
        lastMsg.content += event.data;
      }
    }
  };
​
  eventSource.onerror = (error) => {
    // SSE 连接失败,降级到普通请求
    console.log('流式请求失败,尝试普通请求');
    lastMsg.isTyping = false;
    eventSource.close();
    sendNormalRequest(message, lastMsg);
  };
};

关键细节

1. encodeURIComponent 编码

问题文本可能包含特殊字符(中文标点、空格等),放在 URL 中必须编码,否则 URL 解析错误。

2. 错误回退

SSE 连接可能因网络问题失败(代理不兼容、防火墙等),onerror 中自动降级为普通 POST 请求,确保功能可用:

// 普通请求作为 fallback
const sendNormalRequest = (message, lastMsg) => {
  axios.post(`${API_BASE}/api/chat`, {
    question: message,
    session_id: uuid.value
  }).then(response => {
    lastMsg.content = response.data.answer;
    lastMsg.isTyping = false;
  });
};

3. scrollToBottom

每次收到 token 都自动滚到底部,确保用户始终看到最新内容:

const scrollToBottom = () => {
  if (messaggListRef.value) {
    messaggListRef.value.scrollTop = messaggListRef.value.scrollHeight;
  }
};

4. Loading 动画

机器人消息的 isTyping 属性控制 loading 点动画的显示:

<span class="loading-dots" v-if="message.isTyping">
  <span class="dot"></span>
  <span class="dot"></span>
  <span class="dot"></span>
</span>

收到第一个 token 时 content 就有值了,loading 动画自动被内容覆盖,无需手动关闭。end 事件只负责将 isTyping 置为 false

HTML 转义

由于前端用 v-html 渲染消息内容(用于支持换行 <br> 等),需要防止 XSS:

const convertStreamOutput = (output) => {
  return output
    .replace(/&/g, '&amp;')
    .replace(/</g, '&lt;')
    .replace(/>/g, '&gt;')
    .replace(/\n/g, '<br>')
    .replace(/\t/g, '&nbsp;&nbsp;&nbsp;&nbsp;');
};

在模板中调用:

<span v-html="convertStreamOutput(message.content)"></span>

注意 & 要先转义,否则后面 &lt; 中的 & 会被二次转义。

完整请求-响应时间线

[0ms]    用户点击发送
[1ms]    前端创建 EventSource,发出 GET 请求
[5ms]    FastAPI 接收,异步生成器开始执行
[10ms]   加载 MongoDB 历史 → 意图识别
[50ms]   发送 metadata 事件
[100ms]  查询增强 → RAGFlow 检索
[500ms]  LLM 开始生成第一个 token
[500ms]  前端收到第一个 content 事件,显示第一个字
[500ms]  LLM 生成第二个 token
[501ms]  前端收到第二个 content 事件
...
[3500ms] LLM 生成最后一个 token
[3501ms] 发送 end 事件
[3502ms] 前端关闭 EventSource,停止 loading

用户从 ~100ms 后就开始看到内容"逐字蹦出",而不是干等 3.5 秒看完整答案。

小结

  1. SSE 比 WebSocket 更适合单向流式场景,实现复杂度低

  2. AsyncGenerator 全链路透传,从 LLM token 到 HTTP 事件零缓冲

  3. 前端 EventSource 自动处理重连,比手动 fetch + ReadableStream 更简单

  4. 错误回退到普通请求,保证极端网络环境下的可用性

Logo

欢迎加入DeepSeek 技术社区。在这里,你可以找到志同道合的朋友,共同探索AI技术的奥秘。

更多推荐