LangChain 系列 · 第二篇:LCEL——用管道的方式写 AI 逻辑

🎯 适合人群:读完第一篇、对 LangChain 基本概念有初步了解的工程师
⏱️ 阅读时间:约 25 分钟
💬 本文深入讲解 LangChain Expression Language(LCEL)的核心机制,涵盖 Runnable 接口、管道组合、并行执行、流式输出与错误处理


传送门:Langchan系列(一):为什么不直接用API

一、为什么需要 LCEL

在第一篇中,出现了这样一行代码:

chain = prompt | model | parser

这就是 LCEL(LangChain Expression Language)。它不是一门独立的语言,而是 LangChain 基于 Python 操作符重载实现的一套链式组合范式。

在 LCEL 出现之前,LangChain 使用类继承的方式构建链:

# ❌ 旧式写法:通过类实例组合
from langchain.chains import LLMChain, SequentialChain

chain1 = LLMChain(llm=model, prompt=prompt1)
chain2 = LLMChain(llm=model, prompt=prompt2)
pipeline = SequentialChain(chains=[chain1, chain2], ...)

这种写法存在几个问题:每种组合方式都需要一个对应的 Chain 类;流式输出和异步支持需要分别适配;中间步骤难以调试和可视化。

LCEL 用统一的 Runnable 接口解决了这些问题——所有组件遵循相同的协议,可以用 | 自由拼接,流式、异步、批量等能力开箱即用。


二、Runnable 接口

Runnable 是 LCEL 的核心抽象。LangChain 中的每一个组件——模型、提示词模板、输出解析器、自定义函数——都实现了 Runnable 接口。

2.1 Runnable 的六种调用方式

所有 Runnable 组件都提供以下方法:

方法 类型 说明
invoke(input) 同步 处理单个输入,返回单个结果
batch([input1, input2, ...]) 同步 处理多个输入,内部并发执行,返回结果列表
stream(input) 同步 处理单个输入,返回生成器,逐块输出结果
ainvoke(input) 异步 invoke 的异步版本
abatch([input1, ...]) 异步 batch 的异步版本
astream(input) 异步 stream 的异步版本
from langchain_openai import ChatOpenAI
from langchain_core.messages import HumanMessage

model = ChatOpenAI(model="gpt-4o-mini")

# invoke:同步调用
response = model.invoke([HumanMessage(content="用一句话介绍 Python")])
print(response.content)

# batch:并发处理多条输入
responses = model.batch([
    [HumanMessage(content="用一句话介绍 Python")],
    [HumanMessage(content="用一句话介绍 Go")],
    [HumanMessage(content="用一句话介绍 Rust")],
])

# stream:流式输出
for chunk in model.stream([HumanMessage(content="写一首关于编程的短诗")]):
    print(chunk.content, end="", flush=True)

2.2 主要 Runnable 组件一览

langchain_core.runnables 提供了以下内置 Runnable 类型:

用途
RunnableSequence | 操作符创建,顺序执行多个 Runnable,前一步的输出作为下一步的输入
RunnableParallel 并行执行多个 Runnable,将各自结果合并为一个字典返回
RunnableLambda 将任意 Python 函数包装为 Runnable,使其可以接入管道
RunnablePassthrough 将输入原样透传,不做任何处理,常用于在并行分支中保留原始输入
RunnableBranch 根据条件判断选择不同的执行分支,实现条件路由
RunnableWithFallbacks .with_fallbacks() 方法创建,执行失败时自动切换到备用 Runnable
RunnableRetry .with_retry() 方法创建,执行失败时按策略自动重试
RunnableBinding .bind() 方法创建,预绑定部分调用参数
RunnableConfigurableFields .configurable_fields() 创建,支持在运行时动态配置参数

三、管道操作符 |

3.1 原理

| 并非 Python 内置的位运算符在此处的直接应用,而是 LangChain 通过 __or____ror__ 方法重载实现的管道操作符。

# 这两种写法等价
chain = prompt | model | parser

# 展开后等价于:
from langchain_core.runnables import RunnableSequence
chain = RunnableSequence(first=prompt, middle=[model], last=parser)

RunnableSequence 在执行时将上一步的输出直接传递给下一步的输入,形成一条完整的处理流水线:

input_dict
    |
    v
ChatPromptTemplate  --> ChatPromptValue (messages list)
    |
    v
ChatOpenAI          --> AIMessage
    |
    v
StrOutputParser     --> str
    |
    v
output

3.2 类型约束

LCEL 对管道中相邻组件的输入/输出类型有隐式约束——上一步的输出类型必须与下一步的输入类型兼容。常见的类型流转如下:

组件 输入类型 输出类型
ChatPromptTemplate dict ChatPromptValue(消息列表)
ChatOpenAI ChatPromptValue 或消息列表 AIMessage
StrOutputParser AIMessage str
JsonOutputParser AIMessage dict
RunnableLambda(fn) 任意 任意(由 fn 决定)

⚠️ 如果类型不兼容,LCEL 在运行时而非编译时才会报错。建议在开发阶段逐步测试每个组件的输入输出,避免在复杂管道中定位类型错误。


四、RunnableLambda:接入自定义逻辑

4.1 基本用法

当需要将一个普通 Python 函数嵌入 LCEL 管道时,使用 RunnableLambda 进行包装:

from langchain_core.runnables import RunnableLambda
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser

# 自定义预处理函数:将输入文本转为大写并截断
def preprocess(input: dict) -> dict:
    return {
        "text": input["text"].strip()[:500],  # 截断过长输入
        "language": input.get("language", "英文")
    }

prompt = ChatPromptTemplate.from_messages([
    ("system", "你是专业翻译,请将文本翻译成{language}。"),
    ("human", "{text}")
])
model = ChatOpenAI(model="gpt-4o-mini", temperature=0)
parser = StrOutputParser()

# 将自定义函数嵌入管道
chain = RunnableLambda(preprocess) | prompt | model | parser

result = chain.invoke({"text": "  大语言模型正在改变软件工程。  ", "language": "英文"})
print(result)

4.2 使用 @chain 装饰器

对于更复杂的自定义逻辑,可使用 @chain 装饰器将函数直接转换为 Runnable,并支持在内部调用其他 Runnable:

from langchain_core.runnables import chain

@chain
def smart_translate(input: dict) -> str:
    """根据文本长度选择不同的翻译策略"""
    text = input["text"]

    if len(text) < 100:
        # 短文本:直接翻译
        return short_chain.invoke(input)
    else:
        # 长文本:先摘要再翻译
        summary = summary_chain.invoke({"text": text})
        return translate_chain.invoke({"text": summary, "language": input["language"]})

# smart_translate 本身就是一个 Runnable,可以参与管道
final_chain = preprocess_runnable | smart_translate | postprocess_runnable

五、RunnablePassthrough:保留原始输入

RunnablePassthrough 的作用是将输入原样传递到下一步,在构建需要同时保留原始数据和处理结果的管道时非常有用。

from langchain_core.runnables import RunnablePassthrough, RunnableParallel
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser

prompt = ChatPromptTemplate.from_messages([
    ("human", "用一句话总结这段文字:{text}")
])
model = ChatOpenAI(model="gpt-4o-mini", temperature=0)
parser = StrOutputParser()

# 同时保留原文和摘要
chain = RunnableParallel({
    "original": RunnablePassthrough(),   # 原样传递输入 dict
    "summary": prompt | model | parser   # 生成摘要
})

result = chain.invoke({"text": "LangChain 是一个模块化的 LLM 应用框架,提供了从提示词管理到 Agent 构建的完整工具链。"})
print(result["original"])  # {"text": "LangChain 是..."}
print(result["summary"])   # "LangChain 是一个模块化的 LLM 框架。"

💡 RunnablePassthrough.assign(**kwargs) 是一个常用的快捷方式,可以在透传原始输入的同时追加新字段,常用于 RAG 管道中传递检索结果:

from langchain_core.runnables import RunnablePassthrough

# 在 dict 中追加 "context" 字段,同时保留其他字段
add_context = RunnablePassthrough.assign(
    context=lambda x: retriever.invoke(x["question"])
)

六、RunnableParallel:并行执行

RunnableParallel 接受一个字典,字典的每个值是一个独立的 Runnable,所有分支并发执行,结果合并为一个字典返回。

from langchain_core.runnables import RunnableParallel
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser

model = ChatOpenAI(model="gpt-4o-mini", temperature=0)
parser = StrOutputParser()

def make_translate_chain(language: str):
    prompt = ChatPromptTemplate.from_messages([
        ("human", f"将以下文本翻译成{language},只输出译文:\n\n{{text}}")
    ])
    return prompt | model | parser

# 三个翻译分支并发执行
multi_translate = RunnableParallel({
    "english": make_translate_chain("英文"),
    "japanese": make_translate_chain("日文"),
    "french":  make_translate_chain("法文"),
})

result = multi_translate.invoke({"text": "大语言模型正在改变软件工程的边界。"})
print(result["english"])   # Large language models are reshaping...
print(result["japanese"])  # 大規模言語モデルは...
print(result["french"])    # Les grands modèles de langage...

📝 RunnableParallel 也可以写成字典字面量的形式,效果完全相同:

chain = {
    "english": make_translate_chain("英文"),
    "japanese": make_translate_chain("日文"),
} | postprocess_runnable

当字典作为 | 管道中的一个节点时,LangChain 会自动将其转换为 RunnableParallel


七、流式输出

LCEL 的流式支持是逐组件透传的——只要管道中的每个组件都支持流式,整条链就可以流式输出。

7.1 同步流式

from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser

chain = (
    ChatPromptTemplate.from_messages([("human", "写一篇关于{topic}的短文,200字左右")])
    | ChatOpenAI(model="gpt-4o-mini")
    | StrOutputParser()
)

# stream 返回生成器,每次 yield 一个文本片段
for chunk in chain.stream({"topic": "向量数据库"}):
    print(chunk, end="", flush=True)
print()

7.2 异步流式

在 FastAPI 等异步框架中,使用 astream 以避免阻塞事件循环:

import asyncio
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser

chain = (
    ChatPromptTemplate.from_messages([("human", "解释{concept}")])
    | ChatOpenAI(model="gpt-4o-mini")
    | StrOutputParser()
)

async def stream_response():
    async for chunk in chain.astream({"concept": "Transformer 注意力机制"}):
        print(chunk, end="", flush=True)
    print()

asyncio.run(stream_response())

7.3 流式中间步骤

使用 astream_events 可以监听管道中每个步骤的事件,适用于调试和可观测性:

async def debug_stream():
    async for event in chain.astream_events({"concept": "RAG"}, version="v2"):
        kind = event["event"]
        if kind == "on_chat_model_stream":
            chunk = event["data"]["chunk"].content
            print(chunk, end="", flush=True)
        elif kind == "on_chain_start":
            print(f"\n[START] {event['name']}")
        elif kind == "on_chain_end":
            print(f"\n[END]   {event['name']}")

八、错误处理

8.1 with_fallbacks:备用链切换

当主链执行失败时,with_fallbacks 按顺序尝试备用链,直到某个成功为止,适用于模型服务不稳定或预算限制场景:

from langchain_openai import ChatOpenAI
from langchain_core.output_parsers import StrOutputParser

# 优先使用 gpt-4o,失败时降级到 gpt-4o-mini
primary_model   = ChatOpenAI(model="gpt-4o")
fallback_model  = ChatOpenAI(model="gpt-4o-mini")

robust_model = primary_model.with_fallbacks(
    fallbacks=[fallback_model],
    exceptions_to_handle=(Exception,)   # 捕获所有异常时触发降级
)

chain = prompt | robust_model | StrOutputParser()

💡 with_fallbacks 也可以作用于整条链,而不只是模型:

robust_chain = primary_chain.with_fallbacks([fallback_chain])

8.2 with_retry:自动重试

网络抖动或 Rate Limit 导致的临时失败,使用 with_retry 自动重试:

from langchain_openai import ChatOpenAI

model = ChatOpenAI(model="gpt-4o-mini")

model_with_retry = model.with_retry(
    retry_if_exception_type=(Exception,),  # 触发重试的异常类型
    stop_after_attempt=3,                  # 最多重试 3 次
    wait_exponential_jitter=True           # 指数退避 + 随机抖动,避免惊群
)

chain = prompt | model_with_retry | StrOutputParser()

8.3 组合使用

实际生产中通常将两者结合——先重试,重试耗尽后再降级:

from langchain_openai import ChatOpenAI
from langchain_core.output_parsers import StrOutputParser

primary = ChatOpenAI(model="gpt-4o").with_retry(stop_after_attempt=2)
fallback = ChatOpenAI(model="gpt-4o-mini").with_retry(stop_after_attempt=2)

robust_model = primary.with_fallbacks([fallback])
chain = prompt | robust_model | StrOutputParser()

九、RunnableBranch:条件路由

RunnableBranch 根据输入内容选择不同的执行分支,类似 if-elif-else 结构:

from langchain_core.runnables import RunnableBranch
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser

model = ChatOpenAI(model="gpt-4o-mini", temperature=0)
parser = StrOutputParser()

def make_chain(system_prompt: str):
    return (
        ChatPromptTemplate.from_messages([
            ("system", system_prompt),
            ("human", "{question}")
        ])
        | model | parser
    )

technical_chain = make_chain("你是一个资深软件工程师,用技术语言回答问题。")
simple_chain    = make_chain("你是一个耐心的老师,用简单易懂的语言回答问题。")
default_chain   = make_chain("你是一个通用助手,请回答以下问题。")

branch = RunnableBranch(
    (lambda x: x.get("audience") == "engineer", technical_chain),
    (lambda x: x.get("audience") == "beginner",  simple_chain),
    default_chain   # 以上条件均不满足时执行
)

# 根据 audience 字段路由到不同链
result = branch.invoke({"question": "什么是 Transformer?", "audience": "beginner"})
print(result)

📝 对于更复杂的路由逻辑(多步判断、动态分支数量),推荐使用 LangGraph,将在第九篇详细介绍。


十、.bind():预绑定参数

.bind() 允许在构建时为 Runnable 预先绑定部分参数,在运行时无需再传入:

from langchain_openai import ChatOpenAI

model = ChatOpenAI(model="gpt-4o-mini")

# 预绑定 stop 参数:遇到 "END" 时停止生成
model_with_stop = model.bind(stop=["END"])

# 预绑定工具(Tool Calling 场景)
model_with_tools = model.bind_tools([search_tool, calculator_tool])

常见场景:在管道中为特定步骤固定超参数,避免每次 invoke 时重复传递。


十一、常见坑与最佳实践

坑一:RunnableParallel 的输入必须是字典

# ❌ 传入字符串,RunnableParallel 无法路由到各分支
chain = RunnableParallel({"a": chain_a, "b": chain_b})
result = chain.invoke("some text")  # TypeError

# ✅ 传入字典
result = chain.invoke({"text": "some text"})

坑二:流式输出中对中间结果做非流式处理会截断流

# ❌ 在流式管道中插入非流式的后处理函数
def blocking_postprocess(text: str) -> str:
    time.sleep(1)   # 阻塞操作
    return text.upper()

chain = prompt | model | StrOutputParser() | RunnableLambda(blocking_postprocess)
# stream() 调用会等待 blocking_postprocess 完成后一次性输出,失去流式效果

# ✅ 在流式场景下,后处理函数应保持轻量

坑三:误用 | 连接不兼容的类型

# ❌ StrOutputParser 的输出是 str,无法作为 ChatPromptTemplate 的输入(需要 dict)
chain = prompt | model | StrOutputParser() | another_prompt  # 运行时 KeyError

# ✅ 用 RunnableLambda 做类型适配
chain = (
    prompt | model | StrOutputParser()
    | RunnableLambda(lambda text: {"result": text})
    | another_prompt
)

坑四:with_retry 不区分可重试和不可重试错误

# ❌ 对所有异常重试,会导致认证错误也被反复重试
model.with_retry(retry_if_exception_type=(Exception,), stop_after_attempt=3)

# ✅ 只对网络类错误重试
import httpx
model.with_retry(
    retry_if_exception_type=(httpx.HTTPStatusError, httpx.TimeoutException),
    stop_after_attempt=3
)

十二、总结

组件 类比 核心作用
RunnableSequence (|) Unix 管道 顺序串联各组件,前一步输出传给下一步
RunnableParallel 多线程 fork 并发执行多个分支,合并结果
RunnableLambda 适配器 将普通函数包装为可接入管道的 Runnable
RunnablePassthrough 旁路 透传输入,常用于在并行分支中保留原始数据
RunnableBranch if-elif-else 根据条件路由到不同执行分支
.with_fallbacks() 熔断降级 主链失败时切换到备用链
.with_retry() 重试策略 临时失败时按策略自动重试
.bind() 偏函数 预绑定参数,简化调用

🎯 LCEL 的价值在于:用统一的接口和极简的语法,让流式、异步、并行、容错这些原本需要大量样板代码实现的能力,对所有组件开箱即用。


参考资料


下期预告

LCEL 管道解决了"如何组合"的问题,但提示词本身的质量同样关键。

第三篇《Prompt 工程:把提示词写成代码》 将系统讲解 ChatPromptTemplateMessagesPlaceholderFewShotChatMessagePromptTemplatePipelinePromptTemplate 的使用方式,以及如何设计可复用、可测试的提示词体系——这是决定 LLM 应用输出质量的核心环节。

Logo

欢迎加入DeepSeek 技术社区。在这里,你可以找到志同道合的朋友,共同探索AI技术的奥秘。

更多推荐