内容简介:

本文完整公开了一套开源的自动化测试解决方案,专为评估大语言模型API性能而设计。通过MMLU(Massive Multitask Language Understanding)专业测评数据集,开发者可以:

  1. 核心验证能力
  • 精准检测API是否为完整能力版本(“满血版”)
  • 量化分析响应延迟与吞吐性能
  • 深度追踪token消耗与推理效率
  • 稳定性压力测试与异常容错验证
  1. 工具亮点特性
    ✅ 多维度指标实时监控(准确率/耗时/Token经济性)
    ✅ 智能失败重试与错误隔离机制
    ✅ 自动生成可视化测试报告
    ✅ 灵活支持主流API接口配置
    ✅ 开源可扩展的模块化架构

  2. 应用场景示例

  • 企业选型时验证不同API服务商的实际性能
  • 开发者调试提示工程(prompt engineering)效果
  • 追踪模型版本迭代中的能力波动
  • 学术研究中的可复现性测试

本方案已集成对DeepSeek系列模型的专项测试模块,同时兼容OpenAI、Claude等主流API架构。所有代码与测试数据集已开源。

MMLU数据集详解与获取指南

▍MMLU数据集核心价值

MMLU(Massive Multitask Language Understanding)由UC Berkeley团队开发,是当前最权威的大模型综合能力评估基准之一。该数据集具有以下核心特征:

  1. 学科全景覆盖
    涵盖数学、物理、法律、伦理等57个学科领域,全面检验模型的跨领域知识整合能力

  2. 专业级难度设计
    所有题目均来自真实考试题库(如GRE、LSAT等)和专业学术资源,平均准确率低于60%的模型不具备实用价值

  3. 科学评价维度
    通过四选一选择题形式,精确量化模型在零样本(zero-shot)和少样本(few-shot)场景下的表现

方式一:HuggingFace官方接口

from datasets import load_dataset

# 指定自定义存储路径(解决国内下载慢问题)
custom_cache_dir = "/path/to/your/custom_cache" 

# 全量数据加载(需要30GB+内存)
mmlu_full = load_dataset("cais/mmlu", "all", cache_dir=custom_cache_dir)

# 推荐:按学科加载(示例加载STEM类科目)
stem_subjects = ["physics", "chemistry", "math"]
mmlu_stem = load_dataset("cais/mmlu", stem_subjects, cache_dir=custom_cache_dir)

方式二:原始数据直连下载
数据源:Hendrycks Research Data

文件结构说明

data/
├── test/                # 测试集(无标签)
│   ├── astronomy_test.csv
│   └── ...其他学科
├── val/                 # 验证集
│   ├── astronomy_val.csv
│   └── ...
└── dev/                 # 训练集
    ├── astronomy_dev.csv
    └── ...

测试代码

import os
import csv
import time
import json
import argparse
from tqdm import tqdm
from openai import OpenAI
from datetime import datetime


def initialize_environment(config):
    os.makedirs(config.get("output_dir"), exist_ok=True)


def sanitize_filename(filename):
    """清理文件名中的特殊字符"""
    return "".join(c if c.isalnum() or c in (' ', '_') else '_' for c in filename)


def load_dataset(file_path, config):
    """加载并验证数据集"""
    questions = []
    try:
        with open(file_path, "r", encoding="utf-8") as f:
            reader = csv.reader(f)
            for row_num, row in enumerate(reader, 1):
                if len(row) < 6:
                    continue

                # 数据验证
                answer = row[5].strip().upper()
                if answer not in {"A", "B", "C", "D"}:
                    continue

                # 构建问题对象
                question = {
                    "question": row[0].strip(),
                    "choices": [row[i].strip() for i in range(1, 5)],
                    "answer": answer
                }
                questions.append(question)

                # 限制最大数量
                if len(questions) >= config.get("max_questions"):
                    break
    except Exception as e:
        print(f"加载文件 {file_path} 失败: {str(e)}")
    return questions


def format_prompt(question):
    """生成标准化的问题提示"""
    choices = "\n".join(
        f"{chr(65 + i)}. {choice}"
        for i, choice in enumerate(question['choices'])
    )
    return (
        "请回答以下选择题,只需给出选项字母(A/B/C/D):\n"
        f"题目:{question['question']}\n"
        f"选项:\n{choices}\n"
        "答案:"
    )


def query_model_retries(prompt, config):
    """带重试机制的API调用"""
    max_retries = 3
    for attempt in range(max_retries):
        try:
            client = OpenAI(api_key=config.get("api_key"), base_url=config.get("base_url"))
            start_time = time.time()
            response = client.chat.completions.create(
                model=config.get("model_name"),
                messages=[{"role": "user", "content": prompt}],
                temperature=config.get("temperature"),
                max_tokens=1024
            )
            elapsed_time = time.time() - start_time
            prompt_tokens = response.usage.prompt_tokens
            completion_tokens = response.usage.completion_tokens
            total_tokens = response.usage.total_tokens
            try:
                reasoning_tokens = response.usage.completion_tokens_details.reasoning_tokens
            except:
                reasoning_tokens = 0
            speed = completion_tokens / elapsed_time
            tokens_data = [prompt_tokens, completion_tokens, total_tokens, reasoning_tokens, speed]
            return response.choices[0].message.content.strip(), elapsed_time, tokens_data
        except Exception as e:
            if attempt < max_retries - 1:
                wait = 2 ** attempt
                print(f"请求失败,{wait}秒后重试... ({str(e)})")
                time.sleep(wait)
            else:
                print(f"API调用失败: {str(e)}")
                return "ERROR", 0.0, [0, 0, 0, 0, 0]


def query_model(prompt, config):
    try:
        client = OpenAI(api_key=config.get("api_key"), base_url=config.get("base_url"))
        start_time = time.time()
        response = client.chat.completions.create(
            model=config.get("model_name"),
            messages=[{"role": "user", "content": prompt}],
            temperature=config.get("emperature"),
            max_tokens=1024
        )
        elapsed_time = time.time() - start_time
        prompt_tokens = response.usage.prompt_tokens
        completion_tokens = response.usage.completion_tokens
        total_tokens = response.usage.total_tokens
        try:
            reasoning_tokens = response.usage.completion_tokens_details.reasoning_tokens
        except:
            reasoning_tokens = 0
        speed = completion_tokens / elapsed_time
        tokens_data = [prompt_tokens, completion_tokens, total_tokens, reasoning_tokens, speed]
        return response.choices[0].message.content.strip(), elapsed_time, tokens_data
    except Exception as e:
        print(f"API调用失败: {str(e)}")
        return "ERROR", 0.0, [0, 0, 0, 0, 0]


def parse_answer(response):
    """增强的回答解析逻辑"""
    clean_response = response.strip().upper()

    # 处理带括号的情况
    if '(' in clean_response and ')' in clean_response:
        last_bracket = clean_response.rfind(')')
        clean_response = clean_response[last_bracket + 1:].strip()

    # 查找第一个有效选项
    for char in clean_response:
        if char in {"A", "B", "C", "D"}:
            return char
    return "INVALID"


def process_question(q, config):
    """处理单个问题"""
    prompt = format_prompt(q)
    response, elapsed_time, tokens_data = query_model(prompt, config)
    model_answer = parse_answer(response) if response != "ERROR" else "ERROR"
    prompt_tokens, completion_tokens, total_tokens, reasoning_tokens, speed = tokens_data
    time.sleep(config.get("request_interval"))

    return {
        "timestamp": datetime.now().isoformat(),
        "question": q["question"],
        "choices": json.dumps(q["choices"], ensure_ascii=False),
        "correct_answer": q["answer"],
        "model_answer": model_answer,
        "response_time": elapsed_time,
        "prompt_tokens": prompt_tokens,
        "completion_tokens": completion_tokens,
        "total_tokens": total_tokens,
        "reasoning_tokens": reasoning_tokens,
        "speed": speed,
    }


def process_single_file(file_path, config):
    """处理单个测试文件"""
    base_name = os.path.splitext(os.path.basename(file_path))[0]
    safe_name = sanitize_filename(base_name)
    timestamp = datetime.now().strftime("%Y%m%d%H%M%S")
    log_filename = f"log_{safe_name}_{timestamp}.csv"
    log_path = os.path.join(config.get("output_dir"), log_filename)

    questions = load_dataset(file_path, config)
    if not questions:
        print(f"文件 {base_name} 无有效数据,跳过处理")
        return {"error": "No valid data"}
    # 进度条设置
    progress = tqdm(
        total=len(questions),
        desc=f"处理 {safe_name[:15]:<15}",
        bar_format="{l_bar}{bar}| {n_fmt}/{total_fmt} [{elapsed}<{remaining}]")
    # 准备并发任务

    with open(log_path, "w", newline="", encoding="utf-8") as log_file:
        writer = csv.DictWriter(log_file, fieldnames=[
            "timestamp", "question", "choices",
            "correct_answer", "model_answer", "response_time", "prompt_tokens", "completion_tokens", "total_tokens",
            "reasoning_tokens", "speed"
        ])
        writer.writeheader()

        for q in questions:
            result = process_question(q, config)
            writer.writerow(result)
            log_file.flush()
            progress.update(1)

    return log_path


def analyze_log(log_path):
    """增强的日志分析(带数据校验)"""
    stats = {
        "total": 0,
        "correct": 0,
        "error": 0,
        "fail": 0,
        "total_time": 0.0,
        "response_times": [],
        "response_time": 0,

        "prompt_tokens": 0,
        "completion_tokens": 0,
        "total_tokens": 0,
        "reasoning_tokens": 0,
        "speed": 0.0,

    }

    try:
        with open(log_path, "r", encoding="utf-8") as f:
            reader = csv.DictReader(f)
            for row in reader:
                # 数据校验
                if not all(key in row for key in ["correct_answer", "model_answer", "response_time"]):
                    print(f"日志文件 {log_path} 格式错误,跳过分析")
                    return {}

                stats["total"] += 1
                stats["total_time"] += float(row["response_time"])
                stats["response_times"].append(float(row["response_time"]))
                if row["model_answer"] == "ERROR":
                    stats["fail"] += 1
                elif row["model_answer"] == row["correct_answer"]:
                    stats["correct"] += 1
                else:
                    stats["error"] += 1
                stats["prompt_tokens"] += int(row["prompt_tokens"])
                stats["completion_tokens"] += int(row["completion_tokens"])
                stats["total_tokens"] += int(row["total_tokens"])
                stats["reasoning_tokens"] += int(row["reasoning_tokens"])
                stats["speed"] += float(row["speed"])

            # 计算百分位数
            if stats["response_times"]:
                sorted_times = sorted(stats["response_times"])
                stats["p95"] = sorted_times[int(len(sorted_times) * 0.95)]
                stats["p99"] = sorted_times[int(len(sorted_times) * 0.99)]
            else:
                stats["p95"] = stats["p99"] = 0.0
            # stats["speed"] = stats["speed"] / stats["total"]
            stats["speed"] = stats["completion_tokens"] / stats["total_time"]

    except Exception as e:
        print(f"分析日志 {log_path} 失败: {str(e)}")
        return {}

    return stats


def generate_report_content(subject, timestamp, stats):
    """生成报告内容模板"""
    if stats["total"] == 0:
        return f"# {subject} 测试报告\n\n## 错误\n无有效测试数据"

    return f"""# {subject} 测试报告

## 基本信息
- 测试文件: {subject}
- 测试时间: {timestamp}
- 总题数: {stats["total"]}

## 测试结果
| 指标         | 数值       | 占比     |
|--------------|------------|----------|
| 正确回答     | {stats["correct"]:6} | {stats["correct"] / stats["total"] * 100:5.1f}% |
| 错误回答     | {stats["error"]:6} | {stats["error"] / stats["total"] * 100:5.1f}% |
| 回答失败     | {stats["fail"]:6} | {stats["fail"] / stats["total"] * 100:5.1f}% |

## 响应时间分析
- 平均用时: {stats["total_time"] / stats["total"]:.2f}s
- 95%请求耗时: {stats["p95"]:.2f}s
- 99%请求耗时: {stats["p99"]:.2f}s

## Token使用总览
- 总Prompt Tokens: {stats['prompt_tokens']}
- 总Completion Tokens: {stats['completion_tokens']}
- 总Reasoning Tokens: {stats['reasoning_tokens']}
- 总Tokens消耗: {stats['total_tokens']}
- 整体平均速度: {stats['speed']:.2f} tokens/秒
"""


def extract_category_datetime(path):
    # 提取文件名(去掉路径)
    filename = path.split('\\')[-1]

    # 去除前缀和后缀
    clean = filename.replace("log_", "").replace(".csv", "")

    # 分割类别和日期时间部分
    category, datetime_str = clean.rsplit('_', 1)

    # 解析日期时间
    date_part = datetime_str[:8]
    hour_min = datetime_str[8:12]  # 提取小时和分钟

    # 格式化日期时间
    formatted_time = f"{date_part[:4]}-{date_part[4:6]}-{date_part[6:8]} {hour_min[:2]}:{hour_min[2:4]}"
    return category, formatted_time, datetime_str


def generate_sub_reports(log_files, config):
    """批量生成分报告(带错误处理)"""
    reports = []
    for log_path in tqdm(log_files, desc="生成分报告"):
        subject, timestamp, datetime_str = extract_category_datetime(log_path)

        stats = analyze_log(log_path)
        if not stats:
            continue

        report_path = os.path.join(
            config.get("output_dir"),
            f"report_{subject}_{datetime_str}.md"
        )

        try:
            content = generate_report_content(
                subject,
                timestamp,
                stats
            )

            with open(report_path, "w", encoding="utf-8") as f:
                f.write(content)

            reports.append({
                "subject": subject,
                "stats": stats,
                "report_path": report_path
            })
        except Exception as e:
            print(f"生成报告失败: {subject} - {str(e)}")

    return reports


def format_seconds(seconds):
    # 计算各时间单位
    hours = int(seconds // 3600)
    remaining = seconds % 3600
    minutes = int(remaining // 60)
    secs = remaining % 60
    # 强制显示所有单位
    return f"{hours}小时{minutes}分钟{secs:.2f}秒"


def generate_summary_report(reports, config):
    """生成增强的总报告"""
    timestamp = datetime.now().strftime("%Y%m%d%H%M%S")
    report_path = os.path.join(config.get("output_dir"), f"summary_report_{timestamp}.md")

    total_stats = {
        "total": 0,
        "correct": 0,
        "error": 0,
        "fail": 0,
        "total_time": 0.0,
        "response_times": [],

        "prompt_tokens": 0,
        "completion_tokens": 0,
        "total_tokens": 0,
        "reasoning_tokens": 0,
        "avg_speed": 0.0,

    }
    speed_samples = []
    # 汇总数据
    for report in reports:
        if "stats" not in report:
            continue
        s = report["stats"]
        total_stats["total"] += s["total"]
        total_stats["correct"] += s["correct"]
        total_stats["error"] += s["error"]
        total_stats["fail"] += s["fail"]
        total_stats["total_time"] += s["total_time"]
        total_stats["response_times"].extend(s["response_times"])

        total_stats["prompt_tokens"] += s["prompt_tokens"]
        total_stats["completion_tokens"] += s["completion_tokens"]
        total_stats["total_tokens"] += s["total_tokens"]
        speed_samples.append(s["speed"])
    total_question = total_stats["total"]
    total_time = total_stats["total_time"]
    # if speed_samples:
    #     total_stats["avg_speed"] = sum(speed_samples) / total_stats["total"]
    total_stats["avg_speed"] = total_stats["completion_tokens"] / total_stats["total_time"]
    # 生成报告内容
    content = [
        "# 综合测试报告",
        "\n## 基本信息",
        f"- 总体统计 ({len(reports)}个测试集)",
        f"- 总体统计 ({total_question}个题目)",
        f"- 总体统计响应时间: {format_seconds(total_time)}"
        "\n## Token使用总览",
        f"- 总Prompt Tokens: {total_stats['prompt_tokens']}",
        f"- 总Completion Tokens: {total_stats['completion_tokens']}",
        f"- 总Reasoning Tokens: {total_stats['reasoning_tokens']}",
        f"- 总Tokens消耗: {total_stats['total_tokens']}",
        f"- 整体平均速度: {total_stats['avg_speed']:.2f} tokens/秒",
    ]

    if total_stats["total"] > 0:
        # 响应时间分析
        sorted_times = sorted(total_stats["response_times"])
        p95 = sorted_times[int(len(sorted_times) * 0.95)] if sorted_times else 0
        p99 = sorted_times[int(len(sorted_times) * 0.99)] if sorted_times else 0

        content.extend([
            f"\n### 正确率分布",
            f"- 总正确率: {total_stats['correct'] / total_stats['total'] * 100:.1f}%",
            f"- 总错误率: {total_stats['error'] / total_stats['total'] * 100:.1f}%",
            f"- 总失败率: {total_stats['fail'] / total_stats['total'] * 100:.1f}%",

            f"\n### 性能指标",
            f"- 平均响应时间: {total_stats['total_time'] / total_stats['total']:.2f}s",
            f"- 95%请求耗时: {p95:.2f}s",
            f"- 99%请求耗时: {p99:.2f}s",

            "\n## 分科详情(按正确率排序)"
        ])

        # 排序科目
        sorted_reports = sorted(
            [r for r in reports if "stats" in r],
            key=lambda x: x["stats"]["correct"] / x["stats"]["total"],
            reverse=True
        )

        for report in sorted_reports:
            s = report["stats"]
            content.append(
                f"### {report['subject']}\n"
                f"- 正确率: {s['correct'] / s['total'] * 100:.1f}% "
                f"(正确{s['correct']}/错误{s['error']}/失败{s['fail']})\n"
                f"- 平均用时: {s['total_time'] / s['total']:.2f}s"
            )
    else:
        content.append("\n## 警告\n无有效测试数据")

    # 写入文件
    with open(report_path, "w", encoding="utf-8") as f:
        f.write('\n'.join(content))

    return report_path


def Reasoning_Test(config):
    initialize_environment(config)
    test_files = [os.path.join(config.get("data_path"), f)
                  for f in os.listdir(config.get("data_path")) if f.endswith(".csv")]
    # 第一阶段:处理所有文件
    with tqdm(total=len(test_files), desc="处理数据文件") as pbar:
        for fp in test_files:
            log_path = process_single_file(fp, config)
            print(f"\n日志文件已保存: {log_path}")
            pbar.update(1)


def Results_Summary(config):
    log_files = [os.path.join(config.get("output_dir"), f)
                 for f in os.listdir(config.get("output_dir")) if f.endswith(".csv")]
    # 第二阶段:生成报告
    if log_files:
        sub_reports = generate_sub_reports(log_files, config)
        if sub_reports:
            summary_path = generate_summary_report(sub_reports, config)
            print(f"\n所有测试完成!总报告路径: {summary_path}")
        else:
            print("\n无法生成报告:无有效日志数据")
    else:
        print("\n未生成任何日志文件")


def parse_opt(known=False):
    parser = argparse.ArgumentParser()
    parser.add_argument('--model', type=str, default=1, help='initial weights path')
    opt = parser.parse_known_args()[0] if known else parser.parse_args()
    return opt


if __name__ == '__main__':
    config = {'model_name': 'deepseek-r1:1.5b',
              'api_key': 'ollama',
              'base_url': 'http://localhost:11434/v1',
              'max_questions': 500, 'temperature': 0.3, 'request_interval': 1,
              'data_path': 'J:\\huggingface_data\\mmlu\\data\\try', 'output_dir': 'J:\\huggingface_data\\LOG\\ollama'}
    print("程序开始时间:", datetime.now())
    print("开始处理", config.get("output_dir"))
    print("开始处理", config.get("data_path"))
    Reasoning_Test(config)
    Results_Summary(config)

该代码实现了一个基于OpenAI API的多学科选择题自动化测试框架,主要包含数据处理、模型交互、结果分析三大核心模块。以下为关键功能解析:

一、核心功能架构

  1. 测试执行模块(Reasoning_Test)

    • 初始化环境:创建输出目录(initialize_environment
    • 批量处理CSV数据集:支持多文件并发处理(process_single_file
    • 问题加载:验证数据有效性,过滤非法选项(load_dataset
    • API调用封装:实现带指数退避的重试机制(query_model_retries
  2. 数据分析模块(Results_Summary)

    • 日志解析:计算准确率、响应时间分布、Token消耗等指标(analyze_log
    • 报告生成:输出分科目测试报告(generate_sub_reports)和综合汇总报告(generate_summary_report
  3. 辅助功能组件

    • 文件名净化处理(sanitize_filename
    • 提示词标准化生成(format_prompt
    • 答案解析强化逻辑(parse_answer

二、关键技术实现

  1. 数据处理流
  • CSV文件解析时执行严格校验:过滤选项不足4项、答案非A/B/C/D的问题
  • 支持最大问题数限制(max_questions配置项)
  • 采用UTF-8编码处理多语言数据
  1. 模型交互优化
  • 请求间隔控制(request_interval参数)
  • Token消耗统计:区分基础token与推理专用token(reasoning_tokens
  1. 性能监控体系
  • 响应时间百分位统计(P95/P99)
  • 吞吐量计算:tokens/秒统计
  • 多维资源监控:包含Prompt/Completion/Total Tokens

三、扩展性设计

  1. 配置驱动架构
  • 通过字典参数(config)集中管理:
config = {
    'model_name': 'deepseek-r1:1.5b',  # 模型标识
    'api_key': 'ollama',  # API认证密钥
    'base_url': 'http://localhost:11434/v1',  # API端点
    'max_questions': 5,  # 最大测试题量
    'temperature': 0.3,  # 模型温度参数
    'request_interval': 1,  # 请求间隔(秒)
    'data_path': 'J:\\...',  # 测试数据集路径
    'output_dir': 'J:\\...'  # 日志输出目录
}
  1. 模块化设计
  • 各功能组件高度解耦,便于替换模型后端(如切换不同API供应商)
  • 日志系统采用CSV格式存储,支持后续自定义分析工具接入

四、统计分析维度

指标类别 具体指标
准确性指标 正确率、错误率、API失败率
性能指标 平均响应时间、P95/P99响应时间
资源消耗 Prompt/Completion/Total Tokens
吞吐量 Tokens/秒
问题分布 分科目正确率排序

该框架通过系统化的测试流程设计,实现了从数据加载、模型交互到结果分析的全链路自动化,适用于大语言模型在标准化测试集上的性能评估与对比分析。代码中采用的异常处理机制和资源监控设计,可有效保障长时间批量测试的稳定性。

该实现方案为大规模模型评测提供了标准化框架,其模块化设计便于扩展至其他测评数据集,日志分析模块的设计尤其适合长期性能监控与对比实验。

DeepSeek官方api测试结果

选取high_school_computer_science_test.csv测试集进行测试,以下是计算结果,以供参考。

基本信息
  • 总体统计 (1个测试集)
  • 总体统计 (100个题目)
  • 总体统计响应时间: 0小时48分钟45.48秒
Token使用总览
  • 总Prompt Tokens: 13143
  • 总Completion Tokens: 56644
  • 总Reasoning Tokens: 56344
  • 总Tokens消耗: 69787
  • 整体平均速度: 19.36 tokens/秒
正确率分布
  • 总正确率: 97.0%
  • 总错误率: 3.0%
  • 总失败率: 0.0%
性能指标
  • 平均响应时间: 29.25s
  • 95%请求耗时: 56.03s
  • 99%请求耗时: 264.47s
分科详情(按正确率排序)
high_school_computer_science_test
  • 正确率: 97.0% (正确97/错误3/失败0)
  • 平均用时: 29.25s
Logo

欢迎加入DeepSeek 技术社区。在这里,你可以找到志同道合的朋友,共同探索AI技术的奥秘。

更多推荐