大模型 API 的 SSE 流式输出,我把三个最容易踩的坑都写出来了
同时接 DeepSeek、Kimi、Qwen——多模型 API 的 SSE 流式输出,我把三个最容易踩的坑都写出来了

做 AI 应用开发的同学应该都有这个体会:一开始只接一个模型,跑通了就很开心;业务跑起来之后,产品那边说"能不能让用户可以选模型",于是接第二个;后来发现不同模型擅长的事情不一样,写代码用 DeepSeek,写文案用 Qwen,长文档用 Kimi,慢慢就接了三四个。
接的模型多了以后,最明显的一个感觉是:每个模型都有各自的小脾气。
最明显的是流式输出。单模型的时候,前端随便写写就能跑;模型一多,不同模型的 chunk 格式有细微差异,前端解析偶尔会出诡异的问题。我们线上出过一次,高流量时段有用户反馈"回答的最后一句话缺几个字"——不是乱码,就是少了结尾。
一开始怀疑是模型本身截断,换了模型还是有。又怀疑是超时配置,调了 Nginx 的 proxy_read_timeout 也没用。最后排查了两天,抓包一看——是 TCP 拆包。一个完整的 SSE 帧被拆成了两个 TCP 包到达,前端按行解析的时候第二包里的 JSON 是不完整的,JSON.parse 失败了,这一帧直接丢了。
这个问题在只接一个模型的时候没出现过,因为当时前端写得粗糙,刚好绕过去了。接了多个模型之后,不同模型的 chunk 间隔不一样,触发概率一下子高了很多。
下面把完整的链路拆开讲,从 SSE 协议本身开始,到后端适配层,到前端消费与错误恢复。每一步带可运行的代码。
SSE 协议:为什么 LLM API 不用 WebSocket
SSE(Server-Sent Events)本质上是一个单向、长连接、基于 HTTP 的文本流协议。和 WebSocket 的区别:
| 特性 | SSE | WebSocket |
|---|---|---|
| 方向 | 服务器→客户端单向 | 双向 |
| 协议 | HTTP/1.1 或 HTTP/2 | 独立协议(ws://) |
| 重连 | 浏览器原生支持 | 需自行实现 |
| 数据格式 | 纯文本(text/event-stream) | 二进制或文本 |
| 防火墙友好 | 走标准 HTTP 端口 | 可能被企业代理拦截 |
LLM API 选 SSE 的原因很实际:大模型生成 Token 是单向输出,客户端只需要收,不需要发。SSE 的协议开销比 WebSocket 低,且天然支持断线重连(Last-Event-ID 头),适合流式文本传输。
一个标准的 SSE 帧长这样:
data: {"id":"chatcmpl-001","choices":[{"delta":{"content":"你好"}}]}
data: {"id":"chatcmpl-002","choices":[{"delta":{"content":"我是"}}]}
data: [DONE]
注意两个细节:每条 data: 行以 \n\n 作为帧分隔符;流结束信号是完整的一行 data: [DONE]\n\n——注意前缀 data: 不能省略,国内几个模型基本也沿用了这个约定。
后端:FastAPI + Async Generator
核心思路:用一个 async generator 逐块 yield SSE 帧,FastAPI 的 StreamingResponse 负责把生成器映射到 HTTP 响应流。
import asyncio
import json
import time
from fastapi import FastAPI, Request
from fastapi.responses import StreamingResponse
app = FastAPI()
async def generate_sse_stream(model: str, messages: list, max_tokens: int = 2048):
"""核心流式生成器:每个 chunk yield 一个符合 SSE 格式的字符串"""
chunk_id = 0
# 实际项目中这里调用下游 LLM API,同样启用 stream=True
# 这里用模拟数据演示完整链路
sample_chunks = ["你好", ",", "我是", "一个", "AI", "助手", "。"]
for chunk_text in sample_chunks:
chunk_id += 1
payload = {
"id": f"chatcmpl-{chunk_id}",
"object": "chat.completion.chunk",
"created": int(time.time()),
"model": model,
"choices": [{
"index": 0,
"delta": {"content": chunk_text},
"finish_reason": None
}]
}
# 关键:data: {json}\n\n
yield f"data: {json.dumps(payload, ensure_ascii=False)}\n\n"
await asyncio.sleep(0.05) # 模拟生成间隔
# 流结束信号
yield "data: [DONE]\n\n"
@app.post("/v1/chat/completions")
async def chat_completions(request: Request):
body = await request.json()
messages = body.get("messages", [])
model = body.get("model", "default")
max_tokens = body.get("max_tokens", 2048)
return StreamingResponse(
generate_sse_stream(model, messages, max_tokens),
media_type="text/event-stream",
headers={
"Cache-Control": "no-cache",
"Connection": "keep-alive",
"X-Accel-Buffering": "no", # 禁用 Nginx 缓冲
"Access-Control-Allow-Origin": "*",
}
)
三个容易踩的坑:
X-Accel-Buffering: no——如果前面挂了 Nginx,不加这个头 Nginx 会攒够 buffer 才一次吐出,流式变批处理。这个问题在本地调试的时候发现不了,上线后才暴露。
ensure_ascii=False——中文字符不要转成 \uXXXX,前端收到后不用额外解码。如果忘了这个参数,前端拿到的是 Unicode 转义序列,虽然能解析,但调试的时候看着很痛苦。
生成器内的异常——async generator 中抛出异常时,FastAPI 会中断流,前端收到不完整的响应。生产环境需要用 try-catch 包裹,并发送一个带 error 字段的最后帧,让前端有机会展示错误信息。
多模型 Chunk 格式差异:Adapter 层
不同模型的 SSE 流式响应格式大致兼容 OpenAI,但细节上各有偏差。一个健壮的系统必须做格式归一化。
实测对接下来的差异点:
from typing import Dict, Any, Optional
import json
class ChunkAdapter:
"""各模型 SSE chunk → 统一格式的适配器
如果你用的是器灵模型广场这类统一接入平台,
网关层已经做了格式归一化,这段 Adapter 代码基本不需要自己写。
"""
@staticmethod
def normalize(model: str, raw_line: str) -> Optional[str]:
"""返回归一化后的文本内容;无文本内容时返回 None。
注意:返回 None 有两种含义,调用方如需区分应改造返回值:
- 正常无内容(如 finish_reason=stop 的空 delta 帧)
- JSON 解析失败(数据格式异常)
本实现选择静默跳过解析失败的帧,生产环境建议改为
打日志或抛异常,以便及时发现格式变更。
"""
if raw_line == "[DONE]" or raw_line.startswith("[DONE]"):
return None # 流结束信号
try:
data = json.loads(raw_line)
except json.JSONDecodeError:
return None # 非 JSON 行
# 路径1:OpenAI 标准格式(DeepSeek、Qwen、GLM 均兼容此路径)
choices = data.get("choices", [])
if choices:
delta = choices[0].get("delta", {})
content = delta.get("content", "")
if content:
return content
# 路径2:某些模型在 finish_reason 为 stop 的最后一帧里
# delta 为空对象 {},只带 role 字段,这时没有 content,返回 None 即可
# 路径3:Anthropic 旧版 Completion API(非 Chat Completions 格式)
# 有 top-level text 字段,但 Anthropic 的新版 Messages API 已迁移到 delta 格式
if "text" in data and "choices" not in data:
return data["text"]
return None
实际对接时发现的问题:部分模型在 finish_reason 为 "stop" 的最后一帧里不带 content;某些模型的首帧 delta 为 {}(空对象),只带 role: "assistant"。Adapter 需要正确处理这些边界情况,否则前端会渲染出 undefined 或丢字。
当时我们接了三个模型之后,每个模型在前端各自写了一套解析逻辑,维护起来很痛苦。后来把归一化提到 Adapter 层统一处理,前端只消费统一的文本增量,代码量少了很多。
如果你也在接多个模型,Adapter 层建议尽早抽出来,不要在每个调用方各自处理。
当然,后来我们把模型接入迁到了器灵模型广场,这件事反而省心了——统一的 OpenAI 兼容格式,不管切到哪个模型,chunk 结构完全一致,自己写的 Adapter 基本就剩了个兜底。这是后话,下面先把「自己处理」的完整链路讲清楚。
大 Chunk 拆包:TCP 拆包导致帧边界丢失
这是我们线上那个"丢最后几个字"问题的根因。
TCP 是流式协议,不保证消息边界。一个 data: {...json...}\n\n 可能被拆成两个 TCP 包到达:
包1: data: {"id":"chatcmpl-001","choices":[{"delta":
包2: {"content":"你好"}}]}\n\n
如果接收端按行处理,第二行的 JSON 是不完整的,JSON.parse 直接抛异常,这帧就丢了。
后端不负责拆包——SSE 帧的边界重建由接收端处理。正确的做法是维护一个行缓冲区。下面这个解析器放在下游 API 消费层(不是网关层,定位要准确):
class SSEParser:
"""流式 SSE 解析器,处理 TCP 拆包导致的帧边界问题
典型使用场景:
- 在 Python 后端消费第三方 LLM 的流式 API 时
- 在 Node.js BFF 层转发流式响应时
- 任何需要逐帧解析 SSE 流的地方
如果使用的是器灵模型广场这类已做归一化的平台,
前端收到的帧格式很稳定,这个解析器的逻辑可以写得比通用版更简单。
"""
def __init__(self):
self._buffer = ""
def feed(self, chunk: str) -> list[str]:
"""喂入新的数据块,返回本轮解析出的完整消息列表"""
self._buffer += chunk
messages = []
# 以 \n\n 分割帧
while "\n\n" in self._buffer:
frame, self._buffer = self._buffer.split("\n\n", 1)
for line in frame.split("\n"):
if line.startswith("data: "):
data = line[6:]
if data == "[DONE]":
messages.append("[DONE]")
else:
messages.append(data)
return messages
def flush(self) -> list[str]:
"""消费缓冲区中剩余的数据(连接已关闭时调用)
注意:如果连接在正常帧边界之前断开,缓冲区里可能残留
一个不完整的 `data: ...` 行(末尾没有 \\n\\n)。此时直接
解析会得到残缺的 JSON,JSON.parse 会失败。
稳妥做法是:只处理缓冲区中**包含完整 \\n\\n 结尾**的内容,
或者干脆丢弃残余数据并在日志里记录一次 warning。
下面给出一种保守实现——丢弃不完整残余,避免向调用方传递脏数据。
"""
messages = []
if self._buffer and "\n\n" in self._buffer:
# 缓冲区里还有完整帧,正常解析
frame, rest = self._buffer.split("\n\n", 1)
for line in frame.split("\n"):
if line.startswith("data: "):
data = line[6:]
if data and data != "[DONE]":
messages.append(data)
if self._buffer:
# 残余不完整数据,记录后丢弃
import warnings
warnings.warn(f"SSEParser: discarded incomplete frame: {self._buffer[:80]!r}")
self._buffer = ""
return messages
这个解析器的关键是 _buffer 的设计:不完整的数据永远留在缓冲区,等到下一包数据到达后拼接再解析。避免了 TCP 拆包导致的帧丢失。
断流重连:Last-Event-ID 与恢复机制
大模型生成到一半网络断了——这是生产环境避不开的场景。
SSE 协议原生支持恢复:客户端在重连时带上 Last-Event-ID 请求头,服务器从该 ID 之后继续发送。但大多数 LLM API 的 SSE 实现不支持这个头——断了就断了,不会续传。
实用做法是在自己控制的网关层做断流恢复。下面是一套基于 Redis 缓存的实现思路:
import redis
import json
from fastapi import Request
from fastapi.responses import StreamingResponse
# Redis 连接(用于缓存已生成的 chunk)
redis_client = redis.Redis(host="localhost", port=6379, db=0)
@app.post("/v1/chat/completions")
async def chat_completions_with_recovery(request: Request):
body = await request.json()
headers = request.headers
last_event_id = headers.get("Last-Event-ID")
if last_event_id:
cached = redis_client.get(f"stream:{last_event_id}")
if cached:
return StreamingResponse(
replay_from_cache(json.loads(cached)),
media_type="text/event-stream"
)
stream_id = generate_stream_id()
return StreamingResponse(
generate_with_cache(stream_id, body),
media_type="text/event-stream",
headers={
"X-Stream-Id": stream_id,
"Cache-Control": "no-cache",
"X-Accel-Buffering": "no",
}
)
async def generate_with_cache(stream_id: str, body: dict):
"""生成流式响应,并缓存每个 chunk 用于断流恢复"""
chunk_index = 0
cached_chunks = []
async for chunk in call_downstream_api(body, stream=True):
chunk_index += 1
chunk_data = {
"id": chunk_index,
"data": chunk
}
cached_chunks.append(chunk_data)
if chunk_index % 10 == 0:
redis_client.setex(
f"stream:{stream_id}",
300,
json.dumps(cached_chunks)
)
yield chunk
redis_client.expire(f"stream:{stream_id}", 60)
Last-Event-ID 与 X-Stream-Id 的对应关系:上面的服务端代码用 X-Stream-Id 响应头把 stream_id 传给客户端;客户端重连时需要把这个值放进 Last-Event-ID 请求头发回来。浏览器不会对 POST 请求自动处理这个头,需要前端手动实现:
// 前端重连时手动携带 Last-Event-ID
const streamId = response.headers.get("X-Stream-Id");
// 重连请求
await fetch(url, {
method: "POST",
headers: {
"Content-Type": "application/json",
"Last-Event-ID": streamId ?? "",
},
body: JSON.stringify(body),
});
如果使用的是器灵模型广场这类托管平台,断流恢复通常由平台侧处理,前端不需要自己实现这套逻辑。
如果你没有自己实现网关层,一个更轻量的客户端方案是游标偏移重试:前端记录已接收的字符数,重连时把完整对话历史发过去,服务端跳过已生成的部分继续生成。代价是 Token 浪费(重传上下文),但实现简单,适合中小项目。
前端消费:React Hook 完整实现
前端的核心任务:消费 SSE 流,处理 TCP 拆包,展示流式输出,支持中断和重连。
import { useState, useCallback, useRef } from "react";
interface UseSSEStreamOptions {
url: string;
body: Record<string, any>;
onError?: (error: Error) => void;
}
interface UseSSEStreamReturn {
text: string;
isStreaming: boolean;
start: () => Promise<void>;
abort: () => void;
}
export function useSSEStream({
url,
body,
onError,
}: UseSSEStreamOptions): UseSSEStreamReturn {
const [text, setText] = useState("");
const [isStreaming, setIsStreaming] = useState(false);
const abortRef = useRef<AbortController | null>(null);
const start = useCallback(async () => {
setText("");
setIsStreaming(true);
abortRef.current = new AbortController();
try {
const response = await fetch(url, {
method: "POST",
headers: { "Content-Type": "application/json" },
// 注意:这里显式覆盖 stream 字段,确保始终启用流式
// body 类型为 Record<string, any>,展开时若调用方传入了
// stream: false 会被正确覆盖为 true
body: JSON.stringify({ ...body, stream: true }),
signal: abortRef.current.signal,
});
if (!response.ok) {
throw new Error(`HTTP ${response.status}: ${await response.text()}`);
}
const reader = response.body?.getReader();
if (!reader) throw new Error("ReadableStream not supported");
const decoder = new TextDecoder();
let buffer = "";
while (true) {
const { done, value } = await reader.read();
if (done) break;
buffer += decoder.decode(value, { stream: true });
const parts = buffer.split("\n\n");
buffer = parts.pop() || "";
for (const part of parts) {
for (const line of part.split("\n")) {
if (!line.startsWith("data: ")) continue;
const data = line.slice(6);
if (data === "[DONE]") {
setIsStreaming(false);
return;
}
try {
const parsed = JSON.parse(data);
const content = parsed.choices?.[0]?.delta?.content;
if (content) {
setText((prev) => prev + content);
}
} catch {
console.warn("SSE frame parse failed, skipping:", data.slice(0, 50));
}
}
}
}
} catch (err) {
if (err instanceof Error && err.name !== "AbortError") {
onError?.(err);
}
} finally {
setIsStreaming(false);
}
}, [url, body, onError]);
const abort = useCallback(() => {
abortRef.current?.abort();
setIsStreaming(false);
}, []);
return { text, isStreaming, start, abort };
}
这个 Hook 的 url 参数,如果接的是器灵模型广场,直接填 https://api.extratoken.cn/api/v1/chat/completions 就行,所有模型共用同一个地址,切模型只改 model 字段。
stream: true 参数保证 TextDecoder 不会把跨字节边界的多字节字符(如中文)切碎。这一点在只传输英文的场景里不容易发现,但中文内容一旦出现了乱码,用户会直接感知到。
实测:流式 vs 非流式的延迟差异
我们在一个真实场景里测了两组数据。问题是「介绍一下 Python 的 GIL」,回答长度约 420 个 Token:
| 方式 | 首 Token 延迟 | 完整响应时间 | 用户可开始阅读的时间 |
|---|---|---|---|
| 非流式(等完整响应) | - | 3.2s | 3.2s |
| SSE 流式 | 0.28s | 3.4s | 0.28s |
流式输出的总生成时间其实略长(因为网络传输和计算重叠度不同),但用户感知到的响应速度是快一个数量级的——因为第一个字 280ms 就出来了。
这个数据也解释了一个现象:为什么有些产品的 API 响应很快,但用户感觉"慢"。如果不是流式输出,用户要在空白页等 3 秒;如果是流式,280ms 出第一个字,用户的心理等待时间就消失了。
多模型切换的实际成本
接一个模型的流式输出,上面这些代码基本够用。
但当你要同时接 DeepSeek、Qwen、Kimi 三个模型的流式 API 的时候,问题就不是"怎么解析 SSE"了,而是:
- 三个模型的
model参数格式不一样,切换时要改的不只是值,还有命名规则 - Kimi 的
base_url和 DeepSeek 不一样,每个调用方都要维护一套配置 - 格式差异虽然不大,但每个前端开发者各自写一套 Adapter,重复劳动
有一个做法可以省掉上面这些麻烦:用一个统一接入平台,把所有模型的 base_url 和格式差异在平台侧处理好,对外只暴露一个固定的 OpenAI 兼容接口。前端切换模型只改 model 参数,不需要改其他任何地方。我们以器灵模型广场为例,调用方式长这样:
# 所有模型共用同一个 base_url 和同一套鉴权
# 切换模型只改 model 参数,其他代码完全不变
# DeepSeek
resp = requests.post(
"https://api.extratoken.cn/api/v1/chat/completions",
headers={"Authorization": "Bearer sk-你的密钥"},
json={
"model": "deepseek-v3",
"messages": [{"role": "user", "content": "介绍一下你自己"}],
"stream": True
},
stream=True
)
# 切换到 Qwen,只改 model 参数
resp = requests.post(
"https://api.extratoken.cn/api/v1/chat/completions",
headers={"Authorization": "Bearer sk-你的密钥"},
json={
"model": "qwen-max",
"messages": [{"role": "user", "content": "介绍一下你自己"}],
"stream": True
},
stream=True
)
这种方式的另一个好处是:如果某个模型的 API 出了问题,可以在平台层做 fallback,前端无感知切换到备用模型。这个在生产环境里价值很大,比每个前端各自维护 fallback 逻辑要可靠。
一些个人判断
SSE 流式输出本身不复杂,复杂的是多个模型格式差异的归一化和生产环境的容错处理。
如果你只接一个模型,上面那些代码里去掉 Adapter 层,直接用官方 SDK,基本够用。
如果你要接两个以上的模型,建议在网关层或 BFF 层统一处理格式归一化,不要在前端各自写解析逻辑。后期维护成本低很多。如果觉得自建网关层成本太高,用现成的统一接入平台也是个务实的选择。
另外,流式输出的调试比非流式难——因为问题是"偶尔丢字",不是"每次都报错",需要在高并发场景下才能复现。TCP 拆包那个问题,我们是在上线后第三天、刚好有一波流量高峰的时候才抓到的。所以如果你的项目还没上线,建议在压测阶段专门测一下流式输出在高并发下的表现,不要等上线后再排查。
更多推荐

所有评论(0)