基于Claude AI模板库快速构建智能应用:架构解析与实战指南
在人工智能应用开发领域,API调用与集成是构建智能系统的核心技术。其原理在于通过标准化的接口协议,使开发者能够便捷地调用云端大语言模型的强大能力,而无需从零训练模型。这项技术的核心价值在于显著降低了AI应用开发的门槛,提升了开发效率,使得开发者可以专注于业务逻辑而非底层算法实现。典型的应用场景包括智能对话系统、文档分析与摘要、代码生成助手以及各类需要自然语言理解与生成的自动化工具。本文聚焦于一个名
1. 项目概述与核心价值
最近在折腾Claude AI应用开发,发现一个挺有意思的开源项目,叫“walidboulanouar/ay-claude-templates”。这名字乍一看有点长,但拆开来看就明白了——walidboulanouar是作者,ay-claude-templates是项目名,直译过来就是“Claude AI模板”。我花了一周时间深度体验了这个项目,发现它确实解决了不少开发者的痛点。
简单来说,这是一个专门为Claude AI API设计的模板集合。如果你用过OpenAI的API,应该知道官方提供了不少示例代码和模板,但Claude这边相对就少一些。这个项目正好填补了这个空白,提供了从基础对话到复杂应用的各种现成模板,让你能快速上手Claude API开发。我实测下来,用这些模板搭建一个基础的Claude应用,从零开始到跑通第一个对话,大概只需要15分钟。
这个项目特别适合三类人:一是刚开始接触Claude API的开发者,不想从零开始写样板代码;二是需要快速验证某个功能是否可行的产品经理或创业者;三是已经有OpenAI开发经验,想快速迁移到Claude生态的工程师。我自己属于第三类,之前主要用GPT-4的API,现在想试试Claude-3系列模型,这个模板库帮我省了不少时间。
2. 项目架构与技术栈解析
2.1 整体架构设计思路
这个模板项目的架构设计得很清晰,采用了模块化的思想。整个项目按照功能场景划分成了多个独立的模板,每个模板都是一个完整的、可独立运行的应用。这种设计的好处是,你可以根据需求只使用其中某个模板,而不需要引入整个项目的依赖。
我仔细研究了项目结构,发现它主要包含以下几个核心模块:
- 基础对话模板 :最简单的单轮对话实现,适合新手入门
- 多轮对话管理 :包含对话历史管理、上下文维护的完整方案
- 流式响应处理 :实时显示AI生成内容,提升用户体验
- 文件处理模板 :支持上传PDF、Word、Excel等文件,让Claude读取文件内容
- 工具调用模板 :演示如何让Claude调用外部API或函数
- Web界面模板 :提供基于Web的交互界面,方便演示和测试
每个模板都遵循相同的目录结构:配置文件、主程序文件、依赖说明、示例数据。这种一致性让学习成本大大降低,你学会了一个模板,其他模板的用法也就基本掌握了。
2.2 技术栈选择与考量
项目主要基于Python生态,这是目前AI应用开发的主流选择。核心依赖包括:
- anthropic :官方的Claude Python SDK,这是与Claude API交互的基础
- fastapi :用于构建Web API,轻量级且性能优秀
- uvicorn :ASGI服务器,用于运行FastAPI应用
- python-dotenv :管理环境变量,保护API密钥等敏感信息
- httpx :现代化的HTTP客户端,用于异步请求
选择这些技术栈有几个考虑:首先是成熟度,这些都是经过大量项目验证的稳定库;其次是社区支持,遇到问题容易找到解决方案;最后是学习曲线,这些库的API设计都比较友好,新手也能快速上手。
我特别欣赏作者对异步编程的处理。Claude API的响应时间相对较长,特别是在处理复杂任务时,同步请求会导致界面卡顿。项目中的流式响应模板就很好地解决了这个问题,通过异步请求和服务器发送事件(SSE)技术,实现了实时的内容流式传输。
2.3 配置管理与安全性设计
安全性是AI应用开发中经常被忽视但极其重要的一环。这个项目在安全设计上做得不错,主要体现在几个方面:
首先是API密钥的管理。项目使用 .env 文件存储敏感信息,并通过 python-dotenv 加载。 .env 文件被排除在版本控制之外,避免了密钥泄露的风险。我建议在实际部署时,可以进一步使用密钥管理服务,但作为开发阶段的方案,这已经足够安全。
其次是请求限流和错误处理。模板中包含了完善的异常处理逻辑,比如网络超时、API配额不足、无效输入等情况都有相应的处理。这对于生产环境应用至关重要,能避免因为单点故障导致整个服务不可用。
最后是输入验证。虽然Claude API本身有一定的内容过滤机制,但客户端也应该进行基本的输入验证。项目中的Web模板就包含了输入长度检查、内容过滤等基础验证,防止恶意输入或意外的大数据量请求。
3. 核心模板深度解析
3.1 基础对话模板的实现细节
基础对话模板是这个项目的入门级示例,但里面包含的细节比想象中要多。我以这个模板为例,详细拆解一下Claude API调用的完整流程。
首先需要初始化客户端,这里有个细节需要注意:Claude API支持多个区域端点,包括北美、欧洲和亚太。选择离你用户最近的区域能显著降低延迟。模板中默认使用北美端点,但你可以根据实际情况修改:
import anthropic
from dotenv import load_dotenv
import os
load_dotenv()
client = anthropic.Anthropic(
api_key=os.getenv("ANTHROPIC_API_KEY"),
# 如果需要使用其他区域,可以这样配置
# base_url="https://api.eu.anthropic.com" # 欧洲区域
)
接下来是构造请求。Claude API的请求结构与OpenAI有所不同,需要特别注意 messages 参数的格式。每个消息都需要指定 role (system、user、assistant)和 content 。system消息用于设定AI的行为模式,这个位置很关键:
response = client.messages.create(
model="claude-3-opus-20240229", # 指定模型版本
max_tokens=1000, # 最大生成token数
temperature=0.7, # 创造性参数,0-1之间
system="你是一个有帮助的助手,回答要简洁专业。", # 系统提示
messages=[
{"role": "user", "content": "请解释一下机器学习的基本概念"}
]
)
这里有几个参数需要特别注意: temperature 控制输出的随机性,值越高回答越有创造性,但可能偏离事实;值越低回答越确定,但可能显得呆板。对于事实性问答,建议设置在0.3-0.5;对于创意写作,可以提高到0.8-0.9。
max_tokens 需要根据实际需求设置。Claude-3系列模型有上下文窗口限制(通常是200K tokens),你需要预留足够的tokens给AI生成回答。一个经验公式是:输入tokens + max_tokens ≤ 上下文窗口 - 安全余量(建议留10%作为缓冲)。
3.2 流式响应模板的技术实现
流式响应是提升用户体验的关键技术。当AI需要生成较长内容时,如果等全部生成完再显示,用户会感到明显的等待延迟。流式响应能让内容逐字逐句地显示出来,就像真人打字一样。
这个项目的流式响应模板实现得很优雅。核心是利用了Claude API的stream参数和Python的异步编程:
import asyncio
from anthropic import AsyncAnthropic
async def stream_response():
client = AsyncAnthropic(api_key=os.getenv("ANTHROPIC_API_KEY"))
async with client.messages.stream(
model="claude-3-sonnet-20240229",
max_tokens=1000,
messages=[{"role": "user", "content": "写一篇关于人工智能的短文"}]
) as stream:
async for event in stream:
if event.type == "content_block_delta":
# 实时输出生成的文本
print(event.delta.text, end="", flush=True)
在实际使用中,我发现了几个优化点。首先是缓冲处理:如果每个字符都立即输出,会导致界面频繁刷新,影响性能。更好的做法是积累一定量的文本(比如每50个字符)再批量输出。其次是错误恢复:网络不稳定时流可能会中断,需要实现重连机制。模板中虽然没包含这些高级功能,但提供了基础的框架,你可以在此基础上扩展。
另一个重要细节是token计数。在流式响应中,你需要实时统计已生成的tokens,避免超出配额。Claude API的响应中包含了usage信息,但只在流结束时才提供完整的统计。我建议在客户端也实现一个简单的token计数器,基于平均每个中文字符约等于1.5个tokens的经验值进行估算。
3.3 文件处理模板的实用技巧
文件处理是Claude的一个强项,特别是Claude-3系列模型支持多种文件格式的直接读取。这个模板展示了如何处理PDF、Word、Excel等常见格式。
我以PDF处理为例,分享一些实战经验。首先,不是所有PDF都能被完美解析,特别是扫描版PDF或特殊排版的文档。模板中使用的是 PyPDF2 库,这是一个基础但稳定的选择:
import PyPDF2
def extract_text_from_pdf(pdf_path):
text = ""
with open(pdf_path, 'rb') as file:
reader = PyPDF2.PdfReader(file)
for page_num in range(len(reader.pages)):
page = reader.pages[page_num]
text += page.extract_text() + "\n\n"
return text
但在实际使用中,我发现有几个问题需要注意。一是编码问题,有些PDF中的特殊字符可能无法正确提取;二是布局问题,提取的文本可能失去原有的段落结构;三是图片中的文字无法提取。
对于更复杂的PDF处理,我推荐结合使用其他工具。比如先用 pdfplumber 尝试提取,它对于表格数据的处理比PyPDF2更好。如果还是不行,可以考虑使用OCR技术,但这就需要额外的依赖和处理时间。
文件上传到Claude API时,需要注意文件大小限制。Claude API通常有10MB的单文件限制,对于更大的文件需要先进行分割。模板中提供了一个简单的分割函数,但实际应用中可能需要更智能的分割策略,比如按章节分割、按页数分割等。
3.4 工具调用模板的高级应用
工具调用(Function Calling)是让AI与现实世界交互的关键技术。这个模板展示了如何定义工具、如何解析AI的工具调用请求、如何执行工具并返回结果。
定义工具时,需要提供详细的描述和参数说明。Claude会根据这些描述决定是否以及如何调用工具:
tools = [
{
"name": "get_weather",
"description": "获取指定城市的天气信息",
"input_schema": {
"type": "object",
"properties": {
"city": {
"type": "string",
"description": "城市名称,如'北京'、'上海'"
}
},
"required": ["city"]
}
}
]
在实际实现中,有几个细节需要特别注意。一是工具描述的准确性:描述越准确,AI调用工具的准确性越高。二是错误处理:工具执行可能失败,需要提供有意义的错误信息给AI。三是工具组合:复杂任务可能需要连续调用多个工具,需要维护好对话状态。
我基于这个模板扩展了一个实际应用:智能日程管理系统。定义了创建事件、查询日程、修改事件、删除事件等多个工具,Claude能够理解自然语言指令并调用相应的工具。比如用户说"明天下午3点安排一个团队会议,主题是项目评审",Claude会解析出时间、类型、主题等信息,然后调用创建事件工具。
4. 实战:基于模板构建完整应用
4.1 项目初始化与环境配置
现在我们来实际构建一个基于这些模板的完整应用。假设我们要开发一个智能文档分析助手,能够上传文档、提取关键信息、回答相关问题。
首先从模板项目开始,我建议采用"模板组合"的方式。不要只用一个模板,而是根据需求选择多个模板进行组合。对于文档分析助手,我们需要:基础对话模板(核心交互)、文件处理模板(文档上传)、流式响应模板(提升体验)。
创建项目结构:
smart-doc-assistant/
├── app/
│ ├── main.py # 主应用文件
│ ├── handlers/ # 请求处理器
│ ├── utils/ # 工具函数
│ └── templates/ # Web模板(如果需要界面)
├── requirements.txt # 依赖列表
├── .env # 环境变量(不要提交到Git)
└── README.md # 项目说明
安装依赖时要注意版本兼容性。Claude的Python SDK更新比较频繁,建议在requirements.txt中指定版本范围:
anthropic>=0.25.0,<0.30.0
fastapi>=0.104.0
uvicorn[standard]>=0.24.0
python-dotenv>=1.0.0
PyPDF2>=3.0.0
python-multipart>=0.0.6
环境配置方面,除了基本的API密钥,我还建议配置一些应用级参数:
# .env文件示例
ANTHROPIC_API_KEY=your_key_here
MODEL_NAME=claude-3-sonnet-20240229 # 默认模型
MAX_TOKENS=4000 # 每次请求最大tokens
TEMPERATURE=0.3 # 默认温度值
REQUEST_TIMEOUT=30 # 请求超时时间(秒)
4.2 核心功能模块实现
文档分析助手的核心是文档处理模块。我设计了一个 DocumentProcessor 类,封装了从文件上传到内容提取的全流程:
class DocumentProcessor:
def __init__(self, client):
self.client = client
self.supported_formats = ['.pdf', '.txt', '.docx', '.md']
async def process_document(self, file_path: str, file_type: str) -> Dict:
"""处理上传的文档"""
# 1. 验证文件类型
if not self._validate_file_type(file_type):
raise ValueError(f"不支持的文件格式: {file_type}")
# 2. 提取文本内容
content = await self._extract_content(file_path, file_type)
# 3. 如果内容太长,进行智能分割
if self._estimate_tokens(content) > 100000: # 约66K汉字
chunks = self._smart_chunking(content)
else:
chunks = [content]
# 4. 为每个分块生成摘要
summaries = []
for chunk in chunks:
summary = await self._generate_summary(chunk)
summaries.append(summary)
return {
"original_content": content,
"chunks": chunks,
"summaries": summaries,
"total_tokens": self._estimate_tokens(content)
}
def _smart_chunking(self, content: str) -> List[str]:
"""智能分块:按段落、标题等自然边界分割"""
# 这里实现具体的分块逻辑
# 比如按Markdown标题分割、按段落分割等
pass
问答模块是另一个核心。我们需要处理两种类型的问答:基于文档内容的问答和通用知识问答。这里用到了RAG(检索增强生成)的基本思想:
class QASystem:
def __init__(self, document_store):
self.document_store = document_store
self.conversation_history = []
async def answer_question(self, question: str, context_doc_id: str = None) -> str:
# 如果有指定文档,先检索相关段落
if context_doc_id:
relevant_text = await self._retrieve_relevant_text(question, context_doc_id)
prompt = self._build_contextual_prompt(question, relevant_text)
else:
prompt = self._build_general_prompt(question)
# 调用Claude API
response = await self.client.messages.create(
model=self.model,
max_tokens=self.max_tokens,
messages=[
{"role": "system", "content": "你是一个专业的文档分析助手。"},
*self.conversation_history[-5:], # 保留最近5轮对话作为上下文
{"role": "user", "content": prompt}
]
)
# 更新对话历史
self.conversation_history.append({"role": "user", "content": question})
self.conversation_history.append({"role": "assistant", "content": response.content[0].text})
return response.content[0].text
4.3 Web界面集成与用户体验优化
虽然模板项目提供了基础的Web界面,但在实际产品中,我们需要更完善的用户体验。我基于FastAPI和Jinja2模板引擎构建了一个更友好的界面。
首先设计API端点:
from fastapi import FastAPI, UploadFile, File, Form, WebSocket
from fastapi.responses import HTMLResponse
from fastapi.staticfiles import StaticFiles
app = FastAPI(title="智能文档分析助手")
@app.get("/", response_class=HTMLResponse)
async def home():
"""主页面"""
return templates.TemplateResponse("index.html", {"request": request})
@app.post("/api/upload")
async def upload_document(file: UploadFile = File(...)):
"""上传文档接口"""
# 保存文件
file_path = f"./uploads/{file.filename}"
with open(file_path, "wb") as f:
content = await file.read()
f.write(content)
# 处理文档
processor = DocumentProcessor(client)
result = await processor.process_document(file_path, file.filename)
return {
"success": True,
"doc_id": generate_doc_id(),
"summary": result["summaries"][0] if result["summaries"] else "",
"total_pages": len(result["chunks"])
}
@app.websocket("/ws/chat")
async def websocket_chat(websocket: WebSocket):
"""WebSocket聊天接口,支持流式响应"""
await websocket.accept()
try:
while True:
data = await websocket.receive_json()
question = data.get("question")
doc_id = data.get("doc_id")
# 创建流式响应
async with client.messages.stream(
model=MODEL_NAME,
max_tokens=MAX_TOKENS,
messages=build_messages(question, doc_id)
) as stream:
async for event in stream:
if event.type == "content_block_delta":
text = event.delta.text
await websocket.send_json({
"type": "text_delta",
"content": text
})
except WebSocketDisconnect:
print("客户端断开连接")
前端界面需要特别关注流式响应的展示。我使用JavaScript的WebSocket API实现实时显示:
class ChatInterface {
constructor() {
this.ws = new WebSocket(`ws://${window.location.host}/ws/chat`);
this.setupWebSocket();
}
setupWebSocket() {
this.ws.onmessage = (event) => {
const data = JSON.parse(event.data);
if (data.type === 'text_delta') {
// 逐字显示AI回复
this.appendToChat(data.content);
}
};
}
async sendMessage(question, docId) {
// 显示用户消息
this.appendToChat(question, 'user');
// 发送到WebSocket
this.ws.send(JSON.stringify({
question: question,
doc_id: docId
}));
// 创建AI回复区域
this.createAIReplyArea();
}
appendToChat(content, sender = 'ai') {
const chatArea = document.getElementById('chat-area');
if (sender === 'user') {
chatArea.innerHTML += `<div class="user-message">${content}</div>`;
} else {
// AI回复使用最后一个AI消息元素追加内容
const lastAiMessage = chatArea.querySelector('.ai-message:last-child');
if (lastAiMessage) {
lastAiMessage.textContent += content;
}
}
chatArea.scrollTop = chatArea.scrollHeight;
}
}
4.4 性能优化与错误处理
在实际部署中,性能优化是必须考虑的问题。Claude API的调用有延迟,特别是处理长文档时。我总结了几个优化策略:
缓存策略 :对于处理过的文档,将提取的文本和生成的摘要缓存起来。可以使用Redis或内存缓存:
from functools import lru_cache
import hashlib
class CachedDocumentProcessor(DocumentProcessor):
@lru_cache(maxsize=100)
def get_document_hash(self, file_path: str) -> str:
"""计算文档内容的哈希值,用于缓存键"""
with open(file_path, 'rb') as f:
return hashlib.md5(f.read()).hexdigest()
async def process_document(self, file_path: str, file_type: str) -> Dict:
doc_hash = self.get_document_hash(file_path)
# 检查缓存
cached = cache.get(doc_hash)
if cached:
return cached
# 处理文档(原始逻辑)
result = await super().process_document(file_path, file_type)
# 存入缓存,设置过期时间
cache.set(doc_hash, result, timeout=3600) # 1小时过期
return result
异步批处理 :当有多个文档需要处理时,使用异步并发可以提高效率:
import asyncio
from typing import List
async def batch_process_documents(file_paths: List[str]) -> List[Dict]:
"""批量处理文档"""
tasks = []
for file_path in file_paths:
task = process_single_document(file_path)
tasks.append(task)
# 限制并发数,避免超过API限制
semaphore = asyncio.Semaphore(5) # 最大5个并发
async def process_with_semaphore(task):
async with semaphore:
return await task
results = await asyncio.gather(
*[process_with_semaphore(task) for task in tasks],
return_exceptions=True
)
# 处理结果,分离成功和失败
successful = []
failed = []
for i, result in enumerate(results):
if isinstance(result, Exception):
failed.append({
"file": file_paths[i],
"error": str(result)
})
else:
successful.append(result)
return {
"successful": successful,
"failed": failed
}
错误处理与重试 :网络请求可能失败,需要实现重试机制:
import asyncio
from tenacity import retry, stop_after_attempt, wait_exponential
class RobustClaudeClient:
def __init__(self, api_key: str, max_retries: int = 3):
self.client = anthropic.Anthropic(api_key=api_key)
self.max_retries = max_retries
@retry(
stop=stop_after_attempt(3),
wait=wait_exponential(multiplier=1, min=4, max=10)
)
async def send_message_with_retry(self, messages, **kwargs):
"""带重试的消息发送"""
try:
response = await self.client.messages.create(
messages=messages,
**kwargs
)
return response
except anthropic.APIConnectionError as e:
print(f"网络连接错误: {e}")
raise
except anthropic.RateLimitError as e:
print(f"速率限制: {e}")
# 等待更长时间再重试
await asyncio.sleep(10)
raise
except anthropic.APIStatusError as e:
print(f"API错误: {e.status_code} - {e.response}")
# 4xx错误不重试(除了429)
if e.status_code == 429: # 速率限制
await asyncio.sleep(5)
raise
elif 400 <= e.status_code < 500:
raise # 客户端错误,不重试
else:
raise # 服务器错误,会触发重试
5. 部署与运维实践
5.1 本地开发环境配置
在开始部署之前,我们先完善本地开发环境。我推荐使用Docker进行环境隔离,这样可以确保开发、测试、生产环境的一致性。
创建Dockerfile:
FROM python:3.11-slim
WORKDIR /app
# 安装系统依赖
RUN apt-get update && apt-get install -y \
gcc \
g++ \
&& rm -rf /var/lib/apt/lists/*
# 复制依赖文件
COPY requirements.txt .
# 安装Python依赖
RUN pip install --no-cache-dir -r requirements.txt
# 复制应用代码
COPY . .
# 创建非root用户
RUN useradd -m -u 1000 appuser && chown -R appuser:appuser /app
USER appuser
# 暴露端口
EXPOSE 8000
# 启动命令
CMD ["uvicorn", "app.main:app", "--host", "0.0.0.0", "--port", "8000"]
对应的docker-compose.yml:
version: '3.8'
services:
web:
build: .
ports:
- "8000:8000"
environment:
- ANTHROPIC_API_KEY=${ANTHROPIC_API_KEY}
- MODEL_NAME=claude-3-sonnet-20240229
- REDIS_URL=redis://redis:6379/0
volumes:
- ./uploads:/app/uploads
- ./logs:/app/logs
depends_on:
- redis
restart: unless-stopped
redis:
image: redis:7-alpine
ports:
- "6379:6379"
volumes:
- redis_data:/data
restart: unless-stopped
volumes:
redis_data:
本地开发时,我习惯使用热重载模式,这样修改代码后不需要手动重启服务:
# 使用uvicorn的热重载功能
uvicorn app.main:app --reload --host 0.0.0.0 --port 8000
# 或者使用watchfiles(更快的文件监控)
pip install watchfiles
watchfiles "uvicorn app.main:app --host 0.0.0.0 --port 8000" .
5.2 生产环境部署策略
生产环境部署需要考虑更多因素:性能、安全、监控、扩展性等。我分享一个基于云服务器的部署方案。
服务器配置建议 :
- CPU:至少2核,推荐4核(用于处理并发请求)
- 内存:至少4GB,推荐8GB(缓存和并发处理需要内存)
- 存储:至少20GB SSD(用于存储上传的文件和日志)
- 带宽:至少100Mbps(如果用户需要上传大文件)
Nginx配置 :作为反向代理和负载均衡器
# /etc/nginx/sites-available/smart-doc-assistant
upstream app_servers {
server 127.0.0.1:8000;
server 127.0.0.1:8001;
server 127.0.0.1:8002;
# 可以添加更多worker进程
}
server {
listen 80;
server_name your-domain.com;
# 重定向到HTTPS
return 301 https://$server_name$request_uri;
}
server {
listen 443 ssl http2;
server_name your-domain.com;
ssl_certificate /etc/letsencrypt/live/your-domain.com/fullchain.pem;
ssl_certificate_key /etc/letsencrypt/live/your-domain.com/privkey.pem;
# SSL优化配置
ssl_protocols TLSv1.2 TLSv1.3;
ssl_ciphers ECDHE-RSA-AES256-GCM-SHA512:DHE-RSA-AES256-GCM-SHA512;
ssl_prefer_server_ciphers off;
# 文件上传大小限制
client_max_body_size 100M;
location / {
proxy_pass http://app_servers;
proxy_set_header Host $host;
proxy_set_header X-Real-IP $remote_addr;
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
proxy_set_header X-Forwarded-Proto $scheme;
# WebSocket支持
proxy_http_version 1.1;
proxy_set_header Upgrade $http_upgrade;
proxy_set_header Connection "upgrade";
# 超时设置
proxy_read_timeout 300s;
proxy_connect_timeout 75s;
}
# 静态文件服务
location /static/ {
alias /app/static/;
expires 1y;
add_header Cache-Control "public, immutable";
}
# 上传文件访问
location /uploads/ {
alias /app/uploads/;
# 限制访问权限
internal;
}
}
Supervisor配置 :管理应用进程
; /etc/supervisor/conf.d/smart-doc-assistant.conf
[program:smart-doc-assistant]
command=/usr/local/bin/uvicorn app.main:app --host 0.0.0.0 --port 8000 --workers 4
directory=/app
user=appuser
autostart=true
autorestart=true
stopasgroup=true
killasgroup=true
stderr_logfile=/var/log/smart-doc-assistant.err.log
stdout_logfile=/var/log/smart-doc-assistant.out.log
; 启动多个worker进程
[program:smart-doc-assistant-8001]
command=/usr/local/bin/uvicorn app.main:app --host 0.0.0.0 --port 8001 --workers 4
directory=/app
user=appuser
autostart=true
autorestart=true
stopasgroup=true
killasgroup=true
stderr_logfile=/var/log/smart-doc-assistant-8001.err.log
stdout_logfile=/var/log/smart-doc-assistant-8001.out.log
5.3 监控与日志管理
生产环境必须有完善的监控和日志系统。我推荐使用以下组合:
结构化日志 :使用JSON格式的日志,方便后续处理和分析
import json
import logging
from pythonjsonlogger import jsonlogger
def setup_logging():
logger = logging.getLogger()
# 控制台输出(开发环境)
console_handler = logging.StreamHandler()
console_format = '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
console_handler.setFormatter(logging.Formatter(console_format))
# 文件输出(生产环境)
file_handler = logging.FileHandler('/app/logs/app.log')
json_formatter = jsonlogger.JsonFormatter(
'%(asctime)s %(name)s %(levelname)s %(message)s'
)
file_handler.setFormatter(json_formatter)
logger.addHandler(console_handler)
logger.addHandler(file_handler)
logger.setLevel(logging.INFO)
return logger
# 在应用中使用
logger = setup_logging()
@app.post("/api/upload")
async def upload_document(file: UploadFile = File(...)):
logger.info("文件上传开始", extra={
"filename": file.filename,
"content_type": file.content_type,
"file_size": file.size
})
try:
# 处理逻辑
logger.info("文件处理成功", extra={
"filename": file.filename,
"processing_time": processing_time
})
except Exception as e:
logger.error("文件处理失败", extra={
"filename": file.filename,
"error": str(e),
"error_type": type(e).__name__
})
raise
性能监控 :使用Prometheus和Grafana
from prometheus_client import Counter, Histogram, generate_latest
from fastapi import Response
# 定义指标
REQUEST_COUNT = Counter(
'http_requests_total',
'Total HTTP requests',
['method', 'endpoint', 'status']
)
REQUEST_LATENCY = Histogram(
'http_request_duration_seconds',
'HTTP request latency',
['method', 'endpoint']
)
# 中间件记录指标
@app.middleware("http")
async def monitor_requests(request: Request, call_next):
start_time = time.time()
try:
response = await call_next(request)
status_code = response.status_code
except Exception:
status_code = 500
raise
finally:
latency = time.time() - start_time
REQUEST_COUNT.labels(
method=request.method,
endpoint=request.url.path,
status=status_code
).inc()
REQUEST_LATENCY.labels(
method=request.method,
endpoint=request.url.path
).observe(latency)
return response
# Prometheus指标端点
@app.get("/metrics")
async def metrics():
return Response(generate_latest(), media_type="text/plain")
健康检查端点 :
@app.get("/health")
async def health_check():
"""健康检查端点"""
checks = {
"api_connection": await check_claude_api(),
"database": await check_database(),
"cache": await check_cache(),
"storage": await check_storage()
}
status = "healthy" if all(checks.values()) else "unhealthy"
return {
"status": status,
"timestamp": datetime.now().isoformat(),
"checks": checks
}
async def check_claude_api() -> bool:
"""检查Claude API连接"""
try:
# 发送一个简单的测试请求
response = await client.messages.create(
model=MODEL_NAME,
max_tokens=10,
messages=[{"role": "user", "content": "test"}]
)
return response is not None
except Exception:
return False
5.4 安全加固措施
生产环境必须考虑安全防护,我总结了几项关键措施:
API密钥管理 :
- 使用密钥管理服务(如AWS Secrets Manager、Azure Key Vault)
- 定期轮换密钥
- 不同环境使用不同密钥
import boto3
from botocore.exceptions import ClientError
def get_secret(secret_name: str) -> str:
"""从AWS Secrets Manager获取密钥"""
session = boto3.session.Session()
client = session.client(
service_name='secretsmanager',
region_name=os.getenv('AWS_REGION')
)
try:
response = client.get_secret_value(SecretId=secret_name)
return response['SecretString']
except ClientError as e:
logger.error(f"获取密钥失败: {e}")
raise
# 使用示例
ANTHROPIC_API_KEY = get_secret("anthropic/api-key")
输入验证与过滤 :
from pydantic import BaseModel, validator
import re
class ChatRequest(BaseModel):
message: str
doc_id: str = None
@validator('message')
def validate_message(cls, v):
# 检查长度
if len(v) > 5000:
raise ValueError('消息过长')
# 检查是否有恶意内容
malicious_patterns = [
r'<script.*?>.*?</script>', # 脚本标签
r'on\w+=".*?"', # 事件处理器
r'javascript:', # JavaScript协议
]
for pattern in malicious_patterns:
if re.search(pattern, v, re.IGNORECASE):
raise ValueError('检测到潜在恶意内容')
return v.strip()
速率限制 :
from slowapi import Limiter, _rate_limit_exceeded_handler
from slowapi.util import get_remote_address
from slowapi.errors import RateLimitExceeded
limiter = Limiter(key_func=get_remote_address)
app.state.limiter = limiter
app.add_exception_handler(RateLimitExceeded, _rate_limit_exceeded_handler)
@app.post("/api/chat")
@limiter.limit("10/minute") # 每分钟10次
async def chat_endpoint(request: Request, chat_request: ChatRequest):
# 处理逻辑
pass
6. 常见问题与解决方案
6.1 API调用相关问题
在实际使用中,API调用是最容易出问题的环节。我整理了常见的问题和解决方案:
问题1:API响应慢或超时
这是最常见的问题,特别是处理长文档时。Claude API对复杂问题的思考时间较长,可能导致超时。
解决方案 :
- 调整超时时间:根据任务复杂度设置合理的超时
import httpx
client = anthropic.Anthropic(
api_key=api_key,
timeout=httpx.Timeout(60.0, connect=10.0) # 总超时60秒,连接超时10秒
)
- 使用异步调用:避免阻塞主线程
async def process_with_timeout(messages, timeout=30):
try:
async with asyncio.timeout(timeout):
response = await client.messages.create(
model=MODEL_NAME,
max_tokens=1000,
messages=messages
)
return response
except asyncio.TimeoutError:
# 返回部分结果或错误信息
return {"error": "请求超时", "partial_response": None}
- 实现进度提示:对于长时间任务,提供进度反馈
@app.websocket("/ws/long-task")
async def long_task(websocket: WebSocket):
await websocket.accept()
# 发送进度更新
await websocket.send_json({"type": "progress", "value": 10, "message": "开始处理..."})
# 处理过程...
await websocket.send_json({"type": "progress", "value": 50, "message": "处理中..."})
# 完成
await websocket.send_json({"type": "progress", "value": 100, "message": "处理完成"})
问题2:Token超限错误
Claude模型有上下文窗口限制,输入+输出的tokens总数不能超过限制。
解决方案 :
- 实时计算tokens:使用tiktoken库估算
import tiktoken
def count_tokens(text: str, model: str = "claude-3-sonnet") -> int:
"""估算文本的token数量"""
# Claude使用类似GPT的tokenizer
encoding = tiktoken.get_encoding("cl100k_base")
return len(encoding.encode(text))
- 智能截断:保留最重要的部分
def truncate_text(text: str, max_tokens: int, model: str) -> str:
"""智能截断文本,保留重要内容"""
tokens = count_tokens(text, model)
if tokens <= max_tokens:
return text
# 如果是对话历史,保留最近的对话
if "User:" in text and "Assistant:" in text:
# 按对话轮次分割
turns = text.split("\n\n")
while turns and count_tokens("\n\n".join(turns), model) > max_tokens:
turns.pop(0) # 移除最早的对话
return "\n\n".join(turns)
# 如果是普通文本,尝试按段落截断
paragraphs = text.split("\n")
while paragraphs and count_tokens("\n".join(paragraphs), model) > max_tokens:
paragraphs.pop(0)
return "\n".join(paragraphs)
- 分块处理:大文档分成多个请求
async def process_large_document(content: str, chunk_size: int = 50000):
"""处理大文档,分块发送"""
chunks = []
current_chunk = ""
current_tokens = 0
paragraphs = content.split("\n")
for para in paragraphs:
para_tokens = count_tokens(para)
if current_tokens + para_tokens > chunk_size:
# 当前块已满,处理并清空
if current_chunk:
chunks.append(current_chunk)
current_chunk = para
current_tokens = para_tokens
else:
# 添加到当前块
current_chunk += "\n" + para if current_chunk else para
current_tokens += para_tokens
# 处理最后一块
if current_chunk:
chunks.append(current_chunk)
# 分别处理每个块
results = []
for i, chunk in enumerate(chunks):
result = await process_chunk(chunk, chunk_index=i, total_chunks=len(chunks))
results.append(result)
return combine_results(results)
6.2 文件处理常见问题
问题:PDF解析质量差
不同PDF的解析难度不同,特别是扫描件或复杂排版的文档。
解决方案组合拳 :
- 多引擎尝试:先用PyPDF2,失败后用pdfplumber,再失败用OCR
def extract_pdf_text(pdf_path: str) -> str:
"""尝试多种方法提取PDF文本"""
methods = [
_extract_with_pypdf2,
_extract_with_pdfplumber,
_extract_with_ocr
]
for method in methods:
try:
text = method(pdf_path)
if text and len(text.strip()) > 100: # 有意义的文本
return text
except Exception as e:
print(f"{method.__name__} 失败: {e}")
continue
return "" # 所有方法都失败
- 后处理优化:清理提取的文本
def clean_extracted_text(text: str) -> str:
"""清理提取的文本"""
# 移除多余的空格和换行
lines = text.split('\n')
cleaned_lines = []
for line in lines:
line = line.strip()
if line: # 跳过空行
# 合并被错误分割的单词
if cleaned_lines and not cleaned_lines[-1].endswith(('.', '!', '?', '。', '!', '?')):
cleaned_lines[-1] += ' ' + line
else:
cleaned_lines.append(line)
return '\n'.join(cleaned_lines)
- 质量评估:自动评估提取质量
def assess_text_quality(text: str) -> float:
"""评估文本提取质量(0-1分)"""
if not text:
return 0.0
# 检查指标
metrics = {
"length_score": min(len(text) / 1000, 1.0), # 长度得分
"readability_score": calculate_readability(text), # 可读性得分
"structure_score": assess_structure(text), # 结构完整性得分
}
# 加权平均
weights = {"length_score": 0.3, "readability_score": 0.4, "structure_score": 0.3}
total_score = sum(metrics[key] * weights[key] for key in metrics)
return total_score
6.3 性能优化问题
问题:高并发下的性能瓶颈
当多个用户同时使用时,可能会出现响应变慢的问题。
解决方案 :
- 连接池管理:复用HTTP连接
import httpx
from httpx import AsyncClient, Limits
class ConnectionPool:
def __init__(self):
self.limits = Limits(
max_connections=100, # 最大连接数
max_keepalive_connections=20, # 保持活跃的连接数
keepalive_expiry=5.0 # 保持时间(秒)
)
self.client = None
async def get_client(self):
if not self.client:
self.client = AsyncClient(
limits=self.limits,
timeout=httpx.Timeout(30.0)
)
return self.client
async def close(self):
if self.client:
await self.client.aclose()
# 使用连接池
pool = ConnectionPool()
@app.on_event("startup")
async def startup():
await pool.get_client()
@app.on_event("shutdown")
async def shutdown():
await pool.close()
- 请求批处理:合并小请求
async def batch_process_requests(requests: List[Dict]) -> List[Dict]:
"""批量处理请求,减少API调用次数"""
# 按类型分组
grouped = {}
for req in requests:
req_type = req.get("type", "default")
if req_type not in grouped:
grouped[req_type] = []
grouped[req_type].append(req)
results = []
for req_type, req_list in grouped.items():
if len(req_list) == 1:
# 单个请求直接处理
result = await process_single(req_list[0])
results.append(result)
else:
# 批量处理
batch_result = await process_batch(req_list)
results.extend(batch_result)
return results
- 缓存策略:减少重复计算
from functools import lru_cache
import hashlib
import pickle
class SmartCache:
def __init__(self, max_size=1000, ttl=3600):
self.cache = {}
self.max_size = max_size
self.ttl = ttl # 生存时间(秒)
def _make_key(self, *args, **kwargs):
"""生成缓存键"""
key_data = pickle.dumps((args, sorted(kwargs.items())))
return hashlib.md5(key_data).hexdigest()
def get(self, key):
"""获取缓存"""
if key in self.cache:
entry = self.cache[key]
if time.time() - entry["timestamp"] < self.ttl:
return entry["value"]
else:
# 过期删除
del self.cache[key]
return None
def set(self, key, value):
"""设置缓存"""
if len(self.cache) >= self.max_size:
# 移除最旧的条目
oldest_key = min(self.cache.keys(),
key=lambda k: self.cache[k]["timestamp"])
del self.cache[oldest_key]
self.cache[key] = {
"value": value,
"timestamp": time.time()
}
def clear_expired(self):
"""清理过期缓存"""
current_time = time.time()
expired_keys = [
key for key, entry in self.cache.items()
if current_time - entry["timestamp"] >= self.ttl
]
for key in expired_keys:
del self.cache[key]
# 使用示例
cache = SmartCache(max_size=500, ttl=1800) # 30分钟过期
@lru_cache(maxsize=100)
async def get_cached_response(prompt: str) -> str:
"""带缓存的响应获取"""
cache_key = hashlib.md5(prompt.encode()).hexdigest()
# 先查缓存
cached = cache.get(cache_key)
if cached:
return cached
# 缓存未命中,调用API
response = await call_claude_api(prompt)
# 存入缓存
cache.set(cache_key, response)
return response
6.4 成本控制问题
问题:API使用成本过高
Claude API按token收费,不当使用可能导致成本失控。
成本控制策略 :
- Token使用监控
class CostMonitor:
def __init__(self, budget_daily=10.0): # 每日预算10美元
self.budget_daily = budget_daily
self.usage_today = 0.0
self.reset_time = self._get_next_reset_time()
def _get_next_reset_time(self):
"""获取下次重置时间(每天UTC 0点)"""
now = datetime.utcnow()
tomorrow = now.replace(hour=0, minute=0, second=0, microsecond=0) + timedelta(days=1)
return tomorrow
def check_budget(self, estimated_cost: float) -> bool:
"""检查是否超出预算"""
# 如果过了重置时间,重置用量
if datetime.utcnow() >= self.reset_time:
self.usage_today = 0.0
self.reset_time = self._get_next_reset_time()
return self.usage_today + estimated_cost <= self.budget_daily
def record_usage(self, input_tokens: int, output_tokens: int, model: str):
"""记录token使用量"""
# Claude-3 Opus: $15/1M输入tokens, $75/1M输出tokens
# Claude-3 Sonnet: $3/1M输入tokens, $15/1M输出tokens
# Claude-3 Haiku: $0.25/1M输入tokens, $1.25/1M输出tokens
model_prices = {
"claude-3-opus": {"input": 15.0, "output": 75.0},
"claude-3-sonnet": {"input": 3.0, "output": 15.0},
"claude-3-haiku": {"input": 0.25, "output": 1.25}
}
if model not in model_prices:
model = "claude-3-sonnet" # 默认
prices = model_prices[model]
cost = (input_tokens / 1_000_000 * prices["input"] +
output_tokens / 1_000_000 * prices["output"])
self.usage_today += cost
return cost
# 使用示例
monitor = CostMonitor(budget_daily=5.0) # 每日预算5美元
async def process_with_budget(prompt: str, max_tokens: int = 1000):
"""带预算控制的处理"""
# 估算成本(假设输入tokens为prompt长度的一半)
estimated_input_tokens = len(prompt) // 2
estimated_cost = monitor.record_usage(estimated_input_tokens, max_tokens, "claude-3-sonnet")
if not monitor.check_budget(estimated_cost):
return {"error": "今日预算已用完"}
# 处理请求
response = await client.messages.create(...)
# 记录实际使用量
actual_cost = monitor.record_usage(
response.usage.input_tokens,
response.usage.output_tokens,
"claude-3-sonnet"
)
return response
- 模型选择优化:根据任务复杂度选择合适模型
def select_model_by_complexity(task_complexity: str, require_high_quality: bool = False) -> str:
"""根据任务复杂度选择模型"""
if require_high_quality:
return "claude-3-opus" # 最高质量
model_map = {
"simple": "claude-3-haiku", # 简单任务用最便宜的
"medium": "claude-3-sonnet", # 中等任务用平衡型
"complex": "claude-3-opus" # 复杂任务用最强模型
}
return model_map.get(task_complexity, "claude-3-sonnet")
def estimate_task_complexity(text: str, task_type: str) -> str:
"""估算任务复杂度"""
# 基于文本长度和任务类型
text_length = len(text)
if task_type == "summary":
if text_length < 1000:
return "simple"
elif text_length < 5000:
return "medium"
else:
return "complex"
elif task_type == "qa":
# 问答任务通常较简单
return "simple" if text_length < 2000 else "medium"
elif task_type == "analysis":
# 分析任务较复杂
return "complex"
return "medium" # 默认
- 响应长度控制:避免生成过长内容
def optimize_max_tokens(prompt: str, task_type: str, budget_ratio: float = 0.3) -> int:
"""根据提示词和任务类型优化max_tokens"""
# 估算输入tokens
input_tokens = len(prompt) // 2 # 简单估算
# 不同任务的输出比例
output_ratios = {
"summary": 0.2, # 摘要:输出是输入的20%
"qa": 0.5, # 问答:输出是输入的50%
"analysis": 0.8, # 分析:输出是输入的80%
"creative": 1.5 # 创作:输出可能超过输入
}
ratio = output_ratios.get(task_type, 0.5)
# 计算建议的max_tokens
suggested = int(input_tokens * ratio)
# 考虑预算比例(避免用完所有tokens)
budget_limited = int(suggested * budget_ratio)
# 设置上下限
return max(100, min(budget_limited, 4000)) # 最少100,最多4000
7. 扩展与定制开发
7.1 自定义模板开发
虽然walidboulanouar/ay-claude-templates提供了很多现成模板,但实际项目中经常需要自定义模板。我分享几个自定义模板的开发经验。
模板结构设计 : 一个好的模板应该包含以下部分:
- 清晰的目录结构
- 完整的依赖说明
- 详细的配置示例
- 使用示例和测试代码
- 部署说明
# 自定义模板示例:法律文档分析模板
"""
legal-doc-analyzer/
├── README.md # 模板说明
├── requirements.txt # 依赖
├── config.py # 配置
├── main.py # 主程序
├── legal_analyzer.py # 法律分析核心逻辑
├── templates/ # Web模板
│ └── index.html
├── static/ # 静态文件
│ └── style.css
└── examples/ # 使用示例
├── contract_analysis.py
└── clause_extraction.py
"""
核心功能实现 :
class LegalDocumentAnalyzer:
"""法律文档分析器"""
def __init__(self, client):
self.client = client
self.legal_knowledge_base = self._load_legal_knowledge()
def _load_legal_knowledge(self):
"""加载法律知识库"""
# 可以加载常见的法律条款、法规等
return {
"contract_types": ["买卖合同", "租赁合同", "劳动合同", "借款合同"],
"common_clauses": ["违约责任", "争议解决", "保密条款", "不可抗力"],
"risk_keywords": ["单方解除", "无限责任", "霸王条款", "格式条款"]
}
async def analyze_contract(self, contract_text: str) -> Dict:
"""分析合同文档"""
# 构建专业提示词
system_prompt = """你是一名资深法律专家,擅长合同审查和分析。
请仔细分析提供的合同文本,识别潜在风险、不明确条款和需要修改的内容。
请用专业但易懂的语言给出建议。"""
user_prompt = f"""请分析以下合同文本:
{contract_text}
请从以下角度进行分析:
1. 合同类型识别
2. 关键条款提取
3. 潜在风险点
4. 修改建议
5. 谈判要点
请用结构化格式回复。"""
response = await self.client.messages.create(
model="claude-3-opus", # 法律分析需要高精度
max_tokens=3000,
system=system_prompt,
messages=[{"role": "user", "content": user_prompt}]
)
return self._parse_analysis_result(response.content[0].text)
def _parse_analysis_result(self, result_text: str) -> Dict:
"""解析分析结果"""
# 这里实现具体的解析逻辑
# 可以将文本解析为结构化的JSON
pass
模板配置优化 :
# config.py
import os
from typing import Optional
from pydantic import BaseSettings
class Settings(BaseSettings):
"""应用配置"""
# API配置
anthropic_api_key: str
model_name: str = "claude-3-sonnet-20240229"
# 应用配置
app_name: str = "法律文档分析助手"
app_version: str = "1.0.0"
# 分析配置
max_document_size_mb: int = 10
supported_file_types: list = [".pdf", ".docx", ".txt"]
# 缓存配置
cache_enabled: bool = True
cache_ttl_seconds: int = 3600
# 日志配置
log_level: str = "INFO"
log_file: str = "./logs/legal_analyzer.log"
class Config:
env_file = ".env"
env_file_encoding = "utf-8"
# 使用示例
settings = Settings()
# 根据环境变量覆盖配置
if os.getenv("ENVIRONMENT") == "production":
settings.model_name = "claude-3-opus" # 生产环境用更准确的模型
settings.cache_ttl_seconds = 7200 # 缓存时间更长
7.2 集成第三方服务
实际项目中,经常需要集成其他服务。以下是一些常见的集成方案:
数据库集成 :存储分析结果
from sqlalchemy import create_engine, Column, Integer, String, Text, DateTime
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
from datetime import datetime
Base = declarative_base()
class AnalysisResult(Base):
"""分析结果表"""
__tablename__ = "analysis_results"
id = Column(Integer, primary_key=True)
document_hash = Column(String(64), unique=True, index=True)
original_filename = Column(String(255))
file_type = Column(String(10))
analysis_result = Column(Text) # JSON格式的完整结果
summary = Column(Text) # 摘要
risk_level = Column(String(20)) # 风险等级
created_at = Column(DateTime, default=datetime.utcnow)
updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)
class AnalysisDB:
"""分析结果数据库管理"""
def __init__(self, connection_string: str):
self.engine = create_engine(connection_string)
self.SessionLocal = sessionmaker(bind=self.engine)
Base.metadata.create_all(bind=self.engine)
def save_result(self, document_hash: str, filename: str,
file_type: str, result: Dict) -> int:
"""保存分析结果"""
session = self.SessionLocal()
try:
# 检查是否已存在
existing = session.query(AnalysisResult).filter_by(
document_hash=document_hash
).first()
if existing:
# 更新现有记录
existing.analysis_result = json.dumps(result)
existing.summary = result.get("summary", "")
existing.risk_level = result.get("risk_level", "unknown")
existing.updated_at = datetime.utcnow()
else:
# 创建新记录
new_result = AnalysisResult(
document_hash=document_hash,
original_filename=filename,
file_type=file_type,
analysis_result=json.dumps(result),
summary=result.get("summary", ""),
risk_level=result.get("risk_level", "unknown")
)
session.add(new_result)
session.commit()
return existing.id if existing else new_result.id
except Exception as e:
session.rollback()
raise
finally:
session.close()
def get_result(self, document_hash: str) -> Optional[Dict]:
"""获取分析结果"""
session = self.SessionLocal()
try:
result = session.query(AnalysisResult).filter_by(
document_hash=document_hash
).first()
if result:
return {
"id": result.id,
"filename": result.original_filename,
"result": json.loads(result.analysis_result),
"summary": result.summary,
"risk_level": result.risk_level,
"created_at": result.created_at.isoformat(),
"updated_at": result.updated_at.isoformat()
}
return None
finally:
session.close()
邮件通知集成 :发送分析报告
import smtplib
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
from jinja2 import Template
class EmailNotifier:
"""邮件通知器"""
def __init__(self, smtp_server: str, smtp_port: int,
username: str, password: str):
self.smtp_server = smtp_server
self.smtp_port = smtp_port
self.username = username
self.password = password
# 邮件模板
self.template = Template("""
<!DOCTYPE html>
<html>
<head>
<style>
body { font-family: Arial, sans-serif; }
.container { max-width: 600px; margin: 0 auto; }
.header { background: #4CAF50; color: white; padding: 20px; }
.content { padding: 20px; }
.risk-high { color: #f44336; }
.risk-medium { color: #ff9800; }
.risk-low { color: #4CAF50; }
</style>
</head>
<body>
<div class="container">
<div class="header">
<h2>法律文档分析报告</h2>
</div>
<div class="content">
<p>文档名称:{{ filename }}</p>
<p>分析时间:{{ analysis_time }}</p>
<p>风险等级:<span class="risk-{{ risk_level }}">{{ risk_level }}</span></p>
<h3>关键发现</h3>
<ul>
{% for finding in key_findings %}
<li>{{ finding }}</li>
{% endfor %}
</ul>
<h3>建议</h3>
<p>{{ recommendations }}</p>
<p>详细报告请登录系统查看。</p>
</div>
</div>
</body>
</html>
""")
def send_analysis_report(self, to_email: str, analysis_result: Dict):
"""发送分析报告邮件"""
# 准备邮件内容
html_content = self.template.render(
filename=analysis_result.get("filename", "未知文档"),
analysis_time=analysis_result.get("analysis_time", ""),
risk_level=analysis_result.get("risk_level", "unknown"),
key_findings=analysis_result.get("key_findings", []),
recommendations=analysis_result.get("recommendations", "")
)
# 创建邮件
msg = MIMEMultipart("alternative")
msg["Subject"] = f"法律文档分析报告 - {analysis_result.get('filename')}"
msg["From"] = self.username
msg["To"] = to_email
# 添加HTML版本
html_part = MIMEText(html_content, "html")
msg.attach(html_part)
# 发送邮件
with smtplib.SMTP(self.smtp_server, self.smtp_port) as server:
server.starttls()
server.login(self.username, self.password)
server.send_message(msg)
云存储集成 :存储上传的文件
import boto3
from botocore.exceptions import ClientError
import os
class S3Storage:
"""AWS S3存储"""
def __init__(self, bucket_name: str, region: str = None):
self.bucket_name = bucket_name
self.s3_client = boto3.client(
's3',
region_name=region or os.getenv('AWS_REGION')
)
async def upload_file(self, file_path: str, object_name: str = None) -> str:
"""上传文件到S3"""
if object_name is None:
object_name = os.path.basename(file_path)
try:
self.s3_client.upload_file(file_path, self.bucket_name, object_name)
# 生成访问URL(根据需要设置过期时间)
url = self.s3_client.generate_presigned_url(
'get_object',
Params={'Bucket': self.bucket_name, 'Key': object_name},
ExpiresIn=3600 # 1小时有效
)
return url
except ClientError as e:
print(f"上传失败: {e}")
raise
async def download_file(self, object_name: str, file_path: str):
"""从S3下载文件"""
try:
self.s3_client.download_file(self.bucket_name, object_name, file_path)
except ClientError as e:
print(f"下载失败: {e}")
raise
async def delete_file(self, object_name: str):
"""删除S3文件"""
try:
self.s3_client.delete_object(Bucket=self.bucket_name, Key=object_name)
except ClientError as e:
print(f"删除失败: {e}")
raise
7.3 性能监控与优化
对于生产环境的应用,性能监控至关重要。以下是一个完整的监控方案:
应用性能监控 :
import time
from dataclasses import dataclass
from typing import Dict, List
import psutil
import asyncio
@dataclass
class PerformanceMetrics:
"""性能指标"""
request_count: int = 0
error_count: int = 0
avg_response_time: float = 0.0
p95_response_time: float = 0.0
memory_usage_mb: float = 0.0
cpu_percent: float = 0.0
class PerformanceMonitor:
"""性能监控器"""
def __init__(self):
self.metrics = PerformanceMetrics()
self.response_times: List[float] = []
self.start_time = time.time()
def record_request(self, response_time: float, success: bool = True):
"""记录请求"""
self.metrics.request_count += 1
if not success:
self.metrics.error_count += 1
self.response_times.append(response_time)
# 计算统计指标
if self.response_times:
self.metrics.avg_response_time = sum(self.response_times) / len(self.response_times)
sorted_times = sorted(self.response_times)
index = int(len(sorted_times) * 0.95)
self.metrics.p95_response_time = sorted_times[index] if index < len(sorted_times) else sorted_times[-1]
def update_system_metrics(self):
"""更新系统指标"""
process = psutil.Process()
self.metrics.memory_usage_mb = process.memory_info().rss / 1024 / 1024
self.metrics.cpu_percent = process.cpu_percent(interval=0.1)
def get_report(self) -> Dict:
"""获取监控报告"""
self.update_system_metrics()
uptime = time.time() - self.start_time
error_rate = (self.metrics.error_count / self.metrics.request_count * 100) if self.metrics.request_count > 0 else 0
return {
"uptime_seconds": uptime,
"request_count": self.metrics.request_count,
"error_count": self.metrics.error_count,
"error_rate_percent": error_rate,
"avg_response_time_seconds": self.metrics.avg_response_time,
"p95_response_time_seconds": self.metrics.p95_response_time,
"memory_usage_mb": self.metrics.memory_usage_mb,
"cpu_percent": self.metrics.cpu_percent,
"requests_per_second": self.metrics.request_count / uptime if uptime > 0 else 0
}
# 在FastAPI应用中使用
monitor = PerformanceMonitor()
@app.middleware("http")
async def monitor_requests(request: Request, call_next):
start_time = time.time()
try:
response = await call_next(request)
response_time = time.time() - start_time
monitor.record_request(response_time, success=True)
return response
except Exception:
response_time = time.time() - start_time
monitor.record_request(response_time, success=False)
raise
@app.get("/metrics/performance")
async def get_performance_metrics():
"""获取性能指标"""
return monitor.get_report()
API使用监控 :
class APIUsageMonitor:
"""API使用监控"""
def __init__(self):
self.usage_by_model: Dict[str, Dict] = {}
self.daily_usage: Dict[str, float] = {}
self.cost_by_day: Dict[str, float] = {}
def record_usage(self, model: str, input_tokens: int, output_tokens: int):
"""记录API使用情况"""
today = datetime.now().strftime("%Y-%m-%d")
# 初始化数据结构
if model not in self.usage_by_model:
self.usage_by_model[model] = {
"total_input_tokens": 0,
"total_output_tokens": 0,
"request_count": 0
}
if today not in self.daily_更多推荐



所有评论(0)