【LangChain V1.2】新版LangChain与LangGraph智能体开发完整教程(详细注释版)
新版LangChain与LangGraph智能体开发完整教程(详细注释版)
目录
第一章:大模型调用基础
章节概述
本章介绍如何使用新版LangChain的统一接口初始化和调用各种大语言模型。通过init_chat_model函数,可以轻松切换不同的模型提供商,无需修改业务代码。
1.1 环境配置
配置说明
环境配置是使用LangChain的第一步。建议使用.env文件管理敏感信息(如API密钥),避免硬编码到源代码中。
1.1.1 安装依赖
依赖说明
langchain: 核心框架,提供Agent、Tool等高级抽象langchain-core: 核心接口和类型定义langchain-community: 第三方集成(各种模型提供商适配器)python-dotenv: 环境变量加载库
# =============================================================================
# 功能说明:安装LangChain开发所需的Python依赖包
# 使用场景:项目初始化时一次性安装,或在CI/CD环境中构建时安装
# 版本说明:推荐使用最新版本,LangChain 1.0+已稳定
# 注意事项:
# 1. 建议使用虚拟环境隔离项目依赖
# 2. 可使用 uv/pip/poetry 等包管理器
# 3. GPU环境可能需要额外安装 CUDA 相关驱动
# =============================================================================
pip install langchain langchain-core langchain-community python-dotenv
1.1.2 创建环境配置文件
安全建议
.env文件应加入.gitignore,避免提交到版本控制系统- 生产环境可使用环境变量或密钥管理服务(如AWS Secrets Manager)
- 不同环境(开发/测试/生产)应使用不同的API密钥
# =============================================================================
# 功能说明:创建环境配置文件,存储各模型提供商的API密钥
# 使用场景:
# - 开发环境:存储测试用API密钥
# - 测试环境:使用限流后的测试密钥
# - 生产环境:使用正式环境密钥(建议从密钥服务读取)
# 版本说明:通用配置方式,适用于所有LangChain版本
# 注意事项:
# 1. 绝对不要将真实API密钥提交到Git仓库
# 2. 密钥应定期轮换
# 3. 不同模型的API端点可能不同,需仔细核对
# =============================================================================
# .env文件
DEEPSEEK_API_KEY=your_api_key # DeepSeek模型访问密钥
DEEPSEEK_BASE_URL=https://api.deepseek.com # DeepSeek API地址
OPENAI_API_KEY=your_api_key # OpenAI模型访问密钥
OPENAI_BASE_URL=https://api.openai.com/v1 # OpenAI API地址
ANTHROPIC_API_KEY=your_api_key # Anthropic(Claude)访问密钥
ANTHROPIC_BASE_URL=https://api.anthropic.com # Anthropic API地址
DASHSCOPE_API_KEY=your_api_key # 阿里云通义千问访问密钥
DASHSCOPE_BASE_URL=https://dashscope.aliyuncs.com/compatible-mode/v1 # 通义千问API地址
ZHIPUAI_API_KEY=your_api_key # 智谱AI访问密钥
ZHIPUAI_BASE_URL=https://open.bigmodel.cn/api/paas/v4 # 智谱AI API地址
1.1.3 加载环境变量
# =============================================================================
# 功能说明:从.env文件加载环境变量,统一管理各模型API配置
# 使用场景:
# - 应用启动时加载配置
# - 多环境配置切换(dev/staging/prod)
# 版本说明:通用配置加载方式
# 注意事项:
# 1. load_dotenv(override=True) 会覆盖已存在的环境变量
# 2. 建议在应用入口尽早调用
# 3. 生产环境可结合环境变量使用
# =============================================================================
import os
from dotenv import load_dotenv
# =============================================================================
# 函数:load_dotenv
# 功能:从当前目录或父目录加载.env文件中的环境变量
# 参数:
# - override (bool): 是否覆盖已存在的环境变量,默认False
# True用于开发环境,False用于避免意外覆盖系统变量
# 返回值:无
# =============================================================================
# 从.env文件加载环境变量
load_dotenv(override=True)
# =============================================================================
# 配置变量声明
# 说明:统一从环境变量读取API密钥和端点,支持灵活配置
# 命名规范:
# - 使用全大写+下划线命名(符合Python常量惯例)
# - 变量名应清晰表达其用途
# =============================================================================
# 加载各模型API配置
DEEPSEEK_API_KEY = os.getenv("DEEPSEEK_API_KEY") # DeepSeek API密钥
DEEPSEEK_BASE_URL = os.getenv("DEEPSEEK_BASE_URL") # DeepSeek API端点
OPENAI_API_KEY = os.getenv("OPENAI_API_KEY") # OpenAI API密钥
OPENAI_BASE_URL = os.getenv("OPENAI_BASE_URL") # OpenAI API端点
ANTHROPIC_API_KEY = os.getenv("ANTHROPIC_API_KEY") # Anthropic API密钥
ANTHROPIC_BASE_URL = os.getenv("ANTHROPIC_BASE_URL") # Anthropic API端点
DASHSCOPE_API_KEY = os.getenv("DASHSCOPE_API_KEY") # 阿里云通义千问密钥
DASHSCOPE_BASE_URL = os.getenv("DASHSCOPE_BASE_URL") # 通义千问API端点
ZHIPUAI_API_KEY = os.getenv("ZHIPUAI_API_KEY") # 智谱AI密钥
ZHIPUAI_BASE_URL = os.getenv("ZHIPUAI_BASE_URL") # 智谱AI API端点
1.2 模型初始化
架构说明
┌─────────────────────────────────────────────────────────────────┐ │ init_chat_model 统一入口 │ └─────────────────────────────────────────────────────────────────┘ │ ┌───────────────────┼───────────────────┐ ▼ ▼ ▼ ┌──────────┐ ┌──────────┐ ┌──────────┐ │ DeepSeek │ │ OpenAI │ │ Anthropic│ └──────────┘ └──────────┘ └──────────┘通过统一接口
init_chat_model,可以无缝切换不同的模型提供商。
版本差异说明
- 新版 (LangChain >= 1.0): 使用
init_chat_model统一接口- 旧版 (< 1.0): 需要使用
ChatOpenAI,ChatAnthropic,ChatDeepSeek等独立类- 迁移建议: 新项目建议使用
init_chat_model,旧代码可逐步迁移
1.2.1 使用init_chat_model初始化模型
# =============================================================================
# 功能说明:使用新版LangChain的统一接口初始化各种大语言模型
# 使用场景:
# - 需要在多个模型间灵活切换的项目
# - 追求代码一致性和可维护性的企业项目
# - 需要根据负载动态选择模型的场景
# 版本说明:LangChain 1.0+ 推荐用法
# 注意事项:
# 1. api_key可传入None(对于本地模型如Ollama)
# 2. base_url需包含完整的API路径
# 3. 建议为每个模型创建独立实例,便于管理和监控
# =============================================================================
from langchain.chat_models import init_chat_model
from langchain_core.language_models import BaseChatModel
# =============================================================================
# 模型实例化函数:init_chat_model
# 功能:统一的模型初始化接口,支持多种模型提供商
# 参数说明:
# - model (str): 模型名称,如"deepseek-chat"、"gpt-4"、"claude-3-5-haiku"
# - model_provider (str): 模型提供商标识,如"deepseek"、"openai"、"anthropic"
# - api_key (str, optional): API访问密钥,本地模型可省略
# - base_url (str, optional): API端点URL
# - temperature (float, optional): 生成温度,控制随机性,0-2之间
# - max_tokens (int, optional): 最大生成token数
# 返回值:BaseChatModel实例
# =============================================================================
# =============================================================================
# 模型:DeepSeek Chat
# 提供商:DeepSeek(中国大模型厂商)
# 特点:性价比高,支持长上下文,中文支持好
# 适用场景:需要中文能力强、成本敏感的应用
# =============================================================================
deepseek_llm: BaseChatModel = init_chat_model(
model="deepseek-chat", # 模型标识符
model_provider="deepseek", # 提供商标识
api_key=DEEPSEEK_API_KEY, # API密钥(从环境变量读取)
base_url=DEEPSEEK_BASE_URL, # API端点
)
# =============================================================================
# 模型:OpenAI GPT-4
# 提供商:OpenAI
# 特点:性能最强,生态完善,但成本较高
# 适用场景:对质量要求极高、预算充足的应用
# =============================================================================
openai_llm = init_chat_model(
model="gpt-4", # GPT-4模型
model_provider="openai",
api_key=OPENAI_API_KEY,
base_url=OPENAI_BASE_URL,
)
# =============================================================================
# 模型:Claude 3.5 Haiku
# 提供商:Anthropic
# 特点:速度快、成本效益高,支持超长上下文
# 适用场景:需要快速响应、平衡成本和性能的应用
# =============================================================================
anthropic_llm = init_chat_model(
model="claude-3-5-haiku-20241022", # 使用日期版本确保稳定性
model_provider="anthropic",
api_key=ANTHROPIC_API_KEY,
base_url=ANTHROPIC_BASE_URL,
)
# =============================================================================
# 模型:Ollama 本地模型
# 提供商:Ollama(本地部署框架)
# 特点:完全本地运行,无需API密钥,支持多种开源模型
# 适用场景:
# - 隐私敏感场景(数据不出境)
# - 开发测试环境
# - 离线环境
# =============================================================================
ollama_llm = init_chat_model(
model="deepseek-r1:1.5b", # Ollama模型名称格式
model_provider="ollama",
base_url="http://localhost:11434", # Ollama默认端口
# 注意:Ollama本地模型不需要api_key
)
# =============================================================================
# 模型:通义千问 Qwen Plus
# 提供商:阿里云(通过DashScope)
# 特点:中文能力强,支持函数调用,成本适中
# 适用场景:国内生产环境,中文为主的应用
# =============================================================================
tongyi_llm = init_chat_model(
model="qwen-plus", # 通义千问plus版本
model_provider="openai", # 兼容OpenAI格式
api_key=DASHSCOPE_API_KEY,
base_url=DASHSCOPE_BASE_URL,
)
# =============================================================================
# 模型:智谱 GLM-4
# 提供商:智谱AI
# 特点:国产大模型,中文理解能力强
# 适用场景:国内应用,需要国产化替代的项目
# =============================================================================
zhipu_llm = init_chat_model(
model="glm-4",
model_provider="openai", # 兼容OpenAI格式
api_key=ZHIPUAI_API_KEY,
base_url=ZHIPUAI_BASE_URL,
)
1.2.2 统一初始化文件
架构设计说明
创建独立的init_llm.py模块,统一管理所有模型实例有以下好处:
- 单一职责:模型配置集中在专门模块
- 易于维护:修改模型配置只需改一处
- 按需加载:外部代码只需导入需要的模型
- 测试友好:便于mock和单元测试
"""
统一初始化所有支持的聊天模型
该模块提供项目级的模型实例管理,所有需要使用LLM的地方都应从
这里导入对应的模型实例,而不是重复初始化。
模块结构:
- 模型实例定义(全局常量)
- 配置参数说明
- 使用示例
使用方式:
from init_llm import deepseek_llm
response = deepseek_llm.invoke("你好")
版本说明:
LangChain 1.0+ 推荐使用 init_chat_model
旧版本请使用各提供商的独立类(如 ChatOpenAI)
"""
from langchain.chat_models import init_chat_model
from langchain_core.language_models import BaseChatModel
from env_utils import *
# =============================================================================
# 模型实例化区域
# 说明:
# - 每个模型都是全局单例,导入即创建
# - 模型配置统一在此管理
# - 可根据环境变量动态选择默认模型
# =============================================================================
# DeepSeek 模型实例
# 用途:主要使用的对话模型,适合中文场景
# 配置说明:
# - temperature=0.7:平衡创意和稳定性
# - 如需更确定性结果,可设为0.1-0.3
deepseek_llm: BaseChatModel = init_chat_model(
model="deepseek-chat",
model_provider="deepseek",
api_key=DEEPSEEK_API_KEY,
base_url=DEEPSEEK_BASE_URL,
)
# OpenAI 模型实例
# 用途:需要最高质量输出的场景
# 成本提示:GPT-4成本较高,建议用于复杂推理任务
openai_llm = init_chat_model(
model="gpt-4",
model_provider="openai",
api_key=OPENAI_API_KEY,
base_url=OPENAI_BASE_URL,
)
# Anthropic Claude 模型实例
# 用途:需要长上下文理解的任务
# 优势:上下文窗口大(200K tokens),适合长文档分析
anthropic_llm = init_chat_model(
model="claude-3-5-haiku-20241022",
model_provider="anthropic",
api_key=ANTHROPIC_API_KEY,
base_url=ANTHROPIC_BASE_URL,
)
# Ollama 本地模型实例
# 用途:开发测试、隐私敏感场景
# 注意:需先启动Ollama服务(ollama serve)
ollama_llm = init_chat_model(
model="deepseek-r1:1.5b",
model_provider="ollama",
base_url="http://localhost:11434",
)
# 通义千问模型实例
# 用途:国内生产环境,中文对话为主
# 优势:中文语境理解好,价格相对实惠
tongyi_llm = init_chat_model(
model="qwen-plus",
model_provider="openai",
api_key=DASHSCOPE_API_KEY,
base_url=DASHSCOPE_BASE_URL,
)
# 智谱AI模型实例
# 用途:需要使用国产模型的项目
# 特点:GLM系列模型,开源程度较高
zhipu_llm = init_chat_model(
model="glm-4",
model_provider="openai",
api_key=ZHIPUAI_API_KEY,
base_url=ZHIPUAI_BASE_URL,
)
1.3 模型调用方式
调用方式对比
方式 方法 适用场景 特点 同步 invoke简单脚本、测试 简单直接,阻塞等待 流式 stream聊天界面、实时展示 逐字输出,用户体验好 批量 batch离线批处理 并行处理,吞吐量高 异步 ainvoke/abatchWeb服务、高并发 非阻塞,资源利用率高
1.3.1 Invoke调用(同步)
使用场景说明
- 同步调用是最基础的调用方式
- 适合:CLI工具、简单脚本、单次请求
- 不适合:需要高并发的Web服务
单条消息调用
# =============================================================================
# 功能说明:使用invoke方法进行最简单的单条消息对话
# 使用场景:
# - 测试模型响应
# - 简单的命令行对话
# - 单次问答任务
# 版本说明:通用方法,所有LangChain版本支持
# 注意事项:
# 1. invoke是同步阻塞方法,会等待完整响应后返回
# 2. 返回的是AIMessage对象,包含丰富的元数据
# 3. 长时间请求可能触发超时,需要配置timeout
# =============================================================================
from init_llm import deepseek_llm
# =============================================================================
# 方法:invoke
# 功能:向模型发送消息并获取完整响应
# 参数:
# - input: 可以是字符串、字典消息列表、或消息对象列表
# 返回值:AIMessage对象,包含content(文本内容)和其他元数据
# =============================================================================
# 单条消息调用模型
resp = deepseek_llm.invoke("请介绍一下你自己")
# 查看返回类型
print(type(resp)) # 输出:<class 'langchain_core.messages.ai.AIMessage'>
# 获取文本内容
print(resp.content) # 输出模型的文本回复
字典格式消息列表
消息格式说明
字典格式的消息列表是最灵活的方式,支持:
- system: 系统提示词(可选,通常放在最前面)
- user: 用户消息
- assistant: 助手消息(用于多轮对话的历史记录)
# =============================================================================
# 功能说明:使用字典格式构建多轮对话上下文
# 使用场景:
# - 需要保留对话历史的场景
# - 需要设置系统提示词的任务
# - 构建复杂对话流程
# 版本说明:通用格式,LangChain 0.x 和 1.x 都支持
# 注意事项:
# 1. 字典格式会自动转换为消息对象
# 2. system消息通常只需要一条,放在最前面
# 3. 历史消息按时间顺序排列
# =============================================================================
# 定义对话历史,包含系统提示和历史对话
conversations = [
# 系统消息:定义AI的角色和行为
{"role": "system", "content": "你是一个翻译助手,可以将汉语翻译成英语"},
# 用户消息:第一个翻译请求
{"role": "user", "content": "翻译:我喜欢编程"},
# 助手消息:模型的历史回复(用于上下文理解)
{"role": "assistant", "content": "I like programming."},
# 用户消息:第二个翻译请求
{"role": "user", "content": "翻译:我喜欢大模型"},
]
# =============================================================================
# 调用invoke并传入消息列表
# 说明:模型会根据完整的上下文生成回复
# 注意:旧的对话历史会增加token消耗和延迟
# =============================================================================
resp = deepseek_llm.invoke(conversations)
print(resp.content) # 输出:I like large language models.
消息对象格式
类型安全说明
使用消息对象(HumanMessage/SystemMessage等)相比字典格式的优势:
- 类型检查:IDE可以提供自动补全和类型检查
- 方法支持:消息对象有额外的方法(如pretty_print)
- 扩展性:便于添加自定义消息类型
# =============================================================================
# 功能说明:使用LangChain消息对象构建对话
# 使用场景:
# - 需要类型安全的大型项目
# - 需要对消息进行额外处理的场景
# - 企业级应用开发
# 版本说明:LangChain核心类型,所有版本支持
# 注意事项:
# 1. 消息对象相比字典有轻微的性能开销
# 2. 建议在关键业务逻辑中使用消息对象
# 3. 混合使用字典和对象可能导致类型不一致
# =============================================================================
from langchain_core.messages import HumanMessage, AIMessage, SystemMessage
# =============================================================================
# 消息类型说明:
# - SystemMessage: 系统提示词,定义AI行为
# - HumanMessage: 用户消息
# - AIMessage: 助手消息(模型回复或人工注入的历史)
# - ToolMessage: 工具调用结果消息
# - CustomMessage: 自定义消息类型(可扩展)
# =============================================================================
# 使用消息对象构建对话
conversations = [
# SystemMessage:定义翻译助手的角色
SystemMessage(content="你是一个翻译助手,可以将汉语翻译成英语"),
# HumanMessage:用户的第一句话
HumanMessage(content="翻译:我喜欢编程"),
# AIMessage:助手的历史回复
AIMessage(content="I like programming."),
# HumanMessage:用户的第二句话
HumanMessage(content="翻译:我喜欢大模型"),
]
# 调用模型
resp = deepseek_llm.invoke(conversations)
print(resp.content) # 输出翻译结果
1.3.2 Stream调用(流式)
适用场景
- 聊天机器人的打字效果
- 代码生成工具(如GitHub Copilot)
- 实时展示AI思考过程的场景
- 长文本生成的界面
性能说明
- 流式输出用户体验更好,因为可以逐步看到结果
- 总体响应时间可能略长(需要等待第一个token)
- 适合前端展示,后端处理逻辑不变
# =============================================================================
# 功能说明:使用stream方法进行流式调用,实现打字机效果
# 使用场景:
# - 聊天机器人界面,需要实时显示输出
# - 长文本生成,需要显示进度
# - 需要尽早展示部分结果给用户
# 版本说明:通用方法,所有LangChain版本支持
# 注意事项:
# 1. stream返回生成器,需要迭代获取
# 2. 每个chunk是响应的一部分
# 3. 适合前端实时展示
# =============================================================================
from init_llm import deepseek_llm
# =============================================================================
# 方法:stream
# 功能:流式返回模型响应,边生成边输出
# 参数:与invoke相同
# 返回值:生成器,每次迭代返回一个chunk(AIMessageChunk)
# 性能提示:
# - 第一个chunk通常需要等待模型"思考"
# - 适合WebSocket推送、前端SSE等场景
# =============================================================================
# 流式调用模型
response = deepseek_llm.stream("请介绍一下你自己")
# =============================================================================
# 迭代处理流式响应
# 说明:
# - 每个chunk.content是增量内容
# - end="|" 用于分隔显示,便于观察
# - flush=True 确保立即显示(不缓冲)
# =============================================================================
for chunk in response:
# chunk 是 AIMessageChunk 对象
# chunk.content 是本轮的增量文本
print(chunk.content, end="|", flush=True)
# 输出示例:你好|,我|是一|个由|深|度求|索|...
1.3.3 Batch调用(批量)
适用场景
- 离线数据处理任务
- 需要同时处理多个独立请求
- 批量生成内容(如批量生成商品描述)
- 自动化测试
性能说明
- batch会并行处理多个请求
- 但受限于模型提供商的速率限制
- 适合不需要实时响应的场景
# =============================================================================
# 功能说明:使用batch方法批量处理多个请求
# 使用场景:
# - 离线批量处理
# - 一次性生成多个独立内容
# - 自动化测试多个用例
# 版本说明:通用方法
# 注意事项:
# 1. 请求是并行执行的(取决于提供商限制)
# 2. 如果某个请求失败,整个batch可能失败
# 3. 需要考虑API速率限制
# =============================================================================
from init_llm import deepseek_llm
# =============================================================================
# 方法:batch
# 功能:批量处理多个请求,并行执行
# 参数:消息列表
# 返回值:响应列表
# 适用条件:请求之间相互独立
# =============================================================================
# 定义要批量处理的问题列表
questions = [
"请介绍一下你自己",
"飞机为什么会飞",
"什么是大模型",
]
# 批量调用
resp = deepseek_llm.batch(questions)
# =============================================================================
# 处理批量响应
# 说明:返回顺序与输入顺序一致
# 注意:如果需要知道每个响应对应哪个请求,建议使用zip
# =============================================================================
for item in resp:
print(item.content)
print("-" * 50)
1.3.4 异步批量调用(并发)
适用场景
- 高并发Web服务
- 需要控制并发数量的场景
- 需要实时处理大量请求的服务端
并发控制说明
max_concurrency参数控制最大并发数:
- 值太大会触发API速率限制
- 值太小会降低吞吐量
- 建议根据API限制调整
# =============================================================================
# 功能说明:使用batch_as_completed实现带并发控制的批量处理
# 使用场景:
# - 高并发服务
# - 需要实时处理完成的任务
# - 需要控制API调用频率的场景
# 版本说明:LangChain 0.x+ 支持
# 注意事项:
# 1. 返回的是Future对象组成的生成器
# 2. 先完成的任务先返回
# 3. 需要使用await或同步迭代
# =============================================================================
from init_llm import deepseek_llm
# =============================================================================
# 方法:batch_as_completed
# 功能:异步批量执行,返回完成的Future
# 参数:
# - inputs: 输入列表
# - config: 配置字典,包含max_concurrency等参数
# 返回值:迭代器,每次返回(asyncio.Future, result)对
# =============================================================================
resp = deepseek_llm.batch_as_completed(
[
"请介绍一下你自己",
"飞机为什么会飞",
"什么是大模型",
],
config={
"max_concurrency": 3, # 最大并发数,根据API限制调整
}
)
# =============================================================================
# 迭代处理完成的任务
# 说明:先完成的任务先返回,无需等待所有任务完成
# 适用场景:需要尽快处理完成项的实时系统
# =============================================================================
for item in resp:
print(item)
1.4 异步调用
异步 vs 同步
同步调用: T1: [----请求----][----处理----][----返回----] 异步调用: T1: [发起请求] [处理其他任务] [获取结果] T2: [----请求----][----处理----]
适用场景
- Web框架(FastAPI、Flask异步版本)
- 需要同时处理多个请求
- 不想阻塞主线程的场景
1.4.1 Ainvoke异步调用
# =============================================================================
# 功能说明:演示ainvoke异步调用的非阻塞特性
# 使用场景:
# - FastAPI异步端点
> - 需要同时发起多个请求
> - GUI应用中避免阻塞主线程
# 版本说明:Python 3.7+,async/await语法
# 注意事项:
> 1. 需要在async函数中调用
> 2. 使用asyncio.run()执行顶层协程
> 3. 异步代码中不能混用同步调用
# =============================================================================
import asyncio
import time
from langchain.chat_models import init_chat_model
from env_utils import DEEPSEEK_API_KEY, DEEPSEEK_BASE_URL
# =============================================================================
# 模型初始化
# 说明:异步调用使用的模型实例与同步相同
# 注意:某些提供商可能有异步特定的配置
# =============================================================================
llm = init_chat_model(
model="deepseek-chat",
model_provider="deepseek",
api_key=DEEPSEEK_API_KEY,
base_url=DEEPSEEK_BASE_URL,
)
async def demo_async_invoke():
"""
演示异步调用的非阻塞特性。
该函数展示了如何:
1. 发起异步请求但不等待
2. 在等待期间执行其他任务
3. 稍后获取异步结果
Returns:
无(直接打印结果)
使用示例:
>>> asyncio.run(demo_async_invoke())
"""
print("程序开始...")
# =================================================================
# 步骤1:发起异步请求
# 说明:使用llm.ainvoke立即返回一个协程对象
# 注意:此时请求已发送,但不会阻塞
# =================================================================
print("发起异步模型调用 (ainvoke)...")
async_task = llm.ainvoke("用一句话解释人工智能。")
# =================================================================
# 步骤2:在等待模型响应的同时执行其他任务
# 说明:主线程可以继续处理其他逻辑
# 这是异步的核心优势
# =================================================================
print("模型请求已发送,程序无需等待,继续执行...")
for i in range(3):
time.sleep(1) # 模拟其他任务
print(f"正在执行第{i + 1}个任务...")
# =================================================================
# 步骤3:等待并获取模型结果
# 说明:使用await等待协程完成
# 此时才真正阻塞等待模型响应
# =================================================================
print("其他任务已完成,现在等待模型返回结果...")
response = await async_task
print(f"模型返回: {response.content}")
# =============================================================================
# 执行异步函数
# asyncio.run() 创建事件循环并执行协程
# 这是标准的Python异步入口
# =============================================================================
asyncio.run(demo_async_invoke())
1.4.2 Astream异步流式调用
# =============================================================================
# 功能说明:演示异步流式调用的非阻塞特性
# 使用场景:
# - 异步Web服务
> - 需要流式推送的前端
> - 实时处理AI响应的系统
# 版本说明:需要Python 3.7+
# 注意事项:
> 1. 使用async for迭代流
> 2. 每个chunk处理后可以立即推送
> 3. 适合WebSocket或SSE场景
# =============================================================================
async def demo_async_stream():
"""
演示异步流式调用的非阻塞特性。
该函数展示如何:
1. 异步发起流式请求
2. 并行执行其他任务
3. 异步迭代处理流式响应
Returns:
无
架构说明:
请求阶段可以并行处理其他逻辑
响应阶段通过async for逐块处理
"""
print("程序开始...")
# =================================================================
# 发起异步流式调用
# 注意:astream返回的是异步生成器
# =================================================================
print("发起异步流式调用 (astream)...")
stream_resp = llm.astream("请一句话解释机器学习的基本概念。")
print("流式请求已发送,程序无需等待,继续执行...")
# =================================================================
# 并行执行其他任务
# 说明:与同步流式不同,异步可以在等待时处理其他逻辑
# =================================================================
for i in range(3):
time.sleep(1)
print(f"正在执行第{i + 1}个任务...")
print("其他任务已完成,开始处理流式结果...")
print("流式输出: ", end="", flush=True)
# =================================================================
# 异步迭代流式响应
# async for:异步生成器的迭代方式
# 每个chunk立即处理或推送
# =================================================================
async for chunk in stream_resp:
if hasattr(chunk, 'content'):
print(chunk.content, end="", flush=True)
print("流式输出结束")
1.4.3 Abatch异步批量调用
# =============================================================================
# 功能说明:演示异步批量调用的非阻塞特性
# 使用场景:
> - 异步Web服务中的批量处理
> - 需要高并发的后台任务
> - 批量API调用
# 版本说明:Python 3.7+
# 注意事项:
> 1. 使用abatch而不是batch
> 2. 返回协程对象列表
> 3. 需要await获取最终结果
# =============================================================================
async def demo_async_batch():
"""
演示异步批量调用的非阻塞特性。
该函数展示如何:
1. 异步发起多个批量请求
2. 并行执行其他任务
3. 异步获取所有批量结果
Returns:
无
性能提示:
异步批量比同步批量有更好的资源利用率
特别适合I/O密集型任务
"""
print("程序开始...")
questions = ["用一句话说明深度学习与传统机器学习的区别"]
# =================================================================
# 发起异步批量调用
# 说明:abatch返回协程对象,不会立即执行
# =================================================================
print("发起异步批量调用 (abatch)...")
batch_resp = llm.abatch(questions)
print("批量请求已发送,程序无需等待,继续执行...")
# =================================================================
# 并行执行其他任务
# 说明:在批量请求执行期间处理其他逻辑
# =================================================================
for i in range(3):
time.sleep(1)
print(f"正在执行第{i + 1}个任务...")
# =================================================================
# 等待批量请求完成
# await batch_resp:等待所有请求完成
# =================================================================
print("其他任务已完成,现在等待批量处理结果...")
responses = await batch_resp
# =================================================================
# 处理响应结果
# responses 是响应列表,按输入顺序排列
# =================================================================
for response in responses:
print(f"批量响应: {response.content}")
1.5 推理模型
模型类型说明
- Chat模型:直接生成对话回复
- Reasoner模型(推理模型):先进行推理思考,再给出答案
- 适用场景:复杂推理、数学问题、代码调试
版本差异说明
- 支持推理的模型:DeepSeek-R1、OpenAI o1/o3、Anthropic Claude等
- 旧版本:需要手动实现推理过程
- 新版:模型原生支持推理,API返回结果中包含推理内容
# =============================================================================
# 功能说明:使用DeepSeek的Reasoner模型进行推理
# 使用场景:
# - 数学问题求解
# - 复杂逻辑推理
# - 代码调试和分析
# - 需要展示推理过程的场景
# 版本说明:DeepSeek API 1.0+ 支持
# 注意事项:
# 1. 推理模型成本通常高于普通Chat模型
# 2. 响应时间可能较长
# 3. reasoning_content字段包含推理过程
# =============================================================================
from langchain.chat_models import init_chat_model
from env_utils import DEEPSEEK_API_KEY, DEEPSEEK_BASE_URL
# =============================================================================
# 初始化推理模型
# 说明:deepseek-reasoner是DeepSeek的专用推理模型
# 特点:先思考再回答,适合复杂问题
# =============================================================================
reasoner_llm = init_chat_model(
model="deepseek-reasoner", # 推理专用模型
model_provider="deepseek",
api_key=DEEPSEEK_API_KEY,
base_url=DEEPSEEK_BASE_URL,
)
# =============================================================================
# 调用推理模型
# 注意:response中包含额外的推理内容
# =============================================================================
response = reasoner_llm.invoke("10个字解释为什么天空是蓝色的?")
# =============================================================================
# 获取推理过程(reasoning_content)
# 说明:推理模型会在reasoning_content中展示思考过程
# content_blocks 包含完整的响应结构
# =============================================================================
# 新版API可能使用不同方式获取推理内容
if hasattr(response, 'reasoning_content'):
# 部分模型直接提供reasoning_content属性
reasoning_steps = response.reasoning_content
print("推理过程:", reasoning_steps)
elif hasattr(response, 'content_blocks'):
# 通过content_blocks获取
reasoning_steps = [b for b in response.content_blocks if b["type"] == "reasoning"]
print("推理过程:", " ".join(step["reasoning"] for step in reasoning_steps))
# 最终答案在content中
print("最终答案:", response.content)
1.6 速率限制
速率限制说明
使用InMemoryRateLimiter可以:
- 避免触发API提供商的速率限制
- 保护下游服务不被冲垮
- 实现公平调度
适用场景
- 公共服务需要限流
- 多租户环境下的公平使用
- 测试环境避免超出配额
# =============================================================================
# 功能说明:使用InMemoryRateLimiter控制API调用频率
# 使用场景:
# - 需要严格控制调用频率
# - 多用户共享API配额的场景
> - 避免触发API速率限制错误
# 版本说明:LangChain 0.x+ 支持
# 注意事项:
> 1. 限流器是内存级别的,多进程需要共享限流器
> 2. 请求仍会排队等待,不是直接丢弃
> 3. 需要根据API限制合理配置参数
# =============================================================================
import time
from langchain.chat_models import init_chat_model
from langchain_core.rate_limiters import InMemoryRateLimiter
from env_utils import DEEPSEEK_API_KEY, DEEPSEEK_BASE_URL
# =============================================================================
# 创建速率限制器
# 参数说明:
# - requests_per_second: 每秒允许的最大请求数
# - check_every_n_seconds: 检查频率
# - max_bucket_size: 突发容量,允许短暂超过限制
# =============================================================================
rate_limiter = InMemoryRateLimiter(
requests_per_second=0.1, # 每10秒最多1个请求(适合免费API)
check_every_n_seconds=0.1, # 检查间隔0.1秒
max_bucket_size=5, # 突发容量,最多允许5个请求
)
# =============================================================================
# 初始化带速率限制的模型
# 说明:将限流器传入模型的rate_limiter参数
# 限流器会自动在每次请求前进行检查
# =============================================================================
deepseek_llm = init_chat_model(
model="deepseek-chat",
model_provider="deepseek",
api_key=DEEPSEEK_API_KEY,
base_url=DEEPSEEK_BASE_URL,
rate_limiter=rate_limiter # 传入速率限制器
)
# =============================================================================
# 使用限流后的模型
# 说明:每个请求都会经过速率限制器
# 时间间隔小于限制时,请求会等待
# =============================================================================
for i in range(3):
response = deepseek_llm.invoke("你好")
print(response.content)
print(f"请求时间戳: {time.time()}") # 观察时间间隔
第二章:智能体创建与管理
章节概述
本章介绍如何使用create_agent创建AI智能体,包括工具绑定、中间件配置、Prompt管理等高级功能。
架构设计说明
┌────────────────────────────────────────────────────────────────────┐ │ Agent 系统架构 │ └────────────────────────────────────────────────────────────────────┘ │ ┌───────────────────────────┼───────────────────────────┐ ▼ ▼ ▼ ┌──────────────┐ ┌──────────────┐ ┌──────────────┐ │ Prompt │ │ Model │ │ Tools │ │ Management │ │ (LLM) │ │ Binding │ └──────────────┘ └──────────────┘ └──────────────┘ │ ▼ ┌──────────────────┐ │ Tool Executor │ └──────────────────┘ │ ┌──────────────────┐ │ Middleware │ │ (Pre/Post Hook) │ └──────────────────┘
2.1 创建基础智能体
Agent创建流程
- 定义工具(Tool)
- 选择模型(Model)
- 配置系统提示词(System Prompt)
- 创建Agent实例
- 调用Agent
2.1.1 使用create_agent创建智能体
# =============================================================================
# 功能说明:使用create_agent创建最简单的AI智能体
# 使用场景:
> - 需要工具调用能力的对话助手
> - 简单的问答机器人
> - 入门级Agent开发示例
# 版本说明:LangChain 1.0+ 新增API
# 注意事项:
> 1. create_agent返回的是可调用的Agent对象
> 2. tools参数可以为空列表
> 3. system_prompt定义Agent角色和行为
# =============================================================================
from langchain.agents import create_agent
from langchain_core.tools import tool
from init_llm import deepseek_llm
# =============================================================================
# 装饰器:@tool
# 功能:将普通函数转换为Agent可调用的工具
# 参数说明:
> - name: 工具名称(可选,默认使用函数名)
> - description: 工具描述(Agent决定是否调用的重要依据)
> - args_schema: 参数Schema(可选,用于复杂参数)
# =============================================================================
@tool
def get_weather(location: str) -> str:
"""
获取指定位置的天气信息。
这是一个简单的天气查询工具,返回指定城市的天气状况。
Args:
location: 城市名称,如"北京"、"上海"、"东京"等
Returns:
格式化的天气信息字符串
Example:
>>> get_weather("北京")
"北京天气:晴朗,25°C"
"""
return f"天气信息:{location}的天气是晴朗的"
# =============================================================================
# 创建Agent
# 函数:create_agent
# 参数说明:
> - model: 底层使用的语言模型(必填)
> - tools: 可用工具列表(可选,默认空)
> - system_prompt: 系统提示词(可选)
> - middleware: 中间件列表(可选)
> - checkpointer: 检查点存储(可选,用于记忆)
# 返回值:Agent可调用对象
# =============================================================================
agent = create_agent(
model=deepseek_llm, # 底层模型
tools=[get_weather], # 可用工具
system_prompt="你是一个天气助手,你可以帮助用户获取指定位置的天气信息。",
)
# =============================================================================
# 调用Agent
# 方法:invoke
# 参数:
> - input: 输入字典,必须包含"messages"键
> - messages格式:可以是字典列表或消息对象列表
# 返回值:包含执行结果的字典,messages中包含完整对话历史
# =============================================================================
resp = agent.invoke({"messages": [{"role": "user", "content": "北京的天气"}]})
# =============================================================================
# 响应结构说明
# resp 是一个字典,包含:
> - messages: 消息列表,包含完整对话历史
> - 其他状态字段(如有自定义状态)
# messages列表项类型:
> - HumanMessage: 用户消息
> - AIMessage: 模型回复
> - ToolMessage: 工具调用结果
# =============================================================================
print(resp)
2.1.2 带多个工具的智能体
# =============================================================================
# 功能说明:创建带有多个工具的智能体,演示复杂工具定义
# 使用场景:
> - 企业级客服机器人
> - 多功能助手
> - 需要多领域知识查询的系统
# 版本说明:LangChain 1.0+
# 注意事项:
> 1. 每个工具应有清晰的description
> 2. 参数应提供详细的类型和取值说明
> 3. 工具之间应有明确的功能区分
# =============================================================================
from langchain.agents import create_agent
from langchain_core.tools import tool
from init_llm import deepseek_llm
# =============================================================================
# 工具1:股票查询
# 说明:完整的工具定义,包含详细的docstring
# =============================================================================
@tool
def get_stock_price(company: str, timeframe: str = "today") -> str:
"""
获取指定公司的股票价格信息。
该工具提供实时和历史股票价格查询,支持今日、本周、本月三个时间维度。
Args:
company: 公司名称,支持以下值:
- 苹果公司(或Apple)
- 微软公司(或Microsoft)
- 谷歌公司(或Google)
timeframe: 时间范围,可选值:
- today: 今日价格
- week: 本周平均价格
- month: 本月平均价格
默认值:today
Returns:
格式化的股票价格字符串,示例:
"苹果公司 today股票价格: 185.20美元"
Raises:
无异常返回,当公司不存在时返回友好的错误信息
Example:
>>> get_stock_price("苹果公司", "today")
"苹果公司 today股票价格: 185.20美元"
"""
# 模拟数据(实际应用中应调用真实API)
mock_data = {
"苹果公司": {"today": 185.20, "week": 183.50, "month": 180.75},
"微软公司": {"today": 415.86, "week": 412.30, "month": 405.42},
"谷歌公司": {"today": 15.42, "week": 15.20, "month": 14.85}
}
# 查询数据
if company in mock_data:
price = mock_data[company].get(timeframe, "未知时间范围")
return f"{company} {timeframe}股票价格: {price}美元"
else:
return f"未找到股票代码 {company} 的数据"
# =============================================================================
# 工具2:新闻搜索
# 说明:演示返回多条结果的工具
# =============================================================================
@tool
def search_news(company: str) -> str:
"""
搜索指定公司的财经新闻。
该工具提供公司相关的最新财经新闻,包括股价变动、重要公告、行业动态等。
Args:
company: 公司名称
Returns:
每行一条新闻的字符串,格式为新闻标题。
如无相关新闻,返回友好提示。
Example:
>>> search_news("苹果公司")
"苹果发布新款iPhone,股价上涨3%\\n苹果与欧盟达成反垄断和解协议"
"""
# 模拟新闻数据
mock_news = {
"苹果公司": [
"苹果发布新款iPhone,股价上涨3%",
"苹果与欧盟达成反垄断和解协议",
"苹果将在印度扩大生产规模"
],
"微软公司": [
"微软Azure云业务季度增长超预期",
"微软完成对Nuance的收购",
"微软推出新一代AI助手Copilot"
],
}
# 获取新闻列表
news_list = mock_news.get(company, [f"未找到{company}的相关新闻"])
return "\n".join(news_list)
# =============================================================================
# 创建多工具Agent
# 说明:tools参数传入工具列表
# Agent会根据用户问题自动选择合适的工具
# =============================================================================
agent = create_agent(
model=deepseek_llm,
tools=[get_stock_price, search_news], # 传入多个工具
)
# =============================================================================
# 调用多工具Agent
# 说明:Agent会自动决定调用哪个工具
# 可能需要多轮交互(问价格 -> 调工具 -> 回答 -> 问新闻 -> 调工具 -> 回答)
# =============================================================================
resp = agent.invoke({"messages": [
{"role": "user", "content": "苹果公司上周股价是多少?有什么新闻?"},
]})
print(resp)
2.2 动态模型选择
架构设计说明
┌─────────────────────────────────────────────────────────────────┐ │ 动态模型选择中间件架构 │ └─────────────────────────────────────────────────────────────────┘ 用户请求 → [中间件] → 根据条件选择模型 → [选中的模型] → 响应 ↓ ┌─────────┴─────────┐ ↓ ↓ [基础模型] [高级模型] (message_count < 3) (message_count >= 3)
使用场景说明
- 成本优化:简单问题用便宜模型,复杂问题用贵模型
- 性能调优:根据任务复杂度选择合适的模型
- 多租户:根据用户等级选择不同能力的模型
# =============================================================================
# 功能说明:使用中间件实现根据对话状态动态选择模型
# 使用场景:
> - 需要根据上下文复杂度选择模型
> - 实现成本优化策略
> - A/B测试不同模型效果
# 版本说明:LangChain 1.0+ 中间件API
# 注意事项:
> 1. 中间件在模型调用前后执行
> 2. 可以修改请求和响应
> 3. 中间件顺序很重要
# =============================================================================
from langchain.agents import create_agent
from langchain.agents.middleware import wrap_model_call, ModelResponse, ModelRequest
from langchain.chat_models import init_chat_model
from langchain_core.tools import tool
from env_utils import DEEPSEEK_API_KEY, DEEPSEEK_BASE_URL, DASHSCOPE_API_KEY, DASHSCOPE_BASE_URL
# =============================================================================
# 工具定义
# 两个简单的查询工具
# =============================================================================
@tool
def get_current_location() -> str:
"""获取当前位置。"""
return "当前位置为北京市。"
@tool
def get_weather(city: str) -> str:
"""获取指定城市的天气信息。"""
return f"{city}的天气为晴朗,25°C。"
# =============================================================================
# 基础模型配置
# 说明:用于简单查询的轻量级模型
# =============================================================================
basic_model = init_chat_model(
model="deepseek-chat",
model_provider="deepseek",
api_key=DEEPSEEK_API_KEY,
base_url=DEEPSEEK_BASE_URL,
)
# =============================================================================
# 高级模型配置
# 说明:用于复杂查询的高性能模型
# =============================================================================
advanced_model = init_chat_model(
model="qwen-plus", # 通义千问plus
model_provider="openai",
api_key=DASHSCOPE_API_KEY,
base_url=DASHSCOPE_BASE_URL,
)
# =============================================================================
# 动态模型选择中间件
# 装饰器:@wrap_model_call
# 功能:在模型调用前/后执行自定义逻辑
# 参数说明:
> - request: ModelRequest对象,包含请求信息
> - handler: 实际的模型调用处理器
# 返回值:ModelResponse对象
# =============================================================================
@wrap_model_call
def dynamic_model_selection(request: ModelRequest, handler) -> ModelResponse:
"""
动态模型选择中间件。
根据对话状态(消息数量)自动选择合适的模型:
- 消息数 < 3:使用基础模型(简单查询)
- 消息数 >= 3:使用高级模型(复杂对话)
Args:
request: 模型请求对象,包含:
- state: 包含messages等状态信息
handler: 模型调用处理器
Returns:
ModelResponse: 模型响应对象
使用场景:
- 成本敏感的项目(简单问题用便宜模型)
- 需要差异化服务质量的项目
- 实现渐进式复杂度的对话系统
"""
print("request : ", request)
# =================================================================
# 获取当前对话的消息数量
# 说明:通过request.state访问状态
# messages包含完整的对话历史
# =================================================================
message_count = len(request.state['messages'])
# =================================================================
# 根据条件选择模型
# 策略说明:
# - 初始对话(< 3条消息)使用基础模型
# - 复杂对话(>= 3条消息)使用高级模型
# 实际项目中可根据业务逻辑调整策略
# =================================================================
if message_count < 3:
model = basic_model
print(f"使用基础模型 (消息数: {message_count})")
else:
model = advanced_model
print(f"使用高级模型 (消息数: {message_count})")
# =================================================================
# 执行模型调用
# request.override(model=model) 创建新的请求,使用指定的模型
# handler() 执行实际的模型调用
# =================================================================
return handler(request.override(model=model))
# =============================================================================
# 创建带中间件的Agent
# 说明:middleware参数传入中间件列表
# 中间件按照列表顺序执行
# =============================================================================
agent = create_agent(
model=basic_model, # 默认模型(会被中间件覆盖)
tools=[get_current_location, get_weather],
middleware=[dynamic_model_selection], # 动态模型选择中间件
)
# =============================================================================
# 测试动态模型选择
# 第一次调用:消息数=2(system + user),使用基础模型
# =============================================================================
response = agent.invoke({"messages":[
{"role":"system","content":"你是一个天气助手"},
{"role":"user","content":"我现在在的位置天气如何?"}
]})
print(response)
2.3 Prompt配置
Prompt工程最佳实践
- 清晰明确:清楚地说明Agent的角色和限制
- 结构化:使用清晰的格式组织Prompt
- 示例驱动:提供Few-shot示例
- 动态适配:根据上下文动态调整Prompt
2.3.1 字符串格式Prompt
# =============================================================================
# 功能说明:使用简单字符串作为系统Prompt
# 使用场景:
> - 快速原型开发
> - 简单的单角色Agent
> - Prompt不需要动态变化的场景
# 版本说明:所有版本支持
# 注意事项:
> 1. 字符串Prompt最简单但扩展性差
> 2. 建议仅用于简单场景
> 3. 复杂场景建议使用Structured Prompt
# =============================================================================
agent = create_agent(
model=deepseek_llm,
tools=[get_weather],
system_prompt="你是能查询任何问题的助手" # 简单字符串Prompt
)
2.3.2 SystemMessage对象格式Prompt
使用场景说明
- 需要明确指定Prompt类型的场景
- 与其他SystemMessage混合使用的场景
- 需要在代码中以类型安全方式处理Prompt
# =============================================================================
# 功能说明:使用SystemMessage对象作为Prompt
# 使用场景:
> - 需要与其他系统消息组合
> - 追求类型安全的项目
> - 需要在运行时动态构建Prompt
# 版本说明:LangChain核心功能
# 注意事项:
> 1. SystemMessage可以包含更多元信息
> 2. 便于与消息历史管理系统集成
# =============================================================================
from langchain_core.messages import SystemMessage
agent = create_agent(
model=deepseek_llm,
tools=[get_weather],
system_prompt=SystemMessage(content="你是能查询任何问题的助手")
)
2.3.3 动态Prompt
架构设计说明
┌─────────────────────────────────────────────────────────────────┐ │ 动态Prompt中间件流程 │ └─────────────────────────────────────────────────────────────────┘ 用户请求 → [动态Prompt中间件] → 根据context生成Prompt → [LLM] → 响应 ↑ context参数传入
使用场景
- 客服分级服务:VIP用户获得更专业的回复
- 多租户隔离:不同租户使用不同的Prompt模板
- 任务导向:根据任务类型选择不同的指令
# =============================================================================
# 功能说明:使用dynamic_prompt中间件根据运行时上下文动态生成Prompt
# 使用场景:
> - 根据用户类型生成不同风格的回复
> - 根据任务类型选择不同的指令模板
> - 实现个性化的对话体验
# 版本说明:LangChain 1.0+
# 注意事项:
> 1. context_schema定义上下文参数结构
> 2. 动态Prompt可以访问runtime.context
> 3. 应提供合理的默认值
# =============================================================================
from typing import TypedDict
from langchain.agents import create_agent
from langchain.agents.middleware import dynamic_prompt, ModelRequest
from langchain_core.tools import tool
from init_llm import deepseek_llm
# =============================================================================
# 上下文Schema定义
# 说明:定义动态Prompt可以访问的上下文参数
# 使用TypedDict确保类型安全
# =============================================================================
class AgentContext(TypedDict):
"""
Agent运行时上下文定义。
该类定义动态Prompt可以访问的上下文信息,
包含用户类型和用户ID。
Attributes:
query_type: 查询类型,支持"vip"和"normal"
uid: 用户唯一标识符
"""
query_type: str # 用户类型:vip/normal
uid: str # 用户ID
# =============================================================================
# 动态Prompt函数
# 装饰器:@dynamic_prompt
# 功能:根据context参数动态生成系统Prompt
# =============================================================================
@dynamic_prompt
def dynamic_support_prompt(request: ModelRequest) -> str:
"""
根据query_type动态生成客服Prompt。
该函数根据用户类型生成不同风格的客服指令:
- VIP用户:专业、深度分析的回复风格
- 普通用户:简洁、友好的回复风格
Args:
request: 包含运行时上下文信息的请求对象
Returns:
str: 根据用户类型定制的系统Prompt
内部逻辑:
1. 从request.runtime.context获取用户类型
2. 根据类型选择对应的Prompt模板
3. 返回组合后的完整Prompt
"""
print("request", request)
# =================================================================
# 获取运行时上下文
# 说明:通过request.runtime.context访问
# =================================================================
query_type = request.runtime.context["query_type"]
# =================================================================
# 基础指令模板
# 说明:所有用户共享的基础指令
# =================================================================
base_instruction = "你是一名专业的电商客服助手。请根据工具查询结果,准确、清晰地回答用户问题。"
# =================================================================
# 根据用户类型选择不同的扩展指令
# VIP用户:专业严谨风格
# 普通用户:简洁友好风格
# =================================================================
if query_type == "vip":
return f"""{base_instruction}
当前角色:高级支持专员
工作要求:
1.深度分析:仔细分析用户描述,识别潜在的根本问题。
2.精准分类:将问题明确归类(如"物流问题"、"产品质量"、"售后申请")。
3.方案规划:若工具能解决,提供具体步骤;若需人工,明确告知后续流程。
请使用更专业、严谨的语言。
"""
else:
return f"""{base_instruction}
当前角色:一线客服助手
工作要求:
1.简洁友好:回复要简单明了,避免复杂术语。
保持友好和高效的沟通风格。
"""
# =============================================================================
# 创建Agent
# 说明:context_schema定义上下文参数结构
# =============================================================================
agent = create_agent(
model=deepseek_llm,
tools=[],
middleware=[dynamic_support_prompt], # 动态Prompt中间件
context_schema=AgentContext # 上下文Schema
)
# =============================================================================
# 测试VIP用户
# 说明:传入vip类型的上下文
# =============================================================================
response1 = agent.invoke(
{"messages": [{"role": "user", "content": "我有一个问题需要帮助"}]},
context={"query_type": "vip", "uid": "user123"} # 传入上下文
)
# =============================================================================
# 测试普通用户
# 说明:传入normal类型的上下文
# =============================================================================
response2 = agent.invoke(
{"messages": [{"role": "user", "content": "我有一个问题需要帮助"}]},
context={"query_type": "normal", "uid": "user456"} # 传入上下文
)
第三章:工具开发与错误处理
章节概述
本章详细介绍如何定义高质量的工具,包括参数验证、错误处理、Schema定义等企业级实践。
3.1 基础工具定义
3.1.1 使用@tool装饰器
# =============================================================================
# 功能说明:演示@tool装饰器的两种用法
# 使用场景:
> - 简单工具:直接使用@tool
> - 复杂工具:需要详细文档的用@tool(name, description)
# 版本说明:LangChain核心功能
# 注意事项:
> 1. 工具的description非常重要,影响Agent选择
> 2. 应提供清晰的参数说明
> 3. docstring会被自动用作工具描述(除非手动指定)
# =============================================================================
from langchain.agents import create_agent
from langchain_core.tools import tool
from init_llm import deepseek_llm
# =============================================================================
# 简单工具示例
# 说明:@tool直接装饰函数,函数名和docstring作为工具名和描述
# =============================================================================
@tool
def get_weather(location: str) -> str:
"""
获取指定位置的天气信息。
Args:
location: 城市名称
Returns:
天气信息字符串
"""
return f"{location}的天气是晴朗的"
# =============================================================================
# 带完整文档的工具示例
# 说明:使用@tool(name, description)显式指定工具信息
# =============================================================================
@tool("get_employee_info", description="根据员工ID查询员工的姓名、部门、职务和邮箱。")
def get_employee_info(employee_id: str) -> str:
"""
根据员工ID查询员工的详细信息。
该工具从企业数据库中查询员工的基本信息,包括姓名、部门、职务和邮箱。
Args:
employee_id: 员工唯一标识符,格式为E开头的编号,如E001、E002
Returns:
包含员工完整信息的格式化字符串,格式如下:
"员工ID {id} 的信息如下:姓名 - {name},部门 - {dept},职务 - {pos},邮箱 - {email}"
Raises:
当员工ID不存在时,返回友好的错误提示
Example:
>>> get_employee_info("E001")
"员工ID E001 的信息如下:姓名 - 张三,部门 - 技术部,职务 - 高级软件工程师,邮箱 - zhangsan@company.com"
"""
# 模拟员工数据库
mock_employee_database = {
"E001": {
"name": "张三",
"department": "技术部",
"position": "高级软件工程师",
"email": "zhangsan@company.com"
},
"E002": {
"name": "李四",
"department": "市场部",
"position": "市场经理",
"email": "lisi@company.com"
},
"E003": {
"name": "王五",
"department": "人力资源部",
"position": "招聘专员",
"email": "wangwu@company.com"
}
}
# =================================================================
# 查询员工记录
# 使用dict.get()安全访问,避免KeyError
# =================================================================
employee_record = mock_employee_database.get(employee_id)
if employee_record:
# =================================================================
# 格式化返回信息
# 说明:清晰分隔各字段,便于前端展示
# =================================================================
return f"员工ID {employee_id} 的信息如下:姓名 - {employee_record['name']},部门 - {employee_record['department']},职务 - {employee_record['position']},邮箱 - {employee_record['email']}"
else:
return f"员工ID {employee_id} 不存在"
# =============================================================================
# 创建Agent并测试工具
# =============================================================================
agent = create_agent(
model=deepseek_llm,
tools=[get_employee_info],
system_prompt="你是一个员工信息查询助手"
)
result = agent.invoke({"messages": [{"role": "user", "content": "查询员工ID E001 的信息"}]})
print(result["messages"][-1].content)
3.1.2 自定义工具名称和描述
# =============================================================================
# 功能说明:演示自定义工具名称和描述的用法
# 使用场景:
> - 函数名不够语义化时
> - 同一功能需要多个别名时
> - 需要更详细描述的场景
# 版本说明:LangChain核心功能
# 注意事项:
> 1. name参数会覆盖默认的函数名
> 2. description参数是Agent判断是否调用工具的主要依据
> 3. 描述应包含参数说明和返回值说明
# =============================================================================
@tool("get_employee_info", description="根据员工ID查询员工的姓名、部门、职务和邮箱。")
def get_employee_info(employee_id: str) -> str:
"""根据员工ID查询员工的详细信息"""
# 实现代码...
3.2 使用Pydantic Schema定义工具参数
架构设计说明
Pydantic Schema vs JSON Schema ┌─────────────────────────────────────────────────────────────┐ │ 参数定义方式对比 │ ├─────────────────────────────────────────────────────────────┤ │ Pydantic Schema │ │ ✓ 类型安全(IDE自动补全) │ │ ✓ 支持数据验证(@field_validator) │ │ ✓ 支持默认值和必填控制 │ │ ✓ 面向对象,易于维护 │ │ │ │ JSON Schema │ │ ✓ 更轻量(无需定义类) │ │ ✓ 更接近OpenAPI标准 │ │ ✓ 适合简单的参数定义 │ └─────────────────────────────────────────────────────────────┘
# =============================================================================
# 功能说明:使用Pydantic BaseModel定义复杂工具参数
# 使用场景:
> - 多参数组合查询
> - 需要参数验证的场景
> - 需要提供默认值和可选参数的查询
# 版本说明:需要安装pydantic
# 注意事项:
> 1. BaseModel子类定义参数结构
> 2. Field()添加元信息和描述
> 3. @field_validator进行自定义验证
# =============================================================================
import json
from typing import Optional, Literal
from langchain.agents import create_agent
from langchain_core.tools import tool
from pydantic import BaseModel, Field, field_validator
from init_llm import deepseek_llm
# =============================================================================
# Pydantic Schema定义
# 说明:使用类定义参数结构,支持复杂的验证逻辑
# =============================================================================
class QueryTicketsSchema(BaseModel):
"""
工单查询参数Schema。
该类定义工单查询工具的参数结构,支持多种过滤条件组合。
Attributes:
ticket_id: 工单ID,精确匹配
assigner: 分配人姓名,精确匹配
status: 工单状态,可选值:open/resolved/closed
priority: 优先级,可选值:low/medium/high
使用示例:
>>> schema = QueryTicketsSchema(status="open", priority="high")
>>> # 查询所有状态为open且优先级为high的工单
"""
# =================================================================
# 参数定义
# Field参数说明:
# - default: 默认值(None表示可选参数)
# - description: 参数描述(Agent据此理解参数含义)
# =================================================================
ticket_id: Optional[str] = Field(
default=None,
description="工单ID"
)
assigner: Optional[str] = Field(
default=None,
description="工单分配人"
)
status: Optional[Literal["open", "resolved", "closed"]] = Field(
default=None,
description="工单状态,open(待处理),resolved(已处理),closed(已关闭)"
)
priority: Optional[Literal["low", "medium", "high"]] = Field(
default=None,
description="工单优先级,low(低),medium(中),high(高)"
)
# =================================================================
# 参数验证器
# 说明:在参数传递给函数前进行预处理
# 可以进行格式化、转换、验证等操作
# =================================================================
@field_validator("ticket_id")
def validate_ticket_id(cls, v):
"""
自动将ticket_id转为大写。
确保输入的ticket_id格式统一,避免大小写不一致导致的查询问题。
Args:
v: 输入的ticket_id值
Returns:
转为大写后的ticket_id(如果非空)
"""
return v.upper() if v else None
# =============================================================================
# 使用Schema的工具定义
# args_schema参数:指定参数验证Schema
# =============================================================================
@tool(args_schema=QueryTicketsSchema)
def query_tickets(
ticket_id: str = None,
assigner: str = None,
status: str = None,
priority: str = None
) -> str:
"""
根据条件查询工单详细信息。
支持多条件组合查询,所有参数都是可选的,不传参返回所有工单。
Args:
ticket_id: 工单ID(精确匹配)
assigner: 分配人姓名(精确匹配)
status: 工单状态(open/resolved/closed)
priority: 优先级(low/medium/high)
Returns:
JSON格式的工单列表,包含总数和详情
Example:
>>> query_tickets(assigner="张三", priority="high")
'{"total_count": 2, "tickets": [...]}'
"""
# 模拟工单数据库
mock_tickets_db = [
{
"ticket_id": "TK2025012001",
"assigner": "张三",
"title": "登录页面加载缓慢",
"status": "open",
"priority": "low"
},
{
"ticket_id": "TK2025012002",
"assigner": "李四",
"title": "用户头像上传失败",
"status": "open",
"priority": "medium"
},
{
"ticket_id": "TK2025011901",
"assigner": "张三",
"title": "支付成功通知未发送",
"status": "resolved",
"priority": "high"
},
]
# =================================================================
# 条件过滤
# 策略:逐层筛选,保留满足所有条件的记录
# =================================================================
filtered_tickets = mock_tickets_db
if ticket_id:
filtered_tickets = [t for t in filtered_tickets if t["ticket_id"] == ticket_id]
if assigner:
filtered_tickets = [t for t in filtered_tickets if t["assigner"] == assigner]
if status:
filtered_tickets = [t for t in filtered_tickets if t["status"] == status]
if priority:
filtered_tickets = [t for t in filtered_tickets if t["priority"] == priority]
# =================================================================
# 返回结果
# 说明:使用JSON格式便于前端解析
# ensure_ascii=False: 保留中文
# indent=2: 格式化输出,便于调试
# =================================================================
if not filtered_tickets:
return "没有找到任何工单"
return json.dumps({
"total_count": len(filtered_tickets),
"tickets": filtered_tickets
}, ensure_ascii=False, indent=2)
# =============================================================================
# 创建Agent并测试
# =============================================================================
agent = create_agent(
model=deepseek_llm,
tools=[query_tickets],
system_prompt="你是一个工单查询助手"
)
result = agent.invoke({
"messages": [{"role": "user", "content": "请帮我查询一下张三负责的优先级别为高的工单信息"}]
})
print(result["messages"][-1].content)
3.3 使用JSONSchema定义工具参数
对比说明
特性 Pydantic Schema JSON Schema 类型安全 ✓ 强类型 ✗ 弱类型 验证器 ✓ 内置支持 ✗ 需自定义 代码量 较多 较少 复杂度 适合复杂结构 适合简单结构
# =============================================================================
# 功能说明:使用JSONSchema定义工具参数
# 使用场景:
> - 追求简洁的参数定义
> - 需要OpenAPI兼容的场景
> - 简单参数结构的工具
# 版本说明:LangChain核心功能
# 注意事项:
> 1. JSONSchema是声明式的,适合简单参数
> 2. 复杂验证逻辑需要额外处理
> 3. 部分IDE可能不支持JSONSchema的自动补全
# =============================================================================
from langchain.agents import create_agent
from langchain_core.tools import tool
from init_llm import deepseek_llm
# =============================================================================
# JSONSchema定义
# 说明:使用字典格式定义参数结构
# 遵循JSON Schema规范
# =============================================================================
query_ticket_jsonschema = {
"type": "object", # 类型:对象
"properties": { # 属性定义
"ticket_id": {
"type": "string",
"description": "工单ID"
},
"assigner": {
"type": "string",
"description": "工单分配人"
},
"status": {
"type": "string",
"enum": ["open", "resolved", "closed"], # 枚举值限制
"description": "工单状态"
},
"priority": {
"type": "string",
"enum": ["low", "medium", "high"],
"description": "工单优先级"
}
},
"required": [] # 必填参数列表(空表示全部可选)
}
@tool(args_schema=query_ticket_jsonschema)
def query_tickets(
ticket_id: str = None,
assigner: str = None,
status: str = None,
priority: str = None
) -> str:
"""根据条件查询工单详细信息"""
# 实现代码...
3.4 工具调用错误处理
错误处理策略
┌─────────────────────────────────────────────────────────────────┐ │ 错误处理架构 │ └─────────────────────────────────────────────────────────────────┘ 工具执行 → [try块] → 正常返回 ↓ [except块] → 捕获异常 → 转换为友好错误信息 → 返回ToolMessage
最佳实践
- 分层处理:不同类型的异常返回不同的友好提示
- 日志记录:记录详细错误信息便于排查
- 用户友好:向用户展示可理解的错误信息
- 可恢复性:某些错误可以提供重试建议
3.4.1 基础错误处理
# =============================================================================
# 功能说明:使用wrap_tool_call中间件捕获和处理工具调用错误
# 使用场景:
> - 防止工具异常导致整个Agent崩溃
> - 将错误转换为用户友好的消息
> - 实现重试逻辑
# 版本说明:LangChain 1.0+ 中间件API
# 注意事项:
> 1. 中间件捕获所有工具执行异常
> 2. 返回ToolMessage而非抛出异常
> 3. 可以实现复杂的重试和降级逻辑
# =============================================================================
import requests
from langchain.agents import create_agent
from langchain.agents.middleware import wrap_tool_call
from langchain_core.messages import ToolMessage
from langchain_core.tools import tool
from langgraph.prebuilt.tool_node import ToolCallRequest
from init_llm import deepseek_llm
# =============================================================================
# 工具定义
# 说明:模拟可能失败的API调用
# =============================================================================
@tool
def get_stock_price(symbol: str) -> str:
"""
获取指定股票代码的当前价格。
Args:
symbol: 股票代码,如"TCEHY"(腾讯)、"AAPL"(苹果)
Returns:
股票价格信息字符串
Raises:
Exception: 网络错误或API不可用时抛出异常
"""
print(f"调用股票查询工具: {symbol}")
try:
# =================================================================
# 模拟可能失败的API调用
# 说明:timeout=1秒模拟快速失败场景
# 实际使用中应设置合理的超时时间
# =================================================================
response = requests.get(
f"https://api.xxx.com/stocks/{symbol}",
timeout=1
)
return f"股票 {symbol} 当前价格为 {response['price']}"
except requests.exceptions.RequestException as e:
# =================================================================
# 异常处理
# 说明:捕获网络相关异常,记录并重新抛出
# 重新抛出可以让中间件统一处理
# =================================================================
print(f"查询股票数据失败: {str(e)}")
raise Exception(f"查询股票数据失败: {str(e)}")
# =============================================================================
# 错误处理中间件
# 说明:捕获工具调用异常,返回友好的错误消息
# =============================================================================
@wrap_tool_call
def handle_tool_call_error(request: ToolCallRequest, handler):
"""
工具调用错误处理中间件。
捕获工具执行过程中的所有异常,并转换为用户友好的错误消息。
这样做的好处:
1. 防止Agent因异常中断
2. 向用户提供可理解的错误信息
3. 便于日志记录和问题追踪
Args:
request: 工具调用请求,包含:
- tool_call: 工具调用信息(id、名称、参数)
handler: 实际工具执行处理器
Returns:
ToolMessage: 正常返回或错误消息的ToolMessage
错误处理策略:
- 连接错误:提示稍后重试
- 权限错误:提示联系管理员
- 输入错误:提示检查参数
- 其他错误:通用错误提示
"""
print("request", request)
try:
# =================================================================
# 执行工具调用
# 说明:正常情况下返回工具执行结果
# =================================================================
return handler(request)
except Exception as e:
# =================================================================
# 异常捕获
# 说明:将异常转换为ToolMessage
# 返回给Agent继续处理流程
# =================================================================
return ToolMessage(
content=f"当前股票查询服务不可用,错误信息: {str(e)}",
tool_call_id=request.tool_call["id"] # 必须提供正确的tool_call_id
)
# =============================================================================
# 创建带错误处理的Agent
# =============================================================================
agent = create_agent(
model=deepseek_llm,
tools=[get_stock_price],
middleware=[handle_tool_call_error] # 错误处理中间件
)
# =============================================================================
# 测试错误处理
# 由于API不存在,会触发异常
# 中间件捕获异常并返回友好提示
# =============================================================================
result = agent.invoke({"messages": [{"role": "user", "content": "查询TCEHY的股票价格"}]})
print(result["messages"][-1].content)
3.4.2 分层错误处理
架构设计说明
┌─────────────────────────────────────────────────────────────────┐ │ 分层错误处理策略 │ └─────────────────────────────────────────────────────────────────┘ 异常类型 处理策略 用户提示 ───────────────────────────────────────────────────────────── ConnectionError → 网络问题 → "稍后重试" PermissionError → 权限问题 → "联系管理员" ToolException → 输入验证问题 → "检查参数" Exception → 其他未分类问题 → "技术团队通知"
# =============================================================================
# 功能说明:演示分层错误处理,根据异常类型提供不同的用户提示
# 使用场景:
> - 需要区分不同错误类型的场景
> - 提供精准的错误恢复指导
> - 企业级应用的质量保证
# 版本说明:LangChain 1.0+
# 注意事项:
> 1. 异常类型应覆盖所有可能的错误
> 2. 提示信息应具有可操作性
> 3. 关键错误应记录日志便于排查
# =============================================================================
from langchain.agents import create_agent
from langchain.agents.middleware import wrap_tool_call
from langchain_core.messages import ToolMessage
from langchain_core.tools import tool, ToolException
from pydantic import BaseModel
from init_llm import deepseek_llm
import json
import random
# =============================================================================
# 模拟数据
# 说明:简单的内存数据库用于演示
# =============================================================================
TICKET_DATABASE = {
"T001": {"title": "登录问题", "status": "处理中", "assignee": "张三"},
"T002": {"title": "支付失败", "status": "已解决", "assignee": "李四"},
"T003": {"title": "页面加载慢", "status": "待处理", "assignee": "王五"}
}
# =============================================================================
# 工具1:查询工单
# 说明:模拟50%概率的连接失败
# =============================================================================
@tool
def query_ticket(ticket_id: str) -> str:
"""
根据工单ID查询工单详情。
Args:
ticket_id: 工单ID,如T001、T002
Returns:
工单详情的JSON字符串
Raises:
ConnectionError: 数据库连接超时时抛出
ToolException: 工单不存在时抛出
"""
# 模拟网络不稳定:50%概率失败
if random.random() < 0.5:
raise ConnectionError("数据库连接超时,请稍后重试")
if ticket_id not in TICKET_DATABASE:
raise ToolException(f"工单ID {ticket_id} 不存在")
return json.dumps(TICKET_DATABASE[ticket_id], ensure_ascii=False, indent=2)
# =============================================================================
# 工具2:更新工单状态
# 说明:模拟权限检查失败
# =============================================================================
@tool
def update_ticket_status(ticket_id: str, new_status: str) -> str:
"""
更新工单状态。
Args:
ticket_id: 工单ID
new_status: 新状态,可选值:待处理、处理中、已解决、已关闭
Returns:
更新成功的确认消息
Raises:
ToolException: 工单不存在或状态值无效时抛出
PermissionError: 权限不足时抛出(模拟20%概率)
"""
valid_statuses = ["待处理", "处理中", "已解决", "已关闭"]
if ticket_id not in TICKET_DATABASE:
raise ToolException(f"工单ID {ticket_id} 不存在")
if new_status not in valid_statuses:
raise ToolException(f"状态必须是: {', '.join(valid_statuses)}")
# 模拟权限检查失败
if random.random() < 0.2:
raise PermissionError("权限不足:只有管理员可以关闭工单")
TICKET_DATABASE[ticket_id]["status"] = new_status
return f"工单 {ticket_id} 状态已更新为: {new_status}"
# =============================================================================
# 分层错误处理中间件
# 说明:根据不同异常类型提供不同的处理策略
# =============================================================================
@wrap_tool_call
def intelligent_error_handler(request, handler):
"""
智能错误处理中间件。
根据错误类型返回不同的指导信息,帮助用户了解发生了什么问题以及如何解决。
错误分类:
1. ConnectionError - 系统暂时繁忙
2. PermissionError - 权限限制
3. ToolException - 输入验证失败
4. 其他异常 - 未知错误
Args:
request: 工具调用请求
handler: 工具执行处理器
Returns:
ToolMessage: 包含错误信息的工具消息
"""
try:
return handler(request)
except ConnectionError as e:
# =================================================================
# 连接错误处理
# 说明:网络或数据库暂时不可用
# 建议:稍后重试
# =================================================================
return ToolMessage(
content=f"系统暂时繁忙:{str(e)}。建议您稍后重试此操作。",
tool_call_id=request.tool_call["id"]
)
except PermissionError as e:
# =================================================================
# 权限错误处理
# 说明:用户没有执行此操作的权限
# 建议:联系管理员
# =================================================================
return ToolMessage(
content=f"权限限制:{str(e)}。如需执行此操作,请联系管理员。",
tool_call_id=request.tool_call["id"]
)
except ToolException as e:
# =================================================================
# 工具异常处理
# 说明:通常是输入参数验证失败
# 建议:检查输入
# =================================================================
return ToolMessage(
content=f"输入验证失败:{str(e)}。请检查输入参数是否正确。",
tool_call_id=request.tool_call["id"]
)
except Exception as e:
# =================================================================
# 通用异常处理
# 说明:捕获所有未预料的异常
# 建议:技术团队介入
# =================================================================
return ToolMessage(
content=f"意外错误:{str(e)}。技术团队已收到通知,请稍后重试。",
tool_call_id=request.tool_call["id"]
)
# =============================================================================
# 创建Agent
# =============================================================================
agent = create_agent(
model=deepseek_llm,
tools=[query_ticket, update_ticket_status],
middleware=[intelligent_error_handler],
system_prompt="你是企业客服工单系统助手"
)
# =============================================================================
# 测试各种错误场景
# =============================================================================
test_cases = [
"查询工单T999的详情", # 不存在的工单ID -> ToolException
"把工单T001状态更新为完结状态", # 无效状态 -> ToolException
"查询工单T001的详情", # 正常查询(可能触发ConnectionError)
]
for query in test_cases:
response = agent.invoke({"messages": [{"role": "user", "content": query}]})
print(f"用户查询: {query}")
print(f"助手回复: {response['messages'][-1].content}")
print("-" * 50)
第四章:智能体调用与流式处理
章节概述
本章介绍Agent的多种调用方式(同步/异步/流式),以及Checkpoint和Store的使用。
4.1 Agent调用方式
调用方式对比
┌─────────────────────────────────────────────────────────────────┐ │ Agent调用方式对比 │ ├─────────────────────────────────────────────────────────────────┤ │ invoke(同步) → 等待完整响应,适合简单场景 │ │ stream(流式) → 边生成边输出,适合实时展示 │ │ ainvoke(异步) → 非阻塞调用,适合高并发 │ └─────────────────────────────────────────────────────────────────┘
4.1.1 Invoke调用
# =============================================================================
# 功能说明:使用invoke方法调用Agent
# 使用场景:
> - 简单的同步请求
> - CLI工具或脚本
> - 不需要实时展示的场景
# 版本说明:LangChain核心API
# 注意事项:
> 1. invoke会等待完整响应后才返回
> 2. 适合简单的一次性请求
> 3. 响应包含完整的messages历史
# =============================================================================
from langchain.agents import create_agent
from langchain.tools import tool
from langgraph.graph.state import CompiledStateGraph
from init_llm import deepseek_llm
@tool
def get_weather(city: str) -> str:
"""
获取指定城市的天气信息。
Args:
city: 城市名称
Returns:
天气信息字符串
"""
return f"{city}的天气为晴朗,25°C。"
# =============================================================================
# 创建Agent
# =============================================================================
agent = create_agent(
model=deepseek_llm,
tools=[get_weather],
system_prompt="你是能查询任何问题的助手"
)
# =============================================================================
# 使用invoke调用
# 说明:等待完整响应后返回
# 返回值是包含messages的字典
# =============================================================================
resp = agent.invoke({
"messages": [
{"role": "system", "content": "你是一个天气查询助手,只回答天气相关的问题"},
{"role": "user", "content": "北京天气如何?"}
]
})
# =============================================================================
# 遍历并打印消息
# pretty_print() 是消息对象的格式化打印方法
# =============================================================================
for msg in resp["messages"]:
msg.pretty_print()
4.1.2 Stream流式调用
适用场景
- 聊天机器人界面
- 实时展示AI思考过程
- 需要尽早反馈的长文本生成
# =============================================================================
# 功能说明:使用stream方法流式调用Agent
# 使用场景:
> - WebSocket实时推送
> - 前端SSE (Server-Sent Events)
> - 需要展示进度的场景
# 版本说明:LangChain 0.x+
# 注意事项:
> 1. stream返回生成器
> 2. 每个chunk是执行过程中的一个步骤
> 3. 可以配合stream_mode控制输出内容
# =============================================================================
from langchain.agents import create_agent
from langchain_core.tools import tool
from langgraph.checkpoint.memory import InMemorySaver
from init_llm import deepseek_llm
@tool
def query_customer_data(customer_id: str) -> dict:
"""
查询客户基本信息。
Args:
customer_id: 客户ID
Returns:
客户信息字典
"""
return {
"customer_id": customer_id,
"name": "张三",
"level": "VIP",
"join_date": "2023-01-15"
}
# =============================================================================
# 创建Agent(带检查点支持)
# 说明:InMemorySaver用于会话状态持久化
# =============================================================================
agent = create_agent(
model=deepseek_llm,
tools=[query_customer_data],
checkpointer=InMemorySaver()
)
# =============================================================================
# 配置参数
# 说明:configurable.thread_id用于区分不同会话
# =============================================================================
config = {"configurable": {"thread_id": "xxx"}}
# =============================================================================
# 流式调用Agent
# 参数说明:
> - stream_mode: 输出模式
> - "checkpoints": 输出检查点状态
> - "updates": 输出每步更新
> - "messages": 输出消息增量
> - "custom": 自定义输出
# =============================================================================
for chunk in agent.stream(
{"messages": [{"role": "user", "content": "查询客户ID为12345的完整信息"}]},
config=config,
stream_mode="checkpoints"
):
print(chunk)
print("-" * 50)
4.1.3 异步Agent调用
# =============================================================================
# 功能说明:使用异步方式调用Agent
# 使用场景:
> - FastAPI异步端点
> - 需要同时处理多个请求
> - 高并发Web服务
# 版本说明:Python 3.7+,需要async/await支持
# 注意事项:
> 1. 需要在async函数中调用
> 2. 使用asyncio.run()执行顶层协程
> 3. 适合I/O密集型场景
# =============================================================================
import asyncio
from langchain.agents import create_agent
from langchain.tools import tool
from init_llm import deepseek_llm
@tool
def get_weather(city: str) -> str:
"""获取指定城市的天气信息。"""
weather_data = {"北京": "晴朗,15°C", "上海": "多云,18°C"}
return f"{city}的天气:{weather_data.get(city, '信息暂缺')}"
@tool
def get_scenic_spots(city: str, interest_type: str = "通用") -> str:
"""根据兴趣类型推荐城市景点"""
return f"{city}景点推荐:故宫、长城"
# =============================================================================
# 异步Agent调用函数
# =============================================================================
async def async_query():
"""
异步查询示例。
演示如何在异步上下文中调用Agent。
"""
# 创建Agent
agent = create_agent(
model=deepseek_llm,
tools=[get_weather, get_scenic_spots],
system_prompt="你是一个专业的旅行规划助手",
)
# =================================================================
# 使用ainvoke异步调用
# 说明:aiQuery不阻塞,可以并发执行其他任务
# =================================================================
response = await agent.ainvoke({
"messages": [{"role": "user", "content": "请帮我查询北京的天气信息"}]
})
return response
# =============================================================================
# 执行异步函数
# =============================================================================
if __name__ == "__main__":
response = asyncio.run(async_query())
print(response['messages'][-1].content)
4.2 Checkpoint检查点
Checkpoint机制说明
┌─────────────────────────────────────────────────────────────────┐ │ Checkpoint 工作原理 │ └─────────────────────────────────────────────────────────────────┘ 会话1: 消息1 → 消息2 → 消息3 → [保存Checkpoint] → 消息4 → ... ↑ 会话2: ──────────────────────────────────── [恢复Checkpoint] → 消息3 作用: 1. 保存对话状态,实现多轮对话 2. 支持会话恢复和继续 3. 实现对话历史的持久化
Checkpoint vs Store
- Checkpoint: 保存当前对话状态(短期记忆)
- Store: 保存跨会话的持久化数据(长期记忆)
4.2.1 内存检查点(InMemorySaver)
# =============================================================================
# 功能说明:使用InMemorySaver实现会话状态保存
# 使用场景:
> - 开发测试环境
> - 单进程应用
> - 不需要持久化的场景
# 版本说明:LangGraph组件
# 注意事项:
> 1. 内存检查点进程重启后丢失
> 2. 适合临时测试或开发
> 3. 生产环境建议使用MySQL等持久化存储
# =============================================================================
from langchain.agents import create_agent
from langchain_core.stores import InMemoryStore
from langgraph.checkpoint.memory import InMemorySaver
from init_llm import deepseek_llm
# =============================================================================
# 创建检查点和存储
# InMemorySaver: 对话状态持久化(短期记忆)
# InMemoryStore: 跨会话数据存储(长期记忆)
# =============================================================================
checkpointer = InMemorySaver()
agent = create_agent(
model=deepseek_llm,
tools=[],
checkpointer=checkpointer, # 传入检查点器
store=InMemoryStore() # 传入存储(可选)
)
# =============================================================================
# Session 1
# 说明:不同的thread_id代表不同的会话
# =============================================================================
config1 = {"configurable": {"thread_id": "session_001"}}
resp1 = agent.invoke(
{"messages": [{"role": "user", "content": "你好,我叫张三"}]},
config=config1
)
print(resp1["messages"][-1].content)
# =============================================================================
# Session 2
# 说明:不同会话无法访问之前的状态
# 这是因为使用了不同的thread_id
# =============================================================================
config2 = {"configurable": {"thread_id": "session_002"}}
resp2 = agent.invoke(
{"messages": [{"role": "user", "content": "我叫什么名字?"}]},
config=config2
)
print(resp2["messages"][-1].content) # Agent不知道名字(新会话)
4.2.2 MySQL持久化检查点
架构设计说明
┌─────────────────────────────────────────────────────────────────┐ │ MySQL Checkpoint 架构 │ └─────────────────────────────────────────────────────────────────┘ ┌─────────┐ ┌─────────────┐ ┌─────────────┐ │ Agent │────▶│ Checkpointer │────▶│ MySQL │ └─────────┘ └─────────────┘ └─────────────┘ │ ┌─────┴─────┐ │ checkpoints│ │ metadata │ └───────────┘
适用场景
- 生产环境需要会话持久化
- 多实例部署需要共享会话
- 需要会话历史的审计追溯
# =============================================================================
# 功能说明:使用MySQL持久化检查点
# 使用场景:
> - 生产环境
> - 多进程/多实例部署
> - 需要会话历史追溯的场景
# 版本说明:需要安装pymysql
# 注意事项:
> 1. 需要提前创建数据库
> 2. checkpointer.setup() 会自动创建表
> 3. 建议使用连接池管理连接
# =============================================================================
from langchain.agents import create_agent
from langgraph.checkpoint.mysql.pymysql import PyMySQLSaver
from init_llm import deepseek_llm
# =============================================================================
# 数据库连接配置
# 说明:使用PyMySQLSaver连接MySQL
# URI格式:mysql+pymysql://用户名:密码@主机:端口/数据库?字符集
# =============================================================================
DB_URI = "mysql+pymysql://root:123456@localhost:3306/langchain_db?charset=utf8mb4"
# =============================================================================
# 使用上下文管理器
# 说明:确保连接正确关闭
# =============================================================================
with PyMySQLSaver.from_conn_string(DB_URI) as checkpointer:
# =================================================================
# 初始化数据库表
# 说明:setup()会创建必要的表结构
# 只需执行一次,后续运行会自动跳过
# =================================================================
checkpointer.setup()
# =================================================================
# 创建Agent
# =================================================================
agent = create_agent(
model=deepseek_llm,
tools=[],
checkpointer=checkpointer # 使用MySQL检查点
)
# =================================================================
# 会话1
# =================================================================
config = {"configurable": {"thread_id": "session001"}}
resp1 = agent.invoke(
{"messages": [{"role": "user", "content": "你好,我叫张三"}]},
config=config
)
print(resp1["messages"][-1].content)
# =================================================================
# 会话2(使用同一thread_id,会话继续)
# =================================================================
resp2 = agent.invoke(
{"messages": [{"role": "user", "content": "你知道我的信息吗?"}]},
config=config
)
print(resp2["messages"][-1].content) # Agent记得之前的对话
4.3 自定义流式输出
使用场景说明
- 在工具执行过程中显示进度
- 实现自定义的流式UI
- 需要同时输出多种类型数据的场景
# =============================================================================
# 功能说明:使用get_stream_writer实现自定义流式输出
# 使用场景:
> - 工具执行进度展示
> - 多数据类型同时流式输出
> - 自定义UI组件的数据推送
# 版本说明:LangGraph配置API
# 注意事项:
> 1. get_stream_writer()需要在工具执行上下文中调用
> 2. 可以输出任意结构的数据
> 3. 配合agent.stream()的stream_mode使用
# =============================================================================
import time
from langchain.agents import create_agent
from langchain_core.tools import tool
from langgraph.config import get_stream_writer
from init_llm import deepseek_llm
@tool
def generate_sales_report() -> str:
"""
生成销售报告。
该工具演示如何使用get_stream_writer输出自定义进度信息。
"""
# =================================================================
# 获取流式写入器
# 说明:writer可以在工具执行过程中输出数据
# 可以是任意结构的数据(字典、字符串等)
# =================================================================
writer = get_stream_writer()
# 输出开始状态
writer({"type": "生成销售报告", "message": "开始生成销售报告"})
time.sleep(1)
writer({"type": "生成销售报告", "message": "销售报告生成了 25%"})
time.sleep(1)
writer({"type": "生成销售报告", "message": "销售报告生成了 50%"})
time.sleep(1)
writer({"type": "生成销售报告", "message": "销售报告生成了 75%"})
time.sleep(1)
writer({"type": "生成销售报告", "message": "销售报告生成了 100%"})
@tool
def generate_inventory_report() -> str:
"""生成库存报告"""
writer = get_stream_writer()
writer("开始生成库存报告")
time.sleep(1)
writer("库存报告生成了 25%")
time.sleep(1)
writer("库存报告生成了 50%")
time.sleep(1)
writer("库存报告生成了 75%")
time.sleep(1)
writer("库存报告生成了 100%")
# =============================================================================
# 创建Agent
# =============================================================================
agent = create_agent(
model=deepseek_llm,
tools=[generate_sales_report, generate_inventory_report]
)
# =============================================================================
# 流式调用并处理自定义输出
# stream_mode: ["custom", "updates"]
# 说明:
> - "custom": 捕获工具中writer()的输出
> - "updates": 捕获Agent状态的更新
# =============================================================================
for chunk in agent.stream(
{"messages": [{"role": "user", "content": "生成销售报告和库存报告"}]},
stream_mode=["custom", "updates"]
):
print(chunk)
print("-" * 50)
第五章:记忆管理系统
章节概述
本章介绍LangChain的记忆管理系统,包括短期记忆(基于Checkpoint)和长期记忆(基于Store)。
记忆系统架构
┌─────────────────────────────────────────────────────────────────┐ │ 记忆管理系统架构 │ └─────────────────────────────────────────────────────────────────┘ ┌─────────────────────┐ ┌─────────────────────┐ │ 短期记忆 │ │ 长期记忆 │ │ (Checkpoint) │ │ (Store) │ ├─────────────────────┤ ├─────────────────────┤ │ 生命周期: 会话内 │ │ 生命周期: 永久 │ │ 存储内容: 对话历史 │ │ 存储内容: 用户偏好 │ │ 实现方式: 自动保存 │ │ 实现方式: 手动存储 │ └─────────────────────┘ └─────────────────────┘ 会话A: ──────────────────────────────────────── │ 消息1 │ 消息2 │ 消息3 │ (Checkpoint保存) │ ... 会话B: ─── (Store查询) ──────────────────────────
5.1 短期记忆(Short Memory)
机制说明
短期记忆基于Checkpoint实现,用于保存单次会话中的对话历史,使得Agent能够记住之前说过的话。
5.1.1 基础短期记忆
# =============================================================================
# 功能说明:使用Checkpoint实现基础的短期记忆
# 使用场景:
> - 需要多轮对话的应用
> - 简单的对话式AI助手
> - 需要Agent记住之前对话内容的场景
# 版本说明:LangGraph组件
# 注意事项:
> 1. Checkpoint自动保存对话历史
> 2. 同一thread_id共享记忆
> 3. 不同thread_id有独立的记忆
# =============================================================================
from langchain.agents import create_agent
from langgraph.checkpoint.memory import InMemorySaver
from init_llm import deepseek_llm
# =============================================================================
# 创建内存检查点
# 说明:InMemorySaver使用内存存储,进程重启后丢失
# =============================================================================
checkpointer = InMemorySaver()
# =============================================================================
# 创建带检查点的Agent
# 说明:checkpointer会自动保存每次invoke的状态
# =============================================================================
agent = create_agent(
model=deepseek_llm,
tools=[],
checkpointer=checkpointer # 启用短期记忆
)
# =============================================================================
# 配置会话ID
# 说明:thread_id用于区分不同会话
# =============================================================================
config = {"configurable": {"thread_id": "session001"}}
# =============================================================================
# 第一轮对话:告诉Agent名字
# =============================================================================
resp1 = agent.invoke(
{"messages": [{"role": "user", "content": "你好,我叫张三"}]},
config=config
)
print(resp1["messages"][-1].content)
# =============================================================================
# 第二轮对话:询问名字
# 说明:由于使用了相同的thread_id,Agent能记住之前说过的话
# =============================================================================
resp2 = agent.invoke(
{"messages": [{"role": "user", "content": "我叫什么名字?"}]},
config=config
)
print(resp2["messages"][-1].content) # Agent会回答"张三"
5.1.2 自定义状态(State)
State扩展说明
除了messages,Agent还可以维护自定义的状态信息,如用户ID、偏好设置等。
# =============================================================================
# 功能说明:使用自定义State扩展Agent状态
# 使用场景:
> - 需要传递用户信息
> - 需要维护会话上下文
> - 实现复杂的多轮对话逻辑
# 版本说明:LangChain 1.0+
# 注意事项:
> 1. 自定义State需要继承AgentState
> 2. 状态会在每次invoke时更新
> 3. 状态可以用于工具调用的条件判断
# =============================================================================
from langchain.agents import create_agent, AgentState
from langchain_core.tools import tool
from langgraph.checkpoint.memory import InMemorySaver
from init_llm import deepseek_llm
# =============================================================================
# 自定义状态定义
# 说明:继承AgentState添加自定义字段
# =============================================================================
class CustomState(AgentState):
"""
自定义会话状态。
该类扩展了基础AgentState,添加了用户相关的状态字段。
Attributes:
user_id: 用户唯一标识符
hobby: 用户爱好
other_info: 其他用户信息(字典格式)
"""
user_id: str # 用户ID
hobby: str # 用户爱好
other_info: dict # 其他信息
# =============================================================================
# 创建Agent时指定state_schema
# =============================================================================
agent = create_agent(
model=deepseek_llm,
tools=[],
checkpointer=InMemorySaver(),
state_schema=CustomState # 指定自定义状态
)
config = {"configurable": {"thread_id": "session001"}}
# =============================================================================
# 调用时传入自定义状态
# 说明:状态会保存到Checkpoint中
# =============================================================================
resp1 = agent.invoke({
"messages": [{"role": "user", "content": "你好,我叫张三"}],
"user_id": "user001", # 自定义状态
"hobby": "旅游、滑雪、喝茶", # 自定义状态
"other_info": {"age": 28, "gender": "男"} # 自定义状态
}, config=config)
print(resp1["messages"][-1].content)
# =============================================================================
# 查询当前状态
# 说明:get_state返回当前会话的所有状态
# =============================================================================
state = agent.get_state(config=config)
print(state)
5.1.3 在工具中修改状态
状态修改模式说明
使用Command对象在工具中修改状态,可以实现:
- 更新State字段
- 添加ToolMessage
- 控制Agent下一步行为
# =============================================================================
# 功能说明:在工具中通过Command对象修改状态
# 使用场景:
> - 工具执行后需要更新上下文
> - 需要在工具中添加消息
> - 实现条件性的状态更新
# 版本说明:LangGraph Types
# 注意事项:
> 1. Command是状态更新的原子操作
> 2. 可以同时更新多个字段
> 3. 必须提供正确的tool_call_id
# =============================================================================
from langchain.agents import create_agent, AgentState
from langchain_core.messages import ToolMessage
from langchain_core.tools import tool
from langgraph.checkpoint.memory import InMemorySaver
from langgraph.prebuilt import ToolRuntime
from langgraph.types import Command
from init_llm import deepseek_llm
# =============================================================================
# 工具1:获取信息
# 说明:读取ToolRuntime中的状态
# =============================================================================
@tool
def get_info(runtime: ToolRuntime) -> str:
"""
获取用户信息。
Args:
runtime: 运行时对象,包含状态和工具调用信息
Returns:
用户信息字符串
"""
# =================================================================
# 通过runtime.state访问状态
# 说明:runtime是LangGraph自动注入的上下文对象
# =================================================================
user_id = runtime.state["user_id"]
user_level = runtime.state["user_level"]
return f"用户ID: {user_id}, 用户等级: {user_level}"
# =============================================================================
# 工具2:更新信息
# 说明:使用Command对象更新状态
# =============================================================================
@tool
def update_info(runtime: ToolRuntime, user_id: str, user_level: str) -> Command:
"""
更新用户信息。
Args:
runtime: 运行时对象
user_id: 新用户ID
user_level: 新用户等级
Returns:
Command对象,用于更新状态和添加消息
"""
# =================================================================
# 参数验证
# 说明:如果参数无效,返回错误消息而不更新状态
# =================================================================
if not user_id or not user_level:
return Command(update={
"messages": [ToolMessage(
content=f"用户ID不能为空或用户等级不能为空",
tool_call_id=runtime.tool_call_id
)]
})
# =================================================================
# 使用Command更新状态
# Command参数说明:
# - update: 要更新的状态字段
# - messages: 要添加的消息(ToolMessage)
# 返回后,Agent会使用更新后的状态继续执行
# =================================================================
return Command(update={
"user_id": user_id, # 更新user_id
"user_level": user_level, # 更新user_level
"messages": [ToolMessage(
content=f"用户信息已经更改,用户ID: {user_id}, 用户等级: {user_level}",
tool_call_id=runtime.tool_call_id
)]
})
# =============================================================================
# 自定义状态定义
# =============================================================================
class CustomState(AgentState):
user_id: str
user_level: str
# =============================================================================
# 创建Agent
# =============================================================================
agent = create_agent(
model=deepseek_llm,
tools=[get_info, update_info],
checkpointer=InMemorySaver(),
state_schema=CustomState
)
config = {"configurable": {"thread_id": "session001"}}
# =============================================================================
# 测试:获取信息
# =============================================================================
resp1 = agent.invoke({
"messages": [{"role": "user", "content": "获取我的信息"}],
"user_id": "user_001",
"user_level": "VIP",
}, config=config)
print(resp1["messages"][-1].content)
# =============================================================================
# 测试:更新状态
# =============================================================================
resp2 = agent.invoke({
"messages": [{"role": "user", "content": "我把我的信息user_id改成 user_002,用户等级改成 normal"}]},
config=config
)
print(resp2["messages"][-1].content)
# =============================================================================
# 验证状态已更新
# =============================================================================
state = agent.get_state(config=config)
print(state)
5.2 中间件修改状态
中间件分类
- before_model: 模型调用前执行,用于预处理
- after_model: 模型调用后执行,用于后处理和状态统计
5.2.1 状态统计中间件
# =============================================================================
# 功能说明:使用before_model和after_model中间件实现状态统计
# 使用场景:
> - 统计工具调用次数
> - 统计模型调用次数
> - 实现限流逻辑
# 版本说明:LangChain 1.0+ 中间件API
# 注意事项:
> 1. 中间件函数必须返回dict(要更新的状态)
> 2. before_model在模型调用前执行
> 3. after_model在模型调用后执行
# =============================================================================
from langchain.agents import create_agent, AgentState
from langchain.agents.middleware import before_model, after_model
from langchain_core.messages import ToolMessage
from langchain_core.tools import tool
from langgraph.checkpoint.memory import InMemorySaver
from langgraph.prebuilt import ToolRuntime
from langgraph.runtime import Runtime
from init_llm import deepseek_llm
@tool
def get_weather(city: str) -> str:
"""根据城市名称获取天气信息"""
return f"{city}天气晴朗"
# =============================================================================
# 模型调用前中间件
# 说明:统计已执行的工具调用次数
# =============================================================================
@before_model
def before_model(state: AgentState, runtime: Runtime) -> dict:
"""
模型调用前中间件。
该中间件在每次模型调用前执行,统计工具调用次数。
Args:
state: 当前Agent状态,包含所有messages
runtime: 运行时对象
Returns:
dict: 要更新的状态字段
"""
print("before_model state:", state)
# =================================================================
# 统计工具调用次数
# 说明:遍历messages,统计ToolMessage数量
# =================================================================
tool_call_count = len([msg for msg in state["messages"] if isinstance(msg, ToolMessage)])
return {"tool_call_count": tool_call_count}
# =============================================================================
# 模型调用后中间件
# 说明:统计模型调用次数
# =============================================================================
@after_model
def after_model(state: AgentState, runtime: Runtime) -> dict:
"""
模型调用后中间件。
该中间件在每次模型调用后执行,统计模型调用次数。
Args:
state: 当前Agent状态
runtime: 运行时对象
Returns:
dict: 要更新的状态字段
"""
print("after_model state:", state)
# 获取当前调用次数(初始为0)
model_call_count = state.get("model_call_count", 0)
# 增加计数
model_call_count += 1
return {"model_call_count": model_call_count}
# =============================================================================
# 自定义状态
# =============================================================================
class CustomState(AgentState):
tool_call_count: int # 工具调用计数
model_call_count: int # 模型调用计数
# =============================================================================
# 创建Agent
# =============================================================================
agent = create_agent(
model=deepseek_llm,
tools=[get_weather],
middleware=[before_model, after_model], # 注册中间件
checkpointer=InMemorySaver(),
state_schema=CustomState
)
config = {"configurable": {"thread_id": "session001"}}
# =============================================================================
# 第一轮对话
# =============================================================================
resp1 = agent.invoke({"messages": [{"role": "user", "content": "你好,我叫张三"}]}, config=config)
print(resp1["messages"][-1].content)
# =============================================================================
# 第二轮对话(会调用工具)
# =============================================================================
resp2 = agent.invoke({"messages": [{"role": "user", "content": "北京天气如何?"}]}, config=config)
print(resp2["messages"][-1].content)
# =============================================================================
# 查看状态统计
# =============================================================================
state = agent.get_state(config=config)
print(state)
5.3 长期记忆(Long Memory)
长期记忆机制说明
┌─────────────────────────────────────────────────────────────────┐ │ 长期记忆(Store)架构 │ └─────────────────────────────────────────────────────────────────┘ ┌─────────┐ ┌─────────────┐ ┌─────────────────┐ │ Store │────▶│ Namespace │────▶│ Items │ └─────────┘ └─────────────┘ └─────────────────┘ │ ┌──────┴──────┐ │ ("user",) │ │ ("products",)│ └─────────────┘ Namespace: 数据分类的维度,支持多层嵌套 Items: 实际存储的数据,格式为Key-Value
5.3.1 基础长期记忆
# =============================================================================
# 功能说明:使用InMemoryStore实现基础的长期记忆
# 使用场景:
> - 存储用户偏好
> - 跨会话数据共享
> - 实现RAG风格的记忆检索
# 版本说明:LangGraph Store API
# 注意事项:
> 1. Store是持久化的,可以在不同会话间共享
> 2. 数据按Namespace组织
> 3. 支持按Key查询和按Namespace搜索
# =============================================================================
from langchain.agents import create_agent
from langgraph.checkpoint.memory import InMemorySaver
from langgraph.store.memory import InMemoryStore
# =============================================================================
# 创建内存存储
# 说明:InMemoryStore是Store的内存实现
# =============================================================================
store = InMemoryStore()
# =============================================================================
# 定义命名空间
# 说明:Namespace用于数据分类,类似文件目录
# 可以是多层嵌套的元组
# =============================================================================
namespace = ("user1", "preferences")
# =============================================================================
# 写入数据
# 方法:store.put(namespace, key, value)
# 说明:
> - namespace: 数据所属的命名空间(必须是元组)
> - key: 数据的唯一标识符
> - value: 要存储的数据(必须是可序列化的)
# =============================================================================
store.put(
namespace, # 命名空间
"city", # Key
{"like": ["北京", "上海"], "dislike": ["广州"]} # Value
)
store.put(
namespace,
"fruit",
{"like": ["苹果", "香蕉"], "dislike": ["橙子"]}
)
# =============================================================================
# 读取数据
# 方法:store.get(namespace, key)
# 返回:包含key、value等属性的对象
# =============================================================================
memory = store.get(namespace, "city")
print("memory:", memory)
print("key:", memory.key)
print("value:", memory.value)
# =============================================================================
# 搜索数据
# 方法:store.search(namespace)
# 说明:返回命名空间下所有数据
# =============================================================================
all_memory = store.search(namespace)
for item in all_memory:
print("key:", item.key)
print("value:", item.value)
5.3.2 Agent中使用长期记忆
# =============================================================================
# 功能说明:在Agent中集成长期记忆
# 使用场景:
> - 需要跨会话记住用户信息的场景
> - 实现个性化推荐
> - 构建用户画像
# 版本说明:LangGraph Store API
# 注意事项:
> 1. 需要同时配置checkpointer和store
> 2. 工具可以通过runtime参数访问store
> 3. context_schema用于传递用户标识
# =============================================================================
from langchain.agents import create_agent
from langchain_core.tools import tool
from langgraph.checkpoint.memory import InMemorySaver
from langgraph.prebuilt import ToolRuntime
from langgraph.store.memory import InMemoryStore
from pydantic import BaseModel
from init_llm import deepseek_llm
# =============================================================================
# 初始化存储
# =============================================================================
checkpointer = InMemorySaver()
store = InMemoryStore()
# =============================================================================
# 预存用户信息
# 说明:在Agent运行前预先存储数据
# =============================================================================
store.put(
("users",), # 命名空间
"user_123", # Key = 用户ID
{"name": "张三", "age": 28, "city": "北京", "hobby": "编程、阅读"}
)
store.put(
("users",),
"user_456",
{"name": "李四", "age": 32, "city": "上海", "hobby": "旅游、摄影"}
)
# =============================================================================
# 工具:查询用户信息
# 说明:工具内部从Store中读取数据
# =============================================================================
@tool
def get_user_info(runtime: ToolRuntime) -> str:
"""
从长期记忆中查询用户信息。
Args:
runtime: 运行时对象,包含context
Returns:
用户信息字符串
"""
print("runtime:", runtime)
# =================================================================
# 从context中获取用户ID
# 说明:context在调用invoke时传入
# =================================================================
user_id = runtime.context.user_id
# =================================================================
# 从Store中查询数据
# 说明:使用与预存相同的命名空间和Key
# =================================================================
user_info_item = store.get(("users",), user_id)
if user_info_item:
value = user_info_item.value
return f"用户id:{user_id},用户姓名:{value['name']},用户age:{value['age']}"
else:
return "用户不存在"
# =============================================================================
# Context Schema
# 说明:定义传递给Agent的上下文参数
# =============================================================================
class UserContext(BaseModel):
user_id: str # 用户ID,用于查询长期记忆
# =============================================================================
# 创建Agent
# =============================================================================
agent = create_agent(
model=deepseek_llm,
tools=[get_user_info],
checkpointer=checkpointer,
store=store, # 传入Store
system_prompt="每次查询用户信息时,都要调用工具get_user_info",
context_schema=UserContext # 传入Context Schema
)
config = {"configurable": {"thread_id": "session001"}}
# =============================================================================
# 测试:查询用户123
# =============================================================================
resp1 = agent.invoke(
{"messages": [{"role": "user", "content": "你知道我的信息吗?"}]},
config=config,
context=UserContext(user_id="user_123") # 传入用户ID
)
print(resp1["messages"][-1].content)
# =============================================================================
# 测试:查询用户456
# =============================================================================
resp2 = agent.invoke(
{"messages": [{"role": "user", "content": "你知道我的信息吗?"}]},
config=config,
context=UserContext(user_id="user_456")
)
print(resp2["messages"][-1].content)
5.3.3 在工具中修改长期记忆
# =============================================================================
# 功能说明:在工具中修改长期记忆
# 使用场景:
> - 保存用户偏好
> - 更新用户画像
> - 记录用户交互历史
# 版本说明:LangGraph Store API
# 注意事项:
> 1. 使用runtime.store访问Store
> 2. 数据会自动持久化(内存或MySQL)
> 3. 可以在不同会话间共享
# =============================================================================
from langchain.agents import create_agent
from langgraph.checkpoint.memory import InMemorySaver
from langgraph.store.memory import InMemoryStore
from langchain.tools import tool, ToolRuntime
from pydantic import BaseModel, Field
from typing import Literal
from init_llm import deepseek_llm
import uuid
# =============================================================================
# Context定义
# =============================================================================
class UserContext(BaseModel):
user_id: str
# =============================================================================
# Preference参数Schema
# =============================================================================
class UserPreference(BaseModel):
category: Literal["color", "food", "music"] = Field(
description="用户偏好类别"
)
preference: str = Field(description="具体偏好内容")
# =============================================================================
# 初始化存储
# =============================================================================
store = InMemoryStore()
checkpointer = InMemorySaver()
# =============================================================================
# 工具1:保存用户偏好
# =============================================================================
@tool(args_schema=UserPreference)
def save_user_preference(category: str, preference: str, runtime: ToolRuntime) -> str:
"""
将用户偏好保存到长期记忆中。
Args:
category: 偏好类别(color/food/music)
preference: 具体偏好内容
runtime: 运行时对象
Returns:
保存成功的确认消息
"""
user_id = runtime.context.user_id
# =================================================================
# 创建命名空间
# 说明:使用(user_id, "preferences")格式
# 便于按用户隔离数据
# =================================================================
namespace = (user_id, "preferences")
# =================================================================
# 生成唯一ID
# 说明:使用UUID确保每次保存都是新记录
# 如需覆盖,可使用固定的Key
# =================================================================
memory_id = str(uuid.uuid4())
# =================================================================
# 保存到Store
# 说明:runtime.store是Agent的Store实例
# =================================================================
runtime.store.put(namespace, memory_id, {
"category": category,
"preference": preference,
})
return f"已成功保存你的{category}偏好:{preference}"
# =============================================================================
# 工具2:获取用户偏好
# =============================================================================
@tool
def get_user_preferences(runtime: ToolRuntime) -> str:
"""
从长期记忆中获取用户所有偏好。
Args:
runtime: 运行时对象
Returns:
用户偏好列表
"""
user_id = runtime.context.user_id
namespace = (user_id, "preferences")
# =================================================================
# 搜索命名空间下的所有数据
# =================================================================
memories = runtime.store.search(namespace)
if not memories:
return f"您还没有保存过偏好"
preferences_list = []
for mem in memories:
pref = mem.value
preferences_list.append(f"- 种类:{pref['category']},偏好:{pref['preference']}")
return f"你的偏好有:\n" + "\n".join(preferences_list)
# =============================================================================
# 创建Agent
# =============================================================================
memory_agent = create_agent(
model=deepseek_llm,
tools=[save_user_preference, get_user_preferences],
checkpointer=checkpointer,
store=store,
context_schema=UserContext
)
# =============================================================================
# 第一轮:保存颜色偏好
# =============================================================================
result1 = memory_agent.invoke(
{"messages": [{"role": "user", "content": "请记住我喜欢的颜色是蓝色"}]},
config={"configurable": {"thread_id": "thread1"}},
context=UserContext(user_id="current_user")
)
print(f"Agent回复: {result1['messages'][-1].content}")
# =============================================================================
# 第二轮:保存食物偏好
# =============================================================================
result2 = memory_agent.invoke(
{"messages": [{"role": "user", "content": "我还喜欢的食物是意大利面"}]},
config={"configurable": {"thread_id": "thread1"}},
context=UserContext(user_id="current_user")
)
print(f"Agent回复: {result2['messages'][-1].content}")
# =============================================================================
# 第三轮:在新会话中查询所有偏好
# 说明:跨会话访问长期记忆
# =============================================================================
result3 = memory_agent.invoke(
{"messages": [{"role": "user", "content": "告诉我我都喜欢什么颜色和食物"}]},
config={"configurable": {"thread_id": "thread2"}}, # 新会话
context=UserContext(user_id="current_user")
)
print(f"Agent回复: {result3['messages'][-1].content}")
5.4 短长期记忆结合实战
架构设计说明
┌─────────────────────────────────────────────────────────────────┐ │ 完整电商客服系统架构 │ └─────────────────────────────────────────────────────────────────┘ ┌─────────────────────────────────────────────────────────────────┐ │ Agent │ ├─────────────────────────────────────────────────────────────────┤ │ ┌──────────┐ ┌──────────┐ ┌──────────┐ ┌──────────┐ │ │ │ 短期记忆 │ │ 长期记忆 │ │ 上下文 │ │ 摘要 │ │ │ │(Checkptr)│ │ (Store) │ │(Context) │ │(Middleware)│ │ │ └──────────┘ └──────────┘ └──────────┘ └──────────┘ │ └─────────────────────────────────────────────────────────────────┘ │ ┌──────────────────────┼──────────────────────┐ ▼ ▼ ▼ ┌──────────┐ ┌──────────┐ ┌──────────┐ │ 订单查询 │ │ 偏好更新 │ │ 商品推荐 │ │ (Tool) │ │ (Tool) │ │ (Tool) │ └──────────┘ └──────────┘ └──────────┘ │ │ │ ▼ ▼ ▼ ┌──────────┐ ┌──────────┐ ┌──────────┐ │ MySQL DB │ │ MySQL DB │ │ 推荐算法 │ └──────────┘ └──────────┘ └──────────┘
# =============================================================================
# 功能说明:完整的电商客服示例,结合短期记忆、长期记忆、上下文和消息摘要
# 使用场景:
> - 企业级客服系统
> - 需要个性化服务的电商平台
> - 复杂的多轮对话场景
# 版本说明:LangChain 1.0+
# 注意事项:
> 1. 需要MySQL数据库支持
> 2. SummarizationMiddleware用于长对话摘要
> 3. 建议结合生产环境进行优化
# =============================================================================
import uuid
import warnings
from typing import Optional
from pydantic import BaseModel, Field
from langchain.agents import create_agent, AgentState
from langchain.agents.middleware import SummarizationMiddleware, wrap_tool_call
from langchain_core.tools import tool
from langchain_core.messages import ToolMessage
from langgraph.checkpoint.mysql.pymysql import PyMySQLSaver
from langgraph.store.mysql.pymysql import PyMySQLStore
from langgraph.prebuilt import ToolRuntime
from langgraph.types import Command
from init_llm import deepseek_llm
warnings.filterwarnings("ignore", category=UserWarning, module="pydantic.main")
# =============================================================================
# Context定义
# 说明:定义传递给Agent的用户上下文
# =============================================================================
class UserContext(BaseModel):
"""
用户上下文。
Attributes:
user_id: 用户的唯一标识符
channel: 用户咨询渠道,如APP、Web、小程序
"""
user_id: str = Field(description="用户的唯一标识符")
channel: str = Field(description="用户咨询渠道,如: APP, Web, 小程序")
# =============================================================================
# 自定义状态
# 说明:用于会话级别的状态管理
# =============================================================================
class CustomerSessionState(AgentState):
"""
客服会话状态。
Attributes:
current_order_id: 当前正在查询的订单号
"""
current_order_id: str # 当前正在查询的订单号
# =============================================================================
# 模拟数据
# 说明:实际应用中应连接真实数据库
# =============================================================================
MOCK_DATABASE = {
"orders": {
"order001": {"order_id": "order001", "status": "已发货", "product": "智能手机"},
"order002": {"order_id": "order002", "status": "待支付", "product": "智能手表"},
}
}
# =============================================================================
# 工具1:查询订单状态
# =============================================================================
@tool
def query_order_status(order_id: str, runtime: ToolRuntime) -> Command:
"""
查询用户订单状态。
Args:
order_id: 订单ID
runtime: 运行时对象
Returns:
Command对象,更新状态和返回消息
"""
order_info = MOCK_DATABASE["orders"].get(order_id)
if not order_info:
return Command(update={
"messages": [ToolMessage(
content=f"错误:订单 [{order_id}] 不存在",
tool_call_id=runtime.tool_call_id
)]
})
# =================================================================
# 更新当前订单ID到状态中
# 说明:便于后续工具获取
# =================================================================
return Command(update={
"current_order_id": order_id,
"messages": [ToolMessage(
content=f"订单 [{order_id}] 状态: {order_info['status']}, 商品: {order_info['product']}",
tool_call_id=runtime.tool_call_id
)]
})
# =============================================================================
# 工具2:更新用户偏好
# =============================================================================
@tool
def update_user_preference(category: str, liked_item: str, runtime: ToolRuntime) -> str:
"""
更新用户长期偏好。
Args:
category: 偏好类别
liked_item: 喜欢的内容
runtime: 运行时对象
Returns:
操作结果消息
"""
user_id = runtime.context.user_id
# =================================================================
# 构建命名空间
# 格式:user_{user_id}/preferences
# =================================================================
namespace = (f"user_{user_id}", "preferences")
# 生成唯一ID
key = str(uuid.uuid4())
# 保存到长期记忆
runtime.store.put(namespace, key, {"category": category, "liked_item": liked_item})
return f"已成功将您的偏好记录到长期记忆: 喜欢 {category} 类的 {liked_item}。"
# =============================================================================
# 工具3:获取推荐
# =============================================================================
@tool
def get_recommendation(runtime: ToolRuntime) -> str:
"""
获取用户推荐商品。
综合考虑用户当前订单和历史偏好,生成个性化推荐。
Args:
runtime: 运行时对象
Returns:
推荐结果字符串
"""
user_id = runtime.context.user_id
current_order = runtime.state.get("current_order_id", "未知订单")
namespace = (f"user_{user_id}", "preferences")
# 查询用户偏好
prefs = runtime.store.search(namespace)
pref_list = [f"{p.value.get('category')}({p.value.get('liked_item')})" for p in prefs[-3:]]
return f"基于用户当前的订单 [{current_order}] 和长期偏好 {pref_list if pref_list else '无'},为用户推荐相关配件。"
# =============================================================================
# 错误处理中间件
# =============================================================================
@wrap_tool_call
def handle_tool_errors(request, handler):
"""
工具错误处理中间件。
捕获工具执行中的所有异常,返回友好的错误消息。
"""
try:
return handler(request)
except Exception as e:
return ToolMessage(
content=f"调用工具错误:请稍后重试,错误信息:({str(e)})",
tool_call_id=request.tool_call["id"]
)
# =============================================================================
# 数据库连接配置
# =============================================================================
DB_URI = "mysql+pymysql://root:123456@localhost:3306/langchain_db?charset=utf8mb4"
# =============================================================================
# 创建Agent
# =============================================================================
with (
PyMySQLSaver.from_conn_string(DB_URI) as checkpointer,
PyMySQLStore.from_conn_string(DB_URI) as store
):
# =================================================================
# 初始化数据库表
# =================================================================
checkpointer.setup()
store.setup()
# =================================================================
# 创建Agent
# 说明:组合多种中间件和功能
# =================================================================
agent = create_agent(
model=deepseek_llm,
tools=[query_order_status, update_user_preference, get_recommendation],
system_prompt="""你是智能电商客服助手,具备回答用户咨询、查询订单状态、更新用户偏好和推荐商品功能。""",
checkpointer=checkpointer,
store=store,
state_schema=CustomerSessionState,
context_schema=UserContext,
middleware=[
# =================================================================
# 消息摘要中间件
# 说明:当消息数达到阈值时,自动生成摘要
# 参数说明:
# - trigger: 触发条件(messages, 10)= 10条消息时触发
# - keep: 摘要后保留的消息数(messages, 5)= 保留最后5条
# =================================================================
SummarizationMiddleware(
model=deepseek_llm,
summary_prompt="请总结以下对话内容:{messages}",
trigger=("messages", 10),
keep=("messages", 5),
),
handle_tool_errors
],
)
# =================================================================
# 交互循环
# =================================================================
user_context = UserContext(user_id="customer_001", channel="Web")
config = {"configurable": {"thread_id": "session_02"}}
print("智能电商客服助手 - 输入 'quit' 退出")
while True:
user_input = input("[你]: ").strip()
if user_input.lower() in ['quit', 'exit', '退出']:
print("感谢你的咨询,再见!")
break
if not user_input:
continue
print("[客服助手]: ")
# =================================================================
# 流式输出
# =================================================================
for chunk in agent.stream(
{"messages": {"role": "user", "content": user_input}},
config=config,
context=user_context
):
for step, data in chunk.items():
if step in ["model", "tools"]:
data["messages"][-1].pretty_print()
附录:环境配置与最佳实践
A.1 安装依赖
# =============================================================================
# 功能说明:完整的环境依赖安装命令
# 说明:分门别类地安装,便于理解和管理
# 版本:推荐使用最新稳定版本
# =============================================================================
# =============================================================================
# 基础依赖
# 说明:LangChain核心框架
# =============================================================================
pip install langchain>=1.0.0 # 核心框架
pip install langchain-core # 核心接口和类型
pip install langchain-community # 第三方集成
pip install python-dotenv # 环境变量管理
# =============================================================================
# 数据库支持
# 说明:用于Checkpoint和Store的持久化
# =============================================================================
pip install pymysql # MySQL数据库驱动
# =============================================================================
# 开发工具
# 说明:用于代码质量和测试
# =============================================================================
pip install mypy>=1.11.1 # 类型检查
pip install ruff>=0.6.1 # 代码格式化/检查
pip install pytest>=8.3.5 # 单元测试框架
A.2 数据库初始化
-- =============================================================================
-- 功能说明:MySQL数据库初始化脚本
-- 说明:创建LangChain所需的数据库和用户
-- =============================================================================
-- 创建数据库(指定字符集为utf8mb4,支持中文)
CREATE DATABASE langchain_db CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci;
-- 创建专用用户(生产环境推荐)
-- 注意:生产环境应使用强密码
CREATE USER 'langchain'@'%' IDENTIFIED BY 'your_password';
-- 授予权限
GRANT ALL PRIVILEGES ON langchain_db.* TO 'langchain'@'%';
-- 刷新权限使更改生效
FLUSH PRIVILEGES;
A.3 项目结构推荐
项目结构说明
your_project/ # 项目根目录 ├── src/ # 源代码目录 │ └── agent/ # Agent相关代码 │ ├── __init__.py │ └── graph.py # Agent图定义 ├── tests/ # 测试目录 │ ├── unit_tests/ # 单元测试 │ └── integration_tests/ # 集成测试 ├── .env.example # 环境变量示例 ├── pyproject.toml # 项目配置 └── uv.lock # 依赖锁定文件
A.4 最佳实践
企业级开发规范
┌─────────────────────────────────────────────────────────────────┐ │ 开发规范清单 │ ├─────────────────────────────────────────────────────────────────┤ │ 1. 环境变量管理 │ │ ✓ 使用.env文件管理API密钥 │ │ ✓ 不硬编码敏感信息 │ │ ✓ 生产环境使用密钥管理服务 │ │ │ │ 2. 错误处理 │ │ ✓ 工具添加try-catch │ │ ✓ 使用中间件统一处理错误 │ │ ✓ 返回友好的错误消息 │ │ │ │ 3. 记忆管理 │ │ ✓ 短期记忆用Checkpoint │ │ ✓ 长期记忆用Store │ │ ✓ 定期清理无用数据 │ │ │ │ 4. 状态管理 │ │ ✓ 使用TypeDict定义状态 │ │ ✓ 状态更新使用Command对象 │ │ ✓ 避免状态泄漏 │ │ │ │ 5. 中间件使用 │ │ ✓ 按顺序注册中间件 │ │ ✓ 避免中间件冲突 │ │ ✓ 中间件应保持轻量 │ └─────────────────────────────────────────────────────────────────┘
A.5 常见问题排查
问题诊断指南
┌─────────────────────────────────────────────────────────────────┐ │ 常见问题与解决方案 │ ├─────────────────────────────────────────────────────────────────┤ │ │ │ 问题1: API Key无效 │ │ ├─ 症状:认证失败错误 │ │ ├─ 检查: │ │ │ ✓ .env文件是否正确配置 │ │ │ ✓ load_dotenv()是否被调用 │ │ │ ✓ API密钥是否正确 │ │ └─ 解决:重新生成API密钥 │ │ │ │ 问题2: 工具未调用 │ │ ├─ 症状:Agent忽略工具,直接回复 │ │ ├─ 检查: │ │ │ ✓ 工具description是否清晰 │ │ │ ✓ 参数Schema是否正确定义 │ │ │ ✓ 系统Prompt是否提及工具功能 │ │ └─ 解决:优化工具描述和Prompt │ │ │ │ 问题3: 记忆丢失 │ │ ├─ 症状:Agent不记得之前的对话 │ │ ├─ 检查: │ │ │ ✓ Checkpointer是否配置 │ │ │ ✓ thread_id是否正确传入 │ │ │ ✓ 内存检查点是否重启后清空 │ │ └─ 解决:使用持久化检查点(MySQL) │ │ │ │ 问题4: 状态不同步 │ │ ├─ 症状:状态更新后Agent行为异常 │ │ ├─ 检查: │ │ │ ✓ 中间件状态更新是否正确 │ │ │ ✓ Command对象是否正确使用 │ │ │ ✓ 状态类型是否匹配 │ │ └─ 解决:调试状态更新逻辑 │ │ │ └─────────────────────────────────────────────────────────────────┘
- API Key无效:检查
.env文件配置和环境变量加载 - 工具未调用:检查工具描述是否清晰,参数Schema是否正确定义
- 记忆丢失:检查Checkpointer配置和thread_id是否正确
- 状态不同步:检查中间件中的状态更新逻辑
更多推荐


所有评论(0)