新版LangChain与LangGraph智能体开发完整教程(详细注释版)

目录


第一章:大模型调用基础

章节概述
本章介绍如何使用新版LangChain的统一接口初始化和调用各种大语言模型。通过init_chat_model函数,可以轻松切换不同的模型提供商,无需修改业务代码。

1.1 环境配置

配置说明
环境配置是使用LangChain的第一步。建议使用.env文件管理敏感信息(如API密钥),避免硬编码到源代码中。

1.1.1 安装依赖

依赖说明

  • langchain: 核心框架,提供Agent、Tool等高级抽象
  • langchain-core: 核心接口和类型定义
  • langchain-community: 第三方集成(各种模型提供商适配器)
  • python-dotenv: 环境变量加载库
# =============================================================================
# 功能说明:安装LangChain开发所需的Python依赖包
# 使用场景:项目初始化时一次性安装,或在CI/CD环境中构建时安装
# 版本说明:推荐使用最新版本,LangChain 1.0+已稳定
# 注意事项:
#   1. 建议使用虚拟环境隔离项目依赖
#   2. 可使用 uv/pip/poetry 等包管理器
#   3. GPU环境可能需要额外安装 CUDA 相关驱动
# =============================================================================

pip install langchain langchain-core langchain-community python-dotenv

1.1.2 创建环境配置文件

安全建议

  • .env文件应加入.gitignore,避免提交到版本控制系统
  • 生产环境可使用环境变量或密钥管理服务(如AWS Secrets Manager)
  • 不同环境(开发/测试/生产)应使用不同的API密钥
# =============================================================================
# 功能说明:创建环境配置文件,存储各模型提供商的API密钥
# 使用场景:
#   - 开发环境:存储测试用API密钥
#   - 测试环境:使用限流后的测试密钥
#   - 生产环境:使用正式环境密钥(建议从密钥服务读取)
# 版本说明:通用配置方式,适用于所有LangChain版本
# 注意事项:
#   1. 绝对不要将真实API密钥提交到Git仓库
#   2. 密钥应定期轮换
#   3. 不同模型的API端点可能不同,需仔细核对
# =============================================================================

# .env文件
DEEPSEEK_API_KEY=your_api_key                    # DeepSeek模型访问密钥
DEEPSEEK_BASE_URL=https://api.deepseek.com       # DeepSeek API地址
OPENAI_API_KEY=your_api_key                      # OpenAI模型访问密钥
OPENAI_BASE_URL=https://api.openai.com/v1        # OpenAI API地址
ANTHROPIC_API_KEY=your_api_key                   # Anthropic(Claude)访问密钥
ANTHROPIC_BASE_URL=https://api.anthropic.com     # Anthropic API地址
DASHSCOPE_API_KEY=your_api_key                   # 阿里云通义千问访问密钥
DASHSCOPE_BASE_URL=https://dashscope.aliyuncs.com/compatible-mode/v1  # 通义千问API地址
ZHIPUAI_API_KEY=your_api_key                     # 智谱AI访问密钥
ZHIPUAI_BASE_URL=https://open.bigmodel.cn/api/paas/v4  # 智谱AI API地址

1.1.3 加载环境变量

# =============================================================================
# 功能说明:从.env文件加载环境变量,统一管理各模型API配置
# 使用场景:
#   - 应用启动时加载配置
#   - 多环境配置切换(dev/staging/prod)
# 版本说明:通用配置加载方式
# 注意事项:
#   1. load_dotenv(override=True) 会覆盖已存在的环境变量
#   2. 建议在应用入口尽早调用
#   3. 生产环境可结合环境变量使用
# =============================================================================

import os
from dotenv import load_dotenv

# =============================================================================
# 函数:load_dotenv
# 功能:从当前目录或父目录加载.env文件中的环境变量
# 参数:
#   - override (bool): 是否覆盖已存在的环境变量,默认False
#                      True用于开发环境,False用于避免意外覆盖系统变量
# 返回值:无
# =============================================================================

# 从.env文件加载环境变量
load_dotenv(override=True)

# =============================================================================
# 配置变量声明
# 说明:统一从环境变量读取API密钥和端点,支持灵活配置
# 命名规范:
#   - 使用全大写+下划线命名(符合Python常量惯例)
#   - 变量名应清晰表达其用途
# =============================================================================

# 加载各模型API配置
DEEPSEEK_API_KEY = os.getenv("DEEPSEEK_API_KEY")           # DeepSeek API密钥
DEEPSEEK_BASE_URL = os.getenv("DEEPSEEK_BASE_URL")        # DeepSeek API端点
OPENAI_API_KEY = os.getenv("OPENAI_API_KEY")              # OpenAI API密钥
OPENAI_BASE_URL = os.getenv("OPENAI_BASE_URL")            # OpenAI API端点
ANTHROPIC_API_KEY = os.getenv("ANTHROPIC_API_KEY")        # Anthropic API密钥
ANTHROPIC_BASE_URL = os.getenv("ANTHROPIC_BASE_URL")      # Anthropic API端点
DASHSCOPE_API_KEY = os.getenv("DASHSCOPE_API_KEY")        # 阿里云通义千问密钥
DASHSCOPE_BASE_URL = os.getenv("DASHSCOPE_BASE_URL")      # 通义千问API端点
ZHIPUAI_API_KEY = os.getenv("ZHIPUAI_API_KEY")            # 智谱AI密钥
ZHIPUAI_BASE_URL = os.getenv("ZHIPUAI_BASE_URL")          # 智谱AI API端点

1.2 模型初始化

架构说明

┌─────────────────────────────────────────────────────────────────┐
│                     init_chat_model 统一入口                     │
└─────────────────────────────────────────────────────────────────┘
                             │
         ┌───────────────────┼───────────────────┐
         ▼                   ▼                   ▼
   ┌──────────┐       ┌──────────┐       ┌──────────┐
   │ DeepSeek │       │  OpenAI  │       │ Anthropic│
   └──────────┘       └──────────┘       └──────────┘

通过统一接口init_chat_model,可以无缝切换不同的模型提供商。

版本差异说明

  • 新版 (LangChain >= 1.0): 使用init_chat_model统一接口
  • 旧版 (< 1.0): 需要使用ChatOpenAI, ChatAnthropic, ChatDeepSeek等独立类
  • 迁移建议: 新项目建议使用init_chat_model,旧代码可逐步迁移

1.2.1 使用init_chat_model初始化模型

# =============================================================================
# 功能说明:使用新版LangChain的统一接口初始化各种大语言模型
# 使用场景:
#   - 需要在多个模型间灵活切换的项目
#   - 追求代码一致性和可维护性的企业项目
#   - 需要根据负载动态选择模型的场景
# 版本说明:LangChain 1.0+ 推荐用法
# 注意事项:
#   1. api_key可传入None(对于本地模型如Ollama)
#   2. base_url需包含完整的API路径
#   3. 建议为每个模型创建独立实例,便于管理和监控
# =============================================================================

from langchain.chat_models import init_chat_model
from langchain_core.language_models import BaseChatModel

# =============================================================================
# 模型实例化函数:init_chat_model
# 功能:统一的模型初始化接口,支持多种模型提供商
# 参数说明:
#   - model (str): 模型名称,如"deepseek-chat"、"gpt-4"、"claude-3-5-haiku"
#   - model_provider (str): 模型提供商标识,如"deepseek"、"openai"、"anthropic"
#   - api_key (str, optional): API访问密钥,本地模型可省略
#   - base_url (str, optional): API端点URL
#   - temperature (float, optional): 生成温度,控制随机性,0-2之间
#   - max_tokens (int, optional): 最大生成token数
# 返回值:BaseChatModel实例
# =============================================================================

# =============================================================================
# 模型:DeepSeek Chat
# 提供商:DeepSeek(中国大模型厂商)
# 特点:性价比高,支持长上下文,中文支持好
# 适用场景:需要中文能力强、成本敏感的应用
# =============================================================================
deepseek_llm: BaseChatModel = init_chat_model(
    model="deepseek-chat",                    # 模型标识符
    model_provider="deepseek",                # 提供商标识
    api_key=DEEPSEEK_API_KEY,                # API密钥(从环境变量读取)
    base_url=DEEPSEEK_BASE_URL,               # API端点
)

# =============================================================================
# 模型:OpenAI GPT-4
# 提供商:OpenAI
# 特点:性能最强,生态完善,但成本较高
# 适用场景:对质量要求极高、预算充足的应用
# =============================================================================
openai_llm = init_chat_model(
    model="gpt-4",                            # GPT-4模型
    model_provider="openai",
    api_key=OPENAI_API_KEY,
    base_url=OPENAI_BASE_URL,
)

# =============================================================================
# 模型:Claude 3.5 Haiku
# 提供商:Anthropic
# 特点:速度快、成本效益高,支持超长上下文
# 适用场景:需要快速响应、平衡成本和性能的应用
# =============================================================================
anthropic_llm = init_chat_model(
    model="claude-3-5-haiku-20241022",       # 使用日期版本确保稳定性
    model_provider="anthropic",
    api_key=ANTHROPIC_API_KEY,
    base_url=ANTHROPIC_BASE_URL,
)

# =============================================================================
# 模型:Ollama 本地模型
# 提供商:Ollama(本地部署框架)
# 特点:完全本地运行,无需API密钥,支持多种开源模型
# 适用场景:
#   - 隐私敏感场景(数据不出境)
#   - 开发测试环境
#   - 离线环境
# =============================================================================
ollama_llm = init_chat_model(
    model="deepseek-r1:1.5b",                 # Ollama模型名称格式
    model_provider="ollama",
    base_url="http://localhost:11434",        # Ollama默认端口
    # 注意:Ollama本地模型不需要api_key
)

# =============================================================================
# 模型:通义千问 Qwen Plus
# 提供商:阿里云(通过DashScope)
# 特点:中文能力强,支持函数调用,成本适中
# 适用场景:国内生产环境,中文为主的应用
# =============================================================================
tongyi_llm = init_chat_model(
    model="qwen-plus",                        # 通义千问plus版本
    model_provider="openai",                 # 兼容OpenAI格式
    api_key=DASHSCOPE_API_KEY,
    base_url=DASHSCOPE_BASE_URL,
)

# =============================================================================
# 模型:智谱 GLM-4
# 提供商:智谱AI
# 特点:国产大模型,中文理解能力强
# 适用场景:国内应用,需要国产化替代的项目
# =============================================================================
zhipu_llm = init_chat_model(
    model="glm-4",
    model_provider="openai",                 # 兼容OpenAI格式
    api_key=ZHIPUAI_API_KEY,
    base_url=ZHIPUAI_BASE_URL,
)

1.2.2 统一初始化文件

架构设计说明
创建独立的init_llm.py模块,统一管理所有模型实例有以下好处:

  1. 单一职责:模型配置集中在专门模块
  2. 易于维护:修改模型配置只需改一处
  3. 按需加载:外部代码只需导入需要的模型
  4. 测试友好:便于mock和单元测试
"""
统一初始化所有支持的聊天模型

该模块提供项目级的模型实例管理,所有需要使用LLM的地方都应从
这里导入对应的模型实例,而不是重复初始化。

模块结构:
    - 模型实例定义(全局常量)
    - 配置参数说明
    - 使用示例

使用方式:
    from init_llm import deepseek_llm
    
    response = deepseek_llm.invoke("你好")

版本说明:
    LangChain 1.0+ 推荐使用 init_chat_model
    旧版本请使用各提供商的独立类(如 ChatOpenAI)
"""
from langchain.chat_models import init_chat_model
from langchain_core.language_models import BaseChatModel
from env_utils import *

# =============================================================================
# 模型实例化区域
# 说明:
#   - 每个模型都是全局单例,导入即创建
#   - 模型配置统一在此管理
#   - 可根据环境变量动态选择默认模型
# =============================================================================

# DeepSeek 模型实例
# 用途:主要使用的对话模型,适合中文场景
# 配置说明:
#   - temperature=0.7:平衡创意和稳定性
#   - 如需更确定性结果,可设为0.1-0.3
deepseek_llm: BaseChatModel = init_chat_model(
    model="deepseek-chat",
    model_provider="deepseek",
    api_key=DEEPSEEK_API_KEY,
    base_url=DEEPSEEK_BASE_URL,
)

# OpenAI 模型实例
# 用途:需要最高质量输出的场景
# 成本提示:GPT-4成本较高,建议用于复杂推理任务
openai_llm = init_chat_model(
    model="gpt-4",
    model_provider="openai",
    api_key=OPENAI_API_KEY,
    base_url=OPENAI_BASE_URL,
)

# Anthropic Claude 模型实例
# 用途:需要长上下文理解的任务
# 优势:上下文窗口大(200K tokens),适合长文档分析
anthropic_llm = init_chat_model(
    model="claude-3-5-haiku-20241022",
    model_provider="anthropic",
    api_key=ANTHROPIC_API_KEY,
    base_url=ANTHROPIC_BASE_URL,
)

# Ollama 本地模型实例
# 用途:开发测试、隐私敏感场景
# 注意:需先启动Ollama服务(ollama serve)
ollama_llm = init_chat_model(
    model="deepseek-r1:1.5b",
    model_provider="ollama",
    base_url="http://localhost:11434",
)

# 通义千问模型实例
# 用途:国内生产环境,中文对话为主
# 优势:中文语境理解好,价格相对实惠
tongyi_llm = init_chat_model(
    model="qwen-plus",
    model_provider="openai",
    api_key=DASHSCOPE_API_KEY,
    base_url=DASHSCOPE_BASE_URL,
)

# 智谱AI模型实例
# 用途:需要使用国产模型的项目
# 特点:GLM系列模型,开源程度较高
zhipu_llm = init_chat_model(
    model="glm-4",
    model_provider="openai",
    api_key=ZHIPUAI_API_KEY,
    base_url=ZHIPUAI_BASE_URL,
)

1.3 模型调用方式

调用方式对比

方式 方法 适用场景 特点
同步 invoke 简单脚本、测试 简单直接,阻塞等待
流式 stream 聊天界面、实时展示 逐字输出,用户体验好
批量 batch 离线批处理 并行处理,吞吐量高
异步 ainvoke/abatch Web服务、高并发 非阻塞,资源利用率高

1.3.1 Invoke调用(同步)

使用场景说明

  • 同步调用是最基础的调用方式
  • 适合:CLI工具、简单脚本、单次请求
  • 不适合:需要高并发的Web服务
单条消息调用
# =============================================================================
# 功能说明:使用invoke方法进行最简单的单条消息对话
# 使用场景:
#   - 测试模型响应
#   - 简单的命令行对话
#   - 单次问答任务
# 版本说明:通用方法,所有LangChain版本支持
# 注意事项:
#   1. invoke是同步阻塞方法,会等待完整响应后返回
#   2. 返回的是AIMessage对象,包含丰富的元数据
#   3. 长时间请求可能触发超时,需要配置timeout
# =============================================================================

from init_llm import deepseek_llm

# =============================================================================
# 方法:invoke
# 功能:向模型发送消息并获取完整响应
# 参数:
#   - input: 可以是字符串、字典消息列表、或消息对象列表
# 返回值:AIMessage对象,包含content(文本内容)和其他元数据
# =============================================================================

# 单条消息调用模型
resp = deepseek_llm.invoke("请介绍一下你自己")

# 查看返回类型
print(type(resp))                    # 输出:<class 'langchain_core.messages.ai.AIMessage'>

# 获取文本内容
print(resp.content)                  # 输出模型的文本回复
字典格式消息列表

消息格式说明
字典格式的消息列表是最灵活的方式,支持:

  • system: 系统提示词(可选,通常放在最前面)
  • user: 用户消息
  • assistant: 助手消息(用于多轮对话的历史记录)
# =============================================================================
# 功能说明:使用字典格式构建多轮对话上下文
# 使用场景:
#   - 需要保留对话历史的场景
#   - 需要设置系统提示词的任务
#   - 构建复杂对话流程
# 版本说明:通用格式,LangChain 0.x 和 1.x 都支持
# 注意事项:
#   1. 字典格式会自动转换为消息对象
#   2. system消息通常只需要一条,放在最前面
#   3. 历史消息按时间顺序排列
# =============================================================================

# 定义对话历史,包含系统提示和历史对话
conversations = [
    # 系统消息:定义AI的角色和行为
    {"role": "system", "content": "你是一个翻译助手,可以将汉语翻译成英语"},
    
    # 用户消息:第一个翻译请求
    {"role": "user", "content": "翻译:我喜欢编程"},
    
    # 助手消息:模型的历史回复(用于上下文理解)
    {"role": "assistant", "content": "I like programming."},
    
    # 用户消息:第二个翻译请求
    {"role": "user", "content": "翻译:我喜欢大模型"},
]

# =============================================================================
# 调用invoke并传入消息列表
# 说明:模型会根据完整的上下文生成回复
# 注意:旧的对话历史会增加token消耗和延迟
# =============================================================================

resp = deepseek_llm.invoke(conversations)
print(resp.content)                  # 输出:I like large language models.
消息对象格式

类型安全说明
使用消息对象(HumanMessage/SystemMessage等)相比字典格式的优势:

  1. 类型检查:IDE可以提供自动补全和类型检查
  2. 方法支持:消息对象有额外的方法(如pretty_print)
  3. 扩展性:便于添加自定义消息类型
# =============================================================================
# 功能说明:使用LangChain消息对象构建对话
# 使用场景:
#   - 需要类型安全的大型项目
#   - 需要对消息进行额外处理的场景
#   - 企业级应用开发
# 版本说明:LangChain核心类型,所有版本支持
# 注意事项:
#   1. 消息对象相比字典有轻微的性能开销
#   2. 建议在关键业务逻辑中使用消息对象
#   3. 混合使用字典和对象可能导致类型不一致
# =============================================================================

from langchain_core.messages import HumanMessage, AIMessage, SystemMessage

# =============================================================================
# 消息类型说明:
# - SystemMessage: 系统提示词,定义AI行为
# - HumanMessage: 用户消息
# - AIMessage: 助手消息(模型回复或人工注入的历史)
# - ToolMessage: 工具调用结果消息
# - CustomMessage: 自定义消息类型(可扩展)
# =============================================================================

# 使用消息对象构建对话
conversations = [
    # SystemMessage:定义翻译助手的角色
    SystemMessage(content="你是一个翻译助手,可以将汉语翻译成英语"),
    
    # HumanMessage:用户的第一句话
    HumanMessage(content="翻译:我喜欢编程"),
    
    # AIMessage:助手的历史回复
    AIMessage(content="I like programming."),
    
    # HumanMessage:用户的第二句话
    HumanMessage(content="翻译:我喜欢大模型"),
]

# 调用模型
resp = deepseek_llm.invoke(conversations)
print(resp.content)                  # 输出翻译结果

1.3.2 Stream调用(流式)

适用场景

  • 聊天机器人的打字效果
  • 代码生成工具(如GitHub Copilot)
  • 实时展示AI思考过程的场景
  • 长文本生成的界面

性能说明

  • 流式输出用户体验更好,因为可以逐步看到结果
  • 总体响应时间可能略长(需要等待第一个token)
  • 适合前端展示,后端处理逻辑不变
# =============================================================================
# 功能说明:使用stream方法进行流式调用,实现打字机效果
# 使用场景:
#   - 聊天机器人界面,需要实时显示输出
#   - 长文本生成,需要显示进度
#   - 需要尽早展示部分结果给用户
# 版本说明:通用方法,所有LangChain版本支持
# 注意事项:
#   1. stream返回生成器,需要迭代获取
#   2. 每个chunk是响应的一部分
#   3. 适合前端实时展示
# =============================================================================

from init_llm import deepseek_llm

# =============================================================================
# 方法:stream
# 功能:流式返回模型响应,边生成边输出
# 参数:与invoke相同
# 返回值:生成器,每次迭代返回一个chunk(AIMessageChunk)
# 性能提示:
#   - 第一个chunk通常需要等待模型"思考"
#   - 适合WebSocket推送、前端SSE等场景
# =============================================================================

# 流式调用模型
response = deepseek_llm.stream("请介绍一下你自己")

# =============================================================================
# 迭代处理流式响应
# 说明:
#   - 每个chunk.content是增量内容
#   - end="|" 用于分隔显示,便于观察
#   - flush=True 确保立即显示(不缓冲)
# =============================================================================

for chunk in response:
    # chunk 是 AIMessageChunk 对象
    # chunk.content 是本轮的增量文本
    print(chunk.content, end="|", flush=True)

# 输出示例:你好|,我|是一|个由|深|度求|索|...

1.3.3 Batch调用(批量)

适用场景

  • 离线数据处理任务
  • 需要同时处理多个独立请求
  • 批量生成内容(如批量生成商品描述)
  • 自动化测试

性能说明

  • batch会并行处理多个请求
  • 但受限于模型提供商的速率限制
  • 适合不需要实时响应的场景
# =============================================================================
# 功能说明:使用batch方法批量处理多个请求
# 使用场景:
#   - 离线批量处理
#   - 一次性生成多个独立内容
#   - 自动化测试多个用例
# 版本说明:通用方法
# 注意事项:
#   1. 请求是并行执行的(取决于提供商限制)
#   2. 如果某个请求失败,整个batch可能失败
#   3. 需要考虑API速率限制
# =============================================================================

from init_llm import deepseek_llm

# =============================================================================
# 方法:batch
# 功能:批量处理多个请求,并行执行
# 参数:消息列表
# 返回值:响应列表
# 适用条件:请求之间相互独立
# =============================================================================

# 定义要批量处理的问题列表
questions = [
    "请介绍一下你自己",
    "飞机为什么会飞",
    "什么是大模型",
]

# 批量调用
resp = deepseek_llm.batch(questions)

# =============================================================================
# 处理批量响应
# 说明:返回顺序与输入顺序一致
# 注意:如果需要知道每个响应对应哪个请求,建议使用zip
# =============================================================================

for item in resp:
    print(item.content)
    print("-" * 50)

1.3.4 异步批量调用(并发)

适用场景

  • 高并发Web服务
  • 需要控制并发数量的场景
  • 需要实时处理大量请求的服务端

并发控制说明
max_concurrency参数控制最大并发数:

  • 值太大会触发API速率限制
  • 值太小会降低吞吐量
  • 建议根据API限制调整
# =============================================================================
# 功能说明:使用batch_as_completed实现带并发控制的批量处理
# 使用场景:
#   - 高并发服务
#   - 需要实时处理完成的任务
#   - 需要控制API调用频率的场景
# 版本说明:LangChain 0.x+ 支持
# 注意事项:
#   1. 返回的是Future对象组成的生成器
#   2. 先完成的任务先返回
#   3. 需要使用await或同步迭代
# =============================================================================

from init_llm import deepseek_llm

# =============================================================================
# 方法:batch_as_completed
# 功能:异步批量执行,返回完成的Future
# 参数:
#   - inputs: 输入列表
#   - config: 配置字典,包含max_concurrency等参数
# 返回值:迭代器,每次返回(asyncio.Future, result)对
# =============================================================================

resp = deepseek_llm.batch_as_completed(
    [
        "请介绍一下你自己",
        "飞机为什么会飞",
        "什么是大模型",
    ],
    config={
        "max_concurrency": 3,  # 最大并发数,根据API限制调整
    }
)

# =============================================================================
# 迭代处理完成的任务
# 说明:先完成的任务先返回,无需等待所有任务完成
# 适用场景:需要尽快处理完成项的实时系统
# =============================================================================

for item in resp:
    print(item)

1.4 异步调用

异步 vs 同步

同步调用:
T1: [----请求----][----处理----][----返回----]

异步调用:
T1: [发起请求]      [处理其他任务]    [获取结果]
T2:      [----请求----][----处理----]

适用场景

  • Web框架(FastAPI、Flask异步版本)
  • 需要同时处理多个请求
  • 不想阻塞主线程的场景

1.4.1 Ainvoke异步调用

# =============================================================================
# 功能说明:演示ainvoke异步调用的非阻塞特性
# 使用场景:
#   - FastAPI异步端点
>   - 需要同时发起多个请求
>   - GUI应用中避免阻塞主线程
# 版本说明:Python 3.7+,async/await语法
# 注意事项:
>   1. 需要在async函数中调用
>   2. 使用asyncio.run()执行顶层协程
>   3. 异步代码中不能混用同步调用
# =============================================================================

import asyncio
import time
from langchain.chat_models import init_chat_model
from env_utils import DEEPSEEK_API_KEY, DEEPSEEK_BASE_URL

# =============================================================================
# 模型初始化
# 说明:异步调用使用的模型实例与同步相同
# 注意:某些提供商可能有异步特定的配置
# =============================================================================

llm = init_chat_model(
    model="deepseek-chat",
    model_provider="deepseek",
    api_key=DEEPSEEK_API_KEY,
    base_url=DEEPSEEK_BASE_URL,
)

async def demo_async_invoke():
    """
    演示异步调用的非阻塞特性。
    
    该函数展示了如何:
    1. 发起异步请求但不等待
    2. 在等待期间执行其他任务
    3. 稍后获取异步结果
    
    Returns:
        无(直接打印结果)
    
    使用示例:
        >>> asyncio.run(demo_async_invoke())
    """
    print("程序开始...")

    # =================================================================
    # 步骤1:发起异步请求
    # 说明:使用llm.ainvoke立即返回一个协程对象
    # 注意:此时请求已发送,但不会阻塞
    # =================================================================
    print("发起异步模型调用 (ainvoke)...")
    async_task = llm.ainvoke("用一句话解释人工智能。")

    # =================================================================
    # 步骤2:在等待模型响应的同时执行其他任务
    # 说明:主线程可以继续处理其他逻辑
    # 这是异步的核心优势
    # =================================================================
    print("模型请求已发送,程序无需等待,继续执行...")
    for i in range(3):
        time.sleep(1)  # 模拟其他任务
        print(f"正在执行第{i + 1}个任务...")

    # =================================================================
    # 步骤3:等待并获取模型结果
    # 说明:使用await等待协程完成
    # 此时才真正阻塞等待模型响应
    # =================================================================
    print("其他任务已完成,现在等待模型返回结果...")
    response = await async_task
    print(f"模型返回: {response.content}")

# =============================================================================
# 执行异步函数
# asyncio.run() 创建事件循环并执行协程
# 这是标准的Python异步入口
# =============================================================================

asyncio.run(demo_async_invoke())

1.4.2 Astream异步流式调用

# =============================================================================
# 功能说明:演示异步流式调用的非阻塞特性
# 使用场景:
#   - 异步Web服务
>   - 需要流式推送的前端
>   - 实时处理AI响应的系统
# 版本说明:需要Python 3.7+
# 注意事项:
>   1. 使用async for迭代流
>   2. 每个chunk处理后可以立即推送
>   3. 适合WebSocket或SSE场景
# =============================================================================

async def demo_async_stream():
    """
    演示异步流式调用的非阻塞特性。
    
    该函数展示如何:
    1. 异步发起流式请求
    2. 并行执行其他任务
    3. 异步迭代处理流式响应
    
    Returns:
        无
    
    架构说明:
        请求阶段可以并行处理其他逻辑
        响应阶段通过async for逐块处理
    """
    print("程序开始...")

    # =================================================================
    # 发起异步流式调用
    # 注意:astream返回的是异步生成器
    # =================================================================
    print("发起异步流式调用 (astream)...")
    stream_resp = llm.astream("请一句话解释机器学习的基本概念。")

    print("流式请求已发送,程序无需等待,继续执行...")
    
    # =================================================================
    # 并行执行其他任务
    # 说明:与同步流式不同,异步可以在等待时处理其他逻辑
    # =================================================================
    for i in range(3):
        time.sleep(1)
        print(f"正在执行第{i + 1}个任务...")

    print("其他任务已完成,开始处理流式结果...")
    print("流式输出: ", end="", flush=True)
    
    # =================================================================
    # 异步迭代流式响应
    # async for:异步生成器的迭代方式
    # 每个chunk立即处理或推送
    # =================================================================
    async for chunk in stream_resp:
        if hasattr(chunk, 'content'):
            print(chunk.content, end="", flush=True)
    print("流式输出结束")

1.4.3 Abatch异步批量调用

# =============================================================================
# 功能说明:演示异步批量调用的非阻塞特性
# 使用场景:
>   - 异步Web服务中的批量处理
>   - 需要高并发的后台任务
>   - 批量API调用
# 版本说明:Python 3.7+
# 注意事项:
>   1. 使用abatch而不是batch
>   2. 返回协程对象列表
>   3. 需要await获取最终结果
# =============================================================================

async def demo_async_batch():
    """
    演示异步批量调用的非阻塞特性。
    
    该函数展示如何:
    1. 异步发起多个批量请求
    2. 并行执行其他任务
    3. 异步获取所有批量结果
    
    Returns:
        无
    
    性能提示:
        异步批量比同步批量有更好的资源利用率
        特别适合I/O密集型任务
    """
    print("程序开始...")

    questions = ["用一句话说明深度学习与传统机器学习的区别"]

    # =================================================================
    # 发起异步批量调用
    # 说明:abatch返回协程对象,不会立即执行
    # =================================================================
    print("发起异步批量调用 (abatch)...")
    batch_resp = llm.abatch(questions)

    print("批量请求已发送,程序无需等待,继续执行...")
    
    # =================================================================
    # 并行执行其他任务
    # 说明:在批量请求执行期间处理其他逻辑
    # =================================================================
    for i in range(3):
        time.sleep(1)
        print(f"正在执行第{i + 1}个任务...")

    # =================================================================
    # 等待批量请求完成
    # await batch_resp:等待所有请求完成
    # =================================================================
    print("其他任务已完成,现在等待批量处理结果...")
    responses = await batch_resp

    # =================================================================
    # 处理响应结果
    # responses 是响应列表,按输入顺序排列
    # =================================================================
    for response in responses:
        print(f"批量响应: {response.content}")

1.5 推理模型

模型类型说明

  • Chat模型:直接生成对话回复
  • Reasoner模型(推理模型):先进行推理思考,再给出答案
  • 适用场景:复杂推理、数学问题、代码调试

版本差异说明

  • 支持推理的模型:DeepSeek-R1、OpenAI o1/o3、Anthropic Claude等
  • 旧版本:需要手动实现推理过程
  • 新版:模型原生支持推理,API返回结果中包含推理内容
# =============================================================================
# 功能说明:使用DeepSeek的Reasoner模型进行推理
# 使用场景:
#   - 数学问题求解
#   - 复杂逻辑推理
#   - 代码调试和分析
#   - 需要展示推理过程的场景
# 版本说明:DeepSeek API 1.0+ 支持
# 注意事项:
#   1. 推理模型成本通常高于普通Chat模型
#   2. 响应时间可能较长
#   3. reasoning_content字段包含推理过程
# =============================================================================

from langchain.chat_models import init_chat_model
from env_utils import DEEPSEEK_API_KEY, DEEPSEEK_BASE_URL

# =============================================================================
# 初始化推理模型
# 说明:deepseek-reasoner是DeepSeek的专用推理模型
# 特点:先思考再回答,适合复杂问题
# =============================================================================
reasoner_llm = init_chat_model(
    model="deepseek-reasoner",              # 推理专用模型
    model_provider="deepseek",
    api_key=DEEPSEEK_API_KEY,
    base_url=DEEPSEEK_BASE_URL,
)

# =============================================================================
# 调用推理模型
# 注意:response中包含额外的推理内容
# =============================================================================
response = reasoner_llm.invoke("10个字解释为什么天空是蓝色的?")

# =============================================================================
# 获取推理过程(reasoning_content)
# 说明:推理模型会在reasoning_content中展示思考过程
# content_blocks 包含完整的响应结构
# =============================================================================
# 新版API可能使用不同方式获取推理内容
if hasattr(response, 'reasoning_content'):
    # 部分模型直接提供reasoning_content属性
    reasoning_steps = response.reasoning_content
    print("推理过程:", reasoning_steps)
elif hasattr(response, 'content_blocks'):
    # 通过content_blocks获取
    reasoning_steps = [b for b in response.content_blocks if b["type"] == "reasoning"]
    print("推理过程:", " ".join(step["reasoning"] for step in reasoning_steps))

# 最终答案在content中
print("最终答案:", response.content)

1.6 速率限制

速率限制说明
使用InMemoryRateLimiter可以:

  • 避免触发API提供商的速率限制
  • 保护下游服务不被冲垮
  • 实现公平调度

适用场景

  • 公共服务需要限流
  • 多租户环境下的公平使用
  • 测试环境避免超出配额
# =============================================================================
# 功能说明:使用InMemoryRateLimiter控制API调用频率
# 使用场景:
#   - 需要严格控制调用频率
#   - 多用户共享API配额的场景
>   - 避免触发API速率限制错误
# 版本说明:LangChain 0.x+ 支持
# 注意事项:
>   1. 限流器是内存级别的,多进程需要共享限流器
>   2. 请求仍会排队等待,不是直接丢弃
>   3. 需要根据API限制合理配置参数
# =============================================================================

import time
from langchain.chat_models import init_chat_model
from langchain_core.rate_limiters import InMemoryRateLimiter
from env_utils import DEEPSEEK_API_KEY, DEEPSEEK_BASE_URL

# =============================================================================
# 创建速率限制器
# 参数说明:
#   - requests_per_second: 每秒允许的最大请求数
#   - check_every_n_seconds: 检查频率
#   - max_bucket_size: 突发容量,允许短暂超过限制
# =============================================================================
rate_limiter = InMemoryRateLimiter(
    requests_per_second=0.1,      # 每10秒最多1个请求(适合免费API)
    check_every_n_seconds=0.1,    # 检查间隔0.1秒
    max_bucket_size=5,           # 突发容量,最多允许5个请求
)

# =============================================================================
# 初始化带速率限制的模型
# 说明:将限流器传入模型的rate_limiter参数
# 限流器会自动在每次请求前进行检查
# =============================================================================
deepseek_llm = init_chat_model(
    model="deepseek-chat",
    model_provider="deepseek",
    api_key=DEEPSEEK_API_KEY,
    base_url=DEEPSEEK_BASE_URL,
    rate_limiter=rate_limiter      # 传入速率限制器
)

# =============================================================================
# 使用限流后的模型
# 说明:每个请求都会经过速率限制器
# 时间间隔小于限制时,请求会等待
# =============================================================================
for i in range(3):
    response = deepseek_llm.invoke("你好")
    print(response.content)
    print(f"请求时间戳: {time.time()}")  # 观察时间间隔

第二章:智能体创建与管理

章节概述
本章介绍如何使用create_agent创建AI智能体,包括工具绑定、中间件配置、Prompt管理等高级功能。

架构设计说明

┌────────────────────────────────────────────────────────────────────┐
│                          Agent 系统架构                             │
└────────────────────────────────────────────────────────────────────┘
                                   │
       ┌───────────────────────────┼───────────────────────────┐
       ▼                           ▼                           ▼
┌──────────────┐          ┌──────────────┐          ┌──────────────┐
│   Prompt     │          │    Model     │          │    Tools     │
│  Management  │          │   (LLM)      │          │   Binding    │
└──────────────┘          └──────────────┘          └──────────────┘
                                   │
                                   ▼
                        ┌──────────────────┐
                        │  Tool Executor   │
                        └──────────────────┘
                                   │
                        ┌──────────────────┐
                        │   Middleware     │
                        │ (Pre/Post Hook)  │
                        └──────────────────┘

2.1 创建基础智能体

Agent创建流程

  1. 定义工具(Tool)
  2. 选择模型(Model)
  3. 配置系统提示词(System Prompt)
  4. 创建Agent实例
  5. 调用Agent

2.1.1 使用create_agent创建智能体

# =============================================================================
# 功能说明:使用create_agent创建最简单的AI智能体
# 使用场景:
>   - 需要工具调用能力的对话助手
>   - 简单的问答机器人
>   - 入门级Agent开发示例
# 版本说明:LangChain 1.0+ 新增API
# 注意事项:
>   1. create_agent返回的是可调用的Agent对象
>   2. tools参数可以为空列表
>   3. system_prompt定义Agent角色和行为
# =============================================================================

from langchain.agents import create_agent
from langchain_core.tools import tool
from init_llm import deepseek_llm

# =============================================================================
# 装饰器:@tool
# 功能:将普通函数转换为Agent可调用的工具
# 参数说明:
>   - name: 工具名称(可选,默认使用函数名)
>   - description: 工具描述(Agent决定是否调用的重要依据)
>   - args_schema: 参数Schema(可选,用于复杂参数)
# =============================================================================

@tool
def get_weather(location: str) -> str:
    """
    获取指定位置的天气信息。
    
    这是一个简单的天气查询工具,返回指定城市的天气状况。
    
    Args:
        location: 城市名称,如"北京"、"上海"、"东京"等
        
    Returns:
        格式化的天气信息字符串
        
    Example:
        >>> get_weather("北京")
        "北京天气:晴朗,25°C"
    """
    return f"天气信息:{location}的天气是晴朗的"

# =============================================================================
# 创建Agent
# 函数:create_agent
# 参数说明:
>   - model: 底层使用的语言模型(必填)
>   - tools: 可用工具列表(可选,默认空)
>   - system_prompt: 系统提示词(可选)
>   - middleware: 中间件列表(可选)
>   - checkpointer: 检查点存储(可选,用于记忆)
# 返回值:Agent可调用对象
# =============================================================================

agent = create_agent(
    model=deepseek_llm,                          # 底层模型
    tools=[get_weather],                          # 可用工具
    system_prompt="你是一个天气助手,你可以帮助用户获取指定位置的天气信息。",
)

# =============================================================================
# 调用Agent
# 方法:invoke
# 参数:
>   - input: 输入字典,必须包含"messages">   - messages格式:可以是字典列表或消息对象列表
# 返回值:包含执行结果的字典,messages中包含完整对话历史
# =============================================================================

resp = agent.invoke({"messages": [{"role": "user", "content": "北京的天气"}]})

# =============================================================================
# 响应结构说明
# resp 是一个字典,包含:
>   - messages: 消息列表,包含完整对话历史
>   - 其他状态字段(如有自定义状态)
# messages列表项类型:
>   - HumanMessage: 用户消息
>   - AIMessage: 模型回复
>   - ToolMessage: 工具调用结果
# =============================================================================
print(resp)

2.1.2 带多个工具的智能体

# =============================================================================
# 功能说明:创建带有多个工具的智能体,演示复杂工具定义
# 使用场景:
>   - 企业级客服机器人
>   - 多功能助手
>   - 需要多领域知识查询的系统
# 版本说明:LangChain 1.0+
# 注意事项:
>   1. 每个工具应有清晰的description
>   2. 参数应提供详细的类型和取值说明
>   3. 工具之间应有明确的功能区分
# =============================================================================

from langchain.agents import create_agent
from langchain_core.tools import tool
from init_llm import deepseek_llm

# =============================================================================
# 工具1:股票查询
# 说明:完整的工具定义,包含详细的docstring
# =============================================================================
@tool
def get_stock_price(company: str, timeframe: str = "today") -> str:
    """
    获取指定公司的股票价格信息。
    
    该工具提供实时和历史股票价格查询,支持今日、本周、本月三个时间维度。
    
    Args:
        company: 公司名称,支持以下值:
            - 苹果公司(或Apple)
            - 微软公司(或Microsoft)
            - 谷歌公司(或Google)
        timeframe: 时间范围,可选值:
            - today: 今日价格
            - week: 本周平均价格
            - month: 本月平均价格
            默认值:today
            
    Returns:
        格式化的股票价格字符串,示例:
        "苹果公司 today股票价格: 185.20美元"
        
    Raises:
        无异常返回,当公司不存在时返回友好的错误信息
        
    Example:
        >>> get_stock_price("苹果公司", "today")
        "苹果公司 today股票价格: 185.20美元"
    """
    # 模拟数据(实际应用中应调用真实API)
    mock_data = {
        "苹果公司": {"today": 185.20, "week": 183.50, "month": 180.75},
        "微软公司": {"today": 415.86, "week": 412.30, "month": 405.42},
        "谷歌公司": {"today": 15.42, "week": 15.20, "month": 14.85}
    }
    
    # 查询数据
    if company in mock_data:
        price = mock_data[company].get(timeframe, "未知时间范围")
        return f"{company} {timeframe}股票价格: {price}美元"
    else:
        return f"未找到股票代码 {company} 的数据"

# =============================================================================
# 工具2:新闻搜索
# 说明:演示返回多条结果的工具
# =============================================================================
@tool
def search_news(company: str) -> str:
    """
    搜索指定公司的财经新闻。
    
    该工具提供公司相关的最新财经新闻,包括股价变动、重要公告、行业动态等。
    
    Args:
        company: 公司名称
        
    Returns:
        每行一条新闻的字符串,格式为新闻标题。
        如无相关新闻,返回友好提示。
        
    Example:
        >>> search_news("苹果公司")
        "苹果发布新款iPhone,股价上涨3%\\n苹果与欧盟达成反垄断和解协议"
    """
    # 模拟新闻数据
    mock_news = {
        "苹果公司": [
            "苹果发布新款iPhone,股价上涨3%",
            "苹果与欧盟达成反垄断和解协议",
            "苹果将在印度扩大生产规模"
        ],
        "微软公司": [
            "微软Azure云业务季度增长超预期",
            "微软完成对Nuance的收购",
            "微软推出新一代AI助手Copilot"
        ],
    }
    
    # 获取新闻列表
    news_list = mock_news.get(company, [f"未找到{company}的相关新闻"])
    return "\n".join(news_list)

# =============================================================================
# 创建多工具Agent
# 说明:tools参数传入工具列表
# Agent会根据用户问题自动选择合适的工具
# =============================================================================

agent = create_agent(
    model=deepseek_llm,
    tools=[get_stock_price, search_news],      # 传入多个工具
)

# =============================================================================
# 调用多工具Agent
# 说明:Agent会自动决定调用哪个工具
# 可能需要多轮交互(问价格 -> 调工具 -> 回答 -> 问新闻 -> 调工具 -> 回答)
# =============================================================================

resp = agent.invoke({"messages": [
    {"role": "user", "content": "苹果公司上周股价是多少?有什么新闻?"},
]})
print(resp)

2.2 动态模型选择

架构设计说明

┌─────────────────────────────────────────────────────────────────┐
│                    动态模型选择中间件架构                         │
└─────────────────────────────────────────────────────────────────┘

用户请求 → [中间件] → 根据条件选择模型 → [选中的模型] → 响应
                       ↓
             ┌─────────┴─────────┐
             ↓                   ↓
      [基础模型]            [高级模型]
      (message_count < 3)   (message_count >= 3)

使用场景说明

  • 成本优化:简单问题用便宜模型,复杂问题用贵模型
  • 性能调优:根据任务复杂度选择合适的模型
  • 多租户:根据用户等级选择不同能力的模型
# =============================================================================
# 功能说明:使用中间件实现根据对话状态动态选择模型
# 使用场景:
>   - 需要根据上下文复杂度选择模型
>   - 实现成本优化策略
>   - A/B测试不同模型效果
# 版本说明:LangChain 1.0+ 中间件API
# 注意事项:
>   1. 中间件在模型调用前后执行
>   2. 可以修改请求和响应
>   3. 中间件顺序很重要
# =============================================================================

from langchain.agents import create_agent
from langchain.agents.middleware import wrap_model_call, ModelResponse, ModelRequest
from langchain.chat_models import init_chat_model
from langchain_core.tools import tool
from env_utils import DEEPSEEK_API_KEY, DEEPSEEK_BASE_URL, DASHSCOPE_API_KEY, DASHSCOPE_BASE_URL

# =============================================================================
# 工具定义
# 两个简单的查询工具
# =============================================================================

@tool
def get_current_location() -> str:
    """获取当前位置。"""
    return "当前位置为北京市。"

@tool
def get_weather(city: str) -> str:
    """获取指定城市的天气信息。"""
    return f"{city}的天气为晴朗,25°C。"

# =============================================================================
# 基础模型配置
# 说明:用于简单查询的轻量级模型
# =============================================================================
basic_model = init_chat_model(
    model="deepseek-chat",
    model_provider="deepseek",
    api_key=DEEPSEEK_API_KEY,
    base_url=DEEPSEEK_BASE_URL,
)

# =============================================================================
# 高级模型配置
# 说明:用于复杂查询的高性能模型
# =============================================================================
advanced_model = init_chat_model(
    model="qwen-plus",                         # 通义千问plus
    model_provider="openai",
    api_key=DASHSCOPE_API_KEY,
    base_url=DASHSCOPE_BASE_URL,
)

# =============================================================================
# 动态模型选择中间件
# 装饰器:@wrap_model_call
# 功能:在模型调用前/后执行自定义逻辑
# 参数说明:
>   - request: ModelRequest对象,包含请求信息
>   - handler: 实际的模型调用处理器
# 返回值:ModelResponse对象
# =============================================================================

@wrap_model_call
def dynamic_model_selection(request: ModelRequest, handler) -> ModelResponse:
    """
    动态模型选择中间件。
    
    根据对话状态(消息数量)自动选择合适的模型:
    - 消息数 < 3:使用基础模型(简单查询)
    - 消息数 >= 3:使用高级模型(复杂对话)
    
    Args:
        request: 模型请求对象,包含:
            - state: 包含messages等状态信息
        handler: 模型调用处理器
        
    Returns:
        ModelResponse: 模型响应对象
        
    使用场景:
        - 成本敏感的项目(简单问题用便宜模型)
        - 需要差异化服务质量的项目
        - 实现渐进式复杂度的对话系统
    """
    print("request : ", request)
    
    # =================================================================
    # 获取当前对话的消息数量
    # 说明:通过request.state访问状态
    # messages包含完整的对话历史
    # =================================================================
    message_count = len(request.state['messages'])
    
    # =================================================================
    # 根据条件选择模型
    # 策略说明:
    #   - 初始对话(< 3条消息)使用基础模型
    #   - 复杂对话(>= 3条消息)使用高级模型
    # 实际项目中可根据业务逻辑调整策略
    # =================================================================
    if message_count < 3:
        model = basic_model
        print(f"使用基础模型 (消息数: {message_count})")
    else:
        model = advanced_model
        print(f"使用高级模型 (消息数: {message_count})")
    
    # =================================================================
    # 执行模型调用
    # request.override(model=model) 创建新的请求,使用指定的模型
    # handler() 执行实际的模型调用
    # =================================================================
    return handler(request.override(model=model))

# =============================================================================
# 创建带中间件的Agent
# 说明:middleware参数传入中间件列表
# 中间件按照列表顺序执行
# =============================================================================

agent = create_agent(
    model=basic_model,                          # 默认模型(会被中间件覆盖)
    tools=[get_current_location, get_weather],
    middleware=[dynamic_model_selection],       # 动态模型选择中间件
)

# =============================================================================
# 测试动态模型选择
# 第一次调用:消息数=2(system + user),使用基础模型
# =============================================================================
response = agent.invoke({"messages":[
    {"role":"system","content":"你是一个天气助手"},
    {"role":"user","content":"我现在在的位置天气如何?"}
]})
print(response)

2.3 Prompt配置

Prompt工程最佳实践

  1. 清晰明确:清楚地说明Agent的角色和限制
  2. 结构化:使用清晰的格式组织Prompt
  3. 示例驱动:提供Few-shot示例
  4. 动态适配:根据上下文动态调整Prompt

2.3.1 字符串格式Prompt

# =============================================================================
# 功能说明:使用简单字符串作为系统Prompt
# 使用场景:
>   - 快速原型开发
>   - 简单的单角色Agent
>   - Prompt不需要动态变化的场景
# 版本说明:所有版本支持
# 注意事项:
>   1. 字符串Prompt最简单但扩展性差
>   2. 建议仅用于简单场景
>   3. 复杂场景建议使用Structured Prompt
# =============================================================================

agent = create_agent(
    model=deepseek_llm,
    tools=[get_weather],
    system_prompt="你是能查询任何问题的助手"    # 简单字符串Prompt
)

2.3.2 SystemMessage对象格式Prompt

使用场景说明

  • 需要明确指定Prompt类型的场景
  • 与其他SystemMessage混合使用的场景
  • 需要在代码中以类型安全方式处理Prompt
# =============================================================================
# 功能说明:使用SystemMessage对象作为Prompt
# 使用场景:
>   - 需要与其他系统消息组合
>   - 追求类型安全的项目
>   - 需要在运行时动态构建Prompt
# 版本说明:LangChain核心功能
# 注意事项:
>   1. SystemMessage可以包含更多元信息
>   2. 便于与消息历史管理系统集成
# =============================================================================

from langchain_core.messages import SystemMessage

agent = create_agent(
    model=deepseek_llm,
    tools=[get_weather],
    system_prompt=SystemMessage(content="你是能查询任何问题的助手")
)

2.3.3 动态Prompt

架构设计说明

┌─────────────────────────────────────────────────────────────────┐
│                      动态Prompt中间件流程                        │
└─────────────────────────────────────────────────────────────────┘

用户请求 → [动态Prompt中间件] → 根据context生成Prompt → [LLM] → 响应
                                   ↑
                           context参数传入

使用场景

  • 客服分级服务:VIP用户获得更专业的回复
  • 多租户隔离:不同租户使用不同的Prompt模板
  • 任务导向:根据任务类型选择不同的指令
# =============================================================================
# 功能说明:使用dynamic_prompt中间件根据运行时上下文动态生成Prompt
# 使用场景:
>   - 根据用户类型生成不同风格的回复
>   - 根据任务类型选择不同的指令模板
>   - 实现个性化的对话体验
# 版本说明:LangChain 1.0+
# 注意事项:
>   1. context_schema定义上下文参数结构
>   2. 动态Prompt可以访问runtime.context
>   3. 应提供合理的默认值
# =============================================================================

from typing import TypedDict
from langchain.agents import create_agent
from langchain.agents.middleware import dynamic_prompt, ModelRequest
from langchain_core.tools import tool
from init_llm import deepseek_llm

# =============================================================================
# 上下文Schema定义
# 说明:定义动态Prompt可以访问的上下文参数
# 使用TypedDict确保类型安全
# =============================================================================

class AgentContext(TypedDict):
    """
    Agent运行时上下文定义。
    
    该类定义动态Prompt可以访问的上下文信息,
    包含用户类型和用户ID。
    
    Attributes:
        query_type: 查询类型,支持"vip"和"normal"
        uid: 用户唯一标识符
    """
    query_type: str                             # 用户类型:vip/normal
    uid: str                                   # 用户ID

# =============================================================================
# 动态Prompt函数
# 装饰器:@dynamic_prompt
# 功能:根据context参数动态生成系统Prompt
# =============================================================================

@dynamic_prompt
def dynamic_support_prompt(request: ModelRequest) -> str:
    """
    根据query_type动态生成客服Prompt。
    
    该函数根据用户类型生成不同风格的客服指令:
    - VIP用户:专业、深度分析的回复风格
    - 普通用户:简洁、友好的回复风格
    
    Args:
        request: 包含运行时上下文信息的请求对象
        
    Returns:
        str: 根据用户类型定制的系统Prompt
        
    内部逻辑:
        1. 从request.runtime.context获取用户类型
        2. 根据类型选择对应的Prompt模板
        3. 返回组合后的完整Prompt
    """
    print("request", request)
    
    # =================================================================
    # 获取运行时上下文
    # 说明:通过request.runtime.context访问
    # =================================================================
    query_type = request.runtime.context["query_type"]
    
    # =================================================================
    # 基础指令模板
    # 说明:所有用户共享的基础指令
    # =================================================================
    base_instruction = "你是一名专业的电商客服助手。请根据工具查询结果,准确、清晰地回答用户问题。"
    
    # =================================================================
    # 根据用户类型选择不同的扩展指令
    # VIP用户:专业严谨风格
    # 普通用户:简洁友好风格
    # =================================================================
    if query_type == "vip":
        return f"""{base_instruction}
                    当前角色:高级支持专员
                    工作要求:
                    1.深度分析:仔细分析用户描述,识别潜在的根本问题。
                    2.精准分类:将问题明确归类(如"物流问题"、"产品质量"、"售后申请")。
                    3.方案规划:若工具能解决,提供具体步骤;若需人工,明确告知后续流程。
                    请使用更专业、严谨的语言。
                    """
    else:
        return f"""{base_instruction}
                    当前角色:一线客服助手
                    工作要求:
                    1.简洁友好:回复要简单明了,避免复杂术语。
                    保持友好和高效的沟通风格。
                    """

# =============================================================================
# 创建Agent
# 说明:context_schema定义上下文参数结构
# =============================================================================

agent = create_agent(
    model=deepseek_llm,
    tools=[],
    middleware=[dynamic_support_prompt],        # 动态Prompt中间件
    context_schema=AgentContext                 # 上下文Schema
)

# =============================================================================
# 测试VIP用户
# 说明:传入vip类型的上下文
# =============================================================================
response1 = agent.invoke(
    {"messages": [{"role": "user", "content": "我有一个问题需要帮助"}]},
    context={"query_type": "vip", "uid": "user123"}  # 传入上下文
)

# =============================================================================
# 测试普通用户
# 说明:传入normal类型的上下文
# =============================================================================
response2 = agent.invoke(
    {"messages": [{"role": "user", "content": "我有一个问题需要帮助"}]},
    context={"query_type": "normal", "uid": "user456"}  # 传入上下文
)

第三章:工具开发与错误处理

章节概述
本章详细介绍如何定义高质量的工具,包括参数验证、错误处理、Schema定义等企业级实践。

3.1 基础工具定义

3.1.1 使用@tool装饰器

# =============================================================================
# 功能说明:演示@tool装饰器的两种用法
# 使用场景:
>   - 简单工具:直接使用@tool
>   - 复杂工具:需要详细文档的用@tool(name, description)
# 版本说明:LangChain核心功能
# 注意事项:
>   1. 工具的description非常重要,影响Agent选择
>   2. 应提供清晰的参数说明
>   3. docstring会被自动用作工具描述(除非手动指定)
# =============================================================================

from langchain.agents import create_agent
from langchain_core.tools import tool
from init_llm import deepseek_llm

# =============================================================================
# 简单工具示例
# 说明:@tool直接装饰函数,函数名和docstring作为工具名和描述
# =============================================================================

@tool
def get_weather(location: str) -> str:
    """
    获取指定位置的天气信息。
    
    Args:
        location: 城市名称
        
    Returns:
        天气信息字符串
    """
    return f"{location}的天气是晴朗的"

# =============================================================================
# 带完整文档的工具示例
# 说明:使用@tool(name, description)显式指定工具信息
# =============================================================================

@tool("get_employee_info", description="根据员工ID查询员工的姓名、部门、职务和邮箱。")
def get_employee_info(employee_id: str) -> str:
    """
    根据员工ID查询员工的详细信息。
    
    该工具从企业数据库中查询员工的基本信息,包括姓名、部门、职务和邮箱。
    
    Args:
        employee_id: 员工唯一标识符,格式为E开头的编号,如E001、E002
        
    Returns:
        包含员工完整信息的格式化字符串,格式如下:
        "员工ID {id} 的信息如下:姓名 - {name},部门 - {dept},职务 - {pos},邮箱 - {email}"
        
    Raises:
        当员工ID不存在时,返回友好的错误提示
        
    Example:
        >>> get_employee_info("E001")
        "员工ID E001 的信息如下:姓名 - 张三,部门 - 技术部,职务 - 高级软件工程师,邮箱 - zhangsan@company.com"
    """
    # 模拟员工数据库
    mock_employee_database = {
        "E001": {
            "name": "张三",
            "department": "技术部",
            "position": "高级软件工程师",
            "email": "zhangsan@company.com"
        },
        "E002": {
            "name": "李四",
            "department": "市场部",
            "position": "市场经理",
            "email": "lisi@company.com"
        },
        "E003": {
            "name": "王五",
            "department": "人力资源部",
            "position": "招聘专员",
            "email": "wangwu@company.com"
        }
    }
    
    # =================================================================
    # 查询员工记录
    # 使用dict.get()安全访问,避免KeyError
    # =================================================================
    employee_record = mock_employee_database.get(employee_id)
    
    if employee_record:
        # =================================================================
        # 格式化返回信息
        # 说明:清晰分隔各字段,便于前端展示
        # =================================================================
        return f"员工ID {employee_id} 的信息如下:姓名 - {employee_record['name']},部门 - {employee_record['department']},职务 - {employee_record['position']},邮箱 - {employee_record['email']}"
    else:
        return f"员工ID {employee_id} 不存在"

# =============================================================================
# 创建Agent并测试工具
# =============================================================================

agent = create_agent(
    model=deepseek_llm,
    tools=[get_employee_info],
    system_prompt="你是一个员工信息查询助手"
)

result = agent.invoke({"messages": [{"role": "user", "content": "查询员工ID E001 的信息"}]})
print(result["messages"][-1].content)

3.1.2 自定义工具名称和描述

# =============================================================================
# 功能说明:演示自定义工具名称和描述的用法
# 使用场景:
>   - 函数名不够语义化时
>   - 同一功能需要多个别名时
>   - 需要更详细描述的场景
# 版本说明:LangChain核心功能
# 注意事项:
>   1. name参数会覆盖默认的函数名
>   2. description参数是Agent判断是否调用工具的主要依据
>   3. 描述应包含参数说明和返回值说明
# =============================================================================

@tool("get_employee_info", description="根据员工ID查询员工的姓名、部门、职务和邮箱。")
def get_employee_info(employee_id: str) -> str:
    """根据员工ID查询员工的详细信息"""
    # 实现代码...

3.2 使用Pydantic Schema定义工具参数

架构设计说明

Pydantic Schema vs JSON Schema
┌─────────────────────────────────────────────────────────────┐
│                      参数定义方式对比                          │
├─────────────────────────────────────────────────────────────┤
│ Pydantic Schema                                              │
│   ✓ 类型安全(IDE自动补全)                                   │
│   ✓ 支持数据验证(@field_validator)                        │
│   ✓ 支持默认值和必填控制                                     │
│   ✓ 面向对象,易于维护                                        │
│                                                              │
│ JSON Schema                                                  │
│   ✓ 更轻量(无需定义类)                                      │
│   ✓ 更接近OpenAPI标准                                        │
│   ✓ 适合简单的参数定义                                        │
└─────────────────────────────────────────────────────────────┘
# =============================================================================
# 功能说明:使用Pydantic BaseModel定义复杂工具参数
# 使用场景:
>   - 多参数组合查询
>   - 需要参数验证的场景
>   - 需要提供默认值和可选参数的查询
# 版本说明:需要安装pydantic
# 注意事项:
>   1. BaseModel子类定义参数结构
>   2. Field()添加元信息和描述
>   3. @field_validator进行自定义验证
# =============================================================================

import json
from typing import Optional, Literal
from langchain.agents import create_agent
from langchain_core.tools import tool
from pydantic import BaseModel, Field, field_validator
from init_llm import deepseek_llm

# =============================================================================
# Pydantic Schema定义
# 说明:使用类定义参数结构,支持复杂的验证逻辑
# =============================================================================

class QueryTicketsSchema(BaseModel):
    """
    工单查询参数Schema。
    
    该类定义工单查询工具的参数结构,支持多种过滤条件组合。
    
    Attributes:
        ticket_id: 工单ID,精确匹配
        assigner: 分配人姓名,精确匹配
        status: 工单状态,可选值:open/resolved/closed
        priority: 优先级,可选值:low/medium/high
        
    使用示例:
        >>> schema = QueryTicketsSchema(status="open", priority="high")
        >>> # 查询所有状态为open且优先级为high的工单
    """
    
    # =================================================================
    # 参数定义
    # Field参数说明:
    #   - default: 默认值(None表示可选参数)
    #   - description: 参数描述(Agent据此理解参数含义)
    # =================================================================
    ticket_id: Optional[str] = Field(
        default=None,
        description="工单ID"
    )
    assigner: Optional[str] = Field(
        default=None,
        description="工单分配人"
    )
    status: Optional[Literal["open", "resolved", "closed"]] = Field(
        default=None, 
        description="工单状态,open(待处理),resolved(已处理),closed(已关闭)"
    )
    priority: Optional[Literal["low", "medium", "high"]] = Field(
        default=None, 
        description="工单优先级,low(低),medium(中),high(高)"
    )
    
    # =================================================================
    # 参数验证器
    # 说明:在参数传递给函数前进行预处理
    # 可以进行格式化、转换、验证等操作
    # =================================================================
    @field_validator("ticket_id")
    def validate_ticket_id(cls, v):
        """
        自动将ticket_id转为大写。
        
        确保输入的ticket_id格式统一,避免大小写不一致导致的查询问题。
        
        Args:
            v: 输入的ticket_id值
            
        Returns:
            转为大写后的ticket_id(如果非空)
        """
        return v.upper() if v else None

# =============================================================================
# 使用Schema的工具定义
# args_schema参数:指定参数验证Schema
# =============================================================================

@tool(args_schema=QueryTicketsSchema)
def query_tickets(
        ticket_id: str = None,
        assigner: str = None,
        status: str = None,
        priority: str = None
) -> str:
    """
    根据条件查询工单详细信息。
    
    支持多条件组合查询,所有参数都是可选的,不传参返回所有工单。
    
    Args:
        ticket_id: 工单ID(精确匹配)
        assigner: 分配人姓名(精确匹配)
        status: 工单状态(open/resolved/closed)
        priority: 优先级(low/medium/high)
        
    Returns:
        JSON格式的工单列表,包含总数和详情
        
    Example:
        >>> query_tickets(assigner="张三", priority="high")
        '{"total_count": 2, "tickets": [...]}'
    """
    # 模拟工单数据库
    mock_tickets_db = [
        {
            "ticket_id": "TK2025012001",
            "assigner": "张三",
            "title": "登录页面加载缓慢",
            "status": "open",
            "priority": "low"
        },
        {
            "ticket_id": "TK2025012002",
            "assigner": "李四",
            "title": "用户头像上传失败",
            "status": "open",
            "priority": "medium"
        },
        {
            "ticket_id": "TK2025011901",
            "assigner": "张三",
            "title": "支付成功通知未发送",
            "status": "resolved",
            "priority": "high"
        },
    ]
    
    # =================================================================
    # 条件过滤
    # 策略:逐层筛选,保留满足所有条件的记录
    # =================================================================
    filtered_tickets = mock_tickets_db
    
    if ticket_id:
        filtered_tickets = [t for t in filtered_tickets if t["ticket_id"] == ticket_id]
    if assigner:
        filtered_tickets = [t for t in filtered_tickets if t["assigner"] == assigner]
    if status:
        filtered_tickets = [t for t in filtered_tickets if t["status"] == status]
    if priority:
        filtered_tickets = [t for t in filtered_tickets if t["priority"] == priority]
    
    # =================================================================
    # 返回结果
    # 说明:使用JSON格式便于前端解析
    # ensure_ascii=False: 保留中文
    # indent=2: 格式化输出,便于调试
    # =================================================================
    if not filtered_tickets:
        return "没有找到任何工单"
    
    return json.dumps({
        "total_count": len(filtered_tickets),
        "tickets": filtered_tickets
    }, ensure_ascii=False, indent=2)

# =============================================================================
# 创建Agent并测试
# =============================================================================

agent = create_agent(
    model=deepseek_llm,
    tools=[query_tickets],
    system_prompt="你是一个工单查询助手"
)

result = agent.invoke({
    "messages": [{"role": "user", "content": "请帮我查询一下张三负责的优先级别为高的工单信息"}]
})
print(result["messages"][-1].content)

3.3 使用JSONSchema定义工具参数

对比说明

特性 Pydantic Schema JSON Schema
类型安全 ✓ 强类型 ✗ 弱类型
验证器 ✓ 内置支持 ✗ 需自定义
代码量 较多 较少
复杂度 适合复杂结构 适合简单结构
# =============================================================================
# 功能说明:使用JSONSchema定义工具参数
# 使用场景:
>   - 追求简洁的参数定义
>   - 需要OpenAPI兼容的场景
>   - 简单参数结构的工具
# 版本说明:LangChain核心功能
# 注意事项:
>   1. JSONSchema是声明式的,适合简单参数
>   2. 复杂验证逻辑需要额外处理
>   3. 部分IDE可能不支持JSONSchema的自动补全
# =============================================================================

from langchain.agents import create_agent
from langchain_core.tools import tool
from init_llm import deepseek_llm

# =============================================================================
# JSONSchema定义
# 说明:使用字典格式定义参数结构
# 遵循JSON Schema规范
# =============================================================================

query_ticket_jsonschema = {
    "type": "object",                           # 类型:对象
    "properties": {                             # 属性定义
        "ticket_id": {
            "type": "string",
            "description": "工单ID"
        },
        "assigner": {
            "type": "string",
            "description": "工单分配人"
        },
        "status": {
            "type": "string", 
            "enum": ["open", "resolved", "closed"],  # 枚举值限制
            "description": "工单状态"
        },
        "priority": {
            "type": "string", 
            "enum": ["low", "medium", "high"],
            "description": "工单优先级"
        }
    },
    "required": []                              # 必填参数列表(空表示全部可选)
}

@tool(args_schema=query_ticket_jsonschema)
def query_tickets(
        ticket_id: str = None,
        assigner: str = None,
        status: str = None,
        priority: str = None
) -> str:
    """根据条件查询工单详细信息"""
    # 实现代码...

3.4 工具调用错误处理

错误处理策略

┌─────────────────────────────────────────────────────────────────┐
│                      错误处理架构                                 │
└─────────────────────────────────────────────────────────────────┘

工具执行 → [try块] → 正常返回
               ↓
           [except块] → 捕获异常 → 转换为友好错误信息 → 返回ToolMessage

最佳实践

  1. 分层处理:不同类型的异常返回不同的友好提示
  2. 日志记录:记录详细错误信息便于排查
  3. 用户友好:向用户展示可理解的错误信息
  4. 可恢复性:某些错误可以提供重试建议

3.4.1 基础错误处理

# =============================================================================
# 功能说明:使用wrap_tool_call中间件捕获和处理工具调用错误
# 使用场景:
>   - 防止工具异常导致整个Agent崩溃
>   - 将错误转换为用户友好的消息
>   - 实现重试逻辑
# 版本说明:LangChain 1.0+ 中间件API
# 注意事项:
>   1. 中间件捕获所有工具执行异常
>   2. 返回ToolMessage而非抛出异常
>   3. 可以实现复杂的重试和降级逻辑
# =============================================================================

import requests
from langchain.agents import create_agent
from langchain.agents.middleware import wrap_tool_call
from langchain_core.messages import ToolMessage
from langchain_core.tools import tool
from langgraph.prebuilt.tool_node import ToolCallRequest
from init_llm import deepseek_llm

# =============================================================================
# 工具定义
# 说明:模拟可能失败的API调用
# =============================================================================

@tool
def get_stock_price(symbol: str) -> str:
    """
    获取指定股票代码的当前价格。
    
    Args:
        symbol: 股票代码,如"TCEHY"(腾讯)、"AAPL"(苹果)
        
    Returns:
        股票价格信息字符串
        
    Raises:
        Exception: 网络错误或API不可用时抛出异常
    """
    print(f"调用股票查询工具: {symbol}")
    try:
        # =================================================================
        # 模拟可能失败的API调用
        # 说明:timeout=1秒模拟快速失败场景
        # 实际使用中应设置合理的超时时间
        # =================================================================
        response = requests.get(
            f"https://api.xxx.com/stocks/{symbol}",
            timeout=1
        )
        return f"股票 {symbol} 当前价格为 {response['price']}"
    except requests.exceptions.RequestException as e:
        # =================================================================
        # 异常处理
        # 说明:捕获网络相关异常,记录并重新抛出
        # 重新抛出可以让中间件统一处理
        # =================================================================
        print(f"查询股票数据失败: {str(e)}")
        raise Exception(f"查询股票数据失败: {str(e)}")

# =============================================================================
# 错误处理中间件
# 说明:捕获工具调用异常,返回友好的错误消息
# =============================================================================

@wrap_tool_call
def handle_tool_call_error(request: ToolCallRequest, handler):
    """
    工具调用错误处理中间件。
    
    捕获工具执行过程中的所有异常,并转换为用户友好的错误消息。
    这样做的好处:
    1. 防止Agent因异常中断
    2. 向用户提供可理解的错误信息
    3. 便于日志记录和问题追踪
    
    Args:
        request: 工具调用请求,包含:
            - tool_call: 工具调用信息(id、名称、参数)
        handler: 实际工具执行处理器
        
    Returns:
        ToolMessage: 正常返回或错误消息的ToolMessage
        
    错误处理策略:
        - 连接错误:提示稍后重试
        - 权限错误:提示联系管理员
        - 输入错误:提示检查参数
        - 其他错误:通用错误提示
    """
    print("request", request)
    
    try:
        # =================================================================
        # 执行工具调用
        # 说明:正常情况下返回工具执行结果
        # =================================================================
        return handler(request)
    except Exception as e:
        # =================================================================
        # 异常捕获
        # 说明:将异常转换为ToolMessage
        # 返回给Agent继续处理流程
        # =================================================================
        return ToolMessage(
            content=f"当前股票查询服务不可用,错误信息: {str(e)}",
            tool_call_id=request.tool_call["id"]   # 必须提供正确的tool_call_id
        )

# =============================================================================
# 创建带错误处理的Agent
# =============================================================================

agent = create_agent(
    model=deepseek_llm,
    tools=[get_stock_price],
    middleware=[handle_tool_call_error]          # 错误处理中间件
)

# =============================================================================
# 测试错误处理
# 由于API不存在,会触发异常
# 中间件捕获异常并返回友好提示
# =============================================================================

result = agent.invoke({"messages": [{"role": "user", "content": "查询TCEHY的股票价格"}]})
print(result["messages"][-1].content)

3.4.2 分层错误处理

架构设计说明

┌─────────────────────────────────────────────────────────────────┐
│                    分层错误处理策略                               │
└─────────────────────────────────────────────────────────────────┘

异常类型                处理策略                用户提示
─────────────────────────────────────────────────────────────
ConnectionError    →   网络问题           →   "稍后重试"
PermissionError    →   权限问题           →   "联系管理员"
ToolException      →   输入验证问题       →   "检查参数"
Exception          →   其他未分类问题     →   "技术团队通知"
# =============================================================================
# 功能说明:演示分层错误处理,根据异常类型提供不同的用户提示
# 使用场景:
>   - 需要区分不同错误类型的场景
>   - 提供精准的错误恢复指导
>   - 企业级应用的质量保证
# 版本说明:LangChain 1.0+
# 注意事项:
>   1. 异常类型应覆盖所有可能的错误
>   2. 提示信息应具有可操作性
>   3. 关键错误应记录日志便于排查
# =============================================================================

from langchain.agents import create_agent
from langchain.agents.middleware import wrap_tool_call
from langchain_core.messages import ToolMessage
from langchain_core.tools import tool, ToolException
from pydantic import BaseModel
from init_llm import deepseek_llm
import json
import random

# =============================================================================
# 模拟数据
# 说明:简单的内存数据库用于演示
# =============================================================================

TICKET_DATABASE = {
    "T001": {"title": "登录问题", "status": "处理中", "assignee": "张三"},
    "T002": {"title": "支付失败", "status": "已解决", "assignee": "李四"},
    "T003": {"title": "页面加载慢", "status": "待处理", "assignee": "王五"}
}

# =============================================================================
# 工具1:查询工单
# 说明:模拟50%概率的连接失败
# =============================================================================

@tool
def query_ticket(ticket_id: str) -> str:
    """
    根据工单ID查询工单详情。
    
    Args:
        ticket_id: 工单ID,如T001、T002
        
    Returns:
        工单详情的JSON字符串
        
    Raises:
        ConnectionError: 数据库连接超时时抛出
        ToolException: 工单不存在时抛出
    """
    # 模拟网络不稳定:50%概率失败
    if random.random() < 0.5:
        raise ConnectionError("数据库连接超时,请稍后重试")
    
    if ticket_id not in TICKET_DATABASE:
        raise ToolException(f"工单ID {ticket_id} 不存在")
    
    return json.dumps(TICKET_DATABASE[ticket_id], ensure_ascii=False, indent=2)

# =============================================================================
# 工具2:更新工单状态
# 说明:模拟权限检查失败
# =============================================================================

@tool
def update_ticket_status(ticket_id: str, new_status: str) -> str:
    """
    更新工单状态。
    
    Args:
        ticket_id: 工单ID
        new_status: 新状态,可选值:待处理、处理中、已解决、已关闭
        
    Returns:
        更新成功的确认消息
        
    Raises:
        ToolException: 工单不存在或状态值无效时抛出
        PermissionError: 权限不足时抛出(模拟20%概率)
    """
    valid_statuses = ["待处理", "处理中", "已解决", "已关闭"]
    
    if ticket_id not in TICKET_DATABASE:
        raise ToolException(f"工单ID {ticket_id} 不存在")
    
    if new_status not in valid_statuses:
        raise ToolException(f"状态必须是: {', '.join(valid_statuses)}")
    
    # 模拟权限检查失败
    if random.random() < 0.2:
        raise PermissionError("权限不足:只有管理员可以关闭工单")
    
    TICKET_DATABASE[ticket_id]["status"] = new_status
    return f"工单 {ticket_id} 状态已更新为: {new_status}"

# =============================================================================
# 分层错误处理中间件
# 说明:根据不同异常类型提供不同的处理策略
# =============================================================================

@wrap_tool_call
def intelligent_error_handler(request, handler):
    """
    智能错误处理中间件。
    
    根据错误类型返回不同的指导信息,帮助用户了解发生了什么问题以及如何解决。
    
    错误分类:
    1. ConnectionError - 系统暂时繁忙
    2. PermissionError - 权限限制
    3. ToolException - 输入验证失败
    4. 其他异常 - 未知错误
    
    Args:
        request: 工具调用请求
        handler: 工具执行处理器
        
    Returns:
        ToolMessage: 包含错误信息的工具消息
    """
    try:
        return handler(request)
    except ConnectionError as e:
        # =================================================================
        # 连接错误处理
        # 说明:网络或数据库暂时不可用
        # 建议:稍后重试
        # =================================================================
        return ToolMessage(
            content=f"系统暂时繁忙:{str(e)}。建议您稍后重试此操作。",
            tool_call_id=request.tool_call["id"]
        )
    except PermissionError as e:
        # =================================================================
        # 权限错误处理
        # 说明:用户没有执行此操作的权限
        # 建议:联系管理员
        # =================================================================
        return ToolMessage(
            content=f"权限限制:{str(e)}。如需执行此操作,请联系管理员。",
            tool_call_id=request.tool_call["id"]
        )
    except ToolException as e:
        # =================================================================
        # 工具异常处理
        # 说明:通常是输入参数验证失败
        # 建议:检查输入
        # =================================================================
        return ToolMessage(
            content=f"输入验证失败:{str(e)}。请检查输入参数是否正确。",
            tool_call_id=request.tool_call["id"]
        )
    except Exception as e:
        # =================================================================
        # 通用异常处理
        # 说明:捕获所有未预料的异常
        # 建议:技术团队介入
        # =================================================================
        return ToolMessage(
            content=f"意外错误:{str(e)}。技术团队已收到通知,请稍后重试。",
            tool_call_id=request.tool_call["id"]
        )

# =============================================================================
# 创建Agent
# =============================================================================

agent = create_agent(
    model=deepseek_llm,
    tools=[query_ticket, update_ticket_status],
    middleware=[intelligent_error_handler],
    system_prompt="你是企业客服工单系统助手"
)

# =============================================================================
# 测试各种错误场景
# =============================================================================

test_cases = [
    "查询工单T999的详情",              # 不存在的工单ID -> ToolException
    "把工单T001状态更新为完结状态",      # 无效状态 -> ToolException
    "查询工单T001的详情",              # 正常查询(可能触发ConnectionError)
]

for query in test_cases:
    response = agent.invoke({"messages": [{"role": "user", "content": query}]})
    print(f"用户查询: {query}")
    print(f"助手回复: {response['messages'][-1].content}")
    print("-" * 50)

第四章:智能体调用与流式处理

章节概述
本章介绍Agent的多种调用方式(同步/异步/流式),以及Checkpoint和Store的使用。

4.1 Agent调用方式

调用方式对比

┌─────────────────────────────────────────────────────────────────┐
│                      Agent调用方式对比                            │
├─────────────────────────────────────────────────────────────────┤
│  invoke(同步)    →  等待完整响应,适合简单场景                    │
│  stream(流式)    →  边生成边输出,适合实时展示                    │
│  ainvoke(异步)   →  非阻塞调用,适合高并发                        │
└─────────────────────────────────────────────────────────────────┘

4.1.1 Invoke调用

# =============================================================================
# 功能说明:使用invoke方法调用Agent
# 使用场景:
>   - 简单的同步请求
>   - CLI工具或脚本
>   - 不需要实时展示的场景
# 版本说明:LangChain核心API
# 注意事项:
>   1. invoke会等待完整响应后才返回
>   2. 适合简单的一次性请求
>   3. 响应包含完整的messages历史
# =============================================================================

from langchain.agents import create_agent
from langchain.tools import tool
from langgraph.graph.state import CompiledStateGraph
from init_llm import deepseek_llm

@tool
def get_weather(city: str) -> str:
    """
    获取指定城市的天气信息。
    
    Args:
        city: 城市名称
        
    Returns:
        天气信息字符串
    """
    return f"{city}的天气为晴朗,25°C。"

# =============================================================================
# 创建Agent
# =============================================================================

agent = create_agent(
    model=deepseek_llm,
    tools=[get_weather],
    system_prompt="你是能查询任何问题的助手"
)

# =============================================================================
# 使用invoke调用
# 说明:等待完整响应后返回
# 返回值是包含messages的字典
# =============================================================================

resp = agent.invoke({
    "messages": [
        {"role": "system", "content": "你是一个天气查询助手,只回答天气相关的问题"},
        {"role": "user", "content": "北京天气如何?"}
    ]
})

# =============================================================================
# 遍历并打印消息
# pretty_print() 是消息对象的格式化打印方法
# =============================================================================

for msg in resp["messages"]:
    msg.pretty_print()

4.1.2 Stream流式调用

适用场景

  • 聊天机器人界面
  • 实时展示AI思考过程
  • 需要尽早反馈的长文本生成
# =============================================================================
# 功能说明:使用stream方法流式调用Agent
# 使用场景:
>   - WebSocket实时推送
>   - 前端SSE (Server-Sent Events)
>   - 需要展示进度的场景
# 版本说明:LangChain 0.x+
# 注意事项:
>   1. stream返回生成器
>   2. 每个chunk是执行过程中的一个步骤
>   3. 可以配合stream_mode控制输出内容
# =============================================================================

from langchain.agents import create_agent
from langchain_core.tools import tool
from langgraph.checkpoint.memory import InMemorySaver
from init_llm import deepseek_llm

@tool
def query_customer_data(customer_id: str) -> dict:
    """
    查询客户基本信息。
    
    Args:
        customer_id: 客户ID
        
    Returns:
        客户信息字典
    """
    return {
        "customer_id": customer_id,
        "name": "张三",
        "level": "VIP",
        "join_date": "2023-01-15"
    }

# =============================================================================
# 创建Agent(带检查点支持)
# 说明:InMemorySaver用于会话状态持久化
# =============================================================================

agent = create_agent(
    model=deepseek_llm,
    tools=[query_customer_data],
    checkpointer=InMemorySaver()
)

# =============================================================================
# 配置参数
# 说明:configurable.thread_id用于区分不同会话
# =============================================================================

config = {"configurable": {"thread_id": "xxx"}}

# =============================================================================
# 流式调用Agent
# 参数说明:
>   - stream_mode: 输出模式
>     - "checkpoints": 输出检查点状态
>     - "updates": 输出每步更新
>     - "messages": 输出消息增量
>     - "custom": 自定义输出
# =============================================================================

for chunk in agent.stream(
    {"messages": [{"role": "user", "content": "查询客户ID为12345的完整信息"}]},
    config=config,
    stream_mode="checkpoints"
):
    print(chunk)
    print("-" * 50)

4.1.3 异步Agent调用

# =============================================================================
# 功能说明:使用异步方式调用Agent
# 使用场景:
>   - FastAPI异步端点
>   - 需要同时处理多个请求
>   - 高并发Web服务
# 版本说明:Python 3.7+,需要async/await支持
# 注意事项:
>   1. 需要在async函数中调用
>   2. 使用asyncio.run()执行顶层协程
>   3. 适合I/O密集型场景
# =============================================================================

import asyncio
from langchain.agents import create_agent
from langchain.tools import tool
from init_llm import deepseek_llm

@tool
def get_weather(city: str) -> str:
    """获取指定城市的天气信息。"""
    weather_data = {"北京": "晴朗,15°C", "上海": "多云,18°C"}
    return f"{city}的天气:{weather_data.get(city, '信息暂缺')}"

@tool
def get_scenic_spots(city: str, interest_type: str = "通用") -> str:
    """根据兴趣类型推荐城市景点"""
    return f"{city}景点推荐:故宫、长城"

# =============================================================================
# 异步Agent调用函数
# =============================================================================

async def async_query():
    """
    异步查询示例。
    
    演示如何在异步上下文中调用Agent。
    """
    # 创建Agent
    agent = create_agent(
        model=deepseek_llm,
        tools=[get_weather, get_scenic_spots],
        system_prompt="你是一个专业的旅行规划助手",
    )
    
    # =================================================================
    # 使用ainvoke异步调用
    # 说明:aiQuery不阻塞,可以并发执行其他任务
    # =================================================================
    response = await agent.ainvoke({
        "messages": [{"role": "user", "content": "请帮我查询北京的天气信息"}]
    })
    
    return response

# =============================================================================
# 执行异步函数
# =============================================================================

if __name__ == "__main__":
    response = asyncio.run(async_query())
    print(response['messages'][-1].content)

4.2 Checkpoint检查点

Checkpoint机制说明

┌─────────────────────────────────────────────────────────────────┐
│                      Checkpoint 工作原理                         │
└─────────────────────────────────────────────────────────────────┘

会话1: 消息1 → 消息2 → 消息3 → [保存Checkpoint] → 消息4 → ...
                                                         ↑
会话2: ──────────────────────────────────── [恢复Checkpoint] → 消息3

作用:
1. 保存对话状态,实现多轮对话
2. 支持会话恢复和继续
3. 实现对话历史的持久化

Checkpoint vs Store

  • Checkpoint: 保存当前对话状态(短期记忆)
  • Store: 保存跨会话的持久化数据(长期记忆)

4.2.1 内存检查点(InMemorySaver)

# =============================================================================
# 功能说明:使用InMemorySaver实现会话状态保存
# 使用场景:
>   - 开发测试环境
>   - 单进程应用
>   - 不需要持久化的场景
# 版本说明:LangGraph组件
# 注意事项:
>   1. 内存检查点进程重启后丢失
>   2. 适合临时测试或开发
>   3. 生产环境建议使用MySQL等持久化存储
# =============================================================================

from langchain.agents import create_agent
from langchain_core.stores import InMemoryStore
from langgraph.checkpoint.memory import InMemorySaver
from init_llm import deepseek_llm

# =============================================================================
# 创建检查点和存储
# InMemorySaver: 对话状态持久化(短期记忆)
# InMemoryStore: 跨会话数据存储(长期记忆)
# =============================================================================

checkpointer = InMemorySaver()

agent = create_agent(
    model=deepseek_llm,
    tools=[],
    checkpointer=checkpointer,                  # 传入检查点器
    store=InMemoryStore()                       # 传入存储(可选)
)

# =============================================================================
# Session 1
# 说明:不同的thread_id代表不同的会话
# =============================================================================

config1 = {"configurable": {"thread_id": "session_001"}}

resp1 = agent.invoke(
    {"messages": [{"role": "user", "content": "你好,我叫张三"}]},
    config=config1
)
print(resp1["messages"][-1].content)

# =============================================================================
# Session 2
# 说明:不同会话无法访问之前的状态
# 这是因为使用了不同的thread_id
# =============================================================================

config2 = {"configurable": {"thread_id": "session_002"}}

resp2 = agent.invoke(
    {"messages": [{"role": "user", "content": "我叫什么名字?"}]},
    config=config2
)
print(resp2["messages"][-1].content)  # Agent不知道名字(新会话)

4.2.2 MySQL持久化检查点

架构设计说明

┌─────────────────────────────────────────────────────────────────┐
│                    MySQL Checkpoint 架构                         │
└─────────────────────────────────────────────────────────────────┘

┌─────────┐     ┌─────────────┐     ┌─────────────┐
│  Agent  │────▶│ Checkpointer │────▶│    MySQL    │
└─────────┘     └─────────────┘     └─────────────┘
                                         │
                                   ┌─────┴─────┐
                                   │ checkpoints│
                                   │  metadata  │
                                   └───────────┘

适用场景

  • 生产环境需要会话持久化
  • 多实例部署需要共享会话
  • 需要会话历史的审计追溯
# =============================================================================
# 功能说明:使用MySQL持久化检查点
# 使用场景:
>   - 生产环境
>   - 多进程/多实例部署
>   - 需要会话历史追溯的场景
# 版本说明:需要安装pymysql
# 注意事项:
>   1. 需要提前创建数据库
>   2. checkpointer.setup() 会自动创建表
>   3. 建议使用连接池管理连接
# =============================================================================

from langchain.agents import create_agent
from langgraph.checkpoint.mysql.pymysql import PyMySQLSaver
from init_llm import deepseek_llm

# =============================================================================
# 数据库连接配置
# 说明:使用PyMySQLSaver连接MySQL
# URI格式:mysql+pymysql://用户名:密码@主机:端口/数据库?字符集
# =============================================================================

DB_URI = "mysql+pymysql://root:123456@localhost:3306/langchain_db?charset=utf8mb4"

# =============================================================================
# 使用上下文管理器
# 说明:确保连接正确关闭
# =============================================================================

with PyMySQLSaver.from_conn_string(DB_URI) as checkpointer:
    # =================================================================
    # 初始化数据库表
    # 说明:setup()会创建必要的表结构
    # 只需执行一次,后续运行会自动跳过
    # =================================================================
    checkpointer.setup()
    
    # =================================================================
    # 创建Agent
    # =================================================================
    agent = create_agent(
        model=deepseek_llm,
        tools=[],
        checkpointer=checkpointer              # 使用MySQL检查点
    )
    
    # =================================================================
    # 会话1
    # =================================================================
    config = {"configurable": {"thread_id": "session001"}}
    
    resp1 = agent.invoke(
        {"messages": [{"role": "user", "content": "你好,我叫张三"}]},
        config=config
    )
    print(resp1["messages"][-1].content)
    
    # =================================================================
    # 会话2(使用同一thread_id,会话继续)
    # =================================================================
    resp2 = agent.invoke(
        {"messages": [{"role": "user", "content": "你知道我的信息吗?"}]},
        config=config
    )
    print(resp2["messages"][-1].content)  # Agent记得之前的对话

4.3 自定义流式输出

使用场景说明

  • 在工具执行过程中显示进度
  • 实现自定义的流式UI
  • 需要同时输出多种类型数据的场景
# =============================================================================
# 功能说明:使用get_stream_writer实现自定义流式输出
# 使用场景:
>   - 工具执行进度展示
>   - 多数据类型同时流式输出
>   - 自定义UI组件的数据推送
# 版本说明:LangGraph配置API
# 注意事项:
>   1. get_stream_writer()需要在工具执行上下文中调用
>   2. 可以输出任意结构的数据
>   3. 配合agent.stream()的stream_mode使用
# =============================================================================

import time
from langchain.agents import create_agent
from langchain_core.tools import tool
from langgraph.config import get_stream_writer
from init_llm import deepseek_llm

@tool
def generate_sales_report() -> str:
    """
    生成销售报告。
    
    该工具演示如何使用get_stream_writer输出自定义进度信息。
    """
    # =================================================================
    # 获取流式写入器
    # 说明:writer可以在工具执行过程中输出数据
    # 可以是任意结构的数据(字典、字符串等)
    # =================================================================
    writer = get_stream_writer()
    
    # 输出开始状态
    writer({"type": "生成销售报告", "message": "开始生成销售报告"})
    time.sleep(1)
    
    writer({"type": "生成销售报告", "message": "销售报告生成了 25%"})
    time.sleep(1)
    
    writer({"type": "生成销售报告", "message": "销售报告生成了 50%"})
    time.sleep(1)
    
    writer({"type": "生成销售报告", "message": "销售报告生成了 75%"})
    time.sleep(1)
    
    writer({"type": "生成销售报告", "message": "销售报告生成了 100%"})

@tool
def generate_inventory_report() -> str:
    """生成库存报告"""
    writer = get_stream_writer()
    
    writer("开始生成库存报告")
    time.sleep(1)
    writer("库存报告生成了 25%")
    time.sleep(1)
    writer("库存报告生成了 50%")
    time.sleep(1)
    writer("库存报告生成了 75%")
    time.sleep(1)
    writer("库存报告生成了 100%")

# =============================================================================
# 创建Agent
# =============================================================================

agent = create_agent(
    model=deepseek_llm,
    tools=[generate_sales_report, generate_inventory_report]
)

# =============================================================================
# 流式调用并处理自定义输出
# stream_mode: ["custom", "updates"]
# 说明:
>   - "custom": 捕获工具中writer()的输出
>   - "updates": 捕获Agent状态的更新
# =============================================================================

for chunk in agent.stream(
    {"messages": [{"role": "user", "content": "生成销售报告和库存报告"}]},
    stream_mode=["custom", "updates"]
):
    print(chunk)
    print("-" * 50)

第五章:记忆管理系统

章节概述
本章介绍LangChain的记忆管理系统,包括短期记忆(基于Checkpoint)和长期记忆(基于Store)。

记忆系统架构

┌─────────────────────────────────────────────────────────────────┐
│                      记忆管理系统架构                              │
└─────────────────────────────────────────────────────────────────┘

┌─────────────────────┐         ┌─────────────────────┐
│    短期记忆          │         │    长期记忆          │
│  (Checkpoint)       │         │   (Store)           │
├─────────────────────┤         ├─────────────────────┤
│ 生命周期: 会话内     │         │ 生命周期: 永久        │
│ 存储内容: 对话历史   │         │ 存储内容: 用户偏好    │
│ 实现方式: 自动保存   │         │ 实现方式: 手动存储    │
└─────────────────────┘         └─────────────────────┘

会话A: ────────────────────────────────────────
       │ 消息1 │ 消息2 │ 消息3 │ (Checkpoint保存) │ ...
会话B: ─── (Store查询) ──────────────────────────

5.1 短期记忆(Short Memory)

机制说明
短期记忆基于Checkpoint实现,用于保存单次会话中的对话历史,使得Agent能够记住之前说过的话。

5.1.1 基础短期记忆

# =============================================================================
# 功能说明:使用Checkpoint实现基础的短期记忆
# 使用场景:
>   - 需要多轮对话的应用
>   - 简单的对话式AI助手
>   - 需要Agent记住之前对话内容的场景
# 版本说明:LangGraph组件
# 注意事项:
>   1. Checkpoint自动保存对话历史
>   2. 同一thread_id共享记忆
>   3. 不同thread_id有独立的记忆
# =============================================================================

from langchain.agents import create_agent
from langgraph.checkpoint.memory import InMemorySaver
from init_llm import deepseek_llm

# =============================================================================
# 创建内存检查点
# 说明:InMemorySaver使用内存存储,进程重启后丢失
# =============================================================================

checkpointer = InMemorySaver()

# =============================================================================
# 创建带检查点的Agent
# 说明:checkpointer会自动保存每次invoke的状态
# =============================================================================

agent = create_agent(
    model=deepseek_llm,
    tools=[],
    checkpointer=checkpointer                  # 启用短期记忆
)

# =============================================================================
# 配置会话ID
# 说明:thread_id用于区分不同会话
# =============================================================================

config = {"configurable": {"thread_id": "session001"}}

# =============================================================================
# 第一轮对话:告诉Agent名字
# =============================================================================

resp1 = agent.invoke(
    {"messages": [{"role": "user", "content": "你好,我叫张三"}]},
    config=config
)
print(resp1["messages"][-1].content)

# =============================================================================
# 第二轮对话:询问名字
# 说明:由于使用了相同的thread_id,Agent能记住之前说过的话
# =============================================================================

resp2 = agent.invoke(
    {"messages": [{"role": "user", "content": "我叫什么名字?"}]},
    config=config
)
print(resp2["messages"][-1].content)  # Agent会回答"张三"

5.1.2 自定义状态(State)

State扩展说明
除了messages,Agent还可以维护自定义的状态信息,如用户ID、偏好设置等。

# =============================================================================
# 功能说明:使用自定义State扩展Agent状态
# 使用场景:
>   - 需要传递用户信息
>   - 需要维护会话上下文
>   - 实现复杂的多轮对话逻辑
# 版本说明:LangChain 1.0+
# 注意事项:
>   1. 自定义State需要继承AgentState
>   2. 状态会在每次invoke时更新
>   3. 状态可以用于工具调用的条件判断
# =============================================================================

from langchain.agents import create_agent, AgentState
from langchain_core.tools import tool
from langgraph.checkpoint.memory import InMemorySaver
from init_llm import deepseek_llm

# =============================================================================
# 自定义状态定义
# 说明:继承AgentState添加自定义字段
# =============================================================================

class CustomState(AgentState):
    """
    自定义会话状态。
    
    该类扩展了基础AgentState,添加了用户相关的状态字段。
    
    Attributes:
        user_id: 用户唯一标识符
        hobby: 用户爱好
        other_info: 其他用户信息(字典格式)
    """
    user_id: str                               # 用户ID
    hobby: str                                 # 用户爱好
    other_info: dict                           # 其他信息

# =============================================================================
# 创建Agent时指定state_schema
# =============================================================================

agent = create_agent(
    model=deepseek_llm,
    tools=[],
    checkpointer=InMemorySaver(),
    state_schema=CustomState                   # 指定自定义状态
)

config = {"configurable": {"thread_id": "session001"}}

# =============================================================================
# 调用时传入自定义状态
# 说明:状态会保存到Checkpoint中
# =============================================================================

resp1 = agent.invoke({
    "messages": [{"role": "user", "content": "你好,我叫张三"}],
    "user_id": "user001",                     # 自定义状态
    "hobby": "旅游、滑雪、喝茶",                # 自定义状态
    "other_info": {"age": 28, "gender": "男"}  # 自定义状态
}, config=config)
print(resp1["messages"][-1].content)

# =============================================================================
# 查询当前状态
# 说明:get_state返回当前会话的所有状态
# =============================================================================

state = agent.get_state(config=config)
print(state)

5.1.3 在工具中修改状态

状态修改模式说明
使用Command对象在工具中修改状态,可以实现:

  1. 更新State字段
  2. 添加ToolMessage
  3. 控制Agent下一步行为
# =============================================================================
# 功能说明:在工具中通过Command对象修改状态
# 使用场景:
>   - 工具执行后需要更新上下文
>   - 需要在工具中添加消息
>   - 实现条件性的状态更新
# 版本说明:LangGraph Types
# 注意事项:
>   1. Command是状态更新的原子操作
>   2. 可以同时更新多个字段
>   3. 必须提供正确的tool_call_id
# =============================================================================

from langchain.agents import create_agent, AgentState
from langchain_core.messages import ToolMessage
from langchain_core.tools import tool
from langgraph.checkpoint.memory import InMemorySaver
from langgraph.prebuilt import ToolRuntime
from langgraph.types import Command
from init_llm import deepseek_llm

# =============================================================================
# 工具1:获取信息
# 说明:读取ToolRuntime中的状态
# =============================================================================

@tool
def get_info(runtime: ToolRuntime) -> str:
    """
    获取用户信息。
    
    Args:
        runtime: 运行时对象,包含状态和工具调用信息
        
    Returns:
        用户信息字符串
    """
    # =================================================================
    # 通过runtime.state访问状态
    # 说明:runtime是LangGraph自动注入的上下文对象
    # =================================================================
    user_id = runtime.state["user_id"]
    user_level = runtime.state["user_level"]
    return f"用户ID: {user_id}, 用户等级: {user_level}"

# =============================================================================
# 工具2:更新信息
# 说明:使用Command对象更新状态
# =============================================================================

@tool
def update_info(runtime: ToolRuntime, user_id: str, user_level: str) -> Command:
    """
    更新用户信息。
    
    Args:
        runtime: 运行时对象
        user_id: 新用户ID
        user_level: 新用户等级
        
    Returns:
        Command对象,用于更新状态和添加消息
    """
    # =================================================================
    # 参数验证
    # 说明:如果参数无效,返回错误消息而不更新状态
    # =================================================================
    if not user_id or not user_level:
        return Command(update={
            "messages": [ToolMessage(
                content=f"用户ID不能为空或用户等级不能为空",
                tool_call_id=runtime.tool_call_id
            )]
        })
    
    # =================================================================
    # 使用Command更新状态
    # Command参数说明:
    #   - update: 要更新的状态字段
    #   - messages: 要添加的消息(ToolMessage)
    # 返回后,Agent会使用更新后的状态继续执行
    # =================================================================
    return Command(update={
        "user_id": user_id,                    # 更新user_id
        "user_level": user_level,              # 更新user_level
        "messages": [ToolMessage(
            content=f"用户信息已经更改,用户ID: {user_id}, 用户等级: {user_level}",
            tool_call_id=runtime.tool_call_id
        )]
    })

# =============================================================================
# 自定义状态定义
# =============================================================================

class CustomState(AgentState):
    user_id: str
    user_level: str

# =============================================================================
# 创建Agent
# =============================================================================

agent = create_agent(
    model=deepseek_llm,
    tools=[get_info, update_info],
    checkpointer=InMemorySaver(),
    state_schema=CustomState
)

config = {"configurable": {"thread_id": "session001"}}

# =============================================================================
# 测试:获取信息
# =============================================================================

resp1 = agent.invoke({
    "messages": [{"role": "user", "content": "获取我的信息"}],
    "user_id": "user_001",
    "user_level": "VIP",
}, config=config)
print(resp1["messages"][-1].content)

# =============================================================================
# 测试:更新状态
# =============================================================================

resp2 = agent.invoke({
    "messages": [{"role": "user", "content": "我把我的信息user_id改成 user_002,用户等级改成 normal"}]},
    config=config
)
print(resp2["messages"][-1].content)

# =============================================================================
# 验证状态已更新
# =============================================================================

state = agent.get_state(config=config)
print(state)

5.2 中间件修改状态

中间件分类

  • before_model: 模型调用前执行,用于预处理
  • after_model: 模型调用后执行,用于后处理和状态统计

5.2.1 状态统计中间件

# =============================================================================
# 功能说明:使用before_model和after_model中间件实现状态统计
# 使用场景:
>   - 统计工具调用次数
>   - 统计模型调用次数
>   - 实现限流逻辑
# 版本说明:LangChain 1.0+ 中间件API
# 注意事项:
>   1. 中间件函数必须返回dict(要更新的状态)
>   2. before_model在模型调用前执行
>   3. after_model在模型调用后执行
# =============================================================================

from langchain.agents import create_agent, AgentState
from langchain.agents.middleware import before_model, after_model
from langchain_core.messages import ToolMessage
from langchain_core.tools import tool
from langgraph.checkpoint.memory import InMemorySaver
from langgraph.prebuilt import ToolRuntime
from langgraph.runtime import Runtime
from init_llm import deepseek_llm

@tool
def get_weather(city: str) -> str:
    """根据城市名称获取天气信息"""
    return f"{city}天气晴朗"

# =============================================================================
# 模型调用前中间件
# 说明:统计已执行的工具调用次数
# =============================================================================

@before_model
def before_model(state: AgentState, runtime: Runtime) -> dict:
    """
    模型调用前中间件。
    
    该中间件在每次模型调用前执行,统计工具调用次数。
    
    Args:
        state: 当前Agent状态,包含所有messages
        runtime: 运行时对象
        
    Returns:
        dict: 要更新的状态字段
    """
    print("before_model state:", state)
    
    # =================================================================
    # 统计工具调用次数
    # 说明:遍历messages,统计ToolMessage数量
    # =================================================================
    tool_call_count = len([msg for msg in state["messages"] if isinstance(msg, ToolMessage)])
    return {"tool_call_count": tool_call_count}

# =============================================================================
# 模型调用后中间件
# 说明:统计模型调用次数
# =============================================================================

@after_model
def after_model(state: AgentState, runtime: Runtime) -> dict:
    """
    模型调用后中间件。
    
    该中间件在每次模型调用后执行,统计模型调用次数。
    
    Args:
        state: 当前Agent状态
        runtime: 运行时对象
        
    Returns:
        dict: 要更新的状态字段
    """
    print("after_model state:", state)
    
    # 获取当前调用次数(初始为0)
    model_call_count = state.get("model_call_count", 0)
    # 增加计数
    model_call_count += 1
    return {"model_call_count": model_call_count}

# =============================================================================
# 自定义状态
# =============================================================================

class CustomState(AgentState):
    tool_call_count: int                       # 工具调用计数
    model_call_count: int                     # 模型调用计数

# =============================================================================
# 创建Agent
# =============================================================================

agent = create_agent(
    model=deepseek_llm,
    tools=[get_weather],
    middleware=[before_model, after_model],   # 注册中间件
    checkpointer=InMemorySaver(),
    state_schema=CustomState
)

config = {"configurable": {"thread_id": "session001"}}

# =============================================================================
# 第一轮对话
# =============================================================================

resp1 = agent.invoke({"messages": [{"role": "user", "content": "你好,我叫张三"}]}, config=config)
print(resp1["messages"][-1].content)

# =============================================================================
# 第二轮对话(会调用工具)
# =============================================================================

resp2 = agent.invoke({"messages": [{"role": "user", "content": "北京天气如何?"}]}, config=config)
print(resp2["messages"][-1].content)

# =============================================================================
# 查看状态统计
# =============================================================================

state = agent.get_state(config=config)
print(state)

5.3 长期记忆(Long Memory)

长期记忆机制说明

┌─────────────────────────────────────────────────────────────────┐
│                    长期记忆(Store)架构                          │
└─────────────────────────────────────────────────────────────────┘

┌─────────┐     ┌─────────────┐     ┌─────────────────┐
│  Store  │────▶│  Namespace  │────▶│     Items       │
└─────────┘     └─────────────┘     └─────────────────┘
                      │
               ┌──────┴──────┐
               │ ("user",)   │
               │ ("products",)│
               └─────────────┘

Namespace: 数据分类的维度,支持多层嵌套
Items: 实际存储的数据,格式为Key-Value

5.3.1 基础长期记忆

# =============================================================================
# 功能说明:使用InMemoryStore实现基础的长期记忆
# 使用场景:
>   - 存储用户偏好
>   - 跨会话数据共享
>   - 实现RAG风格的记忆检索
# 版本说明:LangGraph Store API
# 注意事项:
>   1. Store是持久化的,可以在不同会话间共享
>   2. 数据按Namespace组织
>   3. 支持按Key查询和按Namespace搜索
# =============================================================================

from langchain.agents import create_agent
from langgraph.checkpoint.memory import InMemorySaver
from langgraph.store.memory import InMemoryStore

# =============================================================================
# 创建内存存储
# 说明:InMemoryStore是Store的内存实现
# =============================================================================

store = InMemoryStore()

# =============================================================================
# 定义命名空间
# 说明:Namespace用于数据分类,类似文件目录
# 可以是多层嵌套的元组
# =============================================================================

namespace = ("user1", "preferences")

# =============================================================================
# 写入数据
# 方法:store.put(namespace, key, value)
# 说明:
>   - namespace: 数据所属的命名空间(必须是元组)
>   - key: 数据的唯一标识符
>   - value: 要存储的数据(必须是可序列化的)
# =============================================================================

store.put(
    namespace,                                  # 命名空间
    "city",                                     # Key
    {"like": ["北京", "上海"], "dislike": ["广州"]}  # Value
)

store.put(
    namespace,
    "fruit",
    {"like": ["苹果", "香蕉"], "dislike": ["橙子"]}
)

# =============================================================================
# 读取数据
# 方法:store.get(namespace, key)
# 返回:包含key、value等属性的对象
# =============================================================================

memory = store.get(namespace, "city")
print("memory:", memory)
print("key:", memory.key)
print("value:", memory.value)

# =============================================================================
# 搜索数据
# 方法:store.search(namespace)
# 说明:返回命名空间下所有数据
# =============================================================================

all_memory = store.search(namespace)
for item in all_memory:
    print("key:", item.key)
    print("value:", item.value)

5.3.2 Agent中使用长期记忆

# =============================================================================
# 功能说明:在Agent中集成长期记忆
# 使用场景:
>   - 需要跨会话记住用户信息的场景
>   - 实现个性化推荐
>   - 构建用户画像
# 版本说明:LangGraph Store API
# 注意事项:
>   1. 需要同时配置checkpointer和store
>   2. 工具可以通过runtime参数访问store
>   3. context_schema用于传递用户标识
# =============================================================================

from langchain.agents import create_agent
from langchain_core.tools import tool
from langgraph.checkpoint.memory import InMemorySaver
from langgraph.prebuilt import ToolRuntime
from langgraph.store.memory import InMemoryStore
from pydantic import BaseModel
from init_llm import deepseek_llm

# =============================================================================
# 初始化存储
# =============================================================================

checkpointer = InMemorySaver()
store = InMemoryStore()

# =============================================================================
# 预存用户信息
# 说明:在Agent运行前预先存储数据
# =============================================================================

store.put(
    ("users",),                                 # 命名空间
    "user_123",                                 # Key = 用户ID
    {"name": "张三", "age": 28, "city": "北京", "hobby": "编程、阅读"}
)
store.put(
    ("users",),
    "user_456",
    {"name": "李四", "age": 32, "city": "上海", "hobby": "旅游、摄影"}
)

# =============================================================================
# 工具:查询用户信息
# 说明:工具内部从Store中读取数据
# =============================================================================

@tool
def get_user_info(runtime: ToolRuntime) -> str:
    """
    从长期记忆中查询用户信息。
    
    Args:
        runtime: 运行时对象,包含context
        
    Returns:
        用户信息字符串
    """
    print("runtime:", runtime)
    
    # =================================================================
    # 从context中获取用户ID
    # 说明:context在调用invoke时传入
    # =================================================================
    user_id = runtime.context.user_id
    
    # =================================================================
    # 从Store中查询数据
    # 说明:使用与预存相同的命名空间和Key
    # =================================================================
    user_info_item = store.get(("users",), user_id)
    
    if user_info_item:
        value = user_info_item.value
        return f"用户id:{user_id},用户姓名:{value['name']},用户age:{value['age']}"
    else:
        return "用户不存在"

# =============================================================================
# Context Schema
# 说明:定义传递给Agent的上下文参数
# =============================================================================

class UserContext(BaseModel):
    user_id: str                               # 用户ID,用于查询长期记忆

# =============================================================================
# 创建Agent
# =============================================================================

agent = create_agent(
    model=deepseek_llm,
    tools=[get_user_info],
    checkpointer=checkpointer,
    store=store,                                # 传入Store
    system_prompt="每次查询用户信息时,都要调用工具get_user_info",
    context_schema=UserContext                  # 传入Context Schema
)

config = {"configurable": {"thread_id": "session001"}}

# =============================================================================
# 测试:查询用户123
# =============================================================================

resp1 = agent.invoke(
    {"messages": [{"role": "user", "content": "你知道我的信息吗?"}]},
    config=config,
    context=UserContext(user_id="user_123")      # 传入用户ID
)
print(resp1["messages"][-1].content)

# =============================================================================
# 测试:查询用户456
# =============================================================================

resp2 = agent.invoke(
    {"messages": [{"role": "user", "content": "你知道我的信息吗?"}]},
    config=config,
    context=UserContext(user_id="user_456")
)
print(resp2["messages"][-1].content)

5.3.3 在工具中修改长期记忆

# =============================================================================
# 功能说明:在工具中修改长期记忆
# 使用场景:
>   - 保存用户偏好
>   - 更新用户画像
>   - 记录用户交互历史
# 版本说明:LangGraph Store API
# 注意事项:
>   1. 使用runtime.store访问Store
>   2. 数据会自动持久化(内存或MySQL)
>   3. 可以在不同会话间共享
# =============================================================================

from langchain.agents import create_agent
from langgraph.checkpoint.memory import InMemorySaver
from langgraph.store.memory import InMemoryStore
from langchain.tools import tool, ToolRuntime
from pydantic import BaseModel, Field
from typing import Literal
from init_llm import deepseek_llm
import uuid

# =============================================================================
# Context定义
# =============================================================================

class UserContext(BaseModel):
    user_id: str

# =============================================================================
# Preference参数Schema
# =============================================================================

class UserPreference(BaseModel):
    category: Literal["color", "food", "music"] = Field(
        description="用户偏好类别"
    )
    preference: str = Field(description="具体偏好内容")

# =============================================================================
# 初始化存储
# =============================================================================

store = InMemoryStore()
checkpointer = InMemorySaver()

# =============================================================================
# 工具1:保存用户偏好
# =============================================================================

@tool(args_schema=UserPreference)
def save_user_preference(category: str, preference: str, runtime: ToolRuntime) -> str:
    """
    将用户偏好保存到长期记忆中。
    
    Args:
        category: 偏好类别(color/food/music)
        preference: 具体偏好内容
        runtime: 运行时对象
        
    Returns:
        保存成功的确认消息
    """
    user_id = runtime.context.user_id
    
    # =================================================================
    # 创建命名空间
    # 说明:使用(user_id, "preferences")格式
    # 便于按用户隔离数据
    # =================================================================
    namespace = (user_id, "preferences")
    
    # =================================================================
    # 生成唯一ID
    # 说明:使用UUID确保每次保存都是新记录
    # 如需覆盖,可使用固定的Key
    # =================================================================
    memory_id = str(uuid.uuid4())
    
    # =================================================================
    # 保存到Store
    # 说明:runtime.store是Agent的Store实例
    # =================================================================
    runtime.store.put(namespace, memory_id, {
        "category": category,
        "preference": preference,
    })
    
    return f"已成功保存你的{category}偏好:{preference}"

# =============================================================================
# 工具2:获取用户偏好
# =============================================================================

@tool
def get_user_preferences(runtime: ToolRuntime) -> str:
    """
    从长期记忆中获取用户所有偏好。
    
    Args:
        runtime: 运行时对象
        
    Returns:
        用户偏好列表
    """
    user_id = runtime.context.user_id
    namespace = (user_id, "preferences")
    
    # =================================================================
    # 搜索命名空间下的所有数据
    # =================================================================
    memories = runtime.store.search(namespace)
    
    if not memories:
        return f"您还没有保存过偏好"
    
    preferences_list = []
    for mem in memories:
        pref = mem.value
        preferences_list.append(f"- 种类:{pref['category']},偏好:{pref['preference']}")
    
    return f"你的偏好有:\n" + "\n".join(preferences_list)

# =============================================================================
# 创建Agent
# =============================================================================

memory_agent = create_agent(
    model=deepseek_llm,
    tools=[save_user_preference, get_user_preferences],
    checkpointer=checkpointer,
    store=store,
    context_schema=UserContext
)

# =============================================================================
# 第一轮:保存颜色偏好
# =============================================================================

result1 = memory_agent.invoke(
    {"messages": [{"role": "user", "content": "请记住我喜欢的颜色是蓝色"}]},
    config={"configurable": {"thread_id": "thread1"}},
    context=UserContext(user_id="current_user")
)
print(f"Agent回复: {result1['messages'][-1].content}")

# =============================================================================
# 第二轮:保存食物偏好
# =============================================================================

result2 = memory_agent.invoke(
    {"messages": [{"role": "user", "content": "我还喜欢的食物是意大利面"}]},
    config={"configurable": {"thread_id": "thread1"}},
    context=UserContext(user_id="current_user")
)
print(f"Agent回复: {result2['messages'][-1].content}")

# =============================================================================
# 第三轮:在新会话中查询所有偏好
# 说明:跨会话访问长期记忆
# =============================================================================

result3 = memory_agent.invoke(
    {"messages": [{"role": "user", "content": "告诉我我都喜欢什么颜色和食物"}]},
    config={"configurable": {"thread_id": "thread2"}},  # 新会话
    context=UserContext(user_id="current_user")
)
print(f"Agent回复: {result3['messages'][-1].content}")

5.4 短长期记忆结合实战

架构设计说明

┌─────────────────────────────────────────────────────────────────┐
│                 完整电商客服系统架构                               │
└─────────────────────────────────────────────────────────────────┘

┌─────────────────────────────────────────────────────────────────┐
│                         Agent                                    │
├─────────────────────────────────────────────────────────────────┤
│  ┌──────────┐   ┌──────────┐   ┌──────────┐   ┌──────────┐     │
│  │  短期记忆 │   │  长期记忆 │   │   上下文  │   │   摘要    │     │
│  │(Checkptr)│   │ (Store) │   │(Context) │   │(Middleware)│    │
│  └──────────┘   └──────────┘   └──────────┘   └──────────┘     │
└─────────────────────────────────────────────────────────────────┘
                               │
        ┌──────────────────────┼──────────────────────┐
        ▼                      ▼                      ▼
  ┌──────────┐          ┌──────────┐          ┌──────────┐
  │  订单查询  │          │ 偏好更新  │          │  商品推荐  │
  │  (Tool)   │          │  (Tool)   │          │  (Tool)   │
  └──────────┘          └──────────┘          └──────────┘
        │                      │                      │
        ▼                      ▼                      ▼
  ┌──────────┐          ┌──────────┐          ┌──────────┐
  │ MySQL DB │          │ MySQL DB │          │  推荐算法  │
  └──────────┘          └──────────┘          └──────────┘
# =============================================================================
# 功能说明:完整的电商客服示例,结合短期记忆、长期记忆、上下文和消息摘要
# 使用场景:
>   - 企业级客服系统
>   - 需要个性化服务的电商平台
>   - 复杂的多轮对话场景
# 版本说明:LangChain 1.0+
# 注意事项:
>   1. 需要MySQL数据库支持
>   2. SummarizationMiddleware用于长对话摘要
>   3. 建议结合生产环境进行优化
# =============================================================================

import uuid
import warnings
from typing import Optional
from pydantic import BaseModel, Field
from langchain.agents import create_agent, AgentState
from langchain.agents.middleware import SummarizationMiddleware, wrap_tool_call
from langchain_core.tools import tool
from langchain_core.messages import ToolMessage
from langgraph.checkpoint.mysql.pymysql import PyMySQLSaver
from langgraph.store.mysql.pymysql import PyMySQLStore
from langgraph.prebuilt import ToolRuntime
from langgraph.types import Command
from init_llm import deepseek_llm

warnings.filterwarnings("ignore", category=UserWarning, module="pydantic.main")

# =============================================================================
# Context定义
# 说明:定义传递给Agent的用户上下文
# =============================================================================

class UserContext(BaseModel):
    """
    用户上下文。
    
    Attributes:
        user_id: 用户的唯一标识符
        channel: 用户咨询渠道,如APP、Web、小程序
    """
    user_id: str = Field(description="用户的唯一标识符")
    channel: str = Field(description="用户咨询渠道,如: APP, Web, 小程序")

# =============================================================================
# 自定义状态
# 说明:用于会话级别的状态管理
# =============================================================================

class CustomerSessionState(AgentState):
    """
    客服会话状态。
    
    Attributes:
        current_order_id: 当前正在查询的订单号
    """
    current_order_id: str                       # 当前正在查询的订单号

# =============================================================================
# 模拟数据
# 说明:实际应用中应连接真实数据库
# =============================================================================

MOCK_DATABASE = {
    "orders": {
        "order001": {"order_id": "order001", "status": "已发货", "product": "智能手机"},
        "order002": {"order_id": "order002", "status": "待支付", "product": "智能手表"},
    }
}

# =============================================================================
# 工具1:查询订单状态
# =============================================================================

@tool
def query_order_status(order_id: str, runtime: ToolRuntime) -> Command:
    """
    查询用户订单状态。
    
    Args:
        order_id: 订单ID
        runtime: 运行时对象
        
    Returns:
        Command对象,更新状态和返回消息
    """
    order_info = MOCK_DATABASE["orders"].get(order_id)
    
    if not order_info:
        return Command(update={
            "messages": [ToolMessage(
                content=f"错误:订单 [{order_id}] 不存在",
                tool_call_id=runtime.tool_call_id
            )]
        })
    
    # =================================================================
    # 更新当前订单ID到状态中
    # 说明:便于后续工具获取
    # =================================================================
    return Command(update={
        "current_order_id": order_id,
        "messages": [ToolMessage(
            content=f"订单 [{order_id}] 状态: {order_info['status']}, 商品: {order_info['product']}",
            tool_call_id=runtime.tool_call_id
        )]
    })

# =============================================================================
# 工具2:更新用户偏好
# =============================================================================

@tool
def update_user_preference(category: str, liked_item: str, runtime: ToolRuntime) -> str:
    """
    更新用户长期偏好。
    
    Args:
        category: 偏好类别
        liked_item: 喜欢的内容
        runtime: 运行时对象
        
    Returns:
        操作结果消息
    """
    user_id = runtime.context.user_id
    
    # =================================================================
    # 构建命名空间
    # 格式:user_{user_id}/preferences
    # =================================================================
    namespace = (f"user_{user_id}", "preferences")
    
    # 生成唯一ID
    key = str(uuid.uuid4())
    
    # 保存到长期记忆
    runtime.store.put(namespace, key, {"category": category, "liked_item": liked_item})
    
    return f"已成功将您的偏好记录到长期记忆: 喜欢 {category} 类的 {liked_item}。"

# =============================================================================
# 工具3:获取推荐
# =============================================================================

@tool
def get_recommendation(runtime: ToolRuntime) -> str:
    """
    获取用户推荐商品。
    
    综合考虑用户当前订单和历史偏好,生成个性化推荐。
    
    Args:
        runtime: 运行时对象
        
    Returns:
        推荐结果字符串
    """
    user_id = runtime.context.user_id
    current_order = runtime.state.get("current_order_id", "未知订单")
    namespace = (f"user_{user_id}", "preferences")
    
    # 查询用户偏好
    prefs = runtime.store.search(namespace)
    pref_list = [f"{p.value.get('category')}({p.value.get('liked_item')})" for p in prefs[-3:]]
    
    return f"基于用户当前的订单 [{current_order}] 和长期偏好 {pref_list if pref_list else '无'},为用户推荐相关配件。"

# =============================================================================
# 错误处理中间件
# =============================================================================

@wrap_tool_call
def handle_tool_errors(request, handler):
    """
    工具错误处理中间件。
    
    捕获工具执行中的所有异常,返回友好的错误消息。
    """
    try:
        return handler(request)
    except Exception as e:
        return ToolMessage(
            content=f"调用工具错误:请稍后重试,错误信息:({str(e)})",
            tool_call_id=request.tool_call["id"]
        )

# =============================================================================
# 数据库连接配置
# =============================================================================

DB_URI = "mysql+pymysql://root:123456@localhost:3306/langchain_db?charset=utf8mb4"

# =============================================================================
# 创建Agent
# =============================================================================

with (
    PyMySQLSaver.from_conn_string(DB_URI) as checkpointer,
    PyMySQLStore.from_conn_string(DB_URI) as store
):
    # =================================================================
    # 初始化数据库表
    # =================================================================
    checkpointer.setup()
    store.setup()
    
    # =================================================================
    # 创建Agent
    # 说明:组合多种中间件和功能
    # =================================================================
    agent = create_agent(
        model=deepseek_llm,
        tools=[query_order_status, update_user_preference, get_recommendation],
        system_prompt="""你是智能电商客服助手,具备回答用户咨询、查询订单状态、更新用户偏好和推荐商品功能。""",
        checkpointer=checkpointer,
        store=store,
        state_schema=CustomerSessionState,
        context_schema=UserContext,
        middleware=[
            # =================================================================
            # 消息摘要中间件
            # 说明:当消息数达到阈值时,自动生成摘要
            # 参数说明:
            #   - trigger: 触发条件(messages, 10)= 10条消息时触发
            #   - keep: 摘要后保留的消息数(messages, 5)= 保留最后5条
            # =================================================================
            SummarizationMiddleware(
                model=deepseek_llm,
                summary_prompt="请总结以下对话内容:{messages}",
                trigger=("messages", 10),
                keep=("messages", 5),
            ),
            handle_tool_errors
        ],
    )
    
    # =================================================================
    # 交互循环
    # =================================================================
    user_context = UserContext(user_id="customer_001", channel="Web")
    config = {"configurable": {"thread_id": "session_02"}}
    
    print("智能电商客服助手 - 输入 'quit' 退出")
    
    while True:
        user_input = input("[你]: ").strip()
        
        if user_input.lower() in ['quit', 'exit', '退出']:
            print("感谢你的咨询,再见!")
            break
        
        if not user_input:
            continue
        
        print("[客服助手]: ")
        
        # =================================================================
        # 流式输出
        # =================================================================
        for chunk in agent.stream(
            {"messages": {"role": "user", "content": user_input}},
            config=config,
            context=user_context
        ):
            for step, data in chunk.items():
                if step in ["model", "tools"]:
                    data["messages"][-1].pretty_print()

附录:环境配置与最佳实践

A.1 安装依赖

# =============================================================================
# 功能说明:完整的环境依赖安装命令
# 说明:分门别类地安装,便于理解和管理
# 版本:推荐使用最新稳定版本
# =============================================================================

# =============================================================================
# 基础依赖
# 说明:LangChain核心框架
# =============================================================================
pip install langchain>=1.0.0                # 核心框架
pip install langchain-core                  # 核心接口和类型
pip install langchain-community             # 第三方集成
pip install python-dotenv                   # 环境变量管理

# =============================================================================
# 数据库支持
# 说明:用于Checkpoint和Store的持久化
# =============================================================================
pip install pymysql                         # MySQL数据库驱动

# =============================================================================
# 开发工具
# 说明:用于代码质量和测试
# =============================================================================
pip install mypy>=1.11.1                   # 类型检查
pip install ruff>=0.6.1                     # 代码格式化/检查
pip install pytest>=8.3.5                   # 单元测试框架

A.2 数据库初始化

-- =============================================================================
-- 功能说明:MySQL数据库初始化脚本
-- 说明:创建LangChain所需的数据库和用户
-- =============================================================================

-- 创建数据库(指定字符集为utf8mb4,支持中文)
CREATE DATABASE langchain_db CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci;

-- 创建专用用户(生产环境推荐)
-- 注意:生产环境应使用强密码
CREATE USER 'langchain'@'%' IDENTIFIED BY 'your_password';

-- 授予权限
GRANT ALL PRIVILEGES ON langchain_db.* TO 'langchain'@'%';

-- 刷新权限使更改生效
FLUSH PRIVILEGES;

A.3 项目结构推荐

项目结构说明

your_project/                    # 项目根目录
├── src/                        # 源代码目录
│   └── agent/                  # Agent相关代码
│       ├── __init__.py
│       └── graph.py            # Agent图定义
├── tests/                      # 测试目录
│   ├── unit_tests/             # 单元测试
│   └── integration_tests/       # 集成测试
├── .env.example                # 环境变量示例
├── pyproject.toml              # 项目配置
└── uv.lock                     # 依赖锁定文件

A.4 最佳实践

企业级开发规范

┌─────────────────────────────────────────────────────────────────┐
│                      开发规范清单                                 │
├─────────────────────────────────────────────────────────────────┤
│ 1. 环境变量管理                                                   │
│    ✓ 使用.env文件管理API密钥                                      │
│    ✓ 不硬编码敏感信息                                             │
│    ✓ 生产环境使用密钥管理服务                                      │
│                                                                  │
│ 2. 错误处理                                                       │
│    ✓ 工具添加try-catch                                            │
│    ✓ 使用中间件统一处理错误                                        │
│    ✓ 返回友好的错误消息                                           │
│                                                                  │
│ 3. 记忆管理                                                       │
│    ✓ 短期记忆用Checkpoint                                         │
│    ✓ 长期记忆用Store                                             │
│    ✓ 定期清理无用数据                                             │
│                                                                  │
│ 4. 状态管理                                                       │
│    ✓ 使用TypeDict定义状态                                         │
│    ✓ 状态更新使用Command对象                                       │
│    ✓ 避免状态泄漏                                                 │
│                                                                  │
│ 5. 中间件使用                                                     │
│    ✓ 按顺序注册中间件                                             │
│    ✓ 避免中间件冲突                                               │
│    ✓ 中间件应保持轻量                                             │
└─────────────────────────────────────────────────────────────────┘

A.5 常见问题排查

问题诊断指南

┌─────────────────────────────────────────────────────────────────┐
│                      常见问题与解决方案                           │
├─────────────────────────────────────────────────────────────────┤
│                                                                  │
│ 问题1: API Key无效                                               │
│ ├─ 症状:认证失败错误                                             │
│ ├─ 检查:                                                         │
│ │   ✓ .env文件是否正确配置                                        │
│ │   ✓ load_dotenv()是否被调用                                    │
│ │   ✓ API密钥是否正确                                             │
│ └─ 解决:重新生成API密钥                                          │
│                                                                  │
│ 问题2: 工具未调用                                                  │
│ ├─ 症状:Agent忽略工具,直接回复                                   │
│ ├─ 检查:                                                         │
│ │   ✓ 工具description是否清晰                                     │
│ │   ✓ 参数Schema是否正确定义                                       │
│ │   ✓ 系统Prompt是否提及工具功能                                   │
│ └─ 解决:优化工具描述和Prompt                                      │
│                                                                  │
│ 问题3: 记忆丢失                                                   │
│ ├─ 症状:Agent不记得之前的对话                                     │
│ ├─ 检查:                                                         │
│ │   ✓ Checkpointer是否配置                                        │
│ │   ✓ thread_id是否正确传入                                       │
│ │   ✓ 内存检查点是否重启后清空                                     │
│ └─ 解决:使用持久化检查点(MySQL)                                 │
│                                                                  │
│ 问题4: 状态不同步                                                  │
│ ├─ 症状:状态更新后Agent行为异常                                   │
│ ├─ 检查:                                                         │
│ │   ✓ 中间件状态更新是否正确                                       │
│ │   ✓ Command对象是否正确使用                                      │
│ │   ✓ 状态类型是否匹配                                            │
│ └─ 解决:调试状态更新逻辑                                          │
│                                                                  │
└─────────────────────────────────────────────────────────────────┘
  1. API Key无效:检查.env文件配置和环境变量加载
  2. 工具未调用:检查工具描述是否清晰,参数Schema是否正确定义
  3. 记忆丢失:检查Checkpointer配置和thread_id是否正确
  4. 状态不同步:检查中间件中的状态更新逻辑

Logo

欢迎加入DeepSeek 技术社区。在这里,你可以找到志同道合的朋友,共同探索AI技术的奥秘。

更多推荐