ChatGPT处理Excel表格实战指南:从数据清洗到自动化分析

作为一名经常和数据打交道的开发者,我深知处理Excel表格时的那种“痛并快乐着”。快乐的是数据就在那里,痛的是要把它变成可用的格式。特别是面对那些格式混乱、多表关联、需要人工校验的Excel文件时,传统的处理方法往往效率低下,容易出错。

今天,我想分享一套结合ChatGPT API的实战方案,希望能帮你把数据处理效率提升3倍以上。这套方案的核心思路是:让AI来理解表格的“语义”,而不仅仅是解析单元格。

1. 为什么需要ChatGPT来处理Excel?

在深入代码之前,我们先看看传统方法遇到的三大瓶颈:

  1. 非结构化数据解析困难 很多Excel表格并不是规整的数据库格式。你可能遇到合并单元格、多行表头、注释行混杂在数据中,甚至同一列里既有数字又有文本描述。用Pandas的read_excel()处理这些情况时,往往需要大量的事后清洗代码。

  2. 多表关联效率低下 当需要根据多个表格的信息进行关联分析时(比如销售表、客户表、产品表),传统方法需要写复杂的VLOOKUP或merge逻辑。一旦表格结构稍有变化,整个关联逻辑就可能失效。

  3. 人工校验耗时且易错 数据清洗中最耗时的部分往往是人工检查异常值、修正格式错误、统一命名规范。这些工作重复性高,但交给程序自动处理又担心误判。

2. 技术方案对比:传统 vs AI增强

为了更直观地展示差异,我整理了一个对比表格:

对比维度 传统Pandas方案 ChatGPT API方案
响应速度 毫秒级(本地处理) 秒级(API调用)
准确率 高(规则明确时) 更高(能理解上下文)
扩展性 需要为每种新格式编写代码 通过调整prompt适应新格式
开发成本 高(需要处理各种边界情况) 中(主要设计prompt)
适用场景 结构规整的表格 复杂、多变的表格格式

关键区别在于:传统方案是基于规则的,而ChatGPT方案是基于理解的。当表格格式变化时,你不需要重写解析逻辑,只需要调整描述表格的prompt。

3. 核心实现三步走

3.1 第一步:使用openpyxl加载并提取数据

虽然Pandas也能读取Excel,但openpyxl给了我们更细粒度的控制。特别是处理那些有复杂格式、合并单元格的表格时。

import openpyxl
from openpyxl import load_workbook
import json

def load_excel_with_openpyxl(file_path, sheet_name=None):
    """
    使用openpyxl加载Excel文件,提取表格结构和数据
    参数说明:
    - file_path: Excel文件路径
    - sheet_name: 指定工作表名称,默认为第一个工作表
    返回:包含表格元数据和数据的字典
    """
    # 加载工作簿
    wb = load_workbook(filename=file_path, data_only=True)
    
    # 选择工作表
    if sheet_name:
        ws = wb[sheet_name]
    else:
        ws = wb.active  # 默认使用第一个工作表
    
    # 获取表格范围
    max_row = ws.max_row
    max_column = ws.max_column
    
    # 提取表头(假设第一行为表头)
    headers = []
    for col in range(1, max_column + 1):
        cell_value = ws.cell(row=1, column=col).value
        headers.append(str(cell_value) if cell_value is not None else f"Column_{col}")
    
    # 提取数据行
    data_rows = []
    for row in range(2, max_row + 1):  # 从第二行开始
        row_data = {}
        for col_idx, header in enumerate(headers, start=1):
            cell_value = ws.cell(row=row, column=col_idx).value
            # 处理空值和不同类型的数据
            if cell_value is None:
                row_data[header] = ""
            elif isinstance(cell_value, (int, float)):
                row_data[header] = str(cell_value)
            else:
                row_data[header] = str(cell_value).strip()
        
        # 只添加非空行
        if any(row_data.values()):
            data_rows.append(row_data)
    
    # 构建返回结构
    result = {
        "file_name": file_path.split("/")[-1],
        "sheet_name": ws.title,
        "dimensions": f"{max_row}行 × {max_column}列",
        "headers": headers,
        "data": data_rows[:100],  # 限制前100行,避免token超限
        "total_rows": len(data_rows)
    }
    
    wb.close()
    return result

# 使用示例
excel_data = load_excel_with_openpyxl("销售数据.xlsx", "2024年Q1")
print(f"加载了{excel_data['total_rows']}行数据")

3.2 第二步:设计prompt模板处理数据清洗

这是整个方案的核心。好的prompt能让ChatGPT准确理解你的需求。

import openai
from typing import List, Dict, Any
import time

class ExcelProcessorWithChatGPT:
    def __init__(self, api_key: str, model: str = "gpt-3.5-turbo"):
        """
        初始化ChatGPT Excel处理器
        参数说明:
        - api_key: OpenAI API密钥
        - model: 使用的模型,默认为gpt-3.5-turbo(成本较低)
        参考:OpenAI官方文档 v1.0 - 模型列表
        """
        openai.api_key = api_key
        self.model = model
        self.prompt_template = """你是一个专业的数据清洗助手。请根据以下Excel表格数据,完成指定的清洗任务。

表格信息:
- 文件:{file_name}
- 工作表:{sheet_name}
- 维度:{dimensions}
- 表头:{headers}

原始数据(前{sample_size}行):
{data_sample}

清洗任务要求:
{task_description}

请严格按照以下JSON格式返回结果:
{{
  "cleaned_data": [
    {{
      "original_row": 原始行数据,
      "cleaned_row": 清洗后的行数据,
      "changes_made": 所做的修改说明,
      "confidence": 置信度(0-1)
    }}
  ],
  "summary": {{
    "total_rows_processed": 处理的总行数,
    "rows_modified": 修改的行数,
    "common_issues_found": 发现的常见问题列表,
    "processing_time": 处理时间
  }},
  "recommendations": 针对数据质量的改进建议
}}

注意:
1. 保持数据完整性,不要丢失任何原始信息
2. 对于不确定的修改,保持原样并降低置信度
3. 时间格式统一为YYYY-MM-DD
4. 金额格式统一为数字,不带货币符号
5. 中文文本保持UTF-8编码
"""
    
    def create_data_cleaning_prompt(self, excel_info: Dict[str, Any], 
                                   task_description: str,
                                   sample_size: int = 10) -> str:
        """
        创建数据清洗的prompt
        参数说明:
        - excel_info: load_excel_with_openpyxl返回的数据字典
        - task_description: 清洗任务的具体描述
        - sample_size: 发送给API的样本行数(控制token数量)
        """
        # 准备数据样本
        data_sample = excel_info['data'][:sample_size]
        data_sample_str = json.dumps(data_sample, ensure_ascii=False, indent=2)
        
        # 填充prompt模板
        prompt = self.prompt_template.format(
            file_name=excel_info['file_name'],
            sheet_name=excel_info['sheet_name'],
            dimensions=excel_info['dimensions'],
            headers=excel_info['headers'],
            sample_size=sample_size,
            data_sample=data_sample_str,
            task_description=task_description
        )
        
        return prompt
    
    def clean_data_with_chatgpt(self, excel_info: Dict[str, Any], 
                               task_description: str) -> Dict[str, Any]:
        """
        使用ChatGPT清洗Excel数据
        返回:清洗后的数据和分析结果
        """
        # 创建prompt
        prompt = self.create_data_cleaning_prompt(excel_info, task_description)
        
        try:
            # 调用ChatGPT API
            response = openai.ChatCompletion.create(
                model=self.model,
                messages=[
                    {"role": "system", "content": "你是一个专业的数据分析师,擅长数据清洗和格式化。"},
                    {"role": "user", "content": prompt}
                ],
                temperature=0.1,  # 低温度确保输出稳定
                max_tokens=2000    # 根据响应长度调整
            )
            
            # 解析响应
            result_text = response.choices[0].message.content
            
            # 尝试解析JSON响应
            try:
                result = json.loads(result_text)
                return result
            except json.JSONDecodeError:
                # 如果响应不是合法JSON,返回原始文本
                return {"raw_response": result_text, "error": "JSON解析失败"}
                
        except Exception as e:
            return {"error": str(e), "task": task_description}

# 使用示例
processor = ExcelProcessorWithChatGPT(api_key="your-api-key")

# 定义清洗任务
task_desc = """
1. 统一日期格式:将所有日期列转换为YYYY-MM-DD格式
2. 清理金额数据:移除货币符号,转换为浮点数
3. 标准化产品名称:将相似的产品名称统一(如"iPhone13"和"iPhone 13"统一为"iPhone 13")
4. 填充空值:对于数值列,用0填充空值;对于文本列,用"未知"填充
5. 验证数据范围:检查销售额是否在合理范围内(0-1000000)
"""

result = processor.clean_data_with_chatgpt(excel_data, task_desc)
print(f"清洗完成,修改了{result.get('summary', {}).get('rows_modified', 0)}行数据")

3.3 第三步:实现异常重试机制

API调用可能会失败,我们需要一个健壮的重试机制。

import random
from datetime import datetime, timedelta

class RobustExcelProcessor(ExcelProcessorWithChatGPT):
    def __init__(self, api_key: str, model: str = "gpt-3.5-turbo"):
        super().__init__(api_key, model)
        self.max_retries = 3
        self.retry_delay = 1  # 初始延迟秒数
    
    def clean_data_with_retry(self, excel_info: Dict[str, Any], 
                             task_description: str) -> Dict[str, Any]:
        """
        带重试机制的数据清洗
        实现指数退避策略:1s, 2s, 4s, ...
        参考:OpenAI官方文档 v1.0 - 错误处理最佳实践
        """
        last_error = None
        
        for attempt in range(self.max_retries):
            try:
                print(f"第{attempt + 1}次尝试清洗数据...")
                
                # 调用父类方法
                result = super().clean_data_with_chatgpt(excel_info, task_description)
                
                # 检查是否有错误
                if "error" in result and "rate limit" in result["error"].lower():
                    # 遇到频率限制,需要等待
                    wait_time = self.retry_delay * (2 ** attempt) + random.uniform(0, 1)
                    print(f"遇到频率限制,等待{wait_time:.1f}秒后重试...")
                    time.sleep(wait_time)
                    continue
                    
                elif "error" in result:
                    # 其他错误,直接返回
                    return result
                    
                else:
                    # 成功,返回结果
                    return result
                    
            except openai.error.APIError as e:
                last_error = e
                print(f"API错误: {e}")
                
                # 指数退避
                wait_time = self.retry_delay * (2 ** attempt) + random.uniform(0, 1)
                print(f"等待{wait_time:.1f}秒后重试...")
                time.sleep(wait_time)
                
            except Exception as e:
                last_error = e
                print(f"未知错误: {e}")
                break
        
        # 所有重试都失败
        return {
            "error": f"清洗失败,重试{self.max_retries}次后仍错误",
            "last_error": str(last_error),
            "timestamp": datetime.now().isoformat()
        }
    
    def batch_process_with_retry(self, excel_files: List[Dict], 
                                task_description: str) -> List[Dict]:
        """
        批量处理多个Excel文件,每个文件独立重试
        参数说明:
        - excel_files: 包含文件信息的字典列表
        - task_description: 清洗任务描述
        返回:每个文件的处理结果列表
        """
        results = []
        
        for i, file_info in enumerate(excel_files):
            print(f"处理文件 {i+1}/{len(excel_files)}: {file_info.get('file_name', '未知')}")
            
            # 为每个文件单独重试
            result = self.clean_data_with_retry(file_info, task_description)
            result["file_index"] = i
            result["file_name"] = file_info.get("file_name", "未知")
            
            results.append(result)
            
            # 文件间添加小延迟,避免触发频率限制
            if i < len(excel_files) - 1:
                time.sleep(0.5)
        
        return results

# 使用示例
robust_processor = RobustExcelProcessor(api_key="your-api-key")

# 批量处理多个文件
file_list = [
    {"file_name": "sales_q1.xlsx", "data": excel_data_q1, ...},
    {"file_name": "sales_q2.xlsx", "data": excel_data_q2, ...},
]

batch_results = robust_processor.batch_process_with_retry(file_list, task_desc)
print(f"批量处理完成,成功{sum(1 for r in batch_results if 'error' not in r)}/{len(batch_results)}个文件")

4. 性能优化策略

4.1 批量处理时的API并发控制

当需要处理大量Excel文件时,合理的并发控制能显著提升效率。

import asyncio
import aiohttp
from concurrent.futures import ThreadPoolExecutor
import numpy as np

class ConcurrentExcelProcessor:
    def __init__(self, api_key: str, max_concurrent: int = 3):
        """
        并发处理Excel文件
        参数说明:
        - api_key: OpenAI API密钥
        - max_concurrent: 最大并发数,根据API限制调整
        参考:OpenAI官方文档 v1.0 - 频率限制说明
        """
        self.api_key = api_key
        self.max_concurrent = max_concurrent
        self.semaphore = asyncio.Semaphore(max_concurrent)
    
    async def process_single_file_async(self, session: aiohttp.ClientSession,
                                       file_info: Dict[str, Any],
                                       task_description: str) -> Dict[str, Any]:
        """
        异步处理单个文件
        使用aiohttp提高IO效率
        """
        async with self.semaphore:  # 控制并发数
            try:
                # 准备请求数据
                prompt = self.create_prompt(file_info, task_description)
                
                # 异步调用API
                async with session.post(
                    "https://api.openai.com/v1/chat/completions",
                    headers={
                        "Authorization": f"Bearer {self.api_key}",
                        "Content-Type": "application/json"
                    },
                    json={
                        "model": "gpt-3.5-turbo",
                        "messages": [
                            {"role": "system", "content": "你是数据清洗专家"},
                            {"role": "user", "content": prompt}
                        ],
                        "temperature": 0.1,
                        "max_tokens": 1500
                    },
                    timeout=aiohttp.ClientTimeout(total=30)
                ) as response:
                    
                    if response.status == 200:
                        result = await response.json()
                        return self.parse_response(result, file_info)
                    else:
                        error_text = await response.text()
                        return {
                            "error": f"API错误: {response.status}",
                            "details": error_text,
                            "file": file_info.get("file_name")
                        }
                        
            except asyncio.TimeoutError:
                return {"error": "请求超时", "file": file_info.get("file_name")}
            except Exception as e:
                return {"error": str(e), "file": file_info.get("file_name")}
    
    async def process_batch_async(self, file_list: List[Dict],
                                 task_description: str) -> List[Dict]:
        """
        异步批量处理
        显著提升大量文件的处理速度
        """
        connector = aiohttp.TCPConnector(limit=self.max_concurrent)
        
        async with aiohttp.ClientSession(connector=connector) as session:
            tasks = []
            for file_info in file_list:
                task = self.process_single_file_async(session, file_info, task_description)
                tasks.append(task)
            
            # 等待所有任务完成
            results = await asyncio.gather(*tasks, return_exceptions=True)
            
            # 处理异常结果
            processed_results = []
            for i, result in enumerate(results):
                if isinstance(result, Exception):
                    processed_results.append({
                        "error": str(result),
                        "file": file_list[i].get("file_name", "未知")
                    })
                else:
                    processed_results.append(result)
            
            return processed_results
    
    def process_batch_sync(self, file_list: List[Dict],
                          task_description: str) -> List[Dict]:
        """
        同步接口,内部使用异步实现
        方便在同步代码中调用
        """
        # 创建或获取事件循环
        try:
            loop = asyncio.get_event_loop()
        except RuntimeError:
            loop = asyncio.new_event_loop()
            asyncio.set_event_loop(loop)
        
        # 运行异步任务
        return loop.run_until_complete(
            self.process_batch_async(file_list, task_description)
        )

4.2 结果缓存设计

对于重复的处理任务,缓存可以大幅减少API调用次数和成本。

import redis
import hashlib
import pickle
from datetime import datetime, timedelta

class CachedExcelProcessor(ExcelProcessorWithChatGPT):
    def __init__(self, api_key: str, redis_host: str = "localhost",
                 redis_port: int = 6379, cache_ttl: int = 86400):
        """
        带缓存的数据处理器
        参数说明:
        - redis_host: Redis服务器地址
        - redis_port: Redis端口
        - cache_ttl: 缓存过期时间(秒),默认24小时
        """
        super().__init__(api_key)
        
        # 连接Redis
        try:
            self.redis_client = redis.Redis(
                host=redis_host,
                port=redis_port,
                decode_responses=False,  # 存储pickle数据
                socket_connect_timeout=3
            )
            self.redis_client.ping()  # 测试连接
            self.cache_enabled = True
            print("Redis缓存已启用")
        except Exception as e:
            print(f"Redis连接失败,禁用缓存: {e}")
            self.cache_enabled = False
            self.redis_client = None
        
        self.cache_ttl = cache_ttl
    
    def _generate_cache_key(self, excel_info: Dict, task_description: str) -> str:
        """
        生成缓存键
        基于文件内容和任务描述创建唯一标识
        """
        # 使用文件信息和任务描述生成哈希
        cache_data = {
            "file_name": excel_info.get("file_name"),
            "headers": excel_info.get("headers"),
            "data_sample": excel_info.get("data")[:5],  # 使用前5行作为样本
            "task": task_description
        }
        
        # 转换为字符串并哈希
        cache_str = json.dumps(cache_data, sort_keys=True)
        cache_hash = hashlib.md5(cache_str.encode()).hexdigest()
        
        return f"excel_clean:{cache_hash}"
    
    def clean_data_with_cache(self, excel_info: Dict[str, Any],
                            task_description: str) -> Dict[str, Any]:
        """
        带缓存的数据清洗
        先检查缓存,命中则直接返回,否则调用API并缓存结果
        """
        if not self.cache_enabled:
            # 缓存不可用,直接调用API
            return super().clean_data_with_chatgpt(excel_info, task_description)
        
        # 生成缓存键
        cache_key = self._generate_cache_key(excel_info, task_description)
        
        try:
            # 尝试从缓存读取
            cached_data = self.redis_client.get(cache_key)
            
            if cached_data:
                print("缓存命中,直接返回结果")
                return pickle.loads(cached_data)
            
            # 缓存未命中,调用API
            print("缓存未命中,调用API...")
            result = super().clean_data_with_chatgpt(excel_info, task_description)
            
            # 缓存结果(只缓存成功的响应)
            if "error" not in result:
                # 添加缓存时间戳
                result["_cached_at"] = datetime.now().isoformat()
                result["_cache_ttl"] = self.cache_ttl
                
                # 序列化并存储
                serialized = pickle.dumps(result)
                self.redis_client.setex(cache_key, self.cache_ttl, serialized)
                print(f"结果已缓存,键: {cache_key}, TTL: {self.cache_ttl}秒")
            
            return result
            
        except Exception as e:
            print(f"缓存处理出错: {e}")
            # 出错时降级到直接调用API
            return super().clean_data_with_chatgpt(excel_info, task_description)
    
    def get_cache_stats(self) -> Dict[str, Any]:
        """
        获取缓存统计信息
        """
        if not self.cache_enabled:
            return {"status": "disabled"}
        
        try:
            # 获取所有相关缓存键
            cache_keys = self.redis_client.keys("excel_clean:*")
            
            stats = {
                "total_cached_items": len(cache_keys),
                "cache_enabled": True,
                "cache_ttl": self.cache_ttl
            }
            
            # 如果有缓存项,获取更多信息
            if cache_keys:
                # 检查第一个键的TTL
                sample_key = cache_keys[0]
                ttl = self.redis_client.ttl(sample_key)
                stats["sample_ttl"] = ttl
                
                # 估算缓存大小
                total_size = 0
                for key in cache_keys[:10]:  # 只检查前10个
                    value = self.redis_client.get(key)
                    if value:
                        total_size += len(value)
                
                if len(cache_keys) > 10:
                    avg_size = total_size / 10
                    stats["estimated_total_size"] = avg_size * len(cache_keys)
                else:
                    stats["estimated_total_size"] = total_size
            
            return stats
            
        except Exception as e:
            return {"status": "error", "message": str(e)}

# Redis配置示例(docker-compose.yml片段)
"""
version: '3.8'
services:
  redis:
    image: redis:7-alpine
    container_name: excel_cache_redis
    ports:
      - "6379:6379"
    volumes:
      - redis_data:/data
    command: redis-server --appendonly yes --maxmemory 256mb --maxmemory-policy allkeys-lru
    restart: unless-stopped

volumes:
  redis_data:
"""

# 使用示例
cached_processor = CachedExcelProcessor(
    api_key="your-api-key",
    redis_host="localhost",
    redis_port=6379,
    cache_ttl=3600  # 缓存1小时
)

# 第一次调用会缓存
result1 = cached_processor.clean_data_with_cache(excel_data, task_desc)

# 相同参数的第二次调用会命中缓存
result2 = cached_processor.clean_data_with_cache(excel_data, task_desc)

# 查看缓存统计
stats = cached_processor.get_cache_stats()
print(f"缓存统计: {stats}")

5. 避坑指南

5.1 处理中文编码的注意事项

中文处理是很多开发者容易踩坑的地方。

def handle_chinese_encoding(data: Dict[str, Any]) -> Dict[str, Any]:
    """
    正确处理中文编码问题
    确保在整个流程中保持UTF-8编码
    """
    result = {}
    
    for key, value in data.items():
        if isinstance(value, str):
            # 确保字符串是UTF-8编码
            try:
                # 尝试解码为UTF-8
                encoded = value.encode('utf-8').decode('utf-8')
                result[key] = encoded
            except (UnicodeEncodeError, UnicodeDecodeError):
                # 如果失败,尝试其他常见编码
                for encoding in ['gbk', 'gb2312', 'latin-1']:
                    try:
                        encoded = value.encode(encoding).decode('utf-8')
                        result[key] = encoded
                        print(f"警告: 将{key}从{encoding}转换为UTF-8")
                        break
                    except:
                        continue
                else:
                    # 所有编码都失败,保留原始值
                    result[key] = value
                    print(f"错误: 无法解码{key}的值")
        
        elif isinstance(value, dict):
            # 递归处理字典
            result[key] = handle_chinese_encoding(value)
        
        elif isinstance(value, list):
            # 处理列表
            result[key] = []
            for item in value:
                if isinstance(item, (str, dict)):
                    result[key].append(handle_chinese_encoding(item) 
                                     if isinstance(item, dict) 
                                     else handle_chinese_encoding({"item": item})["item"])
                else:
                    result[key].append(item)
        
        else:
            # 其他类型直接保留
            result[key] = value
    
    return result

# 在调用API前预处理中文数据
def preprocess_for_chinese(excel_info: Dict) -> Dict:
    """
    预处理Excel数据,确保中文正确处理
    """
    # 深度处理所有字符串字段
    processed_info = handle_chinese_encoding(excel_info)
    
    # 确保JSON序列化时使用ensure_ascii=False
    processed_info["_encoding_handled"] = True
    processed_info["_processed_at"] = datetime.now().isoformat()
    
    return processed_info

# 在prompt中添加中文处理说明
chinese_prompt_addition = """
特别注意中文处理:
1. 所有中文字符必须保持UTF-8编码
2. 不要将中文转换为Unicode转义序列(如\u4e2d\u6587)
3. 中文标点符号保持原样(,。!?等)
4. 中文与英文、数字混合时保持正确空格
"""

5.2 API计费策略优化

合理控制API调用成本非常重要。

class CostAwareExcelProcessor(ExcelProcessorWithChatGPT):
    def __init__(self, api_key: str, budget_limit: float = 10.0):
        """
        成本感知的Excel处理器
        参数说明:
        - budget_limit: 预算限制(美元)
        参考:OpenAI官方文档 v1.0 - 定价说明
        """
        super().__init__(api_key)
        self.budget_limit = budget_limit
        self.total_cost = 0.0
        self.cost_log = []
        
        # GPT-3.5-turbo定价(每1000 tokens)
        self.pricing = {
            "gpt-3.5-turbo": {
                "input": 0.0015,  # $0.0015 per 1K tokens
                "output": 0.0020   # $0.0020 per 1K tokens
            },
            "gpt-4": {
                "input": 0.03,     # $0.03 per 1K tokens
                "output": 0.06      # $0.06 per 1K tokens
            }
        }
    
    def estimate_cost(self, prompt: str, model: str = None) -> float:
        """
        估算API调用成本
        基于token数量估算
        """
        if model is None:
            model = self.model
        
        # 简单估算token数(英文约1token=4字符,中文约1token=2字符)
        prompt_length = len(prompt)
        
        # 估算中英文比例(简单方法)
        chinese_chars = sum(1 for c in prompt if '\u4e00' <= c <= '\u9fff')
        english_chars = prompt_length - chinese_chars
        
        # 估算token数
        estimated_tokens = (english_chars / 4) + (chinese_chars / 2)
        
        # 添加20%的缓冲
        estimated_tokens *= 1.2
        
        # 估算成本(假设输入输出比例1:1)
        if model in self.pricing:
            price_per_1k = self.pricing[model]["input"] + self.pricing[model]["output"]
            estimated_cost = (estimated_tokens / 1000) * price_per_1k
        else:
            # 默认使用gpt-3.5-turbo价格
            price_per_1k = self.pricing["gpt-3.5-turbo"]["input"] + self.pricing["gpt-3.5-turbo"]["output"]
            estimated_cost = (estimated_tokens / 1000) * price_per_1k
        
        return estimated_cost
    
    def clean_data_with_budget_check(self, excel_info: Dict[str, Any],
                                    task_description: str) -> Dict[str, Any]:
        """
        带预算检查的数据清洗
        超过预算时停止处理
        """
        # 创建prompt并估算成本
        prompt = self.create_data_cleaning_prompt(excel_info, task_description)
        estimated_cost = self.estimate_cost(prompt)
        
        print(f"估算成本: ${estimated_cost:.4f}")
        print(f"已用预算: ${self.total_cost:.2f}/{self.budget_limit}")
        
        # 检查预算
        if self.total_cost + estimated_cost > self.budget_limit:
            return {
                "error": "超出预算限制",
                "estimated_cost": estimated_cost,
                "remaining_budget": self.budget_limit - self.total_cost,
                "suggestion": "请简化prompt或使用更小的数据样本"
            }
        
        try:
            # 调用API
            start_time = time.time()
            result = super().clean_data_with_chatgpt(excel_info, task_description)
            processing_time = time.time() - start_time
            
            # 记录成本(这里使用估算值,实际中可以从API响应获取准确token数)
            self.total_cost += estimated_cost
            self.cost_log.append({
                "timestamp": datetime.now().isoformat(),
                "file": excel_info.get("file_name"),
                "estimated_cost": estimated_cost,
                "processing_time": processing_time,
                "remaining_budget": self.budget_limit - self.total_cost
            })
            
            # 添加成本信息到结果
            if isinstance(result, dict):
                result["cost_info"] = {
                    "estimated_cost": estimated_cost,
                    "total_cost_so_far": self.total_cost,
                    "remaining_budget": self.budget_limit - self.total_cost
                }
            
            return result
            
        except Exception as e:
            # 即使出错也记录成本(因为可能已经产生了费用)
            self.total_cost += estimated_cost * 0.5  # 假设出错时产生一半成本
            raise e
    
    def get_cost_summary(self) -> Dict[str, Any]:
        """
        获取成本摘要
        """
        return {
            "total_cost": self.total_cost,
            "budget_limit": self.budget_limit,
            "remaining_budget": self.budget_limit - self.total_cost,
            "usage_percentage": (self.total_cost / self.budget_limit * 100) if self.budget_limit > 0 else 0,
            "log_entries": len(self.cost_log),
            "average_cost_per_call": self.total_cost / len(self.cost_log) if self.cost_log else 0
        }
    
    def optimize_for_cost(self, excel_info: Dict, task_description: str) -> Dict:
        """
        成本优化策略
        1. 减少发送的数据量
        2. 使用更便宜的模型
        3. 简化prompt
        """
        optimized_info = excel_info.copy()
        
        # 策略1:减少数据样本
        original_rows = len(optimized_info.get("data", []))
        if original_rows > 20:
            # 如果数据太多,只发送前20行作为样本
            optimized_info["data"] = optimized_info["data"][:20]
            optimized_info["_optimization"] = f"数据从{original_rows}行减少到20行"
        
        # 策略2:简化prompt
        simplified_task = task_description
        if len(task_description) > 500:
            # 如果任务描述太长,提取关键点
            lines = task_description.split("\n")
            simplified_task = "\n".join(lines[:5]) + "\n...(任务描述已简化)"
        
        # 策略3:使用更便宜的模型(如果当前不是最便宜的)
        current_model = self.model
        if current_model != "gpt-3.5-turbo":
            print(f"切换到更经济的模型: gpt-3.5-turbo")
            self.model = "gpt-3.5-turbo"
        
        return {
            "optimized_info": optimized_info,
            "optimized_task": simplified_task,
            "original_cost_estimate": self.estimate_cost(
                self.create_data_cleaning_prompt(excel_info, task_description)
            ),
            "optimized_cost_estimate": self.estimate_cost(
                self.create_data_cleaning_prompt(optimized_info, simplified_task)
            )
        }

# 使用示例
cost_aware_processor = CostAwareExcelProcessor(
    api_key="your-api-key",
    budget_limit=5.0  # 5美元预算
)

# 先进行成本优化
optimization = cost_aware_processor.optimize_for_cost(excel_data, task_desc)
print(f"优化后预计节省: ${optimization['original_cost_estimate'] - optimization['optimized_cost_estimate']:.4f}")

# 使用优化后的参数处理
result = cost_aware_processor.clean_data_with_budget_check(
    optimization["optimized_info"],
    optimization["optimized_task"]
)

# 查看成本摘要
summary = cost_aware_processor.get_cost_summary()
print(f"成本摘要: {summary}")

5.3 敏感数据脱敏方案

处理包含敏感信息的Excel时,脱敏是必须的。

import re
from typing import Set, List

class DataAnonymizer:
    def __init__(self):
        """
        数据脱敏处理器
        用于在发送到API前移除或替换敏感信息
        """
        # 定义敏感模式(正则表达式)
        self.sensitive_patterns = {
            "email": r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Z|a-z]{2,}\b',
            "phone": r'\b(?:\+?86)?1[3-9]\d{9}\b|\b0\d{2,3}-\d{7,8}\b',
            "id_card": r'\b[1-9]\d{5}(?:18|19|20)\d{2}(?:0[1-9]|1[0-2])(?:0[1-9]|[12]\d|3[01])\d{3}[\dXx]\b',
            "bank_card": r'\b\d{16,19}\b',
            "name": r'\b[张李王刘陈杨赵黄周吴徐孙胡朱高林何郭马罗梁宋郑谢韩唐冯于董萧程曹袁邓许傅沈曾彭吕苏卢蒋蔡贾丁魏薛叶阎余潘杜戴夏钟汪田任姜范方石姚谭廖邹熊金陆郝孔白崔康毛邱秦江史顾侯邵孟龙万段雷钱汤尹黎易常武乔贺赖龚文\·\s]{2,4}\b'
        }
        
        # 替换映射
        self.replacement_map = {
            "email": "user@example.com",
            "phone": "13800138000",
            "id_card": "110101199001011234",
            "bank_card": "6228480012345678901",
            "name": "张三"
        }
    
    def anonymize_text(self, text: str, 
                      patterns_to_use: Set[str] = None) -> str:
        """
        对文本进行脱敏处理
        参数说明:
        - text: 原始文本
        - patterns_to_use: 指定要处理的敏感模式集合
        返回:脱敏后的文本
        """
        if not text or not isinstance(text, str):
            return text
        
        anonymized = text
        
        # 如果没有指定模式,使用所有模式
        if patterns_to_use is None:
            patterns_to_use = set(self.sensitive_patterns.keys())
        
        for pattern_name in patterns_to_use:
            if pattern_name in self.sensitive_patterns:
                pattern = self.sensitive_patterns[pattern_name]
                replacement = self.replacement_map.get(pattern_name, "[REDACTED]")
                
                # 使用正则替换
                anonymized = re.sub(
                    pattern, 
                    replacement, 
                    anonymized, 
                    flags=re.IGNORECASE
                )
        
        return anonymized
    
    def anonymize_excel_data(self, excel_info: Dict[str, Any],
                           sensitive_columns: List[str] = None) -> Dict[str, Any]:
        """
        对Excel数据进行脱敏
        参数说明:
        - excel_info: Excel数据字典
        - sensitive_columns: 需要脱敏的列名列表
        返回:脱敏后的数据
        """
        anonymized_info = excel_info.copy()
        
        # 如果没有指定敏感列,尝试自动识别
        if sensitive_columns is None:
            sensitive_columns = self._detect_sensitive_columns(excel_info)
        
        # 脱敏数据行
        anonymized_data = []
        for row in excel_info.get("data", []):
            anonymized_row = row.copy()
            
            for col_name, col_value in row.items():
                # 如果列名包含敏感关键词或是指定列,进行脱敏
                if (col_name in sensitive_columns or 
                    any(keyword in col_name.lower() for keyword in 
                        ["姓名", "名字", "电话", "手机", "邮箱", "身份证", "卡号"])):
                    
                    if isinstance(col_value, str):
                        anonymized_row[col_name] = self.anonymize_text(col_value)
                    else:
                        anonymized_row[col_name] = "[REDACTED]"
            
            anonymized_data.append(anonymized_row)
        
        anonymized_info["data"] = anonymized_data
        anonymized_info["_anonymized"] = True
        anonymized_info["_sensitive_columns"] = sensitive_columns
        anonymized_info["_anonymized_at"] = datetime.now().isoformat()
        
        return anonymized_info
    
    def _detect_sensitive_columns(self, excel_info: Dict[str, Any]) -> List[str]:
        """
        自动检测可能包含敏感信息的列
        基于列名和样本数据内容
        """
        sensitive_columns = []
        headers = excel_info.get("headers", [])
        sample_data = excel_info.get("data", [])[:10]  # 检查前10行样本
        
        if not sample_data:
            return sensitive_columns
        
        for header in headers:
            header_lower = header.lower()
            
            # 基于列名检测
            sensitive_keywords = ["姓名", "名字", "name", "电话", "手机", "phone", 
                                "邮箱", "email", "身份证", "id", "卡号", "card"]
            
            if any(keyword in header_lower for keyword in sensitive_keywords):
                sensitive_columns.append(header)
                continue
            
            # 基于内容检测
            column_values = [str(row.get(header, "")) for row in sample_data 
                           if header in row and row[header]]
            
            if not column_values:
                continue
            
            # 检查是否包含敏感模式
            sample_text = " ".join(column_values[:3])  # 检查前3个值
            
            for pattern_name, pattern in self.sensitive_patterns.items():
                if re.search(pattern, sample_text, re.IGNORECASE):
                    sensitive_columns.append(header)
                    break
        
        return list(set(sensitive_columns))  # 去重
    
    def restore_original_data(self, anonymized_result: Dict[str, Any],
                            original_excel_info: Dict[str, Any]) -> Dict[str, Any]:
        """
        将脱敏结果恢复为原始数据
        参数说明:
        - anonymized_result: ChatGPT处理后的脱敏结果
        - original_excel_info: 原始Excel数据
        返回:恢复原始敏感信息的结果
        """
        if not anonymized_result.get("_anonymized", False):
            return anonymized_result
        
        restored_result = anonymized_result.copy()
        
        # 恢复cleaned_data中的敏感信息
        if "cleaned_data" in anonymized_result:
            restored_cleaned_data = []
            
            for i, cleaned_item in enumerate(anonymized_result["cleaned_data"]):
                if i < len(original_excel_info.get("data", [])):
                    original_row = original_excel_info["data"][i]
                    
                    # 创建恢复后的行
                    restored_row = cleaned_item.copy()
                    
                    # 恢复敏感列
                    sensitive_cols = anonymized_result.get("_sensitive_columns", [])
                    for col in sensitive_cols:
                        if col in original_row:
                            restored_row["cleaned_row"][col] = original_row[col]
                    
                    restored_cleaned_data.append(restored_row)
                else:
                    restored_cleaned_data.append(cleaned_item)
            
            restored_result["cleaned_data"] = restored_cleaned_data
        
        # 标记为已恢复
        restored_result["_restored"] = True
        restored_result["_restored_at"] = datetime.now().isoformat()
        
        return restored_result

# 使用示例
anonymizer = DataAnonymizer()

# 脱敏处理
anonymized_excel_data = anonymizer.anonymize_excel_data(excel_data)

# 使用脱敏数据调用ChatGPT
processor = ExcelProcessorWithChatGPT(api_key="your-api-key")
anonymized_result = processor.clean_data_with_chatgpt(anonymized_excel_data, task_desc)

# 恢复原始数据
restored_result = anonymizer.restore_original_data(anonymized_result, excel_data)

print(f"脱敏列: {anonymized_excel_data.get('_sensitive_columns', [])}")
print(f"数据已恢复: {restored_result.get('_restored', False)}")

6. 进阶思考:结合LangChain实现复杂表格推理

当我们已经能够用ChatGPT处理基本的Excel清洗任务后,自然会想到:能不能处理更复杂的场景?比如:

  1. 跨表格推理:从多个相关表格中提取信息并建立关联
  2. 智能推断:基于现有数据推断缺失值或检测矛盾
  3. 自动报告生成:根据分析结果生成自然语言报告

这就是LangChain可以大显身手的地方。LangChain是一个用于开发大语言模型应用的框架,它提供了更高级的抽象和工具集成。

6.1 LangChain的基本集成思路

from langchain.llms import OpenAI
from langchain.chains import LLMChain
from langchain.prompts import PromptTemplate
from langchain.agents import create_csv_agent
import pandas as pd

class LangChainExcelAnalyzer:
    def __init__(self, api_key: str):
        """
        使用LangChain进行高级Excel分析
        """
        self.llm = OpenAI(
            openai_api_key=api_key,
            temperature=0,
            max_tokens=1000
        )
    
    def analyze_with_chain(self, excel_path: str, question: str) -> str:
        """
        使用LangChain Chain进行表格分析
        适合固定的分析流程
        """
        # 加载Excel数据
        df = pd.read_excel(excel_path)
        
        # 创建分析模板
        analysis_template = """
        你是一个数据分析专家。请分析以下表格数据并回答问题。

        表格摘要:
        - 形状:{shape}行×{columns}列
        - 列名:{column_names}
        - 前3行数据:
        {sample_data}

        问题:{question}

        请提供详细的分析,包括:
        1. 关键发现
        2. 数据趋势
        3. 异常值或问题
        4. 建议的下一步行动

        分析结果:
        """
        
        prompt = PromptTemplate(
            input_variables=["shape", "column_names", "sample_data", "question"],
            template=analysis_template
        )
        
        # 准备数据
        sample_data = df.head(3).to_string()
        
        # 创建Chain并运行
        chain = LLMChain(llm=self.llm, prompt=prompt)
        result = chain.run({
            "shape": df.shape,
            "column_names": ", ".join(df.columns.tolist()),
            "sample_data": sample_data,
            "question": question
        })
        
        return result
    
    def create_analysis_agent(self, excel_path: str):
        """
        创建CSV分析Agent(Excel可先转为CSV)
        Agent可以处理更复杂的多步推理
        """
        # 将Excel转为CSV临时文件
        df = pd.read_excel(excel_path)
        csv_path = excel_path.replace(".xlsx", "_temp.csv")
        df.to_csv(csv_path, index=False)
        
        # 创建CSV Agent
        agent = create_csv_agent(
            self.llm,
            csv_path,
            verbose=True,
            handle_parsing_errors=True
        )
        
        return agent, csv_path

# 使用示例
analyzer = LangChainExcelAnalyzer(api_key="your-api-key")

# 使用Chain进行固定分析
analysis_result = analyzer.analyze_with_chain(
    "销售数据.xlsx",
    "哪个产品的销售额增长最快?可能的原因是什么?"
)
print(f"分析结果:\n{analysis_result}")

# 使用Agent进行交互式分析
agent, temp_csv = analyzer.create_analysis_agent("销售数据.xlsx")

# Agent可以回答更复杂的问题
try:
    agent_result = agent.run("计算每个月的平均销售额,并找出异常月份")
    print(f"Agent分析:\n{agent_result}")
finally:
    # 清理临时文件
    import os
    if os.path.exists(temp_csv):
        os.remove(temp_csv)

6.2 实现多表格关联分析

更复杂的场景是分析多个相关表格:

class MultiTableAnalyzer:
    def __init__(self, api_key: str):
        self.llm = OpenAI(openai_api_key=api_key)
    
    def analyze_related_tables(self, tables_info: List[Dict]) -> Dict:
        """
        分析多个相关表格
        tables_info格式:
        [
            {
                "name": "销售表",
                "columns": ["日期", "产品ID", "销售额", "销售员"],
                "sample": "前几行数据",
                "description": "包含每日销售记录"
            },
            {
                "name": "产品表", 
                "columns": ["产品ID", "产品名称", "类别", "单价"],
                "sample": "前几行数据",
                "description": "产品主数据"
            }
        ]
        """
        # 构建多表格分析prompt
        tables_desc = "\n\n".join([
            f"表格:{t['name']}\n"
            f"描述:{t['description']}\n"
            f"列:{', '.join(t['columns'])}\n"
            f"样例:\n{t['sample'][:500]}..."  # 限制长度
            for t in tables_info
        ])
        
        prompt = f"""你是一个数据分析专家,需要分析以下多个相关表格:

{tables_desc}

请完成以下分析任务:
1. 识别表格之间的关联关系(通过哪些字段可以关联)
2. 分析销售表现最好的产品类别
3. 找出销售额与产品单价的关系
4. 建议可以进一步分析的方向

请以JSON格式返回结果,包含:
- table_relationships: 表格关联关系
- top_performing_categories: 表现最好的类别
- price_sales_correlation: 价格与销售的关系分析
- further_analysis_suggestions: 进一步分析建议
"""
        
        # 这里可以调用LLM进行分析
        # 实际实现中会调用LangChain的相应功能
        
        return {
            "status": "analysis_required",
            "prompt": prompt,
            "tables_count": len(tables_info)
        }

实践总结与展望

通过上面的实战指南,我们实现了从基础到进阶的ChatGPT处理Excel全流程。关键收获包括:

  1. 理解AI处理表格的本质:不是简单的规则匹配,而是语义理解
  2. 掌握健壮性设计:重试机制、缓存、错误处理缺一不可
  3. 成本控制意识:估算、监控和优化API使用成本
  4. 数据安全优先:敏感数据必须脱敏处理

在实际项目中,我建议:

  • 从小规模开始,验证效果后再扩大
  • 建立处理日志,便于调试和优化
  • 结合传统方法,AI处理复杂部分,规则处理简单部分
  • 定期评估效果和成本,调整策略

开放性问题:LangChain的深度应用

最后留一个思考题:如何用LangChain实现这样的复杂场景?

"我有12个Excel文件,分别是12个月的销售数据。每个文件有5个工作表:订单明细、客户信息、产品目录、销售员业绩、退货记录。需要自动分析:1)季度趋势;2)客户购买模式;3)产品关联销售;4)生成月度报告摘要。"

这需要:

  1. 多文件批量处理能力
  2. 跨工作表关联分析
  3. 时间序列分析
  4. 自然语言报告生成

如果你对这类复杂场景感兴趣,我推荐尝试从0打造个人豆包实时通话AI动手实验。虽然主题不同,但其中关于AI能力集成、实时处理、多模块协作的思路是相通的。我在实际操作中发现,这种端到端的实践能帮你更好地理解如何将多个AI能力组合起来解决实际问题。

特别是实验中对实时语音识别、智能对话、语音合成的集成,和我们在Excel处理中遇到的"解析-理解-输出"流程有异曲同工之妙。通过动手实践,你能更深刻地理解如何设计prompt、如何处理流式数据、如何优化系统性能——这些技能在处理复杂Excel分析时同样重要。

无论你是想深入AI数据处理,还是探索其他AI应用场景,从实际动手开始总是最好的选择。毕竟,真正的理解来自于实践中的调试和优化。

Logo

欢迎加入DeepSeek 技术社区。在这里,你可以找到志同道合的朋友,共同探索AI技术的奥秘。

更多推荐