Qwen3-VL-Reranker-8B Hands-On Tutorial: Logging Instrumentation and an A/B Testing Framework for Reranking Quality

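Have you run into this problem? You've built a multimodal retrieval system that can search images, videos, and text. It looks powerful, but you're never quite sure of it: when a user searches for "cute puppy", do the results really live up to "cute"? And how much better is the newly deployed reranking model than the old version?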

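Today I'll walk you step by step through building a complete A/B testing framework for evaluating the real-world performance of Qwen3-VL-Reranker-8B, a multimodal reranking model. We won't just get the model running; we'll also find out how well it runs, where it does well, and where it needs work.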

1. Why Do You Need an A/B Testing Framework?

Before getting into the details, let's be clear about why this is worth doing.

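Imagine you run a restaurant and introduce a new dish. How do you know whether customers like it? The most direct way is to serve the new dish to some customers and the old dish to others, then see which plates empty faster. A/B testing works the same way.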

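For a reranking model, A/B testing helps you answer several key questions:

  • Is the new model really better than the old one? Not just on offline metrics, but in actual user experience.
  • Where is it better? Is image ranking more accurate, or is text understanding deeper?
  • Where is it worse? Are there query types whose results actually got worse?
  • Is the upgrade worth it? Does the quality gain justify the deployment cost?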

Without A/B testing, you're driving blindfolded: you know the car is moving, but not whether it's heading in the right direction.

2. Environment Setup and Quick Deployment

2.1 Hardware Check

First, confirm that your machine meets the requirements. Qwen3-VL-Reranker-8B has substantial resource needs:

# Show GPU information
nvidia-smi

# Show memory
free -h

# Show free disk space
df -h

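For a smooth experience, aim for the following recommended configuration:

  • RAM: 32 GB or more (16 GB minimum)
  • GPU memory: 16 GB or more, with extra headroom preferred (the quoted minimum is 8 GB with bf16 precision; note that the bf16 weights of an 8B-parameter model alone take about 16 GB, so a smaller GPU only works if part of the model is offloaded to CPU memory, which is much slower)
  • Disk: 30 GB free (20 GB minimum)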
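The GPU memory figures follow from simple arithmetic: weight memory is roughly the parameter count times the bytes per parameter. The sketch below estimates only the weights; activations, the KV cache, and image or video inputs come on top, so treat the result as a lower bound. The 8-billion parameter count is taken from the model name, and the int8 row is there only for comparison (whether this image supports quantized loading is not covered here).

```python
# Rough lower bound on the memory needed just to hold the model weights.
BYTES_PER_PARAM = {"fp32": 4, "bf16": 2, "fp16": 2, "int8": 1}


def weight_memory_gib(num_params: float, dtype: str = "bf16") -> float:
    """Return the weight footprint in GiB (excludes activations and KV cache)."""
    return num_params * BYTES_PER_PARAM[dtype] / 1024**3


for dtype in ("fp32", "bf16", "int8"):
    print(f"8B params in {dtype}: ~{weight_memory_gib(8e9, dtype):.1f} GiB")
```

For bf16 this prints about 14.9 GiB, which is 16 GB in decimal units, before any activation memory.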

2.2 One-Command Service Startup

If you have already pulled the CSDN Xingtu (星图) image, starting the service is simple:

# Enter the working directory
cd /root/Qwen3-VL-Reranker-8B

# Start the Web UI service
python3 app.py --host 0.0.0.0 --port 7860

# Or create a shareable public link (handy for demos)
python3 app.py --share

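Once the service is up, open http://localhost:7860 in a browser to see the graphical interface.

Important: this image loads the model lazily. On first access, you need to click the "Load Model" (加载模型) button before the model is actually loaded into memory. The benefit is that if you only want to check that the API is reachable or look around the interface, you save resources.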
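If you script against the service (for example in CI), it helps to wait until port 7860 accepts connections before sending requests. Below is a minimal standard-library sketch; `wait_for_port` is a hypothetical helper, not part of the image. An open port only means the web server is up: because of lazy loading, the model itself may still be unloaded.

```python
import socket
import time


def wait_for_port(host: str, port: int, timeout: float = 60.0, interval: float = 0.5) -> bool:
    """Poll until a TCP connection to host:port succeeds or the timeout expires."""
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        try:
            # create_connection raises OSError while nothing is listening yet
            with socket.create_connection((host, port), timeout=interval):
                return True
        except OSError:
            time.sleep(interval)
    return False
```

For example, call `wait_for_port("localhost", 7860, timeout=120)` before running the API test below and abort if it returns False.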

2.3 Basic Functionality Test

After the service starts, run a quick test to make sure everything works:

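  1. Web UI test

    • In the Query box, enter: "sunset by the sea"
    • In the Documents box, add a few candidates:
      {"text": "golden sunlight spilling over the sea"}
      {"text": "city lights glittering at night"}
      {"image": "sunset.jpg"}  # assumes this image exists
      {"text": "morning mist in the mountains"}
      
    • Click the "Rerank" (重排序) button and check the results
  2. API test: open a new terminal and test with a Python script: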

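import requests

# Check that the API responds (endpoint path as provided by this image; adjust it if your deployment differs)
url = "http://localhost:7860/api/rerank"
payload = {
    "query": {"text": "sunset by the sea"},
    "documents": [
        {"text": "golden sunlight spilling over the sea"},
        {"text": "city lights glittering at night"},
        {"text": "morning mist in the mountains"}
    ]
}

# Generous timeout: inference with an 8B model can take a while
response = requests.post(url, json=payload, timeout=120)
print("Status code:", response.status_code)
print("Response body:", response.json())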

If the response contains relevance scores, the service is working.

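3. Designing the Logging Instrumentation

Data is the heart of A/B testing; without it, any conclusion is guesswork. We need a logging system that records the key information of every rerank request.

3.1 Log Data Structure

A good log entry should contain the following information: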

# Basic structure of a log entry
log_entry = {
    "timestamp": "2024-01-15T10:30:25.123Z",  # timestamp (UTC, ISO 8601)
    "request_id": "req_123456789",  # unique request ID
    "user_id": "user_001",  # user ID (optional)
    "session_id": "session_abc",  # session ID
    
    # Query information
    "query": {
        "text": "sunset by the sea",  # query text
        "modality": "text"  # modality: text/image/video
    },
    
    # Candidate information
    "candidates": [
        {
            "id": "doc_001",
            "content": {"text": "golden sunlight spilling over the sea"},
            "modality": "text",
            "ground_truth_score": 0.9  # human-annotated relevance score (if available)
        },
        # ... more candidates
    ],
    
    # Model information
    "model_info": {
        "model_name": "Qwen3-VL-Reranker-8B",
        "model_version": "v1.0",
        "experiment_group": "A"  # group A or group B
    },
    
    # Ranking results
    "ranking_results": [
        {
            "candidate_id": "doc_001",
            "score": 0.87,
            "rank": 1
        },
        # ... more ranking results
    ],
    
    # Performance metrics
    "performance": {
        "inference_time_ms": 125.3,  # inference latency (milliseconds)
        "memory_usage_mb": 2450.5,  # memory usage (MB)
        "gpu_utilization": 65.2  # GPU utilization (%)
    },
    
    # User feedback (collected later)
    "user_feedback": {
        "clicked_items": ["doc_001"],  # which results the user clicked
        "dwell_time_ms": 3500,  # dwell time (milliseconds)
        "satisfaction_score": 4  # satisfaction rating (1-5)
    }
}
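Because the downstream analysis depends on these fields, it can be worth validating each entry before it is written. A minimal sketch follows; the required-field set and the rule that ranks run 1..n are assumptions drawn from the structure above rather than a fixed schema, and `validate_log_entry` is a hypothetical helper.

```python
from typing import Any, Dict, List

# Fields this sketch treats as mandatory; adjust to your own schema.
REQUIRED_FIELDS = ("timestamp", "request_id", "query", "candidates", "model_info", "ranking_results")


def validate_log_entry(entry: Dict[str, Any]) -> List[str]:
    """Return a list of problems; an empty list means the entry looks usable."""
    problems = [f"missing field: {name}" for name in REQUIRED_FIELDS if name not in entry]

    # Without a group label the entry cannot be attributed to A or B
    if not entry.get("model_info", {}).get("experiment_group"):
        problems.append("model_info.experiment_group is empty")

    # Ranks should be exactly 1..n
    results = entry.get("ranking_results", [])
    ranks = [r.get("rank") for r in results]
    if any(not isinstance(rank, int) for rank in ranks) or sorted(ranks) != list(range(1, len(ranks) + 1)):
        problems.append("ranks are not a permutation of 1..n")

    # Every ranked ID should refer to a logged candidate
    candidate_ids = {c.get("id") for c in entry.get("candidates", [])}
    unknown = [r.get("candidate_id") for r in results if r.get("candidate_id") not in candidate_ids]
    if unknown:
        problems.append(f"ranked ids not among candidates: {unknown}")
    return problems
```

Entries that fail validation can be written to a separate file instead of being dropped silently, so that logging bugs stay visible.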

3.2 Implementing Log Collection

We can embed log-collection code in the reranking service:

import time
import json
import uuid
import logging
from datetime import datetime, timezone
from typing import Dict, List, Any


def utc_now_iso() -> str:
    """Current UTC time in ISO 8601 with a trailing "Z", e.g. 2024-01-15T10:30:25.123456Z"""
    # datetime.utcnow() is deprecated since Python 3.12; use an aware UTC datetime instead
    return datetime.now(timezone.utc).isoformat().replace("+00:00", "Z")


class RerankLogger:
    """Rerank request logger; writes one JSON object per line (JSONL)"""
    
    def __init__(self, log_file: str = "rerank_logs.jsonl"):
        self.log_file = log_file
        self.logger = logging.getLogger(__name__)
        
    def create_log_entry(self, 
                        query: Dict[str, Any],
                        candidates: List[Dict[str, Any]],
                        model_info: Dict[str, Any],
                        results: List[Dict[str, Any]],
                        start_time: float) -> Dict[str, Any]:
        """Build a log entry"""
        
        # Elapsed time from start_time until now, in milliseconds
        inference_time = (time.time() - start_time) * 1000
        
        log_entry = {
            "timestamp": utc_now_iso(),
            "request_id": f"req_{uuid.uuid4().hex[:12]}",
            "query": query,
            "candidates": candidates,
            "model_info": model_info,
            "ranking_results": results,
            "performance": {
                "inference_time_ms": round(inference_time, 2)
            }
        }
        
        return log_entry
    
    def save_log(self, log_entry: Dict[str, Any]):
        """Append the log entry to the JSONL file"""
        try:
            with open(self.log_file, 'a', encoding='utf-8') as f:
                f.write(json.dumps(log_entry, ensure_ascii=False) + '\n')
        except Exception as e:
            self.logger.error(f"Failed to save log: {e}")
    
    def log_rerank_request(self,
                          query: Dict[str, Any],
                          candidates: List[Dict[str, Any]],
                          model_name: str,
                          model_version: str,
                          experiment_group: str,
                          results: List[Dict[str, Any]],
                          start_time: float):
        """Log one rerank request and return its request ID"""
        
        model_info = {
            "model_name": model_name,
            "model_version": model_version,
            "experiment_group": experiment_group
        }
        
        log_entry = self.create_log_entry(
            query=query,
            candidates=candidates,
            model_info=model_info,
            results=results,
            start_time=start_time
        )
        
        self.save_log(log_entry)
        return log_entry["request_id"]

# Use it inside the rerank function
rerank_logger = RerankLogger()

def rerank_with_logging(query, candidates, model, experiment_group="A"):
    """Rerank and log the request"""
    start_time = time.time()
    
    # Run reranking (`model.rerank` stands in for whatever rerank call your model object exposes)
    results = model.rerank(query, candidates)
    
    # Write the log entry
    request_id = rerank_logger.log_rerank_request(
        query=query,
        candidates=candidates,
        model_name="Qwen3-VL-Reranker-8B",
        model_version="v1.0",
        experiment_group=experiment_group,
        results=results,
        start_time=start_time
    )
    
    return results, request_id
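`save_log` opens the file for every entry and uses no locking. If the service handles requests on several threads, a lock around the write keeps each JSON line intact. Here is a minimal sketch under that assumption (one process, many threads); `LockedJsonlWriter` is a hypothetical name, and multi-process deployments would need one file per process or an external log shipper instead.

```python
import json
import threading
from typing import Any, Dict


class LockedJsonlWriter:
    """Append JSON objects as lines, serializing writes across threads."""

    def __init__(self, path: str):
        self.path = path
        self._lock = threading.Lock()

    def write(self, entry: Dict[str, Any]) -> None:
        # Serialize outside the lock so threads only wait for the actual file append
        line = json.dumps(entry, ensure_ascii=False) + "\n"
        # Only one thread at a time opens and appends, so lines never interleave
        with self._lock:
            with open(self.path, "a", encoding="utf-8") as f:
                f.write(line)
```

To use it, `RerankLogger.save_log` could delegate to a shared `LockedJsonlWriter(self.log_file)` instead of opening the file directly.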

3.3 Log Storage Options

For production, consider a more robust storage backend:

import os
import sqlite3
from datetime import datetime

import requests

# Option 1: local files plus periodic archiving (fine for small-scale tests)
class FileLogStorage:
    def __init__(self, base_dir="./logs"):
        self.base_dir = base_dir
        os.makedirs(base_dir, exist_ok=True)
    
    def get_today_file(self):
        """Split log files by day"""
        today = datetime.now().strftime("%Y%m%d")
        return os.path.join(self.base_dir, f"rerank_{today}.jsonl")

# Option 2: database storage (handles larger volumes than flat files; SQLite is shown
# here, while high-volume production usually calls for a server database)
class DatabaseLogStorage:
    def __init__(self, db_path="./logs.db"):
        self.conn = sqlite3.connect(db_path)
        self.create_tables()
    
    def create_tables(self):
        """Create the log table"""
        cursor = self.conn.cursor()
        cursor.execute('''
            CREATE TABLE IF NOT EXISTS rerank_logs (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                timestamp TEXT,
                request_id TEXT,
                query_text TEXT,
                model_name TEXT,
                experiment_group TEXT,
                inference_time_ms REAL,
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
            )
        ''')
        self.conn.commit()

# Option 3: ship logs to a log collection service (e.g. ELK, Loki)
class RemoteLogStorage:
    def __init__(self, endpoint="http://log-server:8080/logs"):
        # Placeholder endpoint; point it at your own ingestion service
        self.endpoint = endpoint
    
    def send_log(self, log_entry):
        """Send a log entry to the remote server; returns True on success"""
        try:
            response = requests.post(
                self.endpoint,
                json=log_entry,
                timeout=2
            )
            return response.ok  # any 2xx status
        except requests.RequestException:
            # Return False so the caller can fall back to local storage
            return False
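The `DatabaseLogStorage` class above only creates the table. The sketch below shows one way to insert a log entry into the same column layout and summarize it per experiment group; `insert_log` and `summarize_by_group` are hypothetical helpers, and the example uses an in-memory database so it is self-contained.

```python
import sqlite3
from typing import Any, Dict, List, Tuple


def create_table(conn: sqlite3.Connection) -> None:
    conn.execute("""
        CREATE TABLE IF NOT EXISTS rerank_logs (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            timestamp TEXT,
            request_id TEXT,
            query_text TEXT,
            model_name TEXT,
            experiment_group TEXT,
            inference_time_ms REAL,
            created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
        )
    """)


def insert_log(conn: sqlite3.Connection, entry: Dict[str, Any]) -> None:
    """Flatten a nested log entry into one row; '?' placeholders avoid SQL injection."""
    conn.execute(
        "INSERT INTO rerank_logs (timestamp, request_id, query_text, model_name, "
        "experiment_group, inference_time_ms) VALUES (?, ?, ?, ?, ?, ?)",
        (
            entry["timestamp"],
            entry["request_id"],
            entry["query"].get("text"),
            entry["model_info"]["model_name"],
            entry["model_info"]["experiment_group"],
            entry["performance"]["inference_time_ms"],
        ),
    )
    conn.commit()


def summarize_by_group(conn: sqlite3.Connection) -> List[Tuple[str, int, float]]:
    """Request count and mean latency per experiment group."""
    return conn.execute(
        "SELECT experiment_group, COUNT(*), AVG(inference_time_ms) "
        "FROM rerank_logs GROUP BY experiment_group ORDER BY experiment_group"
    ).fetchall()


if __name__ == "__main__":
    conn = sqlite3.connect(":memory:")
    create_table(conn)
    for group, ms in [("A", 120.0), ("A", 130.0), ("B", 100.0)]:
        insert_log(conn, {
            "timestamp": "2024-01-15T10:30:25.123Z", "request_id": f"req_{group}_{ms}",
            "query": {"text": "sunset by the sea"},
            "model_info": {"model_name": "Qwen3-VL-Reranker-8B", "experiment_group": group},
            "performance": {"inference_time_ms": ms},
        })
    print(summarize_by_group(conn))  # [('A', 2, 125.0), ('B', 1, 100.0)]
```

Committing after every insert keeps the sketch simple; under heavy traffic you would batch inserts into fewer transactions.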

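4. Building the A/B Testing Framework

With log data in place, we can build the A/B testing framework. At its core, it splits traffic pseudo-randomly across experiment groups while keeping each user in the same group.

4.1 Traffic Allocation Strategy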

import hashlib
import random
from typing import Dict, Any, Optional

class ABTestManager:
    """A/B test manager"""
    
    def __init__(self, experiments: Dict[str, Dict[str, Any]]):
        """
        Initialize the A/B test manager
        
        Args:
            experiments: experiment configuration, for example:
                {
                    "rerank_model_v1": {
                        "groups": {
                            "A": {"weight": 50, "model": "Qwen3-VL-Reranker-8B-v1"},
                            "B": {"weight": 50, "model": "Qwen3-VL-Reranker-8B-v2"}
                        }
                    }
                }
        """
        self.experiments = experiments
    
    def assign_group(self, 
                    experiment_name: str, 
                    user_id: Optional[str] = None, 
                    request_id: Optional[str] = None) -> str:
        """
        Assign an experiment group
        
        Args:
            experiment_name: experiment name
            user_id: user ID (keeps a user in the same group across requests)
            request_id: request ID (used for per-request assignment when there is no user_id)
        
        Returns:
            The assigned group name, e.g. "A" or "B"
        """
        if experiment_name not in self.experiments:
            return "control"  # default control group
        
        groups = self.experiments[experiment_name]["groups"]
        
        # Sum of the (positive integer) weights
        total_weight = sum(group["weight"] for group in groups.values())
        
        # Map a stable key to a bucket in [1, total_weight]. Hashing "experiment:key" keeps
        # assignments independent across experiments, and unlike random.seed() it does not
        # reset the global random state for the rest of the process.
        key = user_id or request_id
        if key:
            digest = hashlib.md5(f"{experiment_name}:{key}".encode()).hexdigest()
            bucket = int(digest, 16) % total_weight + 1
        else:
            bucket = random.randint(1, total_weight)
        
        # Walk the cumulative weights to find the bucket's group
        cumulative_weight = 0
        for group_name, group_config in groups.items():
            cumulative_weight += group_config["weight"]
            if bucket <= cumulative_weight:
                return group_name
        
        # Fallback: the first group
        return next(iter(groups))
    
    def get_model_for_request(self, 
                             experiment_name: str,
                             user_id: Optional[str] = None,
                             request_id: Optional[str] = None) -> Dict[str, Any]:
        """
        Get the model configuration for the current request
        
        Returns:
            Model configuration dict, with the assigned group name under "group"
        """
        group = self.assign_group(experiment_name, user_id, request_id)
        
        experiment_config = self.experiments.get(experiment_name)
        if experiment_config and group in experiment_config["groups"]:
            # Copy the config and attach the group so downstream logging records the real group
            return {**experiment_config["groups"][group], "group": group}
        
        # Default configuration
        return {"model": "Qwen3-VL-Reranker-8B", "version": "default", "group": group}

# Usage example
ab_test_manager = ABTestManager({
    "rerank_model_ab_test": {
        "groups": {
            "A": {
                "weight": 50,  # 50% of traffic
                "model": "Qwen3-VL-Reranker-8B",
                "version": "v1.0",
                "description": "original version"
            },
            "B": {
                "weight": 50,  # 50% of traffic
                "model": "Qwen3-VL-Reranker-8B",
                "version": "v1.1",
                "description": "optimized version"
            }
        }
    }
})

# Assign a request to an experiment group
user_id = "user_123"
request_id = "req_456"
group = ab_test_manager.assign_group("rerank_model_ab_test", user_id, request_id)
model_config = ab_test_manager.get_model_for_request("rerank_model_ab_test", user_id, request_id)

print(f"User {user_id} assigned to group: {group}")
print(f"Model config: {model_config}")
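Once traffic is flowing, a cheap sanity check is to compare observed group sizes with the configured weights (often called a sample ratio mismatch check). The sketch uses a chi-square goodness-of-fit test for two groups; with one degree of freedom the p-value equals `erfc(sqrt(chi2 / 2))`, so only the standard library is needed. `srm_p_value` is a hypothetical helper. When assignment is per user, count distinct users rather than requests; a very small p-value (say below 0.001) points to broken assignment or logging rather than to one model being better.

```python
import math
from typing import Dict


def srm_p_value(observed: Dict[str, int], weights: Dict[str, float]) -> float:
    """Chi-square goodness-of-fit p-value for a two-group traffic split."""
    if len(observed) != 2 or set(observed) != set(weights):
        raise ValueError("this sketch handles exactly two groups with matching names")
    total = sum(observed.values())
    weight_sum = sum(weights.values())
    chi2 = 0.0
    for group, count in observed.items():
        expected = total * weights[group] / weight_sum
        chi2 += (count - expected) ** 2 / expected
    # Survival function of the chi-square distribution with 1 degree of freedom
    return math.erfc(math.sqrt(chi2 / 2))
```

For example, 5,200 vs 4,800 users under a 50/50 split gives chi2 = 16 and p of about 6.3e-5, which is worth investigating before reading any quality metrics.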

4.2 Integrating into the Reranking Service

Now let's plug the A/B testing framework into the actual reranking service:

import time
from typing import Any, Dict, List, Optional

import torch
# Reranker wrapper from the scripts/ directory of the model deployment
from scripts.qwen3_vl_reranker import Qwen3VLReranker

# RerankLogger (section 3.2) and ABTestManager / ab_test_manager (section 4.1)
# are assumed to be defined in the same module.

class ABTestRerankService:
    """Reranking service with A/B testing support"""
    
    def __init__(self, ab_test_manager: ABTestManager):
        self.ab_test_manager = ab_test_manager
        self.models = {}  # cache of loaded models, keyed by name and version
        self.logger = RerankLogger()
    
    def get_model(self, model_name: str, model_version: str):
        """Return a cached model, loading it on first use"""
        model_key = f"{model_name}_{model_version}"
        
        if model_key not in self.models:
            print(f"Loading model: {model_key}")
            
            # Map each version to its weights (placeholder paths; replace with your own)
            if model_version == "v1.0":
                model_path = "/path/to/qwen3-vl-reranker-8b-v1"
            elif model_version == "v1.1":
                model_path = "/path/to/qwen3-vl-reranker-8b-v2"
            else:
                model_path = "/path/to/qwen3-vl-reranker-8b-default"
            
            # Load in bf16; each cached 8B model needs roughly 16 GB for its weights
            model = Qwen3VLReranker(
                model_name_or_path=model_path,
                torch_dtype=torch.bfloat16,
                device_map="auto"
            )
            
            self.models[model_key] = model
        
        return self.models[model_key]
    
    def rerank(self, 
               query: Dict[str, Any],
               candidates: List[Dict[str, Any]],
               user_id: Optional[str] = None,
               request_id: Optional[str] = None) -> Dict[str, Any]:
        """Rerank candidates, routing the request through the A/B test"""
        
        start_time = time.time()
        
        # Resolve the experiment group and model configuration
        model_config = self.ab_test_manager.get_model_for_request(
            "rerank_model_ab_test",
            user_id,
            request_id
        )
        group = model_config.get("group", "A")
        
        try:
            # Load inside try so loading failures are reported like any other error
            model = self.get_model(model_config["model"], model_config["version"])
            
            # Build the model input. Our own "id" field is stripped from each candidate,
            # assuming the model only needs the modality fields (text / image / video).
            inputs = {
                "instruction": "Given a search query, retrieve relevant candidates.",
                "query": query,
                "documents": [{k: v for k, v in c.items() if k != "id"} for c in candidates],
                "fps": 1.0  # frame sampling rate for video inputs
            }
            scores = model.process(inputs)
            
            # One result per candidate, in input order
            results = [
                {
                    "candidate_id": candidate.get("id", f"doc_{i}"),
                    "content": candidate,
                    "score": float(score)
                }
                for i, (candidate, score) in enumerate(zip(candidates, scores))
            ]
            
            # Sort by score (descending), then assign ranks 1..n
            results.sort(key=lambda x: x["score"], reverse=True)
            for rank, result in enumerate(results, start=1):
                result["rank"] = rank
            
            # Write the log entry with the group actually assigned
            log_request_id = self.logger.log_rerank_request(
                query=query,
                candidates=candidates,
                model_name=model_config["model"],
                model_version=model_config["version"],
                experiment_group=group,
                results=results,
                start_time=start_time
            )
            
            return {
                "success": True,
                "request_id": log_request_id,
                "experiment_group": group,
                "model_version": model_config["version"],
                "results": results,
                "inference_time_ms": (time.time() - start_time) * 1000
            }
            
        except Exception as e:
            print(f"Rerank failed: {e}")
            return {
                "success": False,
                "error": str(e),
                "request_id": request_id,
                "experiment_group": group
            }

# Usage example
service = ABTestRerankService(ab_test_manager)

# Test query
query = {"text": "A woman playing with her dog"}
candidates = [
    {"id": "doc1", "text": "A woman and dog on beach"},
    {"id": "doc2", "text": "A cat sleeping on sofa"},
    {"id": "doc3", "text": "A woman walking her dog in park"},
    {"id": "doc4", "image": "dog_beach.jpg"}  # assumed to be an image path
]

result = service.rerank(
    query=query,
    candidates=candidates,
    user_id="test_user_001",
    request_id="test_request_001"
)

if result["success"]:
    print(f"Experiment group: {result['experiment_group']}")
    print(f"Model version: {result['model_version']}")
    print(f"Results: {result['results'][:2]}")  # top 2 results
else:
    print(f"Rerank failed: {result['error']}")
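`get_model` keeps every loaded version in `self.models`. Two 8B models in bf16 add up to about 32 GB of weights, which may not fit on one GPU. One option, sketched here as an assumption rather than as part of the image, is to bound the cache and evict the least recently used model. `BoundedModelCache` and its `loader` callback are hypothetical. Frequent evictions mean reloading 8B models mid-experiment, so with a 50/50 split it is usually better to keep both models resident or serve each group from its own instance.

```python
from collections import OrderedDict
from typing import Any, Callable, Hashable


class BoundedModelCache:
    """Keep at most `max_models` loaded models, evicting the least recently used."""

    def __init__(self, loader: Callable[[Hashable], Any], max_models: int = 1):
        if max_models < 1:
            raise ValueError("max_models must be at least 1")
        self.loader = loader  # called as loader(key) on a cache miss
        self.max_models = max_models
        self._models: "OrderedDict[Hashable, Any]" = OrderedDict()

    def get(self, key: Hashable) -> Any:
        if key in self._models:
            self._models.move_to_end(key)  # mark as most recently used
            return self._models[key]
        while len(self._models) >= self.max_models:
            # Drop the oldest entry; with PyTorch models you would typically also
            # call torch.cuda.empty_cache() afterwards to return freed memory
            self._models.popitem(last=False)
        model = self.loader(key)
        self._models[key] = model
        return model
```

In the service, `get_model` could call `cache.get((model_name, model_version))` with a loader that builds the `Qwen3VLReranker` for that key.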

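5. Evaluation and Analysis

Once enough log data has been collected, we can start the analysis. The core of an A/B test is a statistical test that checks whether an observed difference is larger than random fluctuation would explain.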
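Before running the test, it helps to estimate how many requests each group needs. A common normal-approximation formula for comparing two means is n = 2 (z_{1-α/2} + z_{1-β})² σ² / δ² per group, where σ is the metric's standard deviation and δ is the smallest difference worth detecting. This sketch uses `statistics.NormalDist` (Python 3.8+); `sample_size_per_group` is a hypothetical helper, and the σ and δ below are illustrative values, not measurements of this model. Requests from the same user are correlated, so treat the result as a lower bound.

```python
import math
from statistics import NormalDist


def sample_size_per_group(sigma: float, delta: float, alpha: float = 0.05, power: float = 0.8) -> int:
    """Requests per group needed to detect a mean difference `delta` (two-sided test)."""
    z_alpha = NormalDist().inv_cdf(1 - alpha / 2)  # about 1.96 for alpha = 0.05
    z_beta = NormalDist().inv_cdf(power)           # about 0.84 for power = 0.8
    n = 2 * (z_alpha + z_beta) ** 2 * sigma ** 2 / delta ** 2
    return math.ceil(n)


print(sample_size_per_group(sigma=0.1, delta=0.01))  # 1570
```

Halving δ quadruples the required sample, which is why small NDCG gains need long-running experiments.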

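5.1 Defining Key Metrics

First, define the evaluation metrics. Common choices for reranking are NDCG@k, MRR, and Precision@k for ranking quality, plus latency statistics: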

from typing import List, Dict, Any
import numpy as np

class EvaluationMetrics:
    """Evaluation metrics for reranking"""
    
    @staticmethod
    def calculate_ndcg(ranking_scores: List[float], 
                      ideal_scores: List[float], 
                      k: int = 10) -> float:
        """
        Compute NDCG@k (linear gain, log2 position discount)
        
        Args:
            ranking_scores: ground-truth relevance labels of the candidates, listed in the
                order the model ranked them (the labels, not the model's own scores)
            ideal_scores: the same ground-truth labels in any order; they are sorted here
                to obtain the ideal ranking
            k: only the top k results count
        
        Returns:
            NDCG@k score
        """
        k = min(k, len(ranking_scores))
        
        # Top k of the model's ranking and of the ideal ranking
        ranking_scores_k = ranking_scores[:k]
        ideal_scores_k = sorted(ideal_scores, reverse=True)[:k]
        
        # DCG: the item at 0-based position i is discounted by log2(i + 2)
        dcg = sum(score / np.log2(i + 2) for i, score in enumerate(ranking_scores_k))
        
        # IDCG: the DCG of the ideal ordering
        idcg = sum(score / np.log2(i + 2) for i, score in enumerate(ideal_scores_k))
        
        return float(dcg / idcg) if idcg > 0 else 0.0
    
    @staticmethod
    def calculate_mrr(ranking_results: List[Dict[str, Any]], 
                     relevant_ids: List[str]) -> float:
        """
        Compute the reciprocal rank (RR) of one query; averaging RR over queries gives MRR
        
        Args:
            ranking_results: ranking results, each containing candidate_id and rank
            relevant_ids: IDs of the relevant documents
        
        Returns:
            1 / (position of the first relevant result), or 0 if none is relevant
        """
        relevant = set(relevant_ids)
        ranked = sorted(ranking_results, key=lambda r: r.get("rank", float("inf")))
        
        for position, result in enumerate(ranked, start=1):
            if result.get("candidate_id") in relevant:
                return 1.0 / position
        
        # No relevant document in the results
        return 0.0
    
    @staticmethod
    def calculate_precision_at_k(ranking_results: List[Dict[str, Any]], 
                                relevant_ids: List[str], 
                                k: int = 5) -> float:
        """
        Compute Precision@k
        
        Args:
            ranking_results: ranking results, already sorted by rank
            relevant_ids: IDs of the relevant documents
            k: only the top k results count (fewer if the list is shorter)
        
        Returns:
            Precision@k
        """
        k = min(k, len(ranking_results))
        if k == 0:
            return 0.0
        
        relevant = set(relevant_ids)
        top_k_ids = [result.get("candidate_id") for result in ranking_results[:k]]
        return sum(1 for doc_id in top_k_ids if doc_id in relevant) / k
    
    @staticmethod
    def calculate_inference_time_stats(inference_times: List[float]) -> Dict[str, float]:
        """
        Compute inference latency statistics
        
        Args:
            inference_times: list of inference times (milliseconds)
        
        Returns:
            Dict of summary statistics
        """
        if not inference_times:
            return {}
        
        times_array = np.array(inference_times)
        
        return {
            "mean": float(np.mean(times_array)),
            "median": float(np.median(times_array)),
            "p95": float(np.percentile(times_array, 95)),
            "p99": float(np.percentile(times_array, 99)),
            "min": float(np.min(times_array)),
            "max": float(np.max(times_array)),
            "std": float(np.std(times_array))
        }
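To make these definitions concrete, here is a small standalone worked example of NDCG@k with linear gain (the same gain and discount as `calculate_ndcg` above) and of MRR across several queries. It does not depend on the class, and the relevance labels are made up for illustration. Some implementations use the exponential gain 2^rel - 1 instead, so NDCG values are only comparable under the same definition.

```python
import math
from typing import List, Sequence


def ndcg_at_k(relevance_in_ranked_order: Sequence[float], k: int) -> float:
    """NDCG@k with linear gain: DCG of the model's order divided by DCG of the ideal order."""
    def dcg(gains: Sequence[float]) -> float:
        # The item at 0-based position i is discounted by log2(i + 2)
        return sum(g / math.log2(i + 2) for i, g in enumerate(gains[:k]))

    ideal = dcg(sorted(relevance_in_ranked_order, reverse=True))
    return dcg(relevance_in_ranked_order) / ideal if ideal > 0 else 0.0


def mean_reciprocal_rank(first_relevant_positions: List[int]) -> float:
    """MRR over queries; a position of 0 means no relevant result was returned."""
    rr = [1.0 / p if p > 0 else 0.0 for p in first_relevant_positions]
    return sum(rr) / len(rr) if rr else 0.0


# One query: labels (3 = highly relevant, 0 = irrelevant) in the order the model ranked them
print(round(ndcg_at_k([3, 2, 0, 1], k=4), 4))  # 0.9854
# Three queries whose first relevant hit was at position 1, at position 3, and nowhere
print(round(mean_reciprocal_rank([1, 3, 0]), 4))  # 0.4444
```

The first query loses a little NDCG only because the label-1 item sits below the label-0 item; swapping them would give exactly 1.0.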

5.2 Analyzing A/B Test Results

With the metric helpers in place, we can analyze the A/B test results:

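import json
from datetime import datetime, timedelta, timezone
from typing import Any, Dict

import numpy as np
import pandas as pd
from scipy import stats

# EvaluationMetrics comes from section 5.1

def parse_utc(value: str) -> datetime:
    """Parse an ISO 8601 string; naive values are treated as UTC so comparisons never
    mix naive and timezone-aware datetimes (which raises TypeError)"""
    dt = datetime.fromisoformat(value.replace("Z", "+00:00"))
    return dt if dt.tzinfo else dt.replace(tzinfo=timezone.utc)

class ABTestAnalyzer:
    """A/B test result analyzer"""
    
    def __init__(self, log_file: str):
        self.log_file = log_file
        self.metrics_calculator = EvaluationMetrics()
    
    def load_logs(self, start_date: str = None, end_date: str = None) -> pd.DataFrame:
        """Load log entries, optionally keeping only those within [start_date, end_date]"""
        start = parse_utc(start_date) if start_date else None
        end = parse_utc(end_date) if end_date else None
        logs = []
        
        with open(self.log_file, 'r', encoding='utf-8') as f:
            for line in f:
                line = line.strip()
                if not line:
                    continue
                try:
                    log_entry = json.loads(line)
                except json.JSONDecodeError:
                    continue  # skip corrupted lines
                
                # Time filter
                if start or end:
                    log_time = parse_utc(log_entry["timestamp"])
                    if start and log_time < start:
                        continue
                    if end and log_time > end:
                        continue
                
                logs.append(log_entry)
        
        return pd.DataFrame(logs)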
    
    def analyze_experiment(self, 
                          experiment_name: str,
                          start_date: str = None,
                          end_date: str = None) -> Dict[str, Any]:
        """Analyze an experiment.
        
        The log entries do not store the experiment name, so every entry in the
        time window is attributed to `experiment_name`.
        """
        
        # Load data
        df = self.load_logs(start_date, end_date)
        
        if df.empty:
            return {"error": "No log data found"}
        
        # Split by experiment group
        df["group"] = df["model_info"].apply(
            lambda x: x.get("experiment_group", "unknown") if isinstance(x, dict) else "unknown"
        )
        
        # Compute metrics for each group
        results = {
            group: self._calculate_group_metrics(df[df["group"] == group])
            for group in df["group"].unique()
        }
        
        # Significance test on the per-request NDCG values collected above
        significance_tests = self._perform_significance_tests(results)
        
        return {
            "experiment_name": experiment_name,
            "period": f"{start_date} to {end_date}" if start_date and end_date else "all",
            "total_requests": len(df),
            "group_results": results,
            "significance_tests": significance_tests,
            "recommendation": self._generate_recommendation(results, significance_tests)
        }
    
    def _calculate_group_metrics(self, group_data: pd.DataFrame) -> Dict[str, Any]:
        """Compute metrics for one experiment group"""
        
        metrics = {
            "request_count": len(group_data),
            "avg_inference_time_ms": 0,
            "avg_ndcg": 0,
            "avg_mrr": 0,
            "avg_precision_at_5": 0,
            "success_rate": 0,
            "ndcg_samples": []  # per-request NDCG, used by the significance test
        }
        
        if len(group_data) == 0:
            return metrics
        
        inference_times = []
        ndcg_scores = []
        mrr_scores = []
        precision_scores = []
        success_count = 0
        
        for _, row in group_data.iterrows():
            # Inference time
            perf = row.get("performance")
            if isinstance(perf, dict) and perf.get("inference_time_ms", 0) > 0:
                inference_times.append(perf["inference_time_ms"])
            
            # Success: the request produced a non-empty ranking. The service in section 4.2
            # only logs successful requests, so this stays at 100% unless failures are logged too.
            results = row.get("ranking_results")
            if not isinstance(results, list) or not results:
                continue
            success_count += 1
            
            # NDCG, MRR and Precision need human relevance labels per candidate, which in
            # practice come from your annotation data. For illustration only, random labels
            # are simulated here, so the resulting numbers carry no real signal.
            # The labels are listed in the order the model ranked the candidates.
            simulated_labels = np.random.rand(len(results))
            ndcg = self.metrics_calculator.calculate_ndcg(list(simulated_labels), list(simulated_labels))
            ndcg_scores.append(ndcg)
            
            # Simulated relevant set: the 30% of candidates with the highest simulated labels
            relevant_count = max(1, int(len(results) * 0.3))
            top_label_positions = np.argsort(-simulated_labels)[:relevant_count]
            relevant_ids = [results[i].get("candidate_id") for i in top_label_positions]
            
            mrr_scores.append(self.metrics_calculator.calculate_mrr(results, relevant_ids))
            precision_scores.append(
                self.metrics_calculator.calculate_precision_at_k(results, relevant_ids, k=5)
            )
        
        # Aggregate
        if inference_times:
            time_stats = self.metrics_calculator.calculate_inference_time_stats(inference_times)
            metrics["inference_time_stats"] = time_stats
            metrics["avg_inference_time_ms"] = time_stats["mean"]
        
        if ndcg_scores:
            metrics["avg_ndcg"] = float(np.mean(ndcg_scores))
            metrics["ndcg_std"] = float(np.std(ndcg_scores))
            metrics["ndcg_samples"] = ndcg_scores
        
        if mrr_scores:
            metrics["avg_mrr"] = float(np.mean(mrr_scores))
            metrics["mrr_std"] = float(np.std(mrr_scores))
        
        if precision_scores:
            metrics["avg_precision_at_5"] = float(np.mean(precision_scores))
            metrics["precision_std"] = float(np.std(precision_scores))
        
        metrics["success_rate"] = success_count / len(group_data)
        
        return metrics
    
    def _perform_significance_tests(self, results: Dict[str, Any]) -> Dict[str, Any]:
        """Test whether groups A and B differ in per-request NDCG"""
        
        if "A" not in results or "B" not in results:
            return {"error": "Groups A and B are both required for the test"}
        
        scores_a = results["A"].get("ndcg_samples", [])
        scores_b = results["B"].get("ndcg_samples", [])
        if len(scores_a) < 2 or len(scores_b) < 2:
            return {"error": "Each group needs at least two NDCG samples"}
        
        # Welch's t-test (equal_var=False) does not assume equal variances in the two groups
        t_stat, p_value = stats.ttest_ind(scores_a, scores_b, equal_var=False)
        significant = bool(p_value < 0.05)  # 95% confidence level
        
        return {
            "t_test": {
                "groups": ["A", "B"],
                "t_statistic": float(t_stat),
                "p_value": float(p_value),
                "significant": significant,
                "interpretation": "Difference is significant" if significant else "Difference is not significant"
            }
        }
    
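    def _generate_recommendation(self, 
                                results: Dict[str, Any],
                                significance_tests: Dict[str, Any]) -> str:
        """Turn the metrics and the test result into a conclusion and recommendation"""
        
        if "error" in results or "error" in significance_tests:
            return "Not enough data to make a recommendation."
        
        if "A" not in results or "B" not in results:
            return "Cannot compare the experiment groups."
        
        group_a = results["A"]
        group_b = results["B"]
        
        # The t-test covers NDCG only
        is_significant = significance_tests.get("t_test", {}).get("significant", False)
        
        if is_significant and group_b["avg_ndcg"] < group_a["avg_ndcg"]:
            return "Group B's NDCG is significantly lower than group A's; keep group A."
        
        findings = []
        
        # Compare NDCG (relative change; skipped when the baseline is zero)
        ndcg_improved = group_a["avg_ndcg"] > 0 and group_b["avg_ndcg"] > group_a["avg_ndcg"]
        if ndcg_improved:
            improvement = (group_b["avg_ndcg"] - group_a["avg_ndcg"]) / group_a["avg_ndcg"] * 100
            findings.append(f"group B NDCG up {improvement:.1f}%")
        
        # Compare inference time
        time_a = group_a["avg_inference_time_ms"]
        time_b = group_b["avg_inference_time_ms"]
        if time_a > 0 and time_b < time_a:
            speedup = (time_a - time_b) / time_a * 100
            findings.append(f"group B inference {speedup:.1f}% faster")
        
        # Final recommendation
        if not findings:
            return "Group B shows no improvement over group A on NDCG or latency; there is no case for switching."
        
        summary = "; ".join(findings)
        if ndcg_improved and is_significant:
            return f"Group B significantly outperforms group A ({summary}). Recommend adopting the group B model."
        return (f"Group B looks better than group A, but the NDCG difference is not significant ({summary}). "
                "Recommend a larger sample and continued observation.")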
    
    def generate_report(self, analysis_result: Dict[str, Any]) -> str:
        """Render the analysis as a Markdown report"""
        
        if "error" in analysis_result:
            return f"# A/B Test Analysis Report\n\nAnalysis failed: {analysis_result['error']}"
        
        report = []
        report.append("# A/B Test Analysis Report")
        report.append(f"Experiment: {analysis_result['experiment_name']}")
        report.append(f"Period: {analysis_result['period']}")
        report.append(f"Total requests: {analysis_result['total_requests']}")
        report.append("")
        
        report.append("## Group Comparison")
        for group_name, metrics in analysis_result["group_results"].items():
            report.append(f"### Group {group_name}")
            report.append(f"- Requests: {metrics['request_count']}")
            report.append(f"- Mean inference time: {metrics['avg_inference_time_ms']:.2f} ms")
            report.append(f"- Mean NDCG: {metrics['avg_ndcg']:.3f}")
            report.append(f"- Mean MRR: {metrics['avg_mrr']:.3f}")
            report.append(f"- Mean Precision@5: {metrics['avg_precision_at_5']:.3f}")
            report.append(f"- Success rate: {metrics['success_rate']:.1%}")
            report.append("")
        
        if "significance_tests" in analysis_result:
            report.append("## Statistical Significance Tests")
            for test_name, test_result in analysis_result["significance_tests"].items():
                if test_name == "error":
                    report.append(f"- {test_result}")
                    continue
                report.append(f"### {test_name}")
                for key, value in test_result.items():
                    if key not in ["groups", "interpretation"]:
                        report.append(f"- {key}: {value}")
                if "interpretation" in test_result:
                    report.append(f"- Conclusion: {test_result['interpretation']}")
                report.append("")
        
        report.append("## Recommendation")
        report.append(analysis_result["recommendation"])
        
        return "\n".join(report)

# Usage example
from datetime import datetime, timedelta, timezone

analyzer = ABTestAnalyzer("rerank_logs.jsonl")

# Analyze the last 7 days; use UTC so the bounds match the UTC timestamps in the logs
now = datetime.now(timezone.utc)
end_date = now.isoformat()
start_date = (now - timedelta(days=7)).isoformat()

analysis_result = analyzer.analyze_experiment(
    experiment_name="rerank_model_ab_test",
    start_date=start_date,
    end_date=end_date
)

# Generate the report
report = analyzer.generate_report(analysis_result)
print(report)

# Save the report
with open("ab_test_report.md", "w", encoding="utf-8") as f:
    f.write(report)
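The t-test gives a yes/no answer at a fixed threshold. A complementary view, swapped in here as a different technique rather than taken from the analyzer above, is a percentile bootstrap confidence interval for the difference in mean NDCG between the groups. Below is a minimal standard-library sketch; `bootstrap_mean_diff_ci` is a hypothetical helper that expects the per-request metric values of each group.

```python
import random
from statistics import fmean
from typing import Sequence, Tuple


def bootstrap_mean_diff_ci(a: Sequence[float], b: Sequence[float], n_resamples: int = 2000,
                           confidence: float = 0.95, seed: int = 0) -> Tuple[float, float]:
    """Percentile bootstrap confidence interval for mean(b) - mean(a)."""
    rng = random.Random(seed)  # local RNG: reproducible, leaves the global random state alone
    diffs = []
    for _ in range(n_resamples):
        # Resample each group with replacement, keeping the original group sizes
        resample_a = rng.choices(a, k=len(a))
        resample_b = rng.choices(b, k=len(b))
        diffs.append(fmean(resample_b) - fmean(resample_a))
    diffs.sort()
    tail = (1 - confidence) / 2
    lower = diffs[int(tail * n_resamples)]
    upper = diffs[min(n_resamples - 1, int((1 - tail) * n_resamples))]
    return lower, upper
```

Pass group A's and group B's per-request NDCG values: an interval entirely above 0 favors B, one entirely below 0 favors A, and one that straddles 0 means the data cannot yet separate the two.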

5.3 Visual Analysis

Visualization makes the experiment results easier to grasp at a glance:

import matplotlib.pyplot as plt
import seaborn as sns
from matplotlib import font_manager

# 设置中文字体(如果需要)
# plt.rcParams['font.sans-serif'] = ['SimHei']
# plt.rcParams['axes.unicode_minus'] = False

class ABTestVisualizer:
    """AB测试结果可视化"""
    
    @staticmethod
    def plot_metric_comparison(analysis_result: Dict[str, Any], 
                               save_path: str = None):
        """绘制指标对比图"""
        
        groups = list(analysis_result["group_results"].keys())
        if not groups:
            return
        
        # 准备数据
        metrics_data = []
        for group in groups:
            metrics = analysis_result["group_results"][group]
            metrics_data.append({
                "group": group,
                "NDCG": metrics.get("avg_ndcg", 0),
                "MRR": metrics.get("avg_mrr", 0),
                "Precision@5": metrics.get("avg_precision_at_5", 0),
                "推理时间(ms)": metrics.get("avg_inference_time_ms", 0)
            })
        
        df = pd.DataFrame(metrics_data)
        
        # 创建子图
        fig, axes = plt.subplots(2, 2, figsize=(12, 10))
        fig.suptitle('AB测试指标对比', fontsize=16)
        
        # 绘制每个指标
        metrics_to_plot = ["NDCG", "MRR", "Precision@5", "推理时间(ms)"]
        colors = ['skyblue', 'lightcoral', 'lightgreen', 'gold']
        
        for idx, metric in enumerate(metrics_to_plot):
            ax = axes[idx // 2, idx % 2]
            
            bars = ax.bar(df["group"], df[metric], color=colors[idx], alpha=0.7)
            ax.set_title(metric)
            ax.set_ylabel(metric)
            
            # 在柱子上显示数值
            for bar in bars:
                height = bar.get_height()
                ax.text(bar.get_x() + bar.get_width()/2., height,
                       f'{height:.3f}', ha='center', va='bottom')
        
        plt.tight_layout()
        
        if save_path:
            plt.savefig(save_path, dpi=300, bbox_inches='tight')
            print(f"图表已保存到: {save_path}")
        
        plt.show()
    
    @staticmethod
    def plot_time_series(log_file: str,
                         metric: str = "inference_time_ms",
                         window: int = 100,
                         save_path: str = None):
        """Plot a metric from the JSONL log over time, one line per experiment group.

        `metric` must be a field of the log's "performance" section
        (for example "inference_time_ms").
        """

        # Load the log file (one JSON object per line); skip lines that fail to parse
        logs = []
        with open(log_file, 'r', encoding='utf-8') as f:
            for line in f:
                try:
                    logs.append(json.loads(line))
                except json.JSONDecodeError:
                    continue

        if not logs:
            print("No log entries found")
            return

        # Extract timestamp, metric value and experiment group from each entry
        timestamps = []
        values = []
        groups = []

        for log in logs:
            try:
                # "Z" is rewritten because datetime.fromisoformat() only accepts it from Python 3.11
                dt = datetime.fromisoformat(log["timestamp"].replace("Z", "+00:00"))
                value = log.get("performance", {}).get(metric)
                group = log.get("model_info", {}).get("experiment_group", "unknown")
            except (KeyError, TypeError, ValueError, AttributeError):
                continue
            if value is None:
                continue  # this entry does not record the requested metric

            # Append all three together so the lists always stay aligned
            timestamps.append(dt)
            values.append(value)
            groups.append(group)

        if not timestamps:
            print(f"No entries with a valid timestamp and a '{metric}' value")
            return

        # Build a DataFrame sorted by time
        df = pd.DataFrame({
            "timestamp": timestamps,
            "value": values,
            "group": groups
        }).sort_values("timestamp")

        plt.figure(figsize=(12, 6))

        # Draw one line per experiment group
        for group in df["group"].unique():
            # .copy() avoids pandas' SettingWithCopyWarning when adding a column
            group_data = df[df["group"] == group].copy()

            if len(group_data) > window:
                # Moving average; the first window-1 points are NaN and are not drawn
                group_data["value_smooth"] = group_data["value"].rolling(window=window).mean()
                plt.plot(group_data["timestamp"],
                         group_data["value_smooth"],
                         label=f"Group {group} (moving average)",
                         linewidth=2)
            else:
                # Too few points to smooth: plot the raw values
                plt.plot(group_data["timestamp"],
                         group_data["value"],
                         label=f"Group {group}",
                         alpha=0.6)

        plt.title(f"{metric} over time")
        plt.xlabel("Time")
        plt.ylabel(metric)
        plt.legend()
        plt.grid(True, alpha=0.3)
        plt.xticks(rotation=45)

        plt.tight_layout()

        if save_path:
            plt.savefig(save_path, dpi=300, bbox_inches='tight')
            print(f"Time series chart saved to: {save_path}")

        plt.show()

# Usage example
visualizer = ABTestVisualizer()

# Metric comparison chart (analysis_result is the result dict from the earlier statistical analysis)
visualizer.plot_metric_comparison(
    analysis_result,
    save_path="metric_comparison.png"
)

# Inference-time trend chart
visualizer.plot_time_series(
    "rerank_logs.jsonl",
    metric="inference_time_ms",
    window=50,
    save_path="inference_time_trend.png"
)
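plot_time_series draws the smoothed curve but does not return the numbers behind it. If you need those numbers in a headless job (a scheduled check, a CI step, a report), the same computation can be done without pandas or matplotlib. The sketch below assumes the log fields used above (timestamp, performance, model_info.experiment_group); the function name smoothed_series is hypothetical and not part of the framework.

```python
import json
from collections import defaultdict, deque
from datetime import datetime


def smoothed_series(lines, metric="inference_time_ms", window=50):
    """Return {group: [(timestamp, moving_average), ...]} for one performance metric.

    Hypothetical helper. Assumes the log layout used above: an ISO-8601 "timestamp",
    the metric under "performance", and the group under "model_info" -> "experiment_group".
    """
    points = defaultdict(list)  # group -> [(datetime, value), ...]
    for line in lines:
        try:
            log = json.loads(line)
            ts = datetime.fromisoformat(log["timestamp"].replace("Z", "+00:00"))
            value = float(log["performance"][metric])
            group = (log.get("model_info") or {}).get("experiment_group", "unknown")
        except (KeyError, TypeError, ValueError, AttributeError):
            # ValueError also covers json.JSONDecodeError and bad timestamps
            continue
        points[group].append((ts, value))

    result = {}
    for group, pts in points.items():
        pts.sort(key=lambda p: p[0])  # chronological order
        buf = deque(maxlen=window)  # holds only the last `window` values
        smoothed = []
        for ts, value in pts:
            buf.append(value)
            # Like pandas rolling(window).mean(): emit only once the window is full
            if len(buf) == window:
                smoothed.append((ts, sum(buf) / window))
        result[group] = smoothed
    return result
```

Because it accepts any iterable of lines, you can pass an open file directly, e.g. `with open("rerank_logs.jsonl", encoding="utf-8") as f: series = smoothed_series(f, window=50)`.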

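6. Summary

In this tutorial we built a complete A/B testing framework for Qwen3-VL-Reranker-8B. Beyond getting the reranking service running, the framework tells you how well that service actually performs.

6.1 Key Takeaways

Here is what we built: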

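  1. Logging instrumentation: a complete log schema that captures the full request path, from the query and candidate set to the final ranking
  2. A/B testing framework: traffic allocation, model version management, and experiment group control
  3. Evaluation metrics: key ranking metrics such as NDCG, MRR, and Precision@k
  4. Statistical analysis tools: significance testing and visual analysis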

6.2 Practical Recommendations

For real projects, I recommend the following:

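Start small: don't send all traffic through the A/B test from day one. Start with 1% of traffic and ramp up gradually once the results are stable.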
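One common way to ramp from 1% upward without reshuffling users is deterministic hash bucketing: each user_id hashes to a fixed bucket, and the treatment share is just a threshold on that bucket. Raising the threshold from 1% to 5% only adds users to the new-model group, so nobody who has already seen the new reranker is moved back. This is an illustrative sketch, not the allocation code from the framework above; the group names "A"/"B", the salt string, and the 10,000-bucket granularity are all assumptions.

```python
import hashlib

NUM_BUCKETS = 10_000  # assumed granularity: 0.01% of traffic per bucket


def bucket(user_id: str, salt: str = "rerank_exp_v1") -> int:
    """Map a user to a stable bucket in [0, NUM_BUCKETS) by hashing salt + user_id."""
    digest = hashlib.sha256(f"{salt}:{user_id}".encode("utf-8")).hexdigest()
    return int(digest[:8], 16) % NUM_BUCKETS


def assign_group(user_id: str, treatment_pct: float, salt: str = "rerank_exp_v1") -> str:
    """Return "B" (new reranker) if the user's bucket is under treatment_pct percent, else "A"."""
    # treatment_pct=1.0 selects buckets 0..99; raising it only adds buckets,
    # so users already in "B" stay in "B" as the rollout grows.
    return "B" if bucket(user_id, salt) < treatment_pct / 100 * NUM_BUCKETS else "A"
```

Because the bucket depends only on the salt and the user_id, the same user always lands in the same group for a given experiment, and changing the salt produces a fresh, independent split for the next experiment.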

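Track business metrics: beyond technical metrics such as NDCG and MRR, pay even closer attention to business metrics: click-through rate, conversion rate, user dwell time, and so on.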
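For a business metric that is a rate, such as click-through rate, a standard way to check whether the gap between groups is more than noise is the two-proportion z-test. The stdlib-only sketch below assumes impressions are independent and the samples are large enough for the normal approximation; repeated impressions from the same user can violate the first assumption. The function name ctr_z_test is hypothetical.

```python
import math


def ctr_z_test(clicks_a: int, views_a: int, clicks_b: int, views_b: int):
    """Two-sided two-proportion z-test for CTR(B) - CTR(A).

    Returns (absolute CTR difference, z statistic, two-sided p-value).
    """
    ctr_a = clicks_a / views_a
    ctr_b = clicks_b / views_b
    # Pooled CTR under the null hypothesis that both groups share one rate
    pooled = (clicks_a + clicks_b) / (views_a + views_b)
    se = math.sqrt(pooled * (1 - pooled) * (1 / views_a + 1 / views_b))
    if se == 0:
        return ctr_b - ctr_a, 0.0, 1.0  # no variation at all (0% or 100% CTR everywhere)
    z = (ctr_b - ctr_a) / se
    # Two-sided p-value: 2 * (1 - Phi(|z|)) == erfc(|z| / sqrt(2))
    p_value = math.erfc(abs(z) / math.sqrt(2))
    return ctr_b - ctr_a, z, p_value
```

Call it with the click and impression counts of each group, e.g. `ctr_z_test(clicks_a, views_a, clicks_b, views_b)`; a small p-value together with a difference large enough to matter for the business is what justifies a rollout.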

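Review regularly: A/B testing is not a one-off exercise. Model quality can drift as the data distribution shifts, so review the results at least once a month.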

Slice the results: don't look only at the overall numbers. Compare performance across query types, user segments, and time periods.
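A minimal sketch of slicing by one dimension: average the per-request metric for each (segment, group) pair and report the treatment-minus-control gap per segment, so a segment where the new model regresses stands out even when the overall average improves. It assumes flat records with hypothetical fields query_type, group ("A"/"B") and ndcg; in practice you would first flatten your log entries into that shape.

```python
from collections import defaultdict
from statistics import fmean


def breakdown(records, dimension, metric="ndcg", control="A", treatment="B"):
    """Mean `metric` per segment of `dimension` for each group, plus treatment - control."""
    values = defaultdict(list)  # (segment, group) -> [metric values]
    for r in records:
        values[(r[dimension], r["group"])].append(r[metric])

    report = {}
    for seg in sorted({seg for seg, _ in values}):
        a = values.get((seg, control))
        b = values.get((seg, treatment))
        mean_a = fmean(a) if a else None
        mean_b = fmean(b) if b else None
        report[seg] = {
            control: mean_a,
            treatment: mean_b,
            # None when one side has no traffic in this segment
            "delta": mean_b - mean_a if a and b else None,
        }
    return report
```

Small segments are noisy, so run a significance test on a segment's delta before acting on it.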

6.3 Next Steps

Once the basic framework is running, consider these improvements:

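  1. Real-time monitoring: feed key metrics into your monitoring system and set alert thresholds
  2. Automated reporting: generate A/B test reports on a schedule and send them to the relevant teams
  3. Multivariate testing: test several variables at once (for example, model parameters and ranking strategies)
  4. Long-term tracking: follow the model's performance well after launch, so that short-term gains do not hide long-term regressions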
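For the real-time monitoring item, the core of an alert threshold is small: keep a sliding window of recent values and fire when its average crosses a limit. The framework-agnostic sketch below uses a hypothetical class name and parameters (threshold, window, higher_is_bad); routing the returned flag to your actual alerting channel is left out.

```python
from collections import deque


class MetricAlarm:
    """Fire when the moving average of a metric crosses a threshold (hypothetical helper)."""

    def __init__(self, threshold: float, window: int = 100, higher_is_bad: bool = True):
        self.threshold = threshold
        self.higher_is_bad = higher_is_bad  # True for latency, False for NDCG/MRR/CTR
        self.values = deque(maxlen=window)  # only the most recent `window` observations

    def observe(self, value: float) -> bool:
        """Record one observation; return True if the full window breaches the threshold."""
        self.values.append(value)
        if len(self.values) < self.values.maxlen:
            return False  # not enough data yet to judge
        avg = sum(self.values) / len(self.values)
        return avg > self.threshold if self.higher_is_bad else avg < self.threshold
```

For example, a hypothetical `MetricAlarm(threshold=500, window=100)` could be fed each request's `inference_time_ms`, and a `True` result would trigger a notification. Averaging over a window rather than alerting on single requests keeps one slow outlier from paging anyone.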

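6.4 Final Thoughts

A/B testing is the compass for model iteration. Without it, you are feeling your way in the dark; with it, your decisions are driven by data.

Qwen3-VL-Reranker-8B is a strong multimodal reranking model, but even a strong model needs rigorous evaluation. I hope this tutorial helps you build that evaluation pipeline and gives your model optimization work a clearer direction.

Remember: good technology has to work, and you also need to know how well it works. A/B testing is the tool that answers that question.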


