最近在做一个需要调用多个大模型的项目,踩了不少坑。把方案整理出来分享给同样在搞多模型接入的同学。

本文包含完整的Python代码,可以直接跑。涉及DeepSeek、通义千问、智谱GLM三个模型的统一接入和智能路由。

一、为什么要做多模型统一接入?
项目需求是这样的:一个知识库问答系统,不同问题要路由到不同模型。简单问题走便宜的快模型,复杂推理走深度模型,代码相关走擅长编程的模型。

如果每个模型单独对接,你会面对三个痛点:

每家请求格式不一样,DeepSeek用OpenAI兼容格式,通义千问有自己的SDK,GLM又是一套
每家的计费规则和Token计算方式不同,月底对账头疼
某家模型挂了没有自动降级,用户体验直接崩
解决方案就是在业务层和模型之间加一层抽象——统一请求格式、统一错误处理、统一成本核算。

二、架构设计
在这里插入图片描述

核心设计决策:
在这里插入图片描述

三、完整代码实现
3.1 项目结构
在这里插入图片描述

3.2 配置文件

# config.yaml
models:
  deepseek:
    endpoint: "https://api.deepseek.com/v1/chat/completions"
    model_name: "deepseek-chat"
    price_input: 0.001      # 元/千Token
    price_output: 0.002
    max_retries: 3
    timeout: 30

  qwen:
    endpoint: "https://dashscope.aliyuncs.com/compatible-mode/v1/chat/completions"
    model_name: "qwen-plus"
    price_input: 0.0008
    price_output: 0.002

  glm:
    endpoint: "https://open.bigmodel.cn/api/paas/v4/chat/completions"
    model_name: "glm-4"
    price_input: 0.001
    price_output: 0.001

routing:
  default_model: "deepseek"
  fallback_chain: ["deepseek", "qwen", "glm"]
  cost_limit_per_call: 0.05

credentials:
  deepseek: "${DEEPSEEK_API_KEY}"
  qwen: "${DASHSCOPE_API_KEY}"
  glm: "${GLM_API_KEY}"

这里说一句,接入凭证千万别硬编码到代码里。用环境变量读取,生产环境接密钥管理服务。

3.3 统一数据结构

# gateway/base.py
from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import Optional, Dict, Any, List
from enum import Enum
import httpx
import os
import yaml


class Role(str, Enum):
    SYSTEM = "system"
    USER = "user"
    ASSISTANT = "assistant"


@dataclass
class Message:
    role: Role
    content: str


@dataclass
class ChatRequest:
    """统一请求体——不管底层调哪个模型,业务层只用这个"""
    messages: List[Message]
    model: Optional[str] = None      # 指定模型,None则走默认路由
    temperature: float = 0.7
    max_tokens: int = 4096


@dataclass
class TokenUsage:
    input_tokens: int
    output_tokens: int
    total_tokens: int


@dataclass
class ChatResponse:
    """统一响应体"""
    content: str
    model: str
    usage: TokenUsage
    cost: float          # 本次调用费用,单位元
    latency_ms: float    # 响应耗时


class BaseAdapter(ABC):
    """模型适配器基类——所有模型适配器继承这个"""

    def __init__(self, config: Dict[str, Any]):
        self.endpoint = config["endpoint"]
        self.model_name = config["model_name"]
        self.input_price = config["price_input"]
        self.output_price = config["price_output"]
        self.max_retries = config.get("max_retries", 3)
        self.timeout = config.get("timeout", 30)
        # 接入凭证从环境变量读取
        self.api_key = os.environ.get(config.get("env_key", ""))

    @abstractmethod
    def build_payload(self, request: ChatRequest) -> Dict[str, Any]:
        """将统一请求转为该模型的原生格式"""
        pass

    @abstractmethod
    def parse_response(self, raw: Dict[str, Any]) -> ChatResponse:
        """将原生响应转为统一格式"""
        pass

    def calc_cost(self, input_tokens: int, output_tokens: int) -> float:
        """计算本次调用费用"""
        return round(
            input_tokens * self.input_price / 1000 +
            output_tokens * self.output_price / 1000, 6
        )

    async def call(self, request: ChatRequest) -> ChatResponse:
        """发起HTTP请求,带重试"""
        headers = {
            "Authorization": f"Bearer {self.api_key}",
            "Content-Type": "application/json"
        }
        payload = self.build_payload(request)

        async with httpx.AsyncClient(timeout=self.timeout) as client:
            for attempt in range(self.max_retries):
                try:
                    resp = await client.post(
                        self.endpoint, json=payload, headers=headers
                    )
                    resp.raise_for_status()
                    return self.parse_response(resp.json())
                except Exception as e:
                    if attempt == self.max_retries - 1:
                        raise
                    continue

3.4 具体适配器实现
DeepSeek、通义千问、GLM都兼容OpenAI格式,但请求体有细微差异:

# gateway/adapters/deepseek.py
class DeepSeekAdapter(BaseAdapter):

    def build_payload(self, request: ChatRequest) -> Dict[str, Any]:
        return {
            "model": self.model_name,
            "messages": [
                {"role": m.role.value, "content": m.content}
                for m in request.messages
            ],
            "temperature": request.temperature,
            "max_tokens": request.max_tokens,
        }

    def parse_response(self, raw: Dict[str, Any]) -> ChatResponse:
        choice = raw["choices"][0]
        usage = raw["usage"]
        input_t = usage["prompt_tokens"]
        output_t = usage["completion_tokens"]

        return ChatResponse(
            content=choice["message"]["content"],
            model="deepseek-chat",
            usage=TokenUsage(input_t, output_t, usage["total_tokens"]),
            cost=self.calc_cost(input_t, output_t),
            latency_ms=0.0,
        )


# gateway/adapters/qwen.py
class QwenAdapter(BaseAdapter):
    """通义千问——兼容OpenAI格式,但model字段名称不同"""

    def build_payload(self, request: ChatRequest) -> Dict[str, Any]:
        return {
            "model": self.model_name,  # qwen-plus
            "messages": [
                {"role": m.role.value, "content": m.content}
                for m in request.messages
            ],
            "temperature": request.temperature,
            "max_tokens": request.max_tokens,
        }

    def parse_response(self, raw: Dict[str, Any]) -> ChatResponse:
        choice = raw["choices"][0]
        usage = raw["usage"]
        input_t = usage.get("input_tokens", usage.get("prompt_tokens", 0))
        output_t = usage.get("output_tokens", usage.get("completion_tokens", 0))

        return ChatResponse(
            content=choice["message"]["content"],
            model="qwen-plus",
            usage=TokenUsage(input_t, output_t, input_t + output_t),
            cost=self.calc_cost(input_t, output_t),
            latency_ms=0.0,
        )


# gateway/adapters/glm.py
class GLMAdapter(BaseAdapter):
    """智谱GLM——部分字段名不同"""

    def build_payload(self, request: ChatRequest) -> Dict[str, Any]:
        return {
            "model": self.model_name,  # glm-4
            "messages": [
                {"role": m.role.value, "content": m.content}
                for m in request.messages
            ],
            "temperature": request.temperature,
            "max_tokens": request.max_tokens,
        }

    def parse_response(self, raw: Dict[str, Any]) -> ChatResponse:
        choice = raw["choices"][0]
        usage = raw["usage"]
        input_t = usage.get("prompt_tokens", 0)
        output_t = usage.get("completion_tokens", 0)

        return ChatResponse(
            content=choice["message"]["content"],
            model="glm-4",
            usage=TokenUsage(input_t, output_t, usage.get("total_tokens", input_t + output_t)),
            cost=self.calc_cost(input_t, output_t),
            latency_ms=0.0,
        )

3.5 核心路由器
这是整个方案的心脏——根据任务类型选模型,失败自动降级:

# gateway/router.py
import time
from collections import defaultdict

class ModelRouter:
    """多模型路由器——统一入口,智能选模型,自动降级"""

    ADAPTER_MAP = {
        "deepseek": DeepSeekAdapter,
        "qwen": QwenAdapter,
        "glm": GLMAdapter,
    }

    def __init__(self, config_path: str = "config.yaml"):
        with open(config_path) as f:
            self.config = yaml.safe_load(f)

        self.routing_config = self.config["routing"]
        self.default_model = self.routing_config["default_model"]
        self.fallback_chain = self.routing_config["fallback_chain"]

        # 初始化各模型适配器
        self.adapters = {}
        for name, cls in self.ADAPTER_MAP.items():
            if name in self.config["models"]:
                model_config = self.config["models"][name]
                model_config["env_key"] = list(self.config["credentials"].values())[
                    list(self.config["credentials"].keys()).index(name)
                ].strip("${}")
                self.adapters[name] = cls(model_config)

        # 成本统计
        self.cost_log = defaultdict(float)
        self.call_count = defaultdict(int)

    async def chat(self, request: ChatRequest) -> ChatResponse:
        """统一调用入口:首选模型 → 失败按降级链路依次尝试"""
        preferred = request.model if request.model in self.adapters else self.default_model

        # 构建候选链:首选 + 降级链中排除首选的
        candidates = [preferred] + [
            m for m in self.fallback_chain
            if m != preferred and m in self.adapters
        ]

        last_error = None
        for model_name in candidates:
            try:
                start = time.time()
                response = await self.adapters[model_name].call(request)
                response.latency_ms = (time.time() - start) * 1000

                # 记录成本
                self.cost_log[model_name] += response.cost
                self.call_count[model_name] += 1

                return response
            except Exception as e:
                print(f"[路由器] {model_name} 调用失败: {e},尝试下一个...")
                last_error = e

        raise RuntimeError(f"所有模型调用失败,最后错误: {last_error}")

    def stats(self) -> Dict[str, Any]:
        """查看各模型调用统计和成本"""
        total_cost = sum(self.cost_log.values())
        return {
            "per_model": {
                name: {
                    "calls": self.call_count[name],
                    "cost": round(self.cost_log[name], 4),
                }
                for name in self.adapters
            },
            "total_cost": round(total_cost, 4),
        }

3.6 使用示例

# demo.py
import asyncio
from gateway.base import ChatRequest, Message, Role
from gateway.router import ModelRouter


async def main():
    router = ModelRouter("config.yaml")

    # 场景1:自动路由(走默认模型)
    req = ChatRequest(
        messages=[
            Message(Role.SYSTEM, "你是一个Python专家"),
            Message(Role.USER, "写一个异步爬虫,用aiohttp"),
        ]
    )
    resp = await router.chat(req)
    print(f"模型: {resp.model}")
    print(f"内容: {resp.content[:100]}...")
    print(f"Token: 输入{resp.usage.input_tokens} / 输出{resp.usage.output_tokens}")
    print(f"费用: ¥{resp.cost}")
    print(f"耗时: {resp.latency_ms:.0f}ms\n")

    # 场景2:指定模型
    req2 = ChatRequest(
        messages=[Message(Role.USER, "解释一下什么是RAG")],
        model="qwen"  # 指定走通义千问
    )
    resp2 = await router.chat(req2)
    print(f"模型: {resp2.model} | 费用: ¥{resp2.cost}")

    # 场景3:模拟降级(指定一个不存在的模型,会走fallback)
    req3 = ChatRequest(
        messages=[Message(Role.USER, "你好")],
        model="nonexistent"  # 不存在,自动走默认模型
    )
    resp3 = await router.chat(req3)
    print(f"降级后使用: {resp3.model}")

    # 查看统计
    print(f"\n调用统计: {router.stats()}")


asyncio.run(main())

运行结果:

在这里插入图片描述

四、实测成本对比
在这里插入图片描述

成本下降的三个来源:

智能路由省约15%:简单任务走低价模型,复杂任务走深度推理
语义缓存省约35%:知识库场景大量相似问题,缓存命中直接返回
成本可视化省约10%:有了明细才发现某功能每天无意义重复调用200次
五、踩坑记录
坑1:各家Token计数不一致。同一句话DeepSeek算100 Token,通义千问可能算120。解决方案是在网关层用tiktoken统一计算,不依赖各家返回的usage。

坑2:连接池管理。httpx默认连接池太小,高并发下会排队。建议max_connections=20、max_keepalive_connections=10、read_timeout=60秒。

坑3:降级链别超过3个。超过3个说明你的模型选型有问题,而且链路太长会拖慢响应。总超时设30秒,超过直接返回错误比无限等待好。

坑4:缓存不是万能的。时间敏感的问题、需要实时信息的场景、需要随机性的创意写作,这些场景加黑名单不走缓存。

六、总结
核心就三件事:

统一接口:一套代码调所有模型,新增模型只需写50行适配器
智能路由:按任务类型自动选模型,失败自动降级
成本可控:每次调用都有明细,月底对账不头疼
完整代码已开源,有问题评论区交流。如果觉得有用,点赞收藏支持一下。

Logo

欢迎加入DeepSeek 技术社区。在这里,你可以找到志同道合的朋友,共同探索AI技术的奥秘。

更多推荐