引言:为什么 Demo 里的三行调用撑不住上线

第 02 篇我们把「接入层」放进了七层技术栈地图:它的职责不是让模型更聪明,而是让模型调用稳定、可控、可追踪

很多团队的第一个 AI 功能,代码往往长这样:

response = client.chat.completions.create(
    model="gpt-4o-mini",
    messages=[{"role": "user", "content": user_input}],
)
return response.choices[0].message.content

Demo 阶段这完全够用。但一旦真实用户开始用,问题会来得很快:

  • API 超时没有处理,前端一直转圈;
  • 供应商返回 429 / 503,用户直接看到错误;
  • 同一项目里,流式非流式各写一套逻辑,维护成本翻倍;
  • 后端需要 JSON 接流程,模型偶尔输出多余文字,解析失败;
  • 财务问「这个月 Token 花了多少」,团队答不上来;
  • 用户投诉「答错了」,日志里找不到当时的 request_id
  • 想从 OpenAI 切到 DeepSeek,或加一个备用模型,要改遍所有业务文件。

这些都不是模型能力问题,而是接入层缺失的问题。

本篇是系列第一篇以代码为主的实战文:从最小 API 调用出发,设计并实现一个可复用的 LLMClient,覆盖超时、重试、流式输出、结构化 JSON、Token 统计、日志和多模型路由。你可以把本文的 Client 当作后续 RAG、Agent、评估层的公共底座,而不是在每条业务链路里重复写 SDK 调用。

在这里插入图片描述

1. 最小 API 调用:先确认基线能跑通

动手封装之前,先确认「裸调用」在你环境里能跑通。我们以 OpenAI 兼容接口 为例——OpenAI 官方 SDK,以及通义千问、DeepSeek、Moonshot 等多数国产模型,都提供兼容 chat/completions 的 endpoint。

from openai import OpenAI

client = OpenAI(api_key="YOUR_API_KEY")

response = client.chat.completions.create(
    model="gpt-4o-mini",
    messages=[
        {"role": "system", "content": "你是一个简洁的助手。"},
        {"role": "user", "content": "用一句话解释什么是 LLMClient。"},
    ],
    temperature=0.2,
)

print(response.choices[0].message.content)
print(response.usage)  # prompt_tokens, completion_tokens

几个要点:

  • messages 是对话数组,常见角色有 system(全局指令)、user(用户输入)、assistant(历史回复)。
  • temperature 越低,输出越稳定;生产环境常见 0~0.3。
  • usage 字段记录 Token 用量,是成本统计的基础。

国产模型通常只需改 base_urlapi_key

client = OpenAI(
    api_key="YOUR_DEEPSEEK_KEY",
    base_url="https://api.deepseek.com",
)

Azure OpenAI 也是同一套 SDK,差异主要在 base_url、API 版本和 deployment 名称(有时 model 参数填 deployment 名而非模型名)。接入层封装时,把这些差异留在 Provider 里即可。

这一步的目的只是验证连通性。明确说:把上述代码复制进每个业务函数,不是生产写法。下一节开始把它收进 LLMClient


2. LLMClient 接口设计:业务只关心 messages 和选项

接入层的第一原则是:业务代码不直接依赖某一家 SDK

业务层只需要回答两个问题:

  1. 我要发什么 messages
  2. 我要什么形式的返回(普通文本 / 流式 / JSON)?

其余——超时、重试、日志、Token、模型路由——全部收敛到 LLMClient

2.1 设计原则

原则 说明
单一入口 统一 chat(...),避免 create / stream / parse_json 散落各处
可替换供应商 OpenAI、DeepSeek、Azure 通过 Provider 适配,业务无感
可观测 每次调用自动产生 request_id、latency、token 用量
可配置 超时、重试、默认 model 集中管理,不散落在业务里

2.2 核心类型与接口

from dataclasses import dataclass
from typing import Iterator

@dataclass
class Usage:
    prompt_tokens: int
    completion_tokens: int
    total_tokens: int

@dataclass
class ChatResult:
    text: str
    model: str
    usage: Usage
    request_id: str
    latency_ms: int
    fallback_used: bool = False

@dataclass
class StreamChunk:
    delta: str
    finish_reason: str | None = None
    usage: Usage | None = None  # 通常在最后一个 chunk

class LLMClient:
    def chat(
        self,
        messages: list[dict],
        *,
        model: str | None = None,
        temperature: float = 0.2,
        stream: bool = False,
        response_format: dict | None = None,
        metadata: dict | None = None,
    ) -> ChatResult | Iterator[StreamChunk]:
        ...
  • metadata:业务场景标签,如 {"scene": "ticket_classify"},写入日志,便于按场景统计成本与错误率。
  • response_format:结构化输出时使用,如 {"type": "json_object"}

先定义接口,再逐步实现。第一版不必拆太多文件;能跑通、能测试、能替换 Provider,比过早抽象更重要。

在这里插入图片描述


3. Provider 适配:把 SDK 差异挡在接入层里

LLMClient 不直接调用 openai.OpenAI(),而是通过 Provider 适配不同供应商。

from abc import ABC, abstractmethod
from typing import Iterator
from openai import OpenAI

class BaseLLMProvider(ABC):
    @abstractmethod
    def complete(
        self,
        *,
        model: str,
        messages: list[dict],
        temperature: float,
        response_format: dict | None = None,
    ):
        ...

    @abstractmethod
    def stream_complete(
        self,
        *,
        model: str,
        messages: list[dict],
        temperature: float,
    ) -> Iterator:
        ...


class OpenAICompatProvider(BaseLLMProvider):
    def __init__(
        self,
        api_key: str,
        base_url: str | None = None,
        timeout: float = 60.0,
    ):
        self._client = OpenAI(
            api_key=api_key,
            base_url=base_url,
            timeout=timeout,
        )

    def complete(self, *, model, messages, temperature, response_format=None):
        kwargs = dict(
            model=model,
            messages=messages,
            temperature=temperature,
        )
        if response_format:
            kwargs["response_format"] = response_format
        return self._client.chat.completions.create(**kwargs)

    def stream_complete(self, *, model, messages, temperature):
        stream = self._client.chat.completions.create(
            model=model,
            messages=messages,
            temperature=temperature,
            stream=True,
        )
        for event in stream:
            choice = event.choices[0]
            yield {
                "delta": choice.delta.content or "",
                "finish_reason": choice.finish_reason,
                "usage": getattr(event, "usage", None),
            }

业务层永远只依赖 LLMClient。换供应商时,改 Provider 配置或实现,不动业务代码。若你使用 Claude、Gemini 等非 OpenAI 兼容 API,同样实现 BaseLLMProvider 即可,对上层接口保持一致。


4. 超时与重试:把临时故障变成可恢复

网络抖动、供应商限流、短暂 503,在生产环境是常态,不是例外。

4.1 超时

建议区分三类超时:

类型 含义 典型设置
连接超时 建立 TCP/TLS 的上限 5~10 秒
读取超时 等待完整响应的上限 30~120 秒(视任务)
流式首 token 用户等到第一个字的最长时间 10~30 秒

OpenAI Python SDK 支持 timeout=60.0 或在 OpenAI(..., timeout=...) 初始化时设置。流式场景还要在应用层检测「长时间无 chunk」,避免连接挂死。

4.2 重试策略

仅对可重试错误重试:

错误 是否重试 说明
429 Too Many Requests 需指数退避,尊重 Retry-After
503 Service Unavailable 短暂故障
网络超时 / 连接重置 临时网络问题
400 Bad Request 参数错误,重试无意义
401 Unauthorized Key 错误

手写重试可以这样:

import time

class RetryableError(Exception):
    pass

def retry_with_backoff(fn, max_retries=3, base_delay=1.0):
    for attempt in range(max_retries + 1):
        try:
            return fn()
        except RetryableError:
            if attempt == max_retries:
                raise
            time.sleep(base_delay * (2 ** attempt))

生产环境更推荐用 tenacity,声明式配置更清晰:

from tenacity import retry, stop_after_attempt, wait_exponential, retry_if_exception_type

@retry(
    stop=stop_after_attempt(3),
    wait=wait_exponential(multiplier=1, min=1, max=8),
    retry=retry_if_exception_type(RetryableError),
    reraise=True,
)
def call_with_retry(fn):
    return fn()

流式重试注意:一旦已向用户推送部分内容,通常不应在同一请求内自动重试(用户会看到重复或断裂)。更稳妥的做法是:流式失败时返回明确错误,由前端决定是否让用户重新发起请求。

在这里插入图片描述


5. 流式输出:统一 SSE 与前端体验

长回答如果等全部生成完再返回,用户等待感明显。流式输出让首字更快出现,是在线对话产品的标配。

LLMClient.chat(..., stream=True) 返回 Iterator[StreamChunk],而不是 ChatResult

for chunk in client.chat(messages, stream=True):
    if chunk.delta:
        print(chunk.delta, end="", flush=True)

后端接前端时,常见做法是用 SSE(Server-Sent Events) 推送。FastAPI 示例:

from fastapi import FastAPI
from fastapi.responses import StreamingResponse

app = FastAPI()

@app.post("/chat/stream")
def chat_stream(user_input: str):
    messages = [{"role": "user", "content": user_input}]

    def generate():
        for chunk in client.chat(messages, stream=True):
            if chunk.delta:
                yield f"data: {chunk.delta}\n\n"
        yield "data: [DONE]\n\n"

    return StreamingResponse(generate(), media_type="text/event-stream")

流式场景下,Token 统计和完整日志通常在流结束后写入:最后一个 chunk 若带 usage 则直接取用;否则部分 Provider 需要在流结束后单独统计或估算。至少应记录:总耗时、是否完整结束、finish_reason


6. 结构化输出:约束 JSON,而不是祈祷模型听话

很多业务场景需要机器可读的输出:意图分类、字段抽取、工单参数生成。如果只靠 Prompt 说「请返回 JSON」,模型偶尔会在 JSON 外包裹 markdown 或多余说明,导致 json.loads 失败。

推荐三层组合:

  1. API 层约束response_format={"type": "json_object"}(视模型支持;部分模型支持 JSON Schema mode);
  2. Prompt 层约束:给出字段说明、类型和示例;
  3. 解析层兜底:清洗 + json.loads + schema 校验 + 失败重试或降级。
import json
import re

def parse_json_response(text: str) -> dict:
    text = text.strip()
    # 去掉 ```json ... ```包裹
    fence = re.match(r"```(?:json)?\s*([\s\S]*?)```", text)
    if fence:
        text = fence.group(1).strip()
    return json.loads(text)

pydantic 做简单 schema 校验:

from pydantic import BaseModel, ValidationError

class TicketIntent(BaseModel):
    intent: str
    ticket_id: str | None = None
    confidence: float

def parse_ticket_intent(text: str) -> TicketIntent | None:
    try:
        data = parse_json_response(text)
        return TicketIntent.model_validate(data)
    except (json.JSONDecodeError, ValidationError):
        return None

解析失败时的策略:

  • 重试一次:降低 temperature,或在 system 里强调「只输出 JSON」;
  • 返回结构化错误{"error": "parse_failed", "raw": "..."},让上游流程可分支处理;
  • 记录日志:带上 request_id,纳入评估集回放。

在这里插入图片描述


7. Token 统计与成本意识

response.usage 通常包含:

  • prompt_tokens:输入 Token 数;
  • completion_tokens:输出 Token 数;
  • total_tokens:合计。

LLMClient 应把 usage 统一放进 ChatResult.usage,并写入日志。即使第一版不做完整计费系统,也要从第一版就记 Token——否则上线后无法回答「哪个场景最烧钱」。

简单成本估算(单价需按官方价目表维护):

MODEL_PRICE = {
    "gpt-4o-mini": {"input": 0.15, "output": 0.60},  # 美元 / 1M tokens,示例
    "gpt-4o": {"input": 2.50, "output": 10.00},
}

def estimate_cost(model: str, usage: Usage) -> float:
    price = MODEL_PRICE.get(model, {"input": 0, "output": 0})
    return (
        usage.prompt_tokens * price["input"] / 1_000_000
        + usage.completion_tokens * price["output"] / 1_000_000
    )

后续观测层可以把日志汇总成按 scene、按 model 的成本看板。本篇先把数据采出来


8. 日志与 request_id:出问题要能查

每次调用生成唯一 request_id(UUID),贯穿日志、用户反馈和后续 RAG/Agent 链路。

import json
import logging
import uuid

logger = logging.getLogger("llm")


def log_chat_event(*, request_id, model, scene, latency_ms, status, usage, fallback_used=False):
    logger.info(json.dumps({
        "request_id": request_id,
        "model": model,
        "scene": scene,
        "latency_ms": latency_ms,
        "status": status,
        "prompt_tokens": usage.prompt_tokens if usage else 0,
        "completion_tokens": usage.completion_tokens if usage else 0,
        "fallback_used": fallback_used,
    }, ensure_ascii=False))

注意 脱敏

  • 不要记录完整用户隐私、身份证号、手机号;
  • 可记录 messages 条数、总字符数、或对 content 做 hash;
  • 调试环境如需完整 messages,应单独开关且限制访问权限。

当用户说「这个答案不对」时,至少能用 request_id 找到当时用的 model、latency、token。接入 RAG 后,同一 request_id 还可以关联检索结果和 Prompt 版本——这是第 02 篇观测层在代码里的第一块落地。


9. 多模型路由与降级

常见场景:

  • 按任务选模型:分类、抽取用小模型,长文生成用强模型;
  • 主备降级:主模型 503 或超时时切备用模型。

简单路由配置:

ROUTING = {
    "default": {
        "model": "gpt-4o-mini",
        "fallback": ["deepseek-chat"],
    },
    "strong": {
        "model": "gpt-4o",
        "fallback": ["gpt-4o-mini"],
    },
}

chat() 内部推荐流程:

解析 route(metadata.scene 或显式 model)
  → 调用 Provider.complete
  → 可重试错误:退避重试
  → 仍失败:依次尝试 fallback 列表
  → 成功:返回 ChatResult,记录实际 model 与 fallback_used
  → 全失败:抛出明确异常,日志 status=error

降级必须可观测。如果日志里没有 fallback_used,排查时会误以为一直在用主模型,成本与质量分析都会失真。


10. 完整代码串联:一个可落地的 LLMClient

下面给出一个可运行精简版(约 180 行),把前面各节能力串在一起。生产项目可按需拆文件。

建议目录:

import json
import logging
import time
import uuid
from dataclasses import dataclass
from typing import Iterator

from openai import OpenAI, APIStatusError, APITimeoutError

logger = logging.getLogger("llm")

@dataclass
class Usage:
    prompt_tokens: int
    completion_tokens: int
    total_tokens: int

@dataclass
class ChatResult:
    text: str
    model: str
    usage: Usage
    request_id: str
    latency_ms: int
    fallback_used: bool = False

@dataclass
class StreamChunk:
    delta: str
    finish_reason: str | None = None
    usage: Usage | None = None

class RetryableError(Exception):
    pass

def _to_usage(raw) -> Usage:
    return Usage(
        prompt_tokens=raw.prompt_tokens,
        completion_tokens=raw.completion_tokens,
        total_tokens=raw.total_tokens,
    )

class OpenAICompatProvider:
    def __init__(self, api_key: str, base_url: str | None = None, timeout: float = 60.0):
        self._client = OpenAI(api_key=api_key, base_url=base_url, timeout=timeout)

    def complete(self, *, model, messages, temperature, response_format=None):
        kwargs = dict(model=model, messages=messages, temperature=temperature)
        if response_format:
            kwargs["response_format"] = response_format
        return self._client.chat.completions.create(**kwargs)

    def stream_complete(self, *, model, messages, temperature):
        stream = self._client.chat.completions.create(
            model=model, messages=messages, temperature=temperature, stream=True
        )
        for event in stream:
            choice = event.choices[0]
            yield {
                "delta": choice.delta.content or "",
                "finish_reason": choice.finish_reason,
                "usage": getattr(event, "usage", None),
            }

class LLMClient:
    def __init__(
        self,
        provider: OpenAICompatProvider,
        default_model: str = "gpt-4o-mini",
        fallback_models: list[str] | None = None,
        max_retries: int = 2,
    ):
        self._provider = provider
        self._default_model = default_model
        self._fallback_models = fallback_models or []
        self._max_retries = max_retries

    @classmethod
    def from_env(cls):
        import os
        return cls(
            provider=OpenAICompatProvider(
                api_key=os.environ["OPENAI_API_KEY"],
                base_url=os.getenv("OPENAI_BASE_URL"),
            ),
            default_model=os.getenv("LLM_DEFAULT_MODEL", "gpt-4o-mini"),
            fallback_models=[m for m in os.getenv("LLM_FALLBACK_MODELS", "").split(",") if m] or None,
        )

    def chat(
        self,
        messages: list[dict],
        *,
        model: str | None = None,
        temperature: float = 0.2,
        stream: bool = False,
        response_format: dict | None = None,
        metadata: dict | None = None,
    ) -> ChatResult | Iterator[StreamChunk]:
        if stream:
            return self._stream(messages, model=model, temperature=temperature, metadata=metadata)
        return self._complete(messages, model=model, temperature=temperature,
                              response_format=response_format, metadata=metadata)

    def _complete(self, messages, *, model, temperature, response_format, metadata):
        request_id = str(uuid.uuid4())
        scene = (metadata or {}).get("scene", "default")
        candidates = [model or self._default_model] + self._fallback_models
        last_error = None

        for idx, current_model in enumerate(candidates):
            for attempt in range(self._max_retries + 1):
                started = time.perf_counter()
                try:
                    resp = self._provider.complete(
                        model=current_model,
                        messages=messages,
                        temperature=temperature,
                        response_format=response_format,
                    )
                    latency_ms = int((time.perf_counter() - started) * 1000)
                    usage = _to_usage(resp.usage)
                    result = ChatResult(
                        text=resp.choices[0].message.content or "",
                        model=current_model,
                        usage=usage,
                        request_id=request_id,
                        latency_ms=latency_ms,
                        fallback_used=idx > 0,
                    )
                    logger.info(json.dumps({
                        "request_id": request_id,
                        "model": current_model,
                        "scene": scene,
                        "latency_ms": latency_ms,
                        "status": "ok",
                        "prompt_tokens": usage.prompt_tokens,
                        "completion_tokens": usage.completion_tokens,
                        "fallback_used": result.fallback_used,
                    }, ensure_ascii=False))
                    return result
                except (APITimeoutError, APIStatusError) as e:
                    last_error = e
                    if isinstance(e, APIStatusError) and e.status_code not in (429, 503):
                        break
                    if attempt >= self._max_retries:
                        break
                    time.sleep(2 ** attempt)
        raise last_error

    def _stream(self, messages, *, model, temperature, metadata):
        request_id = str(uuid.uuid4())
        current_model = model or self._default_model
        started = time.perf_counter()

        for event in self._provider.stream_complete(
            model=current_model, messages=messages, temperature=temperature
        ):
            usage = _to_usage(event["usage"]) if event.get("usage") else None
            yield StreamChunk(
                delta=event["delta"],
                finish_reason=event.get("finish_reason"),
                usage=usage,
            )

        latency_ms = int((time.perf_counter() - started) * 1000)
        logger.info(json.dumps({
            "request_id": request_id,
            "model": current_model,
            "scene": (metadata or {}).get("scene", "default"),
            "latency_ms": latency_ms,
            "status": "ok_stream",
        }, ensure_ascii=False))

业务调用:

client = LLMClient.from_env()

result = client.chat(
    messages=[{"role": "user", "content": "用三句话总结 RAG。"}],
    metadata={"scene": "summarize"},
)

print(result.text)
print(result.usage)
print(result.request_id)

没有 API Key 时,可实现 MockProvider 返回固定文本,先跑通接口、日志和 JSON 解析链路。


11. 常见误区

误区 1:「接入层写太厚,不如直接用 SDK」

薄封装也要统一日志、超时和 Token。接入层「厚」在横切关注点,不在把业务逻辑塞进去。一个 150~200 行的 Client,通常就能让后续 RAG、Agent 少写大量重复代码。

误区 2:「重试次数越多越好」

429 需要退避;无限重试会放大故障、拉长用户等待。流式场景更要谨慎,避免已向用户输出内容后再次重试。

误区 3:「结构化输出只靠 Prompt」

需要 API 约束 + 解析校验 + 失败兜底,三层一起才可靠。Prompt 再强,也不能替代解析层的防御性编程。

误区 4:「流式和同步两套 Client」

同一 chat(stream=...) 入口更易维护,日志、路由、重试逻辑只写一遍。

误区 5:「日志以后再补」

没有 request_id,RAG 和 Agent 上线后无法串联全链路。第一版就要有最小结构化日志。


12. 本篇练习建议

  1. 把项目里散落的 client.chat.completions.create 收拢到一个 LLMClient
  2. 为每次调用打印 request_id 和 Token 用量,观察一周各场景成本分布。
  3. 为一个需要 JSON 的接口加上 parse_json_response + pydantic 校验 + 失败兜底。
  4. 配置一个备用 model,用断网或错误 model 名模拟 fallback,确认日志里 fallback_used=true

13. 下篇预告

第 04 篇:Prompt 工程基础 — 模板、版本与效果追踪

在稳定接入层之上,讨论如何把 Prompt 从「字符串常量」变成可维护的配置:角色设定、输出格式、拒答边界,以及改动后如何对比效果。


关注作者,获取后续更新

本系列《AI 应用开发实战课》将在以下平台同步更新,欢迎关注追更:

  • 小红书:@水无月sama
  • 知乎:@水无月sama
  • CSDN:@再让我睡两分钟
  • 微信公众号:微信搜索「介就是AI」

14. 讨论

你现在的模型调用是散落在业务里,还是已经有一层统一封装?卡在超时、流式还是 JSON 解析?

欢迎在评论区说说你的现状,后续文章会优先覆盖高频问题。

话题标签(CSDN/小红书发布时使用):
#AI应用开发 #大模型 #LLM #Python #OpenAI

Logo

欢迎加入DeepSeek 技术社区。在这里,你可以找到志同道合的朋友,共同探索AI技术的奥秘。

更多推荐