LangChain 系列 ·(二): LCEL——AI处理的Pipeline
LangChain 系列 · 第二篇:LCEL——用管道的方式写 AI 逻辑
🎯 适合人群:读完第一篇、对 LangChain 基本概念有初步了解的工程师
⏱️ 阅读时间:约 25 分钟
💬 本文深入讲解 LangChain Expression Language(LCEL)的核心机制,涵盖 Runnable 接口、管道组合、并行执行、流式输出与错误处理
一、为什么需要 LCEL
在第一篇中,出现了这样一行代码:
chain = prompt | model | parser
这就是 LCEL(LangChain Expression Language)。它不是一门独立的语言,而是 LangChain 基于 Python 操作符重载实现的一套链式组合范式。
在 LCEL 出现之前,LangChain 使用类继承的方式构建链:
# ❌ 旧式写法:通过类实例组合
from langchain.chains import LLMChain, SequentialChain
chain1 = LLMChain(llm=model, prompt=prompt1)
chain2 = LLMChain(llm=model, prompt=prompt2)
pipeline = SequentialChain(chains=[chain1, chain2], ...)
这种写法存在几个问题:每种组合方式都需要一个对应的 Chain 类;流式输出和异步支持需要分别适配;中间步骤难以调试和可视化。
LCEL 用统一的 Runnable 接口解决了这些问题——所有组件遵循相同的协议,可以用 | 自由拼接,流式、异步、批量等能力开箱即用。
二、Runnable 接口
Runnable 是 LCEL 的核心抽象。LangChain 中的每一个组件——模型、提示词模板、输出解析器、自定义函数——都实现了 Runnable 接口。
2.1 Runnable 的六种调用方式
所有 Runnable 组件都提供以下方法:
| 方法 | 类型 | 说明 |
|---|---|---|
invoke(input) |
同步 | 处理单个输入,返回单个结果 |
batch([input1, input2, ...]) |
同步 | 处理多个输入,内部并发执行,返回结果列表 |
stream(input) |
同步 | 处理单个输入,返回生成器,逐块输出结果 |
ainvoke(input) |
异步 | invoke 的异步版本 |
abatch([input1, ...]) |
异步 | batch 的异步版本 |
astream(input) |
异步 | stream 的异步版本 |
from langchain_openai import ChatOpenAI
from langchain_core.messages import HumanMessage
model = ChatOpenAI(model="gpt-4o-mini")
# invoke:同步调用
response = model.invoke([HumanMessage(content="用一句话介绍 Python")])
print(response.content)
# batch:并发处理多条输入
responses = model.batch([
[HumanMessage(content="用一句话介绍 Python")],
[HumanMessage(content="用一句话介绍 Go")],
[HumanMessage(content="用一句话介绍 Rust")],
])
# stream:流式输出
for chunk in model.stream([HumanMessage(content="写一首关于编程的短诗")]):
print(chunk.content, end="", flush=True)
2.2 主要 Runnable 组件一览
langchain_core.runnables 提供了以下内置 Runnable 类型:
| 类 | 用途 |
|---|---|
RunnableSequence |
由 | 操作符创建,顺序执行多个 Runnable,前一步的输出作为下一步的输入 |
RunnableParallel |
并行执行多个 Runnable,将各自结果合并为一个字典返回 |
RunnableLambda |
将任意 Python 函数包装为 Runnable,使其可以接入管道 |
RunnablePassthrough |
将输入原样透传,不做任何处理,常用于在并行分支中保留原始输入 |
RunnableBranch |
根据条件判断选择不同的执行分支,实现条件路由 |
RunnableWithFallbacks |
由 .with_fallbacks() 方法创建,执行失败时自动切换到备用 Runnable |
RunnableRetry |
由 .with_retry() 方法创建,执行失败时按策略自动重试 |
RunnableBinding |
由 .bind() 方法创建,预绑定部分调用参数 |
RunnableConfigurableFields |
由 .configurable_fields() 创建,支持在运行时动态配置参数 |
三、管道操作符 |
3.1 原理
| 并非 Python 内置的位运算符在此处的直接应用,而是 LangChain 通过 __or__ 和 __ror__ 方法重载实现的管道操作符。
# 这两种写法等价
chain = prompt | model | parser
# 展开后等价于:
from langchain_core.runnables import RunnableSequence
chain = RunnableSequence(first=prompt, middle=[model], last=parser)
RunnableSequence 在执行时将上一步的输出直接传递给下一步的输入,形成一条完整的处理流水线:
input_dict
|
v
ChatPromptTemplate --> ChatPromptValue (messages list)
|
v
ChatOpenAI --> AIMessage
|
v
StrOutputParser --> str
|
v
output
3.2 类型约束
LCEL 对管道中相邻组件的输入/输出类型有隐式约束——上一步的输出类型必须与下一步的输入类型兼容。常见的类型流转如下:
| 组件 | 输入类型 | 输出类型 |
|---|---|---|
ChatPromptTemplate |
dict |
ChatPromptValue(消息列表) |
ChatOpenAI |
ChatPromptValue 或消息列表 |
AIMessage |
StrOutputParser |
AIMessage |
str |
JsonOutputParser |
AIMessage |
dict |
RunnableLambda(fn) |
任意 | 任意(由 fn 决定) |
⚠️ 如果类型不兼容,LCEL 在运行时而非编译时才会报错。建议在开发阶段逐步测试每个组件的输入输出,避免在复杂管道中定位类型错误。
四、RunnableLambda:接入自定义逻辑
4.1 基本用法
当需要将一个普通 Python 函数嵌入 LCEL 管道时,使用 RunnableLambda 进行包装:
from langchain_core.runnables import RunnableLambda
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
# 自定义预处理函数:将输入文本转为大写并截断
def preprocess(input: dict) -> dict:
return {
"text": input["text"].strip()[:500], # 截断过长输入
"language": input.get("language", "英文")
}
prompt = ChatPromptTemplate.from_messages([
("system", "你是专业翻译,请将文本翻译成{language}。"),
("human", "{text}")
])
model = ChatOpenAI(model="gpt-4o-mini", temperature=0)
parser = StrOutputParser()
# 将自定义函数嵌入管道
chain = RunnableLambda(preprocess) | prompt | model | parser
result = chain.invoke({"text": " 大语言模型正在改变软件工程。 ", "language": "英文"})
print(result)
4.2 使用 @chain 装饰器
对于更复杂的自定义逻辑,可使用 @chain 装饰器将函数直接转换为 Runnable,并支持在内部调用其他 Runnable:
from langchain_core.runnables import chain
@chain
def smart_translate(input: dict) -> str:
"""根据文本长度选择不同的翻译策略"""
text = input["text"]
if len(text) < 100:
# 短文本:直接翻译
return short_chain.invoke(input)
else:
# 长文本:先摘要再翻译
summary = summary_chain.invoke({"text": text})
return translate_chain.invoke({"text": summary, "language": input["language"]})
# smart_translate 本身就是一个 Runnable,可以参与管道
final_chain = preprocess_runnable | smart_translate | postprocess_runnable
五、RunnablePassthrough:保留原始输入
RunnablePassthrough 的作用是将输入原样传递到下一步,在构建需要同时保留原始数据和处理结果的管道时非常有用。
from langchain_core.runnables import RunnablePassthrough, RunnableParallel
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
prompt = ChatPromptTemplate.from_messages([
("human", "用一句话总结这段文字:{text}")
])
model = ChatOpenAI(model="gpt-4o-mini", temperature=0)
parser = StrOutputParser()
# 同时保留原文和摘要
chain = RunnableParallel({
"original": RunnablePassthrough(), # 原样传递输入 dict
"summary": prompt | model | parser # 生成摘要
})
result = chain.invoke({"text": "LangChain 是一个模块化的 LLM 应用框架,提供了从提示词管理到 Agent 构建的完整工具链。"})
print(result["original"]) # {"text": "LangChain 是..."}
print(result["summary"]) # "LangChain 是一个模块化的 LLM 框架。"
💡
RunnablePassthrough.assign(**kwargs)是一个常用的快捷方式,可以在透传原始输入的同时追加新字段,常用于 RAG 管道中传递检索结果:from langchain_core.runnables import RunnablePassthrough # 在 dict 中追加 "context" 字段,同时保留其他字段 add_context = RunnablePassthrough.assign( context=lambda x: retriever.invoke(x["question"]) )
六、RunnableParallel:并行执行
RunnableParallel 接受一个字典,字典的每个值是一个独立的 Runnable,所有分支并发执行,结果合并为一个字典返回。
from langchain_core.runnables import RunnableParallel
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
model = ChatOpenAI(model="gpt-4o-mini", temperature=0)
parser = StrOutputParser()
def make_translate_chain(language: str):
prompt = ChatPromptTemplate.from_messages([
("human", f"将以下文本翻译成{language},只输出译文:\n\n{{text}}")
])
return prompt | model | parser
# 三个翻译分支并发执行
multi_translate = RunnableParallel({
"english": make_translate_chain("英文"),
"japanese": make_translate_chain("日文"),
"french": make_translate_chain("法文"),
})
result = multi_translate.invoke({"text": "大语言模型正在改变软件工程的边界。"})
print(result["english"]) # Large language models are reshaping...
print(result["japanese"]) # 大規模言語モデルは...
print(result["french"]) # Les grands modèles de langage...
📝
RunnableParallel也可以写成字典字面量的形式,效果完全相同:chain = { "english": make_translate_chain("英文"), "japanese": make_translate_chain("日文"), } | postprocess_runnable当字典作为
|管道中的一个节点时,LangChain 会自动将其转换为RunnableParallel。
七、流式输出
LCEL 的流式支持是逐组件透传的——只要管道中的每个组件都支持流式,整条链就可以流式输出。
7.1 同步流式
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
chain = (
ChatPromptTemplate.from_messages([("human", "写一篇关于{topic}的短文,200字左右")])
| ChatOpenAI(model="gpt-4o-mini")
| StrOutputParser()
)
# stream 返回生成器,每次 yield 一个文本片段
for chunk in chain.stream({"topic": "向量数据库"}):
print(chunk, end="", flush=True)
print()
7.2 异步流式
在 FastAPI 等异步框架中,使用 astream 以避免阻塞事件循环:
import asyncio
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
chain = (
ChatPromptTemplate.from_messages([("human", "解释{concept}")])
| ChatOpenAI(model="gpt-4o-mini")
| StrOutputParser()
)
async def stream_response():
async for chunk in chain.astream({"concept": "Transformer 注意力机制"}):
print(chunk, end="", flush=True)
print()
asyncio.run(stream_response())
7.3 流式中间步骤
使用 astream_events 可以监听管道中每个步骤的事件,适用于调试和可观测性:
async def debug_stream():
async for event in chain.astream_events({"concept": "RAG"}, version="v2"):
kind = event["event"]
if kind == "on_chat_model_stream":
chunk = event["data"]["chunk"].content
print(chunk, end="", flush=True)
elif kind == "on_chain_start":
print(f"\n[START] {event['name']}")
elif kind == "on_chain_end":
print(f"\n[END] {event['name']}")
八、错误处理
8.1 with_fallbacks:备用链切换
当主链执行失败时,with_fallbacks 按顺序尝试备用链,直到某个成功为止,适用于模型服务不稳定或预算限制场景:
from langchain_openai import ChatOpenAI
from langchain_core.output_parsers import StrOutputParser
# 优先使用 gpt-4o,失败时降级到 gpt-4o-mini
primary_model = ChatOpenAI(model="gpt-4o")
fallback_model = ChatOpenAI(model="gpt-4o-mini")
robust_model = primary_model.with_fallbacks(
fallbacks=[fallback_model],
exceptions_to_handle=(Exception,) # 捕获所有异常时触发降级
)
chain = prompt | robust_model | StrOutputParser()
💡
with_fallbacks也可以作用于整条链,而不只是模型:robust_chain = primary_chain.with_fallbacks([fallback_chain])
8.2 with_retry:自动重试
网络抖动或 Rate Limit 导致的临时失败,使用 with_retry 自动重试:
from langchain_openai import ChatOpenAI
model = ChatOpenAI(model="gpt-4o-mini")
model_with_retry = model.with_retry(
retry_if_exception_type=(Exception,), # 触发重试的异常类型
stop_after_attempt=3, # 最多重试 3 次
wait_exponential_jitter=True # 指数退避 + 随机抖动,避免惊群
)
chain = prompt | model_with_retry | StrOutputParser()
8.3 组合使用
实际生产中通常将两者结合——先重试,重试耗尽后再降级:
from langchain_openai import ChatOpenAI
from langchain_core.output_parsers import StrOutputParser
primary = ChatOpenAI(model="gpt-4o").with_retry(stop_after_attempt=2)
fallback = ChatOpenAI(model="gpt-4o-mini").with_retry(stop_after_attempt=2)
robust_model = primary.with_fallbacks([fallback])
chain = prompt | robust_model | StrOutputParser()
九、RunnableBranch:条件路由
RunnableBranch 根据输入内容选择不同的执行分支,类似 if-elif-else 结构:
from langchain_core.runnables import RunnableBranch
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
model = ChatOpenAI(model="gpt-4o-mini", temperature=0)
parser = StrOutputParser()
def make_chain(system_prompt: str):
return (
ChatPromptTemplate.from_messages([
("system", system_prompt),
("human", "{question}")
])
| model | parser
)
technical_chain = make_chain("你是一个资深软件工程师,用技术语言回答问题。")
simple_chain = make_chain("你是一个耐心的老师,用简单易懂的语言回答问题。")
default_chain = make_chain("你是一个通用助手,请回答以下问题。")
branch = RunnableBranch(
(lambda x: x.get("audience") == "engineer", technical_chain),
(lambda x: x.get("audience") == "beginner", simple_chain),
default_chain # 以上条件均不满足时执行
)
# 根据 audience 字段路由到不同链
result = branch.invoke({"question": "什么是 Transformer?", "audience": "beginner"})
print(result)
📝 对于更复杂的路由逻辑(多步判断、动态分支数量),推荐使用 LangGraph,将在第九篇详细介绍。
十、.bind():预绑定参数
.bind() 允许在构建时为 Runnable 预先绑定部分参数,在运行时无需再传入:
from langchain_openai import ChatOpenAI
model = ChatOpenAI(model="gpt-4o-mini")
# 预绑定 stop 参数:遇到 "END" 时停止生成
model_with_stop = model.bind(stop=["END"])
# 预绑定工具(Tool Calling 场景)
model_with_tools = model.bind_tools([search_tool, calculator_tool])
常见场景:在管道中为特定步骤固定超参数,避免每次 invoke 时重复传递。
十一、常见坑与最佳实践
坑一:RunnableParallel 的输入必须是字典
# ❌ 传入字符串,RunnableParallel 无法路由到各分支
chain = RunnableParallel({"a": chain_a, "b": chain_b})
result = chain.invoke("some text") # TypeError
# ✅ 传入字典
result = chain.invoke({"text": "some text"})
坑二:流式输出中对中间结果做非流式处理会截断流
# ❌ 在流式管道中插入非流式的后处理函数
def blocking_postprocess(text: str) -> str:
time.sleep(1) # 阻塞操作
return text.upper()
chain = prompt | model | StrOutputParser() | RunnableLambda(blocking_postprocess)
# stream() 调用会等待 blocking_postprocess 完成后一次性输出,失去流式效果
# ✅ 在流式场景下,后处理函数应保持轻量
坑三:误用 | 连接不兼容的类型
# ❌ StrOutputParser 的输出是 str,无法作为 ChatPromptTemplate 的输入(需要 dict)
chain = prompt | model | StrOutputParser() | another_prompt # 运行时 KeyError
# ✅ 用 RunnableLambda 做类型适配
chain = (
prompt | model | StrOutputParser()
| RunnableLambda(lambda text: {"result": text})
| another_prompt
)
坑四:with_retry 不区分可重试和不可重试错误
# ❌ 对所有异常重试,会导致认证错误也被反复重试
model.with_retry(retry_if_exception_type=(Exception,), stop_after_attempt=3)
# ✅ 只对网络类错误重试
import httpx
model.with_retry(
retry_if_exception_type=(httpx.HTTPStatusError, httpx.TimeoutException),
stop_after_attempt=3
)
十二、总结
| 组件 | 类比 | 核心作用 |
|---|---|---|
RunnableSequence (|) |
Unix 管道 | 顺序串联各组件,前一步输出传给下一步 |
RunnableParallel |
多线程 fork | 并发执行多个分支,合并结果 |
RunnableLambda |
适配器 | 将普通函数包装为可接入管道的 Runnable |
RunnablePassthrough |
旁路 | 透传输入,常用于在并行分支中保留原始数据 |
RunnableBranch |
if-elif-else | 根据条件路由到不同执行分支 |
.with_fallbacks() |
熔断降级 | 主链失败时切换到备用链 |
.with_retry() |
重试策略 | 临时失败时按策略自动重试 |
.bind() |
偏函数 | 预绑定参数,简化调用 |
🎯 LCEL 的价值在于:用统一的接口和极简的语法,让流式、异步、并行、容错这些原本需要大量样板代码实现的能力,对所有组件开箱即用。
参考资料
下期预告
LCEL 管道解决了"如何组合"的问题,但提示词本身的质量同样关键。
第三篇《Prompt 工程:把提示词写成代码》 将系统讲解 ChatPromptTemplate、MessagesPlaceholder、FewShotChatMessagePromptTemplate、PipelinePromptTemplate 的使用方式,以及如何设计可复用、可测试的提示词体系——这是决定 LLM 应用输出质量的核心环节。
更多推荐


所有评论(0)