问题背景

把 Claude API 接入生产级 Agent 工作流之后,很多团队发现账单比测试阶段估算高出数倍。根本原因不是 Token 单价,而是 Agent 运行机制引入的多个成本乘数——重试、上下文膨胀、路由不当和观测盲区。

本文提供一套可直接落地的 Python 实现,覆盖:

  • 模型分层路由
  • Prompt Caching 配置
  • 滑动窗口上下文管理
  • 指数退避 + 预算熔断重试
  • 异步并发限流
  • Token 用量日志与成本估算

前置条件

  • Python 3.9+
  • anthropic SDK(pip install anthropic
  • 有效的 API Key(在控制台申请)
  • Base URL:https://gw.claudeapi.com

环境配置

# requirements.txt
anthropic>=0.40.0
asyncio
dataclasses
import anthropic
import asyncio
import logging
import time
from dataclasses import dataclass, field
from typing import List, Optional, Dict

# 基础配置
API_KEY = "YOUR_API_KEY"
BASE_URL = "https://gw.claudeapi.com"

client = anthropic.Anthropic(
    api_key=API_KEY,
    base_url=BASE_URL
)

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(message)s"
)

一、模型分层路由

定价参考(ClaudeAPI 平台):

模型 ID 输入 $/M Token 输出 $/M Token 适用场景
claude-haiku-4-5-20251001 $0.800 $4.000 分类、摘要、意图识别
claude-sonnet-4-6 $2.400 $12.000 代码生成、推理、结构化输出
claude-opus-4-7 $4.000 $20.000 复杂分析、长文档深度理解
# 模型路由配置
MODEL_ROUTING: Dict[str, str] = {
    "classify":     "claude-haiku-4-5-20251001",
    "summarize":    "claude-haiku-4-5-20251001",
    "extract":      "claude-haiku-4-5-20251001",
    "code_gen":     "claude-sonnet-4-6",
    "planning":     "claude-sonnet-4-6",
    "structured":   "claude-sonnet-4-6",
    "deep_analysis":"claude-opus-4-7",
    "long_doc":     "claude-opus-4-7",
}

# 模型成本表(美元/M Token)
MODEL_PRICE: Dict[str, tuple] = {
    "claude-haiku-4-5-20251001": (0.8, 4.0),
    "claude-sonnet-4-6":         (2.4, 12.0),
    "claude-opus-4-7":           (4.0, 20.0),
}

def route_model(task_type: str, fallback: str = "claude-sonnet-4-6") -> str:
    """根据任务类型返回推荐模型 ID"""
    model = MODEL_ROUTING.get(task_type, fallback)
    logging.info(f"[Router] task_type={task_type} → model={model}")
    return model

二、Prompt Caching 配置

对固定 System Prompt、tools schema 等不变内容标记 cache_control,首次写入后后续调用命中缓存,按更低费率计算(数据来源:Anthropic 官方文档 docs.anthropic.com,截至 2026-06-30)。

这里要注意两点:

  1. 缓存命中依赖稳定前缀。不要把时间戳、随机 trace id、用户本轮输入放进可缓存前缀。
  2. 对 Agent 来说,toolssystem、稳定背景文档通常是最适合缓存的内容;实时检索结果和用户输入不适合缓存。
SYSTEM_PROMPT_LONG = """
你是一个专业的数据分析 Agent。
以下是完整的业务背景、工具说明和操作规范:
[...此处省略,实际内容超过 1024 Token...]
""" * 5  # 模拟长 System Prompt

def create_with_caching(messages: List[dict], task_type: str = "code_gen"):
    """带 Prompt Caching 的消息创建"""
    model = route_model(task_type)
    
    response = client.messages.create(
        model=model,
        max_tokens=4096,
        system=[
            {
                "type": "text",
                "text": SYSTEM_PROMPT_LONG,
                "cache_control": {"type": "ephemeral"}  # 标记可缓存
            }
        ],
        messages=messages
    )
    
    # 记录缓存命中情况
    usage = response.usage
    cache_hit = getattr(usage, "cache_read_input_tokens", 0)
    cache_write = getattr(usage, "cache_creation_input_tokens", 0)
    logging.info(
        f"[Cache] model={model} "
        f"input={usage.input_tokens} "
        f"cache_hit={cache_hit} "
        f"cache_write={cache_write} "
        f"output={usage.output_tokens}"
    )
    return response

三、滑动窗口上下文管理

def trim_messages(
    messages: List[dict],
    max_turns: int = 10,
    keep_first: bool = True
) -> List[dict]:
    """
    滑动窗口裁剪:保留最近 max_turns 轮对话
    keep_first=True 时保留第一条消息(通常是任务初始指令)
    """
    max_messages = max_turns * 2  # 每轮 = user + assistant
    
    if len(messages) <= max_messages:
        return messages
    
    if keep_first and len(messages) > 1:
        # 保留第 1 条 + 最近 (max_messages - 1) 条
        trimmed = [messages[0]] + messages[-(max_messages - 1):]
    else:
        trimmed = messages[-max_messages:]
    
    dropped = len(messages) - len(trimmed)
    logging.info(f"[Trim] 裁剪了 {dropped} 条历史消息,保留 {len(trimmed)} 条")
    return trimmed


def summarize_and_compress(
    messages: List[dict],
    compress_threshold: int = 20
) -> List[dict]:
    """
    摘要压缩:当消息数超过阈值时,用 Haiku 压缩历史
    """
    if len(messages) <= compress_threshold:
        return messages
    
    history_to_compress = messages[:-4]  # 保留最近 2 轮不压缩
    recent = messages[-4:]
    
    # 用 Haiku 做历史摘要(成本最低)
    compress_prompt = "请将以下对话历史压缩为简洁摘要,保留关键信息和决策:\n\n"
    for m in history_to_compress:
        compress_prompt += f"[{m['role']}]: {m['content']}\n"
    
    summary_response = client.messages.create(
        model="claude-haiku-4-5-20251001",
        max_tokens=512,
        messages=[{"role": "user", "content": compress_prompt}]
    )
    summary = summary_response.content[0].text
    
    compressed = [
        {"role": "user", "content": f"[历史摘要]: {summary}"},
        {"role": "assistant", "content": "已了解历史背景,继续处理。"}
    ] + recent
    
    logging.info(f"[Compress] {len(messages)} 条压缩为 {len(compressed)} 条")
    return compressed

四、指数退避重试 + 预算熔断

# 可重试的 HTTP 状态码
RETRYABLE_STATUS = {429, 500, 502, 503, 529}
MAX_RETRIES = 4
DEFAULT_TASK_TOKEN_BUDGET = 50_000  # 单任务 Token 上限

@dataclass
class AgentCallLog:
    task_id: str
    model: str
    input_tokens: int
    output_tokens: int
    cache_read_tokens: int = 0
    cache_write_tokens: int = 0
    retry_count: int = 0
    error: str = ""
    
    def cost_usd(self) -> float:
        """估算本次调用基础成本(美元),缓存读写以 ClaudeAPI 控制台账单为准"""
        inp_price, out_price = MODEL_PRICE.get(self.model, (2.4, 12.0))
        return (
            self.input_tokens * inp_price +
            self.output_tokens * out_price
        ) / 1_000_000
    
    def __str__(self):
        return (
            f"task={self.task_id} model={self.model} "
            f"in={self.input_tokens} out={self.output_tokens} "
            f"cache_hit={self.cache_read_tokens} "
            f"retry={self.retry_count} "
            f"cost=${self.cost_usd():.6f}"
        )


def call_with_retry_and_budget(
    messages: List[dict],
    model: str = "claude-sonnet-4-6",
    task_id: str = "unknown",
    task_tokens_used: int = 0,
    token_budget: int = DEFAULT_TASK_TOKEN_BUDGET
) -> tuple:
    """
    带指数退避重试和预算熔断的同步调用
    返回 (response, task_tokens_used, AgentCallLog)
    """
    last_error = ""
    
    for attempt in range(MAX_RETRIES):
        try:
            response = client.messages.create(
                model=model,
                max_tokens=4096,
                messages=messages
            )
            usage = response.usage
            this_call_tokens = usage.input_tokens + usage.output_tokens
            task_tokens_used += this_call_tokens
            
            # 预算熔断检查
            if task_tokens_used > token_budget:
                raise RuntimeError(
                    f"任务 {task_id} 已消耗 {task_tokens_used} tokens,"
                    f"超出预算 {token_budget},强制终止"
                )
            
            log = AgentCallLog(
                task_id=task_id,
                model=model,
                input_tokens=usage.input_tokens,
                output_tokens=usage.output_tokens,
                cache_read_tokens=getattr(usage, "cache_read_input_tokens", 0),
                cache_write_tokens=getattr(usage, "cache_creation_input_tokens", 0),
                retry_count=attempt,
            )
            logging.info(f"[Call] {log}")
            return response, task_tokens_used, log
            
        except anthropic.APIStatusError as e:
            last_error = str(e)
            if e.status_code not in RETRYABLE_STATUS:
                # 不可重试错误(400/401 等)直接抛出
                logging.error(f"[Call] 不可重试错误 status={e.status_code}: {e}")
                raise
            
            wait_sec = 2 ** attempt
            logging.warning(
                f"[Retry {attempt+1}/{MAX_RETRIES}] "
                f"status={e.status_code}{wait_sec}s 后重试..."
            )
            time.sleep(wait_sec)
    
    raise RuntimeError(f"超过最大重试次数 {MAX_RETRIES},最后错误: {last_error}")

五、异步并发限流

在 Agent at Scale 场景里,并发控制建议放在应用层,而不是只依赖接口返回 429 后再重试。一个更稳的结构是:

用户请求
  ↓
任务队列(按 P0/P1/P2 分优先级)
  ↓
并发控制器(Semaphore / token bucket)
  ↓
模型路由器(Haiku / Sonnet / Opus)
  ↓
Claude API 调用
  ↓
用量日志 + 成本报表

这样做的好处是,限流、路由、预算熔断、日志记录都在同一层完成,后续排查问题更简单。

async def call_async(
    messages: List[dict],
    model: str,
    task_id: str,
    semaphore: asyncio.Semaphore
) -> AgentCallLog:
    """带并发控制的异步调用(使用同步 client 的 asyncio wrapper)"""
    async with semaphore:
        loop = asyncio.get_event_loop()
        # 在线程池中运行同步调用,避免阻塞事件循环
        response, _, log = await loop.run_in_executor(
            None,
            lambda: call_with_retry_and_budget(messages, model, task_id)
        )
        return log


async def run_agent_pool(
    task_list: List[dict],
    max_concurrent: int = 5
) -> List[AgentCallLog]:
    """
    并发执行多个 Agent 任务
    task_list: [{"task_id": str, "task_type": str, "messages": List[dict]}, ...]
    """
    semaphore = asyncio.Semaphore(max_concurrent)
    
    coroutines = [
        call_async(
            messages=task["messages"],
            model=route_model(task.get("task_type", "code_gen")),
            task_id=task["task_id"],
            semaphore=semaphore
        )
        for task in task_list
    ]
    
    logs = await asyncio.gather(*coroutines, return_exceptions=True)
    
    # 过滤异常,记录失败任务
    results = []
    for i, log in enumerate(logs):
        if isinstance(log, Exception):
            logging.error(f"[Pool] task {task_list[i]['task_id']} 失败: {log}")
        else:
            results.append(log)
    
    return results

六、用量汇总与成本报表

from collections import defaultdict

def generate_cost_report(logs: List[AgentCallLog]) -> Dict:
    """
    按模型汇总 Token 用量和成本
    """
    report = defaultdict(lambda: {
        "calls": 0,
        "input_tokens": 0,
        "output_tokens": 0,
        "cache_hit_tokens": 0,
        "total_cost_usd": 0.0,
        "retry_total": 0,
    })
    
    for log in logs:
        m = report[log.model]
        m["calls"] += 1
        m["input_tokens"] += log.input_tokens
        m["output_tokens"] += log.output_tokens
        m["cache_hit_tokens"] += log.cache_read_tokens
        m["total_cost_usd"] += log.cost_usd()
        m["retry_total"] += log.retry_count
    
    # 打印报表
    print("\n===== Agent 成本报表 =====")
    total_cost = 0.0
    for model, stats in report.items():
        print(f"\n模型: {model}")
        print(f"  调用次数:     {stats['calls']}")
        print(f"  输入 Tokens:  {stats['input_tokens']:,}")
        print(f"  输出 Tokens:  {stats['output_tokens']:,}")
        print(f"  缓存命中:     {stats['cache_hit_tokens']:,}")
        print(f"  总重试次数:   {stats['retry_total']}")
        print(f"  估算成本:     ${stats['total_cost_usd']:.4f}")
        total_cost += stats["total_cost_usd"]
    
    print(f"\n总估算成本: ${total_cost:.4f}")
    print("========================\n")
    return dict(report)

如果要接入真实生产系统,建议把日志写入 SQLite / PostgreSQL / ClickHouse,而不是只打印到终端。最小表结构可以这样设计:

CREATE TABLE agent_call_logs (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    created_at TEXT NOT NULL,
    request_id TEXT,
    task_id TEXT,
    team_id TEXT,
    user_id TEXT,
    model TEXT NOT NULL,
    input_tokens INTEGER DEFAULT 0,
    output_tokens INTEGER DEFAULT 0,
    cache_read_tokens INTEGER DEFAULT 0,
    cache_write_tokens INTEGER DEFAULT 0,
    retry_count INTEGER DEFAULT 0,
    latency_ms INTEGER DEFAULT 0,
    error_code TEXT,
    cost_usd_estimated REAL DEFAULT 0
);

每日可以跑三类 SQL:

-- 1. 按模型看成本结构
SELECT model, COUNT(*) AS calls, SUM(cost_usd_estimated) AS cost
FROM agent_call_logs
WHERE created_at >= date('now', '-1 day')
GROUP BY model
ORDER BY cost DESC;

-- 2. 按任务看最贵 workflow
SELECT task_id, COUNT(*) AS calls, SUM(input_tokens + output_tokens) AS tokens
FROM agent_call_logs
GROUP BY task_id
ORDER BY tokens DESC
LIMIT 20;

-- 3. 看重试和错误是否造成成本浪费
SELECT error_code, SUM(retry_count) AS retries, COUNT(*) AS calls
FROM agent_call_logs
GROUP BY error_code
ORDER BY retries DESC;

七、完整使用示例

async def main():
    # 模拟 10 个不同类型的 Agent 任务
    tasks = [
        {
            "task_id": f"task_{i:03d}",
            "task_type": ["classify", "code_gen", "summarize", "planning", "deep_analysis"][i % 5],
            "messages": [
                {"role": "user", "content": f"这是第 {i} 个任务的输入内容,请处理。"}
            ]
        }
        for i in range(10)
    ]
    
    # 并发执行,最多 3 个同时进行
    logs = await run_agent_pool(tasks, max_concurrent=3)
    
    # 生成成本报表
    generate_cost_report(logs)


if __name__ == "__main__":
    asyncio.run(main())

常见排障

问题 可能原因 解决方案
401 Unauthorized API Key 无效或未配置 检查 api_keybase_url 是否正确
429 Rate Limit 并发过高触发限速 降低 max_concurrent,增加退避时间
529 Overloaded 服务端临时过载 已在重试逻辑中处理,等待后自动重试
成本远超预期 上下文未裁剪 / 路由不当 检查 trim_messages 是否生效;核查模型路由配置
Prompt Caching 未命中 前缀内容每次有变化 cache_control 标记的内容必须完全一致才能命中
Agent 循环不停止 缺少熔断条件 检查 token_budget 是否设定,确认 Agent 有终止条件

配置检查清单

上线前逐项核验:

  • base_url 设为 https://gw.claudeapi.com
  • 模型路由按任务类型正确分配 Haiku / Sonnet / Opus
  • 固定 System Prompt 已加 cache_control: ephemeral
  • 多轮对话有 trim_messages 或摘要压缩
  • 重试逻辑区分了可重试与不可重试错误
  • 并发控制用 Semaphore 限制最大并发数
  • 每次调用有 Token 用量日志记录
  • 单任务设有 token_budget 熔断上限

参考资料

  • Anthropic 官方文档 Prompt Caching:https://docs.anthropic.com/en/docs/build-with-claude/prompt-caching
  • Anthropic 官方 Python SDK:https://github.com/anthropic-sdk/anthropic-python
  • Claude API 错误码说明:https://docs.anthropic.com/en/api/errors

本文的持续更新版本可查看:Claude API Agent at Scale 成本管理指南

Logo

欢迎加入DeepSeek 技术社区。在这里,你可以找到志同道合的朋友,共同探索AI技术的奥秘。

更多推荐