模型 API 接入与统一 LLMClient 设计
引言:为什么 Demo 里的三行调用撑不住上线
第 02 篇我们把「接入层」放进了七层技术栈地图:它的职责不是让模型更聪明,而是让模型调用稳定、可控、可追踪。
很多团队的第一个 AI 功能,代码往往长这样:
response = client.chat.completions.create(
model="gpt-4o-mini",
messages=[{"role": "user", "content": user_input}],
)
return response.choices[0].message.content
Demo 阶段这完全够用。但一旦真实用户开始用,问题会来得很快:
- API 超时没有处理,前端一直转圈;
- 供应商返回 429 / 503,用户直接看到错误;
- 同一项目里,流式和非流式各写一套逻辑,维护成本翻倍;
- 后端需要 JSON 接流程,模型偶尔输出多余文字,解析失败;
- 财务问「这个月 Token 花了多少」,团队答不上来;
- 用户投诉「答错了」,日志里找不到当时的 request_id;
- 想从 OpenAI 切到 DeepSeek,或加一个备用模型,要改遍所有业务文件。
这些都不是模型能力问题,而是接入层缺失的问题。
本篇是系列第一篇以代码为主的实战文:从最小 API 调用出发,设计并实现一个可复用的 LLMClient,覆盖超时、重试、流式输出、结构化 JSON、Token 统计、日志和多模型路由。你可以把本文的 Client 当作后续 RAG、Agent、评估层的公共底座,而不是在每条业务链路里重复写 SDK 调用。
1. 最小 API 调用:先确认基线能跑通
动手封装之前,先确认「裸调用」在你环境里能跑通。我们以 OpenAI 兼容接口 为例——OpenAI 官方 SDK,以及通义千问、DeepSeek、Moonshot 等多数国产模型,都提供兼容 chat/completions 的 endpoint。
from openai import OpenAI
client = OpenAI(api_key="YOUR_API_KEY")
response = client.chat.completions.create(
model="gpt-4o-mini",
messages=[
{"role": "system", "content": "你是一个简洁的助手。"},
{"role": "user", "content": "用一句话解释什么是 LLMClient。"},
],
temperature=0.2,
)
print(response.choices[0].message.content)
print(response.usage) # prompt_tokens, completion_tokens
几个要点:
- messages 是对话数组,常见角色有
system(全局指令)、user(用户输入)、assistant(历史回复)。 - temperature 越低,输出越稳定;生产环境常见 0~0.3。
- usage 字段记录 Token 用量,是成本统计的基础。
国产模型通常只需改 base_url 和 api_key:
client = OpenAI(
api_key="YOUR_DEEPSEEK_KEY",
base_url="https://api.deepseek.com",
)
Azure OpenAI 也是同一套 SDK,差异主要在 base_url、API 版本和 deployment 名称(有时 model 参数填 deployment 名而非模型名)。接入层封装时,把这些差异留在 Provider 里即可。
这一步的目的只是验证连通性。明确说:把上述代码复制进每个业务函数,不是生产写法。下一节开始把它收进 LLMClient。
2. LLMClient 接口设计:业务只关心 messages 和选项
接入层的第一原则是:业务代码不直接依赖某一家 SDK。
业务层只需要回答两个问题:
- 我要发什么 messages?
- 我要什么形式的返回(普通文本 / 流式 / JSON)?
其余——超时、重试、日志、Token、模型路由——全部收敛到 LLMClient。
2.1 设计原则
| 原则 | 说明 |
|---|---|
| 单一入口 | 统一 chat(...),避免 create / stream / parse_json 散落各处 |
| 可替换供应商 | OpenAI、DeepSeek、Azure 通过 Provider 适配,业务无感 |
| 可观测 | 每次调用自动产生 request_id、latency、token 用量 |
| 可配置 | 超时、重试、默认 model 集中管理,不散落在业务里 |
2.2 核心类型与接口
from dataclasses import dataclass
from typing import Iterator
@dataclass
class Usage:
prompt_tokens: int
completion_tokens: int
total_tokens: int
@dataclass
class ChatResult:
text: str
model: str
usage: Usage
request_id: str
latency_ms: int
fallback_used: bool = False
@dataclass
class StreamChunk:
delta: str
finish_reason: str | None = None
usage: Usage | None = None # 通常在最后一个 chunk
class LLMClient:
def chat(
self,
messages: list[dict],
*,
model: str | None = None,
temperature: float = 0.2,
stream: bool = False,
response_format: dict | None = None,
metadata: dict | None = None,
) -> ChatResult | Iterator[StreamChunk]:
...
- metadata:业务场景标签,如
{"scene": "ticket_classify"},写入日志,便于按场景统计成本与错误率。 - response_format:结构化输出时使用,如
{"type": "json_object"}。
先定义接口,再逐步实现。第一版不必拆太多文件;能跑通、能测试、能替换 Provider,比过早抽象更重要。

3. Provider 适配:把 SDK 差异挡在接入层里
LLMClient 不直接调用 openai.OpenAI(),而是通过 Provider 适配不同供应商。
from abc import ABC, abstractmethod
from typing import Iterator
from openai import OpenAI
class BaseLLMProvider(ABC):
@abstractmethod
def complete(
self,
*,
model: str,
messages: list[dict],
temperature: float,
response_format: dict | None = None,
):
...
@abstractmethod
def stream_complete(
self,
*,
model: str,
messages: list[dict],
temperature: float,
) -> Iterator:
...
class OpenAICompatProvider(BaseLLMProvider):
def __init__(
self,
api_key: str,
base_url: str | None = None,
timeout: float = 60.0,
):
self._client = OpenAI(
api_key=api_key,
base_url=base_url,
timeout=timeout,
)
def complete(self, *, model, messages, temperature, response_format=None):
kwargs = dict(
model=model,
messages=messages,
temperature=temperature,
)
if response_format:
kwargs["response_format"] = response_format
return self._client.chat.completions.create(**kwargs)
def stream_complete(self, *, model, messages, temperature):
stream = self._client.chat.completions.create(
model=model,
messages=messages,
temperature=temperature,
stream=True,
)
for event in stream:
choice = event.choices[0]
yield {
"delta": choice.delta.content or "",
"finish_reason": choice.finish_reason,
"usage": getattr(event, "usage", None),
}
业务层永远只依赖 LLMClient。换供应商时,改 Provider 配置或实现,不动业务代码。若你使用 Claude、Gemini 等非 OpenAI 兼容 API,同样实现 BaseLLMProvider 即可,对上层接口保持一致。
4. 超时与重试:把临时故障变成可恢复
网络抖动、供应商限流、短暂 503,在生产环境是常态,不是例外。
4.1 超时
建议区分三类超时:
| 类型 | 含义 | 典型设置 |
|---|---|---|
| 连接超时 | 建立 TCP/TLS 的上限 | 5~10 秒 |
| 读取超时 | 等待完整响应的上限 | 30~120 秒(视任务) |
| 流式首 token | 用户等到第一个字的最长时间 | 10~30 秒 |
OpenAI Python SDK 支持 timeout=60.0 或在 OpenAI(..., timeout=...) 初始化时设置。流式场景还要在应用层检测「长时间无 chunk」,避免连接挂死。
4.2 重试策略
仅对可重试错误重试:
| 错误 | 是否重试 | 说明 |
|---|---|---|
| 429 Too Many Requests | 是 | 需指数退避,尊重 Retry-After |
| 503 Service Unavailable | 是 | 短暂故障 |
| 网络超时 / 连接重置 | 是 | 临时网络问题 |
| 400 Bad Request | 否 | 参数错误,重试无意义 |
| 401 Unauthorized | 否 | Key 错误 |
手写重试可以这样:
import time
class RetryableError(Exception):
pass
def retry_with_backoff(fn, max_retries=3, base_delay=1.0):
for attempt in range(max_retries + 1):
try:
return fn()
except RetryableError:
if attempt == max_retries:
raise
time.sleep(base_delay * (2 ** attempt))
生产环境更推荐用 tenacity,声明式配置更清晰:
from tenacity import retry, stop_after_attempt, wait_exponential, retry_if_exception_type
@retry(
stop=stop_after_attempt(3),
wait=wait_exponential(multiplier=1, min=1, max=8),
retry=retry_if_exception_type(RetryableError),
reraise=True,
)
def call_with_retry(fn):
return fn()
流式重试注意:一旦已向用户推送部分内容,通常不应在同一请求内自动重试(用户会看到重复或断裂)。更稳妥的做法是:流式失败时返回明确错误,由前端决定是否让用户重新发起请求。

5. 流式输出:统一 SSE 与前端体验
长回答如果等全部生成完再返回,用户等待感明显。流式输出让首字更快出现,是在线对话产品的标配。
LLMClient.chat(..., stream=True) 返回 Iterator[StreamChunk],而不是 ChatResult:
for chunk in client.chat(messages, stream=True):
if chunk.delta:
print(chunk.delta, end="", flush=True)
后端接前端时,常见做法是用 SSE(Server-Sent Events) 推送。FastAPI 示例:
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
app = FastAPI()
@app.post("/chat/stream")
def chat_stream(user_input: str):
messages = [{"role": "user", "content": user_input}]
def generate():
for chunk in client.chat(messages, stream=True):
if chunk.delta:
yield f"data: {chunk.delta}\n\n"
yield "data: [DONE]\n\n"
return StreamingResponse(generate(), media_type="text/event-stream")
流式场景下,Token 统计和完整日志通常在流结束后写入:最后一个 chunk 若带 usage 则直接取用;否则部分 Provider 需要在流结束后单独统计或估算。至少应记录:总耗时、是否完整结束、finish_reason。
6. 结构化输出:约束 JSON,而不是祈祷模型听话
很多业务场景需要机器可读的输出:意图分类、字段抽取、工单参数生成。如果只靠 Prompt 说「请返回 JSON」,模型偶尔会在 JSON 外包裹 markdown 或多余说明,导致 json.loads 失败。
推荐三层组合:
- API 层约束:
response_format={"type": "json_object"}(视模型支持;部分模型支持 JSON Schema mode); - Prompt 层约束:给出字段说明、类型和示例;
- 解析层兜底:清洗 +
json.loads+ schema 校验 + 失败重试或降级。
import json
import re
def parse_json_response(text: str) -> dict:
text = text.strip()
# 去掉 ```json ... ```包裹
fence = re.match(r"```(?:json)?\s*([\s\S]*?)```", text)
if fence:
text = fence.group(1).strip()
return json.loads(text)
用 pydantic 做简单 schema 校验:
from pydantic import BaseModel, ValidationError
class TicketIntent(BaseModel):
intent: str
ticket_id: str | None = None
confidence: float
def parse_ticket_intent(text: str) -> TicketIntent | None:
try:
data = parse_json_response(text)
return TicketIntent.model_validate(data)
except (json.JSONDecodeError, ValidationError):
return None
解析失败时的策略:
- 重试一次:降低 temperature,或在 system 里强调「只输出 JSON」;
- 返回结构化错误:
{"error": "parse_failed", "raw": "..."},让上游流程可分支处理; - 记录日志:带上
request_id,纳入评估集回放。

7. Token 统计与成本意识
response.usage 通常包含:
prompt_tokens:输入 Token 数;completion_tokens:输出 Token 数;total_tokens:合计。
LLMClient 应把 usage 统一放进 ChatResult.usage,并写入日志。即使第一版不做完整计费系统,也要从第一版就记 Token——否则上线后无法回答「哪个场景最烧钱」。
简单成本估算(单价需按官方价目表维护):
MODEL_PRICE = {
"gpt-4o-mini": {"input": 0.15, "output": 0.60}, # 美元 / 1M tokens,示例
"gpt-4o": {"input": 2.50, "output": 10.00},
}
def estimate_cost(model: str, usage: Usage) -> float:
price = MODEL_PRICE.get(model, {"input": 0, "output": 0})
return (
usage.prompt_tokens * price["input"] / 1_000_000
+ usage.completion_tokens * price["output"] / 1_000_000
)
后续观测层可以把日志汇总成按 scene、按 model 的成本看板。本篇先把数据采出来。
8. 日志与 request_id:出问题要能查
每次调用生成唯一 request_id(UUID),贯穿日志、用户反馈和后续 RAG/Agent 链路。
import json
import logging
import uuid
logger = logging.getLogger("llm")
def log_chat_event(*, request_id, model, scene, latency_ms, status, usage, fallback_used=False):
logger.info(json.dumps({
"request_id": request_id,
"model": model,
"scene": scene,
"latency_ms": latency_ms,
"status": status,
"prompt_tokens": usage.prompt_tokens if usage else 0,
"completion_tokens": usage.completion_tokens if usage else 0,
"fallback_used": fallback_used,
}, ensure_ascii=False))
注意 脱敏:
- 不要记录完整用户隐私、身份证号、手机号;
- 可记录 messages 条数、总字符数、或对 content 做 hash;
- 调试环境如需完整 messages,应单独开关且限制访问权限。
当用户说「这个答案不对」时,至少能用 request_id 找到当时用的 model、latency、token。接入 RAG 后,同一 request_id 还可以关联检索结果和 Prompt 版本——这是第 02 篇观测层在代码里的第一块落地。
9. 多模型路由与降级
常见场景:
- 按任务选模型:分类、抽取用小模型,长文生成用强模型;
- 主备降级:主模型 503 或超时时切备用模型。
简单路由配置:
ROUTING = {
"default": {
"model": "gpt-4o-mini",
"fallback": ["deepseek-chat"],
},
"strong": {
"model": "gpt-4o",
"fallback": ["gpt-4o-mini"],
},
}
chat() 内部推荐流程:
解析 route(metadata.scene 或显式 model)
→ 调用 Provider.complete
→ 可重试错误:退避重试
→ 仍失败:依次尝试 fallback 列表
→ 成功:返回 ChatResult,记录实际 model 与 fallback_used
→ 全失败:抛出明确异常,日志 status=error
降级必须可观测。如果日志里没有 fallback_used,排查时会误以为一直在用主模型,成本与质量分析都会失真。
10. 完整代码串联:一个可落地的 LLMClient
下面给出一个可运行精简版(约 180 行),把前面各节能力串在一起。生产项目可按需拆文件。
建议目录:
import json
import logging
import time
import uuid
from dataclasses import dataclass
from typing import Iterator
from openai import OpenAI, APIStatusError, APITimeoutError
logger = logging.getLogger("llm")
@dataclass
class Usage:
prompt_tokens: int
completion_tokens: int
total_tokens: int
@dataclass
class ChatResult:
text: str
model: str
usage: Usage
request_id: str
latency_ms: int
fallback_used: bool = False
@dataclass
class StreamChunk:
delta: str
finish_reason: str | None = None
usage: Usage | None = None
class RetryableError(Exception):
pass
def _to_usage(raw) -> Usage:
return Usage(
prompt_tokens=raw.prompt_tokens,
completion_tokens=raw.completion_tokens,
total_tokens=raw.total_tokens,
)
class OpenAICompatProvider:
def __init__(self, api_key: str, base_url: str | None = None, timeout: float = 60.0):
self._client = OpenAI(api_key=api_key, base_url=base_url, timeout=timeout)
def complete(self, *, model, messages, temperature, response_format=None):
kwargs = dict(model=model, messages=messages, temperature=temperature)
if response_format:
kwargs["response_format"] = response_format
return self._client.chat.completions.create(**kwargs)
def stream_complete(self, *, model, messages, temperature):
stream = self._client.chat.completions.create(
model=model, messages=messages, temperature=temperature, stream=True
)
for event in stream:
choice = event.choices[0]
yield {
"delta": choice.delta.content or "",
"finish_reason": choice.finish_reason,
"usage": getattr(event, "usage", None),
}
class LLMClient:
def __init__(
self,
provider: OpenAICompatProvider,
default_model: str = "gpt-4o-mini",
fallback_models: list[str] | None = None,
max_retries: int = 2,
):
self._provider = provider
self._default_model = default_model
self._fallback_models = fallback_models or []
self._max_retries = max_retries
@classmethod
def from_env(cls):
import os
return cls(
provider=OpenAICompatProvider(
api_key=os.environ["OPENAI_API_KEY"],
base_url=os.getenv("OPENAI_BASE_URL"),
),
default_model=os.getenv("LLM_DEFAULT_MODEL", "gpt-4o-mini"),
fallback_models=[m for m in os.getenv("LLM_FALLBACK_MODELS", "").split(",") if m] or None,
)
def chat(
self,
messages: list[dict],
*,
model: str | None = None,
temperature: float = 0.2,
stream: bool = False,
response_format: dict | None = None,
metadata: dict | None = None,
) -> ChatResult | Iterator[StreamChunk]:
if stream:
return self._stream(messages, model=model, temperature=temperature, metadata=metadata)
return self._complete(messages, model=model, temperature=temperature,
response_format=response_format, metadata=metadata)
def _complete(self, messages, *, model, temperature, response_format, metadata):
request_id = str(uuid.uuid4())
scene = (metadata or {}).get("scene", "default")
candidates = [model or self._default_model] + self._fallback_models
last_error = None
for idx, current_model in enumerate(candidates):
for attempt in range(self._max_retries + 1):
started = time.perf_counter()
try:
resp = self._provider.complete(
model=current_model,
messages=messages,
temperature=temperature,
response_format=response_format,
)
latency_ms = int((time.perf_counter() - started) * 1000)
usage = _to_usage(resp.usage)
result = ChatResult(
text=resp.choices[0].message.content or "",
model=current_model,
usage=usage,
request_id=request_id,
latency_ms=latency_ms,
fallback_used=idx > 0,
)
logger.info(json.dumps({
"request_id": request_id,
"model": current_model,
"scene": scene,
"latency_ms": latency_ms,
"status": "ok",
"prompt_tokens": usage.prompt_tokens,
"completion_tokens": usage.completion_tokens,
"fallback_used": result.fallback_used,
}, ensure_ascii=False))
return result
except (APITimeoutError, APIStatusError) as e:
last_error = e
if isinstance(e, APIStatusError) and e.status_code not in (429, 503):
break
if attempt >= self._max_retries:
break
time.sleep(2 ** attempt)
raise last_error
def _stream(self, messages, *, model, temperature, metadata):
request_id = str(uuid.uuid4())
current_model = model or self._default_model
started = time.perf_counter()
for event in self._provider.stream_complete(
model=current_model, messages=messages, temperature=temperature
):
usage = _to_usage(event["usage"]) if event.get("usage") else None
yield StreamChunk(
delta=event["delta"],
finish_reason=event.get("finish_reason"),
usage=usage,
)
latency_ms = int((time.perf_counter() - started) * 1000)
logger.info(json.dumps({
"request_id": request_id,
"model": current_model,
"scene": (metadata or {}).get("scene", "default"),
"latency_ms": latency_ms,
"status": "ok_stream",
}, ensure_ascii=False))
业务调用:
client = LLMClient.from_env()
result = client.chat(
messages=[{"role": "user", "content": "用三句话总结 RAG。"}],
metadata={"scene": "summarize"},
)
print(result.text)
print(result.usage)
print(result.request_id)
没有 API Key 时,可实现 MockProvider 返回固定文本,先跑通接口、日志和 JSON 解析链路。
11. 常见误区
误区 1:「接入层写太厚,不如直接用 SDK」
薄封装也要统一日志、超时和 Token。接入层「厚」在横切关注点,不在把业务逻辑塞进去。一个 150~200 行的 Client,通常就能让后续 RAG、Agent 少写大量重复代码。
误区 2:「重试次数越多越好」
429 需要退避;无限重试会放大故障、拉长用户等待。流式场景更要谨慎,避免已向用户输出内容后再次重试。
误区 3:「结构化输出只靠 Prompt」
需要 API 约束 + 解析校验 + 失败兜底,三层一起才可靠。Prompt 再强,也不能替代解析层的防御性编程。
误区 4:「流式和同步两套 Client」
同一 chat(stream=...) 入口更易维护,日志、路由、重试逻辑只写一遍。
误区 5:「日志以后再补」
没有 request_id,RAG 和 Agent 上线后无法串联全链路。第一版就要有最小结构化日志。
12. 本篇练习建议
- 把项目里散落的
client.chat.completions.create收拢到一个LLMClient。 - 为每次调用打印
request_id和 Token 用量,观察一周各场景成本分布。 - 为一个需要 JSON 的接口加上
parse_json_response+ pydantic 校验 + 失败兜底。 - 配置一个备用 model,用断网或错误 model 名模拟 fallback,确认日志里
fallback_used=true。
13. 下篇预告
第 04 篇:Prompt 工程基础 — 模板、版本与效果追踪
在稳定接入层之上,讨论如何把 Prompt 从「字符串常量」变成可维护的配置:角色设定、输出格式、拒答边界,以及改动后如何对比效果。
关注作者,获取后续更新
本系列《AI 应用开发实战课》将在以下平台同步更新,欢迎关注追更:
- 小红书:@水无月sama
- 知乎:@水无月sama
- CSDN:@再让我睡两分钟
- 微信公众号:微信搜索「介就是AI」
14. 讨论
你现在的模型调用是散落在业务里,还是已经有一层统一封装?卡在超时、流式还是 JSON 解析?
欢迎在评论区说说你的现状,后续文章会优先覆盖高频问题。
话题标签(CSDN/小红书发布时使用):#AI应用开发 #大模型 #LLM #Python #OpenAI
更多推荐




所有评论(0)