一.LangGraph持久化(Persistence)

二.什么是持久化能力?

三.线程级持久化

1.线程级持久化是怎么工作的?

2.核心概念

a.Threads(线程)

b.Checkpoints(检查点)

具体示例:

3.线程级持久化使用姿势

a.步骤⼀:配置 checkpointer 持久化存储

一.方式1:内存存储

from langgraph.checkpoint.memory import InMemorySaver
# 定义存储⽅式 
checkpointer = InMemorySaver()
# ⽤ checkpointer 编译图 
agent = agent_builder.compile(checkpointer=checkpointer)
二.方式2:使用Postgres存储库

b.步骤二:使用Thread进行执行

from langchain.messages import HumanMessage

DB_URI = "postgresql://postgres:bit@192.168.100.233:5432/postgres"
with PostgresSaver.from_conn_string(DB_URI) as checkpointer:

    # 第一次使用 Postgres 检查点时需要调用 checkpointer.setup()
    checkpointer.setup()

    # 编译图
    agent = agent_builder.compile(checkpointer=checkpointer)

    # 第一次执行, 创建一个新的 Thread (thread_id="1")
    config = {"configurable": {"thread_id": "1"}}
    result1 = agent.invoke(
        {"messages": [HumanMessage(content="今天西安的天气如何?")]},
        config
    )

    print(f"调用 LLM 总次数: {result1["llm_calls"]}次")
    for m in result1["messages"]:
        m.pretty_print()

from langchain.messages import HumanMessage

DB_URI = "postgresql://postgres:bit@192.168.100.233:5432/postgres"
with PostgresSaver.from_conn_string(DB_URI) as checkpointer:

    # 第一次使用 Postgres 检查点时需要调用 checkpointer.setup()
    # checkpointer.setup()

    # 编译图
    agent = agent_builder.compile(checkpointer=checkpointer)

    # ............... 一段时间后, 程序可能重启了 (使用postgres存储) ...............

    # 再次使用相同的 thread_id 调用
    # LangGraph 会从上次的状态继续, 而不是重新开始
    # 此时, result2 的上下文会包含之前的对话历史
    config = {"configurable": {"thread_id": "1"}}
    result2 = agent.invoke(
        {"messages": [HumanMessage(content="我们刚才聊到哪了?")]},
        config
    )

    print(f"调用 LLM 总次数: {result2["llm_calls"]}次")
    for m in result2["messages"]:
        m.pretty_print()

c.其他基本用法

一.获取状态快照

StateSnapshot(
    # 当前状态值(如:对话消息列表)
    values={'messages': [用户消息, AI回复, 用户消息...]},
    # 接下来要执行的节点
    next=('generate_response',),
    # 配置信息
    config={'configurable': {'thread_id': '123', 'checkpoint_id': 'abc'}},

    # 元数据(步骤号、来源、写入信息等)
    metadata={'step': 2, 'source': 'loop', 'writes': {...}},
    # 父检查点(形成链表)
    parent_config={'configurable': {'thread_id': '123', 'checkpoint_id': 'def...'}},
    # 创建时间
    created_at=''
)

from langchain.messages import HumanMessage

config = {"configurable": {"thread_id": "1"}}

# 调用前的状态快照
snapshot = agent.get_state(config)
print(snapshot)

result1 = agent.invoke(
    {"messages": [HumanMessage(content="你好")]},
    config
)

# 调用后的状态快照
snapshot = agent.get_state(config)
print(snapshot)

二.获取状态历史记录

三.重放

from langchain.messages import HumanMessage

config = {"configurable": {"thread_id": "1"}}
# 第一次执行
result1 = agent.invoke(
    {"messages": [HumanMessage(content="今天西安的天气如何?")]},
    config
)

# 保存调用工具前的状态
print("-" * 80)
print(f"第一次执行历史: ")
to_replay = None
for state in agent.get_state_history(config):
    print("checkpoint_id: ", state.config["configurable"]["checkpoint_id"],
          "消息数: ", len(state.values["messages"]),
          "下一节点: ", state.next)
    if len(state.values["messages"]) == 2:  # 保存调用工具前的状态
        to_replay = state

print("-" * 80)
print(f"从{to_replay.next}节点开始重新执行, 重放配置: {to_replay.config}")

# 第二次执行: 重放
result2 = agent.invoke(None, config=to_replay.config)
print("-" * 80)

print(f"第二次执行历史: 重放后")
# 查看新的历史记录
for state in agent.get_state_history(config):
    print("checkpoint_id: ", state.config["configurable"]["checkpoint_id"],
          "消息数: ", len(state.values["messages"]),
          "下一节点: ", state.next)

result2['messages'][-1].pretty_print()

四.更新状态

from langchain.messages import HumanMessage
from langgraph.types import Overwrite

config = {"configurable": {"thread_id": "1"}}
# 第一次执行
result1 = agent.invoke(
    {"messages": [HumanMessage(content="今天西安的天气如何?")]},
    config
)

# 找到调用LLM前的步骤
print("-" * 80)
print(f"第一次执行历史: ")
selected_state = None
for state in agent.get_state_history(config):
    print("checkpoint_id: ", state.config["configurable"]["checkpoint_id"],
          "消息数: ", len(state.values["messages"]),
          "下一节点: ", state.next)
    if len(state.values["messages"]) == 1:  # 此时消息数为1;下一节点是'llm_call'
        selected_state = state

print("-" * 80)
print(f"更新前配置: {selected_state.config}")

# 根据指定的config,更新对应步骤的值
# 更新用户输入
new_config = agent.update_state(
    selected_state.config,
    {"messages": Overwrite([HumanMessage(content="今天北京的天气如何?")])}  # 清空消息,重新写入
)

print("-" * 80)
print(f"更新后配置: {new_config}")

# 第二次执行: 重放更新后的配置
result2 = agent.invoke(None, config=new_config)
for message in result2['messages']:
    message.pretty_print()

Logo

欢迎加入DeepSeek 技术社区。在这里,你可以找到志同道合的朋友,共同探索AI技术的奥秘。

更多推荐