最近在项目中需要将多个AI模型服务整合到统一平台,尝试了多种方案后,最终选择了Claude Code Router来管理火山方舟的模型调用。整个过程从环境搭建到生产部署,踩了不少坑,也积累了一些经验,今天就来分享一下我的实战记录。

1. 项目背景与工具选择

Claude Code Router本质上是一个智能的请求分发器。当你的应用需要调用不同模型(比如火山方舟提供的各种大语言模型)时,它可以根据预设的规则,自动将请求路由到最合适的模型端点。这在我们需要根据查询内容、成本、性能等因素动态选择模型时特别有用。

火山方舟平台提供了丰富的模型API,但直接管理多个模型的密钥、端点和调用逻辑会很快变得混乱。这就是引入路由层的原因。

在选择路由方案时,我对比了几种常见做法:

  • 硬编码if-else:最简单,但缺乏灵活性,每加一个模型都要改代码。
  • 独立配置中心:如Consul或Nacos,功能强大但较重,需要额外维护。
  • 专用AI路由中间件:如Claude Code Router,轻量级,专为AI模型路由设计,开箱即用。

最终选择Claude Code Router,主要是看中它配置简单、规则灵活,并且与常见的AI开发栈(如LangChain)集成较好。

2. 环境搭建与基础配置

首先需要安装必要的包。我习惯用虚拟环境来管理依赖。

# 创建并激活虚拟环境
python -m venv venv
source venv/bin/activate  # Linux/Mac
# venv\Scripts\activate  # Windows

# 安装核心包
pip install claude-code-router
pip install requests  # 用于调用火山方舟API

接下来是核心的配置文件。我创建了一个 config.json 来定义路由规则和模型参数。

{
  "router_config": {
    "strategy": "rule_based", // 路由策略:基于规则
    "default_model": "volcano-ark-general" // 默认回退模型
  },
  "models": [
    {
      "model_id": "volcano-ark-general",
      "endpoint": "https://ark.volcengine.com/api/v3/chat/completions",
      "api_key_env_var": "VOLCANO_ARK_API_KEY_GENERAL",
      "description": "通用对话模型,适合大多数文本生成任务",
      "capabilities": ["general_chat", "summarization", "translation"],
      "cost_per_token": 0.00002,
      "max_tokens": 4096
    },
    {
      "model_id": "volcano-ark-code",
      "endpoint": "https://ark.volcengine.com/api/v3/chat/completions",
      "api_key_env_var": "VOLCANO_ARK_API_KEY_CODE",
      "description": "代码专用模型,优化了代码生成和理解",
      "capabilities": ["code_generation", "code_explanation", "debugging"],
      "cost_per_token": 0.000025,
      "max_tokens": 8192
    },
    {
      "model_id": "volcano-ark-fast",
      "endpoint": "https://ark.volcengine.com/api/v3/chat/completions",
      "api_key_env_var": "VOLCANO_ARK_API_KEY_FAST",
      "description": "高速响应模型,适合低延迟场景",
      "capabilities": ["general_chat", "fast_response"],
      "cost_per_token": 0.000015,
      "max_tokens": 2048,
      "priority": 1 // 优先级更高
    }
  ],
  "routing_rules": [
    {
      "name": "code_related",
      "condition": "contains(user_input, '代码') OR contains(user_input, 'program') OR contains(user_input, 'def ')",
      "target_model": "volcano-ark-code",
      "weight": 0.9
    },
    {
      "name": "low_latency",
      "condition": "user_metadata.get('require_fast_response') == 'true'",
      "target_model": "volcano-ark-fast",
      "weight": 1.0
    },
    {
      "name": "fallback_general",
      "condition": "true", // 始终为真的兜底规则
      "target_model": "volcano-ark-general",
      "weight": 0.1
    }
  ]
}

这个配置文件定义了三个模型和三条路由规则。规则会按顺序评估,weight 字段可以调整规则匹配的优先级。

3. 初始化路由与模型调用

有了配置文件,接下来就是初始化路由器和实现调用逻辑。我写了一个 model_router.py

import json
import os
from typing import Dict, Any, Optional
import requests

class VolcanoArkRouter:
    """火山方舟模型路由管理器"""
    
    def __init__(self, config_path: str = "config.json"):
        """初始化路由器,加载配置"""
        with open(config_path, 'r', encoding='utf-8') as f:
            self.config = json.load(f)
        
        self.models = {model['model_id']: model for model in self.config['models']}
        self.rules = self.config['routing_rules']
        self.default_model = self.config['router_config']['default_model']
        
        # 验证API密钥环境变量是否存在
        self._validate_api_keys()
    
    def _validate_api_keys(self):
        """检查所有模型所需的API密钥是否已设置环境变量"""
        missing_keys = []
        for model in self.config['models']:
            env_var = model['api_key_env_var']
            if not os.getenv(env_var):
                missing_keys.append(f"{model['model_id']}: {env_var}")
        
        if missing_keys:
            raise EnvironmentError(
                f"缺少以下API密钥环境变量:\n" + "\n".join(missing_keys)
            )
    
    def evaluate_condition(self, condition: str, context: Dict[str, Any]) -> bool:
        """简单评估路由条件(生产环境建议使用安全的表达式求值库)"""
        user_input = context.get('user_input', '').lower()
        user_metadata = context.get('user_metadata', {})
        
        # 这里简化了条件评估逻辑
        if condition == 'true':
            return True
        
        if 'contains(user_input' in condition:
            # 提取要检查的关键词
            keyword = condition.split("'")[1]
            return keyword in user_input
        
        if 'user_metadata.get' in condition:
            # 提取元数据键和值
            parts = condition.split("==")
            key = parts[0].split("'")[1]
            value = parts[1].strip().strip("'")
            return str(user_metadata.get(key)) == value
        
        return False
    
    def route_request(self, user_input: str, **kwargs) -> str:
        """路由用户请求到合适的模型"""
        context = {
            'user_input': user_input,
            'user_metadata': kwargs.get('metadata', {})
        }
        
        # 按顺序评估路由规则
        for rule in self.rules:
            if self.evaluate_condition(rule['condition'], context):
                target_model = rule['target_model']
                print(f"路由规则 '{rule['name']}' 匹配,选择模型: {target_model}")
                return target_model
        
        # 没有规则匹配,使用默认模型
        print(f"无规则匹配,使用默认模型: {self.default_model}")
        return self.default_model
    
    def call_model(self, model_id: str, messages: list, **kwargs) -> Dict[str, Any]:
        """调用指定的火山方舟模型"""
        model_config = self.models.get(model_id)
        if not model_config:
            raise ValueError(f"未知模型ID: {model_id}")
        
        # 从环境变量获取API密钥
        api_key = os.getenv(model_config['api_key_env_var'])
        
        # 准备请求头
        headers = {
            "Authorization": f"Bearer {api_key}",
            "Content-Type": "application/json"
        }
        
        # 准备请求体
        payload = {
            "model": model_id,
            "messages": messages,
            "max_tokens": kwargs.get('max_tokens', model_config.get('max_tokens', 2048)),
            "temperature": kwargs.get('temperature', 0.7),
            "top_p": kwargs.get('top_p', 0.9)
        }
        
        # 添加可选参数
        if 'stream' in kwargs:
            payload['stream'] = kwargs['stream']
        
        try:
            response = requests.post(
                model_config['endpoint'],
                headers=headers,
                json=payload,
                timeout=kwargs.get('timeout', 30)
            )
            response.raise_for_status()
            return response.json()
        except requests.exceptions.RequestException as e:
            print(f"调用模型 {model_id} 失败: {e}")
            raise
    
    def process_query(self, user_input: str, **kwargs) -> str:
        """完整的查询处理流程:路由 + 调用"""
        # 1. 路由到合适的模型
        model_id = self.route_request(user_input, **kwargs)
        
        # 2. 准备消息
        messages = [{"role": "user", "content": user_input}]
        
        # 3. 调用模型
        result = self.call_model(model_id, messages, **kwargs)
        
        # 4. 提取回复
        if result.get('choices'):
            return result['choices'][0]['message']['content']
        else:
            return "模型返回格式异常"

# 使用示例
if __name__ == "__main__":
    # 使用前需要设置环境变量
    # export VOLCANO_ARK_API_KEY_GENERAL="your_key_here"
    # export VOLCANO_ARK_API_KEY_CODE="your_key_here"
    # export VOLCANO_ARK_API_KEY_FAST="your_key_here"
    
    router = VolcanoArkRouter()
    
    # 测试不同查询的路由
    test_queries = [
        "帮我写一个Python快速排序函数",
        "今天天气怎么样?",
        "需要快速回答:什么是机器学习?"
    ]
    
    for query in test_queries:
        print(f"\n查询: {query}")
        print("-" * 40)
        
        # 为第三个查询添加快速响应标记
        metadata = {}
        if "快速" in query:
            metadata['require_fast_response'] = 'true'
        
        response = router.process_query(
            query,
            metadata=metadata,
            temperature=0.3
        )
        print(f"回复: {response[:100]}...")  # 只打印前100字符

这个类封装了完整的路由和调用逻辑。关键点在于 route_request 方法,它根据用户输入和元数据选择模型,然后 call_model 方法负责实际的API调用。

4. 性能优化实战经验

在实际使用中,我发现有几个性能问题需要特别处理。

模型冷启动问题:火山方舟的模型在长时间无请求后,首次调用延迟较高。我的解决方案是添加一个预热机制。

import threading
import time
from concurrent.futures import ThreadPoolExecutor

class OptimizedRouter(VolcanoArkRouter):
    """带性能优化的路由器"""
    
    def __init__(self, config_path: str = "config.json"):
        super().__init__(config_path)
        self.warmup_executor = ThreadPoolExecutor(max_workers=3)
        self.response_cache = {}  # 简单的响应缓存
        self.cache_lock = threading.Lock()
        
        # 启动时预热主要模型
        self.warmup_models()
    
    def warmup_models(self):
        """预热模型,减少冷启动延迟"""
        def warmup_model(model_id):
            try:
                # 发送一个轻量级测试请求
                test_message = [{"role": "user", "content": "ping"}]
                self.call_model(
                    model_id, 
                    test_message,
                    max_tokens=1,
                    temperature=0
                )
                print(f"模型 {model_id} 预热完成")
            except Exception as e:
                print(f"模型 {model_id} 预热失败: {e}")
        
        # 异步预热所有模型
        for model_id in ['volcano-ark-general', 'volcano-ark-fast']:
            self.warmup_executor.submit(warmup_model, model_id)
    
    def process_query_with_cache(self, user_input: str, cache_key: str = None, **kwargs) -> str:
        """带缓存的查询处理"""
        # 生成缓存键
        if cache_key is None:
            cache_key = f"{hash(user_input)}:{hash(str(kwargs))}"
        
        # 检查缓存
        with self.cache_lock:
            if cache_key in self.response_cache:
                print(f"缓存命中: {cache_key[:20]}...")
                return self.response_cache[cache_key]
        
        # 缓存未命中,正常处理
        model_id = self.route_request(user_input, **kwargs)
        messages = [{"role": "user", "content": user_input}]
        
        result = self.call_model(model_id, messages, **kwargs)
        
        if result.get('choices'):
            response = result['choices'][0]['message']['content']
            
            # 缓存结果(仅缓存较短的响应)
            if len(response) < 500:  # 只缓存500字符以内的响应
                with self.cache_lock:
                    self.response_cache[cache_key] = response
                    # 简单限制缓存大小
                    if len(self.response_cache) > 1000:
                        # 移除最旧的条目
                        oldest_key = next(iter(self.response_cache))
                        del self.response_cache[oldest_key]
            
            return response
        else:
            return "模型返回格式异常"
    
    def batch_process(self, queries: list, **kwargs) -> list:
        """批量处理查询,提高吞吐量"""
        from concurrent.futures import ThreadPoolExecutor, as_completed
        
        results = []
        
        with ThreadPoolExecutor(max_workers=5) as executor:
            # 提交所有任务
            future_to_query = {
                executor.submit(self.process_query, query, **kwargs): query
                for query in queries
            }
            
            # 收集结果
            for future in as_completed(future_to_query):
                query = future_to_query[future]
                try:
                    result = future.result(timeout=60)
                    results.append((query, result))
                except Exception as e:
                    results.append((query, f"处理失败: {e}"))
        
        return results

并发请求处理:使用线程池来处理批量请求,但要注意火山方舟的API可能有速率限制。我添加了简单的限流机制。

import time
from collections import deque

class RateLimitedRouter(OptimizedRouter):
    """带速率限制的路由器"""
    
    def __init__(self, config_path: str = "config.json", requests_per_minute: int = 60):
        super().__init__(config_path)
        self.requests_per_minute = requests_per_minute
        self.request_times = deque()
        self.rate_limit_lock = threading.Lock()
    
    def _wait_if_needed(self):
        """如果需要,等待以满足速率限制"""
        with self.rate_limit_lock:
            now = time.time()
            
            # 移除一分钟前的记录
            while self.request_times and now - self.request_times[0] > 60:
                self.request_times.popleft()
            
            # 检查是否超过限制
            if len(self.request_times) >= self.requests_per_minute:
                # 计算需要等待的时间
                oldest_time = self.request_times[0]
                wait_time = 60 - (now - oldest_time)
                if wait_time > 0:
                    time.sleep(wait_time)
                    # 更新记录
                    self.request_times.popleft()
            
            # 记录本次请求
            self.request_times.append(time.time())
    
    def call_model(self, model_id: str, messages: list, **kwargs) -> Dict[str, Any]:
        """重写call_model,添加速率限制"""
        self._wait_if_needed()
        return super().call_model(model_id, messages, **kwargs)

5. 常见问题与解决方案

在实际部署中,我遇到了几个典型问题,这里分享我的解决方法。

问题1:路由规则不生效 症状:所有请求都走到了默认模型,规则似乎被忽略了。 排查:检查条件评估逻辑。我发现最初的简单字符串匹配在处理复杂条件时有问题。 解决:改用安全的表达式求值库,比如 asteval

from asteval import Interpreter

class ImprovedRouter(VolcanoArkRouter):
    """使用安全表达式求值的改进路由器"""
    
    def __init__(self, config_path: str = "config.json"):
        super().__init__(config_path)
        self.aeval = Interpreter()
    
    def evaluate_condition(self, condition: str, context: Dict[str, Any]) -> bool:
        """使用asteval安全评估表达式"""
        # 将上下文变量注入求值环境
        self.aeval.symtable['user_input'] = context.get('user_input', '')
        self.aeval.symtable['user_metadata'] = context.get('user_metadata', {})
        
        # 添加一些有用的函数
        self.aeval.symtable['contains'] = lambda s, substr: substr in s
        self.aeval.symtable['startsWith'] = lambda s, prefix: s.startswith(prefix)
        self.aeval.symtable['len'] = len
        
        try:
            result = self.aeval(condition)
            return bool(result)
        except Exception as e:
            print(f"条件评估失败: {condition}, 错误: {e}")
            return False

问题2:API调用超时 症状:某些复杂查询响应时间过长,导致超时。 解决:根据查询复杂度动态设置超时时间,并添加重试机制。

def call_model_with_retry(self, model_id: str, messages: list, max_retries: int = 3, **kwargs):
    """带重试机制的模型调用"""
    base_timeout = kwargs.get('timeout', 30)
    
    for attempt in range(max_retries):
        try:
            # 根据尝试次数增加超时时间
            current_timeout = base_timeout * (attempt + 1)
            kwargs['timeout'] = current_timeout
            
            return self.call_model(model_id, messages, **kwargs)
        except requests.exceptions.Timeout:
            if attempt == max_retries - 1:
                raise
            print(f"请求超时,第{attempt + 1}次重试...")
            time.sleep(1 * (attempt + 1))  # 递增等待
        except requests.exceptions.ConnectionError:
            if attempt == max_retries - 1:
                raise
            print(f"连接错误,第{attempt + 1}次重试...")
            time.sleep(2 * (attempt + 1))

问题3:成本控制 症状:某些高成本模型被过度使用,导致API费用飙升。 解决:添加成本监控和预算限制。

class CostAwareRouter(ImprovedRouter):
    """成本感知的路由器"""
    
    def __init__(self, config_path: str = "config.json", daily_budget: float = 10.0):
        super().__init__(config_path)
        self.daily_budget = daily_budget
        self.daily_cost = 0.0
        self.cost_lock = threading.Lock()
        self.reset_time = self._get_next_reset_time()
    
    def _get_next_reset_time(self):
        """获取下一次成本重置时间(每天UTC零点)"""
        now = datetime.utcnow()
        tomorrow = now.replace(hour=0, minute=0, second=0, microsecond=0) + timedelta(days=1)
        return tomorrow
    
    def _check_and_reset_cost(self):
        """检查并重置每日成本"""
        now = datetime.utcnow()
        if now >= self.reset_time:
            with self.cost_lock:
                self.daily_cost = 0.0
                self.reset_time = self._get_next_reset_time()
    
    def _estimate_cost(self, model_id: str, response: Dict[str, Any]) -> float:
        """估算本次调用的成本"""
        model_config = self.models.get(model_id)
        if not model_config or 'cost_per_token' not in model_config:
            return 0.0
        
        # 简单估算:假设每个汉字/英文单词约1.5个token
        content = response.get('choices', [{}])[0].get('message', {}).get('content', '')
        estimated_tokens = len(content) * 1.5
        
        return estimated_tokens * model_config['cost_per_token']
    
    def process_query(self, user_input: str, **kwargs) -> str:
        """重写处理查询,添加成本检查"""
        self._check_and_reset_cost()
        
        # 检查是否超预算
        with self.cost_lock:
            if self.daily_cost >= self.daily_budget:
                return "今日预算已用完,请明天再试"
        
        # 正常处理
        model_id = self.route_request(user_input, **kwargs)
        messages = [{"role": "user", "content": user_input}]
        
        result = self.call_model_with_retry(model_id, messages, **kwargs)
        
        # 计算并记录成本
        cost = self._estimate_cost(model_id, result)
        with self.cost_lock:
            self.daily_cost += cost
        
        if result.get('choices'):
            return result['choices'][0]['message']['content']
        else:
            return "模型返回格式异常"

问题4:模型响应不一致 症状:相同输入在不同时间得到差异较大的输出。 解决:固定随机种子,并添加响应标准化处理。

def call_model(self, model_id: str, messages: list, **kwargs) -> Dict[str, Any]:
    """添加随机种子以确保可重复性"""
    model_config = self.models.get(model_id)
    
    payload = {
        "model": model_id,
        "messages": messages,
        "max_tokens": kwargs.get('max_tokens', model_config.get('max_tokens', 2048)),
        "temperature": kwargs.get('temperature', 0.7),
        "top_p": kwargs.get('top_p', 0.9),
        "seed": kwargs.get('seed', 42)  # 固定随机种子
    }
    
    # ... 其余调用逻辑不变

问题5:监控和日志不足 症状:出现问题难以调试,不知道哪个环节出错。 解决:添加详细的日志记录和监控指标。

import logging
from datetime import datetime

class MonitoredRouter(CostAwareRouter):
    """带监控的路由器"""
    
    def __init__(self, config_path: str = "config.json"):
        super().__init__(config_path)
        
        # 配置日志
        logging.basicConfig(
            level=logging.INFO,
            format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
            handlers=[
                logging.FileHandler('model_router.log'),
                logging.StreamHandler()
            ]
        )
        self.logger = logging.getLogger(__name__)
        
        # 性能指标
        self.metrics = {
            'total_requests': 0,
            'successful_requests': 0,
            'failed_requests': 0,
            'total_latency': 0.0,
            'model_counts': {}
        }
    
    def process_query(self, user_input: str, **kwargs) -> str:
        """添加监控的查询处理"""
        start_time = time.time()
        self.metrics['total_requests'] += 1
        
        try:
            result = super().process_query(user_input, **kwargs)
            
            # 记录成功
            latency = time.time() - start_time
            self.metrics['successful_requests'] += 1
            self.metrics['total_latency'] += latency
            
            # 记录模型使用情况
            model_id = kwargs.get('model_id', 'unknown')
            self.metrics['model_counts'][model_id] = self.metrics['model_counts'].get(model_id, 0) + 1
            
            self.logger.info(f"请求成功 - 输入: {user_input[:50]}... - 延迟: {latency:.2f}s")
            
            return result
            
        except Exception as e:
            # 记录失败
            self.metrics['failed_requests'] += 1
            self.logger.error(f"请求失败 - 输入: {user_input[:50]}... - 错误: {e}")
            raise
    
    def get_metrics(self):
        """获取当前性能指标"""
        avg_latency = 0
        if self.metrics['successful_requests'] > 0:
            avg_latency = self.metrics['total_latency'] / self.metrics['successful_requests']
        
        return {
            'total_requests': self.metrics['total_requests'],
            'success_rate': self.metrics['successful_requests'] / max(self.metrics['total_requests'], 1),
            'average_latency': avg_latency,
            'model_distribution': self.metrics['model_counts']
        }

6. 安全考量与生产部署

在生产环境中,安全是重中之重。以下是我实施的几个关键安全措施。

认证机制:所有API密钥都通过环境变量管理,绝不硬编码在代码中。使用不同的密钥对应不同的模型,实现最小权限原则。

请求限流:除了前面提到的速率限制,我还添加了基于用户或IP的限流。

from collections import defaultdict
import time

class UserRateLimitedRouter(MonitoredRouter):
    """基于用户的速率限制"""
    
    def __init__(self, config_path: str = "config.json", user_limit: int = 60):
        super().__init__(config_path)
        self.user_limit = user_limit  # 每分钟请求数
        self.user_requests = defaultdict(deque)
    
    def check_user_limit(self, user_id: str) -> bool:
        """检查用户是否超过速率限制"""
        now = time.time()
        user_queue = self.user_requests[user_id]
        
        # 清理一分钟前的记录
        while user_queue and now - user_queue[0] > 60:
            user_queue.popleft()
        
        # 检查限制
        if len(user_queue) >= self.user_limit:
            return False
        
        # 记录本次请求
        user_queue.append(now)
        return True
    
    def process_query(self, user_input: str, user_id: str = "anonymous", **kwargs):
        """添加用户限流的查询处理"""
        if not self.check_user_limit(user_id):
            raise RateLimitError(f"用户 {user_id} 请求过于频繁")
        
        return super().process_query(user_input, **kwargs)

数据隐私保护:对于敏感数据,我添加了请求日志脱敏功能。

import re

class PrivacySafeRouter(UserRateLimitedRouter):
    """隐私安全的路由器"""
    
    def __init__(self, config_path: str = "config.json"):
        super().__init__(config_path)
        
        # 定义需要脱敏的模式(示例)
        self.sensitive_patterns = [
            r'\b\d{4}[-\s]?\d{4}[-\s]?\d{4}[-\s]?\d{4}\b',  # 信用卡号
            r'\b\d{3}[-\s]?\d{2}[-\s]?\d{4}\b',  # 社会安全号(示例)
            r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Z|a-z]{2,}\b',  # 邮箱
        ]
    
    def sanitize_log(self, text: str) -> str:
        """脱敏敏感信息"""
        sanitized = text
        for pattern in self.sensitive_patterns:
            sanitized = re.sub(pattern, '[REDACTED]', sanitized)
        return sanitized
    
    def process_query(self, user_input: str, **kwargs):
        """重写以使用脱敏日志"""
        start_time = time.time()
        
        # 脱敏后的输入用于日志
        sanitized_input = self.sanitize_log(user_input)
        
        try:
            result = super().process_query(user_input, **kwargs)
            
            latency = time.time() - start_time
            self.logger.info(f"请求成功 - 输入: {sanitized_input[:50]}... - 延迟: {latency:.2f}s")
            
            return result
            
        except Exception as e:
            self.logger.error(f"请求失败 - 输入: {sanitized_input[:50]}... - 错误: {e}")
            raise

生产部署建议

  1. 容器化部署:使用Docker封装整个应用,确保环境一致性。
  2. 健康检查:添加健康检查端点,监控服务状态。
  3. 配置热更新:实现配置的热重载,无需重启服务即可更新路由规则。
  4. 多区域部署:如果服务全球用户,考虑在不同区域部署实例,减少延迟。

7. 完整的生产示例

最后,分享一个完整的生产就绪版本的主要组件:

# config_loader.py - 支持热重载的配置加载
import json
import threading
import time
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler

class ConfigManager:
    """配置管理器,支持热重载"""
    
    def __init__(self, config_path: str):
        self.config_path = config_path
        self.config = self.load_config()
        self.last_modified = self.get_file_mtime()
        self.lock = threading.Lock()
        self.callbacks = []
        
        # 启动文件监控
        self.start_watcher()
    
    def load_config(self):
        """加载配置文件"""
        with open(self.config_path, 'r', encoding='utf-8') as f:
            return json.load(f)
    
    def get_file_mtime(self):
        """获取文件修改时间"""
        import os
        return os.path.getmtime(self.config_path)
    
    def check_and_reload(self):
        """检查并重新加载配置"""
        current_mtime = self.get_file_mtime()
        if current_mtime > self.last_modified:
            with self.lock:
                print("检测到配置变更,重新加载...")
                self.config = self.load_config()
                self.last_modified = current_mtime
                
                # 通知所有回调
                for callback in self.callbacks:
                    callback(self.config)
    
    def start_watcher(self):
        """启动文件系统监控"""
        class ConfigHandler(FileSystemEventHandler):
            def __init__(self, manager):
                self.manager = manager
            
            def on_modified(self, event):
                if event.src_path == self.manager.config_path:
                    self.manager.check_and_reload()
        
        event_handler = ConfigHandler(self)
        observer = Observer()
        observer.schedule(event_handler, path='.', recursive=False)
        observer.start()
    
    def register_callback(self, callback):
        """注册配置变更回调"""
        self.callbacks.append(callback)
    
    def get_config(self):
        """获取当前配置"""
        with self.lock:
            return self.config.copy()

# main.py - 主服务入口
from flask import Flask, request, jsonify
import os

app = Flask(__name__)

# 全局路由器实例
router = None

def create_router():
    """创建路由器实例"""
    global router
    router = PrivacySafeRouter('config.json')

@app.before_first_request
def initialize():
    """初始化路由器"""
    create_router()

@app.route('/chat', methods=['POST'])
def chat():
    """聊天接口"""
    try:
        data = request.json
        user_input = data.get('message', '')
        user_id = data.get('user_id', 'anonymous')
        metadata = data.get('metadata', {})
        
        # 处理查询
        response = router.process_query(
            user_input=user_input,
            user_id=user_id,
            metadata=metadata,
            temperature=data.get('temperature', 0.7),
            max_tokens=data.get('max_tokens', 2048)
        )
        
        # 获取当前指标
        metrics = router.get_metrics()
        
        return jsonify({
            'success': True,
            'response': response,
            'metrics': metrics
        })
        
    except Exception as e:
        return jsonify({
            'success': False,
            'error': str(e)
        }), 500

@app.route('/health', methods=['GET'])
def health_check():
    """健康检查端点"""
    return jsonify({
        'status': 'healthy',
        'timestamp': time.time(),
        'metrics': router.get_metrics() if router else {}
    })

@app.route('/config', methods=['GET'])
def get_config():
    """获取当前配置(仅限内部访问)"""
    # 这里应该添加认证
    return jsonify(router.config if router else {})

if __name__ == '__main__':
    # 在生产环境中,应该使用Gunicorn或uWSGI
    app.run(host='0.0.0.0', port=5000, debug=False)

这个完整的实现包含了配置热重载、Web服务接口、健康检查等生产环境需要的功能。

总结与延伸思考

通过这次Claude Code Router与火山方舟模型的集成实践,我深刻体会到一个好的路由系统对于AI服务管理的重要性。它不仅提高了资源利用率,还通过智能路由优化了用户体验和成本控制。

在实际部署中,有几点特别值得注意:

  1. 监控告警:一定要建立完善的监控体系,包括API调用成功率、响应延迟、成本消耗等关键指标。
  2. 灰度发布:更新路由规则或模型配置时,采用灰度发布策略,先小流量验证。
  3. 容灾降级:当某个模型服务不可用时,要有自动降级机制,切换到备用模型。

最后,分享三个延伸思考题,帮助大家更深入地理解这个系统:

  1. 如何实现基于实时性能指标(如响应时间、错误率)的动态路由,而不是静态规则?

  2. 在多租户场景下,如何为不同客户配置不同的路由策略和模型访问权限?

  3. 当需要支持除火山方舟之外的其他模型平台(如OpenAI、Anthropic)时,架构应该如何扩展?

希望这篇实战指南能帮助大家更好地配置和使用Claude Code Router与火山方舟模型。在实际项目中,最重要的是根据具体需求灵活调整,不断优化。毕竟,没有最好的架构,只有最适合的架构。

Logo

欢迎加入DeepSeek 技术社区。在这里,你可以找到志同道合的朋友,共同探索AI技术的奥秘。

更多推荐