LangGraph-AI应用开发框架(六)
·
一.LangGraph持久化(Persistence)
二.什么是持久化能力?




三.线程级持久化
1.线程级持久化是怎么工作的?



2.核心概念

a.Threads(线程)



b.Checkpoints(检查点)




具体示例:



3.线程级持久化使用姿势
a.步骤⼀:配置 checkpointer 持久化存储
![]()
一.方式1:内存存储

from langgraph.checkpoint.memory import InMemorySaver
# 定义存储⽅式
checkpointer = InMemorySaver()
# ⽤ checkpointer 编译图
agent = agent_builder.compile(checkpointer=checkpointer)
二.方式2:使用Postgres存储库





b.步骤二:使用Thread进行执行

from langchain.messages import HumanMessage
DB_URI = "postgresql://postgres:bit@192.168.100.233:5432/postgres"
with PostgresSaver.from_conn_string(DB_URI) as checkpointer:
# 第一次使用 Postgres 检查点时需要调用 checkpointer.setup()
checkpointer.setup()
# 编译图
agent = agent_builder.compile(checkpointer=checkpointer)
# 第一次执行, 创建一个新的 Thread (thread_id="1")
config = {"configurable": {"thread_id": "1"}}
result1 = agent.invoke(
{"messages": [HumanMessage(content="今天西安的天气如何?")]},
config
)
print(f"调用 LLM 总次数: {result1["llm_calls"]}次")
for m in result1["messages"]:
m.pretty_print()

from langchain.messages import HumanMessage
DB_URI = "postgresql://postgres:bit@192.168.100.233:5432/postgres"
with PostgresSaver.from_conn_string(DB_URI) as checkpointer:
# 第一次使用 Postgres 检查点时需要调用 checkpointer.setup()
# checkpointer.setup()
# 编译图
agent = agent_builder.compile(checkpointer=checkpointer)
# ............... 一段时间后, 程序可能重启了 (使用postgres存储) ...............
# 再次使用相同的 thread_id 调用
# LangGraph 会从上次的状态继续, 而不是重新开始
# 此时, result2 的上下文会包含之前的对话历史
config = {"configurable": {"thread_id": "1"}}
result2 = agent.invoke(
{"messages": [HumanMessage(content="我们刚才聊到哪了?")]},
config
)
print(f"调用 LLM 总次数: {result2["llm_calls"]}次")
for m in result2["messages"]:
m.pretty_print()



c.其他基本用法
一.获取状态快照

StateSnapshot(
# 当前状态值(如:对话消息列表)
values={'messages': [用户消息, AI回复, 用户消息...]},
# 接下来要执行的节点
next=('generate_response',),
# 配置信息
config={'configurable': {'thread_id': '123', 'checkpoint_id': 'abc'}},
# 元数据(步骤号、来源、写入信息等)
metadata={'step': 2, 'source': 'loop', 'writes': {...}},
# 父检查点(形成链表)
parent_config={'configurable': {'thread_id': '123', 'checkpoint_id': 'def...'}},
# 创建时间
created_at=''
)

from langchain.messages import HumanMessage
config = {"configurable": {"thread_id": "1"}}
# 调用前的状态快照
snapshot = agent.get_state(config)
print(snapshot)
result1 = agent.invoke(
{"messages": [HumanMessage(content="你好")]},
config
)
# 调用后的状态快照
snapshot = agent.get_state(config)
print(snapshot)

二.获取状态历史记录



三.重放

from langchain.messages import HumanMessage
config = {"configurable": {"thread_id": "1"}}
# 第一次执行
result1 = agent.invoke(
{"messages": [HumanMessage(content="今天西安的天气如何?")]},
config
)
# 保存调用工具前的状态
print("-" * 80)
print(f"第一次执行历史: ")
to_replay = None
for state in agent.get_state_history(config):
print("checkpoint_id: ", state.config["configurable"]["checkpoint_id"],
"消息数: ", len(state.values["messages"]),
"下一节点: ", state.next)
if len(state.values["messages"]) == 2: # 保存调用工具前的状态
to_replay = state
print("-" * 80)
print(f"从{to_replay.next}节点开始重新执行, 重放配置: {to_replay.config}")
# 第二次执行: 重放
result2 = agent.invoke(None, config=to_replay.config)
print("-" * 80)
print(f"第二次执行历史: 重放后")
# 查看新的历史记录
for state in agent.get_state_history(config):
print("checkpoint_id: ", state.config["configurable"]["checkpoint_id"],
"消息数: ", len(state.values["messages"]),
"下一节点: ", state.next)
result2['messages'][-1].pretty_print()


四.更新状态

from langchain.messages import HumanMessage
from langgraph.types import Overwrite
config = {"configurable": {"thread_id": "1"}}
# 第一次执行
result1 = agent.invoke(
{"messages": [HumanMessage(content="今天西安的天气如何?")]},
config
)
# 找到调用LLM前的步骤
print("-" * 80)
print(f"第一次执行历史: ")
selected_state = None
for state in agent.get_state_history(config):
print("checkpoint_id: ", state.config["configurable"]["checkpoint_id"],
"消息数: ", len(state.values["messages"]),
"下一节点: ", state.next)
if len(state.values["messages"]) == 1: # 此时消息数为1;下一节点是'llm_call'
selected_state = state
print("-" * 80)
print(f"更新前配置: {selected_state.config}")
# 根据指定的config,更新对应步骤的值
# 更新用户输入
new_config = agent.update_state(
selected_state.config,
{"messages": Overwrite([HumanMessage(content="今天北京的天气如何?")])} # 清空消息,重新写入
)
print("-" * 80)
print(f"更新后配置: {new_config}")
# 第二次执行: 重放更新后的配置
result2 = agent.invoke(None, config=new_config)
for message in result2['messages']:
message.pretty_print()


更多推荐

所有评论(0)