FastAPI 多模型 AI 后端实践:路由、流式与成本控制

背景

我维护一个网络工具集合站,后端 FastAPI,前端 Vue3。AI 对话功能最初只接了一个模型,后来逐步扩展到了多个模型的动态路由。

这篇文章记录的是多模型架构的实现过程,包括模型路由、流式输出处理、fallback 降级和成本追踪。不绑定任何具体的 API 服务商,代码可以直接复用。

为什么需要多模型路由

不同模型的强项不同,价格差异也很大。把所有请求都扔给最贵的模型,浪费钱;全用便宜的,有些复杂任务质量不行。

我的做法是:在请求里加一个可选的 strategy 字段,后端根据策略选择模型。用户也可以直接指定模型。

# 模型配置
MODEL_CONFIG = {
    "deepseek-chat": {
        "price_per_million_input": 3.5,
        "price_per_million_output": 10.5,
        "max_tokens": 128000,
        "strength": ["chinese", "general", "cheap"]
    },
    "deepseek-reasoner": {
        "price_per_million_input": 3.5,
        "price_per_million_output": 10.5,
        "max_tokens": 128000,
        "strength": ["reasoning", "math", "code"]
    },
    "claude-sonnet-4": {
        "price_per_million_input": 15,
        "price_per_million_output": 60,
        "max_tokens": 200000,
        "strength": ["code", "analysis", "long_context"]
    },
    "gpt-5.5": {
        "price_per_million_input": 84,
        "price_per_million_output": 336,
        "max_tokens": 128000,
        "strength": ["general", "creative", "latest"]
    },
}

ROUTING_STRATEGY = {
    "cheap": ["deepseek-chat"],
    "balanced": ["deepseek-chat", "claude-sonnet-4", "deepseek-reasoner"],
    "best": ["claude-sonnet-4", "gpt-5.5"],
}

策略路由的实现:

def select_model(strategy: str, user_model: str = None) -> str:
    if user_model:
        return user_model
    return ROUTING_STRATEGY.get(strategy, ["deepseek-chat"])[0]

核心 API 端点

from fastapi import FastAPI, HTTPException
from fastapi.responses import StreamingResponse
from openai import OpenAI
from pydantic import BaseModel
import os, json, time

app = FastAPI()

client = OpenAI(
    base_url=os.environ.get("API_BASE_URL", "https://api.openai.com/v1"),
    api_key=os.environ["API_KEY"]
)

class ChatRequest(BaseModel):
    messages: list[dict]
    strategy: str = "balanced"
    model: str = None
    temperature: float = 0.7
    max_tokens: int = 4096
    stream: bool = False

非流式端点

@app.post("/api/chat")
async def chat(req: ChatRequest):
    model = select_model(req.strategy, req.model)
    start_time = time.time()

    response = client.chat.completions.create(
        model=model,
        messages=req.messages,
        temperature=req.temperature,
        max_tokens=req.max_tokens
    )

    elapsed = time.time() - start_time
    content = response.choices[0].message.content
    tokens = response.usage.total_tokens

    # 记录用量(写入数据库或日志)
    log_usage(model, tokens, elapsed)

    return {
        "model": model,
        "content": content,
        "tokens": tokens,
        "elapsed_ms": round(elapsed * 1000)
    }

log_usage 函数用于记录每次调用的模型、token 量、耗时。后面做成本分析全靠这些数据。

流式端点

流式输出(SSE)的处理复杂一些,主要两个点:一是 FastAPI 的 StreamingResponse 怎么正确写,二是异常处理。

@app.post("/api/chat/stream")
async def chat_stream(req: ChatRequest):
    model = select_model(req.strategy, req.model)

    async def generate():
        try:
            stream = client.chat.completions.create(
                model=model,
                messages=req.messages,
                temperature=req.temperature,
                max_tokens=req.max_tokens,
                stream=True,
                stream_options={"include_usage": True}
            )

            total_tokens = 0
            for chunk in stream:
                # usage 信息只在最后一个 chunk 出现
                if chunk.usage:
                    total_tokens = chunk.usage.total_tokens

                if chunk.choices and chunk.choices[0].delta.content:
                    content = chunk.choices[0].delta.content
                    yield f"data: {json.dumps({'content': content})}\n\n"

            # 流结束后发送用量信息
            yield f"data: {json.dumps({'done': True, 'tokens': total_tokens, 'model': model})}\n\n"

        except Exception as e:
            yield f"data: {json.dumps({'error': str(e)})}\n\n"

    return StreamingResponse(
        generate(),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "Connection": "keep-alive",
            "X-Accel-Buffering": "no",  # Nginx 下关闭缓冲
        }
    )

X-Accel-Buffering: no 这个头很重要。如果后端前面有 Nginx 反向代理,不加这个头的话 Nginx 会缓冲 SSE 响应,用户那边就会看到消息"憋住"然后一下子全出来,而不是逐字流式输出。

Fallback 降级

任意模型都可能挂——上游维护、渠道问题、速率限制等等。fallback 机制必须写:

FALLBACK_CHAIN = [
    "claude-sonnet-4",
    "gpt-5.5",
    "deepseek-reasoner",
    "deepseek-chat",
]

def call_with_fallback(model: str, messages: list, **kwargs):
    """按降级链依次尝试,直到成功"""
    models_to_try = [model] + [m for m in FALLBACK_CHAIN if m != model]

    for i, m in enumerate(models_to_try):
        try:
            response = client.chat.completions.create(
                model=m, messages=messages, **kwargs
            )
            if i > 0:
                print(f"[FALLBACK] {model}{m}")
            return response, m
        except Exception as e:
            last_error = e
            continue

    raise last_error

这里有个细节:fallback 的模型选择顺序。上面的代码是"主模型优先,然后按质量从高到低降级"。如果你的场景对成本更敏感,可以反过来——先试便宜的,失败再试贵的。

成本追踪

预付费 API 模式下,知道钱花在哪了比知道花了多少更重要。

我写了一个简单的成本计算函数,记录每次调用的花费:

import sqlite3
from datetime import datetime

def log_usage(model: str, total_tokens: int, elapsed_ms: float):
    config = MODEL_CONFIG.get(model, {})
    input_price = config.get("price_per_million_input", 0)
    output_price = config.get("price_per_million_output", 0)

    # 简化:按 total_tokens 均分估算
    input_tokens = total_tokens * 0.7
    output_tokens = total_tokens * 0.3

    cost = (input_tokens / 1_000_000) * input_price + \
           (output_tokens / 1_000_000) * output_price

    conn = sqlite3.connect("usage.db")
    conn.execute(
        "INSERT INTO api_usage (model, tokens, cost, elapsed_ms, created_at) "
        "VALUES (?, ?, ?, ?, ?)",
        (model, total_tokens, round(cost, 6), elapsed_ms, datetime.now())
    )
    conn.commit()
    conn.close()

更精确的做法是从 API 响应的 usage 里分别拿 prompt_tokenscompletion_tokens,但流式模式下 usage 只在最后一个 chunk 出现,处理起来稍微麻烦。

有了这些日志数据,可以做日报:

SELECT
    model,
    SUM(tokens) as total_tokens,
    ROUND(SUM(cost), 2) as total_cost,
    COUNT(*) as calls,
    ROUND(AVG(elapsed_ms)) as avg_ms
FROM api_usage
WHERE date(created_at) = date('now')
GROUP BY model
ORDER BY total_cost DESC;

实际使用的 API 渠道

上面的代码不绑定任何特定的 API 提供商。API_BASE_URL 可以指向任何兼容 OpenAI 格式的服务——官方 OpenAI、官方 Anthropic(通过 SDK 的 base_url 参数)、或者中转服务。

我自己用的是 JJAPI(jyjyapi.com),一个国内的中转站。选它的原因比较实际:

  • DeepSeek V4 是 3 折(¥1.05/百万 token),这个模型占了 70% 的调用量,成本大头全靠它压
  • Claude Sonnet 4 是 3 折,代码审查的主力模型
  • Claude Opus 4-6 是 2.5 折,偶尔用于复杂逻辑的最低价
  • 后台有按模型维度的实时用量图表,跟我上面写的 api_usage 表互补验证
  • 微信支付直接充,到账即时

但即使你用的是别的中转站(或者自建网关),上面的路由、fallback、成本追踪代码完全通用。

踩过的坑

  1. 流式响应和 Nginx 缓冲:Nginx 默认会缓冲 proxy 响应。如果你后端返回流式数据,前端却看到消息"攒"在一起才出来,检查 proxy_buffering offX-Accel-Buffering: no

  2. CDN 缓存 API 响应:如果你用 CDN(腾讯云、Cloudflare 等),GET 请求的 API 响应可能被 CDN 缓存。用户看到的用量数据一直是旧的。解决办法:Nginx 加 no-cache 头,前端 GET 请求加 _t 参数。

  3. SSE 连接泄漏:流式连接如果客户端断开(关浏览器、网络切换),后端的 generator 可能继续运行直到超时。在 generate 函数里加 try-finally 清理逻辑。

  4. Python 脚本改 Vue 文件要小心 emoji:不要用 Python 的 string.replace 去操作含 emoji 的 Vue 文件,Python 3 的 surrogate 处理可能导致文件损坏。直接用工具写完整文件。

总结

多模型 AI 后端的核心就四个点:路由(按策略选模型)、流式(SSE 正确处理)、降级(fallback 链)、追踪(用量和成本记录)。代码不难,细节多。

有什么问题评论区交流。

#FastAPI #Python #DeepSeek #API #后端开发 #流式输出 #SSE #多模型路由 #OpenAI #Claude #成本追踪 #fallback

Logo

欢迎加入DeepSeek 技术社区。在这里,你可以找到志同道合的朋友,共同探索AI技术的奥秘。

更多推荐