从0到1构建Multi-Agent数据分析平台:LangGraph实战

摘要/引言

在当今数据驱动的世界中,数据分析已经成为企业决策的核心。然而,传统的数据分析流程往往需要专业的数据科学家团队,耗时耗力。想象一下,如果有一个平台,能够自动理解你的数据分析需求,调用合适的工具,执行复杂的分析任务,并以直观的方式呈现结果,那将会带来多大的变革?

这正是我们今天要探讨的主题——构建一个基于多智能体(Multi-Agent)系统的数据分析平台。借助LangGraph,我们将创建一个由多个专门化AI代理组成的协作系统,它们能够分工合作,完成从数据理解到洞察提取的全过程。

在这篇文章中,你将学到:

  • Multi-Agent系统的核心概念和优势
  • LangGraph的架构和工作原理
  • 如何设计和实现专门化的数据分析代理
  • 如何让多个代理有效地协作完成复杂任务
  • 如何构建一个端到端的数据分析平台

我们将从基础概念开始,逐步深入到系统设计和代码实现,最后通过一个完整的案例来展示这个平台的威力。无论你是AI开发者、数据科学家还是对自动化数据分析感兴趣的技术爱好者,这篇文章都将为你提供有价值的见解和实践指导。

一、Multi-Agent系统基础

1.1 核心概念

Multi-Agent系统(多智能体系统)是人工智能领域的一个重要分支,它由多个自主的智能体(Agent)组成,这些智能体在一个共享的环境中交互,共同完成单个智能体难以完成的任务。

在这个系统中,每个智能体都具有以下特性:

  • 自主性(Autonomy):智能体能够在没有人类或其他智能体直接干预的情况下运行,并对自己的行为和内部状态有一定的控制权。
  • 社交能力(Social Ability):智能体能够通过某种通信语言与其他智能体(以及可能的人类)进行交互。
  • 反应性(Reactivity):智能体能够感知环境(可能是物理世界、虚拟世界、其他智能体或互联网等),并对环境的变化做出及时反应。
  • 主动性(Pro-activeness):智能体不仅仅对环境做出反应,它们能够通过主动采取行动来实现目标。

在数据分析的场景中,我们可以设计专门的智能体来负责不同的任务,如数据理解、数据清洗、可视化生成、统计分析等,然后让它们协同工作,完成整个数据分析流程。

1.2 问题背景

传统的数据分析流程通常涉及多个步骤和不同专业技能的人员:

  1. 问题定义:业务人员提出数据分析需求
  2. 数据收集:数据工程师收集和整理相关数据
  3. 数据清洗:数据科学家处理缺失值、异常值等
  4. 探索性分析:生成统计摘要和可视化图表
  5. 高级分析:应用机器学习或统计算法
  6. 结果解释:将分析结果转化为业务洞察
  7. 报告生成:制作可视化报告

这个流程存在几个明显的问题:

  • 技能要求高:需要多个领域的专业知识
  • 流程繁琐:多个步骤之间的切换效率低下
  • 沟通成本高:不同角色之间的沟通容易产生误解
  • 响应速度慢:从需求到结果往往需要数天甚至数周

Multi-Agent系统为解决这些问题提供了一个新的思路:通过将不同的数据分析任务分配给专门化的智能体,并让它们协同工作,我们可以实现数据分析流程的自动化和高效化。

1.3 传统单Agent vs Multi-Agent在数据分析中的对比

特性 单Agent系统 Multi-Agent系统
任务复杂度 适合相对简单的任务 能够处理复杂的多步骤任务
专业化程度 泛而不专 每个Agent可以专注特定领域
错误恢复 单点故障风险高 可以通过冗余和协作提高容错性
可扩展性 扩展难度大 可以灵活添加新的Agent
处理并行任务 通常顺序执行 可以并行处理多个子任务
学习和适应 学习范围有限 可以通过Agent间的知识共享提高整体能力
实现复杂度 相对简单 设计和协调复杂度高

二、LangGraph概述

2.1 什么是LangGraph

LangGraph是LangChain生态系统中的一个库,专门用于构建状态化的、多角色的语言模型应用程序。它允许开发者定义由多个节点(通常代表不同的代理或处理步骤)组成的图,这些节点通过边连接,表示信息流和控制流。

与传统的链式(Chain)架构不同,LangGraph支持:

  • 循环(Cycles):允许回到之前的步骤,这对于需要反思和迭代的任务非常重要
  • 状态管理(State Management):内置的状态管理机制,便于在不同节点之间传递信息
  • 条件边(Conditional Edges):根据当前状态决定下一步走向哪个节点
  • 人机交互(Human-in-the-loop):支持在流程中插入人类决策点

这些特性使得LangGraph非常适合构建Multi-Agent系统,特别是那些需要复杂决策流程和多步骤协作的应用。

2.2 LangGraph核心概念

节点(Nodes)

节点是LangGraph中的基本执行单元。每个节点代表一个功能单元,可以是:

  • 一个LLM调用
  • 一个工具使用
  • 一段自定义代码
  • 甚至是一个等待用户输入的状态

在Multi-Agent系统中,每个节点通常代表一个专门的代理或代理的特定行为。

边(Edges)

边定义了节点之间的连接关系,决定了执行流程。LangGraph支持两种类型的边:

  1. 普通边(Normal Edges):无条件地从一个节点流向另一个节点
  2. 条件边(Conditional Edges):根据当前状态决定下一步的去向
状态(State)

状态是LangGraph的核心概念之一,它代表了图在执行过程中累积的所有信息。状态可以在节点之间传递,每个节点可以读取和修改状态。

状态通常是一个TypedDict或Pydantic模型,定义了图中所有节点可以访问和修改的数据结构。

2.3 LangGraph架构设计原理

LangGraph的设计灵感来源于状态机和数据流图,但它针对LLM应用进行了优化。其核心架构原理包括:

  1. 消息传递范式:节点之间通过消息传递进行通信,这些消息被累积在状态中
  2. 循环支持:与传统的DAG(有向无环图)不同,LangGraph允许循环,这对于代理之间的对话和迭代改进非常重要
  3. 模块化设计:每个节点都是独立的,可以单独测试和改进
  4. 可观察性:提供了完整的执行历史记录,便于调试和优化

三、Multi-Agent数据分析平台设计

3.1 系统整体架构

我们的Multi-Agent数据分析平台将由以下几个核心组件组成:

用户界面

协调代理

数据理解代理

数据清洗代理

分析代理

可视化代理

报告生成代理

数据存储

工具库

这个架构中,协调代理(Orchestrator)充当中心控制器,负责理解用户需求,规划任务流程,并协调其他专业代理的工作。每个专业代理负责数据分析流程中的一个特定环节,它们通过协调代理进行通信,并共享对数据存储的访问。

3.2 核心代理设计

3.2.1 协调代理(Orchestrator Agent)

协调代理是整个系统的"大脑",它的主要职责包括:

  • 理解和解析用户的数据分析请求
  • 规划完成任务所需的步骤
  • 分配任务给合适的专业代理
  • 监控任务执行进度
  • 处理异常和错误情况
  • 整合各代理的输出,形成最终结果
3.2.2 数据理解代理(Data Understanding Agent)

数据理解代理负责与数据打交道,它的职责包括:

  • 探索数据集的结构和内容
  • 识别数据类型和格式
  • 生成数据的统计摘要
  • 发现数据中的潜在问题和异常
  • 理解数据的业务含义
3.2.3 数据清洗代理(Data Cleaning Agent)

数据清洗代理专注于提高数据质量,它的职责包括:

  • 处理缺失值
  • 识别和处理异常值
  • 数据类型转换
  • 数据标准化和规范化
  • 处理重复数据
3.2.4 分析代理(Analysis Agent)

分析代理负责执行实际的数据分析任务,它的职责包括:

  • 选择合适的分析方法
  • 执行统计分析
  • 应用机器学习模型
  • 发现数据中的模式和趋势
  • 生成假设并进行验证
3.2.5 可视化代理(Visualization Agent)

可视化代理专注于将数据和分析结果转化为直观的图表,它的职责包括:

  • 选择合适的图表类型
  • 生成数据可视化
  • 确保可视化的清晰性和有效性
  • 添加必要的注释和解释
3.2.6 报告生成代理(Report Generation Agent)

报告生成代理负责将所有分析结果整合为一个连贯的报告,它的职责包括:

  • 组织分析结果的逻辑结构
  • 撰写解释性文本
  • 整合可视化图表
  • 确保报告的专业性和可读性
  • 根据用户需求调整报告风格

3.3 代理间交互设计

在Multi-Agent系统中,代理之间的有效交互至关重要。我们设计了以下几种交互模式:

3.3.1 顺序协作模式
报告生成代理 可视化代理 分析代理 数据清洗代理 数据理解代理 协调代理 用户 报告生成代理 可视化代理 分析代理 数据清洗代理 数据理解代理 协调代理 用户 数据分析请求 探索数据 数据摘要 清洗数据 清洗后数据 执行分析 分析结果 生成可视化 可视化图表 生成报告 最终报告 交付结果
3.3.2 并行协作模式

对于某些可以同时进行的任务,我们采用并行协作模式:

开始

协调代理

统计分析

趋势预测

相关性分析

结果整合

结束

3.3.3 迭代改进模式

对于需要多次迭代才能达到最佳效果的任务,我们采用迭代改进模式:

不满足要求

满足要求

初始分析

结果评估

反馈调整

输出结果

3.4 状态管理设计

有效的状态管理是Multi-Agent系统成功的关键。我们设计了一个综合的状态结构,用于在整个分析流程中传递信息:

from typing import TypedDict, List, Dict, Any, Optional
from enum import Enum

class TaskStatus(Enum):
    PENDING = "pending"
    IN_PROGRESS = "in_progress"
    COMPLETED = "completed"
    FAILED = "failed"

class DataSetInfo(TypedDict):
    name: str
    description: str
    columns: List[str]
    row_count: int
    column_types: Dict[str, str]
    sample_data: List[Dict[str, Any]]
    statistics: Dict[str, Any]

class AnalysisStep(TypedDict):
    step_name: str
    assigned_agent: str
    status: TaskStatus
    input_data: Optional[Dict[str, Any]]
    output_data: Optional[Dict[str, Any]]
    errors: Optional[List[str]]

class Visualization(TypedDict):
    id: str
    type: str
    title: str
    description: str
    data: Dict[str, Any]
    image_path: Optional[str]

class ReportSection(TypedDict):
    title: str
    content: str
    visualizations: List[str]  # 关联的可视化ID

class AnalysisState(TypedDict):
    # 基本信息
    request_id: str
    user_request: str
    
    # 数据集信息
    raw_data: Optional[DataSetInfo]
    cleaned_data: Optional[DataSetInfo]
    
    # 分析流程
    analysis_plan: List[AnalysisStep]
    current_step_index: int
    
    # 分析结果
    analysis_results: Dict[str, Any]
    
    # 可视化
    visualizations: List[Visualization]
    
    # 报告
    report_sections: List[ReportSection]
    final_report: Optional[str]
    
    # 错误处理
    errors: List[str]
    
    # 元数据
    created_at: str
    updated_at: str

这个状态结构涵盖了从初始请求到最终报告的整个分析过程中的所有关键信息,每个代理都可以读取和修改它需要的部分。

四、环境设置与工具准备

4.1 系统要求和依赖

在开始实现之前,我们需要准备好开发环境。以下是我们的系统要求和主要依赖:

  • Python 3.9+
  • LangGraph 0.0.26+
  • LangChain 0.1.0+
  • OpenAI API 或兼容的LLM服务
  • Pandas (数据处理)
  • NumPy (数值计算)
  • Matplotlib/Seaborn/Plotly (可视化)
  • Scikit-learn (机器学习工具)
  • FastAPI (API服务,可选)

4.2 环境安装步骤

让我们一步步设置开发环境:

  1. 创建虚拟环境:
python -m venv multiagent-env
source multiagent-env/bin/activate  # Linux/Mac
# 或者
multiagent-env\Scripts\activate  # Windows
  1. 安装核心依赖:
pip install langgraph langchain langchain-openai pandas numpy matplotlib seaborn plotly scikit-learn python-dotenv
  1. 设置环境变量:
    创建一个.env文件,添加必要的API密钥:
OPENAI_API_KEY=your_openai_api_key_here
# 可选的其他配置
LANGCHAIN_TRACING_V2=true
LANGCHAIN_API_KEY=your_langchain_api_key_here

4.3 项目结构设计

为了保持代码的组织性和可维护性,我们采用以下项目结构:

multiagent-analytics-platform/
├── .env
├── requirements.txt
├── README.md
├── config/
│   ├── __init__.py
│   └── settings.py
├── data/
│   ├── raw/
│   ├── processed/
│   └── sample/
├── src/
│   ├── __init__.py
│   ├── agents/
│   │   ├── __init__.py
│   │   ├── base.py
│   │   ├── orchestrator.py
│   │   ├── data_understanding.py
│   │   ├── data_cleaning.py
│   │   ├── analysis.py
│   │   ├── visualization.py
│   │   └── report_generation.py
│   ├── graph/
│   │   ├── __init__.py
│   │   ├── state.py
│   │   ├── nodes.py
│   │   └── workflow.py
│   ├── tools/
│   │   ├── __init__.py
│   │   ├── data_tools.py
│   │   ├── analysis_tools.py
│   │   └── visualization_tools.py
│   ├── utils/
│   │   ├── __init__.py
│   │   ├── data_utils.py
│   │   └── visualization_utils.py
│   └── main.py
├── notebooks/
│   └── exploratory_analysis.ipynb
├── tests/
│   ├── __init__.py
│   ├── test_agents.py
│   └── test_tools.py
└── output/
    ├── visualizations/
    └── reports/

这个结构将不同功能的代码分开存放,便于开发和维护。

五、核心代理实现

5.1 基础代理类

首先,我们创建一个基础代理类,所有其他代理都将继承自这个类:

# src/agents/base.py
from abc import ABC, abstractmethod
from typing import Dict, Any, Optional
from langchain_core.language_models import BaseLanguageModel
from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
from langchain_core.runnables import RunnablePassthrough
from langchain.agents import AgentExecutor, create_openai_functions_agent
from langchain_core.tools import BaseTool

class BaseAgent(ABC):
    """基础代理类,所有专业代理的父类"""
    
    def __init__(
        self,
        llm: BaseLanguageModel,
        tools: Optional[list[BaseTool]] = None,
        system_prompt: Optional[str] = None,
    ):
        self.llm = llm
        self.tools = tools or []
        self.system_prompt = system_prompt or self._get_default_system_prompt()
        self.agent_executor = self._create_agent_executor()
    
    @abstractmethod
    def _get_default_system_prompt(self) -> str:
        """获取默认的系统提示词,由子类实现"""
        pass
    
    def _create_agent_executor(self) -> AgentExecutor:
        """创建代理执行器"""
        prompt = ChatPromptTemplate.from_messages([
            ("system", self.system_prompt),
            MessagesPlaceholder(variable_name="messages"),
            MessagesPlaceholder(variable_name="agent_scratchpad"),
        ])
        
        agent = create_openai_functions_agent(self.llm, self.tools, prompt)
        return AgentExecutor(agent=agent, tools=self.tools, verbose=True)
    
    async def run(self, input_data: Dict[str, Any]) -> Dict[str, Any]:
        """运行代理并返回结果"""
        try:
            result = await self.agent_executor.ainvoke(input_data)
            return {
                "success": True,
                "output": result.get("output", ""),
                "intermediate_steps": result.get("intermediate_steps", [])
            }
        except Exception as e:
            return {
                "success": False,
                "error": str(e),
                "output": ""
            }

这个基础类提供了代理的通用功能,包括创建代理执行器和运行代理的方法。每个专业代理只需要实现自己的默认系统提示词,并提供特定的工具即可。

5.2 数据理解代理实现

数据理解代理是我们的第一个专业代理,负责探索和理解数据集:

# src/agents/data_understanding.py
from typing import List, Dict, Any
from langchain_core.language_models import BaseLanguageModel
from langchain_core.tools import tool
from .base import BaseAgent
import pandas as pd
import json

# 数据理解工具
@tool
def analyze_data_structure(file_path: str) -> str:
    """分析数据集的结构信息,包括行列数、列名、数据类型等"""
    try:
        df = pd.read_csv(file_path)
        structure_info = {
            "row_count": len(df),
            "column_count": len(df.columns),
            "columns": list(df.columns),
            "data_types": {col: str(df[col].dtype) for col in df.columns},
            "memory_usage": df.memory_usage(deep=True).sum() / (1024 * 1024)  # MB
        }
        return json.dumps(structure_info, ensure_ascii=False)
    except Exception as e:
        return f"Error analyzing data structure: {str(e)}"

@tool
def generate_data_summary(file_path: str) -> str:
    """生成数据集的统计摘要"""
    try:
        df = pd.read_csv(file_path)
        summary = {
            "numeric_columns": {},
            "categorical_columns": {}
        }
        
        # 数值列的统计信息
        numeric_cols = df.select_dtypes(include=['int64', 'float64']).columns
        for col in numeric_cols:
            summary["numeric_columns"][col] = {
                "count": int(df[col].count()),
                "missing": int(df[col].isna().sum()),
                "mean": float(df[col].mean()) if not df[col].isna().all() else None,
                "std": float(df[col].std()) if not df[col].isna().all() else None,
                "min": float(df[col].min()) if not df[col].isna().all() else None,
                "max": float(df[col].max()) if not df[col].isna().all() else None,
                "25%": float(df[col].quantile(0.25)) if not df[col].isna().all() else None,
                "50%": float(df[col].quantile(0.5)) if not df[col].isna().all() else None,
                "75%": float(df[col].quantile(0.75)) if not df[col].isna().all() else None
            }
        
        # 类别列的统计信息
        categorical_cols = df.select_dtypes(include=['object', 'category']).columns
        for col in categorical_cols:
            value_counts = df[col].value_counts().head(10).to_dict()
            # 转换为可序列化类型
            serializable_counts = {str(k): int(v) for k, v in value_counts.items()}
            summary["categorical_columns"][col] = {
                "count": int(df[col].count()),
                "missing": int(df[col].isna().sum()),
                "unique": int(df[col].nunique()),
                "top_values": serializable_counts
            }
        
        return json.dumps(summary, ensure_ascii=False)
    except Exception as e:
        return f"Error generating data summary: {str(e)}"

@tool
def sample_data(file_path: str, n: int = 5) -> str:
    """获取数据集的样本数据"""
    try:
        df = pd.read_csv(file_path, nrows=n)
        return df.to_json(orient='records', force_ascii=False)
    except Exception as e:
        return f"Error sampling data: {str(e)}"

class DataUnderstandingAgent(BaseAgent):
    """数据理解代理,负责探索和理解数据集"""
    
    def _get_default_system_prompt(self) -> str:
        return """你是一位专业的数据理解专家,负责探索和分析数据集的结构、内容和质量。

你的职责包括:
1. 分析数据集的整体结构(行列数、列名、数据类型等)
2. 生成数据的统计摘要(均值、中位数、标准差、分布等)
3. 检查数据质量问题(缺失值、异常值、不一致等)
4. 理解数据的业务含义和可能的分析方向

当你收到一个数据集时,请按照以下步骤操作:
1. 首先使用analyze_data_structure工具了解数据的基本结构
2. 然后使用generate_data_summary工具生成详细的统计摘要
3. 最后使用sample_data工具查看一些样本数据
4. 基于以上信息,提供一个全面的数据理解报告,包括:
   - 数据集的基本描述
   - 各列的含义和数据类型
   - 数据质量评估
   - 可能的分析方向和建议

请确保你的报告清晰、专业、有建设性。"""
    
    def __init__(self, llm: BaseLanguageModel):
        tools = [analyze_data_structure, generate_data_summary, sample_data]
        super().__init__(llm, tools)

这个数据理解代理配备了三个专门的工具,用于分析数据结构、生成统计摘要和获取样本数据。它能够自动执行这些工具,并基于结果生成一个全面的数据理解报告。

5.3 数据清洗代理实现

接下来,我们实现数据清洗代理,它负责处理数据质量问题:

# src/agents/data_cleaning.py
from typing import List, Dict, Any, Optional
from langchain_core.language_models import BaseLanguageModel
from langchain_core.tools import tool
from .base import BaseAgent
import pandas as pd
import numpy as np
import json
import os

# 数据清洗工具
@tool
def handle_missing_values(
    file_path: str, 
    strategy: str = "auto", 
    columns: Optional[List[str]] = None,
    output_path: Optional[str] = None
) -> str:
    """处理数据集中的缺失值
    
    参数:
        file_path: 输入文件路径
        strategy: 处理策略 ('auto', 'drop', 'mean', 'median', 'mode', 'forward_fill', 'backward_fill')
        columns: 要处理的列,None表示处理所有列
        output_path: 输出文件路径,None表示覆盖原文件
    """
    try:
        df = pd.read_csv(file_path)
        original_missing = df.isnull().sum().to_dict()
        
        if columns is None:
            columns = df.columns.tolist()
        
        if strategy == "auto":
            # 自动选择策略
            for col in columns:
                if df[col].isnull().sum() == 0:
                    continue
                    
                # 如果缺失值超过30%,考虑删除该列
                if df[col].isnull().sum() / len(df) > 0.3:
                    df.drop(col, axis=1, inplace=True)
                    continue
                
                # 根据列类型选择填充策略
                if df[col].dtype in ['int64', 'float64']:
                    # 数值列: 如果偏度较大使用中位数,否则使用均值
                    skewness = df[col].skew()
                    if abs(skewness) > 1:
                        df[col].fillna(df[col].median(), inplace=True)
                    else:
                        df[col].fillna(df[col].mean(), inplace=True)
                else:
                    # 类别列: 使用众数填充
                    mode_val = df[col].mode()
                    if not mode_val.empty:
                        df[col].fillna(mode_val[0], inplace=True)
        else:
            # 使用指定策略
            for col in columns:
                if strategy == "drop":
                    df.dropna(subset=[col], inplace=True)
                elif strategy == "mean" and df[col].dtype in ['int64', 'float64']:
                    df[col].fillna(df[col].mean(), inplace=True)
                elif strategy == "median" and df[col].dtype in ['int64', 'float64']:
                    df[col].fillna(df[col].median(), inplace=True)
                elif strategy == "mode":
                    mode_val = df[col].mode()
                    if not mode_val.empty:
                        df[col].fillna(mode_val[0], inplace=True)
                elif strategy == "forward_fill":
                    df[col].fillna(method='ffill', inplace=True)
                elif strategy == "backward_fill":
                    df[col].fillna(method='bfill', inplace=True)
        
        # 保存结果
        final_output_path = output_path or file_path
        df.to_csv(final_output_path, index=False)
        
        # 计算处理结果
        result = {
            "status": "success",
            "original_missing": {k: int(v) for k, v in original_missing.items()},
            "final_missing": {k: int(v) for k, v in df.isnull().sum().items()},
            "rows_removed": len(pd.read_csv(file_path)) - len(df),
            "columns_removed": len(pd.read_csv(file_path).columns) - len(df.columns),
            "output_path": final_output_path
        }
        
        return json.dumps(result, ensure_ascii=False)
    except Exception as e:
        return json.dumps({"status": "error", "message": str(e)}, ensure_ascii=False)

@tool
def handle_outliers(
    file_path: str,
    method: str = "iqr",
    columns: Optional[List[str]] = None,
    factor: float = 1.5,
    output_path: Optional[str] = None
) -> str:
    """处理数据集中的异常值
    
    参数:
        file_path: 输入文件路径
        method: 处理方法 ('iqr', 'zscore', 'winsorize')
        columns: 要处理的列,None表示处理所有数值列
        factor: 异常值检测因子
        output_path: 输出文件路径,None表示覆盖原文件
    """
    try:
        df = pd.read_csv(file_path)
        
        if columns is None:
            columns = df.select_dtypes(include=['int64', 'float64']).columns.tolist()
        
        outlier_info = {}
        
        for col in columns:
            if df[col].dtype not in ['int64', 'float64']:
                continue
                
            col_data = df[col].dropna()
            if len(col_data) == 0:
                continue
                
            outliers_count = 0
            
            if method == "iqr":
                Q1 = col_data.quantile(0.25)
                Q3 = col_data.quantile(0.75)
                IQR = Q3 - Q1
                lower_bound = Q1 - factor * IQR
                upper_bound = Q3 + factor * IQR
                
                # 检测异常值
                outliers = (df[col] < lower_bound) | (df[col] > upper_bound)
                outliers_count = outliers.sum()
                
                # 处理异常值:用边界值替代
                df.loc[df[col] < lower_bound, col] = lower_bound
                df.loc[df[col] > upper_bound, col] = upper_bound
                
            elif method == "zscore":
                z_scores = np.abs((col_data - col_data.mean()) / col_data.std())
                outlier_mask = z_scores > factor
                
                # 获取异常值索引
                outlier_indices = col_data[outlier_mask].index
                
                # 用均值替代异常值
                df.loc[outlier_indices, col] = col_data.mean()
                outliers_count = len(outlier_indices)
                
            elif method == "winsorize":
                from scipy.stats.mstats import winsorize
                # 对数据进行缩尾处理
                df[col] = winsorize(df[col], limits=[factor/2, factor/2])
                # 估算异常值数量
                Q1 = col_data.quantile(0.25)
                Q3 = col_data.quantile(0.75)
                IQR = Q3 - Q1
                lower_bound = Q1 - 1.5 * IQR
                upper_bound = Q3 + 1.5 * IQR
                outliers_count = ((df[col] < lower_bound) | (df[col] > upper_bound)).sum()
            
            outlier_info[col] = int(outliers_count)
        
        # 保存结果
        final_output_path = output_path or file_path
        df.to_csv(final_output_path, index=False)
        
        result = {
            "status": "success",
            "outlier_info": outlier_info,
            "total_outliers_handled": sum(outlier_info.values()),
            "output_path": final_output_path
        }
        
        return json.dumps(result, ensure_ascii=False)
    except Exception as e:
        return json.dumps({"status": "error", "message": str(e)}, ensure_ascii=False)

@tool
def convert_data_types(
    file_path: str,
    type_mappings: Optional[Dict[str, str]] = None,
    auto_detect: bool = True,
    output_path: Optional[str] = None
) -> str:
    """转换数据列的数据类型
    
    参数:
        file_path: 输入文件路径
        type_mappings: 显式类型映射字典 {列名: 目标类型}
        auto_detect: 是否自动检测并转换类型
        output_path: 输出文件路径,None表示覆盖原文件
    """
    try:
        df = pd.read_csv(file_path)
        original_types = {col: str(df[col].dtype) for col in df.columns}
        changes_made = {}
        
        # 处理显式类型映射
        if type_mappings:
            for col, target_type in type_mappings.items():
                if col not in df.columns:
                    continue
                    
                try:
                    original_type = str(df[col].dtype)
                    
                    if target_type == 'int':
                        df[col] = pd.to_numeric(df[col], errors='coerce').fillna(0).astype(int)
                    elif target_type == 'float':
                        df[col] = pd.to_numeric(df[col], errors='coerce')
                    elif target_type == 'datetime':
                        df[col] = pd.to_datetime(df[col], errors='coerce')
                    elif target_type == 'category':
                        df[col] = df[col].astype('category')
                    elif target_type == 'string':
                        df[col] = df[col].astype(str)
                    
                    new_type = str(df[col].dtype)
                    if original_type != new_type:
                        changes_made[col] = {
                            "from": original_type,
                            "to": new_type
                        }
                except Exception as e:
                    changes_made[col] = {
                        "error": str(e)
                    }
        
        # 自动检测和转换
        if auto_detect:
            for col in df.columns:
                # 跳过已经显式转换的列
                if col in changes_made and "error" not in changes_made[col]:
                    continue
                    
                original_type = str(df[col].dtype)
                
                # 尝试转换日期时间
                if original_type == 'object':
                    try:
                        # 只有当大部分值可以转换为日期时才进行转换
                        converted = pd.to_datetime(df[col], errors='coerce')
                        if converted.notna().mean() > 0.8:  # 至少80%的值可以转换
                            df[col] = converted
                            new_type = str(df[col].dtype)
                            changes_made[col] = {
                                "from": original_type,
                                "to": new_type,
                                "method": "auto_datetime"
                            }
                            continue
                    except:
                        pass
                    
                    # 尝试转换数值
                    try:
                        converted = pd.to_numeric(df[col], errors='coerce')
                        if converted.notna().mean() > 0.8:  # 至少80%的值可以转换
                            df[col] = converted
                            new_type = str(df[col].dtype)
                            changes_made[col] = {
                                "from": original_type,
                                "to": new_type,
                                "method": "auto_numeric"
                            }
                            continue
                    except:
                        pass
                    
                    # 检查是否应该是类别类型
                    try:
                        unique_ratio = df[col].nunique() / len(df[col].dropna())
                        if unique_ratio < 0.05 and df[col].nunique() < 100:  # 类别比例小于5%且类别数小于100
                            df[col] = df[col].astype('category')
                            new_type = str(df[col].dtype)
                            changes_made[col] = {
                                "from": original_type,
                                "to": new_type,
                                "method": "auto_category"
                            }
                    except:
                        pass
        
        # 保存结果
        final_output_path = output_path or file_path
        df.to_csv(final_output_path, index=False)
        
        result = {
            "status": "success",
            "original_types": original_types,
            "final_types": {col: str(df[col].dtype) for col in df.columns},
            "changes_made": changes_made,
            "output_path": final_output_path
        }
        
        return json.dumps(result, ensure_ascii=False)
    except Exception as e:
        return json.dumps({"status": "error", "message": str(e)}, ensure_ascii=False)

class DataCleaningAgent(BaseAgent):
    """数据清洗代理,负责处理数据质量问题"""
    
    def _get_default_system_prompt(self) -> str:
        return """你是一位专业的数据清洗专家,负责识别和处理数据集中的质量问题。

你的职责包括:
1. 处理缺失值 - 根据数据类型和分布选择合适的填充或删除策略
2. 检测和处理异常值 - 使用IQR、Z-score等方法识别和处理异常值
3. 数据类型转换 - 确保每列的数据类型合适且正确
4. 数据标准化 - 处理数据的度量单位和范围问题
5. 去除重复数据 - 识别和删除重复的记录

在执行数据清洗任务时,请遵循以下步骤:
1. 首先,仔细阅读数据理解阶段的报告,了解数据的基本情况
2. 识别数据中存在的主要质量问题
3. 优先处理最严重的问题
4. 选择合适的工具和参数执行清洗操作
5. 评估清洗结果,确保数据质量得到提升
6. 记录所有的修改和操作,确保可追溯性

请记住,数据清洗是一个迭代过程,可能需要多次尝试不同的方法才能获得最佳结果。同时,要注意不要过度清洗,以免丢失重要信息。"""
    
    def __init__(self, llm: BaseLanguageModel):
        tools = [handle_missing_values, handle_outliers, convert_data_types]
        super().__init__(llm, tools)

这个数据清洗代理提供了处理缺失值、异常值和数据类型转换的工具。它能够根据数据的特点自动选择合适的处理策略,也支持用户指定具体的处理方法。

由于代码量很大,我将在下面的部分继续实现其他代理和系统组件。每个代理都将有自己的专门工具和系统提示词,使它们能够在数据分析流程中发挥特定的作用。

(由于篇幅限制,文章后续部分将包括剩余代理的实现、LangGraph工作流的构建、系统集成、测试、最佳实践、案例研究等内容,以及总结和未来展望。)

Logo

欢迎加入DeepSeek 技术社区。在这里,你可以找到志同道合的朋友,共同探索AI技术的奥秘。

更多推荐