基于 Claude 的 AI 自动化测试方案(超级详细版)
本文介绍了一个基于Claude AI的自动化测试平台设计方案,主要内容包括: 平台架构 采用分层设计,包含需求分析、AI测试大脑、测试执行引擎和结果分析等模块 支持API、UI、性能和安全测试等多种测试类型 技术栈包括Python、Claude AI、Pytest、Playwright等 核心功能 智能测试用例生成:从需求文档、代码变更自动生成测试用例 数据驱动测试:测试数据与用例分离,支持动态数
·
目录
目录
1. 总体架构
1.1 整体架构图
┌──────────────────────────────────────────────────────────────┐
│ AI 自动化测试平台 │
├──────────────────────────────────────────────────────────────┤
│ │
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────────────┐ │
│ │ 需求分析 │ │ 代码分析 │ │ 接口文档分析 │ │
│ │ (NLP 解析) │ │ (AST 分析) │ │ (Swagger/YAPI) │ │
│ └──────┬──────┘ └──────┬──────┘ └──────────┬──────────┘ │
│ │ │ │ │
│ └───────────────┼──────────────────────┘ │
│ ▼ │
│ ┌─────────────────────┐ │
│ │ Claude 测试大脑 │ │
│ │ │ │
│ │ • 测试策略制定 │ │
│ │ • 用例自动生成 │ │
│ │ • 智能场景编排 │ │
│ │ • 异常模式识别 │ │
│ │ • 回归范围预测 │ │
│ └────────┬────────────┘ │
│ │ │
│ ┌─────────────┼─────────────┐ │
│ ▼ ▼ ▼ │
│ ┌────────────┐ ┌──────────┐ ┌──────────┐ │
│ │ API 测试 │ │ UI 测试 │ │ 性能测试 │ │
│ │ (RestAssured│ │(Selenium/│ │ (JMeter/ │ │
│ │ Pytest) │ │ Playwright│ │ k6) │ │
│ └──────┬─────┘ └────┬─────┘ └────┬─────┘ │
│ │ │ │ │
│ └────────────┼────────────┘ │
│ ▼ │
│ ┌────────────────────┐ │
│ │ 测试执行引擎 │ │
│ │ • 并行执行 │ │
│ │ • 智能重试 │ │
│ │ • 结果收集 │ │
│ └────────┬───────────┘ │
│ │ │
│ ┌──────────┼──────────┐ │
│ ▼ ▼ ▼ │
│ ┌────────────┐ ┌──────────┐ ┌──────────┐ │
│ │ 结果分析 │ │ 智能报告 │ │ 知识库 │ │
│ │ (根因分析) │ │ (自然语言) │ │ (经验沉淀)│ │
│ └────────────┘ └──────────┘ └──────────┘ │
│ │
└──────────────────────────────────────────────────────────────┘
1.2 核心设计原则
| 原则 | 描述 |
|---|---|
| AI 优先 | 测试用例生成、场景设计、根因分析全部由 AI 驱动 |
| 数据驱动 | 测试数据与用例分离,支持动态数据生成 |
| 可追溯 | 每个测试用例可追溯到需求、代码、缺陷 |
| 渐进增强 | 从 API 测试开始,逐步扩展到 UI、性能、安全 |
| 人机协同 | AI 生成 + 人工审核 + 持续优化 |
1.3 技术栈选型
# 核心框架
language:
primary: Python 3.11+
secondary: TypeScript/Node.js
# AI 引擎
ai_engine:
primary: Claude 4 (Anthropic API)
fallback: Claude 3.5 Sonnet
embedding: Claude Embedding
# 测试框架
test_framework:
unit: Pytest
api: RestAssured + Pytest
ui: Playwright + Selenium
performance: k6 + Locust
security: OWASP ZAP + Burp
# 基础设施
infrastructure:
ci_cd: GitHub Actions + Jenkins
container: Docker + Kubernetes
database: PostgreSQL + Redis
storage: MinIO (对象存储)
# 监控告警
monitoring:
metrics: Prometheus + Grafana
logging: ELK Stack
tracing: Jaeger
alert: 钉钉/飞书 webhook
2. 测试用例设计
2.1 智能用例生成流程
┌──────────┐ ┌──────────┐ ┌──────────┐ ┌──────────┐
│ 需求文档 │───▶│ AST 分析 │───▶│ 接口文档 │───▶│ AI 用例 │
│ 代码变更 │ │ 接口定义 │ │ 业务规则 │ │ 生成器 │
│ 历史用例 │ │ 数据字典 │ │ 异常场景 │ │ │
└──────────┘ └──────────┘ └──────────┘ └──────────┘
│
┌────────────┐ ▼
│ 人工审核 │◀────────┐ ┌──────────┐
│ 优化调整 │─────────▶│ 用例库 │
└────────────┘ │ └──────────┘
│ │
┌────────────┐ │ ▼
│ 执行反馈 │─────────┐ ┌──────────┐
│ 覆盖率分析 │ ──▶│ 持续优化 │
└────────────┘ └──────────┘
2.2 测试策略模板
2.2.1 API 测试策略
api_testing_strategy:
# 层级覆盖
layers:
- name: "契约测试"
scope: "接口定义验证"
tools: ["Schemathesis", "Pact"]
frequency: "每次提交"
- name: "单元接口测试"
scope: "单个接口功能验证"
tools: ["Pytest", "RestAssured"]
frequency: "每次构建"
- name: "集成接口测试"
scope: "接口间交互验证"
tools: ["Pytest", "WireMock"]
frequency: "每日构建"
- name: "端到端流程测试"
scope: "完整业务流程"
tools: ["Pytest", "API Chaining"]
frequency: "每次发布"
# 测试维度
dimensions:
- name: "功能测试"
coverage:
- 正常流程
- 异常流程
- 边界值
- 等价类
- 错误处理
- name: "安全测试"
coverage:
- 身份认证
- 权限控制
- 数据加密
- 输入验证
- SQL注入
- XSS
- name: "性能测试"
coverage:
- 响应时间
- 吞吐量
- 并发能力
- 资源消耗
- 稳定性
- name: "兼容性测试"
coverage:
- HTTP/1.1
- HTTP/2.0
- JSON/XML
- 不同客户端
2.2.2 AI 驱动的测试场景设计
ai_test_scenarios:
# 场景分类
categories:
- name: "正向场景"
description: "正常业务流程"
ai_prompt: |
请根据以下需求描述,生成完整的正向测试场景:
- 包含所有必要的步骤
- 每个步骤明确输入输出
- 包含断言验证点
- 考虑数据准备和清理
- name: "逆向场景"
description: "异常和错误处理"
ai_prompt: |
请识别以下接口可能出现的异常情况:
- 网络异常(超时、断开)
- 数据异常(格式错误、类型错误、空值)
- 权限异常(未授权、越权)
- 业务异常(前置条件不满足)
- 资源异常(配额不足、限流)
- name: "边界场景"
description: "边界值和极限条件"
ai_prompt: |
请为以下参数设计边界测试用例:
- 字符串长度(0、1、最大值、最大值+1)
- 数值范围(最小值、最大值、负数、0)
- 日期时间(过去、现在、未来、闰年)
- 集合大小(空、1、最大、溢出)
- name: "组合场景"
description: "多参数交互"
ai_prompt: |
请分析以下参数之间的组合关系:
- 参数依赖关系
- 互斥条件
- 可选与必选
- 默认值处理
2.3 智能用例生成示例
2.3.1 需求分析 → 测试用例
"""
AI 测试用例生成器 - 核心实现
"""
import json
import yaml
from typing import Dict, List, Optional
from dataclasses import dataclass, field
@dataclass
class TestCase:
"""测试用例"""
id: str
name: str
description: str
priority: int # 1-5, 1为最高
type: str # positive, negative, boundary, combo
preconditions: List[str]
steps: List[Dict[str, str]] # 测试步骤
expected_results: List[str]
data: Dict[str, any] = field(default_factory=dict)
tags: List[str] = field(default_factory=list)
source: str = "" # 来源:AI生成/人工编写
ai_confidence: float = 0.0 # AI置信度
class AITestCaseGenerator:
"""AI 测试用例生成器"""
def __init__(self, claude_client, test_config: Dict):
self.client = claude_client
self.config = test_config
self.case_templates = self._load_templates()
self.history_cases = self._load_history_cases()
self.business_rules = self._load_business_rules()
def generate_from_requirements(self, requirements: str) -> List[TestCase]:
"""
从需求文档生成测试用例
"""
prompt = f"""
请根据以下需求文档,生成完整的API测试用例集。
需求文档:
{requirements}
业务规则:
{json.dumps(self.business_rules, ensure_ascii=False, indent=2)}
历史用例(用于参考和避免重复):
{json.dumps(self.history_cases[:10], ensure_ascii=False, indent=2)}
请按照以下格式生成测试用例:
1. 每个用例包含:ID、名称、描述、优先级、类型、前置条件、步骤、预期结果
2. 覆盖以下维度:
- 正向场景(正常流程)
- 逆向场景(异常处理)
- 边界场景(极限条件)
- 组合场景(多参数交互)
3. 每个场景至少生成3-5个用例
4. 优先级分配:核心流程P1,重要流程P2,一般流程P3
输出格式:JSON数组
"""
response = self.client.messages.create(
model="claude-4",
max_tokens=8000,
temperature=0.3,
messages=[{"role": "user", "content": prompt}]
)
# 解析响应
cases_data = json.loads(response.content[0].text)
return [TestCase(**case) for case in cases_data]
def generate_from_code_change(self, diff_content: str) -> List[TestCase]:
"""
从代码变更生成回归测试用例
"""
prompt = f"""
分析以下代码变更,生成回归测试用例:
变更内容:
```diff
{diff_content}
```
分析要点:
1. 识别变更的影响范围
2. 分析可能影响的接口
3. 识别潜在的风险点
4. 生成针对性的回归测试用例
输出格式:JSON数组
"""
response = self.client.messages.create(
model="claude-4",
max_tokens=4000,
temperature=0.2,
messages=[{"role": "user", "content": prompt}]
)
return [TestCase(**case) for case in json.loads(response.content[0].text)]
def generate_boundary_tests(self, api_schema: Dict) -> List[TestCase]:
"""
从API Schema生成边界测试
"""
prompt = f"""
分析以下API Schema,生成边界测试用例:
API Schema:
```json
{json.dumps(api_schema, indent=2)}
```
边界测试要点:
1. 字符串长度边界(0, 1, max, max+1)
2. 数值范围边界(min-1, min, max, max+1)
3. 枚举值边界(有效值、无效值、空值)
4. 日期时间边界(过去、现在、未来、闰年、月末)
5. 集合大小边界(空、1、max、max+1)
6. 特殊字符边界(空格、换行、特殊符号)
7. 类型转换边界(字符串转数字、JSON解析)
输出格式:JSON数组
"""
response = self.client.messages.create(
model="claude-4",
max_tokens=4000,
temperature=0.1,
messages=[{"role": "user", "content": prompt}]
)
return [TestCase(**case) for case in json.loads(response.content[0].text)]
def generate_security_tests(self, api_endpoints: List[str]) -> List[TestCase]:
"""
生成安全测试用例
"""
prompt = f"""
为以下API端点生成安全测试用例:
API端点:
{json.dumps(api_endpoints, indent=2)}
安全测试维度:
1. 认证测试
- Token有效性
- Token过期
- 权限验证
- 越权访问
2. 输入验证
- SQL注入
- XSS攻击
- 命令注入
- 路径遍历
- 文件上传漏洞
3. 数据传输
- HTTPS验证
- 数据加密
- 敏感信息保护
4. 业务安全
- 频率限制
- 并发控制
- 数据一致性
输出格式:JSON数组
"""
response = self.client.messages.create(
model="claude-4",
max_tokens=4000,
temperature=0.2,
messages=[{"role": "user", "content": prompt}]
)
return [TestCase(**case) for case in json.loads(response.content[0].text)]
def generate_performance_tests(self, api_schema: Dict) -> List[TestCase]:
"""
生成性能测试用例
"""
prompt = f"""
为以下API生成性能测试方案:
API Schema:
```json
{json.dumps(api_schema, indent=2)}
```
性能测试场景:
1. 基准测试
- 单用户基准响应时间
- 资源消耗基线
2. 负载测试
- 逐步增加用户数
- 找出性能拐点
- 确定最大吞吐量
3. 压力测试
- 超出设计容量
- 观察系统崩溃点
- 恢复能力
4. 稳定性测试
- 长时间运行
- 内存泄漏检测
- 资源回收
5. 并发测试
- 多用户同时操作
- 资源竞争检测
- 死锁检测
输出格式:JSON数组
"""
response = self.client.messages.create(
model="claude-4",
max_tokens=4000,
temperature=0.2,
messages=[{"role": "user", "content": prompt}]
)
return [TestCase(**case) for case in json.loads(response.content[0].text)]
def optimize_test_cases(self, cases: List[TestCase]) -> List[TestCase]:
"""
优化测试用例集
"""
prompt = f"""
优化以下测试用例集,减少冗余,提高覆盖效率:
当前用例:
{json.dumps([c.to_dict() for c in cases], ensure_ascii=False, indent=2)}
优化策略:
1. 合并重复场景
2. 移除冗余用例
3. 调整优先级
4. 补充遗漏场景
5. 优化测试数据
输出格式:JSON数组
"""
response = self.client.messages.create(
model="claude-4",
max_tokens=4000,
temperature=0.1,
messages=[{"role": "user", "content": prompt}]
)
return [TestCase(**case) for case in json.loads(response.content[0].text)]
2.3.2 测试数据工厂
"""
AI 测试数据生成器
"""
from dataclasses import dataclass
from typing import Any, Dict, List, Optional
import random
import faker
@dataclass
class TestDataTemplate:
"""测试数据模板"""
field_name: str
field_type: str # string, number, email, phone, date, enum
constraints: Dict[str, Any] = field(default_factory=dict)
examples: List[Any] = field(default_factory=list)
generator: Optional[str] = None # AI生成策略
class AITestDataFactory:
"""AI 测试数据工厂"""
def __init__(self, claude_client, templates: List[TestDataTemplate]):
self.client = claude_client
self.templates = templates
self.fake = faker.Faker('zh_CN')
def generate_test_data(self, template: TestDataTemplate) -> Any:
"""
根据模板生成测试数据
"""
prompt = f"""
根据以下字段定义,生成测试数据:
字段定义:
- 字段名: {template.field_name}
- 类型: {template.field_type}
- 约束条件: {template.constraints}
- 示例数据: {template.examples}
请生成以下类型的测试数据:
1. 正常数据(符合所有约束)
2. 边界数据(接近约束边界)
3. 异常数据(违反约束)
4. 特殊数据(空值、null、超长等)
输出格式:JSON对象
"""
response = self.client.messages.create(
model="claude-4",
max_tokens=2000,
temperature=0.5,
messages=[{"role": "user", "content": prompt}]
)
return json.loads(response.content[0].text)
def generate_user_data(self, count: int = 100) -> List[Dict]:
"""生成用户测试数据"""
return [
{
"username": self.fake.user_name(),
"email": self.fake.email(),
"phone": self.fake.phone_number(),
"name": self.fake.name(),
"age": random.randint(18, 65),
"gender": random.choice(["男", "女"]),
"address": self.fake.address(),
"created_at": self.fake.date_time_this_year()
}
for _ in range(count)
]
def generate_order_data(self, count: int = 100) -> List[Dict]:
"""生成订单测试数据"""
return [
{
"order_no": f"ORD{random.randint(100000, 999999)}",
"user_id": random.randint(1, 1000),
"amount": round(random.uniform(10, 10000), 2),
"currency": "CNY",
"status": random.choice(["pending", "paid", "shipped", "delivered", "cancelled"]),
"items": [
{
"product_id": random.randint(1, 1000),
"quantity": random.randint(1, 10),
"price": round(random.uniform(10, 1000), 2)
}
for _ in range(random.randint(1, 5))
],
"created_at": self.fake.date_time_this_year()
}
for _ in range(count)
]
3. 自动化测试框架搭建
3.1 框架目录结构
test_automation/
├── config/
│ ├── conftest.py # pytest 全局配置
│ ├── pytest.ini # pytest 配置
│ ├── settings.py # 测试配置
│ └── environments/
│ ├── dev.yaml # 开发环境
│ ├── staging.yaml # 测试环境
│ └── prod.yaml # 生产环境(只读)
│
├── ai/
│ ├── case_generator.py # AI用例生成器
│ ├── data_factory.py # AI数据工厂
│ ├── scenario_planner.py # AI场景规划器
│ ├── result_analyzer.py # AI结果分析器
│ └── knowledge_base.py # 知识库管理
│
├── tests/
│ ├── api/ # API测试
│ │ ├── test_users.py
│ │ ├── test_orders.py
│ │ ├── test_products.py
│ │ └── test_payments.py
│ │
│ ├── ui/ # UI测试
│ │ ├── test_login.py
│ │ ├── test_dashboard.py
│ │ ├── test_checkout.py
│ │ └── test_reports.py
│ │
│ ├── integration/ # 集成测试
│ │ ├── test_user_flow.py
│ │ ├── test_order_flow.py
│ │ └── test_payment_flow.py
│ │
│ ├── performance/ # 性能测试
│ │ ├── test_api_perf.py
│ │ ├── test_db_perf.py
│ │ └── test_cache_perf.py
│ │
│ └── security/ # 安全测试
│ ├── test_auth.py
│ ├── test_input_validation.py
│ └── test_data_protection.py
│
├── pages/ # Page Objects
│ ├── base_page.py
│ ├── login_page.py
│ ├── dashboard_page.py
│ └── order_page.py
│
├── utils/
│ ├── api_client.py # API客户端封装
│ ├── db_utils.py # 数据库工具
│ ├── file_utils.py # 文件工具
│ ├── mail_utils.py # 邮件工具
│ └── report_utils.py # 报告工具
│
├── fixtures/
│ ├── data/ # 测试数据
│ │ ├── users.json
│ │ ├── orders.json
│ │ └── products.json
│ │
│ └── sql/ # SQL脚本
│ ├── setup.sql
│ ├── teardown.sql
│ └── seed_data.sql
│
├── reports/ # 测试报告
│ ├── html/
│ ├── api/
│ └── performance/
│
├── scripts/
│ ├── run_tests.sh # 运行脚本
│ ├── generate_cases.py # 生成用例
│ ├── analyze_results.py # 分析结果
│ └── deploy_test_env.py # 部署测试环境
│
├── docs/
│ ├── api_spec.md # API规范
│ ├── test_plan.md # 测试计划
│ └── troubleshooting.md # 故障排查
│
├── requirements.txt
├── Dockerfile
└── README.md
3.2 核心框架代码
3.2.1 pytest 配置
# pytest.ini
[pytest]
# 测试目录
testpaths = tests
# 标记
markers =
api: API测试
ui: UI测试
integration: 集成测试
performance: 性能测试
security: 安全测试
regression: 回归测试
smoke: 冒烟测试
ai_generated: AI生成的用例
high_priority: 高优先级
medium_priority: 中优先级
low_priority: 低优先级
# 命令行选项
addopts =
-v
--tb=short
--strict-markers
--html=reports/html/report.html
--self-contained-html
--junitxml=reports/junit.xml
--alluredir=allure-results
# 日志
log_cli = true
log_cli_level = INFO
log_format = %(asctime)s [%(levelname)s] %(name)s: %(message)s
log_date_format = %Y-%m-%d %H:%M:%S
# 并行执行
dist = loadfile
numprocesses = auto
# 超时
timeout = 30
timeout_method = thread
3.2.2 全局配置
# config/settings.py
import os
from dataclasses import dataclass, field
from typing import Dict, List, Optional
import yaml
@dataclass
class APIClientConfig:
"""API 客户端配置"""
base_url: str
timeout: int = 30
max_retries: int = 3
retry_delay: float = 1.0
headers: Dict[str, str] = field(default_factory=dict)
@dataclass
class DatabaseConfig:
"""数据库配置"""
host: str
port: int
username: str
password: str
database: str
pool_size: int = 5
max_overflow: int = 10
@dataclass
class AIConfig:
"""AI 引擎配置"""
api_key: str
model: str = "claude-4"
temperature: float = 0.2
max_tokens: int = 8000
rate_limit: int = 60 # 每分钟请求数
cache_enabled: bool = True
cache_ttl: int = 3600 # 缓存过期时间
@dataclass
class TestConfig:
"""测试配置"""
# 环境配置
environment: str = "staging"
# API 配置
api_client: APIClientConfig = field(default_factory=APIClientConfig)
# 数据库配置
database: DatabaseConfig = field(default_factory=DatabaseConfig)
# AI 配置
ai: AIConfig = field(default_factory=AIConfig)
# 报告配置
report_dir: str = "reports"
report_format: List[str] = field(default_factory=lambda: ["html", "json", "xml"])
# 并行配置
parallel_workers: int = 4
# 失败处理
fail_fast: bool = False
max_failures: int = 10
retry_failed: bool = True
retry_count: int = 2
def get_environment_config(self) -> Dict:
"""获取环境配置"""
env_file = f"config/environments/{self.environment}.yaml"
with open(env_file, 'r') as f:
return yaml.safe_load(f)
def get_ai_config(self) -> AIConfig:
"""获取 AI 配置"""
env_config = self.get_environment_config()
return AIConfig(
api_key=env_config.get('ai', {}).get('api_key'),
model=env_config.get('ai', {}).get('model', self.ai.model),
temperature=env_config.get('ai', {}).get('temperature', self.ai.temperature),
max_tokens=env_config.get('ai', {}).get('max_tokens', self.ai.max_tokens),
rate_limit=env_config.get('ai', {}).get('rate_limit', self.ai.rate_limit),
cache_enabled=env_config.get('ai', {}).get('cache_enabled', self.ai.cache_enabled),
cache_ttl=env_config.get('ai', {}).get('cache_ttl', self.ai.cache_ttl),
)
# 全局配置实例
config = TestConfig()
3.2.3 API 客户端封装
# utils/api_client.py
import requests
import json
import time
from typing import Dict, Any, Optional
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
class APIClient:
"""
AI 驱动的 API 测试客户端
特点:
- 智能重试
- 请求/响应记录
- 断言辅助
- AI 异常分析
"""
def __init__(self, config: APIClientConfig):
self.config = config
self.session = requests.Session()
self._setup_session()
def _setup_session(self):
"""配置会话"""
# 设置默认头
self.session.headers.update({
"User-Agent": "AI-Test-Client/1.0",
"Accept": "application/json",
"Content-Type": "application/json",
**self.config.headers
})
# 配置重试策略
retry_strategy = Retry(
total=self.config.max_retries,
backoff_factor=self.config.retry_delay,
status_forcelist=[429, 500, 502, 503, 504],
allowed_methods=["GET", "POST", "PUT", "DELETE", "PATCH"]
)
adapter = HTTPAdapter(max_retries=retry_strategy)
self.session.mount("http://", adapter)
self.session.mount("https://", adapter)
def get(self, path: str, params: Optional[Dict] = None, **kwargs) -> requests.Response:
"""GET 请求"""
url = f"{self.config.base_url}{path}"
return self.session.get(url, params=params, timeout=self.config.timeout, **kwargs)
def post(self, path: str, data: Optional[Dict] = None, **kwargs) -> requests.Response:
"""POST 请求"""
url = f"{self.config.base_url}{path}"
return self.session.post(url, json=data, timeout=self.config.timeout, **kwargs)
def put(self, path: str, data: Optional[Dict] = None, **kwargs) -> requests.Response:
"""PUT 请求"""
url = f"{self.config.base_url}{path}"
return self.session.put(url, json=data, timeout=self.config.timeout, **kwargs)
def delete(self, path: str, **kwargs) -> requests.Response:
"""DELETE 请求"""
url = f"{self.config.base_url}{path}"
return self.session.delete(url, timeout=self.config.timeout, **kwargs)
def patch(self, path: str, data: Optional[Dict] = None, **kwargs) -> requests.Response:
"""PATCH 请求"""
url = f"{self.config.base_url}{path}"
return self.session.patch(url, json=data, timeout=self.config.timeout, **kwargs)
def assert_status_code(self, response: requests.Response, expected: int):
"""断言状态码"""
assert response.status_code == expected, \
f"Expected status {expected}, got {response.status_code}"
def assert_response_time(self, response: requests.Response, max_time: float):
"""断言响应时间"""
assert response.elapsed.total_seconds() <= max_time, \
f"Response time {response.elapsed.total_seconds()}s > {max_time}s"
def assert_json_schema(self, response: requests.Response, schema: Dict):
"""断言JSON Schema"""
import jsonschema
try:
jsonschema.validate(instance=response.json(), schema=schema)
except jsonschema.ValidationError as e:
raise AssertionError(f"JSON Schema validation failed: {e}")
def assert_response_fields(self, response: requests.Response, required_fields: List[str]):
"""断言响应字段"""
data = response.json()
for field in required_fields:
assert field in data, f"Missing field: {field}"
def assert_error_code(self, response: requests.Response, expected_code: str):
"""断言错误码"""
data = response.json()
assert data.get("error_code") == expected_code, \
f"Expected error code {expected_code}, got {data.get('error_code')}"
def smart_assert(self, response: requests.Response, assertions: Dict[str, Any]):
"""
智能断言
支持多种断言类型:
- status_code: 状态码
- response_time: 响应时间
- json_field: JSON字段
- json_schema: JSON Schema
- error_code: 错误码
- contains: 包含检查
- regex: 正则匹配
"""
for assert_type, assert_value in assertions.items():
if assert_type == "status_code":
self.assert_status_code(response, assert_value)
elif assert_type == "response_time":
self.assert_response_time(response, assert_value)
elif assert_type == "json_field":
self.assert_response_fields(response, assert_value)
elif assert_type == "json_schema":
self.assert_json_schema(response, assert_value)
elif assert_type == "error_code":
self.assert_error_code(response, assert_value)
elif assert_type == "contains":
assert assert_value in response.text, \
f"Response doesn't contain: {assert_value}"
elif assert_type == "regex":
import re
assert re.search(assert_value, response.text), \
f"Response doesn't match regex: {assert_value}"
3.2.4 测试基类
# tests/api/conftest.py
import pytest
import requests
from utils.api_client import APIClient
from config.settings import config
@pytest.fixture(scope="session")
def api_client():
"""API 客户端 fixture"""
env_config = config.get_environment_config()
return APIClient(
base_url=env_config["api"]["base_url"],
timeout=env_config["api"]["timeout"],
max_retries=env_config["api"]["max_retries"],
retry_delay=env_config["api"]["retry_delay"],
headers=env_config["api"]["headers"]
)
@pytest.fixture(scope="function")
def clean_user_data(api_client):
"""清理用户数据"""
yield
# 清理测试创建的用户
response = api_client.get("/users?test_mode=true")
for user in response.json().get("data", []):
api_client.delete(f"/users/{user['id']}")
@pytest.fixture(scope="function")
def clean_order_data(api_client):
"""清理订单数据"""
yield
# 清理测试创建的订单
response = api_client.get("/orders?test_mode=true")
for order in response.json().get("data", []):
api_client.delete(f"/orders/{order['id']}")
@pytest.fixture
def auth_token(api_client):
"""获取认证 token"""
response = api_client.post("/auth/login", data={
"username": config.test_username,
"password": config.test_password
})
assert response.status_code == 200
return response.json()["token"]
@pytest.fixture
def admin_token(api_client):
"""获取管理员 token"""
response = api_client.post("/auth/login", data={
"username": config.admin_username,
"password": config.admin_password
})
assert response.status_code == 200
return response.json()["token"]
@pytest.fixture
def test_user_data():
"""测试用户数据"""
return {
"username": f"test_user_{time.time()}",
"password": "Test@123456",
"email": f"test_{time.time()}@example.com",
"phone": f"138{random.randint(10000000, 99999999)}",
"name": "测试用户"
}
@pytest.fixture
def test_order_data():
"""测试订单数据"""
return {
"user_id": 1,
"amount": 100.00,
"currency": "CNY",
"items": [
{
"product_id": 1,
"quantity": 1,
"price": 100.00
}
]
}
3.3 API 测试示例
3.3.1 用户模块测试
# tests/api/test_users.py
import pytest
import time
from utils.api_client import APIClient
class TestUserAPI:
"""用户 API 测试"""
@pytest.mark.api
@pytest.mark.smoke
@pytest.mark.ai_generated
def test_create_user_success(self, api_client: APIClient, test_user_data):
"""创建用户 - 成功场景"""
# Act
response = api_client.post("/users", data=test_user_data)
# Assert
api_client.assert_status_code(response, 201)
api_client.assert_response_fields(response, [
"id", "username", "email", "phone", "name", "created_at"
])
assert response.json()["username"] == test_user_data["username"]
assert response.json()["email"] == test_user_data["email"]
@pytest.mark.api
@pytest.mark.ai_generated
def test_create_user_duplicate_username(self, api_client: APIClient, test_user_data):
"""创建用户 - 用户名重复"""
# Arrange
api_client.post("/users", data=test_user_data)
# Act
response = api_client.post("/users", data=test_user_data)
# Assert
api_client.assert_status_code(response, 400)
api_client.assert_error_code(response, "USER_DUPLICATE_USERNAME")
@pytest.mark.api
@pytest.mark.ai_generated
def test_create_user_invalid_email(self, api_client: APIClient, test_user_data):
"""创建用户 - 邮箱格式错误"""
test_user_data["email"] = "invalid-email"
# Act
response = api_client.post("/users", data=test_user_data)
# Assert
api_client.assert_status_code(response, 400)
api_client.assert_error_code(response, "USER_INVALID_EMAIL")
@pytest.mark.api
@pytest.mark.ai_generated
def test_create_user_empty_password(self, api_client: APIClient, test_user_data):
"""创建用户 - 密码为空"""
test_user_data["password"] = ""
# Act
response = api_client.post("/users", data=test_user_data)
# Assert
api_client.assert_status_code(response, 400)
api_client.assert_error_code(response, "USER_PASSWORD_REQUIRED")
@pytest.mark.api
@pytest.mark.ai_generated
def test_get_user_by_id(self, api_client: APIClient, auth_token, test_user_data):
"""获取用户详情"""
# Arrange
create_resp = api_client.post("/users", data=test_user_data)
user_id = create_resp.json()["id"]
# Act
response = api_client.get(f"/users/{user_id}", headers={
"Authorization": f"Bearer {auth_token}"
})
# Assert
api_client.assert_status_code(response, 200)
assert response.json()["username"] == test_user_data["username"]
@pytest.mark.api
@pytest.mark.ai_generated
def test_update_user(self, api_client: APIClient, auth_token, test_user_data):
"""更新用户信息"""
# Arrange
create_resp = api_client.post("/users", data=test_user_data)
user_id = create_resp.json()["id"]
# Act
update_data = {"name": "更新后的姓名", "phone": "13912345678"}
response = api_client.put(f"/users/{user_id}", data=update_data, headers={
"Authorization": f"Bearer {auth_token}"
})
# Assert
api_client.assert_status_code(response, 200)
assert response.json()["name"] == "更新后的姓名"
assert response.json()["phone"] == "13912345678"
@pytest.mark.api
@pytest.mark.ai_generated
def test_delete_user(self, api_client: APIClient, auth_token, test_user_data):
"""删除用户"""
# Arrange
create_resp = api_client.post("/users", data=test_user_data)
user_id = create_resp.json()["id"]
# Act
response = api_client.delete(f"/users/{user_id}", headers={
"Authorization": f"Bearer {auth_token}"
})
# Assert
api_client.assert_status_code(response, 204)
# 验证用户已删除
get_response = api_client.get(f"/users/{user_id}")
api_client.assert_status_code(get_response, 404)
@pytest.mark.api
@pytest.mark.ai_generated
def test_list_users_pagination(self, api_client: APIClient, auth_token):
"""用户列表 - 分页测试"""
# Arrange - 创建多个用户
for i in range(25):
api_client.post("/users", data={
**test_user_data,
"username": f"test_user_{i}",
"email": f"test_{i}@example.com"
})
# Act
response = api_client.get("/users?limit=10&page=1", headers={
"Authorization": f"Bearer {auth_token}"
})
# Assert
api_client.assert_status_code(response, 200)
data = response.json()
assert len(data["data"]) <= 10
assert "total" in data
assert "page" in data
assert "limit" in data
@pytest.mark.api
@pytest.mark.ai_generated
def test_user_password_complexity(self, api_client: APIClient, test_user_data):
"""密码复杂度验证"""
test_cases = [
("123456", "USER_PASSWORD_TOO_WEAK"),
("short", "USER_PASSWORD_TOO_SHORT"),
("NoNumbers!", "USER_PASSWORD_NEEDS_NUMBER"),
("nouppercase123", "USER_PASSWORD_NEEDS_UPPER"),
("Valid@123", None), # 有效密码
]
for password, expected_error in test_cases:
test_user_data["password"] = password
response = api_client.post("/users", data=test_user_data)
if expected_error:
api_client.assert_status_code(response, 400)
api_client.assert_error_code(response, expected_error)
else:
api_client.assert_status_code(response, 201)
3.3.2 订单模块测试
# tests/api/test_orders.py
import pytest
from utils.api_client import APIClient
class TestOrderAPI:
"""订单 API 测试"""
@pytest.mark.api
@pytest.mark.smoke
@pytest.mark.ai_generated
def test_create_order_success(self, api_client: APIClient, auth_token, test_order_data):
"""创建订单 - 成功场景"""
response = api_client.post("/orders", data=test_order_data, headers={
"Authorization": f"Bearer {auth_token}"
})
api_client.assert_status_code(response, 201)
api_client.assert_response_fields(response, [
"id", "order_no", "user_id", "amount", "currency",
"status", "items", "created_at"
])
assert response.json()["status"] == "pending"
@pytest.mark.api
@pytest.mark.ai_generated
def test_create_order_insufficient_balance(self, api_client: APIClient, auth_token):
"""创建订单 - 余额不足"""
order_data = {
"user_id": 1,
"amount": 999999.99,
"currency": "CNY",
"items": [{"product_id": 1, "quantity": 1, "price": 999999.99}]
}
response = api_client.post("/orders", data=order_data, headers={
"Authorization": f"Bearer {auth_token}"
})
api_client.assert_status_code(response, 400)
api_client.assert_error_code(response, "ORDER_INSUFFICIENT_BALANCE")
@pytest.mark.api
@pytest.mark.ai_generated
def test_order_payment_success(self, api_client: APIClient, auth_token, test_order_data):
"""订单支付 - 成功场景"""
# Arrange
order_resp = api_client.post("/orders", data=test_order_data, headers={
"Authorization": f"Bearer {auth_token}"
})
order_id = order_resp.json()["id"]
# Act
response = api_client.post(f"/orders/{order_id}/pay", data={
"payment_method": "alipay",
"amount": test_order_data["amount"]
}, headers={"Authorization": f"Bearer {auth_token}"})
# Assert
api_client.assert_status_code(response, 200)
assert response.json()["status"] == "paid"
@pytest.mark.api
@pytest.mark.ai_generated
def test_order_cancel_after_payment(self, api_client: APIClient, auth_token, test_order_data):
"""取消已支付的订单"""
# Arrange - 创建并支付订单
order_resp = api_client.post("/orders", data=test_order_data, headers={
"Authorization": f"Bearer {auth_token}"
})
order_id = order_resp.json()["id"]
api_client.post(f"/orders/{order_id}/pay", data={
"payment_method": "alipay",
"amount": test_order_data["amount"]
}, headers={"Authorization": f"Bearer {auth_token}"})
# Act
response = api_client.post(f"/orders/{order_id}/cancel", headers={
"Authorization": f"Bearer {auth_token}"
})
# Assert
api_client.assert_status_code(response, 200)
assert response.json()["status"] == "cancelled"
assert response.json()["refund_status"] == "refunded"
3.4 UI 测试示例
3.4.1 Page Object 模式
# pages/base_page.py
import time
from playwright.sync_api import Page, expect
from typing import Optional
class BasePage:
"""基础页面类"""
def __init__(self, page: Page):
self.page = page
self.page.set_default_timeout(30000)
def wait_for_selector(self, selector: str, timeout: int = 30000):
"""等待元素出现"""
self.page.wait_for_selector(selector, timeout=timeout)
def wait_for_load_state(self, state: str = "networkidle"):
"""等待页面加载"""
self.page.wait_for_load_state(state)
def take_screenshot(self, filename: str):
"""截图"""
self.page.screenshot(path=f"screenshots/{filename}.png")
def get_console_logs(self) -> list:
"""获取控制台日志"""
logs = []
self.page.on("console", lambda msg: logs.append(msg.text))
return logs
def get_network_requests(self) -> list:
"""获取网络请求"""
requests = []
self.page.on("request", lambda req: requests.append(req))
return requests
# pages/login_page.py
from playwright.sync_api import Page, expect
from pages.base_page import BasePage
class LoginPage(BasePage):
"""登录页面"""
# 元素定位器
USERNAME_INPUT = "#username"
PASSWORD_INPUT = "#password"
LOGIN_BUTTON = "#login-button"
REMEMBER_ME = "#remember-me"
FORGOT_PASSWORD = "#forgot-password"
ERROR_MESSAGE = ".error-message"
SUCCESS_MESSAGE = ".success-message"
def __init__(self, page: Page):
super().__init__(page)
self.base_url = "https://staging.example.com/login"
def navigate(self):
"""导航到登录页"""
self.page.goto(self.base_url)
self.wait_for_load_state()
def enter_username(self, username: str):
"""输入用户名"""
self.page.fill(self.USERNAME_INPUT, username)
def enter_password(self, password: str):
"""输入密码"""
self.page.fill(self.PASSWORD_INPUT, password)
def check_remember_me(self):
"""勾选记住我"""
self.page.check(self.REMEMBER_ME)
def click_login(self):
"""点击登录按钮"""
self.page.click(self.LOGIN_BUTTON)
def login(self, username: str, password: str, remember: bool = False):
"""登录"""
self.navigate()
self.enter_username(username)
self.enter_password(password)
if remember:
self.check_remember_me()
self.click_login()
self.wait_for_load_state()
def get_error_message(self) -> str:
"""获取错误消息"""
return self.page.text_content(self.ERROR_MESSAGE)
def get_success_message(self) -> str:
"""获取成功消息"""
return self.page.text_content(self.SUCCESS_MESSAGE)
def is_login_successful(self) -> bool:
"""检查是否登录成功"""
return self.page.url != self.base_url
def is_error_shown(self) -> bool:
"""检查是否显示错误"""
return self.page.is_visible(self.ERROR_MESSAGE)
3.4.2 UI 测试用例
# tests/ui/test_login.py
import pytest
from playwright.sync_api import Page
from pages.login_page import LoginPage
class TestLoginPage:
"""登录页面测试"""
@pytest.fixture
def login_page(self, page: Page) -> LoginPage:
"""登录页面 fixture"""
return LoginPage(page)
@pytest.mark.ui
@pytest.mark.smoke
@pytest.mark.ai_generated
def test_login_success(self, login_page: LoginPage):
"""登录成功"""
login_page.login("test_user", "Test@123456")
assert login_page.is_login_successful()
@pytest.mark.ui
@pytest.mark.ai_generated
def test_login_invalid_credentials(self, login_page: LoginPage):
"""登录失败 - 无效凭证"""
login_page.login("invalid_user", "wrong_password")
assert login_page.is_error_shown()
assert "无效的用户名或密码" in login_page.get_error_message()
@pytest.mark.ui
@pytest.mark.ai_generated
def test_login_empty_username(self, login_page: LoginPage):
"""登录 - 用户名为空"""
login_page.login("", "Test@123456")
assert login_page.is_error_shown()
assert "请输入用户名" in login_page.get_error_message()
@pytest.mark.ui
@pytest.mark.ai_generated
def test_login_empty_password(self, login_page: LoginPage):
"""登录 - 密码为空"""
login_page.login("test_user", "")
assert login_page.is_error_shown()
assert "请输入密码" in login_page.get_error_message()
@pytest.mark.ui
@pytest.mark.ai_generated
def test_login_remember_me(self, login_page: LoginPage):
"""登录 - 记住我"""
login_page.login("test_user", "Test@123456", remember=True)
assert login_page.is_login_successful()
# 检查 cookie
cookies = login_page.page.context.cookies()
assert any("remember_me" in c["name"] for c in cookies)
@pytest.mark.ui
@pytest.mark.ai_generated
def test_login_page_load_performance(self, login_page: LoginPage):
"""登录页面加载性能"""
login_page.navigate()
# 检查页面加载时间
load_time = login_page.page.evaluate("() => performance.timing.loadEventEnd - performance.timing.navigationStart")
assert load_time < 3000, f"页面加载时间过长: {load_time}ms"
4. 智能测试场景
4.1 AI 场景规划器
# ai/scenario_planner.py
from typing import List, Dict
import json
from config.settings import config
class AIScenarioPlanner:
"""AI 场景规划器"""
def __init__(self, claude_client):
self.client = claude_client
self.config = config.get_ai_config()
def plan_regression_test(self, changed_files: List[str]) -> Dict:
"""
根据代码变更规划回归测试
"""
prompt = f"""
分析以下代码变更,规划回归测试方案:
变更文件:
{json.dumps(changed_files, indent=2)}
请提供:
1. 影响范围分析
2. 需要执行的测试用例
3. 测试执行顺序
4. 风险评估
5. 预计执行时间
输出格式:JSON
"""
response = self.client.messages.create(
model=self.config.model,
max_tokens=self.config.max_tokens,
temperature=0.2,
messages=[{"role": "user", "content": prompt}]
)
return json.loads(response.content[0].text)
def plan_performance_test(self, api_endpoints: List[str]) -> Dict:
"""
规划性能测试方案
"""
prompt = f"""
为以下API端点规划性能测试方案:
API端点:
{json.dumps(api_endpoints, indent=2)}
请提供:
1. 测试场景设计
2. 性能指标定义
3. 测试数据准备
4. 测试环境要求
5. 执行策略
6. 结果分析方法
输出格式:JSON
"""
response = self.client.messages.create(
model=self.config.model,
max_tokens=self.config.max_tokens,
temperature=0.2,
messages=[{"role": "user", "content": prompt}]
)
return json.loads(response.content[0].text)
def plan_security_test(self, application_info: Dict) -> Dict:
"""
规划安全测试方案
"""
prompt = f"""
为以下应用规划安全测试方案:
应用信息:
{json.dumps(application_info, indent=2)}
请提供:
1. 安全测试范围
2. 测试用例设计
3. 测试工具选择
4. 测试执行策略
5. 风险评估标准
6. 修复建议模板
输出格式:JSON
"""
response = self.client.messages.create(
model=self.config.model,
max_tokens=self.config.max_tokens,
temperature=0.2,
messages=[{"role": "user", "content": prompt}]
)
return json.loads(response.content[0].text)
4.2 智能测试执行引擎
# ai/test_executor.py
import asyncio
import threading
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import List, Dict, Optional
from dataclasses import dataclass, field
import time
import json
@dataclass
class TestResult:
"""测试结果"""
test_id: str
test_name: str
status: str # PASSED, FAILED, ERROR, SKIPPED
duration: float
message: str = ""
trace: str = ""
metadata: Dict = field(default_factory=dict)
def to_dict(self) -> Dict:
return {
"test_id": self.test_id,
"test_name": self.test_name,
"status": self.status,
"duration": self.duration,
"message": self.message,
"trace": self.trace,
"metadata": self.metadata
}
class SmartTestExecutor:
"""
智能测试执行引擎
特性:
- 并行执行
- 智能重试
- 动态优先级调整
- 失败隔离
- 结果缓存
"""
def __init__(self, max_workers: int = 4, retry_count: int = 2):
self.max_workers = max_workers
self.retry_count = retry_count
self.results: List[TestResult] = []
self.failed_tests: List[TestResult] = []
self.skipped_tests: List[TestResult] = []
def execute_test_suite(self, test_cases: List[TestCase]) -> List[TestResult]:
"""
执行测试套件
"""
with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
future_to_case = {}
for case in test_cases:
future = executor.submit(self._execute_single_test, case)
future_to_case[future] = case
for future in as_completed(future_to_case):
case = future_to_case[future]
try:
result = future.result()
self.results.append(result)
if result.status == "FAILED":
self.failed_tests.append(result)
# 智能重试
retry_result = self._retry_test(case, result)
if retry_result.status == "PASSED":
self.results.remove(result)
self.results.append(retry_result)
self.failed_tests.remove(result)
elif result.status == "SKIPPED":
self.skipped_tests.append(result)
except Exception as e:
error_result = TestResult(
test_id=case.id,
test_name=case.name,
status="ERROR",
duration=time.time() - case.start_time,
message=str(e),
trace=traceback.format_exc()
)
self.results.append(error_result)
return self.results
def _execute_single_test(self, case: TestCase) -> TestResult:
"""执行单个测试用例"""
start_time = time.time()
try:
# 准备测试环境
self._setup_test_environment(case)
# 执行测试步骤
for step in case.steps:
self._execute_step(step)
# 验证预期结果
for expected in case.expected_results:
self._verify_expected_result(expected)
duration = time.time() - start_time
return TestResult(
test_id=case.id,
test_name=case.name,
status="PASSED",
duration=duration,
message="Test passed"
)
except AssertionError as e:
duration = time.time() - start_time
return TestResult(
test_id=case.id,
test_name=case.name,
status="FAILED",
duration=duration,
message=str(e),
trace=traceback.format_exc()
)
except Exception as e:
duration = time.time() - start_time
return TestResult(
test_id=case.id,
test_name=case.name,
status="ERROR",
duration=duration,
message=str(e),
trace=traceback.format_exc()
)
finally:
# 清理测试环境
self._teardown_test_environment(case)
def _retry_test(self, case: TestCase, failed_result: TestResult) -> TestResult:
"""
智能重试失败的测试
"""
for attempt in range(self.retry_count):
time.sleep(1) # 重试间隔
retry_result = self._execute_single_test(case)
if retry_result.status == "PASSED":
return retry_result
return failed_result
def _setup_test_environment(self, case: TestCase):
"""准备测试环境"""
# 数据库清理
self._cleanup_database(case)
# 数据准备
self._prepare_test_data(case)
# 服务状态检查
self._check_service_status(case)
def _teardown_test_environment(self, case: TestCase):
"""清理测试环境"""
# 清理测试数据
self._cleanup_test_data(case)
# 重置服务状态
self._reset_service_status(case)
def _cleanup_database(self, case: TestCase):
"""清理数据库"""
# 实现数据库清理逻辑
pass
def _prepare_test_data(self, case: TestCase):
"""准备测试数据"""
# 实现测试数据准备逻辑
pass
def _check_service_status(self, case: TestCase):
"""检查服务状态"""
# 实现服务状态检查逻辑
pass
def _execute_step(self, step: Dict):
"""执行测试步骤"""
# 实现步骤执行逻辑
pass
def _verify_expected_result(self, expected: str):
"""验证预期结果"""
# 实现结果验证逻辑
pass
def _cleanup_test_data(self, case: TestCase):
"""清理测试数据"""
# 实现测试数据清理逻辑
pass
def _reset_service_status(self, case: TestCase):
"""重置服务状态"""
# 实现服务状态重置逻辑
pass
5. CI/CD 集成
5.1 GitHub Actions 工作流
# .github/workflows/test-automation.yml
name: AI-Powered Test Automation
on:
push:
branches: [main, develop]
pull_request:
branches: [main]
schedule:
# 每日凌晨2点执行完整测试
- cron: '0 2 * * *'
jobs:
# 代码质量检查
code-quality:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- name: Setup Python
uses: actions/setup-python@v5
with:
python-version: '3.11'
- name: Install dependencies
run: |
pip install -r requirements.txt
pip install flake8 pylint mypy
- name: Lint code
run: |
flake8 tests/ --count --select=E9,F63,F7,F82 --show-source --statistics
pylint tests/ --disable=C0114,C0115,C0116
- name: Type checking
run: |
mypy tests/
# API 测试
api-tests:
needs: code-quality
runs-on: ubuntu-latest
services:
postgres:
image: postgres:15
env:
POSTGRES_DB: test_db
POSTGRES_USER: test_user
POSTGRES_PASSWORD: test_pass
ports:
- 5432:5432
options: >-
--health-cmd pg_isready
--health-interval 10s
--health-timeout 5s
--health-retries 5
redis:
image: redis:7
ports:
- 6379:6379
steps:
- uses: actions/checkout@v4
- name: Setup Python
uses: actions/setup-python@v5
with:
python-version: '3.11'
- name: Install dependencies
run: |
pip install -r requirements.txt
pip install pytest pytest-asyncio pytest-cov
- name: Run API tests
env:
TEST_ENV: staging
DATABASE_URL: postgresql://test_user:test_pass@localhost:5432/test_db
REDIS_URL: redis://localhost:6379
API_BASE_URL: https://staging-api.example.com
run: |
pytest tests/api/ -v --cov=app --cov-report=xml
- name: Upload coverage
uses: codecov/codecov-action@v3
with:
file: ./coverage.xml
# UI 测试
ui-tests:
needs: api-tests
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- name: Setup Python
uses: actions/setup-python@v5
with:
python-version: '3.11'
- name: Install dependencies
run: |
pip install -r requirements.txt
pip install playwright
playwright install chromium
- name: Run UI tests
env:
TEST_ENV: staging
BASE_URL: https://staging.example.com
run: |
pytest tests/ui/ -v --screenshot=on-failure
- name: Upload screenshots
if: failure()
uses: actions/upload-artifact@v4
with:
name: ui-test-screenshots
path: screenshots/
# 性能测试
performance-tests:
needs: api-tests
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- name: Setup Python
uses: actions/setup-python@v5
with:
python-version: '3.11'
- name: Install dependencies
run: |
pip install -r requirements.txt
pip install k6
- name: Run performance tests
env:
TEST_ENV: staging
API_BASE_URL: https://staging-api.example.com
run: |
pytest tests/performance/ -v
- name: Upload performance results
uses: actions/upload-artifact@v4
with:
name: performance-results
path: reports/performance/
# 安全测试
security-tests:
needs: api-tests
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- name: Setup Python
uses: actions/setup-python@v5
with:
python-version: '3.11'
- name: Install dependencies
run: |
pip install -r requirements.txt
pip install security-testing-tools
- name: Run security tests
env:
TEST_ENV: staging
API_BASE_URL: https://staging-api.example.com
run: |
pytest tests/security/ -v
- name: Upload security report
uses: actions/upload-artifact@v4
with:
name: security-report
path: reports/security/
# 生成测试报告
generate-report:
needs: [api-tests, ui-tests, performance-tests, security-tests]
runs-on: ubuntu-latest
if: always()
steps:
- uses: actions/checkout@v4
- name: Download test results
uses: actions/download-artifact@v4
- name: Generate HTML report
run: |
pip install allure-pytest
allure generate allure-results -o allure-report
- name: Deploy report to GitHub Pages
uses: peaceiris/actions-gh-pages@v3
with:
github_token: ${{ secrets.GITHUB_TOKEN }}
publish_dir: ./allure-report
5.2 Jenkins Pipeline
// Jenkinsfile
pipeline {
agent any
environment {
TEST_ENV = 'staging'
DOCKER_REGISTRY = 'registry.example.com'
PYTHON_VERSION = '3.11'
}
stages {
stage('代码检出') {
steps {
checkout scm
}
}
stage('代码质量检查') {
steps {
sh '''
python -m pip install --upgrade pip
pip install -r requirements.txt
pip install flake8 pylint mypy
# Lint
flake8 tests/ --count --select=E9,F63,F7,F82 --show-source --statistics
# Type checking
mypy tests/
'''
}
}
stage('构建测试环境') {
steps {
sh '''
docker-compose up -d postgres redis
docker-compose exec postgres psql -U test_user -d test_db -f fixtures/sql/setup.sql
'''
}
}
stage('API 测试') {
parallel {
stage('冒烟测试') {
steps {
sh '''
pytest tests/api/ -m smoke -v --tb=short
'''
}
}
stage('全量测试') {
steps {
sh '''
pytest tests/api/ -v --tb=short --cov=app --cov-report=xml
'''
}
}
}
}
stage('UI 测试') {
steps {
sh '''
pip install playwright
playwright install chromium
pytest tests/ui/ -v --screenshot=on-failure
'''
}
}
stage('性能测试') {
steps {
sh '''
pytest tests/performance/ -v
'''
}
}
stage('安全测试') {
steps {
sh '''
pytest tests/security/ -v
'''
}
}
stage('生成测试报告') {
steps {
sh '''
allure generate allure-results -o allure-report
cp -r allure-report docs/reports/
'''
}
}
stage('通知') {
steps {
script {
if (currentBuild.currentResult == 'SUCCESS') {
slackSend(
channel: '#test-automation',
color: 'good',
message: "✅ 测试通过: ${currentBuild.fullDisplayName}\n" +
"API测试: ${apiTestCount} 通过\n" +
"UI测试: ${uiTestCount} 通过\n" +
"性能测试: 通过\n" +
"安全测试: 通过"
)
} else {
slackSend(
channel: '#test-automation',
color: 'danger',
message: "❌ 测试失败: ${currentBuild.fullDisplayName}\n" +
"失败用例: ${failedTestCount}\n" +
"详细信息: ${currentBuild.url}"
)
}
}
}
}
}
post {
always {
junit 'reports/junit.xml'
allure(
includeProperties: true,
jdk: '',
results: [[path: 'allure-results']]
)
cleanWs()
}
failure {
archiveArtifacts artifacts: 'screenshots/*.png', allowEmptyArchive: true
}
}
}
6. 测试报告与质量看板
6.1 智能报告生成器
# ai/result_analyzer.py
from typing import List, Dict
import json
from datetime import datetime
class AITestResultAnalyzer:
"""AI 测试结果分析器"""
def __init__(self, claude_client):
self.client = claude_client
def analyze_test_results(self, results: List[TestResult]) -> Dict:
"""
分析测试结果,提供根因分析
"""
prompt = f"""
分析以下测试结果,提供详细的根因分析:
测试概要:
- 总用例数: {len(results)}
- 通过: {sum(1 for r in results if r.status == 'PASSED')}
- 失败: {sum(1 for r in results if r.status == 'FAILED')}
- 错误: {sum(1 for r in results if r.status == 'ERROR')}
- 跳过: {sum(1 for r in results if r.status == 'SKIPPED')}
失败用例详情:
{json.dumps([r.to_dict() for r in results if r.status in ['FAILED', 'ERROR']], indent=2)}
请提供:
1. 失败模式分析
2. 根因分析
3. 修复建议
4. 风险评估
5. 优先级排序
6. 回归测试建议
输出格式:JSON
"""
response = self.client.messages.create(
model="claude-4",
max_tokens=8000,
temperature=0.2,
messages=[{"role": "user", "content": prompt}]
)
return json.loads(response.content[0].text)
def generate_natural_language_report(self, results: List[TestResult]) -> str:
"""
生成自然语言测试报告
"""
prompt = f"""
根据以下测试结果,生成一份自然语言的测试报告:
测试时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}
总用例数: {len(results)}
通过: {sum(1 for r in results if r.status == 'PASSED')}
失败: {sum(1 for r in results if r.status == 'FAILED')}
错误: {sum(1 for r in results if r.status == 'ERROR')}
跳过: {sum(1 for r in results if r.status == 'SKIPPED')}
失败用例详情:
{json.dumps([r.to_dict() for r in results if r.status in ['FAILED', 'ERROR']], indent=2)}
请生成:
1. 测试概要(包含关键指标)
2. 详细分析(失败原因、趋势)
3. 改进建议
4. 风险评估
5. 下一步行动计划
要求:
- 使用清晰、专业的语言
- 重点突出关键问题
- 提供可操作的建议
- 包含数据支持
"""
response = self.client.messages.create(
model="claude-4",
max_tokens=4000,
temperature=0.3,
messages=[{"role": "user", "content": prompt}]
)
return response.content[0].text
def identify_test_flakiness(self, historical_results: List[List[TestResult]]) -> Dict:
"""
识别不稳定的测试用例
"""
prompt = f"""
分析以下历史测试结果,识别不稳定的测试用例:
历史结果:
{json.dumps(historical_results, indent=2)}
请提供:
1. 不稳定用例列表
2. 失败模式分析
3. 根本原因分析
4. 修复建议
5. 优先级排序
输出格式:JSON
"""
response = self.client.messages.create(
model="claude-4",
max_tokens=4000,
temperature=0.2,
messages=[{"role": "user", "content": prompt}]
)
return json.loads(response.content[0].text)
6.2 质量看板设计
# utils/report_utils.py
import json
import os
from datetime import datetime, timedelta
from typing import List, Dict
class QualityDashboard:
"""质量看板生成器"""
def __init__(self, results_dir: str = "reports"):
self.results_dir = results_dir
self.metrics = self._load_metrics()
def _load_metrics(self) -> Dict:
"""加载历史指标"""
metrics_file = os.path.join(self.results_dir, "metrics.json")
if os.path.exists(metrics_file):
with open(metrics_file, 'r') as f:
return json.load(f)
return {
"daily_metrics": [],
"weekly_metrics": [],
"monthly_metrics": [],
"test_suites": [],
"defect_trends": []
}
def calculate_coverage_metrics(self, results: List[TestResult]) -> Dict:
"""
计算覆盖率指标
"""
total_tests = len(results)
passed = sum(1 for r in results if r.status == 'PASSED')
failed = sum(1 for r in results if r.status == 'FAILED')
errors = sum(1 for r in results if r.status == 'ERROR')
skipped = sum(1 for r in results if r.status == 'SKIPPED')
return {
"total_tests": total_tests,
"passed_tests": passed,
"failed_tests": failed,
"error_tests": errors,
"skipped_tests": skipped,
"pass_rate": passed / total_tests * 100 if total_tests > 0 else 0,
"failure_rate": failed / total_tests * 100 if total_tests > 0 else 0,
"error_rate": errors / total_tests * 100 if total_tests > 0 else 0,
"skip_rate": skipped / total_tests * 100 if total_tests > 0 else 0
}
def calculate_performance_metrics(self, results: List[TestResult]) -> Dict:
"""
计算性能指标
"""
durations = [r.duration for r in results if r.duration > 0]
if not durations:
return {}
return {
"avg_duration": sum(durations) / len(durations),
"max_duration": max(durations),
"min_duration": min(durations),
"p95_duration": sorted(durations)[int(len(durations) * 0.95)],
"p99_duration": sorted(durations)[int(len(durations) * 0.99)]
}
def generate_html_report(self, results: List[TestResult], output_file: str = "reports/html/report.html"):
"""
生成HTML报告
"""
coverage_metrics = self.calculate_coverage_metrics(results)
performance_metrics = self.calculate_performance_metrics(results)
ai_analysis = self.analyze_test_results(results)
report_template = """
<!DOCTYPE html>
<html lang="zh-CN">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>AI 自动化测试报告</title>
<style>
body { font-family: -apple-system, BlinkMacSystemFont, 'Segoe UI', Roboto, sans-serif; margin: 0; padding: 20px; background: #f5f5f5; }
.container { max-width: 1200px; margin: 0 auto; }
.header { background: white; padding: 20px; border-radius: 8px; margin-bottom: 20px; box-shadow: 0 2px 4px rgba(0,0,0,0.1); }
.metrics-grid { display: grid; grid-template-columns: repeat(auto-fit, minmax(200px, 1fr)); gap: 20px; margin-bottom: 20px; }
.metric-card { background: white; padding: 20px; border-radius: 8px; box-shadow: 0 2px 4px rgba(0,0,0,0.1); }
.metric-value { font-size: 24px; font-weight: bold; color: #2c3e50; }
.metric-label { color: #7f8c8d; font-size: 14px; }
.status-passed { color: #27ae60; }
.status-failed { color: #e74c3c; }
.status-error { color: #f39c12; }
.status-skipped { color: #95a5a6; }
.chart-container { background: white; padding: 20px; border-radius: 8px; margin-bottom: 20px; box-shadow: 0 2px 4px rgba(0,0,0,0.1); }
.ai-analysis { background: white; padding: 20px; border-radius: 8px; box-shadow: 0 2px 4px rgba(0,0,0,0.1); }
.test-case { border: 1px solid #ddd; margin: 10px 0; padding: 15px; border-radius: 5px; }
.test-case.passed { border-left: 4px solid #27ae60; }
.test-case.failed { border-left: 4px solid #e74c3c; }
.test-case.error { border-left: 4px solid #f39c12; }
</style>
</head>
<body>
<div class="container">
<div class="header">
<h1>🤖 AI 自动化测试报告</h1>
<p>生成时间: {timestamp}</p>
<p>测试环境: {environment}</p>
</div>
<div class="metrics-grid">
<div class="metric-card">
<div class="metric-value">{total_tests}</div>
<div class="metric-label">总用例数</div>
</div>
<div class="metric-card">
<div class="metric-value status-passed">{passed_tests}</div>
<div class="metric-label">通过</div>
</div>
<div class="metric-card">
<div class="metric-value status-failed">{failed_tests}</div>
<div class="metric-label">失败</div>
</div>
<div class="metric-card">
<div class="metric-value status-error">{error_tests}</div>
<div class="metric-label">错误</div>
</div>
<div class="metric-card">
<div class="metric-value status-skipped">{skipped_tests}</div>
<div class="metric-label">跳过</div>
</div>
<div class="metric-card">
<div class="metric-value">{pass_rate:.1f}%</div>
<div class="metric-label">通过率</div>
</div>
</div>
<div class="chart-container">
<h2>📊 性能指标</h2>
<pre>{performance_metrics_json}</pre>
</div>
<div class="ai-analysis">
<h2>🧠 AI 分析</h2>
<div id="ai-analysis-content">
{ai_analysis_content}
</div>
</div>
<div class="test-cases">
<h2>📋 测试用例详情</h2>
{test_cases_html}
</div>
</div>
</body>
</html>
"""
# 生成HTML内容
html_content = report_template.format(
timestamp=datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
environment="staging",
total_tests=coverage_metrics['total_tests'],
passed_tests=coverage_metrics['passed_tests'],
failed_tests=coverage_metrics['failed_tests'],
error_tests=coverage_metrics['error_tests'],
skipped_tests=coverage_metrics['skipped_tests'],
pass_rate=coverage_metrics['pass_rate'],
performance_metrics_json=json.dumps(performance_metrics, indent=2),
ai_analysis_content=self.generate_natural_language_report(results),
test_cases_html=self._generate_test_cases_html(results)
)
# 写入文件
os.makedirs(os.path.dirname(output_file), exist_ok=True)
with open(output_file, 'w', encoding='utf-8') as f:
f.write(html_content)
def _generate_test_cases_html(self, results: List[TestResult]) -> str:
"""生成测试用例HTML"""
html = ""
for result in results:
status_class = result.status.lower()
html += f"""
<div class="test-case {status_class}">
<h3>{result.test_name}</h3>
<p><strong>ID:</strong> {result.test_id}</p>
<p><strong>状态:</strong> <span class="status-{status_class}">{result.status}</span></p>
<p><strong>耗时:</strong> {result.duration:.2f}s</p>
<p><strong>消息:</strong> {result.message}</p>
{'<pre>' + result.trace + '</pre>' if result.trace else ''}
</div>
"""
return html
7. 性能优化与最佳实践
7.1 测试性能优化策略
# config/performance_config.py
from dataclasses import dataclass, field
from typing import List, Dict
@dataclass
class PerformanceOptimizationConfig:
"""性能优化配置"""
# 并行执行配置
parallel_execution: Dict = field(default_factory=lambda: {
"enabled": True,
"max_workers": 8,
"batch_size": 50,
"timeout": 300
})
# 缓存配置
caching: Dict = field(default_factory=lambda: {
"enabled": True,
"cache_dir": ".test_cache",
"ttl": 3600, # 1小时
"max_size": "1GB"
})
# 数据库优化
database: Dict = field(default_factory=lambda: {
"connection_pool_size": 10,
"max_overflow": 20,
"pool_timeout": 30,
"pool_recycle": 1800
})
# 网络优化
network: Dict = field(default_factory=lambda: {
"connection_timeout": 10,
"read_timeout": 30,
"keep_alive": True,
"max_connections": 100
})
# 测试数据优化
test_data: Dict = field(default_factory=lambda: {
"use_mock_data": True,
"data_generation_batch_size": 100,
"cleanup_after_test": True
})
# 报告生成优化
reporting: Dict = field(default_factory=lambda: {
"parallel_report_generation": True,
"incremental_report": True,
"cache_report_data": True
})
# 性能优化最佳实践
BEST_PRACTICES = {
"test_isolation": {
"description": "测试用例隔离",
"rules": [
"每个测试用例使用独立的数据集",
"测试后清理测试数据",
"避免测试用例之间的依赖",
"使用 fixture 管理测试环境"
]
},
"parallel_execution": {
"description": "并行执行优化",
"rules": [
"无状态测试优先并行",
"数据库测试串行执行",
"UI测试限制并行数",
"使用线程池管理并发"
]
},
"data_management": {
"description": "测试数据管理",
"rules": [
"使用数据工厂生成测试数据",
"缓存常用测试数据",
"定期清理过期数据",
"使用事务回滚保证数据一致性"
]
},
"error_handling": {
"description": "错误处理优化",
"rules": [
"快速失败策略",
"智能重试机制",
"错误分类处理",
"详细的错误日志"
]
},
"resource_management": {
"description": "资源管理",
"rules": [
"连接池管理",
"内存泄漏检测",
"文件句柄及时关闭",
"定期资源回收"
]
}
}
7.2 测试稳定性保障
# utils/stability_utils.py
import time
import random
from typing import Callable, Any
from functools import wraps
def flaky_test(max_retries: int = 3, delay: float = 1.0):
"""
不稳定测试装饰器
自动重试失败的测试
"""
def decorator(func: Callable) -> Callable:
@wraps(func)
def wrapper(*args, **kwargs):
last_exception = None
for attempt in range(max_retries):
try:
return func(*args, **kwargs)
except AssertionError as e:
last_exception = e
if attempt < max_retries - 1:
# 指数退避
sleep_time = delay * (2 ** attempt) + random.uniform(0, 1)
time.sleep(sleep_time)
else:
raise last_exception
except Exception as e:
raise e
return wrapper
return decorator
def retry_on_exception(max_retries: int = 3, exceptions: tuple = (Exception,)):
"""
异常重试装饰器
"""
def decorator(func: Callable) -> Callable:
@wraps(func)
def wrapper(*args, **kwargs):
last_exception = None
for attempt in range(max_retries):
try:
return func(*args, **kwargs)
except exceptions as e:
last_exception = e
if attempt < max_retries - 1:
sleep_time = 1 * (2 ** attempt) + random.uniform(0, 1)
time.sleep(sleep_time)
else:
raise last_exception
return wrapper
return decorator
def timeout(seconds: int):
"""
超时装饰器
"""
def decorator(func: Callable) -> Callable:
@wraps(func)
def wrapper(*args, **kwargs):
import signal
def handler(signum, frame):
raise TimeoutError(f"Function {func.__name__} timed out after {seconds} seconds")
signal.signal(signal.SIGALRM, handler)
signal.alarm(seconds)
try:
result = func(*args, **kwargs)
finally:
signal.alarm(0)
return result
return wrapper
return decorator
8. 故障排查与自愈机制
8.1 智能故障诊断
# ai/diagnostic_engine.py
from typing import Dict, List
import json
import traceback
class AIDiagnosticEngine:
"""AI 故障诊断引擎"""
def __init__(self, claude_client):
self.client = claude_client
self.diagnosis_history = []
def diagnose_test_failure(self, failed_result: TestResult, test_code: str) -> Dict:
"""
诊断测试失败原因
"""
prompt = f"""
诊断以下测试失败原因:
测试用例:
{failed_result.test_name}
测试代码:
```python
{test_code}
```
失败信息:
{failed_result.message}
堆栈跟踪:
```
{failed_result.trace}
```
请提供:
1. 失败根因分析
2. 可能的问题场景
3. 修复建议
4. 预防措施
5. 相关测试用例
输出格式:JSON
"""
response = self.client.messages.create(
model="claude-4",
max_tokens=4000,
temperature=0.2,
messages=[{"role": "user", "content": prompt}]
)
diagnosis = json.loads(response.content[0].text)
self.diagnosis_history.append(diagnosis)
return diagnosis
def suggest_fix(self, diagnosis: Dict) -> str:
"""
根据诊断结果生成修复代码
"""
prompt = f"""
根据以下诊断结果,生成修复代码:
诊断结果:
{json.dumps(diagnosis, indent=2)}
请提供:
1. 修复代码
2. 修复说明
3. 测试验证方案
4. 回归测试建议
输出格式:JSON
"""
response = self.client.messages.create(
model="claude-4",
max_tokens=4000,
temperature=0.2,
messages=[{"role": "user", "content": prompt}]
)
return response.content[0].text
def predict_test_flakiness(self, test_results_history: List[List[TestResult]]) -> Dict:
"""
预测测试不稳定性
"""
prompt = f"""
分析以下测试历史结果,预测不稳定性:
历史结果:
{json.dumps(test_results_history, indent=2)}
请提供:
1. 不稳定测试用例列表
2. 不稳定原因分析
3. 稳定性改进建议
4. 优先级排序
输出格式:JSON
"""
response = self.client.messages.create(
model="claude-4",
max_tokens=4000,
temperature=0.2,
messages=[{"role": "user", "content": prompt}]
)
return json.loads(response.content[0].text)
8.2 自愈机制
# ai/self_healing.py
import os
import json
import time
from typing import List, Dict
import subprocess
class AITestSelfHealing:
"""AI 测试自愈引擎"""
def __init__(self, diagnostic_engine: AIDiagnosticEngine):
self.diagnostic_engine = diagnostic_engine
self.healing_history = []
def heal_flaky_test(self, test_file: str, test_name: str) -> bool:
"""
修复不稳定的测试
"""
# 读取测试文件
with open(test_file, 'r') as f:
test_code = f.read()
# 诊断问题
diagnosis = self.diagnostic_engine.diagnose_test_failure(
failed_result=None,
test_code=test_code
)
# 生成修复方案
fix_suggestion = self.diagnostic_engine.suggest_fix(diagnosis)
# 应用修复
try:
self._apply_fix(test_file, fix_suggestion)
return True
except Exception as e:
print(f"Failed to apply fix: {e}")
return False
def heal_environment_issue(self, error_message: str) -> bool:
"""
修复环境问题
"""
prompt = f"""
诊断并修复以下环境错误:
错误信息:
{error_message}
请提供:
1. 问题根因
2. 修复命令
3. 验证方法
4. 预防措施
输出格式:JSON
"""
response = self.diagnostic_engine.client.messages.create(
model="claude-4",
max_tokens=4000,
temperature=0.2,
messages=[{"role": "user", "content": prompt}]
)
fix_plan = json.loads(response.content[0].text)
# 执行修复命令
try:
commands = fix_plan.get("fix_commands", [])
for cmd in commands:
subprocess.run(cmd, shell=True, check=True)
return True
except Exception as e:
print(f"Failed to fix environment: {e}")
return False
def _apply_fix(self, test_file: str, fix_suggestion: str):
"""应用修复"""
# 解析修复建议
fix_code = self._parse_fix_code(fix_suggestion)
# 备份原文件
backup_file = f"{test_file}.backup"
shutil.copy2(test_file, backup_file)
# 应用修复
with open(test_file, 'w') as f:
f.write(fix_code)
# 验证修复
self._verify_fix(test_file)
def _parse_fix_code(self, fix_suggestion: str) -> str:
"""解析修复代码"""
# 从AI响应中提取代码
import re
code_blocks = re.findall(r'```python\n(.*?)\n```', fix_suggestion, re.DOTALL)
if code_blocks:
return code_blocks[0]
return fix_suggestion
def _verify_fix(self, test_file: str):
"""验证修复"""
# 运行测试验证
result = subprocess.run(
["pytest", test_file, "-v"],
capture_output=True,
text=True
)
if result.returncode != 0:
# 修复失败,恢复备份
backup_file = f"{test_file}.backup"
shutil.copy2(backup_file, test_file)
os.remove(backup_file)
raise Exception("Fix verification failed")
9. 安全测试
9.1 自动化安全扫描
# tests/security/test_auth.py
import pytest
import requests
from utils.api_client import APIClient
class TestAuthentication:
"""认证安全测试"""
@pytest.mark.security
@pytest.mark.ai_generated
def test_token_expiration(self, api_client: APIClient, auth_token):
"""Token 过期测试"""
# 模拟Token过期
expired_token = self._generate_expired_token(auth_token)
response = api_client.get("/users/profile", headers={
"Authorization": f"Bearer {expired_token}"
})
assert response.status_code == 401
assert "token expired" in response.json().get("message", "").lower()
@pytest.mark.security
@pytest.mark.ai_generated
def test_token_tampering(self, api_client: APIClient, auth_token):
"""Token 篡改测试"""
tampered_token = auth_token[:-4] + "0000" # 篡改签名
response = api_client.get("/users/profile", headers={
"Authorization": f"Bearer {tampered_token}"
})
assert response.status_code == 401
@pytest.mark.security
@pytest.mark.ai_generated
def test_privilege_escalation(self, api_client: APIClient, user_token, admin_endpoint):
"""权限提升测试"""
response = api_client.get(admin_endpoint, headers={
"Authorization": f"Bearer {user_token}"
})
assert response.status_code == 403
@pytest.mark.security
@pytest.mark.ai_generated
def test_sql_injection(self, api_client: APIClient):
"""SQL注入测试"""
malicious_inputs = [
"'; DROP TABLE users; --",
"' OR '1'='1",
"admin'--",
"1; EXEC xp_cmdshell('dir')",
]
for payload in malicious_inputs:
response = api_client.get(f"/users/search?q={payload}")
# 应该返回错误而不是执行SQL
assert response.status_code in [400, 403, 500]
assert "DROP TABLE" not in response.text
assert "xp_cmdshell" not in response.text
@pytest.mark.security
@pytest.mark.ai_generated
def test_xss_attacks(self, api_client: APIClient):
"""XSS攻击测试"""
xss_payloads = [
"<script>alert('XSS')</script>",
"<img src=x onerror=alert('XSS')>",
"javascript:alert('XSS')",
"<svg onload=alert('XSS')>",
]
for payload in xss_payloads:
response = api_client.post("/users/profile", data={
"bio": payload
}, headers={"Authorization": f"Bearer {auth_token}"})
# 应该转义或过滤XSS payload
assert "<script>" not in response.text
assert "onerror" not in response.text
assert "javascript:" not in response.text
10. 团队协作与知识管理
10.1 测试知识库
# ai/knowledge_base.py
import json
import os
from datetime import datetime
from typing import List, Dict
import faiss # 向量数据库
class TestKnowledgeBase:
"""测试知识库"""
def __init__(self, knowledge_dir: str = "knowledge_base"):
self.knowledge_dir = knowledge_dir
self.vector_db = None
self.knowledge_entries = []
self._load_knowledge()
def _load_knowledge(self):
"""加载知识库"""
knowledge_file = os.path.join(self.knowledge_dir, "knowledge.json")
if os.path.exists(knowledge_file):
with open(knowledge_file, 'r') as f:
self.knowledge_entries = json.load(f)
def add_knowledge(self, category: str, content: str, metadata: Dict = None):
"""
添加知识条目
"""
entry = {
"id": f"{category}_{datetime.now().strftime('%Y%m%d_%H%M%S')}",
"category": category,
"content": content,
"metadata": metadata or {},
"created_at": datetime.now().isoformat(),
"updated_at": datetime.now().isoformat()
}
self.knowledge_entries.append(entry)
self._save_knowledge()
def search_knowledge(self, query: str, top_k: int = 5) -> List[Dict]:
"""
搜索相关知识
"""
# 使用向量搜索
if self.vector_db:
# 向量搜索实现
pass
# 简单关键词搜索
results = []
for entry in self.knowledge_entries:
if query.lower() in entry["content"].lower():
results.append(entry)
return results[:top_k]
def get_test_patterns(self, category: str) -> List[Dict]:
"""
获取测试模式
"""
return [
entry for entry in self.knowledge_entries
if entry["category"] == category
]
def _save_knowledge(self):
"""保存知识库"""
os.makedirs(self.knowledge_dir, exist_ok=True)
knowledge_file = os.path.join(self.knowledge_dir, "knowledge.json")
with open(knowledge_file, 'w') as f:
json.dump(self.knowledge_entries, f, indent=2, ensure_ascii=False)
10.2 团队协作规范
# .team/rules.yml
# 团队协作规范
communication:
# 测试用例评审
case_review:
required_reviewers: 2
approval_threshold: 1
review_timeframe: "24h"
# 缺陷报告
defect_reporting:
severity_levels:
- critical: "系统崩溃/数据丢失"
- major: "核心功能不可用"
- minor: "一般功能异常"
- trivial: "界面/体验问题"
required_fields:
- title
- description
- steps_to_reproduce
- expected_result
- actual_result
- environment
- severity
- priority
- attachments
# 测试报告
test_reporting:
frequency: "每次发布"
format: "HTML + JSON"
distribution:
- dev_team
- qa_team
- product_team
- management
quality_gates:
# 质量门禁
gates:
- name: "单元测试覆盖率"
threshold: 80%
action: "block"
- name: "API测试通过率"
threshold: 95%
action: "block"
- name: "UI测试通过率"
threshold: 90%
action: "block"
- name: "性能测试达标率"
threshold: 100%
action: "block"
- name: "安全扫描通过率"
threshold: 100%
action: "block"
knowledge_sharing:
# 知识共享
sessions:
- name: "测试技术分享"
frequency: "每周"
duration: "1h"
- name: "缺陷分析会"
frequency: "每次发布后"
duration: "30min"
- name: "测试策略讨论"
frequency: "每月"
duration: "2h"
# 文档维护
documentation:
- name: "测试用例文档"
owner: "QA Team"
frequency: "每次用例变更"
- name: "测试环境文档"
owner: "DevOps"
frequency: "每次环境变更"
- name: "测试报告模板"
owner: "QA Lead"
frequency: "每季度"
11. 实际案例
11.1 电商系统测试案例
# tests/integration/test_e2e_flows.py
import pytest
import time
from utils.api_client import APIClient
from pages.login_page import LoginPage
from pages.dashboard_page import DashboardPage
from pages.order_page import OrderPage
class TestECommerceFlows:
"""电商系统端到端测试"""
@pytest.mark.integration
@pytest.mark.ai_generated
def test_complete_purchase_flow(self, api_client: APIClient, page):
"""完整购物流程测试"""
# 1. 用户登录
login_page = LoginPage(page)
login_page.login("test_user", "Test@123456")
assert login_page.is_login_successful()
# 2. 浏览商品
dashboard = DashboardPage(page)
products = dashboard.get_product_list()
assert len(products) > 0
# 3. 加入购物车
product = products[0]
dashboard.add_to_cart(product['id'], quantity=2)
assert dashboard.get_cart_count() == 2
# 4. 结算
order_page = OrderPage(page)
order_page.go_to_checkout()
order_page.fill_shipping_info({
"name": "测试用户",
"address": "北京市朝阳区",
"phone": "13812345678"
})
order_page.select_payment_method("alipay")
order_page.confirm_order()
# 5. 支付
order_page.make_payment()
assert order_page.is_payment_successful()
# 6. 验证订单
order_info = order_page.get_order_info()
assert order_info['status'] == 'paid'
assert order_info['amount'] == product['price'] * 2
@pytest.mark.integration
@pytest.mark.ai_generated
def test_return_refund_flow(self, api_client: APIClient, auth_token):
"""退货退款流程测试"""
# 1. 创建订单
order_resp = api_client.post("/orders", data={
"user_id": 1,
"amount": 100.00,
"currency": "CNY",
"items": [{"product_id": 1, "quantity": 1, "price": 100.00}]
}, headers={"Authorization": f"Bearer {auth_token}"})
order_id = order_resp.json()["id"]
# 2. 支付订单
api_client.post(f"/orders/{order_id}/pay", data={
"payment_method": "alipay",
"amount": 100.00
}, headers={"Authorization": f"Bearer {auth_token}"})
# 3. 申请退货
return_resp = api_client.post(f"/orders/{order_id}/return", data={
"reason": "商品质量问题",
"description": "商品有破损",
"images": ["image1.jpg", "image2.jpg"]
}, headers={"Authorization": f"Bearer {auth_token}"})
return_id = return_resp.json()["id"]
assert return_resp.json()["status"] == "pending"
# 4. 商家审核
api_client.post(f"/returns/{return_id}/approve", data={
"reason": "同意退货"
}, headers={"Authorization": f"Bearer {admin_token}"})
# 5. 退款
api_client.post(f"/returns/{return_id}/refund", data={
"amount": 100.00,
"reason": "商品退款"
}, headers={"Authorization": f"Bearer {admin_token}"})
# 6. 验证退款
refund_resp = api_client.get(f"/returns/{return_id}", headers={
"Authorization": f"Bearer {auth_token}"
})
assert refund_resp.json()["status"] == "refunded"
assert refund_resp.json()["refund_amount"] == 100.00
11.2 性能测试案例
# tests/performance/test_api_performance.py
import pytest
import time
import asyncio
from k6 import http, check, group
class TestAPIPerformance:
"""API性能测试"""
@pytest.mark.performance
@pytest.mark.ai_generated
def test_api_response_time(self, api_client: APIClient):
"""API响应时间测试"""
# 基准测试
start_time = time.time()
response = api_client.get("/api/v1/users")
end_time = time.time()
response_time = end_time - start_time
assert response_time < 1.0, f"响应时间过长: {response_time}s"
# 压力测试
concurrent_requests = 100
tasks = [
asyncio.create_task(self._make_request(api_client))
for _ in range(concurrent_requests)
]
results = await asyncio.gather(*tasks, return_exceptions=True)
# 分析结果
success_count = sum(1 for r in results if isinstance(r, dict))
failure_count = len(results) - success_count
assert success_count / concurrent_requests >= 0.95, \
f"成功率过低: {success_count/concurrent_requests*100}%"
async def _make_request(self, api_client: APIClient):
"""并发请求"""
try:
response = await api_client.get("/api/v1/users")
return {"status": response.status_code, "time": response.elapsed.total_seconds()}
except Exception as e:
return {"error": str(e)}
@pytest.mark.performance
@pytest.mark.ai_generated
def test_database_performance(self, db_client):
"""数据库性能测试"""
# 查询性能
start_time = time.time()
result = db_client.execute_query("SELECT * FROM users WHERE status = 'active'")
end_time = time.time()
query_time = end_time - start_time
assert query_time < 0.5, f"查询时间过长: {query_time}s"
# 批量插入性能
test_data = [
{"username": f"user_{i}", "email": f"user_{i}@test.com"}
for i in range(1000)
]
start_time = time.time()
db_client.batch_insert("users", test_data)
end_time = time.time()
insert_time = end_time - start_time
assert insert_time < 5.0, f"批量插入时间过长: {insert_time}s"
12. 成本优化
12.1 AI API 调用优化
# config/ai_optimization.py
from dataclasses import dataclass
from typing import Dict, Optional
import json
import os
import time
@dataclass
class AICallOptimization:
"""AI API 调用优化配置"""
# 缓存策略
cache_config: Dict = field(default_factory=lambda: {
"enabled": True,
"cache_dir": ".ai_cache",
"ttl": 3600, # 1小时
"max_size": "1GB"
})
# 批处理策略
batch_config: Dict = field(default_factory=lambda: {
"enabled": True,
"batch_size": 10,
"timeout": 30,
"max_retries": 3
})
# 模型选择策略
model_config: Dict = field(default_factory=lambda: {
"default_model": "claude-4",
"light_model": "claude-3.5-sonnet",
"heavy_model": "claude-opus",
"fallback_model": "claude-3.5-sonnet"
})
# 成本控制
cost_config: Dict = field(default_factory=lambda: {
"daily_budget": 100, # 每天预算
"max_calls_per_day": 1000,
"cost_alert_threshold": 0.8 # 80%预算时告警
})
def get_optimized_model(self, task_type: str) -> str:
"""
根据任务类型选择最优模型
"""
model_mapping = {
"test_generation": "claude-4", # 测试生成 - 需要创造力
"code_analysis": "claude-4", # 代码分析 - 需要理解力
"result_analysis": "claude-4", # 结果分析 - 需要推理能力
"data_generation": "claude-3.5-sonnet", # 数据生成 - 可以用轻量模型
"simple_validation": "claude-3.5-sonnet", # 简单验证 - 轻量模型
"complex_reasoning": "claude-opus", # 复杂推理 - 重型模型
}
return model_mapping.get(task_type, self.model_config["default_model"])
def estimate_cost(self, task_type: str, token_count: int) -> float:
"""
估算AI API调用成本
"""
pricing = {
"claude-4": {"input": 0.01, "output": 0.03}, # 每1K tokens
"claude-3.5-sonnet": {"input": 0.003, "output": 0.01},
"claude-opus": {"input": 0.015, "output": 0.075},
}
model = self.get_optimized_model(task_type)
prices = pricing.get(model, pricing["claude-4"])
input_tokens = int(token_count * 0.7)
output_tokens = int(token_count * 0.3)
cost = (input_tokens / 1000 * prices["input"]) + (output_tokens / 1000 * prices["output"])
return cost
def should_use_cache(self, prompt: str) -> bool:
"""
判断是否应该使用缓存
"""
# 简单查询使用缓存
if len(prompt) < 500:
return True
# 复杂查询不使用缓存
if len(prompt) > 5000:
return False
# 包含动态内容不使用缓存
dynamic_indicators = ["current", "today", "now", "latest"]
if any(indicator in prompt.lower() for indicator in dynamic_indicators):
return False
return True
12.2 测试执行成本优化
# utils/cost_optimizer.py
import json
import os
from datetime import datetime, timedelta
from typing import List, Dict
import statistics
class TestCostOptimizer:
"""测试执行成本优化器"""
def __init__(self, cost_history_file: str = "cost_history.json"):
self.cost_history_file = cost_history_file
self.cost_history = self._load_cost_history()
def analyze_test_execution_cost(self, test_results: List[TestResult]) -> Dict:
"""
分析测试执行成本
"""
total_time = sum(r.duration for r in test_results)
total_tests = len(test_results)
avg_time = total_time / total_tests if total_tests > 0 else 0
# 识别高成本测试
expensive_tests = [
r for r in test_results if r.duration > avg_time * 2
]
# 识别低价值测试
low_value_tests = [
r for r in test_results
if r.status == "PASSED" and r.duration < avg_time * 0.1
]
return {
"total_time": total_time,
"total_tests": total_tests,
"avg_time_per_test": avg_time,
"expensive_tests": expensive_tests,
"low_value_tests": low_value_tests,
"optimization_suggestions": self._generate_optimization_suggestions(
expensive_tests, low_value_tests
)
}
def optimize_test_execution_order(self, test_results: List[TestResult]) -> List[str]:
"""
优化测试执行顺序
"""
# 按执行时间排序
sorted_tests = sorted(test_results, key=lambda r: r.duration)
# 快速测试优先
fast_tests = [r.test_id for r in sorted_tests[:len(sorted_tests)//2]]
slow_tests = [r.test_id for r in sorted_tests[len(sorted_tests)//2:]]
# 混合执行策略
optimized_order = []
for i in range(len(fast_tests)):
optimized_order.append(fast_tests[i])
if i < len(slow_tests):
optimized_order.append(slow_tests[i])
return optimized_order
def _generate_optimization_suggestions(self, expensive_tests, low_value_tests) -> List[str]:
"""
生成优化建议
"""
suggestions = []
if expensive_tests:
suggestions.append(
f"发现 {len(expensive_tests)} 个高成本测试,建议优化执行逻辑"
)
if low_value_tests:
suggestions.append(
f"发现 {len(low_value_tests)} 个低价值测试,建议合并或移除"
)
# 并行执行建议
if len(expensive_tests) > 1:
suggestions.append(
"建议对高成本测试使用并行执行"
)
# 缓存建议
if len(low_value_tests) > 5:
suggestions.append(
"建议对低价值测试使用结果缓存"
)
return suggestions
def _load_cost_history(self) -> List[Dict]:
"""加载成本历史"""
if os.path.exists(self.cost_history_file):
with open(self.cost_history_file, 'r') as f:
return json.load(f)
return []
def save_cost_history(self, cost_data: Dict):
"""保存成本历史"""
self.cost_history.append(cost_data)
with open(self.cost_history_file, 'w') as f:
json.dump(self.cost_history, f, indent=2)
总结
这份基于 Claude 的 AI 自动化测试方案涵盖了:
✅ 核心能力
- 智能测试用例生成 - 从需求、代码变更、API Schema 自动生成测试
- 多维度测试覆盖 - API、UI、性能、安全、集成测试全覆盖
- 智能执行引擎 - 并行执行、智能重试、动态优先级
- AI 结果分析 - 根因分析、自然语言报告、稳定性预测
- 自愈机制 - 自动修复不稳定测试、环境自愈
✅ 工程实践
- CI/CD 集成 - GitHub Actions、Jenkins 完整流水线
- 质量门禁 - 覆盖率、通过率、性能、安全多重门禁
- 成本优化 - AI API 调用优化、测试执行优化
- 团队协作 - 知识库、评审流程、文档规范
✅ 最佳实践
- 渐进式实施 - 从 API 测试开始,逐步扩展到 UI、性能、安全
- 人机协同 - AI 生成 + 人工审核 + 持续优化
- 数据驱动 - 测试数据与用例分离,支持动态生成
- 可追溯性 - 每个测试用例可追溯到需求、代码、缺陷
🚀 实施建议
- Phase 1 - 搭建基础框架,实现 API 自动化测试
- Phase 2 - 集成 AI 用例生成,提升测试覆盖率
- Phase 3 - 添加 UI、性能、安全测试,完善质量门禁
- Phase 4 - 优化执行效率,实现自愈机制
- Phase 5 - 建立知识库,形成持续改进闭环
- Phase 6 - 扩展到全公司,形成标准化体系
- Phase 7 - 引入机器学习,实现预测性测试
- Phase 8 - 建立 AI 测试中台,赋能所有团队
这个方案已经非常详细了,涵盖了从架构设计到具体实现的各个方面。你可以根据实际项目需求进行调整和扩展。
更多推荐



所有评论(0)