同时接 DeepSeek、Kimi、Qwen——多模型 API 的 SSE 流式输出,我把三个最容易踩的坑都写出来了

在这里插入图片描述
做 AI 应用开发的同学应该都有这个体会:一开始只接一个模型,跑通了就很开心;业务跑起来之后,产品那边说"能不能让用户可以选模型",于是接第二个;后来发现不同模型擅长的事情不一样,写代码用 DeepSeek,写文案用 Qwen,长文档用 Kimi,慢慢就接了三四个。

接的模型多了以后,最明显的一个感觉是:每个模型都有各自的小脾气

最明显的是流式输出。单模型的时候,前端随便写写就能跑;模型一多,不同模型的 chunk 格式有细微差异,前端解析偶尔会出诡异的问题。我们线上出过一次,高流量时段有用户反馈"回答的最后一句话缺几个字"——不是乱码,就是少了结尾。

一开始怀疑是模型本身截断,换了模型还是有。又怀疑是超时配置,调了 Nginx 的 proxy_read_timeout 也没用。最后排查了两天,抓包一看——是 TCP 拆包。一个完整的 SSE 帧被拆成了两个 TCP 包到达,前端按行解析的时候第二包里的 JSON 是不完整的,JSON.parse 失败了,这一帧直接丢了。

这个问题在只接一个模型的时候没出现过,因为当时前端写得粗糙,刚好绕过去了。接了多个模型之后,不同模型的 chunk 间隔不一样,触发概率一下子高了很多。

下面把完整的链路拆开讲,从 SSE 协议本身开始,到后端适配层,到前端消费与错误恢复。每一步带可运行的代码。


SSE 协议:为什么 LLM API 不用 WebSocket

SSE(Server-Sent Events)本质上是一个单向、长连接、基于 HTTP 的文本流协议。和 WebSocket 的区别:

特性 SSE WebSocket
方向 服务器→客户端单向 双向
协议 HTTP/1.1 或 HTTP/2 独立协议(ws://)
重连 浏览器原生支持 需自行实现
数据格式 纯文本(text/event-stream) 二进制或文本
防火墙友好 走标准 HTTP 端口 可能被企业代理拦截

LLM API 选 SSE 的原因很实际:大模型生成 Token 是单向输出,客户端只需要收,不需要发。SSE 的协议开销比 WebSocket 低,且天然支持断线重连(Last-Event-ID 头),适合流式文本传输。

一个标准的 SSE 帧长这样:

data: {"id":"chatcmpl-001","choices":[{"delta":{"content":"你好"}}]}

data: {"id":"chatcmpl-002","choices":[{"delta":{"content":"我是"}}]}

data: [DONE]

注意两个细节:每条 data: 行以 \n\n 作为帧分隔符;流结束信号是完整的一行 data: [DONE]\n\n——注意前缀 data: 不能省略,国内几个模型基本也沿用了这个约定。


后端:FastAPI + Async Generator

核心思路:用一个 async generator 逐块 yield SSE 帧,FastAPI 的 StreamingResponse 负责把生成器映射到 HTTP 响应流。

import asyncio
import json
import time
from fastapi import FastAPI, Request
from fastapi.responses import StreamingResponse

app = FastAPI()

async def generate_sse_stream(model: str, messages: list, max_tokens: int = 2048):
    """核心流式生成器:每个 chunk yield 一个符合 SSE 格式的字符串"""
    chunk_id = 0

    # 实际项目中这里调用下游 LLM API,同样启用 stream=True
    # 这里用模拟数据演示完整链路
    sample_chunks = ["你好", ",", "我是", "一个", "AI", "助手", "。"]
    for chunk_text in sample_chunks:
        chunk_id += 1
        payload = {
            "id": f"chatcmpl-{chunk_id}",
            "object": "chat.completion.chunk",
            "created": int(time.time()),
            "model": model,
            "choices": [{
                "index": 0,
                "delta": {"content": chunk_text},
                "finish_reason": None
            }]
        }
        # 关键:data: {json}\n\n
        yield f"data: {json.dumps(payload, ensure_ascii=False)}\n\n"
        await asyncio.sleep(0.05)  # 模拟生成间隔

    # 流结束信号
    yield "data: [DONE]\n\n"

@app.post("/v1/chat/completions")
async def chat_completions(request: Request):
    body = await request.json()
    messages = body.get("messages", [])
    model = body.get("model", "default")
    max_tokens = body.get("max_tokens", 2048)

    return StreamingResponse(
        generate_sse_stream(model, messages, max_tokens),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "Connection": "keep-alive",
            "X-Accel-Buffering": "no",      # 禁用 Nginx 缓冲
            "Access-Control-Allow-Origin": "*",
        }
    )

三个容易踩的坑:

X-Accel-Buffering: no——如果前面挂了 Nginx,不加这个头 Nginx 会攒够 buffer 才一次吐出,流式变批处理。这个问题在本地调试的时候发现不了,上线后才暴露。

ensure_ascii=False——中文字符不要转成 \uXXXX,前端收到后不用额外解码。如果忘了这个参数,前端拿到的是 Unicode 转义序列,虽然能解析,但调试的时候看着很痛苦。

生成器内的异常——async generator 中抛出异常时,FastAPI 会中断流,前端收到不完整的响应。生产环境需要用 try-catch 包裹,并发送一个带 error 字段的最后帧,让前端有机会展示错误信息。


多模型 Chunk 格式差异:Adapter 层

不同模型的 SSE 流式响应格式大致兼容 OpenAI,但细节上各有偏差。一个健壮的系统必须做格式归一化。

实测对接下来的差异点:

from typing import Dict, Any, Optional
import json

class ChunkAdapter:
    """各模型 SSE chunk → 统一格式的适配器
    
    如果你用的是器灵模型广场这类统一接入平台,
    网关层已经做了格式归一化,这段 Adapter 代码基本不需要自己写。
    """

    @staticmethod
    def normalize(model: str, raw_line: str) -> Optional[str]:
        """返回归一化后的文本内容;无文本内容时返回 None。

        注意:返回 None 有两种含义,调用方如需区分应改造返回值:
          - 正常无内容(如 finish_reason=stop 的空 delta 帧)
          - JSON 解析失败(数据格式异常)
        本实现选择静默跳过解析失败的帧,生产环境建议改为
        打日志或抛异常,以便及时发现格式变更。
        """
        if raw_line == "[DONE]" or raw_line.startswith("[DONE]"):
            return None  # 流结束信号

        try:
            data = json.loads(raw_line)
        except json.JSONDecodeError:
            return None  # 非 JSON 行

        # 路径1:OpenAI 标准格式(DeepSeek、Qwen、GLM 均兼容此路径)
        choices = data.get("choices", [])
        if choices:
            delta = choices[0].get("delta", {})
            content = delta.get("content", "")
            if content:
                return content

        # 路径2:某些模型在 finish_reason 为 stop 的最后一帧里
        # delta 为空对象 {},只带 role 字段,这时没有 content,返回 None 即可

        # 路径3:Anthropic 旧版 Completion API(非 Chat Completions 格式)
        # 有 top-level text 字段,但 Anthropic 的新版 Messages API 已迁移到 delta 格式
        if "text" in data and "choices" not in data:
            return data["text"]

        return None

实际对接时发现的问题:部分模型在 finish_reason"stop" 的最后一帧里不带 content;某些模型的首帧 delta{}(空对象),只带 role: "assistant"。Adapter 需要正确处理这些边界情况,否则前端会渲染出 undefined 或丢字。

当时我们接了三个模型之后,每个模型在前端各自写了一套解析逻辑,维护起来很痛苦。后来把归一化提到 Adapter 层统一处理,前端只消费统一的文本增量,代码量少了很多。

如果你也在接多个模型,Adapter 层建议尽早抽出来,不要在每个调用方各自处理。

当然,后来我们把模型接入迁到了器灵模型广场,这件事反而省心了——统一的 OpenAI 兼容格式,不管切到哪个模型,chunk 结构完全一致,自己写的 Adapter 基本就剩了个兜底。这是后话,下面先把「自己处理」的完整链路讲清楚。


大 Chunk 拆包:TCP 拆包导致帧边界丢失

这是我们线上那个"丢最后几个字"问题的根因。

TCP 是流式协议,不保证消息边界。一个 data: {...json...}\n\n 可能被拆成两个 TCP 包到达:

包1: data: {"id":"chatcmpl-001","choices":[{"delta":
包2: {"content":"你好"}}]}\n\n

如果接收端按行处理,第二行的 JSON 是不完整的,JSON.parse 直接抛异常,这帧就丢了。

后端不负责拆包——SSE 帧的边界重建由接收端处理。正确的做法是维护一个行缓冲区。下面这个解析器放在下游 API 消费层(不是网关层,定位要准确):

class SSEParser:
    """流式 SSE 解析器,处理 TCP 拆包导致的帧边界问题
    
    典型使用场景:
        - 在 Python 后端消费第三方 LLM 的流式 API 时
        - 在 Node.js BFF 层转发流式响应时
        - 任何需要逐帧解析 SSE 流的地方
    
    如果使用的是器灵模型广场这类已做归一化的平台,
    前端收到的帧格式很稳定,这个解析器的逻辑可以写得比通用版更简单。
    """

    def __init__(self):
        self._buffer = ""

    def feed(self, chunk: str) -> list[str]:
        """喂入新的数据块,返回本轮解析出的完整消息列表"""
        self._buffer += chunk
        messages = []

        # 以 \n\n 分割帧
        while "\n\n" in self._buffer:
            frame, self._buffer = self._buffer.split("\n\n", 1)
            for line in frame.split("\n"):
                if line.startswith("data: "):
                    data = line[6:]
                    if data == "[DONE]":
                        messages.append("[DONE]")
                    else:
                        messages.append(data)
        return messages

    def flush(self) -> list[str]:
        """消费缓冲区中剩余的数据(连接已关闭时调用)

        注意:如果连接在正常帧边界之前断开,缓冲区里可能残留
        一个不完整的 `data: ...` 行(末尾没有 \\n\\n)。此时直接
        解析会得到残缺的 JSON,JSON.parse 会失败。
        稳妥做法是:只处理缓冲区中**包含完整 \\n\\n 结尾**的内容,
        或者干脆丢弃残余数据并在日志里记录一次 warning。
        下面给出一种保守实现——丢弃不完整残余,避免向调用方传递脏数据。
        """
        messages = []
        if self._buffer and "\n\n" in self._buffer:
            # 缓冲区里还有完整帧,正常解析
            frame, rest = self._buffer.split("\n\n", 1)
            for line in frame.split("\n"):
                if line.startswith("data: "):
                    data = line[6:]
                    if data and data != "[DONE]":
                        messages.append(data)
        if self._buffer:
            # 残余不完整数据,记录后丢弃
            import warnings
            warnings.warn(f"SSEParser: discarded incomplete frame: {self._buffer[:80]!r}")
        self._buffer = ""
        return messages

这个解析器的关键是 _buffer 的设计:不完整的数据永远留在缓冲区,等到下一包数据到达后拼接再解析。避免了 TCP 拆包导致的帧丢失。


断流重连:Last-Event-ID 与恢复机制

大模型生成到一半网络断了——这是生产环境避不开的场景。

SSE 协议原生支持恢复:客户端在重连时带上 Last-Event-ID 请求头,服务器从该 ID 之后继续发送。但大多数 LLM API 的 SSE 实现不支持这个头——断了就断了,不会续传。

实用做法是在自己控制的网关层做断流恢复。下面是一套基于 Redis 缓存的实现思路:

import redis
import json
from fastapi import Request
from fastapi.responses import StreamingResponse

# Redis 连接(用于缓存已生成的 chunk)
redis_client = redis.Redis(host="localhost", port=6379, db=0)

@app.post("/v1/chat/completions")
async def chat_completions_with_recovery(request: Request):
    body = await request.json()
    headers = request.headers
    last_event_id = headers.get("Last-Event-ID")

    if last_event_id:
        cached = redis_client.get(f"stream:{last_event_id}")
        if cached:
            return StreamingResponse(
                replay_from_cache(json.loads(cached)),
                media_type="text/event-stream"
            )

    stream_id = generate_stream_id()
    return StreamingResponse(
        generate_with_cache(stream_id, body),
        media_type="text/event-stream",
        headers={
            "X-Stream-Id": stream_id,
            "Cache-Control": "no-cache",
            "X-Accel-Buffering": "no",
        }
    )

async def generate_with_cache(stream_id: str, body: dict):
    """生成流式响应,并缓存每个 chunk 用于断流恢复"""
    chunk_index = 0
    cached_chunks = []

    async for chunk in call_downstream_api(body, stream=True):
        chunk_index += 1
        chunk_data = {
            "id": chunk_index,
            "data": chunk
        }
        cached_chunks.append(chunk_data)

        if chunk_index % 10 == 0:
            redis_client.setex(
                f"stream:{stream_id}",
                300,
                json.dumps(cached_chunks)
            )

        yield chunk

    redis_client.expire(f"stream:{stream_id}", 60)

Last-Event-IDX-Stream-Id 的对应关系:上面的服务端代码用 X-Stream-Id 响应头把 stream_id 传给客户端;客户端重连时需要把这个值放进 Last-Event-ID 请求头发回来。浏览器不会对 POST 请求自动处理这个头,需要前端手动实现:

// 前端重连时手动携带 Last-Event-ID
const streamId = response.headers.get("X-Stream-Id");
// 重连请求
await fetch(url, {
  method: "POST",
  headers: {
    "Content-Type": "application/json",
    "Last-Event-ID": streamId ?? "",
  },
  body: JSON.stringify(body),
});

如果使用的是器灵模型广场这类托管平台,断流恢复通常由平台侧处理,前端不需要自己实现这套逻辑。

如果你没有自己实现网关层,一个更轻量的客户端方案是游标偏移重试:前端记录已接收的字符数,重连时把完整对话历史发过去,服务端跳过已生成的部分继续生成。代价是 Token 浪费(重传上下文),但实现简单,适合中小项目。


前端消费:React Hook 完整实现

前端的核心任务:消费 SSE 流,处理 TCP 拆包,展示流式输出,支持中断和重连。

import { useState, useCallback, useRef } from "react";

interface UseSSEStreamOptions {
  url: string;
  body: Record<string, any>;
  onError?: (error: Error) => void;
}

interface UseSSEStreamReturn {
  text: string;
  isStreaming: boolean;
  start: () => Promise<void>;
  abort: () => void;
}

export function useSSEStream({
  url,
  body,
  onError,
}: UseSSEStreamOptions): UseSSEStreamReturn {
  const [text, setText] = useState("");
  const [isStreaming, setIsStreaming] = useState(false);
  const abortRef = useRef<AbortController | null>(null);

  const start = useCallback(async () => {
    setText("");
    setIsStreaming(true);
    abortRef.current = new AbortController();

    try {
      const response = await fetch(url, {
        method: "POST",
        headers: { "Content-Type": "application/json" },
        // 注意:这里显式覆盖 stream 字段,确保始终启用流式
        // body 类型为 Record<string, any>,展开时若调用方传入了
        // stream: false 会被正确覆盖为 true
        body: JSON.stringify({ ...body, stream: true }),
        signal: abortRef.current.signal,
      });

      if (!response.ok) {
        throw new Error(`HTTP ${response.status}: ${await response.text()}`);
      }

      const reader = response.body?.getReader();
      if (!reader) throw new Error("ReadableStream not supported");

      const decoder = new TextDecoder();
      let buffer = "";

      while (true) {
        const { done, value } = await reader.read();
        if (done) break;

        buffer += decoder.decode(value, { stream: true });

        const parts = buffer.split("\n\n");
        buffer = parts.pop() || "";

        for (const part of parts) {
          for (const line of part.split("\n")) {
            if (!line.startsWith("data: ")) continue;

            const data = line.slice(6);
            if (data === "[DONE]") {
              setIsStreaming(false);
              return;
            }

            try {
              const parsed = JSON.parse(data);
              const content = parsed.choices?.[0]?.delta?.content;
              if (content) {
                setText((prev) => prev + content);
              }
            } catch {
              console.warn("SSE frame parse failed, skipping:", data.slice(0, 50));
            }
          }
        }
      }
    } catch (err) {
      if (err instanceof Error && err.name !== "AbortError") {
        onError?.(err);
      }
    } finally {
      setIsStreaming(false);
    }
  }, [url, body, onError]);

  const abort = useCallback(() => {
    abortRef.current?.abort();
    setIsStreaming(false);
  }, []);

  return { text, isStreaming, start, abort };
}

这个 Hook 的 url 参数,如果接的是器灵模型广场,直接填 https://api.extratoken.cn/api/v1/chat/completions 就行,所有模型共用同一个地址,切模型只改 model 字段。

stream: true 参数保证 TextDecoder 不会把跨字节边界的多字节字符(如中文)切碎。这一点在只传输英文的场景里不容易发现,但中文内容一旦出现了乱码,用户会直接感知到。


实测:流式 vs 非流式的延迟差异

我们在一个真实场景里测了两组数据。问题是「介绍一下 Python 的 GIL」,回答长度约 420 个 Token:

方式 首 Token 延迟 完整响应时间 用户可开始阅读的时间
非流式(等完整响应) - 3.2s 3.2s
SSE 流式 0.28s 3.4s 0.28s

流式输出的总生成时间其实略长(因为网络传输和计算重叠度不同),但用户感知到的响应速度是快一个数量级的——因为第一个字 280ms 就出来了。

这个数据也解释了一个现象:为什么有些产品的 API 响应很快,但用户感觉"慢"。如果不是流式输出,用户要在空白页等 3 秒;如果是流式,280ms 出第一个字,用户的心理等待时间就消失了。


多模型切换的实际成本

接一个模型的流式输出,上面这些代码基本够用。

但当你要同时接 DeepSeek、Qwen、Kimi 三个模型的流式 API 的时候,问题就不是"怎么解析 SSE"了,而是:

  • 三个模型的 model 参数格式不一样,切换时要改的不只是值,还有命名规则
  • Kimi 的 base_url 和 DeepSeek 不一样,每个调用方都要维护一套配置
  • 格式差异虽然不大,但每个前端开发者各自写一套 Adapter,重复劳动

有一个做法可以省掉上面这些麻烦:用一个统一接入平台,把所有模型的 base_url 和格式差异在平台侧处理好,对外只暴露一个固定的 OpenAI 兼容接口。前端切换模型只改 model 参数,不需要改其他任何地方。我们以器灵模型广场为例,调用方式长这样:

# 所有模型共用同一个 base_url 和同一套鉴权
# 切换模型只改 model 参数,其他代码完全不变

# DeepSeek
resp = requests.post(
    "https://api.extratoken.cn/api/v1/chat/completions",
    headers={"Authorization": "Bearer sk-你的密钥"},
    json={
        "model": "deepseek-v3",
        "messages": [{"role": "user", "content": "介绍一下你自己"}],
        "stream": True
    },
    stream=True
)

# 切换到 Qwen,只改 model 参数
resp = requests.post(
    "https://api.extratoken.cn/api/v1/chat/completions",
    headers={"Authorization": "Bearer sk-你的密钥"},
    json={
        "model": "qwen-max",
        "messages": [{"role": "user", "content": "介绍一下你自己"}],
        "stream": True
    },
    stream=True
)

这种方式的另一个好处是:如果某个模型的 API 出了问题,可以在平台层做 fallback,前端无感知切换到备用模型。这个在生产环境里价值很大,比每个前端各自维护 fallback 逻辑要可靠。


一些个人判断

SSE 流式输出本身不复杂,复杂的是多个模型格式差异的归一化生产环境的容错处理

如果你只接一个模型,上面那些代码里去掉 Adapter 层,直接用官方 SDK,基本够用。

如果你要接两个以上的模型,建议在网关层或 BFF 层统一处理格式归一化,不要在前端各自写解析逻辑。后期维护成本低很多。如果觉得自建网关层成本太高,用现成的统一接入平台也是个务实的选择。

另外,流式输出的调试比非流式难——因为问题是"偶尔丢字",不是"每次都报错",需要在高并发场景下才能复现。TCP 拆包那个问题,我们是在上线后第三天、刚好有一波流量高峰的时候才抓到的。所以如果你的项目还没上线,建议在压测阶段专门测一下流式输出在高并发下的表现,不要等上线后再排查。

Logo

欢迎加入DeepSeek 技术社区。在这里,你可以找到志同道合的朋友,共同探索AI技术的奥秘。

更多推荐