大模型能力评测实战指南:基于MMLU数据集的DeepSeek API全维度测试方法
本文完整公开了一套开源的自动化测试解决方案,专为评估大语言模型API性能而设计。通过MMLU(Massive Multitask Language Understanding)专业测评数据集,开发者可以:工具亮点特性✅ 多维度指标实时监控(准确率/耗时/Token经济性)✅ 智能失败重试与错误隔离机制✅ 自动生成可视化测试报告✅ 灵活支持主流API接口配置✅ 开源可扩展的模块化架构应用场景示例本方
内容简介:
本文完整公开了一套开源的自动化测试解决方案,专为评估大语言模型API性能而设计。通过MMLU(Massive Multitask Language Understanding)专业测评数据集,开发者可以:
- 核心验证能力
- 精准检测API是否为完整能力版本(“满血版”)
- 量化分析响应延迟与吞吐性能
- 深度追踪token消耗与推理效率
- 稳定性压力测试与异常容错验证
-
工具亮点特性
✅ 多维度指标实时监控(准确率/耗时/Token经济性)
✅ 智能失败重试与错误隔离机制
✅ 自动生成可视化测试报告
✅ 灵活支持主流API接口配置
✅ 开源可扩展的模块化架构 -
应用场景示例
- 企业选型时验证不同API服务商的实际性能
- 开发者调试提示工程(prompt engineering)效果
- 追踪模型版本迭代中的能力波动
- 学术研究中的可复现性测试
本方案已集成对DeepSeek系列模型的专项测试模块,同时兼容OpenAI、Claude等主流API架构。所有代码与测试数据集已开源。
MMLU数据集详解与获取指南
▍MMLU数据集核心价值
MMLU(Massive Multitask Language Understanding)由UC Berkeley团队开发,是当前最权威的大模型综合能力评估基准之一。该数据集具有以下核心特征:
-
学科全景覆盖
涵盖数学、物理、法律、伦理等57个学科领域,全面检验模型的跨领域知识整合能力 -
专业级难度设计
所有题目均来自真实考试题库(如GRE、LSAT等)和专业学术资源,平均准确率低于60%的模型不具备实用价值 -
科学评价维度
通过四选一选择题形式,精确量化模型在零样本(zero-shot)和少样本(few-shot)场景下的表现
方式一:HuggingFace官方接口
from datasets import load_dataset
# 指定自定义存储路径(解决国内下载慢问题)
custom_cache_dir = "/path/to/your/custom_cache"
# 全量数据加载(需要30GB+内存)
mmlu_full = load_dataset("cais/mmlu", "all", cache_dir=custom_cache_dir)
# 推荐:按学科加载(示例加载STEM类科目)
stem_subjects = ["physics", "chemistry", "math"]
mmlu_stem = load_dataset("cais/mmlu", stem_subjects, cache_dir=custom_cache_dir)
方式二:原始数据直连下载
数据源:Hendrycks Research Data
文件结构说明:
data/
├── test/ # 测试集(无标签)
│ ├── astronomy_test.csv
│ └── ...其他学科
├── val/ # 验证集
│ ├── astronomy_val.csv
│ └── ...
└── dev/ # 训练集
├── astronomy_dev.csv
└── ...
测试代码
import os
import csv
import time
import json
import argparse
from tqdm import tqdm
from openai import OpenAI
from datetime import datetime
def initialize_environment(config):
os.makedirs(config.get("output_dir"), exist_ok=True)
def sanitize_filename(filename):
"""清理文件名中的特殊字符"""
return "".join(c if c.isalnum() or c in (' ', '_') else '_' for c in filename)
def load_dataset(file_path, config):
"""加载并验证数据集"""
questions = []
try:
with open(file_path, "r", encoding="utf-8") as f:
reader = csv.reader(f)
for row_num, row in enumerate(reader, 1):
if len(row) < 6:
continue
# 数据验证
answer = row[5].strip().upper()
if answer not in {"A", "B", "C", "D"}:
continue
# 构建问题对象
question = {
"question": row[0].strip(),
"choices": [row[i].strip() for i in range(1, 5)],
"answer": answer
}
questions.append(question)
# 限制最大数量
if len(questions) >= config.get("max_questions"):
break
except Exception as e:
print(f"加载文件 {file_path} 失败: {str(e)}")
return questions
def format_prompt(question):
"""生成标准化的问题提示"""
choices = "\n".join(
f"{chr(65 + i)}. {choice}"
for i, choice in enumerate(question['choices'])
)
return (
"请回答以下选择题,只需给出选项字母(A/B/C/D):\n"
f"题目:{question['question']}\n"
f"选项:\n{choices}\n"
"答案:"
)
def query_model_retries(prompt, config):
"""带重试机制的API调用"""
max_retries = 3
for attempt in range(max_retries):
try:
client = OpenAI(api_key=config.get("api_key"), base_url=config.get("base_url"))
start_time = time.time()
response = client.chat.completions.create(
model=config.get("model_name"),
messages=[{"role": "user", "content": prompt}],
temperature=config.get("temperature"),
max_tokens=1024
)
elapsed_time = time.time() - start_time
prompt_tokens = response.usage.prompt_tokens
completion_tokens = response.usage.completion_tokens
total_tokens = response.usage.total_tokens
try:
reasoning_tokens = response.usage.completion_tokens_details.reasoning_tokens
except:
reasoning_tokens = 0
speed = completion_tokens / elapsed_time
tokens_data = [prompt_tokens, completion_tokens, total_tokens, reasoning_tokens, speed]
return response.choices[0].message.content.strip(), elapsed_time, tokens_data
except Exception as e:
if attempt < max_retries - 1:
wait = 2 ** attempt
print(f"请求失败,{wait}秒后重试... ({str(e)})")
time.sleep(wait)
else:
print(f"API调用失败: {str(e)}")
return "ERROR", 0.0, [0, 0, 0, 0, 0]
def query_model(prompt, config):
try:
client = OpenAI(api_key=config.get("api_key"), base_url=config.get("base_url"))
start_time = time.time()
response = client.chat.completions.create(
model=config.get("model_name"),
messages=[{"role": "user", "content": prompt}],
temperature=config.get("emperature"),
max_tokens=1024
)
elapsed_time = time.time() - start_time
prompt_tokens = response.usage.prompt_tokens
completion_tokens = response.usage.completion_tokens
total_tokens = response.usage.total_tokens
try:
reasoning_tokens = response.usage.completion_tokens_details.reasoning_tokens
except:
reasoning_tokens = 0
speed = completion_tokens / elapsed_time
tokens_data = [prompt_tokens, completion_tokens, total_tokens, reasoning_tokens, speed]
return response.choices[0].message.content.strip(), elapsed_time, tokens_data
except Exception as e:
print(f"API调用失败: {str(e)}")
return "ERROR", 0.0, [0, 0, 0, 0, 0]
def parse_answer(response):
"""增强的回答解析逻辑"""
clean_response = response.strip().upper()
# 处理带括号的情况
if '(' in clean_response and ')' in clean_response:
last_bracket = clean_response.rfind(')')
clean_response = clean_response[last_bracket + 1:].strip()
# 查找第一个有效选项
for char in clean_response:
if char in {"A", "B", "C", "D"}:
return char
return "INVALID"
def process_question(q, config):
"""处理单个问题"""
prompt = format_prompt(q)
response, elapsed_time, tokens_data = query_model(prompt, config)
model_answer = parse_answer(response) if response != "ERROR" else "ERROR"
prompt_tokens, completion_tokens, total_tokens, reasoning_tokens, speed = tokens_data
time.sleep(config.get("request_interval"))
return {
"timestamp": datetime.now().isoformat(),
"question": q["question"],
"choices": json.dumps(q["choices"], ensure_ascii=False),
"correct_answer": q["answer"],
"model_answer": model_answer,
"response_time": elapsed_time,
"prompt_tokens": prompt_tokens,
"completion_tokens": completion_tokens,
"total_tokens": total_tokens,
"reasoning_tokens": reasoning_tokens,
"speed": speed,
}
def process_single_file(file_path, config):
"""处理单个测试文件"""
base_name = os.path.splitext(os.path.basename(file_path))[0]
safe_name = sanitize_filename(base_name)
timestamp = datetime.now().strftime("%Y%m%d%H%M%S")
log_filename = f"log_{safe_name}_{timestamp}.csv"
log_path = os.path.join(config.get("output_dir"), log_filename)
questions = load_dataset(file_path, config)
if not questions:
print(f"文件 {base_name} 无有效数据,跳过处理")
return {"error": "No valid data"}
# 进度条设置
progress = tqdm(
total=len(questions),
desc=f"处理 {safe_name[:15]:<15}",
bar_format="{l_bar}{bar}| {n_fmt}/{total_fmt} [{elapsed}<{remaining}]")
# 准备并发任务
with open(log_path, "w", newline="", encoding="utf-8") as log_file:
writer = csv.DictWriter(log_file, fieldnames=[
"timestamp", "question", "choices",
"correct_answer", "model_answer", "response_time", "prompt_tokens", "completion_tokens", "total_tokens",
"reasoning_tokens", "speed"
])
writer.writeheader()
for q in questions:
result = process_question(q, config)
writer.writerow(result)
log_file.flush()
progress.update(1)
return log_path
def analyze_log(log_path):
"""增强的日志分析(带数据校验)"""
stats = {
"total": 0,
"correct": 0,
"error": 0,
"fail": 0,
"total_time": 0.0,
"response_times": [],
"response_time": 0,
"prompt_tokens": 0,
"completion_tokens": 0,
"total_tokens": 0,
"reasoning_tokens": 0,
"speed": 0.0,
}
try:
with open(log_path, "r", encoding="utf-8") as f:
reader = csv.DictReader(f)
for row in reader:
# 数据校验
if not all(key in row for key in ["correct_answer", "model_answer", "response_time"]):
print(f"日志文件 {log_path} 格式错误,跳过分析")
return {}
stats["total"] += 1
stats["total_time"] += float(row["response_time"])
stats["response_times"].append(float(row["response_time"]))
if row["model_answer"] == "ERROR":
stats["fail"] += 1
elif row["model_answer"] == row["correct_answer"]:
stats["correct"] += 1
else:
stats["error"] += 1
stats["prompt_tokens"] += int(row["prompt_tokens"])
stats["completion_tokens"] += int(row["completion_tokens"])
stats["total_tokens"] += int(row["total_tokens"])
stats["reasoning_tokens"] += int(row["reasoning_tokens"])
stats["speed"] += float(row["speed"])
# 计算百分位数
if stats["response_times"]:
sorted_times = sorted(stats["response_times"])
stats["p95"] = sorted_times[int(len(sorted_times) * 0.95)]
stats["p99"] = sorted_times[int(len(sorted_times) * 0.99)]
else:
stats["p95"] = stats["p99"] = 0.0
# stats["speed"] = stats["speed"] / stats["total"]
stats["speed"] = stats["completion_tokens"] / stats["total_time"]
except Exception as e:
print(f"分析日志 {log_path} 失败: {str(e)}")
return {}
return stats
def generate_report_content(subject, timestamp, stats):
"""生成报告内容模板"""
if stats["total"] == 0:
return f"# {subject} 测试报告\n\n## 错误\n无有效测试数据"
return f"""# {subject} 测试报告
## 基本信息
- 测试文件: {subject}
- 测试时间: {timestamp}
- 总题数: {stats["total"]}
## 测试结果
| 指标 | 数值 | 占比 |
|--------------|------------|----------|
| 正确回答 | {stats["correct"]:6} | {stats["correct"] / stats["total"] * 100:5.1f}% |
| 错误回答 | {stats["error"]:6} | {stats["error"] / stats["total"] * 100:5.1f}% |
| 回答失败 | {stats["fail"]:6} | {stats["fail"] / stats["total"] * 100:5.1f}% |
## 响应时间分析
- 平均用时: {stats["total_time"] / stats["total"]:.2f}s
- 95%请求耗时: {stats["p95"]:.2f}s
- 99%请求耗时: {stats["p99"]:.2f}s
## Token使用总览
- 总Prompt Tokens: {stats['prompt_tokens']}
- 总Completion Tokens: {stats['completion_tokens']}
- 总Reasoning Tokens: {stats['reasoning_tokens']}
- 总Tokens消耗: {stats['total_tokens']}
- 整体平均速度: {stats['speed']:.2f} tokens/秒
"""
def extract_category_datetime(path):
# 提取文件名(去掉路径)
filename = path.split('\\')[-1]
# 去除前缀和后缀
clean = filename.replace("log_", "").replace(".csv", "")
# 分割类别和日期时间部分
category, datetime_str = clean.rsplit('_', 1)
# 解析日期时间
date_part = datetime_str[:8]
hour_min = datetime_str[8:12] # 提取小时和分钟
# 格式化日期时间
formatted_time = f"{date_part[:4]}-{date_part[4:6]}-{date_part[6:8]} {hour_min[:2]}:{hour_min[2:4]}"
return category, formatted_time, datetime_str
def generate_sub_reports(log_files, config):
"""批量生成分报告(带错误处理)"""
reports = []
for log_path in tqdm(log_files, desc="生成分报告"):
subject, timestamp, datetime_str = extract_category_datetime(log_path)
stats = analyze_log(log_path)
if not stats:
continue
report_path = os.path.join(
config.get("output_dir"),
f"report_{subject}_{datetime_str}.md"
)
try:
content = generate_report_content(
subject,
timestamp,
stats
)
with open(report_path, "w", encoding="utf-8") as f:
f.write(content)
reports.append({
"subject": subject,
"stats": stats,
"report_path": report_path
})
except Exception as e:
print(f"生成报告失败: {subject} - {str(e)}")
return reports
def format_seconds(seconds):
# 计算各时间单位
hours = int(seconds // 3600)
remaining = seconds % 3600
minutes = int(remaining // 60)
secs = remaining % 60
# 强制显示所有单位
return f"{hours}小时{minutes}分钟{secs:.2f}秒"
def generate_summary_report(reports, config):
"""生成增强的总报告"""
timestamp = datetime.now().strftime("%Y%m%d%H%M%S")
report_path = os.path.join(config.get("output_dir"), f"summary_report_{timestamp}.md")
total_stats = {
"total": 0,
"correct": 0,
"error": 0,
"fail": 0,
"total_time": 0.0,
"response_times": [],
"prompt_tokens": 0,
"completion_tokens": 0,
"total_tokens": 0,
"reasoning_tokens": 0,
"avg_speed": 0.0,
}
speed_samples = []
# 汇总数据
for report in reports:
if "stats" not in report:
continue
s = report["stats"]
total_stats["total"] += s["total"]
total_stats["correct"] += s["correct"]
total_stats["error"] += s["error"]
total_stats["fail"] += s["fail"]
total_stats["total_time"] += s["total_time"]
total_stats["response_times"].extend(s["response_times"])
total_stats["prompt_tokens"] += s["prompt_tokens"]
total_stats["completion_tokens"] += s["completion_tokens"]
total_stats["total_tokens"] += s["total_tokens"]
speed_samples.append(s["speed"])
total_question = total_stats["total"]
total_time = total_stats["total_time"]
# if speed_samples:
# total_stats["avg_speed"] = sum(speed_samples) / total_stats["total"]
total_stats["avg_speed"] = total_stats["completion_tokens"] / total_stats["total_time"]
# 生成报告内容
content = [
"# 综合测试报告",
"\n## 基本信息",
f"- 总体统计 ({len(reports)}个测试集)",
f"- 总体统计 ({total_question}个题目)",
f"- 总体统计响应时间: {format_seconds(total_time)}"
"\n## Token使用总览",
f"- 总Prompt Tokens: {total_stats['prompt_tokens']}",
f"- 总Completion Tokens: {total_stats['completion_tokens']}",
f"- 总Reasoning Tokens: {total_stats['reasoning_tokens']}",
f"- 总Tokens消耗: {total_stats['total_tokens']}",
f"- 整体平均速度: {total_stats['avg_speed']:.2f} tokens/秒",
]
if total_stats["total"] > 0:
# 响应时间分析
sorted_times = sorted(total_stats["response_times"])
p95 = sorted_times[int(len(sorted_times) * 0.95)] if sorted_times else 0
p99 = sorted_times[int(len(sorted_times) * 0.99)] if sorted_times else 0
content.extend([
f"\n### 正确率分布",
f"- 总正确率: {total_stats['correct'] / total_stats['total'] * 100:.1f}%",
f"- 总错误率: {total_stats['error'] / total_stats['total'] * 100:.1f}%",
f"- 总失败率: {total_stats['fail'] / total_stats['total'] * 100:.1f}%",
f"\n### 性能指标",
f"- 平均响应时间: {total_stats['total_time'] / total_stats['total']:.2f}s",
f"- 95%请求耗时: {p95:.2f}s",
f"- 99%请求耗时: {p99:.2f}s",
"\n## 分科详情(按正确率排序)"
])
# 排序科目
sorted_reports = sorted(
[r for r in reports if "stats" in r],
key=lambda x: x["stats"]["correct"] / x["stats"]["total"],
reverse=True
)
for report in sorted_reports:
s = report["stats"]
content.append(
f"### {report['subject']}\n"
f"- 正确率: {s['correct'] / s['total'] * 100:.1f}% "
f"(正确{s['correct']}/错误{s['error']}/失败{s['fail']})\n"
f"- 平均用时: {s['total_time'] / s['total']:.2f}s"
)
else:
content.append("\n## 警告\n无有效测试数据")
# 写入文件
with open(report_path, "w", encoding="utf-8") as f:
f.write('\n'.join(content))
return report_path
def Reasoning_Test(config):
initialize_environment(config)
test_files = [os.path.join(config.get("data_path"), f)
for f in os.listdir(config.get("data_path")) if f.endswith(".csv")]
# 第一阶段:处理所有文件
with tqdm(total=len(test_files), desc="处理数据文件") as pbar:
for fp in test_files:
log_path = process_single_file(fp, config)
print(f"\n日志文件已保存: {log_path}")
pbar.update(1)
def Results_Summary(config):
log_files = [os.path.join(config.get("output_dir"), f)
for f in os.listdir(config.get("output_dir")) if f.endswith(".csv")]
# 第二阶段:生成报告
if log_files:
sub_reports = generate_sub_reports(log_files, config)
if sub_reports:
summary_path = generate_summary_report(sub_reports, config)
print(f"\n所有测试完成!总报告路径: {summary_path}")
else:
print("\n无法生成报告:无有效日志数据")
else:
print("\n未生成任何日志文件")
def parse_opt(known=False):
parser = argparse.ArgumentParser()
parser.add_argument('--model', type=str, default=1, help='initial weights path')
opt = parser.parse_known_args()[0] if known else parser.parse_args()
return opt
if __name__ == '__main__':
config = {'model_name': 'deepseek-r1:1.5b',
'api_key': 'ollama',
'base_url': 'http://localhost:11434/v1',
'max_questions': 500, 'temperature': 0.3, 'request_interval': 1,
'data_path': 'J:\\huggingface_data\\mmlu\\data\\try', 'output_dir': 'J:\\huggingface_data\\LOG\\ollama'}
print("程序开始时间:", datetime.now())
print("开始处理", config.get("output_dir"))
print("开始处理", config.get("data_path"))
Reasoning_Test(config)
Results_Summary(config)
该代码实现了一个基于OpenAI API的多学科选择题自动化测试框架,主要包含数据处理、模型交互、结果分析三大核心模块。以下为关键功能解析:
一、核心功能架构
-
测试执行模块(Reasoning_Test)
- 初始化环境:创建输出目录(
initialize_environment
) - 批量处理CSV数据集:支持多文件并发处理(
process_single_file
) - 问题加载:验证数据有效性,过滤非法选项(
load_dataset
) - API调用封装:实现带指数退避的重试机制(
query_model_retries
)
- 初始化环境:创建输出目录(
-
数据分析模块(Results_Summary)
- 日志解析:计算准确率、响应时间分布、Token消耗等指标(
analyze_log
) - 报告生成:输出分科目测试报告(
generate_sub_reports
)和综合汇总报告(generate_summary_report
)
- 日志解析:计算准确率、响应时间分布、Token消耗等指标(
-
辅助功能组件
- 文件名净化处理(
sanitize_filename
) - 提示词标准化生成(
format_prompt
) - 答案解析强化逻辑(
parse_answer
)
- 文件名净化处理(
二、关键技术实现
- 数据处理流
- CSV文件解析时执行严格校验:过滤选项不足4项、答案非A/B/C/D的问题
- 支持最大问题数限制(
max_questions
配置项) - 采用UTF-8编码处理多语言数据
- 模型交互优化
- 请求间隔控制(
request_interval
参数) - Token消耗统计:区分基础token与推理专用token(
reasoning_tokens
)
- 性能监控体系
- 响应时间百分位统计(P95/P99)
- 吞吐量计算:tokens/秒统计
- 多维资源监控:包含Prompt/Completion/Total Tokens
三、扩展性设计
- 配置驱动架构
- 通过字典参数(config)集中管理:
config = {
'model_name': 'deepseek-r1:1.5b', # 模型标识
'api_key': 'ollama', # API认证密钥
'base_url': 'http://localhost:11434/v1', # API端点
'max_questions': 5, # 最大测试题量
'temperature': 0.3, # 模型温度参数
'request_interval': 1, # 请求间隔(秒)
'data_path': 'J:\\...', # 测试数据集路径
'output_dir': 'J:\\...' # 日志输出目录
}
- 模块化设计
- 各功能组件高度解耦,便于替换模型后端(如切换不同API供应商)
- 日志系统采用CSV格式存储,支持后续自定义分析工具接入
四、统计分析维度
指标类别 | 具体指标 |
---|---|
准确性指标 | 正确率、错误率、API失败率 |
性能指标 | 平均响应时间、P95/P99响应时间 |
资源消耗 | Prompt/Completion/Total Tokens |
吞吐量 | Tokens/秒 |
问题分布 | 分科目正确率排序 |
该框架通过系统化的测试流程设计,实现了从数据加载、模型交互到结果分析的全链路自动化,适用于大语言模型在标准化测试集上的性能评估与对比分析。代码中采用的异常处理机制和资源监控设计,可有效保障长时间批量测试的稳定性。
该实现方案为大规模模型评测提供了标准化框架,其模块化设计便于扩展至其他测评数据集,日志分析模块的设计尤其适合长期性能监控与对比实验。
DeepSeek官方api测试结果
选取high_school_computer_science_test.csv测试集进行测试,以下是计算结果,以供参考。
基本信息
- 总体统计 (1个测试集)
- 总体统计 (100个题目)
- 总体统计响应时间: 0小时48分钟45.48秒
Token使用总览
- 总Prompt Tokens: 13143
- 总Completion Tokens: 56644
- 总Reasoning Tokens: 56344
- 总Tokens消耗: 69787
- 整体平均速度: 19.36 tokens/秒
正确率分布
- 总正确率: 97.0%
- 总错误率: 3.0%
- 总失败率: 0.0%
性能指标
- 平均响应时间: 29.25s
- 95%请求耗时: 56.03s
- 99%请求耗时: 264.47s
分科详情(按正确率排序)
high_school_computer_science_test
- 正确率: 97.0% (正确97/错误3/失败0)
- 平均用时: 29.25s
更多推荐
所有评论(0)