# AI API 多模型 Fallback 实战:Claude 挂了自动切 GPT,一行代码搞定
你的情况推荐方案个人项目 / 快速验证手写简单版 Fallback,两个供应商够了中小型 SaaS自建路由器 + 熔断,可控性和灵活性平衡生产环境 / 团队项目API 网关,省时省力,内置监控不要让单一 AI API 供应商成为你服务的单点故障。实现成本很低,但它能在下一次凌晨三点的故障里救你一命。我自己在用的是Praka.ai—— 支持 GPT、Claude、Gemini、DeepSeek 等
本文从实际问题出发,从手写 fallback 逻辑,到最终用一行代码搞定,手把手带你实现 AI API 的多模型容灾方案。
场景还原:凌晨三点,线上报警
这是真实发生过的事:
某个 AI 写作工具,用户量不大但付费率不错。某天凌晨,OpenAI 突发故障,所有 API 调用返回 503。开发者睡得正香,等到早上起来看手机,发现几百条报错,用户在群里投诉。
损失?直观的是几十个用户退款请求,不直观的是口碑。
这次故障本来完全可以避免,只需要一套 fallback 机制——Claude 挂了切 GPT,GPT 挂了切 Gemini,对用户无感知,对代码零改动。
本文就来聊怎么做。
为什么单供应商是个定时炸弹?
先把问题说清楚。
1. 每家都会出故障
这不是黑某家,是客观事实:
- OpenAI 在过去 12 个月内有记录的服务中断超过 8 次
- Anthropic 的 Claude API 在高峰期偶发 529(过载)
- Google Gemini 在 EU 地区有过数据中心级别的不可用
没有 100% SLA 的云服务,AI API 也不例外。
2. Rate Limit 比故障更常见
更频繁的问题不是「挂了」,而是「被限流了」。
尤其是 Anthropic,Claude 的限速相当严格(详见上一篇《2026 主流 AI API 全面横评》):
- 新账号 RPM 极低
- Claude Code 有独立的会话级别计数
- 429 之后没有准确的 retry-after
单一供应商被限流 = 你的服务挂起,与故障无异。
3. 价格波动是隐患
AI API 价格调整越来越频繁。绑定单一供应商意味着:涨价了只能被动接受,或者改一遍代码换供应商。
方案一:手写 Fallback 逻辑(DIY 派)
我们先用最直白的方式实现,理解原理。
最简版:按顺序重试
import anthropic
from openai import OpenAI
import time
anthropic_client = anthropic.Anthropic(api_key="your-claude-key")
openai_client = OpenAI(api_key="your-openai-key")
def chat_with_fallback(user_message: str) -> str:
"""
优先用 Claude,失败后自动切 GPT
"""
# 第一优先级:Claude
try:
response = anthropic_client.messages.create(
model="claude-3-5-sonnet-20241022",
max_tokens=1024,
messages=[{"role": "user", "content": user_message}]
)
return response.content[0].text
except anthropic.RateLimitError:
print("Claude 被限流,切换到 GPT...")
except anthropic.APIStatusError as e:
print(f"Claude API 异常 ({e.status_code}),切换到 GPT...")
except Exception as e:
print(f"Claude 未知错误:{e},切换到 GPT...")
# 第二优先级:GPT-4o
try:
response = openai_client.chat.completions.create(
model="gpt-4o",
max_tokens=1024,
messages=[{"role": "user", "content": user_message}]
)
return response.choices[0].message.content
except Exception as e:
raise RuntimeError(f"所有 AI API 均不可用:{e}")
# 使用
result = chat_with_fallback("帮我写一个快速排序的 Python 实现")
print(result)
这个版本能跑,但有几个问题:
- 每个供应商的 SDK 和异常类型不同,切 Gemini 还得再写一套
- hardcode 了模型优先级,调整困难
- 没有熔断机制:Claude 已经连续失败 100 次了,每次还是先试 Claude 再失败,浪费时间
进阶版:带权重、熔断和多供应商
import time
import random
from dataclasses import dataclass, field
from typing import Callable, Optional
from openai import OpenAI # Gemini 和 DeepSeek 都兼容 OpenAI 格式
@dataclass
class Provider:
name: str
client: OpenAI
model: str
weight: int = 1 # 权重,越高越优先
failure_count: int = 0 # 连续失败次数
circuit_open: bool = False # 熔断状态
circuit_open_until: float = 0.0 # 熔断结束时间
class AIRouter:
"""
多供应商 AI 路由器,支持权重调度、自动 Fallback、熔断保护
"""
CIRCUIT_FAILURE_THRESHOLD = 3 # 连续失败 N 次触发熔断
CIRCUIT_RECOVERY_SECONDS = 60 # 熔断恢复时间(秒)
def __init__(self, providers: list[Provider]):
self.providers = sorted(providers, key=lambda p: p.weight, reverse=True)
def _is_available(self, provider: Provider) -> bool:
"""检查供应商是否可用(考虑熔断状态)"""
if not provider.circuit_open:
return True
# 检查熔断是否过期
if time.time() > provider.circuit_open_until:
print(f"[Router] {provider.name} 熔断已恢复,重新尝试")
provider.circuit_open = False
provider.failure_count = 0
return True
return False
def _on_success(self, provider: Provider):
provider.failure_count = 0
provider.circuit_open = False
def _on_failure(self, provider: Provider):
provider.failure_count += 1
if provider.failure_count >= self.CIRCUIT_FAILURE_THRESHOLD:
provider.circuit_open = True
provider.circuit_open_until = time.time() + self.CIRCUIT_RECOVERY_SECONDS
print(f"[Router] {provider.name} 触发熔断,{self.CIRCUIT_RECOVERY_SECONDS}s 后恢复")
def chat(self, message: str, **kwargs) -> str:
"""
发送消息,自动在可用供应商间路由
"""
errors = []
for provider in self.providers:
if not self._is_available(provider):
print(f"[Router] {provider.name} 熔断中,跳过")
continue
try:
print(f"[Router] 尝试 {provider.name} ({provider.model})")
response = provider.client.chat.completions.create(
model=provider.model,
messages=[{"role": "user", "content": message}],
max_tokens=kwargs.get("max_tokens", 1024),
)
self._on_success(provider)
return response.choices[0].message.content
except Exception as e:
self._on_failure(provider)
errors.append(f"{provider.name}: {e}")
print(f"[Router] {provider.name} 失败:{e}")
continue
raise RuntimeError(f"所有供应商均不可用。错误详情:{'; '.join(errors)}")
# ---- 配置供应商 ----
router = AIRouter(providers=[
Provider(
name="Claude",
client=OpenAI(
api_key="your-anthropic-key",
base_url="https://api.anthropic.com/v1/"
# 注:需要用 anthropic-sdk 或兼容层,这里用 praka 等网关最方便
),
model="claude-3-5-sonnet-20241022",
weight=3, # 优先级最高
),
Provider(
name="GPT-4o",
client=OpenAI(api_key="your-openai-key"),
model="gpt-4o",
weight=2,
),
Provider(
name="Gemini Flash",
client=OpenAI(
api_key="your-gemini-key",
base_url="https://generativelanguage.googleapis.com/v1beta/openai/"
),
model="gemini-2.0-flash",
weight=1, # 兜底
),
])
# ---- 使用 ----
result = router.chat("帮我写一个快速排序的 Python 实现")
print(result)
这版好多了:
- ✅ 支持多供应商(权重排序)
- ✅ 支持熔断(连续失败自动暂停,定时恢复)
- ✅ 统一 OpenAI 接口(Claude 需要兼容层)
但还有一个问题:Anthropic 的 SDK 格式和 OpenAI 不完全一致,直接用 OpenAI client 调 Anthropic 会报错。这就引出了第三个方案。
方案二:用统一 API 网关(一行代码搞定)
上面那套代码写起来还是比较繁琐的,而且每次要接新供应商都要改路由逻辑。
有一类工具叫 AI API 网关,专门解决这个问题:
- 统一 OpenAI 兼容接口
- 内置多模型路由和 Fallback
- 自动处理各厂商的格式差异
- 一个 API Key 接入所有模型
接入方式只需要改两行:
from openai import OpenAI
# 改之前(只有 OpenAI)
client = OpenAI(api_key="sk-openai-xxx")
# 改之后(接入网关,自动支持 100+ 模型 + Fallback)
client = OpenAI(
api_key="your-gateway-api-key", # 换成网关的 key
base_url="https://api.praka.ai/v1" # 换成网关地址
)
# 以下代码完全不变 ↓
response = client.chat.completions.create(
model="claude-3-5-sonnet-20241022",
messages=[{"role": "user", "content": "帮我写一个快速排序的 Python 实现"}],
max_tokens=1024,
)
print(response.choices[0].message.content)
这就是标题里说的「一行代码搞定」——严格来说是两行(api_key + base_url),但你现有的业务代码零改动。
网关在背后做了什么?
你的代码
│
▼
AI API 网关
├─ 解析请求中的 model 参数
├─ 路由到对应的供应商(Claude → Anthropic)
├─ 供应商返回错误(429/503/故障)
│ ├─ 自动 Fallback → GPT-4o
│ └─ 仍然失败 → Fallback → Gemini
└─ 统一格式返回 OpenAI 兼容响应
│
▼
你的代码收到结果(感知不到切换过程)
Fallback 策略配置示例
网关通常支持在请求时声明 fallback 链:
response = client.chat.completions.create(
model="claude-3-5-sonnet-20241022",
messages=[{"role": "user", "content": "你好"}],
# 通过 extra_body 传递网关专属参数
extra_body={
"fallback_models": [
"gpt-4o", # 第一备选
"gemini-2.0-flash" # 第二备选(兜底)
],
"fallback_on": ["rate_limit", "server_error", "timeout"]
}
)
也可以在网关控制台全局配置,不需要每次请求都带参数。
方案对比
| 维度 | 手写 Fallback | 自建路由器 | API 网关 |
|---|---|---|---|
| 接入成本 | 高(每个供应商都要适配) | 中(写一次,复用) | 极低(改两行) |
| 维护成本 | 高(供应商 SDK 升级要跟进) | 中 | 极低(网关维护) |
| 扩展性 | 差(加新供应商改代码) | 中 | 好(控制台添加) |
| 熔断/限流处理 | 要自己写 | 自己写 | 网关内置 |
| 可观测性 | 要自己埋点 | 要自己埋点 | 网关提供 Dashboard |
| 适合场景 | 供应商≤2,极度定制化需求 | 中型项目,自托管偏好 | 快速接入,生产环境 |
进阶:不同任务用不同 Fallback 策略
Fallback 并不是「一刀切换到备用」这么简单。更精细的做法是按任务类型设置不同策略:
def get_fallback_chain(task_type: str) -> list[str]:
"""根据任务类型返回不同的 fallback 优先级"""
chains = {
# 代码任务:Claude 最强,GPT-4o 次之
"code": ["claude-3-7-sonnet", "gpt-4.1", "deepseek-v3"],
# 成本敏感的简单任务:优先便宜的
"simple": ["gemini-2.0-flash", "gpt-4o-mini", "claude-3-haiku"],
# 中文内容:DeepSeek 排前面
"chinese": ["deepseek-v3", "claude-3-5-sonnet", "gpt-4o"],
# 实时性要求高:优先低延迟
"realtime": ["gemini-2.0-flash", "gpt-4o", "claude-3-haiku"],
}
return chains.get(task_type, ["gpt-4o", "claude-3-5-sonnet"])
# 使用示例
def smart_chat(message: str, task_type: str = "general") -> str:
fallback_chain = get_fallback_chain(task_type)
response = client.chat.completions.create(
model=fallback_chain[0],
messages=[{"role": "user", "content": message}],
extra_body={"fallback_models": fallback_chain[1:]}
)
return response.choices[0].message.content
# 代码任务:Claude → GPT-4.1 → DeepSeek
result = smart_chat("写一个 Redis 分布式锁实现", task_type="code")
# 中文摘要:DeepSeek → Claude → GPT
result = smart_chat("总结这篇新闻...", task_type="chinese")
监控:知道 Fallback 发生了多少次
Fallback 是容灾机制,不是常态。如果 Fallback 频率很高,说明主力供应商有问题,需要排查。
要监控几个关键指标:
import time
from collections import defaultdict
class MetricsCollector:
def __init__(self):
self.success_count = defaultdict(int)
self.fallback_count = defaultdict(int)
self.error_count = defaultdict(int)
self.latency_sum = defaultdict(float)
def record(self, provider: str, latency_ms: float,
is_fallback: bool = False, is_error: bool = False):
if is_error:
self.error_count[provider] += 1
elif is_fallback:
self.fallback_count[provider] += 1
else:
self.success_count[provider] += 1
self.latency_sum[provider] += latency_ms
def report(self):
print("\n=== AI API 健康报告 ===")
all_providers = set(
list(self.success_count.keys()) +
list(self.fallback_count.keys()) +
list(self.error_count.keys())
)
for p in all_providers:
total = self.success_count[p] + self.fallback_count[p] + self.error_count[p]
if total == 0:
continue
fallback_rate = self.fallback_count[p] / total * 100
error_rate = self.error_count[p] / total * 100
avg_latency = self.latency_sum[p] / total
print(f"{p:20s} | 总请求: {total:4d} | "
f"Fallback率: {fallback_rate:.1f}% | "
f"错误率: {error_rate:.1f}% | "
f"均延迟: {avg_latency:.0f}ms")
metrics = MetricsCollector()
# 输出示例:
# Claude | 总请求: 1000 | Fallback率: 3.2% | 错误率: 0.1% | 均延迟: 620ms
# GPT-4o | 总请求: 32 | Fallback率: 0.0% | 错误率: 0.0% | 均延迟: 310ms
Fallback 率超过 5% → 开始关注
Fallback 率超过 15% → 主力供应商需要排查或替换
常见问题 & 踩坑记录
Q:Fallback 后响应格式会变吗?
A:只要都走 OpenAI 兼容接口,格式完全一致。用 SDK 的话 response.choices[0].message.content 永远是对的。
Q:Fallback 会不会让用户感知到延迟增加?
A:会有轻微增加(约 200-500ms,取决于第一个供应商的超时设置)。建议把主供应商的超时设短一点(如 10s),失败后快速切换,总延迟比死等 30s 要好。
Q:流式输出(streaming)支持 Fallback 吗?
A:支持,但实现复杂一些。主供应商开始 stream 后如果中途断流,需要判断已收到的内容,继续用备用供应商补全剩余内容。网关通常内置这个逻辑。
Q:Fallback 产生的费用怎么算?
A:主供应商失败(如果是 429 没有实际 token 消耗)则不计费;如果是正常调用后返回错误,该算多少就算多少。Fallback 本身会在备用供应商计费。
总结
| 你的情况 | 推荐方案 |
|---|---|
| 个人项目 / 快速验证 | 手写简单版 Fallback,两个供应商够了 |
| 中小型 SaaS | 自建路由器 + 熔断,可控性和灵活性平衡 |
| 生产环境 / 团队项目 | API 网关,省时省力,内置监控 |
核心结论只有一句话:不要让单一 AI API 供应商成为你服务的单点故障。
实现成本很低,但它能在下一次凌晨三点的故障里救你一命。
我自己在用的是 Praka.ai —— 支持 GPT、Claude、Gemini、DeepSeek 等 100 多个模型,接入方式兼容 OpenAI SDK,改一行 base_url 就能用。对于个人开发者和小团队来说,是一个低成本降低供应商风险的实用方案。
觉得有用的话,点个赞收藏下次找得到 🙏
有问题欢迎评论区讨论,也欢迎分享你的踩坑经历 👇
更多推荐



所有评论(0)