传统 ETL 管道是精心编写的规则代码,一旦数据格式变化就需要修改。LLM 的出现让数据工程进入了一个新纪元——用语言描述意图,让 AI 理解数据语义,自动完成复杂的数据处理任务。

一、数据工程的新挑战与 LLM 的机会### 1.1 传统 ETL 的痛点数据工程师每天面临的现实:python# 传统 ETL 的典型代码def parse_customer_address(raw_address: str) -> dict: """ 解析地址字符串——这个函数写起来要多复杂? 输入可能是: - "北京市朝阳区建国路88号现代城A座2001室" - "上海 浦东新区 陆家嘴环路1000号 恒生银行大厦 20F" - "广东省深圳市南山区科技园科技南12路" - "No.88, Jianguo Road, Chaoyang District, Beijing" """ # 正则表达式地狱... # 处理各种格式变体... # 还是有 20% 的地址解析不了 pass传统数据工程的三大痛点:1. 格式多样性:同一类数据来自不同系统,格式千差万别2. 语义理解缺失:规则引擎能处理结构,但无法理解含义3. 脆弱的维护成本:数据格式一变,大量规则代码需要更新### 1.2 LLM 能解决什么问题LLM 天然擅长的数据处理任务:- 非结构化文本解析:从自然语言中提取结构化字段- 数据清洗与标准化:理解语义来修正错误数据- 跨语言/格式转换:英文合同条款 → 中文结构化摘要- 数据分类与标注:用语义理解代替关键词匹配- 数据质量检测:理解业务含义,发现异常数据—## 二、LLM-Powered ETL 架构设计### 2.1 整体架构数据源(多样化原始数据) ↓[L1: 轻量预处理层] - 格式统一(字符编码、换行符) - 大文件分片 - 元数据提取 ↓[L2: LLM 语义处理层] ← 核心创新 - 字段提取 - 数据清洗 - 分类标注 - 格式标准化 ↓[L3: 结构化后处理层] - JSON Schema 验证 - 数据类型转换 - 约束检查 ↓[L4: 存储层] - 数据仓库/数据湖 - 向量数据库(语义搜索场景)### 2.2 关键设计原则原则 1:LLM 处理语义,规则处理格式pythonclass HybridETLPipeline: """ 混合处理策略:用规则处理可预测的格式,用 LLM 处理语义 """ def process_document(self, raw_doc: str) -> dict: # 第一步:规则处理(快速、便宜) # 处理可预测的格式问题 doc = self._rule_based_preprocess(raw_doc) # 判断是否需要 LLM 处理 if self._needs_semantic_processing(doc): # 第二步:LLM 处理(慢、贵,但智能) # 只对需要语义理解的字段使用 LLM enriched = self._llm_enrich(doc) else: enriched = doc # 第三步:验证(规则检查,快速) return self._validate_and_convert(enriched) def _needs_semantic_processing(self, doc: dict) -> bool: """判断文档是否需要语义处理""" # 如果关键字段已经结构化,跳过 LLM if doc.get("structured_fields_complete"): return False # 如果有复杂的文本字段需要理解 if any(len(v) > 100 for v in doc.values() if isinstance(v, str)): return True return False—## 三、核心应用场景与工程实现### 3.1 非结构化文档解析pythonfrom langchain_openai import ChatOpenAIfrom langchain_core.prompts import ChatPromptTemplatefrom pydantic import BaseModel, Fieldfrom typing import Optional, Listfrom datetime import dateclass ContractInfo(BaseModel): """合同信息的 Pydantic 模型""" contract_id: Optional[str] = Field(None, description="合同编号") party_a: str = Field(..., description="甲方名称") party_b: str = Field(..., description="乙方名称") contract_amount: Optional[float] = Field(None, description="合同金额(元)") currency: str = Field("CNY", description="货币类型") sign_date: Optional[date] = Field(None, description="签订日期") effective_date: Optional[date] = Field(None, description="生效日期") expiry_date: Optional[date] = Field(None, description="到期日期") key_obligations_a: List[str] = Field(default_factory=list, description="甲方主要义务") key_obligations_b: List[str] = Field(default_factory=list, description="乙方主要义务") penalty_clauses: List[str] = Field(default_factory=list, description="违约条款摘要")class ContractParser: def __init__(self): self.llm = ChatOpenAI(model="gpt-4o-mini", temperature=0) self.structured_llm = self.llm.with_structured_output(ContractInfo) def parse(self, contract_text: str) -> ContractInfo: """从合同文本中提取结构化信息""" prompt = ChatPromptTemplate.from_messages([ ("system", """你是一个专业的合同分析助手。请从提供的合同文本中提取关键信息,填充到结构化格式中。- 金额统一转换为人民币(元)- 日期统一格式为 YYYY-MM-DD- 义务和违约条款提取核心要点,每条不超过 50 字"""), ("human", "以下是合同文本,请提取关键信息:\n\n{contract_text}") ]) chain = prompt | self.structured_llm return chain.invoke({"contract_text": contract_text[:8000]}) # 控制 token### 3.2 批量数据清洗与标准化pythonimport asynciofrom typing import List, Dict, Anyimport jsonclass LLMDataCleaner: """ 使用 LLM 进行批量数据清洗 关键优化:批处理 + 异步并发 """ def __init__(self, model="gpt-4o-mini", batch_size=10): self.llm = ChatOpenAI(model=model, temperature=0) self.batch_size = batch_size async def clean_batch( self, records: List[Dict], cleaning_rules: str ) -> List[Dict]: """ 批量清洗:将多条记录打包在一个 LLM 请求中 大幅减少 API 调用次数和成本 """ # 将多条记录序列化为 JSON records_json = json.dumps(records, ensure_ascii=False, indent=2) prompt = f"""请按照以下规则清洗和标准化数据:清洗规则:{cleaning_rules}待处理数据(JSON 格式):{records_json}请返回清洗后的数据,保持相同的 JSON 格式和记录数量。对于无法清洗的字段,保持原值并在该记录中添加 "_warnings" 字段说明问题。""" response = await self.llm.ainvoke(prompt) try: # 提取 JSON 部分 cleaned = json.loads(self._extract_json(response.content)) return cleaned except json.JSONDecodeError: # 如果批处理失败,降级为单条处理 return await self._fallback_single_process(records, cleaning_rules) async def clean_all( self, records: List[Dict], cleaning_rules: str, max_concurrent: int = 5 ) -> List[Dict]: """ 并发批量处理所有记录 """ # 分批 batches = [ records[i:i+self.batch_size] for i in range(0, len(records), self.batch_size) ] # 并发控制 semaphore = asyncio.Semaphore(max_concurrent) async def process_batch_with_semaphore(batch): async with semaphore: return await self.clean_batch(batch, cleaning_rules) # 并发执行所有批次 results = await asyncio.gather(*[ process_batch_with_semaphore(batch) for batch in batches ]) # 合并结果 return [item for batch_result in results for item in batch_result]### 3.3 智能数据分类与标注pythonclass LLMDataClassifier: """ 基于语义的数据分类器 比关键词匹配更智能,比训练专用分类模型更灵活 """ def __init__(self): self.llm = ChatOpenAI(model="gpt-4o-mini", temperature=0) self._label_cache = {} # 缓存相似记录的分类结果 def classify_batch( self, texts: List[str], categories: Dict[str, str], # {类别名: 类别描述} examples: List[Dict] = None # 少样本示例 ) -> List[str]: """ 批量文本分类 Args: texts: 待分类的文本列表 categories: 分类体系 {类别名: 描述} examples: 少样本示例 [{"text": ..., "label": ...}] """ # 构建分类说明 category_desc = "\n".join([ f"- {name}: {desc}" for name, desc in categories.items() ]) # 少样本示例 examples_text = "" if examples: examples_text = "\n\n示例:\n" + "\n".join([ f'文本:"{e["text"]}"\n分类:{e["label"]}' for e in examples ]) # 批量处理(将多条文本一起发送) texts_json = json.dumps( [{"id": i, "text": t} for i, t in enumerate(texts)], ensure_ascii=False ) prompt = f"""请对以下文本进行分类。分类体系:{category_desc}{examples_text}待分类文本(JSON 格式):{texts_json}请返回 JSON 格式的分类结果:[{{"id": 0, "label": "类别名", "confidence": 0.9, "reason": "简短理由"}}, ...]""" response = self.llm.invoke(prompt) results = json.loads(self._extract_json(response.content)) # 按 id 排序并返回标签 results.sort(key=lambda x: x["id"]) return [r["label"] for r in results]### 3.4 LLM 驱动的数据质量检测pythonclass LLMDataQualityChecker: """ 语义级数据质量检测 能发现规则引擎发现不了的业务逻辑问题 """ def __init__(self): self.llm = ChatOpenAI(model="gpt-4o", temperature=0) def check_quality( self, record: Dict, business_rules: str, context: str = "" ) -> Dict: """ 对单条记录进行深度质量检查 """ prompt = f"""请对以下数据记录进行质量检查。业务规则:{business_rules}{f'业务背景:{context}' if context else ''}数据记录:{json.dumps(record, ensure_ascii=False, indent=2)}请检查以下几类问题:1. 字段完整性:必填字段是否存在2. 数据一致性:字段之间是否相互矛盾3. 业务逻辑合理性:数据是否符合业务规则4. 异常值:数值是否在合理范围内返回 JSON 格式:{{ "quality_score": 0-100, "issues": [ {{"field": "字段名", "issue_type": "问题类型", "description": "问题描述", "severity": "high/medium/low"}} ], "recommendation": "整体建议"}}""" response = self.llm.invoke(prompt) return json.loads(self._extract_json(response.content))—## 四、成本控制策略LLM 驱动的数据工程最大的挑战是成本。以下是工程实践中的成本优化策略:### 4.1 智能路由:只在必要时用 LLMpythonclass CostAwareETLRouter: """ 成本感知的 ETL 路由器 优先使用低成本方案,只在必要时升级到 LLM """ def process_record(self, record: Dict) -> Dict: # 级别 1:纯规则处理(成本:$0) result = self._try_rule_based(record) if result["confidence"] >= 0.95: return result # 级别 2:小模型(成本:低) result = self._try_small_model(record) if result["confidence"] >= 0.90: return result # 级别 3:高性价比 LLM(gpt-4o-mini,成本:中) result = self._try_cheap_llm(record) if result["confidence"] >= 0.85: return result # 级别 4:高质量 LLM(gpt-4o,成本:高) # 只对最复杂的数据使用 return self._try_powerful_llm(record) def estimate_cost(self, dataset_size: int) -> Dict: """估算处理成本""" # 假设各级别处理比例 distribution = { "rule_based": 0.6, # 60% 可以用规则处理 "small_model": 0.2, # 20% 用小模型 "cheap_llm": 0.15, # 15% 用 gpt-4o-mini "powerful_llm": 0.05, # 5% 用 gpt-4o } # 每条记录的平均 token 数(输入+输出) avg_tokens = 500 costs = { "rule_based": 0, "small_model": dataset_size * distribution["small_model"] * avg_tokens * 0.0000001, "cheap_llm": dataset_size * distribution["cheap_llm"] * avg_tokens * 0.00000015, "powerful_llm": dataset_size * distribution["powerful_llm"] * avg_tokens * 0.0000025, } return { "total_cost_usd": sum(costs.values()), "breakdown": costs, "cost_per_1000_records": sum(costs.values()) / dataset_size * 1000 }### 4.2 结果缓存pythonimport hashlibfrom functools import lru_cacheclass CachedLLMProcessor: """ LLM 处理结果缓存 相同或相似的输入直接返回缓存结果 """ def __init__(self, cache_backend="redis"): self.cache = self._init_cache(cache_backend) self.llm = ChatOpenAI(model="gpt-4o-mini", temperature=0) def process(self, text: str, task_type: str) -> Dict: # 生成缓存 key cache_key = hashlib.md5( f"{task_type}:{text}".encode() ).hexdigest() # 检查缓存 cached = self.cache.get(cache_key) if cached: return json.loads(cached) # LLM 处理 result = self._llm_process(text, task_type) # 存入缓存(TTL 7 天) self.cache.set(cache_key, json.dumps(result), ttl=604800) return result—## 五、与现有数据栈集成### 5.1 与 Airflow 集成pythonfrom airflow.decorators import task, dagfrom datetime import datetime@dag( schedule_interval="@daily", start_date=datetime(2026, 1, 1), catchup=False)def llm_enhanced_etl(): @task def extract_raw_data() -> List[Dict]: """从数据源提取原始数据""" return fetch_from_source() @task def llm_enrich_data(raw_records: List[Dict]) -> List[Dict]: """LLM 语义增强处理""" cleaner = LLMDataCleaner(batch_size=20) return asyncio.run(cleaner.clean_all( raw_records, cleaning_rules=CLEANING_RULES, max_concurrent=5 )) @task def validate_and_load(enriched_records: List[Dict]): """验证并加载到数据仓库""" valid_records = [] for record in enriched_records: if validate_schema(record): valid_records.append(record) load_to_warehouse(valid_records) return { "total": len(enriched_records), "loaded": len(valid_records), "rejected": len(enriched_records) - len(valid_records) } # 定义 DAG 流程 raw = extract_raw_data() enriched = llm_enrich_data(raw) validate_and_load(enriched)—## 六、监控与可观测性pythonclass LLMETLMonitor: """ LLM ETL 管道监控 重点关注:成本、延迟、质量 """ def __init__(self): self.metrics = defaultdict(list) def record_processing( self, record_id: str, latency_ms: float, tokens_used: int, cost_usd: float, quality_score: float, processing_tier: str, issues_found: int ): self.metrics["latency_ms"].append(latency_ms) self.metrics["tokens_used"].append(tokens_used) self.metrics["cost_usd"].append(cost_usd) self.metrics["quality_score"].append(quality_score) self.metrics["issues_found"].append(issues_found) def get_daily_summary(self) -> Dict: metrics = self.metrics n = len(metrics["latency_ms"]) return { "records_processed": n, "total_cost_usd": sum(metrics["cost_usd"]), "avg_latency_ms": sum(metrics["latency_ms"]) / n, "p95_latency_ms": sorted(metrics["latency_ms"])[int(n * 0.95)], "avg_quality_score": sum(metrics["quality_score"]) / n, "total_issues_found": sum(metrics["issues_found"]), "cost_per_record": sum(metrics["cost_usd"]) / n, }—## 七、总结与展望LLM 驱动的数据工程不是要替代传统 ETL,而是为其注入语义理解能力:LLM 最适合的数据工程任务:- 非结构化文本解析(合同、报告、日志)- 多语言/多格式数据标准化- 基于语义的数据分类与标注- 复杂的数据质量检测(需要业务逻辑理解)保持传统方法的场景:- 简单的格式转换(CSV→JSON)- 大批量高速数据流(实时流处理)- 已有完善规则的结构化数据关键工程原则:混合使用,智能路由,成本意识。在正确的地方用 LLM,在不需要的地方坚决不用,才能构建既智能又经济可行的现代数据管道。

Logo

欢迎加入DeepSeek 技术社区。在这里,你可以找到志同道合的朋友,共同探索AI技术的奥秘。

更多推荐