Claude API Agent at Scale 成本核算与预算管理实战(含 Python 代码)
·
问题背景
把 Claude API 接入生产级 Agent 工作流之后,很多团队发现账单比测试阶段估算高出数倍。根本原因不是 Token 单价,而是 Agent 运行机制引入的多个成本乘数——重试、上下文膨胀、路由不当和观测盲区。
本文提供一套可直接落地的 Python 实现,覆盖:
- 模型分层路由
- Prompt Caching 配置
- 滑动窗口上下文管理
- 指数退避 + 预算熔断重试
- 异步并发限流
- Token 用量日志与成本估算
前置条件:
- Python 3.9+
- anthropic SDK(pip install anthropic)
- 有效的 API Key(在控制台申请)
- Base URL:
https://gw.claudeapi.com
环境配置
# requirements.txt
anthropic>=0.40.0
# asyncio 和 dataclasses 属于 Python 3.9+ 标准库,无需(也不应)通过 pip 安装
import asyncio
import logging
import os
import time
from dataclasses import dataclass, field
from typing import List, Optional, Dict

import anthropic
# Base configuration.
# Credentials are read from the environment first so that a real key never
# has to be committed to source control; the original literals remain as
# fallbacks so the tutorial still runs after editing them in place.
API_KEY = os.environ.get("ANTHROPIC_API_KEY", "YOUR_API_KEY")
BASE_URL = os.environ.get("ANTHROPIC_BASE_URL", "https://gw.claudeapi.com")

# Shared synchronous client used by every helper in this file.
client = anthropic.Anthropic(
    api_key=API_KEY,
    base_url=BASE_URL,
)

# Timestamped INFO-level logging for routing, cache, retry and cost messages.
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(message)s",
)
一、模型分层路由
定价参考(ClaudeAPI 平台):
| 模型 ID | 输入 $/M Token | 输出 $/M Token | 适用场景 |
|---|---|---|---|
| claude-haiku-4-5-20251001 | $0.800 | $4.000 | 分类、摘要、意图识别 |
| claude-sonnet-4-6 | $2.400 | $12.000 | 代码生成、推理、结构化输出 |
| claude-opus-4-7 | $4.000 | $20.000 | 复杂分析、长文档深度理解 |
# Model IDs, named once so the routing and price tables cannot drift apart.
_HAIKU = "claude-haiku-4-5-20251001"
_SONNET = "claude-sonnet-4-6"
_OPUS = "claude-opus-4-7"

# Routing table: task type -> model ID.
MODEL_ROUTING: Dict[str, str] = {
    "classify": _HAIKU,
    "summarize": _HAIKU,
    "extract": _HAIKU,
    "code_gen": _SONNET,
    "planning": _SONNET,
    "structured": _SONNET,
    "deep_analysis": _OPUS,
    "long_doc": _OPUS,
}

# Price table in USD per million tokens: model ID -> (input, output).
MODEL_PRICE: Dict[str, tuple] = {
    _HAIKU: (0.8, 4.0),
    _SONNET: (2.4, 12.0),
    _OPUS: (4.0, 20.0),
}
def route_model(task_type: str, fallback: str = "claude-sonnet-4-6") -> str:
    """Return the model ID configured for ``task_type``.

    Looks the task type up in ``MODEL_ROUTING``; task types that are not in
    the table resolve to ``fallback``. The decision is logged at INFO level.
    """
    if task_type in MODEL_ROUTING:
        chosen = MODEL_ROUTING[task_type]
    else:
        chosen = fallback
    logging.info(f"[Router] task_type={task_type} → model={chosen}")
    return chosen
二、Prompt Caching 配置
对固定 System Prompt、tools schema 等不变内容标记 cache_control,首次写入后后续调用命中缓存,按更低费率计算(数据来源:Anthropic 官方文档 docs.anthropic.com,截至 2026-06-30)。
这里要注意两点:
- 缓存命中依赖稳定前缀。不要把时间戳、随机 trace id、用户本轮输入放进可缓存前缀。
- 对 Agent 来说,tools、system、稳定背景文档通常是最适合缓存的内容;实时检索结果和用户输入不适合缓存。
# Placeholder system prompt sent as the cacheable prefix by
# create_with_caching. The literal is repeated 5 times to stand in for a
# long, stable prompt.
SYSTEM_PROMPT_LONG = """
你是一个专业的数据分析 Agent。
以下是完整的业务背景、工具说明和操作规范:
[...此处省略,实际内容超过 1024 Token...]
""" * 5  # simulate a long system prompt
def create_with_caching(messages: List[dict], task_type: str = "code_gen"):
    """Send ``messages`` with the long system prompt marked as cacheable.

    The model is chosen via ``route_model(task_type)``. The system prompt is
    sent as a single text block carrying ``cache_control: ephemeral``, and
    the cache read/write token counters from the response usage are logged.

    Args:
        messages: Conversation messages passed through to the API unchanged.
        task_type: Routing key used to pick the model.

    Returns:
        The response object returned by ``client.messages.create``.
    """
    model = route_model(task_type)
    response = client.messages.create(
        model=model,
        max_tokens=4096,
        system=[
            {
                "type": "text",
                "text": SYSTEM_PROMPT_LONG,
                "cache_control": {"type": "ephemeral"},  # mark as cacheable
            }
        ],
        messages=messages,
    )

    # Record cache hit/write counters. The attributes can be missing or
    # present-but-None (getattr's default only covers the former), so coerce
    # to 0 to keep the log line numeric.
    usage = response.usage
    cache_hit = getattr(usage, "cache_read_input_tokens", 0) or 0
    cache_write = getattr(usage, "cache_creation_input_tokens", 0) or 0
    logging.info(
        f"[Cache] model={model} "
        f"input={usage.input_tokens} "
        f"cache_hit={cache_hit} "
        f"cache_write={cache_write} "
        f"output={usage.output_tokens}"
    )
    return response
三、滑动窗口上下文管理
def trim_messages(
    messages: List[dict],
    max_turns: int = 10,
    keep_first: bool = True
) -> List[dict]:
    """Sliding-window trim that keeps the most recent ``max_turns`` turns.

    One turn is counted as two messages (user + assistant). If the history
    already fits, the original list object is returned untouched.

    Args:
        messages: Conversation history, oldest first.
        max_turns: Number of recent turns to keep. Values <= 0 keep no
            recent messages (only the first message when ``keep_first``).
        keep_first: When True, the first message (usually the initial task
            instruction) is always kept and counts toward the window.

    Returns:
        The trimmed list (a new list when anything was dropped).
    """
    max_messages = max(max_turns, 0) * 2  # one turn = user + assistant
    total = len(messages)
    if total <= max_messages:
        return messages

    if keep_first:
        head = messages[:1]
        tail_len = max(max_messages - 1, 0)
    else:
        head = []
        tail_len = max_messages

    # Slice from an explicit start index: ``messages[-0:]`` would return the
    # whole list instead of an empty one when tail_len is 0.
    tail = messages[total - tail_len:] if tail_len else []

    if not keep_first:
        # An even-sized window over a history that ends with a user message
        # starts on an assistant message; the Messages API expects the
        # conversation to open with a user turn, so skip ahead to the first
        # user message. If none is found the window is left as-is.
        for offset, msg in enumerate(tail):
            if msg.get("role") == "user":
                tail = tail[offset:]
                break

    trimmed = head + tail
    dropped = total - len(trimmed)
    logging.info(f"[Trim] 裁剪了 {dropped} 条历史消息,保留 {len(trimmed)} 条")
    return trimmed
def summarize_and_compress(
    messages: List[dict],
    compress_threshold: int = 20
) -> List[dict]:
    """Compress older history into a summary once it exceeds a threshold.

    When ``len(messages)`` is above ``compress_threshold``, everything except
    the last 4 messages (2 turns) is summarized by the Haiku model and
    replaced by a synthetic user/assistant pair carrying the summary; the
    last 4 messages are kept verbatim.

    Args:
        messages: Conversation history, oldest first. Each item is read via
            ``m['role']`` and ``m['content']``.
        compress_threshold: Message count above which compression runs.

    Returns:
        ``messages`` itself when nothing was compressed (below threshold,
        nothing older than the recent window, or an empty summary came
        back); otherwise a new, shorter list.
    """
    if len(messages) <= compress_threshold:
        return messages

    history_to_compress = messages[:-4]  # keep the latest 2 turns verbatim
    recent = messages[-4:]
    if not history_to_compress:
        # Only reachable with a threshold below 4: there is nothing older
        # than the recent window, so do not pay for an empty summary call.
        return messages

    # Summarize with Haiku (the cheapest model in the price table).
    transcript = "".join(
        f"[{m['role']}]: {m['content']}\n" for m in history_to_compress
    )
    compress_prompt = (
        "请将以下对话历史压缩为简洁摘要,保留关键信息和决策:\n\n" + transcript
    )
    summary_response = client.messages.create(
        model="claude-haiku-4-5-20251001",
        max_tokens=512,
        messages=[{"role": "user", "content": compress_prompt}],
    )

    # Gather the text of all returned blocks rather than indexing content[0],
    # which raises IndexError when the response has no content blocks.
    summary = "".join(
        getattr(block, "text", "") or "" for block in summary_response.content
    )
    if not summary.strip():
        # Best effort: keep the full history rather than replacing it with
        # an empty summary.
        logging.warning("[Compress] 摘要为空,保留原始消息")
        return messages

    compressed = [
        {"role": "user", "content": f"[历史摘要]: {summary}"},
        {"role": "assistant", "content": "已了解历史背景,继续处理。"},
    ] + recent
    logging.info(f"[Compress] {len(messages)} 条压缩为 {len(compressed)} 条")
    return compressed
四、指数退避重试 + 预算熔断
# HTTP status codes that call_with_retry_and_budget treats as retryable.
RETRYABLE_STATUS = {
    429,
    500,
    502,
    503,
    529,
}

# Upper bound on attempts per call.
MAX_RETRIES = 4

# Default token ceiling for a single task.
DEFAULT_TASK_TOKEN_BUDGET = 50 * 1000
@dataclass
class AgentCallLog:
    """Usage record for one model call: tokens, retries and estimated cost."""

    task_id: str
    model: str
    input_tokens: int
    output_tokens: int
    cache_read_tokens: int = 0
    cache_write_tokens: int = 0
    retry_count: int = 0
    error: str = ""

    def cost_usd(self) -> float:
        """Estimate the base cost of this call in USD.

        Only ``input_tokens`` and ``output_tokens`` are priced; cache
        read/write tokens are not included. Models missing from
        ``MODEL_PRICE`` are priced at (2.4, 12.0) USD per million tokens.
        """
        inp_price, out_price = MODEL_PRICE.get(self.model, (2.4, 12.0))
        input_cost = self.input_tokens * inp_price
        output_cost = self.output_tokens * out_price
        return (input_cost + output_cost) / 1_000_000

    def __str__(self):
        """Render the record as a single space-separated ``key=value`` line."""
        parts = [
            f"task={self.task_id}",
            f"model={self.model}",
            f"in={self.input_tokens}",
            f"out={self.output_tokens}",
            f"cache_hit={self.cache_read_tokens}",
            f"retry={self.retry_count}",
            f"cost=${self.cost_usd():.6f}",
        ]
        return " ".join(parts)
def call_with_retry_and_budget(
    messages: List[dict],
    model: str = "claude-sonnet-4-6",
    task_id: str = "unknown",
    task_tokens_used: int = 0,
    token_budget: int = DEFAULT_TASK_TOKEN_BUDGET
) -> tuple:
    """Synchronous call with exponential backoff and a token-budget breaker.

    Args:
        messages: Conversation messages passed to the API unchanged.
        model: Model ID to call.
        task_id: Identifier used in log lines and error messages.
        task_tokens_used: Tokens already consumed by this task before the call.
        token_budget: Maximum tokens the task may consume in total.

    Returns:
        ``(response, task_tokens_used, AgentCallLog)`` where
        ``task_tokens_used`` includes this call's input + output tokens.

    Raises:
        RuntimeError: If the budget is already exhausted before the call, if
            this call pushes the task over budget, or if every attempt failed
            with a retryable error.
        anthropic.APIStatusError: Re-raised as-is for non-retryable statuses.

    NOTE(review): the SDK client may also retry internally; if so, the
    effective number of attempts is higher than MAX_RETRIES — confirm the
    client's ``max_retries`` setting.
    """
    # Pre-flight breaker: refuse to spend more tokens once the budget is
    # already used up, instead of discovering it after paying for the call.
    if task_tokens_used >= token_budget:
        raise RuntimeError(
            f"任务 {task_id} 已消耗 {task_tokens_used} tokens,"
            f"预算 {token_budget} 已用尽,拒绝发起新调用"
        )

    last_error = ""
    for attempt in range(MAX_RETRIES):
        try:
            response = client.messages.create(
                model=model,
                max_tokens=4096,
                messages=messages,
            )
        except anthropic.APIStatusError as e:
            last_error = str(e)
            if e.status_code not in RETRYABLE_STATUS:
                # Non-retryable errors (400/401, ...) are raised immediately.
                logging.error(f"[Call] 不可重试错误 status={e.status_code}: {e}")
                raise
            reason = f"status={e.status_code}"
        except anthropic.APIConnectionError as e:
            # Network failures and timeouts carry no HTTP status but are
            # transient, so they are retried like a 5xx.
            last_error = str(e)
            reason = "status=connection_error"
        else:
            usage = response.usage
            this_call_tokens = usage.input_tokens + usage.output_tokens
            task_tokens_used += this_call_tokens

            # Post-call breaker: this call pushed the task over budget.
            if task_tokens_used > token_budget:
                raise RuntimeError(
                    f"任务 {task_id} 已消耗 {task_tokens_used} tokens,"
                    f"超出预算 {token_budget},强制终止"
                )

            log = AgentCallLog(
                task_id=task_id,
                model=model,
                input_tokens=usage.input_tokens,
                output_tokens=usage.output_tokens,
                # The cache counters can be present but None; store 0 so
                # later arithmetic on the log does not fail.
                cache_read_tokens=getattr(usage, "cache_read_input_tokens", 0) or 0,
                cache_write_tokens=getattr(usage, "cache_creation_input_tokens", 0) or 0,
                retry_count=attempt,
            )
            logging.info(f"[Call] {log}")
            return response, task_tokens_used, log

        # Retryable failure. Do not sleep after the final attempt: there is
        # nothing left to wait for.
        if attempt + 1 >= MAX_RETRIES:
            break
        wait_sec = 2 ** attempt
        logging.warning(
            f"[Retry {attempt+1}/{MAX_RETRIES}] "
            f"{reason},{wait_sec}s 后重试..."
        )
        time.sleep(wait_sec)

    raise RuntimeError(f"超过最大重试次数 {MAX_RETRIES},最后错误: {last_error}")
五、异步并发限流
在 Agent at Scale 场景里,并发控制建议放在应用层,而不是只依赖接口返回 429 后再重试。一个更稳的结构是:
用户请求
↓
任务队列(按 P0/P1/P2 分优先级)
↓
并发控制器(Semaphore / token bucket)
↓
模型路由器(Haiku / Sonnet / Opus)
↓
Claude API 调用
↓
用量日志 + 成本报表
这样做的好处是,限流、路由、预算熔断、日志记录都在同一层完成,后续排查问题更简单。
async def call_async(
    messages: List[dict],
    model: str,
    task_id: str,
    semaphore: asyncio.Semaphore
) -> AgentCallLog:
    """Run one synchronous model call in a worker thread under a semaphore.

    Args:
        messages: Conversation messages for the call.
        model: Model ID to call.
        task_id: Identifier forwarded to the retry/budget wrapper.
        semaphore: Limits how many calls are in flight at once.

    Returns:
        The ``AgentCallLog`` produced by ``call_with_retry_and_budget``.
        Exceptions raised by that function propagate to the awaiter.
    """
    async with semaphore:
        # asyncio.to_thread (Python 3.9+) runs the blocking SDK call in a
        # worker thread so the event loop stays free.
        _response, _tokens_used, log = await asyncio.to_thread(
            call_with_retry_and_budget, messages, model, task_id
        )
        return log
async def run_agent_pool(
    task_list: List[dict],
    max_concurrent: int = 5
) -> List[AgentCallLog]:
    """Run several agent tasks concurrently and collect their call logs.

    Args:
        task_list: ``[{"task_id": str, "task_type": str, "messages":
            List[dict]}, ...]``. ``task_type`` is optional and defaults to
            ``"code_gen"``.
        max_concurrent: Maximum number of calls in flight at once.

    Returns:
        Logs of the tasks that succeeded, in task order. Failed tasks are
        logged at ERROR level and omitted from the result.
    """
    semaphore = asyncio.Semaphore(max_concurrent)
    coroutines = [
        call_async(
            messages=task["messages"],
            model=route_model(task.get("task_type", "code_gen")),
            task_id=task["task_id"],
            semaphore=semaphore,
        )
        for task in task_list
    ]
    outcomes = await asyncio.gather(*coroutines, return_exceptions=True)

    # Separate failures from logs. gather() can hand back a CancelledError,
    # which derives from BaseException rather than Exception, so test against
    # BaseException to keep it out of the results.
    results = []
    for task, outcome in zip(task_list, outcomes):
        if isinstance(outcome, BaseException):
            logging.error(f"[Pool] task {task['task_id']} 失败: {outcome}")
        else:
            results.append(outcome)
    return results
六、用量汇总与成本报表
from collections import defaultdict
def generate_cost_report(logs: List[AgentCallLog]) -> Dict:
    """Aggregate token usage and estimated cost per model, and print a report.

    Args:
        logs: Call logs to aggregate.

    Returns:
        A plain dict keyed by model ID; each value holds ``calls``,
        ``input_tokens``, ``output_tokens``, ``cache_hit_tokens``,
        ``total_cost_usd`` and ``retry_total``.
    """
    report = defaultdict(lambda: {
        "calls": 0,
        "input_tokens": 0,
        "output_tokens": 0,
        "cache_hit_tokens": 0,
        "total_cost_usd": 0.0,
        "retry_total": 0,
    })
    for log in logs:
        m = report[log.model]
        m["calls"] += 1
        m["input_tokens"] += log.input_tokens
        m["output_tokens"] += log.output_tokens
        # cache_read_tokens can be None when the usage object carried no
        # cache counter; count that as 0 instead of raising TypeError.
        m["cache_hit_tokens"] += log.cache_read_tokens or 0
        m["total_cost_usd"] += log.cost_usd()
        m["retry_total"] += log.retry_count

    # Print the report.
    print("\n===== Agent 成本报表 =====")
    total_cost = 0.0
    for model, stats in report.items():
        print(f"\n模型: {model}")
        print(f" 调用次数: {stats['calls']}")
        print(f" 输入 Tokens: {stats['input_tokens']:,}")
        print(f" 输出 Tokens: {stats['output_tokens']:,}")
        print(f" 缓存命中: {stats['cache_hit_tokens']:,}")
        print(f" 总重试次数: {stats['retry_total']}")
        print(f" 估算成本: ${stats['total_cost_usd']:.4f}")
        total_cost += stats["total_cost_usd"]
    print(f"\n总估算成本: ${total_cost:.4f}")
    print("========================\n")
    return dict(report)
如果要接入真实生产系统,建议把日志写入 SQLite / PostgreSQL / ClickHouse,而不是只打印到终端。最小表结构可以这样设计:
-- Minimal log table: one row per model call. The token, cache and retry
-- columns share their names with the AgentCallLog fields.
CREATE TABLE agent_call_logs (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    -- Stored as TEXT and compared against date('now', '-1 day') in the
    -- per-model report; presumably ISO-8601 timestamps -- confirm with writer.
    created_at TEXT NOT NULL,
    request_id TEXT,
    task_id TEXT,
    team_id TEXT,
    user_id TEXT,
    model TEXT NOT NULL,
    input_tokens INTEGER DEFAULT 0,
    output_tokens INTEGER DEFAULT 0,
    cache_read_tokens INTEGER DEFAULT 0,
    cache_write_tokens INTEGER DEFAULT 0,
    retry_count INTEGER DEFAULT 0,
    latency_ms INTEGER DEFAULT 0,
    error_code TEXT,
    cost_usd_estimated REAL DEFAULT 0
);
每日可以跑三类 SQL:
-- 1. Cost structure by model: call count and summed estimated cost for rows
--    with created_at on or after yesterday's date, most expensive first.
SELECT model, COUNT(*) AS calls, SUM(cost_usd_estimated) AS cost
FROM agent_call_logs
WHERE created_at >= date('now', '-1 day')
GROUP BY model
ORDER BY cost DESC;
-- 2. Most expensive workflows: top 20 task_ids by total input + output
--    tokens. NOTE(review): no time filter, so this scans the whole table.
SELECT task_id, COUNT(*) AS calls, SUM(input_tokens + output_tokens) AS tokens
FROM agent_call_logs
GROUP BY task_id
ORDER BY tokens DESC
LIMIT 20;
-- 3. Retry/error waste: total retries and call count per error_code, highest
--    retry totals first. NOTE(review): no time filter here either.
SELECT error_code, SUM(retry_count) AS retries, COUNT(*) AS calls
FROM agent_call_logs
GROUP BY error_code
ORDER BY retries DESC;
七、完整使用示例
async def main():
    """Demo entry point: run 10 sample tasks through the pool and report cost."""
    # Ten simulated tasks cycling through five task types.
    task_types = ["classify", "code_gen", "summarize", "planning", "deep_analysis"]
    tasks = []
    for i in range(10):
        user_message = {
            "role": "user",
            "content": f"这是第 {i} 个任务的输入内容,请处理。",
        }
        tasks.append(
            {
                "task_id": f"task_{i:03d}",
                "task_type": task_types[i % 5],
                "messages": [user_message],
            }
        )

    # Run concurrently with at most 3 calls in flight.
    logs = await run_agent_pool(tasks, max_concurrent=3)

    # Print the per-model cost report.
    generate_cost_report(logs)


if __name__ == "__main__":
    asyncio.run(main())
常见排障
| 问题 | 可能原因 | 解决方案 |
|---|---|---|
| 401 Unauthorized | API Key 无效或未配置 | 检查 api_key 和 base_url 是否正确 |
| 429 Rate Limit | 并发过高触发限速 | 降低 max_concurrent,增加退避时间 |
| 529 Overloaded | 服务端临时过载 | 已在重试逻辑中处理,等待后自动重试 |
| 成本远超预期 | 上下文未裁剪 / 路由不当 | 检查 trim_messages 是否生效;核查模型路由配置 |
| Prompt Caching 未命中 | 前缀内容每次有变化 | cache_control 标记的内容必须完全一致才能命中 |
| Agent 循环不停止 | 缺少熔断条件 | 检查 token_budget 是否设定,确认 Agent 有终止条件 |
配置检查清单
上线前逐项核验:
- base_url 设为 https://gw.claudeapi.com
- 模型路由按任务类型正确分配 Haiku / Sonnet / Opus
- 固定 System Prompt 已加 cache_control: ephemeral
- 多轮对话有 trim_messages 或摘要压缩
- 重试逻辑区分了可重试与不可重试错误
- 并发控制用 Semaphore 限制最大并发数
- 每次调用有 Token 用量日志记录
- 单任务设有 token_budget 熔断上限
参考资料
- Anthropic 官方文档 Prompt Caching:https://docs.anthropic.com/en/docs/build-with-claude/prompt-caching
- Anthropic 官方 Python SDK:https://github.com/anthropics/anthropic-sdk-python
- Claude API 错误码说明:https://docs.anthropic.com/en/api/errors
本文的持续更新版本可查看:Claude API Agent at Scale 成本管理指南
更多推荐


所有评论(0)